/*
 * Decompiled with CFR 0.152.
 */
package com.xforceplus.ultraman.oqsengine.cdc.metrics;

import com.alibaba.fastjson.JSON;
import com.xforceplus.ultraman.oqsengine.cdc.consumer.callback.CDCMetricsCallback;
import com.xforceplus.ultraman.oqsengine.cdc.metrics.CDCMetricsHandler;
import com.xforceplus.ultraman.oqsengine.pojo.cdc.metrics.CDCMetrics;
import java.sql.SQLException;
import java.util.List;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultCDCMetricsHandler
implements CDCMetricsHandler {
    final Logger logger = LoggerFactory.getLogger(DefaultCDCMetricsHandler.class);
    @Resource
    private CDCMetricsCallback cdcMetricsCallback;
    private volatile boolean shutdown = false;

    @Override
    public void init() {
        this.logger.info("[cdc-metrics] start, it will start hearBeat thread");
        Thread heartBeat = new Thread(this::heartBeat);
        heartBeat.setName("cdc-heartBeat");
        heartBeat.start();
        this.logger.info("[cdc-metrics] hearBeat thread start ok...");
    }

    @Override
    public void heartBeat() {
        this.shutdown = false;
        long lastHeartBeatTime = 0L;
        while (!this.shutdown) {
            try {
                this.cdcMetricsCallback.heartBeat();
                long now = System.currentTimeMillis();
                if (now - lastHeartBeatTime > 10000L) {
                    lastHeartBeatTime = now;
                    this.logger.debug("[cdc-metrics] current heartBeat timeStamps : {}", (Object)lastHeartBeatTime);
                }
                Thread.sleep(1000L);
            }
            catch (Exception e) {
                this.logger.warn("[cdc-metrics] heartBeat error, message :{}", (Object)e.getMessage());
            }
        }
    }

    @Override
    public void shutdown() {
        this.shutdown = true;
    }

    @Override
    public void callBackSuccess(CDCMetrics cdcMetrics) {
        this.callback(cdcMetrics);
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("[cdc-metrics] success consumer, cdcMetrics : {}, batchId : {}", JSON.toJSON((Object)cdcMetrics), (Object)cdcMetrics.getBatchId());
        }
    }

    @Override
    public void callBackError(CDCMetrics cdcMetrics) {
        this.logger.warn("[cdc-metrics] callback error, cdcStatus : {}", (Object)cdcMetrics.getCdcAckMetrics().getCdcConsumerStatus());
        this.callback(cdcMetrics);
    }

    @Override
    public void backup(CDCMetrics cdcMetrics) {
        try {
            this.cdcMetricsCallback.saveLastUnCommit(cdcMetrics);
        }
        catch (Exception e) {
            this.logger.error("[cdc-metrics] back up unCommitMetrics to redis error, batch : {}, unCommitMetrics : {}", (Object)cdcMetrics.getBatchId(), JSON.toJSON((Object)cdcMetrics));
        }
    }

    @Override
    public CDCMetrics query() throws SQLException {
        try {
            return this.cdcMetricsCallback.queryLastUnCommit();
        }
        catch (Exception e) {
            throw new SQLException("[cdc-metrics] query unCommitMetrics from redis error.");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void isReady(List<Long> commitIds) {
        long start = System.currentTimeMillis();
        int loops = 0;
        boolean recoverMonitor = false;
        try {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("[cdc-metrics] attempt check ready to commitIds , commitIds : {}", commitIds);
            }
            while (!(commitIds = this.cdcMetricsCallback.notReady(commitIds)).isEmpty()) {
                try {
                    Thread.sleep(10L);
                }
                catch (InterruptedException interruptedException) {
                    // empty catch block
                }
                if (++loops <= 1000) continue;
                recoverMonitor = true;
                loops = 0;
                this.logger.warn("[cdc-metrics] loops for wait ready commit missed current check point (10s), commitId : {}", commitIds);
                commitIds.forEach(this::notReady);
            }
        }
        finally {
            long duration = System.currentTimeMillis() - start;
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("[cdc-metrics] success check ready to commitIds, commitIds : {}", commitIds);
            }
            if (duration > 100L) {
                this.logger.warn("[cdc-metrics] wait for ready commitId use too much times, commitIds {}, use time : {}ms", commitIds, (Object)duration);
            }
            if (recoverMonitor) {
                this.notReady(-1L);
            }
        }
    }

    @Override
    public void renewConnect(CDCMetrics cdcMetrics) {
        this.callback(cdcMetrics);
    }

    private void notReady(long commitId) {
        this.cdcMetricsCallback.notReady(commitId);
    }

    private void callback(CDCMetrics cdcMetrics) {
        cdcMetrics.getCdcAckMetrics().setLastUpdateTime(System.currentTimeMillis());
        try {
            this.logger.debug("[cdc-metrics] callback ack metrics : {}", JSON.toJSON((Object)cdcMetrics.getCdcAckMetrics()));
        }
        catch (Exception ex) {
            this.logger.debug("[cdc-metrics] print ack metrics error, message : {}", (Object)ex.getMessage());
        }
        try {
            this.cdcMetricsCallback.ack(cdcMetrics.getCdcAckMetrics());
        }
        catch (Exception e) {
            try {
                this.logger.error("[cdc-metrics] callback error, metrics : {}, message : {}", JSON.toJSON((Object)cdcMetrics.getCdcAckMetrics()), (Object)e.getMessage());
            }
            catch (Exception exception) {
                // empty catch block
            }
        }
    }
}

