/*
 * Decompiled with CFR 0.152.
 */
package com.xforceplus.ultraman.cdc.core.remote.connect;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.Message;
import com.xforceplus.ultraman.cdc.core.remote.connect.CDCConnector;
import com.xforceplus.ultraman.cdc.reader.CanalPropertiesReader;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.sql.SQLException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link CDCConnector} implementation that talks to a Canal server through a
 * {@link CanalConnector}.
 *
 * <p>Lifecycle: {@link #init()} builds the underlying connector (cluster mode when
 * ZooKeeper servers are configured, single-node mode otherwise), {@link #open()}
 * connects and subscribes, {@link #close()} disconnects, and {@link #destroy()}
 * closes the connector if it still reports itself as valid.
 */
public class RemoteCDCConnector
implements CDCConnector {
    private static final Logger LOGGER = LoggerFactory.getLogger(RemoteCDCConnector.class);

    /** Batch size used when the reader is absent or reports a non-positive value. */
    private static final int DEFAULT_BATCH_SIZE = 1024;

    protected CanalConnector canalConnector;
    protected CanalPropertiesReader canalPropertiesReader;
    private final int batchSize;
    private boolean isClosed = true;

    /**
     * Creates a connector bound to the given configuration.
     *
     * @param canalPropertiesReader connection settings; may be {@code null}, in which
     *                              case {@link #init()} will fail and the default
     *                              batch size is used
     */
    public RemoteCDCConnector(CanalPropertiesReader canalPropertiesReader) {
        this.canalPropertiesReader = canalPropertiesReader;
        this.batchSize = null != canalPropertiesReader && canalPropertiesReader.getBatchSize() > 0
                ? canalPropertiesReader.getBatchSize()
                : DEFAULT_BATCH_SIZE;
    }

    /**
     * Builds the underlying {@link CanalConnector}: a cluster connector when ZooKeeper
     * servers are configured (non-null and non-empty), otherwise a single-node
     * connector pointing at the configured master host and port.
     *
     * @throws RuntimeException if no properties reader was supplied
     */
    @Override
    public void init() {
        final CanalPropertiesReader reader = this.canalPropertiesReader;
        if (null == reader) {
            throw new RuntimeException("canal properties could not be null.");
        }

        final boolean clustered = null != reader.getZkServers() && !reader.getZkServers().isEmpty();
        if (clustered) {
            this.canalConnector = CanalConnectors.newClusterConnector(
                    reader.getZkServers(),
                    reader.getDestination(),
                    reader.getMasterUser(),
                    reader.getMasterPasswd());
        } else {
            this.canalConnector = CanalConnectors.newSingleConnector(
                    new InetSocketAddress(reader.getMasterHost(), reader.getMasterPort()),
                    reader.getDestination(),
                    reader.getMasterUser(),
                    reader.getMasterPasswd());
        }
    }

    /**
     * Closes the connector if it was initialised and still reports itself as valid.
     * Failures are logged and never propagated.
     */
    @Override
    public void destroy() {
        if (null == this.canalConnector) {
            // Never initialised: nothing to shut down.
            return;
        }
        try {
            if (this.canalConnector.checkValid()) {
                this.close();
            }
        }
        catch (Exception e) {
            // Trailing throwable lets SLF4J print the stack trace as well as the message.
            LOGGER.warn("[cdc-connector {}] shutdown error, message : {}", this.destination(), e.getMessage(), e);
        }
    }

    /**
     * Connects to the Canal server and subscribes. Does nothing when {@link #init()}
     * has not created a connector yet.
     */
    @Override
    public void open() {
        if (null == this.canalConnector) {
            return;
        }
        this.canalConnector.connect();
        // Mark the connector as open as soon as the connection exists, so that a failing
        // subscribe() does not leave a live connection that close() would refuse to release.
        this.isClosed = false;
        this.canalConnector.subscribe();
        LOGGER.info("[cdc-connector {}] connect to canal server...", this.destination());
    }

    /**
     * Disconnects from the Canal server if the connector is currently open. Errors are
     * logged and swallowed; the connector is marked closed in every case.
     */
    @Override
    public void close() {
        if (null == this.canalConnector || this.isClosed) {
            return;
        }
        try {
            this.canalConnector.disconnect();
            LOGGER.info("[cdc-connector {}] close canal connector...", this.destination());
        }
        catch (Exception e) {
            LOGGER.error("[cdc-connector {}] close error, ex : {}", this.destination(), e.getMessage(), e);
        }
        finally {
            this.isClosed = true;
        }
    }

    /** Rolls back on the underlying connector; no-op when it is not initialised. */
    @Override
    public void rollback() {
        if (null != this.canalConnector) {
            this.canalConnector.rollback();
        }
    }

    /**
     * Rolls back the given batch on the underlying connector; no-op when it is not
     * initialised.
     *
     * @param batchId identifier of the batch to roll back
     */
    @Override
    public void rollback(long batchId) {
        if (null != this.canalConnector) {
            this.canalConnector.rollback(batchId);
        }
    }

    /**
     * Acknowledges the given batch.
     *
     * @param batchId identifier of the batch to acknowledge
     * @throws SQLException if the connector has not been initialised
     */
    @Override
    public void ack(long batchId) throws SQLException {
        this.requireConnector().ack(batchId);
    }

    /**
     * Fetches up to {@link #getBatchSize()} entries without acknowledging them.
     *
     * @return the message returned by the underlying connector
     * @throws SQLException if the connector has not been initialised
     */
    @Override
    public Message getMessageWithoutAck() throws SQLException {
        return this.requireConnector().getWithoutAck(this.batchSize);
    }

    /**
     * @return the batch size used by {@link #getMessageWithoutAck()}
     */
    @Override
    public int getBatchSize() {
        return this.batchSize;
    }

    /**
     * @return the configured destination, or {@code null} when no properties reader
     *         was supplied
     */
    @Override
    public String name() {
        return this.destination();
    }

    /** Null-safe access to the configured destination, used for naming and log output. */
    private String destination() {
        return null == this.canalPropertiesReader ? null : this.canalPropertiesReader.getDestination();
    }

    /**
     * Returns the underlying connector, or fails with a descriptive {@link SQLException}
     * when {@link #init()} has not been called successfully.
     */
    private CanalConnector requireConnector() throws SQLException {
        final CanalConnector connector = this.canalConnector;
        if (null == connector) {
            // SQLException(String, String) treats the second argument as the SQLState, so the
            // destination has to be embedded in the reason text instead of passed separately.
            throw new SQLException("[cdc-connector " + this.destination() + "] canal connector not init.");
        }
        return connector;
    }
}

