/*
 * Decompiled with CFR 0.152.
 */
package com.xforceplus.ultraman.oqsengine.plus.devops;

import com.xforceplus.ultraman.cdc.adapter.EngineAdapterService;
import com.xforceplus.ultraman.metadata.cdc.OqsEngineEntity;
import com.xforceplus.ultraman.metadata.engine.EntityClassEngine;
import com.xforceplus.ultraman.metadata.entity.IEntityClass;
import com.xforceplus.ultraman.oqsengine.plus.common.dto.BatchStatus;
import com.xforceplus.ultraman.oqsengine.plus.common.iterator.DataIterator;
import com.xforceplus.ultraman.oqsengine.plus.devops.DevOpsService;
import com.xforceplus.ultraman.oqsengine.plus.devops.dto.DefaultDevOpsTaskInfo;
import com.xforceplus.ultraman.oqsengine.plus.devops.dto.DevOpsTaskInfo;
import com.xforceplus.ultraman.oqsengine.plus.devops.storage.SQLTaskStorage;
import com.xforceplus.ultraman.oqsengine.plus.master.iterator.exception.TaskRetryException;
import com.xforceplus.ultraman.oqsengine.plus.master.mysql.MasterStorage;
import com.xforceplus.ultraman.sdk.infra.base.id.LongIdGenerator;
import com.xforceplus.ultraman.sdk.infra.base.id.SnowflakeLongIdGenerator;
import com.xforceplus.ultraman.sdk.infra.base.id.node.NodeIdGenerator;
import com.xforceplus.ultraman.sdk.infra.base.id.node.TimeRandomNodeIdGenerator;
import java.sql.SQLException;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RebuildDevOpsService
implements DevOpsService {
    final Logger logger = LoggerFactory.getLogger(RebuildDevOpsService.class);
    @Resource
    private MasterStorage masterStorage;
    @Resource
    private EngineAdapterService engineAdapterService;
    @Resource
    private EntityClassEngine engine;
    @Resource
    private SQLTaskStorage sqlTaskStorage;
    private LongIdGenerator idGenerator;
    private ZoneOffset zoneOffset = OffsetDateTime.now().getOffset();
    private ExecutorService asyncThreadPool;
    private static final int BATCH_QUERY_SIZE = 2048;
    private int querySize = 2048;
    private Queue<DevOpsTaskInfo> failedTaskQueue;
    private static final int MAX_AUTO_RETRY = 100;
    private static final int FAILED_RETRY_TIME = 20000;

    public RebuildDevOpsService(int querySize, ExecutorService executorService) {
        if (querySize > 0) {
            this.querySize = querySize;
        }
        this.asyncThreadPool = executorService;
        this.failedTaskQueue = new ArrayBlockingQueue<DevOpsTaskInfo>(1024);
        this.idGenerator = new SnowflakeLongIdGenerator((NodeIdGenerator)new TimeRandomNodeIdGenerator());
        this.asyncThreadPool.submit(this::runFailedTask);
    }

    private void runFailedTask() {
        while (true) {
            DevOpsTaskInfo e = this.failedTaskQueue.poll();
            try {
                if (null != e) {
                    Thread.sleep(20000L);
                    this.resume(e);
                    continue;
                }
                Thread.sleep(20000L);
                continue;
            }
            catch (InterruptedException | SQLException ex) {
                this.logger.warn("runFailedTask : {}", (Object)ex.getMessage());
                continue;
            }
            break;
        }
    }

    private void resume(DevOpsTaskInfo e) throws SQLException {
        if (e.getAutoRetryTime() > 100) {
            return;
        }
        e.incAutoRetryTime();
        e.resetStatus(BatchStatus.PENDING.getCode());
        this.sqlTaskStorage.update(e);
        this.logger.info("task will auto retry : maintainId : {}, times : {}", (Object)e.getMaintainid(), (Object)e.getAutoRetryTime());
        this.asyncThreadPool.submit(() -> this.handleTask(e));
    }

    @Override
    public DevOpsTaskInfo rebuildIndex(IEntityClass entityClass, LocalDateTime start, LocalDateTime end, boolean useCDC, long startId) {
        DefaultDevOpsTaskInfo devOpsTaskInfo = this.pending((Long)this.idGenerator.next(), entityClass, start, end, useCDC, startId);
        this.logger.info("pending rebuildIndex task, maintainId {}, entityClass {}, start {}, end {}, useCDC {}, startId {}", new Object[]{devOpsTaskInfo.id(), entityClass.id(), start, end, useCDC, startId});
        if (0 == this.sqlTaskStorage.build(devOpsTaskInfo)) {
            return null;
        }
        this.asyncThreadPool.submit(() -> this.handleTask(devOpsTaskInfo));
        return devOpsTaskInfo;
    }

    @Override
    public Collection<DevOpsTaskInfo> rebuildIndexes(Collection<IEntityClass> entityClasses, LocalDateTime start, LocalDateTime end, boolean useCDC) {
        ArrayList<DevOpsTaskInfo> devOps = new ArrayList<DevOpsTaskInfo>();
        long batchId = (Long)this.idGenerator.next();
        for (IEntityClass entityClass : entityClasses) {
            DefaultDevOpsTaskInfo devOpsTaskInfo = this.pending(batchId, entityClass, start, end, useCDC, 0L);
            this.logger.info("pending rebuildIndex task, maintainId {}, entityClass {}, start {}, end {}, useCDC {}, startId {}", new Object[]{devOpsTaskInfo.id(), entityClass.id(), start, end, useCDC, 0});
            try {
                if (0 == this.sqlTaskStorage.build(devOpsTaskInfo)) {
                    devOpsTaskInfo.resetMessage("init task failed...");
                    devOpsTaskInfo.resetStatus(BatchStatus.ERROR.getCode());
                }
            }
            catch (Exception e) {
                devOpsTaskInfo.resetMessage("init task failed...");
                devOpsTaskInfo.resetStatus(BatchStatus.ERROR.getCode());
            }
            devOps.add(devOpsTaskInfo);
        }
        for (DevOpsTaskInfo devOpsTaskInfo : devOps) {
            this.asyncThreadPool.submit(() -> this.handleTask(devOpsTaskInfo));
        }
        return devOps;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleTask(DevOpsTaskInfo devOpsTaskInfo) {
        DataIterator iterator = null;
        try {
            if (devOpsTaskInfo.getStatus() == BatchStatus.ERROR.getCode()) {
                this.sqlTaskStorage.error(devOpsTaskInfo);
                return;
            }
            int updateFlag = 0;
            int frequency = 10;
            iterator = devOpsTaskInfo.useCDC() ? this.masterStorage.iterator(devOpsTaskInfo.getEntityClass(), devOpsTaskInfo.getStarts(), devOpsTaskInfo.getEnds(), devOpsTaskInfo.getStartId(), this.querySize, true) : this.masterStorage.iteratorEntity(devOpsTaskInfo.getEntityClass(), devOpsTaskInfo.getStarts(), devOpsTaskInfo.getEnds(), devOpsTaskInfo.getStartId(), this.querySize);
            ArrayList<Object> ids = new ArrayList<Object>();
            boolean isCanceled = false;
            while (iterator.hasNext()) {
                Object v = iterator.next();
                ids.add(v);
                if (ids.size() != this.querySize) continue;
                this.writeToIndex(devOpsTaskInfo.useCDC(), devOpsTaskInfo.getEntityClass(), ids);
                if (devOpsTaskInfo.useCDC()) {
                    devOpsTaskInfo.setStartId((Long)v);
                } else {
                    devOpsTaskInfo.setStartId(((OqsEngineEntity)v).getId());
                }
                devOpsTaskInfo.addBatchSize(ids.size());
                devOpsTaskInfo.addFinishSize(ids.size());
                ids.clear();
                if (++updateFlag != frequency) continue;
                if (0 == this.sqlTaskStorage.update(devOpsTaskInfo)) {
                    isCanceled = true;
                    break;
                }
                updateFlag = 0;
            }
            if (!isCanceled) {
                if (!ids.isEmpty()) {
                    this.writeToIndex(devOpsTaskInfo.useCDC(), devOpsTaskInfo.getEntityClass(), ids);
                    devOpsTaskInfo.addBatchSize(ids.size());
                    devOpsTaskInfo.addFinishSize(ids.size());
                }
                devOpsTaskInfo.setStartId(0L);
                this.done(devOpsTaskInfo);
            }
        }
        catch (Exception e) {
            devOpsTaskInfo.resetMessage(e.getMessage());
            try {
                this.sqlTaskStorage.error(devOpsTaskInfo);
            }
            catch (Exception ex) {
                this.logger.error(ex.getMessage(), (Throwable)ex);
            }
            if (e instanceof TaskRetryException) {
                this.logger.error("task run failed, it will add to failedTask retry queue...");
                this.failedTaskQueue.add(devOpsTaskInfo);
            }
        }
        finally {
            if (iterator != null) {
                try {
                    iterator.destroy();
                }
                catch (Exception ex) {
                    this.logger.error(ex.getMessage(), (Throwable)ex);
                }
            }
        }
    }

    private void writeToIndex(boolean useCDC, IEntityClass entityClass, List ids) throws SQLException {
        boolean r = useCDC ? this.masterStorage.rebuildIndex(entityClass, ids) : this.engineAdapterService.batchUpsertOperation((Collection)ids);
        if (!r) {
            throw new TaskRetryException("write index failed...");
        }
    }

    @Override
    public boolean cancel(long maintainId) throws Exception {
        return this.sqlTaskStorage.cancel(maintainId) > 0;
    }

    @Override
    public Optional<DevOpsTaskInfo> listTask(Long maintainId) throws SQLException {
        return this.sqlTaskStorage.selectUnique(maintainId);
    }

    @Override
    public Collection<DevOpsTaskInfo> listTasks(long batchId) throws SQLException {
        return this.sqlTaskStorage.selectActive(batchId);
    }

    private boolean done(DevOpsTaskInfo devOpsTaskInfo) throws SQLException {
        devOpsTaskInfo.resetMessage("success");
        if (this.sqlTaskStorage.done(devOpsTaskInfo) > 0) {
            this.logger.info("task done, maintainId {}, finish batchSize {}", (Object)devOpsTaskInfo.getMaintainid(), (Object)devOpsTaskInfo.getFinishSize());
            ((DefaultDevOpsTaskInfo)devOpsTaskInfo).setStatus(BatchStatus.DONE.getCode());
        } else {
            this.logger.warn("task done error, task update finish status error, maintainId {}", (Object)devOpsTaskInfo.getMaintainid());
        }
        return true;
    }

    private DefaultDevOpsTaskInfo pending(long batchId, IEntityClass entityClass, LocalDateTime start, LocalDateTime end, boolean useCDC, long startId) {
        DefaultDevOpsTaskInfo devOpsTaskInfo = new DefaultDevOpsTaskInfo(batchId, (Long)this.idGenerator.next(), entityClass, start.toInstant(this.zoneOffset).toEpochMilli(), end.toInstant(this.zoneOffset).toEpochMilli());
        devOpsTaskInfo.setProfile(entityClass.profile());
        devOpsTaskInfo.setUseCDC(useCDC);
        devOpsTaskInfo.setStartId(startId);
        return devOpsTaskInfo;
    }
}

