package com.xforceplus.ultraman.cdc.core.remote.connect;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.Message;
import com.xforceplus.ultraman.cdc.reader.CanalPropertiesReader;
import java.net.InetSocketAddress;
import java.sql.SQLException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/cdc-2023.6.27-185319-feature-merge.jar:com/xforceplus/ultraman/cdc/core/remote/connect/RemoteCDCConnector.class */
/**
 * {@link CDCConnector} implementation backed by a single-node Canal client connection.
 *
 * <p>Lifecycle as implemented here: {@link #init()} builds the underlying
 * {@link CanalConnector} from the supplied properties, {@link #open()} connects and
 * subscribes, {@link #close()} disconnects, and {@link #destroy()} closes the connector
 * if it still reports itself as valid.
 *
 * <p>This class performs no synchronization of its own.
 */
public class RemoteCDCConnector implements CDCConnector {
    private static final Logger LOGGER = LoggerFactory.getLogger(RemoteCDCConnector.class);

    /** Batch size used when the properties are absent or do not provide a positive value. */
    private static final int DEFAULT_BATCH_SIZE = 1024;

    protected CanalConnector canalConnector;
    protected CanalPropertiesReader canalPropertiesReader;
    private int batchSize = DEFAULT_BATCH_SIZE;
    private boolean isClosed = true;

    /**
     * Creates a connector description; no connection is made until {@link #init()} and
     * {@link #open()} are called.
     *
     * @param canalPropertiesReader connection settings; may be {@code null}, in which case
     *                              {@link #init()} will fail
     */
    public RemoteCDCConnector(CanalPropertiesReader canalPropertiesReader) {
        this.canalPropertiesReader = canalPropertiesReader;
        // Only a positive configured value overrides the default.
        if (null != canalPropertiesReader && canalPropertiesReader.getBatchSize() > 0) {
            this.batchSize = canalPropertiesReader.getBatchSize();
        }
    }

    /**
     * Builds the underlying Canal connector from the configured host, port, destination and
     * credentials.
     *
     * @throws RuntimeException if no properties were supplied at construction time
     */
    @Override // com.xforceplus.ultraman.cdc.CDCLifeCycle
    public void init() {
        if (null == this.canalPropertiesReader) {
            throw new RuntimeException("canal properties could not be null.");
        }
        InetSocketAddress address = new InetSocketAddress(
            this.canalPropertiesReader.getMasterHost(),
            this.canalPropertiesReader.getMasterPort());
        this.canalConnector = CanalConnectors.newSingleConnector(
            address,
            this.canalPropertiesReader.getDestination(),
            this.canalPropertiesReader.getMasterUser(),
            this.canalPropertiesReader.getMasterPasswd());
    }

    /**
     * Closes the connector if it exists and still reports itself as valid. Failures are
     * logged as warnings and never propagated.
     */
    @Override // com.xforceplus.ultraman.cdc.CDCLifeCycle
    public void destroy() {
        if (null == this.canalConnector) {
            // init() never ran (or failed): nothing to shut down.
            return;
        }
        try {
            if (this.canalConnector.checkValid()) {
                close();
            }
        } catch (Exception e) {
            LOGGER.warn("[cdc-connector {}] shutdown error, message : {}", destination(), e.getMessage());
        }
    }

    /**
     * Connects to the Canal server and subscribes. Does nothing when {@link #init()} has
     * not created the connector.
     */
    @Override // com.xforceplus.ultraman.cdc.core.remote.connect.CDCConnector
    public void open() {
        if (null == this.canalConnector) {
            return;
        }
        this.canalConnector.connect();
        // Mark as open as soon as the connection exists, so that close() still
        // disconnects if subscribe() below throws.
        this.isClosed = false;
        this.canalConnector.subscribe();
        LOGGER.info("[cdc-connector {}] connect to canal server...", destination());
    }

    /**
     * Disconnects from the Canal server if the connector is currently open. Safe to call
     * repeatedly; disconnect failures are logged and the connector is still marked closed.
     */
    @Override // com.xforceplus.ultraman.cdc.core.remote.connect.CDCConnector
    public void close() {
        if (null == this.canalConnector) {
            return;
        }
        try {
            if (this.isClosed) {
                return;
            }
            this.canalConnector.disconnect();
            LOGGER.info("[cdc-connector {}] close canal connector...", destination());
        } catch (Exception e) {
            LOGGER.error("[cdc-connector {}] close error, ex : {}", destination(), e.getMessage(), e);
        } finally {
            this.isClosed = true;
        }
    }

    /** Delegates to {@link CanalConnector#rollback()}; no-op when the connector was never initialised. */
    @Override // com.xforceplus.ultraman.cdc.core.remote.connect.CDCConnector
    public void rollback() {
        if (null != this.canalConnector) {
            this.canalConnector.rollback();
        }
    }

    /**
     * Acknowledges the batch with the given id on the underlying connector.
     *
     * @param j batch id to acknowledge
     * @throws SQLException if the connector has not been initialised
     */
    @Override // com.xforceplus.ultraman.cdc.core.remote.connect.CDCConnector
    public void ack(long j) throws SQLException {
        requireConnector().ack(j);
    }

    /**
     * Fetches up to {@link #getBatchSize()} entries without acknowledging them.
     *
     * @return the message returned by the underlying connector
     * @throws SQLException if the connector has not been initialised
     */
    @Override // com.xforceplus.ultraman.cdc.core.remote.connect.CDCConnector
    public Message getMessageWithoutAck() throws SQLException {
        return requireConnector().getWithoutAck(this.batchSize);
    }

    /** @return the batch size passed to the connector on each fetch */
    @Override // com.xforceplus.ultraman.cdc.core.remote.connect.CDCConnector
    public int getBatchSize() {
        return this.batchSize;
    }

    /** @return the configured Canal destination */
    @Override // com.xforceplus.ultraman.cdc.core.remote.connect.CDCConnector
    public String name() {
        return this.canalPropertiesReader.getDestination();
    }

    /** Returns the initialised connector, or throws if {@link #init()} has not created it. */
    private CanalConnector requireConnector() throws SQLException {
        if (null == this.canalConnector) {
            notInitException(destination());
        }
        return this.canalConnector;
    }

    /** Null-safe destination lookup for log and error messages. */
    private String destination() {
        return null == this.canalPropertiesReader ? null : this.canalPropertiesReader.getDestination();
    }

    /** Always throws; the destination is formatted into the message rather than passed as an SQLState. */
    private void notInitException(String str) throws SQLException {
        throw new SQLException(String.format("[cdc-connector %s] canal connector not init.", str));
    }
}
