/*
 * Decompiled with CFR 0.152.
 */
package com.xforceplus.ultraman.cdc.core.local.embed;

import com.alibaba.otter.canal.instance.manager.CanalInstanceWithManager;
import com.alibaba.otter.canal.instance.manager.model.Canal;
import com.alibaba.otter.canal.instance.manager.model.CanalParameter;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.ClientIdentity;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.xforceplus.ultraman.cdc.core.local.embed.SourceDataProducer;
import com.xforceplus.ultraman.cdc.reader.CDCPropertyPackage;
import com.xforceplus.ultraman.cdc.reader.CanalPropertiesReader;
import io.vavr.Tuple2;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link SourceDataProducer} backed by an in-process (embedded) Canal server.
 *
 * <p>One Canal instance is started per {@link CanalPropertiesReader} exposed by the
 * {@link CDCPropertyPackage}. Clients are registered at most once per destination through
 * {@link #singletonRegistry(ClientIdentity)} and then pull batches with
 * {@link #onMessage(ClientIdentity, int)}.
 *
 * <p>NOTE(review): the client registry is a plain {@link HashMap}; this class does no
 * synchronization of its own.
 */
public class EmbedDataProducer
implements SourceDataProducer {
    final Logger logger = LoggerFactory.getLogger(EmbedDataProducer.class);
    private final CanalServerWithEmbedded canalServerWithEmbedded;
    private final CDCPropertyPackage cdcPropertyPackage;
    /** Registered clients keyed by destination, paired with the reader configured for that destination. */
    private final Map<String, Tuple2<ClientIdentity, CanalPropertiesReader>> clientIdentities;

    /**
     * Creates a producer bound to the shared embedded Canal server instance.
     *
     * @param cdcPropertyPackage configuration holding one reader per CDC destination
     */
    public EmbedDataProducer(CDCPropertyPackage cdcPropertyPackage) {
        this.cdcPropertyPackage = cdcPropertyPackage;
        this.canalServerWithEmbedded = CanalServerWithEmbedded.instance();
        this.clientIdentities = new HashMap<>();
    }

    /**
     * Optionally resets local meta files, installs the instance generator, starts the embedded
     * server and then starts one instance per configured destination.
     */
    @Override
    public void init() {
        Map<String, CanalPropertiesReader> readersByDestination = new HashMap<>();
        for (CanalPropertiesReader reader : this.cdcPropertyPackage.readers()) {
            if (reader.isResetMeta()) {
                try {
                    Files.deleteIfExists(Paths.get(reader.fullLogFile()));
                }
                catch (IOException e) {
                    // Best effort: a failed reset must not prevent the instance from starting.
                    this.logger.error("failed to delete meta file {} for destination {}",
                            reader.fullLogFile(), reader.getDestination(), e);
                }
            }
            readersByDestination.put(reader.getDestination(), reader);
        }

        // The server holds a single generator, so it is installed once and resolves the reader
        // from the requested destination. Installing one generator per reader would overwrite the
        // previous one and build every instance from the last reader's settings.
        this.canalServerWithEmbedded.setCanalInstanceGenerator(destination -> {
            CanalPropertiesReader reader = readersByDestination.get(destination);
            if (reader == null) {
                throw new IllegalStateException("no cdc reader configured for destination " + destination);
            }
            return new CanalInstanceWithManager(this.buildCanal(reader), reader.getFilter());
        });

        this.canalServerWithEmbedded.start();
        for (CanalPropertiesReader reader : this.cdcPropertyPackage.readers()) {
            this.canalServerWithEmbedded.start(reader.getDestination());
        }
    }

    /**
     * Unsubscribes every registered client and then stops the embedded server.
     */
    @Override
    public void destroy() {
        for (Tuple2<ClientIdentity, CanalPropertiesReader> registered : this.clientIdentities.values()) {
            ClientIdentity client = registered._1();
            this.canalServerWithEmbedded.unsubscribe(client);
            this.logger.info("cdc-instance destination {} is shutdown.", client.getDestination());
        }
        this.canalServerWithEmbedded.stop();
    }

    /**
     * Registers and subscribes {@code client} if its destination is configured and no client
     * has been registered for that destination yet.
     *
     * @param client the client identity to register
     * @return {@code true} if the client was registered and subscribed; {@code false} if the
     *         destination is already taken or has no configured reader
     */
    @Override
    public boolean singletonRegistry(ClientIdentity client) {
        String destination = client.getDestination();
        Optional<CanalPropertiesReader> canalPropertiesReader = this.cdcPropertyPackage.find(destination);
        if (this.clientIdentities.containsKey(destination) || !canalPropertiesReader.isPresent()) {
            return false;
        }
        this.clientIdentities.put(destination, new Tuple2<>(client, canalPropertiesReader.get()));
        this.canalServerWithEmbedded.subscribe(client);
        return true;
    }

    /**
     * Returns the registered clients with their readers.
     *
     * @return a live view of the registry's values (not a copy)
     */
    @Override
    public Collection<Tuple2<ClientIdentity, CanalPropertiesReader>> clientIdentityWithReader() {
        return this.clientIdentities.values();
    }

    /**
     * Fetches the next batch for {@code clientIdentity} without acknowledging it.
     *
     * @param clientIdentity the subscribed client
     * @param batchSize      batch size passed through to the server
     * @return the batch with parsed entries, or {@code null} when the batch is empty or when
     *         fetching/parsing failed
     */
    @Override
    public Message onMessage(ClientIdentity clientIdentity, int batchSize) {
        Message message = null;
        try {
            message = this.canalServerWithEmbedded.getWithoutAck(clientIdentity, batchSize);
            return this.toConsumeMessage(message);
        }
        catch (Exception e) {
            // Pass the exception as the last argument so the stack trace is logged as well.
            this.logger.error("producer onMessage error, reason : {}", e.getMessage(), e);
            // A batch that was fetched but could not be converted is acknowledged here, so the
            // failed batch is dropped rather than handed to the caller.
            if (null != message && message.getId() > 0L) {
                this.ack(clientIdentity, message.getId());
            }
            return null;
        }
    }

    /**
     * Acknowledges {@code batchId} for {@code clientIdentity} on the embedded server.
     */
    @Override
    public void ack(ClientIdentity clientIdentity, long batchId) {
        this.canalServerWithEmbedded.ack(clientIdentity, batchId);
    }

    /**
     * Rolls back {@code batchId} for {@code clientIdentity} on the embedded server.
     */
    @Override
    public void rollback(ClientIdentity clientIdentity, long batchId) {
        this.canalServerWithEmbedded.rollback(clientIdentity, Long.valueOf(batchId));
    }

    /**
     * Converts a fetched batch into a message carrying parsed entries.
     *
     * @param message the batch returned by the server
     * @return a new non-raw message when {@code message} is raw with raw entries; {@code message}
     *         itself when it already has parsed entries; otherwise {@code null}
     * @throws InvalidProtocolBufferException if a raw entry cannot be parsed
     */
    private Message toConsumeMessage(Message message) throws InvalidProtocolBufferException {
        Message result = null;
        if (message.isRaw() && !message.getRawEntries().isEmpty()) {
            result = new Message(message.getId());
            for (ByteString byteString : message.getRawEntries()) {
                result.addEntry(CanalEntry.Entry.parseFrom(byteString));
            }
            result.setRaw(false);
        } else if (!message.getEntries().isEmpty()) {
            result = message;
        }
        return result;
    }

    /**
     * Builds the Canal definition (local-file meta, MySQL source) for one reader.
     *
     * @param propertiesReader the per-destination configuration
     * @return the populated {@link Canal}
     */
    private Canal buildCanal(CanalPropertiesReader propertiesReader) {
        Canal canal = new Canal();
        // NOTE(review): every destination gets the same id (1) — confirm this is intended.
        canal.setId(Long.valueOf(1L));
        canal.setName(propertiesReader.getDestination());
        canal.setDesc(propertiesReader.getDesc());

        CanalParameter parameter = new CanalParameter();
        parameter.setMetaMode(CanalParameter.MetaMode.LOCAL_FILE);
        parameter.setDataDir(propertiesReader.getDataDir());
        parameter.setIndexMode(CanalParameter.IndexMode.MEMORY_META_FAILBACK);
        parameter.setMemoryStorageBufferSize(Integer.valueOf(propertiesReader.getMemoryStorageBufferSize()));
        parameter.setSourcingType(CanalParameter.SourcingType.MYSQL);
        // NOTE(review): the master address is listed twice; presumably the second entry acts as
        // the standby address — confirm before changing.
        InetSocketAddress first = new InetSocketAddress(propertiesReader.getMasterHost(), propertiesReader.getMasterPort());
        InetSocketAddress second = new InetSocketAddress(propertiesReader.getMasterHost(), propertiesReader.getMasterPort());
        parameter.setDbAddresses(Arrays.asList(first, second));
        parameter.setDbUsername(propertiesReader.getMasterUser());
        parameter.setDbPassword(propertiesReader.getMasterPasswd());
        parameter.setSlaveId(Long.valueOf(propertiesReader.getSlaveId()));
        canal.setCanalParameter(parameter);
        return canal;
    }
}

