package com.xforceplus.ultraman.cdc.core.remote.runner;

import com.alibaba.otter.canal.protocol.Message;
import com.xforceplus.ultraman.cdc.core.remote.connect.CDCConnector;
import com.xforceplus.ultraman.cdc.core.remote.context.RunnerContext;
import com.xforceplus.ultraman.cdc.core.remote.context.RunningStatus;
import com.xforceplus.ultraman.cdc.dto.enums.CDCStatus;
import com.xforceplus.ultraman.cdc.processor.DataProcessor;
import com.xforceplus.ultraman.cdc.reader.CanalPropertiesReader;
import com.xforceplus.ultraman.cdc.utils.TimeWaitUtils;
import com.xforceplus.ultraman.sdk.infra.logging.LoggingPattern;
import com.xforceplus.ultraman.sdk.infra.logging.LoggingUtils;
import io.vavr.Tuple2;
import java.sql.SQLException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/xforceplus/ultraman/cdc/core/remote/runner/DefaultCDCConsumer.class */
public class DefaultCDCConsumer implements CDCConsumer {
    public static final int MESSAGE_GET_WARM_INTERVAL = 100;
    public static final int FREE_MESSAGE_WAIT_IN_MS = 5;
    private DataProcessor dataProcessor;
    private CDCConnector connector;
    private Map<String, CDCStatus> cdcStatusMap;
    private CanalPropertiesReader canalPropertiesReader;
    final Logger logger = LoggerFactory.getLogger(DefaultCDCConsumer.class);
    private RunnerContext context = new RunnerContext();

    public DefaultCDCConsumer(CDCConnector cDCConnector, DataProcessor dataProcessor, Map<String, CDCStatus> map, CanalPropertiesReader canalPropertiesReader) {
        this.connector = cDCConnector;
        this.dataProcessor = dataProcessor;
        this.cdcStatusMap = map;
        this.canalPropertiesReader = canalPropertiesReader;
    }

    @Override // com.xforceplus.ultraman.cdc.CDCLifeCycle
    public void init() {
        this.context.setRunningStatus(RunningStatus.RUN);
        this.connector.init();
    }

    @Override // com.xforceplus.ultraman.cdc.CDCLifeCycle
    public void destroy() {
        this.context.setRunningStatus(RunningStatus.TRY_STOP);
        int i = 10;
        while (i > 0 && !this.context.getRunningStatus().equals(RunningStatus.STOP_SUCCESS)) {
            i--;
            TimeWaitUtils.wakeupAfter(1L, TimeUnit.SECONDS);
        }
        this.connector.destroy();
    }

    @Override // com.xforceplus.ultraman.cdc.core.remote.runner.CDCConsumer
    public void execute() {
        this.logger.info("connector {} is start executing...", this.connector.name());
        while (!needTerminate()) {
            try {
                this.connector.open();
                if (this.context.getBatchId() > 0) {
                    this.connector.rollback();
                }
                this.cdcStatusMap.put(this.connector.name(), CDCStatus.CONNECTED);
                this.context.resetContinuesConnectFails();
                try {
                    consume();
                } catch (Exception e) {
                    this.cdcStatusMap.put(this.connector.name(), CDCStatus.CONSUME_FAILED);
                    closeAndCallBackError(CDCStatus.CONSUME_FAILED, String.format("[cdc-runner] canal-client consume error, %s", e.getMessage()));
                }
            } catch (Exception e2) {
                this.context.incrementContinuesConnectFails();
                if (this.connector.isMaxRetry(this.context.getContinuesConnectFails())) {
                    this.connector.close();
                    this.connector.init();
                    this.context.resetContinuesConnectFails();
                } else {
                    this.cdcStatusMap.put(this.connector.name(), CDCStatus.DIS_CONNECTED);
                    closeAndCallBackError(CDCStatus.DIS_CONNECTED, String.format("[cdc-runner] canal-server connection error, %s", e2.getMessage()));
                }
            }
        }
        this.logger.info("connector {} is finish executing...", this.connector.name());
    }

    @Override // com.xforceplus.ultraman.cdc.core.remote.runner.CDCConsumer
    public Tuple2<String, CDCStatus> namedCdcStatus() {
        return new Tuple2<>(this.connector.name(), this.context.getCdcStatus());
    }

    private void consume() throws SQLException {
        while (!this.context.getRunningStatus().shouldStop()) {
            executeOneBatch(this.connector);
        }
        this.context.setRunningStatus(RunningStatus.STOP_SUCCESS);
    }

    private void executeOneBatch(CDCConnector cDCConnector) throws SQLException {
        long currentTimeMillis = System.currentTimeMillis();
        try {
            Message messageWithoutAck = cDCConnector.getMessageWithoutAck();
            long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
            long id = messageWithoutAck.getId();
            if (currentTimeMillis2 > 100) {
                this.logger.debug("[batchProcess] read message from canal server use too much times, use timeMs : {}, batchId : {}", Long.valueOf(currentTimeMillis2), Long.valueOf(messageWithoutAck.getId()));
            }
            try {
                boolean z = false;
                long currentTimeMillis3 = System.currentTimeMillis();
                if (id != 0 || !messageWithoutAck.getEntries().isEmpty()) {
                    z = this.dataProcessor.onProcess(messageWithoutAck);
                }
                cDCConnector.ack(id);
                if (z) {
                    long currentTimeMillis4 = System.currentTimeMillis();
                    this.logger.info("[batchProcess] metrics total use time {}, consumer use time {},rows{}", new Object[]{Long.valueOf(currentTimeMillis4 - currentTimeMillis), Long.valueOf(currentTimeMillis4 - currentTimeMillis3), Integer.valueOf(messageWithoutAck.getEntries().size())});
                } else {
                    TimeWaitUtils.wakeupAfter(5L, TimeUnit.MILLISECONDS);
                }
            } catch (Exception e) {
                cDCConnector.rollback();
                LoggingUtils.logErrorPattern(this.logger, LoggingPattern.CDC_PROCESS_ERROR, "OneBatchConsumeError", "consume message error");
                throw new SQLException("consume message error");
            }
        } catch (Throwable th) {
            cDCConnector.rollback();
            String format = String.format("read message from canal server error, %s", th.getMessage());
            LoggingUtils.logErrorPattern(this.logger, LoggingPattern.CDC_PROCESS_ERROR, format);
            throw new SQLException(format);
        }
    }

    private void closeAndCallBackError(CDCStatus cDCStatus, String str) {
        this.connector.close();
        this.logger.error(str);
        TimeWaitUtils.wakeupAfter(3L, TimeUnit.SECONDS);
    }

    private boolean needTerminate() {
        if (!this.context.getRunningStatus().shouldStop()) {
            return false;
        }
        this.context.setRunningStatus(RunningStatus.STOP_SUCCESS);
        return true;
    }
}
