package com.xforceplus.ultraman.oqsengine.cdc;

import com.alibaba.otter.canal.common.utils.NamedThreadFactory;
import com.xforceplus.ultraman.oqsengine.cdc.context.RunnerContext;
import com.xforceplus.ultraman.oqsengine.cdc.core.SourceDataConsumer;
import com.xforceplus.ultraman.oqsengine.cdc.core.SourceDataProducer;
import com.xforceplus.ultraman.oqsengine.cdc.metrics.CDCMetricsHandler;
import com.xforceplus.ultraman.oqsengine.cdc.reader.CanalPropertiesReader;
import com.xforceplus.ultraman.oqsengine.inner.pojo.cdc.metrics.CDCMetrics;
import java.sql.SQLException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/xplat-meta-oqsengine-cdc-2.0.0-SNAPSHOT.jar:com/xforceplus/ultraman/oqsengine/cdc/CDCServer.class */
/**
 * Lifecycle bean that starts and stops the CDC consumer loop.
 *
 * <p>On {@link #init()} it registers the injected collaborators with the shared
 * {@link RunnerContext}, initialises the producer and consumer, and starts a single
 * background thread that repeatedly calls {@link SourceDataConsumer#onConsume()}.
 * On {@link #destroy()} it raises the stop flag, waits for the loop to finish and then
 * destroys the consumer and producer.
 */
public class CDCServer {
    private static final Logger logger = LoggerFactory.getLogger(CDCServer.class);

    @Resource
    private SourceDataProducer sourceDataProducer;

    @Resource
    private SourceDataConsumer sourceDataConsumer;

    @Resource
    private CDCMetricsHandler metricsHandler;

    @Resource
    private CanalPropertiesReader propertiesReader;

    /** Single-threaded executor running the consumer loop; {@code null} until {@link #init()} succeeds. */
    private ScheduledExecutorService executor = null;

    /** Pause between polls when {@code onConsume()} returns {@code false}. */
    private int loopsDurationInMillis = 3;

    /** Maximum time {@link #destroy()} waits for the consumer loop to exit. */
    private int stopWaitInMillis = 1000;

    /**
     * Prepares the {@link RunnerContext}, initialises producer and consumer, and starts the
     * background consumer loop.
     *
     * @throws SQLException propagated from the start-up steps (declared by the original contract)
     */
    @PostConstruct
    public void init() throws SQLException {
        initRunnerContext();
        RunnerContext.instance().resetSourceDataProducer(this.sourceDataProducer);
        this.sourceDataProducer.init();
        RunnerContext.instance().resetSourceDataConsumer(this.sourceDataConsumer);
        this.sourceDataConsumer.init();
        this.executor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("cdc-server-runner"));
        logger.info("cdc-server is start.");
        this.executor.execute(this::consumeLoop);
    }

    /**
     * Signals the consumer loop to stop, waits (bounded by {@code stopWaitInMillis}) for it to
     * finish, then destroys the consumer and the producer.
     */
    @PreDestroy
    public void destroy() {
        RunnerContext.instance().stopped(true);
        awaitRunnerStop();
        this.sourceDataConsumer.destroy();
        this.sourceDataProducer.destroy();
        logger.info("cdc-server is shutdown.");
    }

    /**
     * Body of the background thread: polls the consumer until the stop flag is raised or the
     * thread is interrupted.
     */
    private void consumeLoop() {
        try {
            // The interrupt check prevents a busy spin: once the flag is set, Thread.sleep
            // would throw immediately on every iteration.
            while (!RunnerContext.instance().isStopped() && !Thread.currentThread().isInterrupted()) {
                if (!this.sourceDataConsumer.onConsume()) {
                    wakeupAfter(this.loopsDurationInMillis, TimeUnit.MILLISECONDS);
                }
            }
            logger.info("cdc-consumer executor is going to stop...");
        } catch (RuntimeException | Error e) {
            // A task started via ScheduledExecutorService.execute is wrapped in a future, so an
            // escaping exception would otherwise be captured there and never reported.
            logger.error("cdc-consumer executor terminated unexpectedly.", e);
            throw e;
        }
    }

    /**
     * Stops accepting work on the executor and waits up to {@code stopWaitInMillis} for the
     * consumer loop to exit. Does nothing if the executor was never created.
     */
    private void awaitRunnerStop() {
        if (null == this.executor) {
            return;
        }
        this.executor.shutdown();
        try {
            if (!this.executor.awaitTermination(this.stopWaitInMillis, TimeUnit.MILLISECONDS)) {
                logger.warn("cdc-consumer executor did not stop within {} ms.", this.stopWaitInMillis);
            }
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller; shutdown continues regardless.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Publishes the properties reader and any previously stored metrics to the
     * {@link RunnerContext}, then clears the stop flag.
     *
     * @throws SQLException propagated from the metrics query (declared by the original contract)
     */
    private void initRunnerContext() throws SQLException {
        try {
            RunnerContext.instance().resetPropertiesReader(this.propertiesReader);
            CDCMetrics query = this.metricsHandler.query();
            if (null != query) {
                RunnerContext.instance().setCdcMetrics(query);
            }
            RunnerContext.instance().stopped(false);
        } catch (Exception e) {
            // Trailing throwable argument makes SLF4J record the stack trace as well.
            logger.error("CDCServer start error, reason : query cdcMetrics failed, message : {}", e.getMessage(), e);
            throw e;
        }
    }

    /**
     * Sleeps the current thread for the given duration. If interrupted, returns early with the
     * thread's interrupt flag restored.
     *
     * @param j        duration to sleep
     * @param timeUnit unit of {@code j}
     */
    private void wakeupAfter(long j, TimeUnit timeUnit) {
        try {
            Thread.sleep(timeUnit.toMillis(j));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
