package com.xforceplus.ultraman.cdc.core;

import com.alibaba.otter.canal.instance.manager.CanalInstanceWithManager;
import com.alibaba.otter.canal.instance.manager.model.Canal;
import com.alibaba.otter.canal.instance.manager.model.CanalParameter;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.xforceplus.ultraman.cdc.context.RunnerContext;
import com.xforceplus.ultraman.cdc.reader.CanalPropertiesReader;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Iterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/cdc-2023.6.12-112331-feature-merge.jar:com/xforceplus/ultraman/cdc/core/CanalDataProducer.class */
/**
 * {@link SourceDataProducer} backed by an embedded Canal server.
 *
 * <p>{@link #init()} configures and starts the singleton {@link CanalServerWithEmbedded} using the
 * {@link CanalPropertiesReader} obtained from {@link RunnerContext}. Batches are then pulled with
 * {@link #onMessage()} and confirmed or reverted with {@link #ack(long)} / {@link #rollback(long)}
 * on behalf of the client exposed by the consumer registered in {@link RunnerContext}.
 */
public class CanalDataProducer implements SourceDataProducer {
    /** Embedded Canal server; assigned in {@link #init()} and {@code null} before that. */
    private CanalServerWithEmbedded canalServerWithEmbedded;
    final Logger logger = LoggerFactory.getLogger(CanalDataProducer.class);
    /** Whether the consumer's client is currently subscribed on the embedded server. */
    private volatile boolean onSubscribe = false;
    /** Batch size passed to {@code getWithoutAck}; overridden in {@link #init()} when a positive value is configured. */
    private int batchSize = 1024;

    /**
     * Reads the configuration, optionally deletes the file returned by
     * {@code fullLogFile()}, and starts the embedded Canal server together with the configured
     * destination.
     */
    @Override // com.xforceplus.ultraman.cdc.core.CDCLifeCycle
    public void init() {
        CanalPropertiesReader propertiesReader = RunnerContext.instance().getPropertiesReader();
        if (propertiesReader.getBatchSize() > 0) {
            this.batchSize = propertiesReader.getBatchSize();
        }
        if (propertiesReader.isResetMeta()) {
            String metaFile = propertiesReader.fullLogFile();
            try {
                Files.deleteIfExists(Paths.get(metaFile));
            } catch (IOException e) {
                // Best effort: start-up continues, but the failure must be visible in the log.
                this.logger.warn("failed to delete canal meta file {}", metaFile, e);
            }
        }
        this.canalServerWithEmbedded = CanalServerWithEmbedded.instance();
        this.canalServerWithEmbedded.setCanalInstanceGenerator(
            destination -> new CanalInstanceWithManager(buildCanal(propertiesReader), propertiesReader.getFilter()));
        this.canalServerWithEmbedded.start();
        this.canalServerWithEmbedded.start(propertiesReader.getDestination());
    }

    /**
     * Unsubscribes the consumer's client (if it was subscribed and a consumer is registered) and
     * stops the embedded server. Does nothing beyond clearing the subscription flag when
     * {@link #init()} never assigned the server.
     */
    @Override // com.xforceplus.ultraman.cdc.core.CDCLifeCycle
    public void destroy() {
        if (null == this.canalServerWithEmbedded) {
            // init() was never run (or failed before assigning the server): nothing to stop.
            this.onSubscribe = false;
            return;
        }
        if (this.onSubscribe && null != RunnerContext.instance().getSourceDataConsumer()) {
            this.canalServerWithEmbedded.unsubscribe(RunnerContext.instance().getSourceDataConsumer().client());
        }
        this.onSubscribe = false;
        this.canalServerWithEmbedded.stop();
    }

    /**
     * Subscribes the consumer's client on the embedded server unless it is already subscribed.
     *
     * <p>NOTE(review): the check and the update of the flag are not performed atomically; confirm
     * that callers invoke this from a single thread.
     *
     * @return {@code true} when this call performed the subscription, {@code false} when the
     *     producer was already subscribed
     */
    @Override // com.xforceplus.ultraman.cdc.core.SourceDataProducer
    public boolean singletonRegistry() {
        if (this.onSubscribe) {
            return false;
        }
        this.canalServerWithEmbedded.subscribe(RunnerContext.instance().getSourceDataConsumer().client());
        this.onSubscribe = true;
        return true;
    }

    /**
     * Fetches the next batch without acknowledging it and converts it for consumption.
     *
     * @return the batch with parsed entries, or {@code null} when the producer is not subscribed,
     *     the batch carries no entries, or fetching/parsing failed
     */
    @Override // com.xforceplus.ultraman.cdc.core.SourceDataProducer
    public Message onMessage() {
        if (!this.onSubscribe) {
            return null;
        }
        Message message = null;
        try {
            message = this.canalServerWithEmbedded.getWithoutAck(
                RunnerContext.instance().getSourceDataConsumer().client(), this.batchSize);
            return toConsumeMessage(message);
        } catch (Exception e) {
            // Pass the throwable as the last argument so the stack trace is not lost.
            this.logger.error("producer onMessage error, reason : {}", e.getMessage(), e);
            // A batch that was fetched (positive id) but could not be converted is acknowledged;
            // presumably so it is skipped rather than fetched again -- confirm with the consumer side.
            if (null != message && message.getId() > 0) {
                ack(message.getId());
            }
            return null;
        }
    }

    /**
     * Acknowledges a batch for the consumer's client.
     *
     * @param j id of the batch to acknowledge
     */
    @Override // com.xforceplus.ultraman.cdc.core.SourceDataProducer
    public void ack(long j) {
        this.canalServerWithEmbedded.ack(RunnerContext.instance().getSourceDataConsumer().client(), j);
    }

    /**
     * Rolls back a batch for the consumer's client.
     *
     * @param j id of the batch to roll back
     */
    @Override // com.xforceplus.ultraman.cdc.core.SourceDataProducer
    public void rollback(long j) {
        this.canalServerWithEmbedded.rollback(RunnerContext.instance().getSourceDataConsumer().client(), Long.valueOf(j));
    }

    /**
     * Normalizes a fetched batch into a message whose entries are already parsed.
     *
     * @param message batch returned by the embedded server
     * @return a new non-raw message holding the parsed entries when {@code message} is raw and has
     *     raw entries; {@code message} itself when it has parsed entries; otherwise {@code null}
     * @throws InvalidProtocolBufferException if a raw entry cannot be parsed
     */
    private Message toConsumeMessage(Message message) throws InvalidProtocolBufferException {
        if (message.isRaw() && !message.getRawEntries().isEmpty()) {
            Message parsed = new Message(message.getId());
            for (ByteString rawEntry : message.getRawEntries()) {
                parsed.addEntry(CanalEntry.Entry.parseFrom(rawEntry));
            }
            parsed.setRaw(false);
            return parsed;
        }
        if (!message.getEntries().isEmpty()) {
            return message;
        }
        return null;
    }

    /**
     * Builds the Canal instance definition from the configured properties.
     *
     * @param canalPropertiesReader source of destination, storage and MySQL connection settings
     * @return a {@link Canal} with id 1, named after the destination, using local-file meta mode
     */
    private Canal buildCanal(CanalPropertiesReader canalPropertiesReader) {
        CanalParameter canalParameter = new CanalParameter();
        canalParameter.setMetaMode(CanalParameter.MetaMode.LOCAL_FILE);
        canalParameter.setDataDir(canalPropertiesReader.getDataDir());
        canalParameter.setIndexMode(CanalParameter.IndexMode.MEMORY_META_FAILBACK);
        canalParameter.setMemoryStorageBufferSize(Integer.valueOf(canalPropertiesReader.getMemoryStorageBufferSize()));
        canalParameter.setSourcingType(CanalParameter.SourcingType.MYSQL);
        // NOTE(review): the same master address is listed twice; kept as-is because the intent
        // (e.g. a placeholder for a standby address) cannot be confirmed from this file.
        canalParameter.setDbAddresses(Arrays.asList(
            new InetSocketAddress(canalPropertiesReader.getMasterHost(), canalPropertiesReader.getMasterPort()),
            new InetSocketAddress(canalPropertiesReader.getMasterHost(), canalPropertiesReader.getMasterPort())));
        canalParameter.setDbUsername(canalPropertiesReader.getMasterUser());
        canalParameter.setDbPassword(canalPropertiesReader.getMasterPasswd());
        canalParameter.setSlaveId(Long.valueOf(canalPropertiesReader.getSlaveId()));

        Canal canal = new Canal();
        canal.setId(1L);
        canal.setName(canalPropertiesReader.getDestination());
        canal.setDesc(canalPropertiesReader.getDesc());
        canal.setCanalParameter(canalParameter);
        return canal;
    }
}
