package com.xforceplus.ultraman.oqsengine.cdc.consumer;

import com.xforceplus.ultraman.oqsengine.cdc.connect.AbstractCDCConnector;
import com.xforceplus.ultraman.oqsengine.cdc.consumer.process.BatchProcessor;
import com.xforceplus.ultraman.oqsengine.cdc.context.RunnerContext;
import com.xforceplus.ultraman.oqsengine.meta.common.utils.TimeWaitUtils;
import com.xforceplus.ultraman.oqsengine.pojo.cdc.enums.CDCStatus;
import com.xforceplus.ultraman.oqsengine.pojo.cdc.enums.RunningStatus;
import java.sql.SQLException;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/xforceplus/ultraman/oqsengine/cdc/consumer/CDCRunner.class */
public class CDCRunner extends Thread {
    final Logger logger = LoggerFactory.getLogger(CDCRunner.class);
    private BatchProcessor batchProcessor;
    private AbstractCDCConnector connector;
    private RunnerContext context;

    public RunnerContext getContext() {
        return this.context;
    }

    public CDCRunner(BatchProcessor batchProcessor, AbstractCDCConnector abstractCDCConnector) {
        setName("cdcRunner");
        this.batchProcessor = batchProcessor;
        this.connector = abstractCDCConnector;
        this.context = new RunnerContext();
    }

    public void shutdown() {
        this.context.setRunningStatus(RunningStatus.TRY_STOP);
        this.batchProcessor.shutdown();
        int i = 0;
        while (true) {
            if (i >= 25) {
                break;
            }
            TimeWaitUtils.wakeupAfter(4L, TimeUnit.SECONDS);
            if (this.context.getRunningStatus().isStop()) {
                this.logger.info("[cdc-runner] consumer success stop.");
                break;
            }
            i++;
        }
        if (i >= 25) {
            this.logger.warn("[cdc-runner] force stop after {} seconds.", Integer.valueOf(i * 4));
        }
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        this.batchProcessor.init();
        while (!needTerminate()) {
            try {
                this.connector.open();
                recover();
                this.context.resetContinuesConnectFails();
                try {
                    consume();
                } catch (Exception e) {
                    closeAndCallBackError(CDCStatus.CONSUME_FAILED, String.format("[cdc-runner] canal-client consume error, %s", e.getMessage()));
                }
            } catch (Exception e2) {
                this.context.incrementContinuesConnectFails();
                if (this.connector.isMaxRetry(this.context.getContinuesConnectFails())) {
                    this.connector.close();
                    this.connector.init();
                    this.context.resetContinuesConnectFails();
                } else {
                    closeAndCallBackError(CDCStatus.DIS_CONNECTED, String.format("[cdc-runner] canal-server connection error, %s", e2.getMessage()));
                }
            }
        }
    }

    private void closeAndCallBackError(CDCStatus cDCStatus, String str) {
        this.context.getCdcMetrics().getCdcAckMetrics().setCdcConsumerStatus(cDCStatus);
        this.connector.close();
        this.logger.error(str);
        TimeWaitUtils.wakeupAfter(3L, TimeUnit.SECONDS);
        this.batchProcessor.error(this.context);
    }

    private boolean needTerminate() {
        if (!this.context.getRunningStatus().shouldStop()) {
            return false;
        }
        this.connector.shutdown();
        this.context.setRunningStatus(RunningStatus.STOP_SUCCESS);
        return true;
    }

    private void consume() throws SQLException {
        while (!this.context.getRunningStatus().shouldStop()) {
            this.batchProcessor.executeOneBatch(this.connector, this.context);
        }
        this.context.setRunningStatus(RunningStatus.STOP_SUCCESS);
    }

    private void recover() throws SQLException {
        this.context.getCdcMetrics().connected();
        this.batchProcessor.recover(this.connector, this.context);
        this.connector.rollback();
    }
}
