/*
 * Decompiled with CFR 0.152.
 */
package com.xforceplus.ultraman.oqsengine.cdc.consumer.process;

import com.alibaba.otter.canal.protocol.Message;
import com.xforceplus.ultraman.oqsengine.cdc.connect.AbstractCDCConnector;
import com.xforceplus.ultraman.oqsengine.cdc.consumer.checker.CommitIdChecker;
import com.xforceplus.ultraman.oqsengine.cdc.consumer.process.BatchProcessor;
import com.xforceplus.ultraman.oqsengine.cdc.consumer.service.ConsumerService;
import com.xforceplus.ultraman.oqsengine.cdc.context.RunnerContext;
import com.xforceplus.ultraman.oqsengine.meta.common.utils.TimeWaitUtils;
import com.xforceplus.ultraman.oqsengine.pojo.cdc.metrics.CDCMetrics;
import io.micrometer.core.annotation.Timed;
import java.sql.SQLException;
import java.util.concurrent.TimeUnit;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultBatchProcessor
implements BatchProcessor {
    final Logger logger = LoggerFactory.getLogger(DefaultBatchProcessor.class);
    @Resource
    private ConsumerService consumerService;
    @Resource
    private CommitIdChecker commitIdChecker;

    @Override
    @Timed(value="oqs.process.delay.latency", percentiles={0.5, 0.9, 0.99}, extraTags={"initiator", "cdc", "action", "oneBatch"})
    public void executeOneBatch(AbstractCDCConnector connector, RunnerContext context) throws SQLException {
        long batchId;
        Message message = null;
        try {
            long start = System.currentTimeMillis();
            message = connector.getMessageWithoutAck();
            long duration = System.currentTimeMillis() - start;
            batchId = message.getId();
            if (duration > 100L) {
                this.logger.debug("[batchProcess] read message from canal server use too much times, use timeMs : {}, batchId : {}", (Object)duration, (Object)message.getId());
            }
        }
        catch (Exception e) {
            connector.rollback();
            String error = String.format("read message from canal server error, %s", e.getMessage());
            this.logger.error("[batchProcess] {}", (Object)error);
            throw new SQLException(error);
        }
        boolean synced = false;
        try {
            CDCMetrics cdcMetrics = null;
            if (batchId != -1L || message.getEntries().size() != 0) {
                cdcMetrics = this.consumerService.consumeOneBatch(message.getEntries(), batchId, context.getCdcMetrics());
                synced = this.saveMetrics(cdcMetrics);
                this.finishBatch(cdcMetrics, connector, context);
                context.setCdcMetrics(cdcMetrics);
            } else {
                cdcMetrics = new CDCMetrics(batchId, context.getCdcMetrics().getCdcAckMetrics(), context.getCdcMetrics().getCdcUnCommitMetrics());
                cdcMetrics.getCdcAckMetrics().setExecuteRows(0);
                synced = this.saveMetrics(cdcMetrics);
                this.emptyBatch(batchId, connector);
            }
        }
        catch (Exception e) {
            String error = "";
            if (synced) {
                error = "ack finish status error";
            } else {
                connector.rollback();
                error = "consume message error";
            }
            this.logger.error("[batchProcess] consume batch error, connection will reset..., message : {}, {}", (Object)error, (Object)e.getMessage());
            throw new SQLException(error);
        }
    }

    @Override
    public void recover(AbstractCDCConnector connector, RunnerContext runnerContext) throws SQLException {
        CDCMetrics cdcMetrics = this.consumerService.metricsHandler().query();
        long originBatchId = -1L;
        if (null != cdcMetrics) {
            originBatchId = cdcMetrics.getBatchId();
            if (originBatchId != -9223372036854775807L && originBatchId != -1L) {
                this.backup(originBatchId, connector, cdcMetrics);
            }
            this.callBackSuccess(originBatchId, cdcMetrics, runnerContext, true);
            this.logger.info("[cdc-runner] recover from last ackMetrics position success..., originBatchId : {}", (Object)originBatchId);
        } else {
            this.consumerService.metricsHandler().renewConnect(runnerContext.getCdcMetrics());
            this.logger.info("[cdc-runner] renew connect success, originBatchId : {}", (Object)originBatchId);
        }
    }

    @Override
    public void init() {
        this.consumerService.metricsHandler().init();
        try {
            this.commitIdChecker.init();
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    @Override
    public void shutdown() {
        this.consumerService.metricsHandler().shutdown();
        try {
            this.commitIdChecker.destroy();
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    @Override
    public void error(RunnerContext context) {
        this.consumerService.metricsHandler().callBackError(context.getCdcMetrics());
    }

    private boolean saveMetrics(CDCMetrics cdcMetrics) {
        this.consumerService.metricsHandler().backup(cdcMetrics);
        return true;
    }

    private void finishBatch(CDCMetrics cdcMetrics, AbstractCDCConnector connector, RunnerContext context) throws SQLException {
        if (null != cdcMetrics) {
            long originBatchId = cdcMetrics.getBatchId();
            this.backup(originBatchId, connector, cdcMetrics);
            this.callBackSuccess(originBatchId, cdcMetrics, context, false);
        }
    }

    private void emptyBatch(long batchId, AbstractCDCConnector connector) throws SQLException {
        connector.ack(batchId);
        TimeWaitUtils.wakeupAfter((long)5L, (TimeUnit)TimeUnit.MILLISECONDS);
    }

    private void backup(long originBatchId, AbstractCDCConnector connector, CDCMetrics cdcMetrics) throws SQLException {
        connector.ack(originBatchId);
        this.logger.debug("ack batch success, batchId : {}", (Object)originBatchId);
        cdcMetrics.setBatchId(-9223372036854775807L);
        this.consumerService.metricsHandler().backup(cdcMetrics);
        this.logger.debug("rest cdcMetrics with buckUpId success, origin batchId : {}", (Object)originBatchId);
    }

    private void callBackSuccess(long originBatchId, CDCMetrics temp, RunnerContext context, boolean isConnectSync) {
        context.getCdcMetrics().setCdcUnCommitMetrics(temp.getCdcUnCommitMetrics());
        context.getCdcMetrics().consumeSuccess(originBatchId, temp, isConnectSync);
        this.consumerService.metricsHandler().callBackSuccess(context.getCdcMetrics());
    }
}

