package com.xforceplus.ultraman.oqsengine.devops.rebuild;

import com.xforceplus.ultraman.oqsengine.common.id.LongIdGenerator;
import com.xforceplus.ultraman.oqsengine.common.pool.ExecutorHelper;
import com.xforceplus.ultraman.oqsengine.devops.rebuild.enums.BatchStatus;
import com.xforceplus.ultraman.oqsengine.devops.rebuild.handler.DefaultDevOpsTaskHandler;
import com.xforceplus.ultraman.oqsengine.devops.rebuild.handler.TaskHandler;
import com.xforceplus.ultraman.oqsengine.devops.rebuild.model.DefaultDevOpsTaskInfo;
import com.xforceplus.ultraman.oqsengine.devops.rebuild.model.DevOpsTaskInfo;
import com.xforceplus.ultraman.oqsengine.devops.rebuild.storage.SQLTaskStorage;
import com.xforceplus.ultraman.oqsengine.pojo.devops.DevOpsCdcMetrics;
import com.xforceplus.ultraman.oqsengine.pojo.dto.entity.IEntityClass;
import com.xforceplus.ultraman.oqsengine.pojo.page.Page;
import com.xforceplus.ultraman.oqsengine.storage.index.IndexStorage;
import com.xforceplus.ultraman.oqsengine.storage.master.MasterStorage;
import java.sql.SQLException;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/xforceplus/ultraman/oqsengine/devops/rebuild/DevOpsRebuildIndexExecutor.class */
/**
 * {@link RebuildIndexExecutor} implementation that records every rebuild task in
 * {@link SQLTaskStorage} and performs the actual work on an internal, fixed-size thread pool.
 *
 * <p>A rebuild is started by {@link #rebuildIndex}, which persists a new task record and then
 * asynchronously asks the {@link MasterStorage} to rebuild the requested range. Progress is
 * later folded into the task record by {@link #sync(Map)}.
 */
public class DevOpsRebuildIndexExecutor implements RebuildIndexExecutor {

    @Resource
    private MasterStorage masterStorage;

    @Resource
    private IndexStorage indexStorage;

    @Resource
    private SQLTaskStorage sqlTaskStorage;

    @Resource(name = "longNoContinuousPartialOrderIdGenerator")
    private LongIdGenerator idGenerator;

    /** Pool that runs rebuild and sync work; created once in the constructor. */
    private final ExecutorService asyncThreadPool;

    final Logger logger = LoggerFactory.getLogger(DevOpsRebuildIndexExecutor.class);

    /** Offset of the default time zone, sampled once when this instance is created. */
    private final ZoneOffset zoneOffset = OffsetDateTime.now().getOffset();

    /**
     * Creates the executor together with its worker pool.
     *
     * @param i  number of worker threads (used as both the core and the maximum pool size)
     * @param i2 capacity of the bounded queue holding tasks that wait for a free worker
     */
    public DevOpsRebuildIndexExecutor(int i, int i2) {
        this.asyncThreadPool = new ThreadPoolExecutor(
            i,
            i,
            0L,
            TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(i2),
            ExecutorHelper.buildNameThreadFactory("task-threads", false));
    }

    /**
     * Shuts the worker pool down through {@link ExecutorHelper}.
     *
     * @throws Exception declared for the container life-cycle contract
     */
    public void destroy() throws Exception {
        // NOTE(review): 3600 is presumably a timeout in seconds - confirm against ExecutorHelper.
        ExecutorHelper.shutdownAndAwaitTermination(this.asyncThreadPool, 3600L);
    }

    /**
     * Registers a new rebuild task for the given entity class and time range and schedules the
     * rebuild on the worker pool.
     *
     * @param iEntityClass   entity class whose index is rebuilt
     * @param localDateTime  start of the range
     * @param localDateTime2 end of the range
     * @return the newly created task, or {@code null} when the task storage reports that no
     *         record was written
     * @throws RejectedExecutionException when the worker pool cannot accept the job; the task
     *         record is marked as failed before the exception is rethrown
     * @throws Exception on failures while persisting the task
     */
    @Override // com.xforceplus.ultraman.oqsengine.devops.rebuild.RebuildIndexExecutor
    public DevOpsTaskInfo rebuildIndex(IEntityClass iEntityClass, LocalDateTime localDateTime, LocalDateTime localDateTime2) throws Exception {
        DefaultDevOpsTaskInfo pending = pending(iEntityClass, localDateTime, localDateTime2);
        this.logger.info("pending rebuildIndex task, maintainId {}, entityClass {}, start {}, end {}",
            pending.id(), iEntityClass.id(), localDateTime, localDateTime2);

        if (0 == this.sqlTaskStorage.build(pending).intValue()) {
            return null;
        }

        try {
            this.asyncThreadPool.submit(() -> runRebuild(pending));
        } catch (RejectedExecutionException e) {
            // The record is already persisted; without this it would never leave its initial state.
            this.logger.error("rebuildIndex task rejected by thread pool, maintainId {}.", pending.getMaintainid(), e);
            markFailed(pending, e);
            throw e;
        }
        return pending;
    }

    /**
     * Requests cancellation of a task.
     *
     * @param j identifier passed to the task storage
     * @return {@code true} when the storage reports at least one affected record
     * @throws Exception on storage failures
     */
    @Override // com.xforceplus.ultraman.oqsengine.devops.rebuild.RebuildIndexExecutor
    public boolean cancel(long j) throws Exception {
        return this.sqlTaskStorage.cancel(j) > 0;
    }

    /**
     * Looks a task up by identifier.
     *
     * @param l task identifier, must not be {@code null}
     * @return a handler wrapping the task, or empty when the storage has no such task
     * @throws SQLException on storage failures
     */
    @Override // com.xforceplus.ultraman.oqsengine.devops.rebuild.RebuildIndexExecutor
    public Optional<TaskHandler> taskHandler(Long l) throws SQLException {
        return this.sqlTaskStorage.selectUnique(l.longValue()).map(this::newTaskHandler);
    }

    /**
     * Lists the tasks the storage considers active.
     *
     * @param page paging information forwarded to the storage
     * @return handlers for the returned tasks; an empty collection when there are none
     * @throws SQLException on storage failures
     */
    @Override // com.xforceplus.ultraman.oqsengine.devops.rebuild.RebuildIndexExecutor
    public Collection<TaskHandler> listActiveTasks(Page page) throws SQLException {
        return toHandlers(this.sqlTaskStorage.listActives(page));
    }

    /**
     * Returns the single active task of an entity class, if any.
     *
     * @param iEntityClass entity class to look up
     * @return a handler for the active task, or empty when none is active
     * @throws SQLException on storage failures, or when more than one active task exists
     */
    @Override // com.xforceplus.ultraman.oqsengine.devops.rebuild.RebuildIndexExecutor
    public Optional<TaskHandler> getActiveTask(IEntityClass iEntityClass) throws SQLException {
        Collection<DevOpsTaskInfo> actives = this.sqlTaskStorage.selectActive(iEntityClass.id());
        // Same defensive null handling as the list methods.
        if (null == actives || actives.isEmpty()) {
            return Optional.empty();
        }
        if (1 < actives.size()) {
            throw new SQLException("more than 1 active task error.");
        }
        return Optional.of(newTaskHandler(actives.iterator().next()));
    }

    /**
     * Lists all tasks known to the storage.
     *
     * @param page paging information forwarded to the storage
     * @return handlers for the returned tasks; an empty collection when there are none
     * @throws SQLException on storage failures
     */
    @Override // com.xforceplus.ultraman.oqsengine.devops.rebuild.RebuildIndexExecutor
    public Collection<TaskHandler> listAllTasks(Page page) throws SQLException {
        return toHandlers(this.sqlTaskStorage.listAll(page));
    }

    /**
     * Folds reported metrics into the matching task records. Each map entry is processed as a
     * separate job on the worker pool, so this method returns before the updates are applied.
     *
     * @param map metrics keyed by task identifier; {@code null} or empty is a no-op
     * @throws SQLException declared by the interface; storage failures inside the asynchronous
     *         jobs are logged and not propagated
     */
    @Override // com.xforceplus.ultraman.oqsengine.devops.rebuild.RebuildIndexExecutor
    public void sync(Map<Long, DevOpsCdcMetrics> map) throws SQLException {
        if (null == map || map.isEmpty()) {
            return;
        }
        map.forEach((maintainId, metrics) ->
            this.asyncThreadPool.submit(() -> syncTask(maintainId, metrics)));
    }

    /** Runs the master-storage rebuild for a freshly persisted task and records the outcome. */
    private void runRebuild(DefaultDevOpsTaskInfo pending) {
        try {
            int rebuild = this.masterStorage.rebuild(
                pending.getEntity(), pending.getMaintainid(), pending.getStarts(), pending.getEnds());
            if (rebuild > 0) {
                pending.setBatchSize(rebuild);
                pending.resetStatus(BatchStatus.RUNNING.getCode());
                pending.resetMessage("TASK PROCESSING");
            } else {
                // Nothing to rebuild: the task is finished immediately.
                pending.setBatchSize(0L);
                pending.resetStatus(BatchStatus.DONE.getCode());
                pending.resetMessage("TASK END");
            }
            this.sqlTaskStorage.update(pending);
        } catch (Exception e) {
            this.logger.error("rebuildIndex task failed, maintainId {}.", pending.getMaintainid(), e);
            markFailed(pending, e);
        }
    }

    /** Stores the failure reason on the task and persists it through the storage's error path. */
    private void markFailed(DevOpsTaskInfo task, Exception cause) {
        // Some exceptions carry no message; fall back to the exception's string form.
        String reason = null != cause.getMessage() ? cause.getMessage() : cause.toString();
        task.resetMessage(reason);
        try {
            this.sqlTaskStorage.error(task);
        } catch (SQLException e) {
            this.logger.error("do task-error exception, maintainId {}.", task.getMaintainid(), e);
        }
    }

    /** Applies one metrics report to its task record; runs on a worker thread. */
    private void syncTask(Long maintainId, DevOpsCdcMetrics metrics) {
        try {
            Optional<DevOpsTaskInfo> found = this.sqlTaskStorage.selectUnique(maintainId.longValue());
            if (!found.isPresent()) {
                return;
            }
            DevOpsTaskInfo task = found.get();
            if (task.isEnd()) {
                return;
            }

            boolean changed = false;
            if (metrics.getSuccess() > 0) {
                changed = true;
                task.resetIncrementSize(metrics.getSuccess());
                task.addFinishSize(metrics.getSuccess());
            }
            if (metrics.getFails() > 0) {
                changed = true;
                task.addErrorSize(metrics.getFails());
            }
            if (!changed) {
                return;
            }

            if (task.getErrorSize() > 0) {
                failTask(task);
            } else {
                advanceTask(task);
            }
        } catch (SQLException e) {
            this.logger.warn("query task exception, maintainId {}.", maintainId, e);
        } catch (RuntimeException e) {
            // The Future returned by submit() is discarded, so log here or the failure is lost.
            this.logger.error("unexpected exception while syncing task, maintainId {}.", maintainId, e);
        }
    }

    /** Persists a task that has accumulated errors through the storage's error path. */
    private void failTask(DevOpsTaskInfo task) {
        try {
            task.resetMessage("task end with error.");
            this.sqlTaskStorage.error(task);
        } catch (SQLException e) {
            this.logger.warn("do task-error exception, maintainId {}.", task.getMaintainid(), e);
        }
    }

    /** Persists progress of an error-free task, or finishes it once the whole batch is done. */
    private void advanceTask(DevOpsTaskInfo task) {
        try {
            boolean stillRunning = task.getBatchSize() <= 0 || task.getFinishSize() < task.getBatchSize();
            if (stillRunning) {
                task.resetStatus(BatchStatus.RUNNING.getCode());
                this.sqlTaskStorage.update(task);
            } else {
                done(task);
            }
        } catch (SQLException e) {
            this.logger.warn("do task-update exception, maintainId {}.", task.getMaintainid(), e);
        }
    }

    /** Builds the initial task record, converting both range bounds to epoch milliseconds. */
    private DefaultDevOpsTaskInfo pending(IEntityClass iEntityClass, LocalDateTime localDateTime, LocalDateTime localDateTime2) {
        long maintainId = ((Long) this.idGenerator.next()).longValue();
        long starts = localDateTime.toInstant(this.zoneOffset).toEpochMilli();
        long ends = localDateTime2.toInstant(this.zoneOffset).toEpochMilli();
        return new DefaultDevOpsTaskInfo(maintainId, iEntityClass, starts, ends);
    }

    /**
     * Finishes a task: asks the index storage to clean up for the task's range (best effort),
     * then marks the task as done in the task storage.
     *
     * @return {@code true} when the storage reports the task record was updated
     * @throws SQLException when the task storage fails
     */
    private boolean done(DevOpsTaskInfo devOpsTaskInfo) throws SQLException {
        try {
            this.indexStorage.clean(devOpsTaskInfo.getEntity(), devOpsTaskInfo.getMaintainid(),
                devOpsTaskInfo.getStarts(), devOpsTaskInfo.getEnds());
        } catch (Exception e) {
            // Deliberately best effort: a failed clean must not prevent the task from completing.
            this.logger.warn("index clean failed, maintainId {}.", devOpsTaskInfo.getMaintainid(), e);
        }

        devOpsTaskInfo.resetMessage("success");
        boolean updated = this.sqlTaskStorage.done(devOpsTaskInfo) > 0;
        if (updated) {
            this.logger.info("task done, maintainId {}, finish batchSize {}",
                devOpsTaskInfo.getMaintainid(), devOpsTaskInfo.getFinishSize());
            if (devOpsTaskInfo instanceof DefaultDevOpsTaskInfo) {
                ((DefaultDevOpsTaskInfo) devOpsTaskInfo).setStatus(BatchStatus.DONE.getCode());
            } else {
                // Avoid a ClassCastException for other implementations of the interface.
                devOpsTaskInfo.resetStatus(BatchStatus.DONE.getCode());
            }
        } else {
            this.logger.warn("task done error, task update finish status error, maintainId {}",
                devOpsTaskInfo.getMaintainid());
        }
        return updated;
    }

    /** Wraps each task in a handler; a {@code null} or empty input yields an empty collection. */
    private Collection<TaskHandler> toHandlers(Collection<DevOpsTaskInfo> tasks) {
        if (null == tasks || tasks.isEmpty()) {
            return new ArrayList<>();
        }
        return tasks.stream().map(this::newTaskHandler).collect(Collectors.toList());
    }

    /** Creates a handler bound to this executor's task storage. */
    private TaskHandler newTaskHandler(DevOpsTaskInfo devOpsTaskInfo) {
        return new DefaultDevOpsTaskHandler(this.sqlTaskStorage, devOpsTaskInfo);
    }
}
