/*
 * Decompiled with CFR 0.152.
 */
package com.xforceplus.ultraman.oqsengine.cdc.consumer;

import com.alibaba.otter.canal.protocol.Message;
import com.xforceplus.ultraman.oqsengine.cdc.connect.AbstractCDCConnector;
import com.xforceplus.ultraman.oqsengine.cdc.consumer.ConsumerService;
import com.xforceplus.ultraman.oqsengine.cdc.metrics.CDCMetricsService;
import com.xforceplus.ultraman.oqsengine.devops.rebuild.RebuildIndexExecutor;
import com.xforceplus.ultraman.oqsengine.pojo.cdc.enums.CDCStatus;
import com.xforceplus.ultraman.oqsengine.pojo.cdc.enums.RunningStatus;
import com.xforceplus.ultraman.oqsengine.pojo.cdc.metrics.CDCMetrics;
import java.sql.SQLException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConsumerRunner
extends Thread {
    final Logger logger = LoggerFactory.getLogger(ConsumerRunner.class);
    private ConsumerService consumerService;
    private CDCMetricsService cdcMetricsService;
    private AbstractCDCConnector abstractCdcConnector;
    private RebuildIndexExecutor rebuildIndexExecutor;
    private static volatile RunningStatus runningStatus;

    public ConsumerRunner(ConsumerService consumerService, CDCMetricsService cdcMetricsService, AbstractCDCConnector abstractCdcConnector, RebuildIndexExecutor rebuildIndexExecutor) {
        this.consumerService = consumerService;
        this.cdcMetricsService = cdcMetricsService;
        this.abstractCdcConnector = abstractCdcConnector;
        this.rebuildIndexExecutor = rebuildIndexExecutor;
    }

    public void shutdown() {
        int useTime;
        runningStatus = RunningStatus.TRY_STOP;
        this.cdcMetricsService.shutdown();
        for (useTime = 0; useTime < 25; ++useTime) {
            try {
                Thread.sleep(4000L);
            }
            catch (Exception e) {
                this.logger.warn("[cdc-runner] shutdown error, message : {}", (Object)e.getMessage());
            }
            if (!this.isShutdown()) continue;
            this.logger.info("[cdc-runner] consumer success stop.");
            break;
        }
        if (useTime >= 25) {
            this.logger.warn("[cdc-runner] force stop after {} seconds.", (Object)(useTime * 4));
        }
    }

    public boolean isShutdown() {
        return runningStatus.equals((Object)RunningStatus.STOP_SUCCESS);
    }

    @Override
    public void run() {
        runningStatus = RunningStatus.INIT;
        this.cdcMetricsService.startMetrics();
        while (!this.checkForStop()) {
            try {
                this.connectAndReset();
            }
            catch (Exception e) {
                this.closeToNextReconnect(CDCStatus.DIS_CONNECTED, String.format("[cdc-runner] %s, %s", "canal-server connection error", e.getMessage()));
                continue;
            }
            runningStatus = RunningStatus.RUN;
            try {
                this.consume();
            }
            catch (Exception e) {
                this.closeToNextReconnect(CDCStatus.CONSUME_FAILED, String.format("[cdc-runner] %s, %s", "canal-client consume error", e.getMessage()));
            }
        }
    }

    private void connectAndReset() throws SQLException {
        this.abstractCdcConnector.open();
        this.syncAndRecover();
    }

    private void closeToNextReconnect(CDCStatus cdcStatus, String errorMessage) {
        this.abstractCdcConnector.close();
        this.logger.error(errorMessage);
        this.callBackError(3000, cdcStatus);
    }

    private boolean checkForStop() {
        if (runningStatus.ordinal() >= RunningStatus.TRY_STOP.ordinal()) {
            this.abstractCdcConnector.shutdown();
            runningStatus = RunningStatus.STOP_SUCCESS;
            return true;
        }
        return false;
    }

    public void consume() throws SQLException {
        while (true) {
            long batchId;
            if (runningStatus.ordinal() >= RunningStatus.TRY_STOP.ordinal()) break;
            Message message = null;
            try {
                long start = System.currentTimeMillis();
                message = this.abstractCdcConnector.getMessageWithoutAck();
                long duration = System.currentTimeMillis() - start;
                batchId = message.getId();
                if (duration > 100L && batchId != -1L) {
                    this.logger.info("[cdc-runner] get message from canal server use too much times, use timeMs : {}, batchId : {}", (Object)duration, (Object)message.getId());
                }
            }
            catch (Exception e) {
                this.abstractCdcConnector.rollback();
                String error = String.format("get message from canal server error, %s", e.toString());
                this.logger.error("[cdc-runner] {}", (Object)error);
                throw new SQLException(error);
            }
            boolean synced = false;
            try {
                CDCMetrics cdcMetrics = null;
                if (batchId != -1L || message.getEntries().size() != 0) {
                    cdcMetrics = this.consumerService.consume(message.getEntries(), batchId, this.cdcMetricsService);
                    synced = this.backMetrics(cdcMetrics);
                    this.syncSuccess(cdcMetrics);
                    if (cdcMetrics.getDevOpsMetrics().isEmpty()) continue;
                    this.rebuildIndexExecutor.sync(cdcMetrics.getDevOpsMetrics());
                    continue;
                }
                cdcMetrics = new CDCMetrics(batchId, this.cdcMetricsService.getCdcMetrics().getCdcAckMetrics(), this.cdcMetricsService.getCdcMetrics().getCdcUnCommitMetrics());
                cdcMetrics.getCdcAckMetrics().setExecuteRows(0);
                synced = this.backMetrics(cdcMetrics);
                this.syncFree(batchId);
            }
            catch (Exception e) {
                String error = "";
                if (!synced) {
                    this.abstractCdcConnector.rollback();
                    error = "consume message error";
                } else {
                    error = "sync finish status error";
                }
                this.logger.error("[cdc-runner] sync error, will reconnect..., message : {}, {}", (Object)error, (Object)e.toString());
                throw new SQLException(error);
            }
        }
        runningStatus = RunningStatus.STOP_SUCCESS;
    }

    private boolean backMetrics(CDCMetrics cdcMetrics) {
        this.cdcMetricsService.backup(cdcMetrics);
        return true;
    }

    private void syncFree(long batchId) throws SQLException {
        this.abstractCdcConnector.ack(batchId);
        this.threadSleep(5);
    }

    private void syncAndRecover() throws SQLException {
        this.cdcMetricsService.connectOk();
        CDCMetrics cdcMetrics = this.cdcMetricsService.query();
        long originBatchId = -1L;
        if (null != cdcMetrics) {
            originBatchId = cdcMetrics.getBatchId();
            if (originBatchId != -9223372036854775807L && originBatchId != -1L) {
                this.backAfterAck(originBatchId, cdcMetrics);
            }
            this.callBackSuccess(originBatchId, cdcMetrics, true);
            this.logger.info("[cdc-runner] recover from last ackMetrics position success..., originBatchId : {}", (Object)originBatchId);
        } else {
            this.cdcMetricsService.newConnectCallBack();
            this.logger.info("[cdc-runner] new connect callBack success, originBatchId : {}", (Object)originBatchId);
        }
        this.abstractCdcConnector.rollback();
    }

    private void syncSuccess(CDCMetrics cdcMetrics) throws SQLException {
        if (null != cdcMetrics) {
            long originBatchId = cdcMetrics.getBatchId();
            this.backAfterAck(originBatchId, cdcMetrics);
            this.callBackSuccess(originBatchId, cdcMetrics, false);
        }
    }

    private void backAfterAck(long originBatchId, CDCMetrics cdcMetrics) throws SQLException {
        this.abstractCdcConnector.ack(originBatchId);
        this.logger.debug("ack batch success, batchId : {}", (Object)originBatchId);
        cdcMetrics.setBatchId(-9223372036854775807L);
        this.cdcMetricsService.backup(cdcMetrics);
        this.logger.debug("rest cdcMetrics with buckUpId success, origin batchId : {}", (Object)originBatchId);
    }

    private void threadSleep(int waitInSeconds) {
        try {
            Thread.sleep(waitInSeconds);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
    }

    private void callBackError(int waitInSeconds, CDCStatus cdcStatus) {
        this.threadSleep(waitInSeconds);
        this.cdcMetricsService.callBackError(cdcStatus);
    }

    private void callBackSuccess(long originBatchId, CDCMetrics cdcMetrics, boolean isConnectSync) {
        this.cdcMetricsService.callBackSuccess(originBatchId, cdcMetrics, isConnectSync);
    }
}

