package com.xforceplus.ultraman.oqsengine.cdc.consumer.impl;

import com.alibaba.fastjson.JSON;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.xforceplus.ultraman.oqsengine.cdc.consumer.ConsumerService;
import com.xforceplus.ultraman.oqsengine.cdc.consumer.tools.BinLogParseUtils;
import com.xforceplus.ultraman.oqsengine.cdc.metrics.CDCMetricsService;
import com.xforceplus.ultraman.oqsengine.pojo.cdc.dto.RawEntry;
import com.xforceplus.ultraman.oqsengine.pojo.cdc.enums.OqsBigEntityColumns;
import com.xforceplus.ultraman.oqsengine.pojo.cdc.metrics.CDCMetrics;
import com.xforceplus.ultraman.oqsengine.pojo.cdc.metrics.CDCMetricsRecorder;
import com.xforceplus.ultraman.oqsengine.pojo.cdc.metrics.CDCUnCommitMetrics;
import com.xforceplus.ultraman.oqsengine.storage.transaction.commit.CommitHelper;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/xforceplus/ultraman/oqsengine/cdc/consumer/impl/SphinxConsumerService.class */
/**
 * {@link ConsumerService} implementation that walks a batch of Canal binlog entries,
 * collects the row changes that should be synchronised and hands them to the injected
 * {@code SyncExecutor}.
 *
 * <p>Commit ids seen in row data are accumulated in the un-commit id collection of the
 * batch metrics and are moved to the ack commit list whenever a {@code TRANSACTIONEND}
 * entry is encountered.
 */
public class SphinxConsumerService implements ConsumerService {

    @Resource(name = "syncExecutor")
    private SyncExecutor sphinxSyncExecutor;
    final Logger logger = LoggerFactory.getLogger(SphinxConsumerService.class);
    /** Rows whose commit id does not pass the skip filter in {@link #internalDataSync} are ignored. */
    private long skipCommitId = -1;
    /** When true, {@code isReadyCommit} is invoked for each commit id not yet in the un-commit ids. */
    private boolean checkCommitReady = true;

    public void setSkipCommitId(long j) {
        this.skipCommitId = j;
    }

    public void setCheckCommitReady(boolean z) {
        this.checkCommitReady = z;
    }

    /**
     * Consumes one batch of Canal entries.
     *
     * @param list              entries of the batch
     * @param j                 value handed to {@code CDCMetricsRecorder.startRecord} together with the
     *                          current un-commit metrics (presumably the batch id; confirm against the recorder)
     * @param cDCMetricsService metrics service providing the current un-commit metrics
     * @return the metrics recorded for this batch
     * @throws SQLException if an entry cannot be parsed or synchronised
     */
    @Override // com.xforceplus.ultraman.oqsengine.cdc.consumer.ConsumerService
    public CDCMetrics consume(List<CanalEntry.Entry> list, long j, CDCMetricsService cDCMetricsService) throws SQLException {
        CDCMetricsRecorder recorder = init(cDCMetricsService.getCdcMetrics().getCdcUnCommitMetrics(), j);
        int synced = syncAfterDataFilter(list, recorder.getCdcMetrics(), cDCMetricsService);
        return recorder.finishRecord(synced).getCdcMetrics();
    }

    /** Creates a recorder and starts recording with the given un-commit metrics and {@code j}. */
    private CDCMetricsRecorder init(CDCUnCommitMetrics cDCUnCommitMetrics, long j) {
        return new CDCMetricsRecorder().startRecord(cDCUnCommitMetrics, j);
    }

    /**
     * Dispatches every entry by type, then executes the collected raw entries in one call.
     *
     * @return the value returned by the sync executor, or 0 when nothing was collected
     * @throws SQLException propagated from row parsing or from the executor
     */
    private int syncAfterDataFilter(List<CanalEntry.Entry> list, CDCMetrics cDCMetrics, CDCMetricsService cDCMetricsService) throws SQLException {
        // Keyed by record id: a later row for the same id replaces the earlier one.
        Map<Long, RawEntry> rawEntries = new HashMap<>();
        for (CanalEntry.Entry entry : list) {
            switch (entry.getEntryType()) {
                case TRANSACTIONEND:
                    cleanUnCommit(cDCMetrics);
                    break;
                case ROWDATA:
                    internalDataSync(entry, cDCMetrics, cDCMetricsService, rawEntries);
                    break;
                default:
                    // Other entry types are not relevant for synchronisation.
                    break;
            }
        }
        int synced = 0;
        if (!rawEntries.isEmpty()) {
            synced = this.sphinxSyncExecutor.execute(rawEntries.values(), cDCMetrics);
        }
        batchLogged(cDCMetrics);
        return synced;
    }

    /** Logs the commit / un-commit ids left at the end of the batch. */
    private void batchLogged(CDCMetrics cDCMetrics) {
        CDCUnCommitMetrics unCommitMetrics = cDCMetrics.getCdcUnCommitMetrics();
        int unCommitSize = unCommitMetrics.getUnCommitIds().size();
        if (unCommitSize > 0) {
            this.logger.info("[cdc-consumer] batch : {} end with un-commit ids : {}",
                cDCMetrics.getBatchId(), JSON.toJSON(unCommitMetrics.getUnCommitIds()));
            if (unCommitSize > 1) {
                this.logger.warn("[cdc-consumer] batch : {}, one transaction has more than one commitId, ids : {}",
                    cDCMetrics.getBatchId(), JSON.toJSON(unCommitMetrics.getUnCommitIds()));
            }
        }
        this.logger.info("[cdc-consumer] batch end, batchId : {}, commitIds : {}, un-commitIds : {}",
            cDCMetrics.getBatchId(),
            JSON.toJSON(cDCMetrics.getCdcAckMetrics().getCommitList()),
            JSON.toJSON(unCommitMetrics.getUnCommitIds()));
    }

    /**
     * Handles a transaction end: moves all pending un-commit ids into the ack commit list
     * and clears the pending collection.
     */
    private void cleanUnCommit(CDCMetrics cDCMetrics) {
        CDCUnCommitMetrics unCommitMetrics = cDCMetrics.getCdcUnCommitMetrics();
        if (unCommitMetrics.getUnCommitIds().size() > 1) {
            this.logger.warn("[cdc-consumer] transaction end, batch : {}, one transaction has more than one commitId, ids : {}",
                cDCMetrics.getBatchId(), JSON.toJSON(unCommitMetrics.getUnCommitIds()));
        }
        cDCMetrics.getCdcAckMetrics().getCommitList().addAll(unCommitMetrics.getUnCommitIds());
        // Log before clearing so the message still shows the ids that were just committed.
        this.logger.debug("[cdc-consumer] transaction end, batchId : {}, add new commitIds : {}",
            cDCMetrics.getBatchId(), JSON.toJSON(unCommitMetrics.getUnCommitIds()));
        unCommitMetrics.getUnCommitIds().clear();
    }

    /**
     * Parses one ROWDATA entry and adds every acceptable row to {@code map}, keyed by record id.
     *
     * <p>Rows are skipped when their commit id equals {@code CommitHelper.getUncommitId()} or
     * when it does not pass the {@code skipCommitId} filter. A failure while reading the id /
     * commit id of a single row is reported through {@code errorRecord} and does not abort the
     * remaining rows.
     *
     * @throws SQLException if the entry cannot be parsed, a row has no after-columns, or any
     *                      other failure escapes the per-row handling; the original exception
     *                      is attached as the cause
     */
    private void internalDataSync(CanalEntry.Entry entry, CDCMetrics cDCMetrics, CDCMetricsService cDCMetricsService, Map<Long, RawEntry> map) throws SQLException {
        try {
            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            if (!supportEventType(rowChange.getEventType())) {
                return;
            }
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                List<CanalEntry.Column> columns = rowData.getAfterColumnsList();
                if (null == columns || columns.isEmpty()) {
                    throw new SQLException(String.format("batch : %d, columns must not be null", cDCMetrics.getBatchId()));
                }
                // -1 is what gets reported to errorRecord if parsing fails before the value is read.
                long id = -1L;
                long commitId = -1L;
                try {
                    id = BinLogParseUtils.getLongFromColumn(columns, OqsBigEntityColumns.ID);
                    commitId = BinLogParseUtils.getLongFromColumn(columns, OqsBigEntityColumns.COMMITID);
                    if (commitId == CommitHelper.getUncommitId()) {
                        continue;
                    }
                    // Accept ids above the skip threshold; commit id 0 is also accepted unless
                    // the threshold itself is 0.
                    boolean accepted = commitId > this.skipCommitId || (commitId == 0 && this.skipCommitId != 0);
                    if (!accepted) {
                        this.logger.warn("[cdc-consumer] batch : {}, ignore commitId less than skipCommitId, current id : {}, commitId : {}, skipCommitId : {}",
                            cDCMetrics.getBatchId(), id, commitId, this.skipCommitId);
                        continue;
                    }
                    CDCUnCommitMetrics unCommitMetrics = cDCMetrics.getCdcUnCommitMetrics();
                    // Only check readiness the first time a commit id shows up in this un-commit collection.
                    if (this.checkCommitReady && !unCommitMetrics.getUnCommitIds().contains(commitId)) {
                        cDCMetricsService.isReadyCommit(commitId, cDCMetricsService);
                    }
                    unCommitMetrics.getUnCommitIds().add(commitId);
                    map.put(id, new RawEntry(id, commitId, entry.getHeader().getExecuteTime(), rowData.getAfterColumnsList()));
                } catch (Exception e) {
                    // Best effort per row: record the failure and continue with the next row.
                    this.sphinxSyncExecutor.errorRecord(cDCMetrics.getBatchId(), id, commitId,
                        String.format("batch : %d, parse id, commitid from columns failed, message : %s", cDCMetrics.getBatchId(), e.getMessage()));
                }
            }
        } catch (Exception e2) {
            // Same message as before, but keep the original exception as the cause so the
            // stack trace of the real failure is not lost.
            throw new SQLException(
                String.format("batch : %d, parse entry value failed, [%s], [%s]", cDCMetrics.getBatchId(), entry.getStoreValue(), e2),
                e2);
        }
    }

    /** Only INSERT and UPDATE row changes are synchronised. */
    private boolean supportEventType(CanalEntry.EventType eventType) {
        return eventType.equals(CanalEntry.EventType.INSERT) || eventType.equals(CanalEntry.EventType.UPDATE);
    }
}
