package com.xforceplus.ultraman.cdc.core.local;

import com.alibaba.otter.canal.common.utils.NamedThreadFactory;
import com.alibaba.otter.canal.protocol.ClientIdentity;
import com.alibaba.otter.canal.protocol.Message;
import com.xforceplus.ultraman.cdc.CDCServer;
import com.xforceplus.ultraman.cdc.core.local.embed.EmbedDataConsumer;
import com.xforceplus.ultraman.cdc.core.local.embed.EmbedDataProducer;
import com.xforceplus.ultraman.cdc.core.local.embed.SourceDataConsumer;
import com.xforceplus.ultraman.cdc.core.local.embed.SourceDataProducer;
import com.xforceplus.ultraman.cdc.dto.constant.CDCConstant;
import com.xforceplus.ultraman.cdc.dto.enums.CDCStatus;
import com.xforceplus.ultraman.cdc.processor.DataProcessor;
import com.xforceplus.ultraman.cdc.reader.CDCPropertyPackage;
import com.xforceplus.ultraman.cdc.reader.CanalPropertiesReader;
import com.xforceplus.ultraman.cdc.utils.TimeWaitUtils;
import io.vavr.Tuple2;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/xforceplus/ultraman/cdc/core/local/EmbedCDCServer.class */
/**
 * {@link CDCServer} implementation that wires a {@link SourceDataProducer} to a
 * {@link SourceDataConsumer} inside the current process.
 *
 * <p>Life cycle: {@link #init()} initialises both sides and registers every consumer client with
 * the producer, {@link #execute()} submits one polling task per producer client to an internal
 * thread pool, and {@link #destroy()} asks those tasks to stop and releases the resources.
 *
 * <p>Per-destination status is published through the {@code metrics} map inherited from
 * {@link CDCServer}.
 */
public class EmbedCDCServer implements CDCServer {
    private static final Logger logger = LoggerFactory.getLogger(EmbedCDCServer.class);

    /** Pause applied after a failed poll/consume round before the next attempt. */
    private static final long FAILURE_BACKOFF_MILLIS = 5L;

    private final SourceDataProducer sourceDataProducer;
    private final SourceDataConsumer sourceDataConsumer;
    private final ScheduledExecutorService executor;

    /** Set by {@link #destroy()}; polled by every worker loop as its exit condition. */
    private volatile boolean isStopped = false;

    /** True between a successful {@link #init()} and the end of {@link #destroy()}. */
    private volatile boolean ready = false;

    // NOTE(review): the three fields below are never read in this class; kept as-is in case
    // something outside this view relies on them.
    private int loopsDurationInMillis = 3;
    private int stopWaitInMillis = 1000;
    private long totalTime;
    private long totaldocs;

    /**
     * Creates the server and its worker pool.
     *
     * @param cDCPropertyPackage configuration; the pool is sized by the number of readers it
     *     exposes, and it is handed to both the producer and the consumer
     * @param dataProcessor processor handed to the embedded consumer
     */
    public EmbedCDCServer(CDCPropertyPackage cDCPropertyPackage, DataProcessor dataProcessor) {
        this.executor = Executors.newScheduledThreadPool(
            cDCPropertyPackage.readers().size(), new NamedThreadFactory("cdc-server-runner"));
        this.sourceDataProducer = new EmbedDataProducer(cDCPropertyPackage);
        this.sourceDataConsumer = new EmbedDataConsumer(cDCPropertyPackage, dataProcessor);
    }

    /**
     * Initialises producer and consumer, then registers every consumer client with the producer.
     *
     * @throws RuntimeException if the producer refuses to register any client
     */
    @Override // com.xforceplus.ultraman.cdc.CDCLifeCycle
    public void init() {
        this.sourceDataProducer.init();
        this.sourceDataConsumer.init();
        for (ClientIdentity client : this.sourceDataConsumer.clients()) {
            if (!this.sourceDataProducer.singletonRegistry(client)) {
                throw new RuntimeException("CDCServer start error, reason : register consumer failed.");
            }
        }
        this.ready = true;
        logger.info("cdc-server is start.");
    }

    /**
     * Signals all worker loops to stop, waits {@code stopWaitInMillis}, then destroys the
     * consumer and the producer and shuts the thread pool down.
     */
    @Override // com.xforceplus.ultraman.cdc.CDCLifeCycle
    public void destroy() {
        this.isStopped = true;
        // Give in-flight iterations a chance to observe the flag before tearing things down.
        TimeWaitUtils.wakeupAfter(this.stopWaitInMillis, TimeUnit.MILLISECONDS);
        this.sourceDataConsumer.destroy();
        this.sourceDataProducer.destroy();
        this.executor.shutdown();
        this.ready = false;
    }

    /**
     * Submits one long-running polling task per client known to the producer. Does nothing once
     * the server has been stopped.
     */
    @Override // com.xforceplus.ultraman.cdc.CDCServer
    public void execute() {
        if (this.isStopped) {
            return;
        }
        for (Tuple2<ClientIdentity, CanalPropertiesReader> tuple2 : this.sourceDataProducer.clientIdentityWithReader()) {
            ClientIdentity clientIdentity = (ClientIdentity) tuple2._1();
            CanalPropertiesReader canalPropertiesReader = (CanalPropertiesReader) tuple2._2();
            // Fall back to the default batch size when no reader is attached to the client.
            int batchSize = null != canalPropertiesReader
                ? canalPropertiesReader.getBatchSize() : CDCConstant.DEFAULT_BATCH_SIZE;
            int threadBatchSize = null != canalPropertiesReader
                ? canalPropertiesReader.getThreadBatchSize() : CDCConstant.DEFAULT_BATCH_SIZE;
            this.executor.execute(() -> consumeLoop(clientIdentity, batchSize, threadBatchSize));
        }
    }

    @Override // com.xforceplus.ultraman.cdc.CDCServer
    public boolean allReady() {
        return this.ready;
    }

    @Override // com.xforceplus.ultraman.cdc.CDCServer
    public Map<String, CDCStatus> metrics() {
        return metrics;
    }

    /**
     * Worker loop for a single client: fetch a batch, consume it, ack it; on failure roll the
     * batch back, flag the destination as {@link CDCStatus#CONSUME_FAILED} and retry after a
     * short pause. Runs until {@link #destroy()} sets the stop flag.
     */
    private void consumeLoop(ClientIdentity clientIdentity, int batchSize, int threadBatchSize) {
        // The client never changes during the loop, so validate it once up front.
        if (null == clientIdentity) {
            logger.warn("client is null, cdc status is disconnect, task will stopped.");
            return;
        }
        String destination = clientIdentity.getDestination();
        while (!this.isStopped) {
            Message message = null;
            boolean succeeded = false;
            try {
                metrics.put(destination, CDCStatus.CONNECTED);
                message = this.sourceDataProducer.onMessage(clientIdentity, batchSize);
                if (null != message) {
                    long batchId = message.getId();
                    this.sourceDataConsumer.onConsume(message, threadBatchSize);
                    this.sourceDataProducer.ack(clientIdentity, batchId);
                }
                succeeded = true;
            } catch (Exception e) {
                // Previously the exception was dropped; surface it so failures can be diagnosed.
                logger.error("cdc consume failed, destination : {}.", destination, e);
                if (null != message) {
                    rollbackQuietly(clientIdentity, message.getId());
                }
                metrics.put(destination, CDCStatus.CONSUME_FAILED);
            }
            if (!succeeded) {
                TimeWaitUtils.wakeupAfter(FAILURE_BACKOFF_MILLIS, TimeUnit.MILLISECONDS);
            }
        }
        logger.info("cdc-consumer executor is going to stop...");
    }

    /**
     * Rolls a batch back without letting a rollback failure escape: an exception thrown from the
     * worker task would otherwise end the loop for this client without any log output.
     */
    private void rollbackQuietly(ClientIdentity clientIdentity, long batchId) {
        try {
            this.sourceDataProducer.rollback(clientIdentity, batchId);
        } catch (Exception rollbackError) {
            logger.error("cdc rollback failed, destination : {}, batchId : {}.",
                clientIdentity.getDestination(), batchId, rollbackError);
        }
    }
}
