/*
 * Decompiled with CFR 0.152.
 */
package com.xforceplus.ultraman.cdc.core.remote.runner;

import com.alibaba.otter.canal.protocol.Message;
import com.xforceplus.ultraman.cdc.core.remote.connect.CDCConnector;
import com.xforceplus.ultraman.cdc.core.remote.context.RunnerContext;
import com.xforceplus.ultraman.cdc.core.remote.context.RunningStatus;
import com.xforceplus.ultraman.cdc.core.remote.runner.CDCConsumer;
import com.xforceplus.ultraman.cdc.dto.enums.CDCStatus;
import com.xforceplus.ultraman.cdc.processor.DataProcessor;
import com.xforceplus.ultraman.cdc.reader.CanalPropertiesReader;
import com.xforceplus.ultraman.cdc.utils.TimeWaitUtils;
import io.vavr.Tuple2;
import java.sql.SQLException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultCDCConsumer
implements CDCConsumer {
    final Logger logger = LoggerFactory.getLogger(DefaultCDCConsumer.class);
    public static final int MESSAGE_GET_WARM_INTERVAL = 100;
    public static final int FREE_MESSAGE_WAIT_IN_MS = 5;
    private RunnerContext context;
    private DataProcessor dataProcessor;
    private CDCConnector connector;
    private Map<String, CDCStatus> cdcStatusMap;
    private CanalPropertiesReader canalPropertiesReader;

    public DefaultCDCConsumer(CDCConnector connector, DataProcessor dataProcessor, Map<String, CDCStatus> cdcStatusMap, CanalPropertiesReader canalPropertiesReader) {
        this.connector = connector;
        this.dataProcessor = dataProcessor;
        this.context = new RunnerContext();
        this.cdcStatusMap = cdcStatusMap;
        this.canalPropertiesReader = canalPropertiesReader;
    }

    @Override
    public void init() {
        this.context.setRunningStatus(RunningStatus.RUN);
        this.connector.init();
    }

    @Override
    public void destroy() {
        this.context.setRunningStatus(RunningStatus.TRY_STOP);
        for (int loopTime = 10; loopTime > 0 && !this.context.getRunningStatus().equals((Object)RunningStatus.STOP_SUCCESS); --loopTime) {
            TimeWaitUtils.wakeupAfter(1L, TimeUnit.SECONDS);
        }
        this.connector.destroy();
    }

    @Override
    public void execute() {
        this.logger.info("connector {} is start executing...", (Object)this.connector.name());
        while (!this.needTerminate()) {
            try {
                this.connector.open();
                if (this.context.getBatchId() > 0L) {
                    this.connector.rollback();
                }
                this.cdcStatusMap.put(this.connector.name(), CDCStatus.CONNECTED);
            }
            catch (Exception e) {
                this.context.incrementContinuesConnectFails();
                if (this.connector.isMaxRetry(this.context.getContinuesConnectFails())) {
                    this.connector.close();
                    this.connector.init();
                    this.context.resetContinuesConnectFails();
                    continue;
                }
                this.cdcStatusMap.put(this.connector.name(), CDCStatus.DIS_CONNECTED);
                this.closeAndCallBackError(CDCStatus.DIS_CONNECTED, String.format("[cdc-runner] canal-server connection error, %s", e.getMessage()));
                continue;
            }
            this.context.resetContinuesConnectFails();
            try {
                this.consume();
            }
            catch (Exception e) {
                this.cdcStatusMap.put(this.connector.name(), CDCStatus.CONSUME_FAILED);
                this.closeAndCallBackError(CDCStatus.CONSUME_FAILED, String.format("[cdc-runner] canal-client consume error, %s", e.getMessage()));
            }
        }
        this.logger.info("connector {} is finish executing...", (Object)this.connector.name());
    }

    @Override
    public Tuple2<String, CDCStatus> namedCdcStatus() {
        return new Tuple2((Object)this.connector.name(), (Object)this.context.getCdcStatus());
    }

    private void consume() throws SQLException {
        while (true) {
            if (this.context.getRunningStatus().shouldStop()) break;
            this.executeOneBatch(this.connector);
        }
        this.context.setRunningStatus(RunningStatus.STOP_SUCCESS);
    }

    private void executeOneBatch(CDCConnector connector) throws SQLException {
        long batchId;
        Message message = null;
        long start = System.currentTimeMillis();
        try {
            message = connector.getMessageWithoutAck();
            long duration = System.currentTimeMillis() - start;
            batchId = message.getId();
            if (duration > 100L) {
                this.logger.debug("[batchProcess] read message from canal server use too much times, use timeMs : {}, batchId : {}", (Object)duration, (Object)message.getId());
            }
        }
        catch (Exception e) {
            connector.rollback();
            String error = String.format("read message from canal server error, %s", e.getMessage());
            this.logger.error("[batchProcess] {}", (Object)error);
            throw new SQLException(error);
        }
        try {
            boolean isNotEmptyBatch = false;
            long consumerStart = System.currentTimeMillis();
            if (batchId != 0L || message.getEntries().size() > 0) {
                isNotEmptyBatch = this.dataProcessor.onProcess(message);
            }
            connector.ack(batchId);
            if (!isNotEmptyBatch) {
                TimeWaitUtils.wakeupAfter(5L, TimeUnit.MILLISECONDS);
            } else {
                long consumerEnd = System.currentTimeMillis();
                this.logger.info("[batchProcess] metrics total use time {}, consumer use time {},rows{}", new Object[]{consumerEnd - start, consumerEnd - consumerStart, message.getEntries().size()});
            }
        }
        catch (Exception e) {
            String error = "consume message error";
            connector.rollback();
            this.logger.error("[batchProcess] consume batch error, connection will reset..., message : {}, {}", (Object)error, (Object)e.getMessage());
            throw new SQLException(error);
        }
    }

    private void closeAndCallBackError(CDCStatus cdcStatus, String errorMessage) {
        this.connector.close();
        this.logger.error(errorMessage);
        TimeWaitUtils.wakeupAfter(3L, TimeUnit.SECONDS);
    }

    private boolean needTerminate() {
        if (this.context.getRunningStatus().shouldStop()) {
            this.context.setRunningStatus(RunningStatus.STOP_SUCCESS);
            return true;
        }
        return false;
    }
}

