/*
 * Decompiled with CFR 0.152.
 */
package com.xforceplus.ultraman.oqsengine.cdc.consumer.service;

import com.alibaba.fastjson.JSON;
import com.alibaba.google.common.collect.Lists;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.google.protobuf.ByteString;
import com.xforceplus.ultraman.oqsengine.cdc.consumer.dto.ConsumerType;
import com.xforceplus.ultraman.oqsengine.cdc.consumer.dto.ParseResult;
import com.xforceplus.ultraman.oqsengine.cdc.consumer.error.ErrorRecorder;
import com.xforceplus.ultraman.oqsengine.cdc.consumer.factory.BinLogParserFactory;
import com.xforceplus.ultraman.oqsengine.cdc.consumer.service.ConsumerService;
import com.xforceplus.ultraman.oqsengine.cdc.consumer.tools.CommonUtils;
import com.xforceplus.ultraman.oqsengine.cdc.context.ParserContext;
import com.xforceplus.ultraman.oqsengine.cdc.metrics.CDCMetricsHandler;
import com.xforceplus.ultraman.oqsengine.metadata.MetaManager;
import com.xforceplus.ultraman.oqsengine.pojo.cdc.metrics.CDCMetrics;
import com.xforceplus.ultraman.oqsengine.pojo.cdc.metrics.CDCMetricsRecorder;
import com.xforceplus.ultraman.oqsengine.pojo.cdc.metrics.CDCUnCommitMetrics;
import com.xforceplus.ultraman.oqsengine.storage.index.IndexStorage;
import com.xforceplus.ultraman.oqsengine.storage.pojo.OqsEngineEntity;
import io.micrometer.core.annotation.Timed;
import io.micrometer.core.instrument.Metrics;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultConsumerService
implements ConsumerService {
    final Logger logger = LoggerFactory.getLogger(DefaultConsumerService.class);
    @Resource
    private CDCMetricsHandler cdcMetricsHandler;
    @Resource(name="indexStorage")
    private IndexStorage sphinxQLIndexStorage;
    @Resource
    private MetaManager metaManager;
    @Resource
    private ErrorRecorder errorRecorder;
    @Resource(name="taskThreadPool")
    private ExecutorService threadPool;
    private static final int PARTITION_SIZE = 50;
    private long skipCommitId = -1L;
    private ConsumerType consumerType = ConsumerType.MIXED;
    private boolean supportPhysicalDelete = false;
    private boolean checkCommitReady = true;
    private boolean openMultiThread = true;
    protected ParseResult parseResult = new ParseResult();
    private AtomicLong countSize = new AtomicLong(0L);
    private AtomicLong useTime = new AtomicLong(0L);
    private AtomicLong tps = (AtomicLong)Metrics.gauge((String)"oqs.cdc.index.write.latency", (Number)new AtomicLong(0L));
    private Timer timer;

    @PostConstruct
    public void init() {
        this.timer = new Timer("index-count-per-second", true);
        this.timer.schedule((TimerTask)new UpdateMetricsTask(), 1000L, 1000L);
    }

    public void setSkipCommitId(long skipCommitId) {
        this.skipCommitId = skipCommitId;
    }

    public void setCheckCommitReady(boolean checkCommitReady) {
        this.checkCommitReady = checkCommitReady;
    }

    public void setSupportPhysicalDelete(boolean supportPhysicalDelete) {
        this.supportPhysicalDelete = supportPhysicalDelete;
    }

    public void setConsumerType(int consumerType) {
        this.consumerType = ConsumerType.instance(consumerType);
    }

    public ParseResult printParseResult() {
        return this.parseResult;
    }

    public void setOpenMultiThread(boolean openMultiThread) {
        this.openMultiThread = openMultiThread;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    @Timed(value="oqs.process.delay.latency", percentiles={0.5, 0.9, 0.99}, extraTags={"initiator", "cdc", "action", "consume"})
    public CDCMetrics consumeOneBatch(List<CanalEntry.Entry> entries, long batchId, CDCMetrics cdcMetrics) throws SQLException {
        try {
            CDCMetricsRecorder cdcMetricsRecorder = this.init(cdcMetrics.getCdcUnCommitMetrics(), batchId);
            int syncs = this.parseCanalEntries(entries, cdcMetricsRecorder.getCdcMetrics());
            CDCMetrics cDCMetrics = cdcMetricsRecorder.finishRecord(syncs).getCdcMetrics();
            return cDCMetrics;
        }
        finally {
            if (this.parseResult.getErrors().size() > 0) {
                this.errorRecorder.record(batchId, this.parseResult.getErrors());
            }
            this.parseResult.clean();
        }
    }

    @Override
    public CDCMetricsHandler metricsHandler() {
        return this.cdcMetricsHandler;
    }

    private CDCMetricsRecorder init(CDCUnCommitMetrics cdcUnCommitMetrics, long batchId) {
        CDCMetricsRecorder cdcMetricsRecorder = new CDCMetricsRecorder();
        return cdcMetricsRecorder.startRecord(cdcUnCommitMetrics, batchId);
    }

    private int parseCanalEntries(List<CanalEntry.Entry> entries, CDCMetrics cdcMetrics) throws SQLException {
        ParserContext parserContext = new ParserContext(this.skipCommitId, this.checkCommitReady, this.consumerType, cdcMetrics, this.metaManager);
        for (CanalEntry.Entry entry : entries) {
            switch (entry.getEntryType()) {
                case TRANSACTIONEND: {
                    this.transactionEnd(parserContext);
                    break;
                }
                case ROWDATA: {
                    this.rowDataParse(entry, parserContext);
                    break;
                }
            }
        }
        BinLogParserFactory.getInstance().dynamicParser().parser(parserContext, this.parseResult);
        if (!this.parseResult.isReadyCommitIds().isEmpty()) {
            this.cdcMetricsHandler.isReady(new ArrayList<Long>(this.parseResult.isReadyCommitIds()));
        }
        if (!this.parseResult.getFinishEntries().isEmpty()) {
            try {
                ArrayList<OqsEngineEntity> entities = new ArrayList<OqsEngineEntity>(this.parseResult.getFinishEntries().values());
                if (this.openMultiThread) {
                    this.multiSaveOrDelete(entities);
                } else {
                    this.saveOrDelete(entities);
                }
            }
            catch (Exception e) {
                String message = String.format("write sphinx-batch error, commitIds : %s, startId : %s, message : %s", this.parseResult.isReadyCommitIds(), this.parseResult.getStartId(), e.getMessage());
                this.parseResult.addError(this.parseResult.getStartId(), this.parseResult.getFinishEntries().get(this.parseResult.getStartId()).getCommitid(), -1, CommonUtils.toErrorCommitIdStr(this.parseResult.isReadyCommitIds(), parserContext.getCdcMetrics().getCdcUnCommitMetrics().getUnCommitIds()), String.format("batch : %d consumer columns failed, %s", parserContext.getCdcMetrics().getBatchId(), message));
                throw new SQLException(message);
            }
        }
        this.batchLogged(cdcMetrics);
        return this.parseResult.getFinishEntries().size();
    }

    public void saveOrDelete(Collection<OqsEngineEntity> oqsEngineEntities) throws SQLException {
        long start = System.currentTimeMillis();
        this.sphinxQLIndexStorage.saveOrDeleteOriginalEntities(oqsEngineEntities);
        long end = System.currentTimeMillis();
        this.countSize.addAndGet(oqsEngineEntities.size());
        this.useTime.addAndGet(end - start);
        this.logger.debug("finish saveOrDelete, total count {}, total times {}, tps {}", new Object[]{this.countSize.get(), this.useTime.get() / 1000L, this.countSize.get() / (this.useTime.get() / 1000L)});
    }

    public void multiSaveOrDelete(Collection<OqsEngineEntity> oqsEngineEntities) throws SQLException {
        long start = System.currentTimeMillis();
        List partitions = Lists.partition((List)((List)oqsEngineEntities), (int)50);
        ArrayList<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(partitions.size());
        for (List list : partitions) {
            futures.add(this.threadPool.submit(new SaveOrDeleteTask(list)));
        }
        for (Future future : futures) {
            try {
                if (((Boolean)future.get()).booleanValue()) continue;
                throw new SQLException("Failed to saveOrDelete for unknown reason.");
            }
            catch (Exception e) {
                throw new SQLException(e.getCause());
            }
        }
        long end = System.currentTimeMillis();
        this.countSize.addAndGet(oqsEngineEntities.size());
        this.useTime.addAndGet(end - start);
        this.logger.debug("finish multiSaveOrDelete, current-use-threads-size{}, total count {}, total times {}, tps {}", new Object[]{futures.size(), this.countSize.get(), this.useTime.get() / 1000L, this.countSize.get() / (this.useTime.get() / 1000L)});
    }

    private void batchLogged(CDCMetrics cdcMetrics) {
        if (cdcMetrics.getCdcUnCommitMetrics().getUnCommitIds().size() > 0) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("[cdc-consumer] batch : {} end with un-commit ids : {}", (Object)cdcMetrics.getBatchId(), JSON.toJSON((Object)cdcMetrics.getCdcUnCommitMetrics().getUnCommitIds()));
            }
            if (cdcMetrics.getCdcUnCommitMetrics().getUnCommitIds().size() > 1) {
                this.logger.warn("[cdc-consumer] batch : {}, one transaction has more than one commitId, ids : {}", (Object)cdcMetrics.getBatchId(), JSON.toJSON((Object)cdcMetrics.getCdcUnCommitMetrics().getUnCommitIds()));
            }
        }
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("[cdc-consumer] batch end, batchId : {}, commitIds : {}, un-commitIds : {}", new Object[]{cdcMetrics.getBatchId(), JSON.toJSON((Object)cdcMetrics.getCdcAckMetrics().getCommitList()), JSON.toJSON((Object)cdcMetrics.getCdcUnCommitMetrics().getUnCommitIds())});
        }
    }

    private boolean hasUnCommitIds(ParserContext parserContext) {
        return parserContext.getCdcMetrics().getCdcUnCommitMetrics().getUnCommitIds().size() > 1;
    }

    private void transactionEnd(ParserContext parserContext) {
        if (this.hasUnCommitIds(parserContext) && this.logger.isWarnEnabled()) {
            this.logger.warn("[cdc-consumer] transaction end, batch : {}, one transaction has more than one commitId, ids : {}", (Object)parserContext.getCdcMetrics().getBatchId(), JSON.toJSON((Object)parserContext.getCdcMetrics().getCdcUnCommitMetrics().getUnCommitIds()));
        }
        parserContext.getCdcMetrics().getCdcAckMetrics().getCommitList().addAll(parserContext.getCdcMetrics().getCdcUnCommitMetrics().getUnCommitIds());
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("[cdc-consumer] transaction end, batchId : {}, add new commitIds : {}", (Object)parserContext.getCdcMetrics().getBatchId(), JSON.toJSON((Object)parserContext.getCdcMetrics().getCdcUnCommitMetrics().getUnCommitIds()));
        }
        parserContext.getCdcMetrics().getCdcUnCommitMetrics().getUnCommitIds().clear();
    }

    private void rowDataParse(CanalEntry.Entry entry, ParserContext parserContext) throws SQLException {
        CanalEntry.RowChange rowChange = null;
        try {
            rowChange = CanalEntry.RowChange.parseFrom((ByteString)entry.getStoreValue());
        }
        catch (Exception e) {
            throw new SQLException(String.format("batch : %d, parse entry value failed, [%s], [%s]", parserContext.getCdcMetrics().getBatchId(), entry.getStoreValue(), e));
        }
        CanalEntry.EventType eventType = rowChange.getEventType();
        if (this.supportEventType(eventType)) {
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                List columns = null;
                boolean isPhysicalDelete = false;
                if (eventType.equals((Object)CanalEntry.EventType.DELETE)) {
                    isPhysicalDelete = true;
                    columns = rowData.getBeforeColumnsList();
                } else {
                    columns = rowData.getAfterColumnsList();
                }
                if (columns.size() == 0) {
                    throw new SQLException(String.format("batch : %d, columns must not be null", parserContext.getCdcMetrics().getBatchId()));
                }
                BinLogParserFactory.getInstance().dynamicParser().merge(columns, isPhysicalDelete, parserContext, this.parseResult);
            }
        }
    }

    private boolean supportEventType(CanalEntry.EventType eventType) {
        return eventType.equals((Object)CanalEntry.EventType.INSERT) || eventType.equals((Object)CanalEntry.EventType.UPDATE) || this.supportPhysicalDelete && eventType.equals((Object)CanalEntry.EventType.DELETE);
    }

    private class UpdateMetricsTask
    extends TimerTask {
        private long last = 0L;

        private UpdateMetricsTask() {
        }

        @Override
        public void run() {
            try {
                DefaultConsumerService.this.tps.set(DefaultConsumerService.this.countSize.get() - this.last);
                this.last = DefaultConsumerService.this.countSize.get();
            }
            catch (Throwable throwable) {
                // empty catch block
            }
        }
    }

    private class SaveOrDeleteTask
    implements Callable<Boolean> {
        private List<OqsEngineEntity> storageEntities;

        public SaveOrDeleteTask(List<OqsEngineEntity> storageEntities) {
            this.storageEntities = storageEntities;
        }

        @Override
        public Boolean call() throws Exception {
            DefaultConsumerService.this.sphinxQLIndexStorage.saveOrDeleteOriginalEntities(this.storageEntities);
            return true;
        }
    }
}

