package com.xforceplus.ultraman.cdc;

import com.alibaba.otter.canal.common.utils.NamedThreadFactory;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.xforceplus.ultraman.cdc.core.local.EmbedCDCServer;
import com.xforceplus.ultraman.cdc.core.remote.DispatcherCDCServer;
import com.xforceplus.ultraman.cdc.dto.enums.CDCStatus;
import com.xforceplus.ultraman.cdc.processor.DataProcessor;
import com.xforceplus.ultraman.cdc.reader.CDCPropertyPackage;
import com.xforceplus.ultraman.cdc.reader.CDCPropertyPackageInternal;
import com.xforceplus.ultraman.cdc.utils.TimeWaitUtils;
import com.xforceplus.ultraman.sdk.infra.utils.JacksonDefaultMapper;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/xforceplus/ultraman/cdc/CDCStarter.class */
/**
 * Creates the CDC server that matches the configured model, drives its life cycle and
 * periodically logs the status snapshot the server reports.
 *
 * <p>{@link #init()} and {@link #destroy()} are annotated with {@code @PostConstruct} and
 * {@code @PreDestroy}, so a container that honours those annotations invokes them itself.
 */
public class CDCStarter implements CDCLifeCycle {
    private static final Logger logger = LoggerFactory.getLogger(CDCStarter.class);

    /** Pause between two status log lines, in seconds. */
    private static final long METRICS_INTERVAL_SECONDS = 30L;

    /** Stays {@code null} when the configured model is neither REMOTE nor LOCAL. */
    private CDCServer cdcServer;

    /** Stop flag polled by the metrics loop; volatile because it is written from another thread. */
    private volatile boolean isShutdown = false;

    /** Runs the background metrics loop. */
    private final ScheduledExecutorService executor;

    /**
     * Builds the metrics executor and selects the server implementation.
     *
     * @param cDCPropertyPackage configuration; its model picks the server type and its reader
     *                           count sizes the executor
     * @param dataProcessor      processor handed to the created server
     */
    public CDCStarter(CDCPropertyPackage cDCPropertyPackage, DataProcessor dataProcessor) {
        // Never request a zero-sized pool: an empty reader list would otherwise produce one.
        int poolSize = Math.max(1, cDCPropertyPackage.readers().size());
        this.executor = Executors.newScheduledThreadPool(poolSize, new NamedThreadFactory("cdc-server-metrics"));

        if (cDCPropertyPackage.model().equals(CDCPropertyPackageInternal.Model.REMOTE)) {
            this.cdcServer = new DispatcherCDCServer(cDCPropertyPackage, dataProcessor);
        } else if (cDCPropertyPackage.model().equals(CDCPropertyPackageInternal.Model.LOCAL)) {
            this.cdcServer = new EmbedCDCServer(cDCPropertyPackage, dataProcessor);
        }
    }

    /**
     * Initialises and starts the server, then schedules the background metrics loop.
     *
     * @throws IllegalStateException if no server was created because the configured model is
     *                               neither REMOTE nor LOCAL
     */
    @Override
    @PostConstruct
    public void init() {
        if (this.cdcServer == null) {
            throw new IllegalStateException("no CDC server was created: unsupported CDC model");
        }
        this.cdcServer.init();
        this.isShutdown = false;
        this.cdcServer.execute();
        this.executor.submit(this::metrics);
    }

    /**
     * Stops the metrics loop and destroys the server. The executor is shut down even when
     * destroying the server throws.
     */
    @Override
    @PreDestroy
    public void destroy() {
        // Raise the flag first so the metrics loop stops polling a server that is going away.
        this.isShutdown = true;
        try {
            if (this.cdcServer != null) {
                this.cdcServer.destroy();
            }
        } finally {
            this.executor.shutdown();
        }
    }

    /**
     * @return the server created by the constructor, or {@code null} if the configured model
     *         matched neither REMOTE nor LOCAL
     */
    public CDCServer cdcServer() {
        return this.cdcServer;
    }

    /**
     * Logs the server status as JSON at a fixed interval until {@link #destroy()} is called.
     * A failing iteration is logged and skipped so that one bad snapshot does not end reporting.
     */
    private void metrics() {
        while (!this.isShutdown) {
            try {
                Map<String, CDCStatus> metrics = this.cdcServer.metrics();
                if (null != metrics) {
                    logger.info("cdc status : {}", JacksonDefaultMapper.OBJECT_MAPPER.writeValueAsString(metrics));
                }
            } catch (JsonProcessingException e) {
                logger.warn("failed to serialize cdc status", e);
            } catch (RuntimeException e) {
                logger.warn("failed to collect cdc status", e);
            }
            // Wait on every iteration, including when no snapshot was available, so the loop
            // never spins without a pause.
            TimeWaitUtils.wakeupAfter(METRICS_INTERVAL_SECONDS, TimeUnit.SECONDS);
        }
    }
}
