/*
 * Decompiled with CFR 0.152.
 */
package com.xforceplus.ultraman.oqsengine.cdc.consumer;

import com.alibaba.otter.canal.protocol.Message;
import com.xforceplus.ultraman.oqsengine.cdc.connect.AbstractCDCConnector;
import com.xforceplus.ultraman.oqsengine.cdc.consumer.ConsumerService;
import com.xforceplus.ultraman.oqsengine.cdc.metrics.CDCMetricsService;
import com.xforceplus.ultraman.oqsengine.devops.rebuild.RebuildIndexExecutor;
import com.xforceplus.ultraman.oqsengine.meta.common.utils.TimeWaitUtils;
import com.xforceplus.ultraman.oqsengine.pojo.cdc.enums.CDCStatus;
import com.xforceplus.ultraman.oqsengine.pojo.cdc.enums.RunningStatus;
import com.xforceplus.ultraman.oqsengine.pojo.cdc.metrics.CDCMetrics;
import java.sql.SQLException;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConsumerRunner
extends Thread {
    final Logger logger = LoggerFactory.getLogger(ConsumerRunner.class);
    private ConsumerService consumerService;
    private CDCMetricsService cdcMetricsService;
    private AbstractCDCConnector connector;
    private RebuildIndexExecutor rebuildIndexExecutor;
    private static volatile RunningStatus runningStatus;

    public ConsumerRunner(ConsumerService consumerService, CDCMetricsService cdcMetricsService, AbstractCDCConnector connector, RebuildIndexExecutor rebuildIndexExecutor) {
        this.consumerService = consumerService;
        this.cdcMetricsService = cdcMetricsService;
        this.connector = connector;
        this.rebuildIndexExecutor = rebuildIndexExecutor;
        this.rebuildIndexExecutor.resetDoubleCheckDistance((long)connector.getBatchSize());
    }

    public void shutdown() {
        int useTime;
        runningStatus = RunningStatus.TRY_STOP;
        this.cdcMetricsService.shutdown();
        for (useTime = 0; useTime < 25; ++useTime) {
            TimeWaitUtils.wakeupAfter((long)4L, (TimeUnit)TimeUnit.SECONDS);
            if (!runningStatus.equals((Object)RunningStatus.STOP_SUCCESS)) continue;
            this.logger.info("[cdc-runner] consumer success stop.");
            break;
        }
        if (useTime >= 25) {
            this.logger.warn("[cdc-runner] force stop after {} seconds.", (Object)(useTime * 4));
        }
    }

    @Override
    public void run() {
        runningStatus = RunningStatus.INIT;
        this.cdcMetricsService.startMetrics();
        int currentConnectTimes = 0;
        while (!this.needTerminate()) {
            try {
                this.connectAndReset(currentConnectTimes);
            }
            catch (Exception e) {
                ++currentConnectTimes;
                this.closeToNextReconnect(CDCStatus.DIS_CONNECTED, String.format("[cdc-runner] canal-server connection error, %s", e.getMessage()));
                continue;
            }
            runningStatus = RunningStatus.RUN;
            currentConnectTimes = 0;
            try {
                this.consume();
            }
            catch (Exception e) {
                this.closeToNextReconnect(CDCStatus.CONSUME_FAILED, String.format("[cdc-runner] canal-client consume error, %s", e.getMessage()));
            }
        }
    }

    private void connectAndReset(int currentConnectTimes) throws SQLException {
        if (this.connector.canUseConnector(currentConnectTimes)) {
            this.connector.open();
            this.syncAndRecover();
        } else {
            this.connector.close();
            this.connector.init();
            this.connectAndReset(0);
        }
    }

    private void closeToNextReconnect(CDCStatus cdcStatus, String errorMessage) {
        this.connector.close();
        this.logger.error(errorMessage);
        this.callBackError(3, cdcStatus);
    }

    private boolean needTerminate() {
        if (runningStatus.ordinal() >= RunningStatus.TRY_STOP.ordinal()) {
            this.connector.shutdown();
            runningStatus = RunningStatus.STOP_SUCCESS;
            return true;
        }
        return false;
    }

    public void consume() throws SQLException {
        while (true) {
            long batchId;
            if (runningStatus.ordinal() >= RunningStatus.TRY_STOP.ordinal()) break;
            Message message = null;
            try {
                long start = System.currentTimeMillis();
                message = this.connector.getMessageWithoutAck();
                long duration = System.currentTimeMillis() - start;
                batchId = message.getId();
                if (duration > 100L && batchId != -1L) {
                    this.logger.info("[cdc-runner] get message from canal server use too much times, use timeMs : {}, batchId : {}", (Object)duration, (Object)message.getId());
                }
            }
            catch (Exception e) {
                this.connector.rollback();
                String error = String.format("get message from canal server error, %s", e.toString());
                this.logger.error("[cdc-runner] {}", (Object)error);
                throw new SQLException(error);
            }
            boolean synced = false;
            try {
                CDCMetrics cdcMetrics = null;
                if (batchId != -1L || message.getEntries().size() != 0) {
                    cdcMetrics = this.consumerService.consume(message.getEntries(), batchId, this.cdcMetricsService);
                    synced = this.saveMetrics(cdcMetrics);
                    this.finishAck(cdcMetrics);
                    if (cdcMetrics.getDevOpsMetrics().isEmpty()) continue;
                    this.rebuildIndexExecutor.sync(cdcMetrics.getDevOpsMetrics());
                    continue;
                }
                cdcMetrics = new CDCMetrics(batchId, this.cdcMetricsService.getCdcMetrics().getCdcAckMetrics(), this.cdcMetricsService.getCdcMetrics().getCdcUnCommitMetrics());
                cdcMetrics.getCdcAckMetrics().setExecuteRows(0);
                synced = this.saveMetrics(cdcMetrics);
                this.emptyAck(batchId);
            }
            catch (Exception e) {
                String error = "";
                if (!synced) {
                    this.connector.rollback();
                    error = "consume message error";
                } else {
                    error = "sync finish status error";
                }
                this.logger.error("[cdc-runner] sync error, will reconnect..., message : {}, {}", (Object)error, (Object)e.toString());
                throw new SQLException(error);
            }
        }
        runningStatus = RunningStatus.STOP_SUCCESS;
    }

    private boolean saveMetrics(CDCMetrics cdcMetrics) {
        this.cdcMetricsService.backup(cdcMetrics);
        return true;
    }

    private void syncAndRecover() throws SQLException {
        this.cdcMetricsService.connectOk();
        CDCMetrics cdcMetrics = this.cdcMetricsService.query();
        long originBatchId = -1L;
        if (null != cdcMetrics) {
            originBatchId = cdcMetrics.getBatchId();
            if (originBatchId != -9223372036854775807L && originBatchId != -1L) {
                this.backAfterAck(originBatchId, cdcMetrics);
            }
            this.callBackSuccess(originBatchId, cdcMetrics, true);
            this.logger.info("[cdc-runner] recover from last ackMetrics position success..., originBatchId : {}", (Object)originBatchId);
        } else {
            this.cdcMetricsService.newConnectCallBack();
            this.logger.info("[cdc-runner] new connect callBack success, originBatchId : {}", (Object)originBatchId);
        }
        this.connector.rollback();
    }

    private void finishAck(CDCMetrics cdcMetrics) throws SQLException {
        if (null != cdcMetrics) {
            long originBatchId = cdcMetrics.getBatchId();
            this.backAfterAck(originBatchId, cdcMetrics);
            this.callBackSuccess(originBatchId, cdcMetrics, false);
        }
    }

    private void emptyAck(long batchId) throws SQLException {
        this.connector.ack(batchId);
        TimeWaitUtils.wakeupAfter((long)5L, (TimeUnit)TimeUnit.MILLISECONDS);
    }

    private void backAfterAck(long originBatchId, CDCMetrics cdcMetrics) throws SQLException {
        this.connector.ack(originBatchId);
        this.logger.debug("ack batch success, batchId : {}", (Object)originBatchId);
        cdcMetrics.setBatchId(-9223372036854775807L);
        this.cdcMetricsService.backup(cdcMetrics);
        this.logger.debug("rest cdcMetrics with buckUpId success, origin batchId : {}", (Object)originBatchId);
    }

    private void callBackError(int waitInSeconds, CDCStatus cdcStatus) {
        TimeWaitUtils.wakeupAfter((long)waitInSeconds, (TimeUnit)TimeUnit.SECONDS);
        this.cdcMetricsService.callBackError(cdcStatus);
    }

    private void callBackSuccess(long originBatchId, CDCMetrics cdcMetrics, boolean isConnectSync) {
        this.cdcMetricsService.callBackSuccess(originBatchId, cdcMetrics, isConnectSync);
    }
}

