package com.xforceplus.ultraman.oqsengine.cdc.consumer.process;

import com.alibaba.otter.canal.protocol.Message;
import com.xforceplus.ultraman.oqsengine.cdc.connect.AbstractCDCConnector;
import com.xforceplus.ultraman.oqsengine.cdc.consumer.service.ConsumerService;
import com.xforceplus.ultraman.oqsengine.cdc.context.RunnerContext;
import com.xforceplus.ultraman.oqsengine.meta.common.utils.TimeWaitUtils;
import com.xforceplus.ultraman.oqsengine.pojo.cdc.metrics.CDCMetrics;
import io.micrometer.core.annotation.Timed;
import java.sql.SQLException;
import java.util.concurrent.TimeUnit;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/xforceplus/ultraman/oqsengine/cdc/consumer/process/DefaultBatchProcessor.class */
public class DefaultBatchProcessor implements BatchProcessor {
    final Logger logger = LoggerFactory.getLogger(DefaultBatchProcessor.class);

    @Resource
    private ConsumerService consumerService;

    @Override // com.xforceplus.ultraman.oqsengine.cdc.consumer.process.BatchProcessor
    @Timed(value = "oqs.process.delay.latency", percentiles = {0.5d, 0.9d, 0.99d}, extraTags = {"initiator", "cdc", "action", "oneBatch"})
    public void executeOneBatch(AbstractCDCConnector abstractCDCConnector, RunnerContext runnerContext) throws SQLException {
        String str;
        try {
            long currentTimeMillis = System.currentTimeMillis();
            Message messageWithoutAck = abstractCDCConnector.getMessageWithoutAck();
            long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
            long id = messageWithoutAck.getId();
            if (currentTimeMillis2 > 100) {
                this.logger.debug("[batchProcess] read message from canal server use too much times, use timeMs : {}, batchId : {}", Long.valueOf(currentTimeMillis2), Long.valueOf(messageWithoutAck.getId()));
            }
            boolean z = false;
            if (id == -1) {
                try {
                    if (messageWithoutAck.getEntries().size() == 0) {
                        CDCMetrics cDCMetrics = new CDCMetrics(id, runnerContext.getCdcMetrics().getCdcAckMetrics(), runnerContext.getCdcMetrics().getCdcUnCommitMetrics());
                        cDCMetrics.getCdcAckMetrics().setExecuteRows(0);
                        z = saveMetrics(cDCMetrics);
                        emptyBatch(id, abstractCDCConnector);
                    }
                } catch (Exception e) {
                    if (z) {
                        str = "ack finish status error";
                    } else {
                        abstractCDCConnector.rollback();
                        str = "consume message error";
                    }
                    this.logger.error("[batchProcess] consume batch error, connection will reset..., message : {}, {}", str, e.getMessage());
                    throw new SQLException(str);
                }
            }
            CDCMetrics consumeOneBatch = this.consumerService.consumeOneBatch(messageWithoutAck.getEntries(), id, runnerContext.getCdcMetrics());
            z = saveMetrics(consumeOneBatch);
            finishBatch(consumeOneBatch, abstractCDCConnector, runnerContext);
            runnerContext.setCdcMetrics(consumeOneBatch);
        } catch (Exception e2) {
            abstractCDCConnector.rollback();
            String format = String.format("read message from canal server error, %s", e2.getMessage());
            this.logger.error("[batchProcess] {}", format);
            throw new SQLException(format);
        }
    }

    @Override // com.xforceplus.ultraman.oqsengine.cdc.consumer.process.BatchProcessor
    public void recover(AbstractCDCConnector abstractCDCConnector, RunnerContext runnerContext) throws SQLException {
        CDCMetrics query = this.consumerService.metricsHandler().query();
        if (null == query) {
            this.consumerService.metricsHandler().renewConnect(runnerContext.getCdcMetrics());
            this.logger.info("[cdc-runner] renew connect success, originBatchId : {}", -1L);
            return;
        }
        long batchId = query.getBatchId();
        if (batchId != -9223372036854775807L && batchId != -1) {
            backup(batchId, abstractCDCConnector, query);
        }
        callBackSuccess(batchId, query, runnerContext, true);
        this.logger.info("[cdc-runner] recover from last ackMetrics position success..., originBatchId : {}", Long.valueOf(batchId));
    }

    @Override // com.xforceplus.ultraman.oqsengine.cdc.consumer.process.BatchProcessor
    public void init() {
        this.consumerService.metricsHandler().init();
    }

    @Override // com.xforceplus.ultraman.oqsengine.cdc.consumer.process.BatchProcessor
    public void shutdown() {
        this.consumerService.metricsHandler().shutdown();
    }

    @Override // com.xforceplus.ultraman.oqsengine.cdc.consumer.process.BatchProcessor
    public void error(RunnerContext runnerContext) {
        this.consumerService.metricsHandler().callBackError(runnerContext.getCdcMetrics());
    }

    private boolean saveMetrics(CDCMetrics cDCMetrics) {
        this.consumerService.metricsHandler().backup(cDCMetrics);
        return true;
    }

    private void finishBatch(CDCMetrics cDCMetrics, AbstractCDCConnector abstractCDCConnector, RunnerContext runnerContext) throws SQLException {
        if (null != cDCMetrics) {
            long batchId = cDCMetrics.getBatchId();
            backup(batchId, abstractCDCConnector, cDCMetrics);
            callBackSuccess(batchId, cDCMetrics, runnerContext, false);
        }
    }

    private void emptyBatch(long j, AbstractCDCConnector abstractCDCConnector) throws SQLException {
        abstractCDCConnector.ack(j);
        TimeWaitUtils.wakeupAfter(5L, TimeUnit.MILLISECONDS);
    }

    private void backup(long j, AbstractCDCConnector abstractCDCConnector, CDCMetrics cDCMetrics) throws SQLException {
        abstractCDCConnector.ack(j);
        this.logger.debug("ack batch success, batchId : {}", Long.valueOf(j));
        cDCMetrics.setBatchId(-9223372036854775807L);
        this.consumerService.metricsHandler().backup(cDCMetrics);
        this.logger.debug("rest cdcMetrics with buckUpId success, origin batchId : {}", Long.valueOf(j));
    }

    private void callBackSuccess(long j, CDCMetrics cDCMetrics, RunnerContext runnerContext, boolean z) {
        runnerContext.getCdcMetrics().setCdcUnCommitMetrics(cDCMetrics.getCdcUnCommitMetrics());
        runnerContext.getCdcMetrics().consumeSuccess(j, cDCMetrics, z);
        this.consumerService.metricsHandler().callBackSuccess(runnerContext.getCdcMetrics());
    }
}
