package com.xforceplus.ultraman.oqsengine.cdc.consumer.service;

import com.alibaba.fastjson.JSON;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.xforceplus.ultraman.oqsengine.cdc.consumer.dto.ConsumerType;
import com.xforceplus.ultraman.oqsengine.cdc.consumer.dto.ParseResult;
import com.xforceplus.ultraman.oqsengine.cdc.consumer.error.ErrorRecorder;
import com.xforceplus.ultraman.oqsengine.cdc.consumer.factory.BinLogParserFactory;
import com.xforceplus.ultraman.oqsengine.cdc.consumer.tools.CommonUtils;
import com.xforceplus.ultraman.oqsengine.cdc.context.ParserContext;
import com.xforceplus.ultraman.oqsengine.cdc.metrics.CDCMetricsHandler;
import com.xforceplus.ultraman.oqsengine.metadata.MetaManager;
import com.xforceplus.ultraman.oqsengine.pojo.cdc.metrics.CDCMetrics;
import com.xforceplus.ultraman.oqsengine.pojo.cdc.metrics.CDCMetricsRecorder;
import com.xforceplus.ultraman.oqsengine.pojo.cdc.metrics.CDCUnCommitMetrics;
import com.xforceplus.ultraman.oqsengine.storage.index.IndexStorage;
import io.micrometer.core.annotation.Timed;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/xforceplus/ultraman/oqsengine/cdc/consumer/service/DefaultConsumerService.class */
public class DefaultConsumerService implements ConsumerService {

    // Handler notified with the ready commit ids of a batch; also exposed through metricsHandler().
    @Resource
    private CDCMetricsHandler cdcMetricsHandler;

    // Index storage (bean name "indexStorage") that receives the finished entries of each batch.
    @Resource(name = "indexStorage")
    private IndexStorage sphinxQLIndexStorage;

    // Handed to every ParserContext created for a batch.
    @Resource
    private MetaManager metaManager;

    // Receives the errors accumulated in parseResult at the end of each batch.
    @Resource
    private ErrorRecorder errorRecorder;
    final Logger logger = LoggerFactory.getLogger(DefaultConsumerService.class);
    // Passed to ParserContext for every batch; defaults to -1 (presumably "skip nothing" — TODO confirm).
    private long skipCommitId = -1;
    // Passed to ParserContext for every batch; defaults to MIXED.
    private ConsumerType consumerType = ConsumerType.MIXED;
    // When true, DELETE row events are accepted by supportEventType(); otherwise only INSERT and UPDATE are.
    private boolean supportPhysicalDelete = false;
    // Passed to ParserContext for every batch.
    private boolean checkCommitReady = true;
    // Mutable accumulator reused across batches; consumeOneBatch() calls clean() on it after every batch.
    protected ParseResult parseResult = new ParseResult();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.xforceplus.ultraman.oqsengine.cdc.consumer.service.DefaultConsumerService$1, reason: invalid class name */
    /* loaded from: input_file:com/xforceplus/ultraman/oqsengine/cdc/consumer/service/DefaultConsumerService$1.class */
    /*
     * Lookup table from CanalEntry.EntryType ordinals to switch-case labels:
     * TRANSACTIONEND maps to 1 and ROWDATA maps to 2. Every other slot keeps the
     * int-array default of 0. The table is indexed by ordinal() in
     * parseCanalEntries(), whose switch only has cases 1 and 2, so entry types
     * left at 0 are ignored there.
     */
    public static /* synthetic */ class AnonymousClass1 {
        // One slot per EntryType constant known at class-initialisation time.
        static final /* synthetic */ int[] $SwitchMap$com$alibaba$otter$canal$protocol$CanalEntry$EntryType = new int[CanalEntry.EntryType.values().length];

        static {
            try {
                $SwitchMap$com$alibaba$otter$canal$protocol$CanalEntry$EntryType[CanalEntry.EntryType.TRANSACTIONEND.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // Ignored: the slot keeps its default 0, which matches no case label.
            }
            try {
                $SwitchMap$com$alibaba$otter$canal$protocol$CanalEntry$EntryType[CanalEntry.EntryType.ROWDATA.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
                // Ignored: the slot keeps its default 0, which matches no case label.
            }
        }
    }

    /**
     * Replaces the skip-commit id that is handed to the parser context of subsequent batches.
     *
     * @param j the new value stored in {@code skipCommitId}
     */
    public void setSkipCommitId(long j) {
        skipCommitId = j;
    }

    /**
     * Replaces the check-commit-ready flag that is handed to the parser context of subsequent batches.
     *
     * @param z the new value stored in {@code checkCommitReady}
     */
    public void setCheckCommitReady(boolean z) {
        checkCommitReady = z;
    }

    /**
     * Enables or disables the handling of DELETE row events.
     *
     * @param z {@code true} to let DELETE events pass the event-type filter, {@code false} to drop them
     */
    public void setSupportPhysicalDelete(boolean z) {
        supportPhysicalDelete = z;
    }

    /**
     * Selects the consumer type from its numeric form.
     *
     * @param i numeric code that {@code ConsumerType.instance(int)} resolves to a consumer type
     */
    public void setConsumerType(int i) {
        final ConsumerType resolved = ConsumerType.instance(i);
        this.consumerType = resolved;
    }

    /**
     * Exposes the live parse result held by this service. Despite the name, nothing is printed;
     * the returned object is the same mutable instance the service keeps using.
     *
     * @return the current {@code parseResult} instance
     */
    public ParseResult printParseResult() {
        return parseResult;
    }

    /**
     * Consumes one batch of canal entries and returns the metrics recorded for it.
     *
     * <p>Whether the batch succeeds or fails, the errors collected in {@code parseResult} are
     * handed to the error recorder exactly once and {@code parseResult} is then cleaned, so no
     * state leaks into the next batch even when the recorder itself throws.
     *
     * @param list       the canal entries of the batch
     * @param j          identifier forwarded to the metrics recorder and the error recorder
     *                   (presumably the batch id — confirm against callers)
     * @param cDCMetrics metrics of the previous round; its un-commit metrics seed the new record
     * @return the metrics produced by the recorder after the batch has been processed
     * @throws SQLException if parsing the entries or writing them to the index storage fails
     */
    @Override
    @Timed(value = "oqs.process.delay.latency", percentiles = {0.5d, 0.9d, 0.99d}, extraTags = {"initiator", "cdc", "action", "consume"})
    public CDCMetrics consumeOneBatch(List<CanalEntry.Entry> list, long j, CDCMetrics cDCMetrics) throws SQLException {
        try {
            CDCMetricsRecorder recorder = init(cDCMetrics.getCdcUnCommitMetrics(), j);
            int finished = parseCanalEntries(list, recorder.getCdcMetrics());
            return recorder.finishRecord(finished).getCdcMetrics();
        } finally {
            try {
                if (this.parseResult.getErrors().size() > 0) {
                    this.errorRecorder.record(j, this.parseResult.getErrors());
                }
            } finally {
                // Must run even if recording fails; otherwise stale entries and errors
                // would be carried over into the next batch.
                this.parseResult.clean();
            }
        }
    }

    /**
     * Returns the metrics handler injected into this service.
     *
     * @return the {@code cdcMetricsHandler} instance
     */
    @Override
    public CDCMetricsHandler metricsHandler() {
        return cdcMetricsHandler;
    }

    /**
     * Creates a fresh metrics recorder and starts its record for the current batch.
     *
     * @param cDCUnCommitMetrics un-commit metrics passed to {@code startRecord}
     * @param j                  identifier passed to {@code startRecord}
     * @return the recorder returned by {@code startRecord}
     */
    private CDCMetricsRecorder init(CDCUnCommitMetrics cDCUnCommitMetrics, long j) {
        CDCMetricsRecorder recorder = new CDCMetricsRecorder();
        return recorder.startRecord(cDCUnCommitMetrics, j);
    }

    /**
     * Walks the entries of one batch, parses the collected rows and writes the finished
     * entries to the index storage.
     *
     * <p>TRANSACTIONEND entries are routed to {@link #transactionEnd}, ROWDATA entries to
     * {@link #rowDataParse}; every other entry type is ignored. After the loop the dynamic
     * parser fills {@code parseResult}, the ready commit ids are reported to the metrics
     * handler, and the finished entries are saved.
     *
     * @param list       the canal entries of the batch
     * @param cDCMetrics metrics object of the batch, shared with the parser context
     * @return the number of finished entries held by {@code parseResult}
     * @throws SQLException if a row cannot be parsed or the index storage write fails; for a
     *                      storage failure the original exception is attached as the cause
     */
    private int parseCanalEntries(List<CanalEntry.Entry> list, CDCMetrics cDCMetrics) throws SQLException {
        ParserContext parserContext = new ParserContext(this.skipCommitId, this.checkCommitReady, this.consumerType, cDCMetrics, this.metaManager);
        for (CanalEntry.Entry entry : list) {
            switch (entry.getEntryType()) {
                case TRANSACTIONEND:
                    transactionEnd(parserContext);
                    break;
                case ROWDATA:
                    rowDataParse(entry, parserContext);
                    break;
                default:
                    // Other entry types carry nothing to index.
                    break;
            }
        }
        BinLogParserFactory.getInstance().dynamicParser().parser(parserContext, this.parseResult);
        if (!this.parseResult.isReadyCommitIds().isEmpty()) {
            this.cdcMetricsHandler.isReady(new ArrayList<>(this.parseResult.isReadyCommitIds()));
        }
        if (!this.parseResult.getFinishEntries().isEmpty()) {
            try {
                this.sphinxQLIndexStorage.saveOrDeleteOriginalEntities(this.parseResult.getFinishEntries().values());
            } catch (Exception e) {
                String message = String.format("write sphinx-batch error, commitIds : %s, startId : %s, message : %s", this.parseResult.isReadyCommitIds(), Long.valueOf(this.parseResult.getStartId()), e.getMessage());
                SQLException failure = new SQLException(message, e);
                try {
                    this.parseResult.addError(this.parseResult.getStartId(), this.parseResult.getFinishEntries().get(Long.valueOf(this.parseResult.getStartId())).getCommitid(), -1, CommonUtils.toErrorCommitIdStr(this.parseResult.isReadyCommitIds(), parserContext.getCdcMetrics().getCdcUnCommitMetrics().getUnCommitIds()), String.format("batch : %d consumer columns failed, %s", Long.valueOf(parserContext.getCdcMetrics().getBatchId()), message));
                } catch (RuntimeException recordFailure) {
                    // Building the error record must not hide the storage failure
                    // (e.g. when no finished entry exists for the start id).
                    failure.addSuppressed(recordFailure);
                }
                throw failure;
            }
        }
        batchLogged(cDCMetrics);
        return this.parseResult.getFinishEntries().size();
    }

    /**
     * Writes the end-of-batch log lines: a debug line when un-commit ids remain, a warning when
     * more than one un-commit id remains, and a debug summary of the batch in every case.
     *
     * @param cDCMetrics metrics of the batch that has just been processed
     */
    private void batchLogged(CDCMetrics cDCMetrics) {
        int pendingCount = cDCMetrics.getCdcUnCommitMetrics().getUnCommitIds().size();
        if (pendingCount > 0 && this.logger.isDebugEnabled()) {
            this.logger.debug(
                "[cdc-consumer] batch : {} end with un-commit ids : {}",
                Long.valueOf(cDCMetrics.getBatchId()),
                JSON.toJSON(cDCMetrics.getCdcUnCommitMetrics().getUnCommitIds()));
        }
        if (pendingCount > 1) {
            this.logger.warn(
                "[cdc-consumer] batch : {}, one transaction has more than one commitId, ids : {}",
                Long.valueOf(cDCMetrics.getBatchId()),
                JSON.toJSON(cDCMetrics.getCdcUnCommitMetrics().getUnCommitIds()));
        }
        if (!this.logger.isDebugEnabled()) {
            return;
        }
        Object[] summaryArgs = {
            Long.valueOf(cDCMetrics.getBatchId()),
            JSON.toJSON(cDCMetrics.getCdcAckMetrics().getCommitList()),
            JSON.toJSON(cDCMetrics.getCdcUnCommitMetrics().getUnCommitIds())
        };
        this.logger.debug("[cdc-consumer] batch end, batchId : {}, commitIds : {}, un-commitIds : {}", summaryArgs);
    }

    /**
     * Tells whether the context currently holds more than one un-commit id. Note that a single
     * un-commit id yields {@code false}.
     *
     * @param parserContext context whose metrics are inspected
     * @return {@code true} only when the number of un-commit ids is greater than one
     */
    private boolean hasUnCommitIds(ParserContext parserContext) {
        int pendingCount = parserContext.getCdcMetrics().getCdcUnCommitMetrics().getUnCommitIds().size();
        return pendingCount > 1;
    }

    /**
     * Handles a TRANSACTIONEND entry: moves all un-commit ids of the context into the ack
     * commit list and then empties the un-commit ids. Warns beforehand when more than one
     * un-commit id is pending.
     *
     * @param parserContext context holding the metrics of the current batch
     */
    private void transactionEnd(ParserContext parserContext) {
        if (hasUnCommitIds(parserContext)) {
            if (this.logger.isWarnEnabled()) {
                this.logger.warn(
                    "[cdc-consumer] transaction end, batch : {}, one transaction has more than one commitId, ids : {}",
                    Long.valueOf(parserContext.getCdcMetrics().getBatchId()),
                    JSON.toJSON(parserContext.getCdcMetrics().getCdcUnCommitMetrics().getUnCommitIds()));
            }
        }

        CDCMetrics metrics = parserContext.getCdcMetrics();
        CDCUnCommitMetrics pending = metrics.getCdcUnCommitMetrics();

        // Promote everything that was pending to the acknowledged commit list.
        metrics.getCdcAckMetrics().getCommitList().addAll(pending.getUnCommitIds());
        if (this.logger.isDebugEnabled()) {
            this.logger.debug(
                "[cdc-consumer] transaction end, batchId : {}, add new commitIds : {}",
                Long.valueOf(metrics.getBatchId()),
                JSON.toJSON(pending.getUnCommitIds()));
        }
        pending.getUnCommitIds().clear();
    }

    /**
     * Decodes the row change stored in a ROWDATA entry and merges each of its rows into
     * {@code parseResult} through the dynamic parser.
     *
     * <p>Event types rejected by {@link #supportEventType} are skipped. For DELETE events the
     * "before" columns of a row are used, for all other events the "after" columns.
     *
     * @param entry         the ROWDATA entry to decode
     * @param parserContext context of the current batch
     * @throws SQLException if the entry cannot be decoded, a row has no columns, or the merge
     *                      fails; the underlying exception is attached as the cause
     */
    private void rowDataParse(CanalEntry.Entry entry, ParserContext parserContext) throws SQLException {
        try {
            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            CanalEntry.EventType eventType = rowChange.getEventType();
            if (!supportEventType(eventType)) {
                return;
            }
            // The event type is shared by every row of the change, so decide once.
            boolean isDelete = eventType.equals(CanalEntry.EventType.DELETE);
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                List<CanalEntry.Column> columns = isDelete ? rowData.getBeforeColumnsList() : rowData.getAfterColumnsList();
                if (columns.isEmpty()) {
                    throw new SQLException(String.format("batch : %d, columns must not be null", Long.valueOf(parserContext.getCdcMetrics().getBatchId())));
                }
                BinLogParserFactory.getInstance().dynamicParser().merge(columns, isDelete, parserContext, this.parseResult);
            }
        } catch (Exception e) {
            // Keep the original exception as the cause so its stack trace is not lost.
            throw new SQLException(String.format("batch : %d, parse entry value failed, [%s], [%s]", Long.valueOf(parserContext.getCdcMetrics().getBatchId()), entry.getStoreValue(), e), e);
        }
    }

    /**
     * Decides whether rows of the given event type are processed.
     *
     * @param eventType the event type of a row change
     * @return {@code true} for INSERT and UPDATE, and for DELETE only when physical deletes are
     *         supported; {@code false} for every other type
     */
    private boolean supportEventType(CanalEntry.EventType eventType) {
        if (eventType.equals(CanalEntry.EventType.INSERT)) {
            return true;
        }
        if (eventType.equals(CanalEntry.EventType.UPDATE)) {
            return true;
        }
        if (!this.supportPhysicalDelete) {
            return false;
        }
        return eventType.equals(CanalEntry.EventType.DELETE);
    }
}
