package com.xforceplus.ultraman.cdc.core.remote;

import com.alibaba.otter.canal.common.utils.NamedThreadFactory;
import com.xforceplus.ultraman.cdc.CDCServer;
import com.xforceplus.ultraman.cdc.core.remote.connect.RemoteCDCConnector;
import com.xforceplus.ultraman.cdc.core.remote.runner.DefaultCDCConsumer;
import com.xforceplus.ultraman.cdc.processor.DataProcessor;
import com.xforceplus.ultraman.cdc.reader.CDCPropertyPackage;
import com.xforceplus.ultraman.cdc.reader.CanalPropertiesReader;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

/* loaded from: input_file:BOOT-INF/lib/cdc-2023.6.27-184023-feature-merge.jar:com/xforceplus/ultraman/cdc/core/remote/DispatcherCDCServer.class */
public class DispatcherCDCServer implements CDCServer {
    private ScheduledExecutorService executor;
    private List<DefaultCDCConsumer> consumers = new ArrayList();
    private volatile boolean ready = false;

    public DispatcherCDCServer(CDCPropertyPackage cDCPropertyPackage, DataProcessor dataProcessor) {
        this.executor = null;
        this.executor = Executors.newScheduledThreadPool(cDCPropertyPackage.readers().size(), new NamedThreadFactory("cdc-server-runner"));
        Iterator<CanalPropertiesReader> it = cDCPropertyPackage.readers().iterator();
        while (it.hasNext()) {
            this.consumers.add(new DefaultCDCConsumer(new RemoteCDCConnector(it.next()), dataProcessor));
        }
    }

    @Override // com.xforceplus.ultraman.cdc.CDCLifeCycle
    public void init() {
        Iterator<DefaultCDCConsumer> it = this.consumers.iterator();
        while (it.hasNext()) {
            it.next().init();
        }
        this.ready = true;
    }

    @Override // com.xforceplus.ultraman.cdc.CDCLifeCycle
    public void destroy() {
        Iterator<DefaultCDCConsumer> it = this.consumers.iterator();
        while (it.hasNext()) {
            it.next().destroy();
        }
        this.executor.shutdown();
        this.ready = false;
    }

    @Override // com.xforceplus.ultraman.cdc.CDCServer
    public void execute() {
        for (DefaultCDCConsumer defaultCDCConsumer : this.consumers) {
            ScheduledExecutorService scheduledExecutorService = this.executor;
            defaultCDCConsumer.getClass();
            scheduledExecutorService.submit(defaultCDCConsumer::execute);
        }
    }

    @Override // com.xforceplus.ultraman.cdc.CDCServer
    public boolean allReady() {
        return this.ready;
    }
}
