/*
 * Decompiled with CFR 0.152.
 */
package com.xforceplus.ultraman.oqsengine.cdc.consumer;

import com.xforceplus.ultraman.oqsengine.cdc.connect.AbstractCDCConnector;
import com.xforceplus.ultraman.oqsengine.cdc.consumer.process.BatchProcessor;
import com.xforceplus.ultraman.oqsengine.cdc.context.RunnerContext;
import com.xforceplus.ultraman.oqsengine.meta.common.utils.TimeWaitUtils;
import com.xforceplus.ultraman.oqsengine.pojo.cdc.enums.CDCStatus;
import com.xforceplus.ultraman.oqsengine.pojo.cdc.enums.RunningStatus;
import java.sql.SQLException;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CDCRunner
extends Thread {
    final Logger logger = LoggerFactory.getLogger(CDCRunner.class);
    private BatchProcessor batchProcessor;
    private AbstractCDCConnector connector;
    private RunnerContext context;

    public RunnerContext getContext() {
        return this.context;
    }

    public CDCRunner(BatchProcessor batchProcessor, AbstractCDCConnector connector) {
        this.setName("cdcRunner");
        this.batchProcessor = batchProcessor;
        this.connector = connector;
        this.context = new RunnerContext();
    }

    public void shutdown() {
        int useTime;
        this.context.setRunningStatus(RunningStatus.TRY_STOP);
        this.batchProcessor.shutdown();
        for (useTime = 0; useTime < 25; ++useTime) {
            TimeWaitUtils.wakeupAfter((long)4L, (TimeUnit)TimeUnit.SECONDS);
            if (!this.context.getRunningStatus().isStop()) continue;
            this.logger.info("[cdc-runner] consumer success stop.");
            break;
        }
        if (useTime >= 25) {
            this.logger.warn("[cdc-runner] force stop after {} seconds.", (Object)(useTime * 4));
        }
    }

    @Override
    public void run() {
        this.batchProcessor.init();
        while (!this.needTerminate()) {
            try {
                this.connector.open();
                this.recover();
            }
            catch (Exception e) {
                this.context.incrementContinuesConnectFails();
                if (this.connector.isMaxRetry(this.context.getContinuesConnectFails())) {
                    this.connector.close();
                    this.connector.init();
                    this.context.resetContinuesConnectFails();
                    continue;
                }
                this.closeAndCallBackError(CDCStatus.DIS_CONNECTED, String.format("[cdc-runner] canal-server connection error, %s", e.getMessage()));
                continue;
            }
            this.context.resetContinuesConnectFails();
            try {
                this.consume();
            }
            catch (Exception e) {
                this.closeAndCallBackError(CDCStatus.CONSUME_FAILED, String.format("[cdc-runner] canal-client consume error, %s", e.getMessage()));
            }
        }
    }

    private void closeAndCallBackError(CDCStatus cdcStatus, String errorMessage) {
        this.context.getCdcMetrics().getCdcAckMetrics().setCdcConsumerStatus(cdcStatus);
        this.connector.close();
        this.logger.error(errorMessage);
        TimeWaitUtils.wakeupAfter((long)3L, (TimeUnit)TimeUnit.SECONDS);
        this.batchProcessor.error(this.context);
    }

    private boolean needTerminate() {
        if (this.context.getRunningStatus().shouldStop()) {
            this.connector.shutdown();
            this.context.setRunningStatus(RunningStatus.STOP_SUCCESS);
            return true;
        }
        return false;
    }

    private void consume() throws SQLException {
        while (true) {
            if (this.context.getRunningStatus().shouldStop()) break;
            this.batchProcessor.executeOneBatch(this.connector, this.context);
        }
        this.context.setRunningStatus(RunningStatus.STOP_SUCCESS);
    }

    private void recover() throws SQLException {
        this.context.getCdcMetrics().connected();
        this.batchProcessor.recover(this.connector, this.context);
        this.connector.rollback();
    }
}

