/*
 * Decompiled with CFR 0.152.
 */
package com.xforceplus.ultraman.cdc.core.remote;

import com.alibaba.otter.canal.common.utils.NamedThreadFactory;
import com.xforceplus.ultraman.cdc.CDCServer;
import com.xforceplus.ultraman.cdc.core.remote.connect.RemoteCDCConnector;
import com.xforceplus.ultraman.cdc.core.remote.runner.DefaultCDCConsumer;
import com.xforceplus.ultraman.cdc.dto.enums.CDCStatus;
import com.xforceplus.ultraman.cdc.processor.DataProcessor;
import com.xforceplus.ultraman.cdc.reader.CDCPropertyPackage;
import com.xforceplus.ultraman.cdc.reader.CanalPropertiesReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;

public class DispatcherCDCServer
implements CDCServer {
    private List<DefaultCDCConsumer> consumers = new ArrayList<DefaultCDCConsumer>();
    private ScheduledExecutorService executor = null;
    private volatile boolean ready = false;

    public DispatcherCDCServer(CDCPropertyPackage cdcPropertyPackage, DataProcessor dataProcessor) {
        this.executor = Executors.newScheduledThreadPool(cdcPropertyPackage.readers().size(), (ThreadFactory)new NamedThreadFactory("cdc-server-runner"));
        for (CanalPropertiesReader canalPropertiesReader : cdcPropertyPackage.readers()) {
            this.consumers.add(new DefaultCDCConsumer(new RemoteCDCConnector(canalPropertiesReader), dataProcessor, metrics));
        }
    }

    @Override
    public void init() {
        for (DefaultCDCConsumer consumer : this.consumers) {
            consumer.init();
        }
        this.ready = true;
    }

    @Override
    public void destroy() {
        for (DefaultCDCConsumer consumer : this.consumers) {
            consumer.destroy();
        }
        this.executor.shutdown();
        this.ready = false;
    }

    @Override
    public void execute() {
        for (DefaultCDCConsumer consumer : this.consumers) {
            this.executor.submit(consumer::execute);
        }
    }

    @Override
    public boolean allReady() {
        return this.ready;
    }

    @Override
    public Map<String, CDCStatus> metrics() {
        return metrics;
    }
}

