package com.xforceplus.ultraman.cdc.core.local.embed;

import com.alibaba.otter.canal.instance.manager.CanalInstanceWithManager;
import com.alibaba.otter.canal.instance.manager.model.Canal;
import com.alibaba.otter.canal.instance.manager.model.CanalParameter;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.ClientIdentity;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.xforceplus.ultraman.cdc.reader.CDCPropertyPackage;
import com.xforceplus.ultraman.cdc.reader.CanalPropertiesReader;
import io.vavr.Tuple2;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/xforceplus/ultraman/cdc/core/local/embed/EmbedDataProducer.class */
/**
 * {@link SourceDataProducer} backed by an in-process {@link CanalServerWithEmbedded}.
 *
 * <p>One canal instance is started per {@link CanalPropertiesReader} exposed by the
 * {@link CDCPropertyPackage}; clients are registered at most once per destination and then
 * pull, ack or roll back batches through this class.
 *
 * <p>NOTE(review): {@code clientIdentities} is a plain {@link HashMap}; this class performs no
 * synchronization of its own — confirm callers do not register clients concurrently.
 */
public class EmbedDataProducer implements SourceDataProducer {
    private final CDCPropertyPackage cdcPropertyPackage;
    final Logger logger = LoggerFactory.getLogger(EmbedDataProducer.class);
    private final CanalServerWithEmbedded canalServerWithEmbedded = CanalServerWithEmbedded.instance();
    /** Registered clients keyed by destination, paired with the reader that configures that destination. */
    private final Map<String, Tuple2<ClientIdentity, CanalPropertiesReader>> clientIdentities = new HashMap<>();

    /**
     * @param cDCPropertyPackage source of the per-destination canal configuration readers
     */
    public EmbedDataProducer(CDCPropertyPackage cDCPropertyPackage) {
        this.cdcPropertyPackage = cDCPropertyPackage;
    }

    /**
     * Prepares and starts the embedded canal server and one instance per configured reader.
     *
     * <p>For readers flagged with {@code isResetMeta()}, the file returned by
     * {@code fullLogFile()} is deleted first (failures are logged and do not abort start-up).
     * A single instance generator is then registered that resolves the reader by the requested
     * destination, so every destination is built from its own configuration.
     */
    @Override // com.xforceplus.ultraman.cdc.CDCLifeCycle
    public void init() {
        Map<String, CanalPropertiesReader> readersByDestination = new HashMap<>();
        for (CanalPropertiesReader reader : this.cdcPropertyPackage.readers()) {
            if (reader.isResetMeta()) {
                String metaFile = reader.fullLogFile();
                try {
                    Files.deleteIfExists(Paths.get(metaFile));
                } catch (IOException e) {
                    // Best effort: a stale meta file must not prevent the server from starting.
                    this.logger.error("failed to delete meta file {} for destination {}",
                        metaFile, reader.getDestination(), e);
                }
            }
            readersByDestination.put(reader.getDestination(), reader);
        }

        // The server holds exactly one generator, so it must dispatch on the destination
        // instead of being re-registered (and overwritten) once per reader.
        this.canalServerWithEmbedded.setCanalInstanceGenerator(destination -> {
            CanalPropertiesReader matched = readersByDestination.get(destination);
            if (matched == null) {
                throw new IllegalStateException("no canal properties configured for destination " + destination);
            }
            return new CanalInstanceWithManager(buildCanal(matched), matched.getFilter());
        });

        this.canalServerWithEmbedded.start();
        for (CanalPropertiesReader reader : this.cdcPropertyPackage.readers()) {
            this.canalServerWithEmbedded.start(reader.getDestination());
        }
    }

    /**
     * Unsubscribes every registered client and stops the embedded server.
     *
     * <p>The server is stopped even if an unsubscribe call throws; the exception still
     * propagates to the caller afterwards.
     */
    @Override // com.xforceplus.ultraman.cdc.CDCLifeCycle
    public void destroy() {
        try {
            for (Tuple2<ClientIdentity, CanalPropertiesReader> registration : this.clientIdentities.values()) {
                ClientIdentity identity = registration._1();
                this.canalServerWithEmbedded.unsubscribe(identity);
                this.logger.info("cdc-instance destination {} is shutdown.", identity.getDestination());
            }
        } finally {
            this.canalServerWithEmbedded.stop();
        }
    }

    /**
     * Registers and subscribes a client, allowing at most one client per destination.
     *
     * @param clientIdentity the client to register
     * @return {@code true} if the client was registered and subscribed; {@code false} if the
     *     destination already has a registered client or no reader is configured for it
     */
    @Override // com.xforceplus.ultraman.cdc.core.local.embed.SourceDataProducer
    public boolean singletonRegistry(ClientIdentity clientIdentity) {
        String destination = clientIdentity.getDestination();
        Optional<CanalPropertiesReader> reader = this.cdcPropertyPackage.find(destination);
        if (this.clientIdentities.containsKey(destination) || !reader.isPresent()) {
            return false;
        }
        this.clientIdentities.put(destination, new Tuple2<>(clientIdentity, reader.get()));
        this.canalServerWithEmbedded.subscribe(clientIdentity);
        return true;
    }

    /**
     * @return the registered clients with their readers; this is a live view of the internal map
     */
    @Override // com.xforceplus.ultraman.cdc.core.local.embed.SourceDataProducer
    public Collection<Tuple2<ClientIdentity, CanalPropertiesReader>> clientIdentityWithReader() {
        return this.clientIdentities.values();
    }

    /**
     * Fetches the next batch for the client without acknowledging it.
     *
     * @param clientIdentity the client to fetch for
     * @param i batch size passed to {@code getWithoutAck}
     * @return the batch with parsed entries, or {@code null} when the batch carries no entries
     *     or when fetching/parsing failed
     */
    @Override // com.xforceplus.ultraman.cdc.core.local.embed.SourceDataProducer
    public Message onMessage(ClientIdentity clientIdentity, int i) {
        Message message = null;
        try {
            message = this.canalServerWithEmbedded.getWithoutAck(clientIdentity, i);
            return toConsumeMessage(message);
        } catch (Exception e) {
            // Trailing throwable argument makes SLF4J log the stack trace as well as the message.
            this.logger.error("producer onMessage error, reason : {}", e.getMessage(), e);
            if (null != message && message.getId() > 0) {
                // The batch was fetched but could not be converted: ack it so it is not
                // delivered again. This means the failed batch is skipped.
                ack(clientIdentity, message.getId());
            }
            return null;
        }
    }

    /**
     * Acknowledges a batch.
     *
     * @param clientIdentity the client that consumed the batch
     * @param j the batch id
     */
    @Override // com.xforceplus.ultraman.cdc.core.local.embed.SourceDataProducer
    public void ack(ClientIdentity clientIdentity, long j) {
        this.canalServerWithEmbedded.ack(clientIdentity, j);
    }

    /**
     * Rolls back a batch.
     *
     * @param clientIdentity the client that received the batch
     * @param j the batch id
     */
    @Override // com.xforceplus.ultraman.cdc.core.local.embed.SourceDataProducer
    public void rollback(ClientIdentity clientIdentity, long j) {
        this.canalServerWithEmbedded.rollback(clientIdentity, Long.valueOf(j));
    }

    /**
     * Converts a fetched batch into a message whose entries are parsed {@link CanalEntry.Entry}
     * objects.
     *
     * @param message the batch returned by the server
     * @return a new non-raw message with the parsed entries when {@code message} is raw and has
     *     raw entries; {@code message} itself when it already has parsed entries; otherwise
     *     {@code null}
     * @throws InvalidProtocolBufferException if a raw entry cannot be parsed
     */
    private Message toConsumeMessage(Message message) throws InvalidProtocolBufferException {
        if (message.isRaw() && !message.getRawEntries().isEmpty()) {
            Message parsed = new Message(message.getId());
            for (ByteString rawEntry : message.getRawEntries()) {
                parsed.addEntry(CanalEntry.Entry.parseFrom(rawEntry));
            }
            parsed.setRaw(false);
            return parsed;
        }
        if (!message.getEntries().isEmpty()) {
            return message;
        }
        return null;
    }

    /**
     * Builds the canal instance configuration for one reader: local-file meta storage,
     * memory index with meta fallback, and a MySQL source using the reader's master settings.
     *
     * @param canalPropertiesReader the reader supplying destination, storage and connection settings
     * @return the populated {@link Canal} definition
     */
    private Canal buildCanal(CanalPropertiesReader canalPropertiesReader) {
        Canal canal = new Canal();
        canal.setId(1L);
        canal.setName(canalPropertiesReader.getDestination());
        canal.setDesc(canalPropertiesReader.getDesc());

        CanalParameter canalParameter = new CanalParameter();
        canalParameter.setMetaMode(CanalParameter.MetaMode.LOCAL_FILE);
        canalParameter.setDataDir(canalPropertiesReader.getDataDir());
        canalParameter.setIndexMode(CanalParameter.IndexMode.MEMORY_META_FAILBACK);
        canalParameter.setMemoryStorageBufferSize(Integer.valueOf(canalPropertiesReader.getMemoryStorageBufferSize()));
        canalParameter.setSourcingType(CanalParameter.SourcingType.MYSQL);
        // NOTE(review): the same master address is registered twice; presumably the second
        // entry fills the standby slot — confirm this duplication is intentional.
        canalParameter.setDbAddresses(Arrays.asList(
            new InetSocketAddress(canalPropertiesReader.getMasterHost(), canalPropertiesReader.getMasterPort()),
            new InetSocketAddress(canalPropertiesReader.getMasterHost(), canalPropertiesReader.getMasterPort())));
        canalParameter.setDbUsername(canalPropertiesReader.getMasterUser());
        canalParameter.setDbPassword(canalPropertiesReader.getMasterPasswd());
        canalParameter.setSlaveId(Long.valueOf(canalPropertiesReader.getSlaveId()));
        canal.setCanalParameter(canalParameter);
        return canal;
    }
}
