/*
 * Decompiled with CFR 0.152.
 */
package com.xforceplus.ultraman.oqsengine.cdc.consumer.impl;

import com.alibaba.fastjson.JSON;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.google.protobuf.ByteString;
import com.xforceplus.ultraman.oqsengine.cdc.consumer.ConsumerService;
import com.xforceplus.ultraman.oqsengine.cdc.consumer.impl.SyncExecutor;
import com.xforceplus.ultraman.oqsengine.cdc.consumer.tools.BinLogParseUtils;
import com.xforceplus.ultraman.oqsengine.cdc.metrics.CDCMetricsService;
import com.xforceplus.ultraman.oqsengine.pojo.cdc.dto.RawEntry;
import com.xforceplus.ultraman.oqsengine.pojo.cdc.enums.OqsBigEntityColumns;
import com.xforceplus.ultraman.oqsengine.pojo.cdc.metrics.CDCMetrics;
import com.xforceplus.ultraman.oqsengine.pojo.cdc.metrics.CDCMetricsRecorder;
import com.xforceplus.ultraman.oqsengine.pojo.cdc.metrics.CDCUnCommitMetrics;
import com.xforceplus.ultraman.oqsengine.storage.transaction.commit.CommitHelper;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SphinxConsumerService
implements ConsumerService {
    final Logger logger = LoggerFactory.getLogger(SphinxConsumerService.class);
    @Resource(name="syncExecutor")
    private SyncExecutor sphinxSyncExecutor;
    private long skipCommitId = -1L;
    private boolean checkCommitReady = true;

    public void setSkipCommitId(long skipCommitId) {
        this.skipCommitId = skipCommitId;
    }

    public void setCheckCommitReady(boolean checkCommitReady) {
        this.checkCommitReady = checkCommitReady;
    }

    @Override
    public CDCMetrics consume(List<CanalEntry.Entry> entries, long batchId, CDCMetricsService cdcMetricsService) throws SQLException {
        CDCMetricsRecorder cdcMetricsRecorder = this.init(cdcMetricsService.getCdcMetrics().getCdcUnCommitMetrics(), batchId);
        int syncs = this.syncAfterDataFilter(entries, cdcMetricsRecorder.getCdcMetrics(), cdcMetricsService);
        return cdcMetricsRecorder.finishRecord(syncs).getCdcMetrics();
    }

    private CDCMetricsRecorder init(CDCUnCommitMetrics cdcUnCommitMetrics, long batchId) {
        CDCMetricsRecorder cdcMetricsRecorder = new CDCMetricsRecorder();
        return cdcMetricsRecorder.startRecord(cdcUnCommitMetrics, batchId);
    }

    private int syncAfterDataFilter(List<CanalEntry.Entry> entries, CDCMetrics cdcMetrics, CDCMetricsService cdcMetricsService) throws SQLException {
        int syncCount = 0;
        HashMap<Long, RawEntry> rawEntries = new HashMap<Long, RawEntry>();
        for (CanalEntry.Entry entry : entries) {
            switch (entry.getEntryType()) {
                case TRANSACTIONEND: {
                    this.cleanUnCommit(cdcMetrics);
                    break;
                }
                case ROWDATA: {
                    this.internalDataSync(entry, cdcMetrics, cdcMetricsService, rawEntries);
                }
            }
        }
        if (!rawEntries.isEmpty()) {
            syncCount += this.sphinxSyncExecutor.execute(rawEntries.values(), cdcMetrics);
        }
        this.batchLogged(cdcMetrics);
        return syncCount;
    }

    private void batchLogged(CDCMetrics cdcMetrics) {
        if (cdcMetrics.getCdcUnCommitMetrics().getUnCommitIds().size() > 0) {
            this.logger.info("[cdc-consumer] batch : {} end with un-commit ids : {}", (Object)cdcMetrics.getBatchId(), JSON.toJSON((Object)cdcMetrics.getCdcUnCommitMetrics().getUnCommitIds()));
            if (cdcMetrics.getCdcUnCommitMetrics().getUnCommitIds().size() > 1) {
                this.logger.warn("[cdc-consumer] batch : {}, one transaction has more than one commitId, ids : {}", (Object)cdcMetrics.getBatchId(), JSON.toJSON((Object)cdcMetrics.getCdcUnCommitMetrics().getUnCommitIds()));
            }
        }
        this.logger.info("[cdc-consumer] batch end, batchId : {}, commitIds : {}, un-commitIds : {}", new Object[]{cdcMetrics.getBatchId(), JSON.toJSON((Object)cdcMetrics.getCdcAckMetrics().getCommitList()), JSON.toJSON((Object)cdcMetrics.getCdcUnCommitMetrics().getUnCommitIds())});
    }

    private void cleanUnCommit(CDCMetrics cdcMetrics) {
        if (cdcMetrics.getCdcUnCommitMetrics().getUnCommitIds().size() > 1) {
            this.logger.warn("[cdc-consumer] transaction end, batch : {}, one transaction has more than one commitId, ids : {}", (Object)cdcMetrics.getBatchId(), JSON.toJSON((Object)cdcMetrics.getCdcUnCommitMetrics().getUnCommitIds()));
        }
        cdcMetrics.getCdcAckMetrics().getCommitList().addAll(cdcMetrics.getCdcUnCommitMetrics().getUnCommitIds());
        this.logger.debug("[cdc-consumer] transaction end, batchId : {}, add new commitIds : {}", (Object)cdcMetrics.getBatchId(), JSON.toJSON((Object)cdcMetrics.getCdcUnCommitMetrics().getUnCommitIds()));
        cdcMetrics.getCdcUnCommitMetrics().getUnCommitIds().clear();
    }

    private void internalDataSync(CanalEntry.Entry entry, CDCMetrics cdcMetrics, CDCMetricsService cdcMetricsService, Map<Long, RawEntry> rawEntries) throws SQLException {
        CanalEntry.RowChange rowChange = null;
        try {
            rowChange = CanalEntry.RowChange.parseFrom((ByteString)entry.getStoreValue());
        }
        catch (Exception e) {
            throw new SQLException(String.format("batch : %d, parse entry value failed, [%s], [%s]", cdcMetrics.getBatchId(), entry.getStoreValue(), e));
        }
        CanalEntry.EventType eventType = rowChange.getEventType();
        if (this.supportEventType(eventType)) {
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                List columns = rowData.getAfterColumnsList();
                if (null == columns || columns.size() == 0) {
                    throw new SQLException(String.format("batch : %d, columns must not be null", cdcMetrics.getBatchId()));
                }
                Long id = -1L;
                Long commitId = -1L;
                try {
                    id = BinLogParseUtils.getLongFromColumn(columns, OqsBigEntityColumns.ID);
                    commitId = BinLogParseUtils.getLongFromColumn(columns, OqsBigEntityColumns.COMMITID);
                }
                catch (Exception e) {
                    this.sphinxSyncExecutor.errorRecord(cdcMetrics.getBatchId(), id, commitId, String.format("batch : %d, parse id, commitid from columns failed, message : %s", cdcMetrics.getBatchId(), e.getMessage()));
                    continue;
                }
                if (commitId == CommitHelper.getUncommitId()) continue;
                if (commitId > this.skipCommitId || commitId == 0L && this.skipCommitId != 0L) {
                    if (this.checkCommitReady && !cdcMetrics.getCdcUnCommitMetrics().getUnCommitIds().contains(commitId)) {
                        cdcMetricsService.isReadyCommit(commitId, cdcMetricsService);
                    }
                    cdcMetrics.getCdcUnCommitMetrics().getUnCommitIds().add(commitId);
                    rawEntries.put(id, new RawEntry(id.longValue(), commitId.longValue(), entry.getHeader().getExecuteTime(), rowData.getAfterColumnsList()));
                    continue;
                }
                this.logger.warn("[cdc-consumer] batch : {}, ignore commitId less than skipCommitId, current id : {}, commitId : {}, skipCommitId : {}", new Object[]{cdcMetrics.getBatchId(), id, commitId, this.skipCommitId});
            }
        }
    }

    private boolean supportEventType(CanalEntry.EventType eventType) {
        return eventType.equals((Object)CanalEntry.EventType.INSERT) || eventType.equals((Object)CanalEntry.EventType.UPDATE);
    }
}

