/*
 * Decompiled with CFR 0.152.
 */
package com.xforceplus.ultraman.oqsengine.devops.rebuild;

import com.xforceplus.ultraman.oqsengine.common.id.LongIdGenerator;
import com.xforceplus.ultraman.oqsengine.common.pool.ExecutorHelper;
import com.xforceplus.ultraman.oqsengine.devops.rebuild.RebuildIndexExecutor;
import com.xforceplus.ultraman.oqsengine.devops.rebuild.enums.BatchStatus;
import com.xforceplus.ultraman.oqsengine.devops.rebuild.handler.DefaultDevOpsTaskHandler;
import com.xforceplus.ultraman.oqsengine.devops.rebuild.handler.TaskHandler;
import com.xforceplus.ultraman.oqsengine.devops.rebuild.model.DefaultDevOpsTaskInfo;
import com.xforceplus.ultraman.oqsengine.devops.rebuild.model.DevOpsTaskInfo;
import com.xforceplus.ultraman.oqsengine.devops.rebuild.storage.SQLTaskStorage;
import com.xforceplus.ultraman.oqsengine.pojo.devops.DevOpsCdcMetrics;
import com.xforceplus.ultraman.oqsengine.pojo.dto.entity.IEntityClass;
import com.xforceplus.ultraman.oqsengine.pojo.page.Page;
import com.xforceplus.ultraman.oqsengine.storage.index.IndexStorage;
import com.xforceplus.ultraman.oqsengine.storage.master.MasterStorage;
import java.sql.SQLException;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DevOpsRebuildIndexExecutor
implements RebuildIndexExecutor {
    final Logger logger = LoggerFactory.getLogger(DevOpsRebuildIndexExecutor.class);
    @Resource
    private MasterStorage masterStorage;
    @Resource
    private IndexStorage indexStorage;
    @Resource
    private SQLTaskStorage sqlTaskStorage;
    @Resource(name="longNoContinuousPartialOrderIdGenerator")
    private LongIdGenerator idGenerator;
    private ZoneOffset zoneOffset = OffsetDateTime.now().getOffset();
    private ExecutorService asyncThreadPool;
    private long doubleCheckDistance = 2048L;

    public DevOpsRebuildIndexExecutor(int taskSize, int maxQueueSize) {
        this.asyncThreadPool = new ThreadPoolExecutor(taskSize, taskSize, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(maxQueueSize), ExecutorHelper.buildNameThreadFactory((String)"task-threads", (boolean)false));
    }

    @Override
    public void resetDoubleCheckDistance(long distance) {
        if (distance > 2048L) {
            this.doubleCheckDistance = distance;
        }
    }

    public void destroy() throws Exception {
        if (null != this.asyncThreadPool) {
            ExecutorHelper.shutdownAndAwaitTermination((ExecutorService)this.asyncThreadPool, (long)3600L);
        }
    }

    @Override
    public Collection<DevOpsTaskInfo> rebuildIndexes(Collection<IEntityClass> entityClasses, LocalDateTime start, LocalDateTime end) throws Exception {
        ArrayList<DevOpsTaskInfo> devOps = new ArrayList<DevOpsTaskInfo>();
        ArrayList<DefaultDevOpsTaskInfo> errorTasks = new ArrayList<DefaultDevOpsTaskInfo>();
        for (IEntityClass entityClass : entityClasses) {
            DefaultDevOpsTaskInfo devOpsTaskInfo = this.pending(entityClass, start, end);
            this.logger.info("pending rebuildIndex task, maintainId {}, entityClass {}, start {}, end {}", new Object[]{devOpsTaskInfo.id(), entityClass.id(), start, end});
            try {
                if (0 == this.sqlTaskStorage.build(devOpsTaskInfo)) {
                    devOpsTaskInfo.resetMessage("init task failed...");
                    devOpsTaskInfo.resetStatus(BatchStatus.ERROR.getCode());
                    errorTasks.add(devOpsTaskInfo);
                    continue;
                }
                devOps.add(devOpsTaskInfo);
            }
            catch (Exception e) {
                devOpsTaskInfo.resetMessage("init task failed...");
                devOpsTaskInfo.resetStatus(BatchStatus.ERROR.getCode());
                errorTasks.add(devOpsTaskInfo);
            }
        }
        this.asyncThreadPool.submit(() -> devOps.forEach(this::handleTask));
        devOps.addAll(errorTasks);
        return devOps;
    }

    @Override
    public DevOpsTaskInfo rebuildIndex(IEntityClass entityClass, LocalDateTime start, LocalDateTime end) throws Exception {
        DefaultDevOpsTaskInfo devOpsTaskInfo = this.pending(entityClass, start, end);
        this.logger.info("pending rebuildIndex task, maintainId {}, entityClass {}, start {}, end {}", new Object[]{devOpsTaskInfo.id(), entityClass.id(), start, end});
        if (0 == this.sqlTaskStorage.build(devOpsTaskInfo)) {
            return null;
        }
        this.asyncThreadPool.submit(() -> this.handleTask(devOpsTaskInfo));
        return devOpsTaskInfo;
    }

    private void handleTask(DevOpsTaskInfo devOpsTaskInfo) {
        try {
            int rebuildCount = this.masterStorage.rebuild(devOpsTaskInfo.getEntity(), devOpsTaskInfo.getMaintainid(), devOpsTaskInfo.getStarts(), devOpsTaskInfo.getEnds());
            if (rebuildCount > 0) {
                devOpsTaskInfo.setBatchSize(rebuildCount);
                devOpsTaskInfo.resetStatus(BatchStatus.RUNNING.getCode());
                devOpsTaskInfo.resetMessage("TASK PROCESSING");
            } else {
                devOpsTaskInfo.setBatchSize(0L);
                devOpsTaskInfo.resetStatus(BatchStatus.DONE.getCode());
                devOpsTaskInfo.resetMessage("TASK END");
            }
            this.sqlTaskStorage.update(devOpsTaskInfo);
        }
        catch (Exception e) {
            devOpsTaskInfo.resetMessage(e.getMessage());
            try {
                this.sqlTaskStorage.error(devOpsTaskInfo);
            }
            catch (SQLException ex) {
                ex.printStackTrace();
            }
        }
    }

    @Override
    public boolean cancel(long maintainId) throws Exception {
        return this.sqlTaskStorage.cancel(maintainId) > 0;
    }

    @Override
    public Optional<TaskHandler> taskHandler(Long maintainId) throws SQLException {
        return this.sqlTaskStorage.selectUnique(maintainId).map(this::newTaskHandler);
    }

    @Override
    public Collection<TaskHandler> listActiveTasks(Page page) throws SQLException {
        Collection<DevOpsTaskInfo> taskInfoList = this.sqlTaskStorage.listActives(page);
        return null != taskInfoList && 0 < taskInfoList.size() ? (Collection)taskInfoList.stream().map(this::newTaskHandler).collect(Collectors.toList()) : new ArrayList<TaskHandler>();
    }

    @Override
    public Optional<TaskHandler> getActiveTask(IEntityClass entityClass) throws SQLException {
        Collection<DevOpsTaskInfo> taskInfoCollection = this.sqlTaskStorage.selectActive(entityClass.id());
        if (1 < taskInfoCollection.size()) {
            throw new SQLException("more than 1 active task error.");
        }
        if (!taskInfoCollection.isEmpty()) {
            return Optional.of(this.newTaskHandler(taskInfoCollection.iterator().next()));
        }
        return Optional.empty();
    }

    @Override
    public Collection<TaskHandler> listAllTasks(Page page) throws SQLException {
        Collection<DevOpsTaskInfo> taskInfoList = this.sqlTaskStorage.listAll(page);
        return null != taskInfoList && 0 < taskInfoList.size() ? (Collection)taskInfoList.stream().map(this::newTaskHandler).collect(Collectors.toList()) : new ArrayList<TaskHandler>();
    }

    @Override
    public void sync(Map<Long, DevOpsCdcMetrics> devOpsCdcMetrics) throws SQLException {
        devOpsCdcMetrics.forEach((maintainId, devOpsMetrics) -> this.asyncThreadPool.submit(() -> {
            block20: {
                Optional<DevOpsTaskInfo> devOpsTaskInfoOp = null;
                try {
                    devOpsTaskInfoOp = this.sqlTaskStorage.selectUnique((long)maintainId);
                }
                catch (SQLException ex) {
                    this.logger.warn("query task exception, maintainId {}.", maintainId);
                    return;
                }
                if (devOpsTaskInfoOp.isPresent()) {
                    DevOpsTaskInfo dt = devOpsTaskInfoOp.get();
                    if (dt.isEnd()) {
                        if (dt.isDone() && dt.getStatus() != BatchStatus.DONE.getCode()) {
                            try {
                                this.done(dt);
                            }
                            catch (Exception e) {
                                this.logger.warn("done task exception, maintainId {}, message {}.", maintainId, (Object)e.getMessage());
                            }
                        }
                        return;
                    }
                    boolean needUpdate = false;
                    if (devOpsMetrics.getSuccess() > 0) {
                        needUpdate = true;
                        dt.resetIncrementSize(devOpsMetrics.getSuccess());
                        dt.addFinishSize(devOpsMetrics.getSuccess());
                    }
                    if (devOpsMetrics.getFails() > 0) {
                        needUpdate = true;
                        dt.addErrorSize(devOpsMetrics.getFails());
                    }
                    if (needUpdate) {
                        if (dt.getErrorSize() > 0) {
                            try {
                                dt.resetMessage("task end with error.");
                                this.sqlTaskStorage.error(dt);
                            }
                            catch (SQLException ex) {
                                this.logger.warn("do task-error exception, maintainId {}.", (Object)dt.getMaintainid());
                            }
                        } else {
                            try {
                                long distance = dt.getBatchSize() - (long)dt.getFinishSize();
                                if (distance <= 0L) {
                                    this.done(dt);
                                    break block20;
                                }
                                dt.resetStatus(BatchStatus.RUNNING.getCode());
                                this.sqlTaskStorage.update(dt);
                                if (distance >= this.doubleCheckDistance) break block20;
                                try {
                                    devOpsTaskInfoOp = this.sqlTaskStorage.selectUnique((long)maintainId);
                                    if (devOpsTaskInfoOp.isPresent() && devOpsTaskInfoOp.get().getBatchSize() <= (long)devOpsTaskInfoOp.get().getFinishSize()) {
                                        this.done(devOpsTaskInfoOp.get());
                                    }
                                }
                                catch (SQLException ex) {
                                    this.logger.warn("query task exception, maintainId {}.", maintainId);
                                }
                            }
                            catch (SQLException ex) {
                                this.logger.warn("do task-update exception, maintainId {}.", (Object)dt.getMaintainid());
                            }
                        }
                    }
                }
            }
        }));
    }

    private DefaultDevOpsTaskInfo pending(IEntityClass entityClass, LocalDateTime start, LocalDateTime end) {
        return new DefaultDevOpsTaskInfo((Long)this.idGenerator.next(), entityClass, start.toInstant(this.zoneOffset).toEpochMilli(), end.toInstant(this.zoneOffset).toEpochMilli());
    }

    private boolean done(DevOpsTaskInfo devOpsTaskInfo) throws SQLException {
        try {
            this.indexStorage.clean(devOpsTaskInfo.getEntity(), devOpsTaskInfo.getMaintainid(), devOpsTaskInfo.getStarts(), devOpsTaskInfo.getEnds());
        }
        catch (Exception e) {
            this.logger.warn(e.getMessage());
        }
        devOpsTaskInfo.resetMessage("success");
        if (this.sqlTaskStorage.done(devOpsTaskInfo) > 0) {
            this.logger.info("task done, maintainId {}, finish batchSize {}", (Object)devOpsTaskInfo.getMaintainid(), (Object)devOpsTaskInfo.getFinishSize());
            ((DefaultDevOpsTaskInfo)devOpsTaskInfo).setStatus(BatchStatus.DONE.getCode());
        } else {
            this.logger.warn("task done error, task update finish status error, maintainId {}", (Object)devOpsTaskInfo.getMaintainid());
        }
        return true;
    }

    private TaskHandler newTaskHandler(DevOpsTaskInfo taskInfo) {
        return new DefaultDevOpsTaskHandler(this.sqlTaskStorage, taskInfo);
    }
}

