/*
 * Decompiled with CFR 0.152.
 */
package com.xforceplus.ultraman.cdc.core.local;

import com.alibaba.otter.canal.common.utils.NamedThreadFactory;
import com.alibaba.otter.canal.protocol.ClientIdentity;
import com.alibaba.otter.canal.protocol.Message;
import com.xforceplus.ultraman.cdc.CDCServer;
import com.xforceplus.ultraman.cdc.core.local.embed.EmbedDataConsumer;
import com.xforceplus.ultraman.cdc.core.local.embed.EmbedDataProducer;
import com.xforceplus.ultraman.cdc.core.local.embed.SourceDataConsumer;
import com.xforceplus.ultraman.cdc.core.local.embed.SourceDataProducer;
import com.xforceplus.ultraman.cdc.dto.enums.CDCStatus;
import com.xforceplus.ultraman.cdc.processor.DataProcessor;
import com.xforceplus.ultraman.cdc.reader.CDCPropertyPackage;
import com.xforceplus.ultraman.cdc.reader.CanalPropertiesReader;
import com.xforceplus.ultraman.cdc.utils.TimeWaitUtils;
import io.vavr.Tuple2;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link CDCServer} implementation that connects a {@link SourceDataProducer} to a
 * {@link SourceDataConsumer} and runs one polling worker per client/reader pair reported by
 * the producer.
 *
 * <p>Lifecycle: {@link #init()} initialises both sides and registers every consumer client with
 * the producer, {@link #execute()} submits the polling workers, and {@link #destroy()} signals
 * the workers to stop and releases the consumer, the producer and the thread pool.
 *
 * <p>Per-destination status is written to the {@code metrics} map, which is not declared in this
 * class (presumably inherited from {@link CDCServer} - verify).
 */
public class EmbedCDCServer implements CDCServer {
    private static final Logger logger = LoggerFactory.getLogger(EmbedCDCServer.class);

    /** Fetch and consume batch size used when a client has no reader configuration. */
    private static final int DEFAULT_BATCH_SIZE = 1024;

    /** Pause, in milliseconds, after a failed fetch/consume round before the worker retries. */
    private static final long FAILURE_BACKOFF_IN_MILLIS = 5L;

    /** Time, in milliseconds, passed to the wait helper in {@link #destroy()} before teardown. */
    private static final int STOP_WAIT_IN_MILLIS = 1000;

    private final SourceDataProducer sourceDataProducer;
    private final SourceDataConsumer sourceDataConsumer;
    private final ScheduledExecutorService executor;

    /** Set by {@link #destroy()}; polled by every worker loop as its exit condition. */
    private volatile boolean isStopped = false;

    /** True between a successful {@link #init()} and the end of {@link #destroy()}. */
    private volatile boolean ready = false;

    /**
     * Creates the server with a thread pool sized to the number of configured readers.
     *
     * @param cdcPropertyPackage configuration handed to both the producer and the consumer
     * @param dataProcessor      processor handed to the consumer
     */
    public EmbedCDCServer(CDCPropertyPackage cdcPropertyPackage, DataProcessor dataProcessor) {
        this.executor = Executors.newScheduledThreadPool(
                cdcPropertyPackage.readers().size(), new NamedThreadFactory("cdc-server-runner"));
        this.sourceDataProducer = new EmbedDataProducer(cdcPropertyPackage);
        this.sourceDataConsumer = new EmbedDataConsumer(cdcPropertyPackage, dataProcessor);
    }

    /**
     * Initialises the producer and the consumer, then registers every consumer client with the
     * producer. The server reports ready only after all registrations succeeded.
     *
     * @throws RuntimeException if the producer rejects the registration of any client
     */
    @Override
    public void init() {
        this.sourceDataProducer.init();
        this.sourceDataConsumer.init();
        for (ClientIdentity clientIdentity : this.sourceDataConsumer.clients()) {
            if (!this.sourceDataProducer.singletonRegistry(clientIdentity)) {
                throw new RuntimeException("CDCServer start error, reason : register consumer failed.");
            }
        }
        this.ready = true;
        logger.info("cdc-server is start.");
    }

    /**
     * Signals all workers to stop, waits, and then tears down the consumer, the producer and the
     * thread pool.
     *
     * <p>Teardown steps are chained with {@code finally} so that a failure in one step does not
     * skip the remaining ones; the first exception still propagates to the caller.
     */
    @Override
    public void destroy() {
        this.isStopped = true;
        // Presumably blocks for the given time so running workers can observe the stop flag
        // before their collaborators are destroyed - confirm against TimeWaitUtils.
        TimeWaitUtils.wakeupAfter(STOP_WAIT_IN_MILLIS, TimeUnit.MILLISECONDS);
        try {
            this.sourceDataConsumer.destroy();
        } finally {
            try {
                this.sourceDataProducer.destroy();
            } finally {
                this.executor.shutdown();
                this.ready = false;
            }
        }
    }

    /**
     * Submits one polling worker per client/reader pair reported by the producer. Does nothing
     * once the server has been stopped.
     */
    @Override
    public void execute() {
        if (this.isStopped) {
            return;
        }
        for (Tuple2<ClientIdentity, CanalPropertiesReader> value : this.sourceDataProducer.clientIdentityWithReader()) {
            final CanalPropertiesReader reader = value._2();
            final int batchSize = null != reader ? reader.getBatchSize() : DEFAULT_BATCH_SIZE;
            final int threadBatchSize = null != reader ? reader.getThreadBatchSize() : DEFAULT_BATCH_SIZE;
            final ClientIdentity clientIdentity = value._1();
            this.executor.execute(() -> this.consumeLoop(clientIdentity, batchSize, threadBatchSize));
        }
    }

    /**
     * Worker body: repeats fetch/consume rounds for one client until the server is stopped,
     * pausing briefly after every failed round.
     */
    private void consumeLoop(ClientIdentity clientIdentity, int batchSize, int threadBatchSize) {
        if (null == clientIdentity) {
            logger.warn("client is null, cdc status is disconnect, task will stopped.");
            return;
        }
        while (!this.isStopped) {
            if (!this.pollOnce(clientIdentity, batchSize, threadBatchSize)) {
                TimeWaitUtils.wakeupAfter(FAILURE_BACKOFF_IN_MILLIS, TimeUnit.MILLISECONDS);
            }
        }
        logger.info("cdc-consumer executor is going to stop...");
    }

    /**
     * Runs a single fetch/consume/ack round for one client.
     *
     * @return {@code true} if the round completed without an exception (including the case where
     *         no message was available), {@code false} if it failed and the caller should back off
     */
    private boolean pollOnce(ClientIdentity clientIdentity, int batchSize, int threadBatchSize) {
        Message message = null;
        long batchId = -1L;
        try {
            metrics.put(clientIdentity.getDestination(), CDCStatus.CONNECTED);
            message = this.sourceDataProducer.onMessage(clientIdentity, batchSize);
            if (null != message) {
                batchId = message.getId();
                // NOTE(review): the value returned by onConsume is not inspected; the batch is
                // acknowledged whenever onConsume does not throw. Confirm this is intended.
                this.sourceDataConsumer.onConsume(message, threadBatchSize);
                this.sourceDataProducer.ack(clientIdentity, batchId);
            }
            return true;
        } catch (Exception e) {
            logger.error("cdc consume failed, destination : {}, batchId : {}",
                    clientIdentity.getDestination(), batchId, e);
            if (null != message) {
                try {
                    this.sourceDataProducer.rollback(clientIdentity, message.getId());
                } catch (Exception rollbackError) {
                    // A failing rollback must not terminate the worker thread without a trace;
                    // record it and let the loop retry after the back-off.
                    logger.error("cdc rollback failed, destination : {}, batchId : {}",
                            clientIdentity.getDestination(), message.getId(), rollbackError);
                }
            }
            metrics.put(clientIdentity.getDestination(), CDCStatus.CONSUME_FAILED);
            return false;
        }
    }

    /**
     * @return {@code true} once {@link #init()} has completed and until {@link #destroy()} finishes
     */
    @Override
    public boolean allReady() {
        return this.ready;
    }

    /**
     * @return the status map that the workers update, keyed by client destination
     */
    @Override
    public Map<String, CDCStatus> metrics() {
        return metrics;
    }
}

