package com.xforceplus.ultraman.oqsengine.cdc.consumer;

import com.alibaba.otter.canal.protocol.Message;
import com.xforceplus.ultraman.oqsengine.cdc.connect.CDCConnector;
import com.xforceplus.ultraman.oqsengine.cdc.metrics.CDCMetricsService;
import com.xforceplus.ultraman.oqsengine.pojo.cdc.enums.CDCStatus;
import com.xforceplus.ultraman.oqsengine.pojo.cdc.enums.RunningStatus;
import com.xforceplus.ultraman.oqsengine.pojo.cdc.metrics.CDCMetrics;
import java.sql.SQLException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StopWatch;

/* loaded from: input_file:com/xforceplus/ultraman/oqsengine/cdc/consumer/ConsumerRunner.class */
public class ConsumerRunner extends Thread {
    final Logger logger = LoggerFactory.getLogger(ConsumerRunner.class);
    private ConsumerService consumerService;
    private CDCMetricsService cdcMetricsService;
    private CDCConnector cdcConnector;
    private static volatile RunningStatus runningStatus;

    public ConsumerRunner(ConsumerService consumerService, CDCMetricsService cDCMetricsService, CDCConnector cDCConnector) {
        this.consumerService = consumerService;
        this.cdcMetricsService = cDCMetricsService;
        this.cdcConnector = cDCConnector;
    }

    public void shutdown() {
        runningStatus = RunningStatus.TRY_STOP;
        this.cdcMetricsService.shutdown();
        int i = 0;
        while (true) {
            if (i >= 25) {
                break;
            }
            try {
                Thread.sleep(4000L);
            } catch (Exception e) {
                this.logger.warn("cdc-runner] shutdown error, message : {}", e.getMessage());
            }
            if (isShutdown()) {
                this.logger.info("[cdc-runner] consumer success stop.");
                break;
            }
            i++;
        }
        if (i >= 25) {
            this.logger.warn("[cdc-runner] force stop after {} seconds.", Integer.valueOf(i * 4));
        }
    }

    public boolean isShutdown() {
        return runningStatus.equals(RunningStatus.STOP_SUCCESS);
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        runningStatus = RunningStatus.INIT;
        this.cdcMetricsService.startMetrics();
        while (!checkForStop()) {
            try {
                connectAndReset();
                runningStatus = RunningStatus.RUN;
                try {
                    consume();
                } catch (Exception e) {
                    closeToNextReconnect(CDCStatus.CONSUME_FAILED, String.format("[cdc-runner] %s, %s", "canal-client consume error", e.getMessage()));
                }
            } catch (Exception e2) {
                closeToNextReconnect(CDCStatus.DIS_CONNECTED, String.format("[cdc-runner] %s, %s", "canal-server connection error", e2.getMessage()));
            }
        }
    }

    private void connectAndReset() throws SQLException {
        this.cdcConnector.open();
        syncAndRecover();
    }

    private void closeToNextReconnect(CDCStatus cDCStatus, String str) {
        this.cdcConnector.close();
        this.logger.error(str);
        callBackError(3000, cDCStatus);
    }

    private boolean checkForStop() {
        if (runningStatus.ordinal() < RunningStatus.TRY_STOP.ordinal()) {
            return false;
        }
        this.cdcConnector.shutdown();
        runningStatus = RunningStatus.STOP_SUCCESS;
        return true;
    }

    public void consume() throws SQLException {
        String str;
        StopWatch stopWatch = new StopWatch("getMessage");
        while (runningStatus.ordinal() < RunningStatus.TRY_STOP.ordinal()) {
            try {
                stopWatch.start();
                Message messageWithoutAck = this.cdcConnector.getMessageWithoutAck();
                stopWatch.stop();
                long id = messageWithoutAck.getId();
                if (stopWatch.getLastTaskTimeMillis() > 100 && id != -1) {
                    this.logger.info("[cdc-runner] get message from canal server use too much times, use timeMs : {}, batchId : {}", Long.valueOf(stopWatch.getLastTaskTimeMillis()), Long.valueOf(messageWithoutAck.getId()));
                }
                if (id == -1) {
                    try {
                        if (messageWithoutAck.getEntries().size() == 0) {
                            CDCMetrics cDCMetrics = new CDCMetrics(id, this.cdcMetricsService.getCdcMetrics().getCdcAckMetrics(), this.cdcMetricsService.getCdcMetrics().getCdcUnCommitMetrics());
                            cDCMetrics.getCdcAckMetrics().setExecuteRows(0);
                            backMetrics(cDCMetrics);
                            syncFree(id);
                        }
                    } catch (Exception e) {
                        if (0 == 0) {
                            this.cdcConnector.rollback();
                            str = "consume message error";
                        } else {
                            str = "sync finish status error";
                        }
                        e.printStackTrace();
                        this.logger.error("[cdc-runner] sync error, will reconnect..., message : {}, {}", str, e.getMessage());
                        throw new SQLException(str);
                    }
                }
                CDCMetrics consume = this.consumerService.consume(messageWithoutAck.getEntries(), id, this.cdcMetricsService);
                backMetrics(consume);
                syncSuccess(consume);
            } catch (Exception e2) {
                this.cdcConnector.rollback();
                e2.printStackTrace();
                String format = String.format("[cdc-runner] get message from canal server error, %s", e2);
                this.logger.error(format);
                throw new SQLException(format);
            }
        }
        runningStatus = RunningStatus.STOP_SUCCESS;
    }

    private boolean backMetrics(CDCMetrics cDCMetrics) {
        this.cdcMetricsService.backup(cDCMetrics);
        return true;
    }

    private void syncFree(long j) throws SQLException {
        this.cdcConnector.ack(j);
        threadSleep(5);
    }

    private void syncAndRecover() throws SQLException {
        this.cdcMetricsService.connectOk();
        CDCMetrics query = this.cdcMetricsService.query();
        if (null != query) {
            long batchId = query.getBatchId();
            if (batchId != -9223372036854775807L && batchId != -1) {
                backAfterAck(batchId, query);
            }
            callBackSuccess(batchId, query, true);
            this.logger.info("[cdc-runner] recover from last ackMetrics position success..., originBatchId : {}", Long.valueOf(batchId));
        } else {
            this.cdcMetricsService.newConnectCallBack();
            this.logger.info("[cdc-runner] new connect callBack success, originBatchId : {}", -1L);
        }
        this.cdcConnector.rollback();
    }

    private void syncSuccess(CDCMetrics cDCMetrics) throws SQLException {
        if (null != cDCMetrics) {
            long batchId = cDCMetrics.getBatchId();
            backAfterAck(batchId, cDCMetrics);
            callBackSuccess(batchId, cDCMetrics, false);
        }
    }

    private void backAfterAck(long j, CDCMetrics cDCMetrics) throws SQLException {
        this.cdcConnector.ack(j);
        cDCMetrics.setBatchId(-9223372036854775807L);
        this.cdcMetricsService.backup(cDCMetrics);
    }

    private void threadSleep(int i) {
        try {
            Thread.sleep(i);
        } catch (InterruptedException e) {
        }
    }

    private void callBackError(int i, CDCStatus cDCStatus) {
        threadSleep(i);
        this.cdcMetricsService.callBackError(cDCStatus);
    }

    private void callBackSuccess(long j, CDCMetrics cDCMetrics, boolean z) {
        this.cdcMetricsService.callBackSuccess(j, cDCMetrics, z);
    }
}
