package com.xforceplus.bi.ultraman.binlog;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.GtidSet;
import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
import com.github.shyiko.mysql.binlog.event.EventType;
import com.github.shyiko.mysql.binlog.event.RotateEventData;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
import com.google.common.collect.Lists;
import com.xforceplus.bi.ultraman.binlog.handler.CDCBusinessHandler;
import com.xforceplus.bi.ultraman.binlog.handler.PositionHandler;
import com.xforceplus.bi.ultraman.binlog.pojo.BinlogPositionEntity;
import com.xforceplus.bi.ultraman.binlog.pojo.DatabaseInfo;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.List;
import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/xforceplus/bi/ultraman/binlog/MySQLBinaryLogCDC.class */
public class MySQLBinaryLogCDC implements LifeCycleCDC {
    private static final Logger log = LoggerFactory.getLogger(MySQLBinaryLogCDC.class);
    private List<CDCBusinessHandler> cdcBusinessHandlers = Lists.newArrayList();
    private DatabaseInfo databaseInfo;
    private PositionHandler positionHandler;
    private boolean useGTID;
    private GtidSet gtidSet;
    private BinaryLogClient client;
    private BinaryLogClient.EventListener eventListener;
    private BinaryLogClient.LifecycleListener lifecycleListener;

    public MySQLBinaryLogCDC(DatabaseInfo databaseInfo, PositionHandler positionHandler, boolean z) {
        this.databaseInfo = databaseInfo;
        this.positionHandler = positionHandler;
        this.useGTID = z;
    }

    public void addCDCEventHandler(CDCBusinessHandler cDCBusinessHandler) {
        if (this.client == null || !this.client.isConnected()) {
            this.cdcBusinessHandlers.add(cDCBusinessHandler);
        } else {
            log.error("{} has started, cannot add new CDCEventHandler into it", getClass().getSimpleName());
        }
    }

    @Override // com.xforceplus.bi.ultraman.binlog.LifeCycleCDC
    public void start() throws Exception {
        if (isStarted()) {
            log.warn("{} has already started, do not do it again", getClass().getSimpleName());
            return;
        }
        this.client = createBinlogClient();
        this.client.setEventDeserializer(new EventDeserializer());
        this.client.setServerId(getRandomServerId());
        Object position = this.positionHandler.getPosition(this.databaseInfo);
        if (this.useGTID) {
            this.client.setGtidSet(position == null ? "" : (String) position);
            this.client.setGtidSetFallbackToPurged(true);
            this.gtidSet = new GtidSet(this.client.getGtidSet());
            log.info("Now gtid is {}", this.client.getGtidSet());
        } else {
            BinlogPositionEntity binlogPositionEntity = (BinlogPositionEntity) position;
            if (binlogPositionEntity != null && binlogPositionEntity.getBinlogName() != null && binlogPositionEntity.getPosition() != null) {
                this.client.setBinlogFilename(binlogPositionEntity.getBinlogName());
                this.client.setBinlogPosition(binlogPositionEntity.getPosition().longValue());
                log.info("Set binlog position = {}", binlogPositionEntity.getPosition());
            }
            log.info("Now position is {}, {}", this.client.getBinlogFilename(), Long.valueOf(this.client.getBinlogPosition()));
        }
        this.eventListener = createBinlogEventListener();
        this.client.registerEventListener(this.eventListener);
        this.lifecycleListener = createBinlogLifecycleListener();
        this.client.registerLifecycleListener(this.lifecycleListener);
        this.client.connect();
    }

    private long getRandomServerId() {
        try {
            return SecureRandom.getInstanceStrong().nextLong();
        } catch (NoSuchAlgorithmException e) {
            return RandomUtils.nextLong();
        }
    }

    private BinaryLogClient createBinlogClient() {
        return new BinaryLogClient(this.databaseInfo.getHostname(), this.databaseInfo.getPort(), this.databaseInfo.getUsername(), this.databaseInfo.getPassword());
    }

    private BinaryLogClient.EventListener createBinlogEventListener() {
        return event -> {
            EventHeaderV4 header = event.getHeader();
            if (header.getEventType() == EventType.FORMAT_DESCRIPTION || header.getEventType() == EventType.HEARTBEAT) {
                return;
            }
            if (!this.useGTID) {
                BinlogPositionEntity binlogPositionEntity = new BinlogPositionEntity();
                if (header.getEventType().equals(EventType.ROTATE)) {
                    RotateEventData data = event.getData();
                    binlogPositionEntity.setBinlogName(data.getBinlogFilename());
                    binlogPositionEntity.setPosition(Long.valueOf(data.getBinlogPosition()));
                } else {
                    binlogPositionEntity = (BinlogPositionEntity) this.positionHandler.getPosition(this.databaseInfo);
                    binlogPositionEntity.setPosition(Long.valueOf(header.getPosition()));
                }
                binlogPositionEntity.setServerId(Long.valueOf(header.getServerId()));
                this.positionHandler.savePosition(this.databaseInfo, binlogPositionEntity);
            } else if (header.getEventType() == EventType.GTID) {
                this.gtidSet.add(event.getData().getGtid());
                this.positionHandler.savePosition(this.databaseInfo, this.gtidSet.toString());
            }
            this.cdcBusinessHandlers.forEach(cDCBusinessHandler -> {
                cDCBusinessHandler.handle(event);
            });
        };
    }

    private BinaryLogClient.LifecycleListener createBinlogLifecycleListener() {
        return new BinaryLogClient.LifecycleListener() { // from class: com.xforceplus.bi.ultraman.binlog.MySQLBinaryLogCDC.1
            public void onConnect(BinaryLogClient binaryLogClient) {
                MySQLBinaryLogCDC.log.info("Successfully connect to the mysql server");
            }

            public void onCommunicationFailure(BinaryLogClient binaryLogClient, Exception exc) {
                MySQLBinaryLogCDC.log.error("Communication Failure", exc);
                MySQLBinaryLogCDC.this.positionHandler.clear(MySQLBinaryLogCDC.this.databaseInfo);
                try {
                    MySQLBinaryLogCDC.this.stop();
                } catch (Exception e) {
                    MySQLBinaryLogCDC.log.error("Fatal to stop binlog listener", e);
                }
                try {
                    MySQLBinaryLogCDC.this.start();
                } catch (Exception e2) {
                    MySQLBinaryLogCDC.log.error("Fatal to start binlog listener", e2);
                }
                MySQLBinaryLogCDC.log.info("position is clean and binlog listener is restarted");
            }

            public void onEventDeserializationFailure(BinaryLogClient binaryLogClient, Exception exc) {
                MySQLBinaryLogCDC.log.error("Event Deserialization Failure", exc);
            }

            public void onDisconnect(BinaryLogClient binaryLogClient) {
                MySQLBinaryLogCDC.log.info("Disconnect from the mysql server");
            }
        };
    }

    @Override // com.xforceplus.bi.ultraman.binlog.LifeCycleCDC
    public void stop() throws Exception {
        try {
            if (this.eventListener != null && this.client != null) {
                this.client.unregisterEventListener(this.eventListener);
            }
            if (this.lifecycleListener != null && this.client != null) {
                this.client.unregisterLifecycleListener(this.lifecycleListener);
            }
            this.client = null;
            this.eventListener = null;
            this.lifecycleListener = null;
            log.info("MySQLBinaryLogCDC stopped...");
        } catch (Throwable th) {
            throw new CDCException("Error closing MySQL Binlog CDC connection");
        }
    }

    @Override // com.xforceplus.bi.ultraman.binlog.LifeCycleCDC
    public boolean isStarted() {
        return this.client != null && this.client.isConnected();
    }

    public DatabaseInfo getDatabaseInfo() {
        return this.databaseInfo;
    }
}
