package net.wicp.tams.common.binlog.dump.handlerConsumer;

import com.lmax.disruptor.RingBuffer;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import net.wicp.tams.common.Conf;
import net.wicp.tams.common.apiext.LoggerUtil;
import net.wicp.tams.common.apiext.StringUtil;
import net.wicp.tams.common.apiext.TimeAssist;
import net.wicp.tams.common.apiext.jdbc.JdbcAssit;
import net.wicp.tams.common.binlog.alone.dump.bean.Dump;
import net.wicp.tams.common.binlog.alone.dump.bean.DumpEvent;
import net.wicp.tams.common.binlog.dump.MainDump;
import net.wicp.tams.common.constant.JvmStatus;
import net.wicp.tams.common.jdbc.DruidAssit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/wicp/tams/common/binlog/dump/handlerConsumer/Publisher.class */
/**
 * Producer side of the dump pipeline.
 *
 * <p>Pages through the primary-key column of the dumped table in key order
 * ("batches" of {@code numBatch} rows), cuts every batch into "segments" of at
 * most {@code numDuan} consecutive keys and publishes each segment as a
 * {@link DumpEvent} (begin id / end id) on the ring buffer.
 *
 * <p>Paging is keyset based: every query selects keys {@code >=} the end id of
 * the last published segment. Batches after the first use offset 1 so that the
 * boundary row, which was already published, is skipped.
 *
 * <p>{@link #run()} is meant to be executed by a single thread. {@link #isIsover()}
 * may be polled from other threads; the remaining getters are unsynchronized
 * progress snapshots.
 */
public class Publisher implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(Publisher.class);

    /** Segment size used when the dump does not configure a positive one. */
    private static final int DEFAULT_NUM_DUAN = 500;

    private final RingBuffer<DumpEvent> ringBuffer;
    private Connection connection;
    private PreparedStatement stmt;
    /** Number of rows per published segment. */
    private final int numDuan;
    /** Upper bound on published segments; {@code Long.MAX_VALUE} means unlimited. */
    private final long maxDuanNo;
    /** Upper bound on queried batches; {@code Long.MAX_VALUE} means unlimited. */
    private final long maxBatchNo;
    /** Paging SQL with three placeholders: lower key bound, offset, row count. */
    private final String temp;
    private final String startId;
    /** Row limit of the final batch when the dump size is bounded. */
    private final long numLastBatch;
    private final Dump dump;
    /** Rows fetched per query, read from configuration. */
    private final int numBatch = Integer.parseInt(Conf.get("common.binlog.alone.dump.global.batchNum"));
    /**
     * Set once all segments are published. Volatile so that a thread that sees
     * {@code true} in {@link #isIsover()} also sees the final {@code duanNo}.
     */
    private volatile boolean isover = false;
    private int batchNo = 0;
    private int duanNo = 0;
    private long timeBegin = 0;
    /** Lower key bound of the next query: the end id of the last published segment. */
    private String cursorId;

    /**
     * Creates a publisher for one dump task and pre-computes the batch/segment limits.
     *
     * @param ringBuffer buffer the segment events are published on
     * @param dump       dump definition (primary key, segment size, optional row limit, start id)
     */
    public Publisher(RingBuffer<DumpEvent> ringBuffer, Dump dump) {
        this.dump = dump;
        this.ringBuffer = ringBuffer;
        // A non-positive segment size cannot be honoured; fall back to the default.
        this.numDuan = dump.getNumDuan() > 0 ? dump.getNumDuan() : DEFAULT_NUM_DUAN;
        Long numDump = dump.getNumDump();
        if (numDump == null || numDump.longValue() < 0) {
            // No row limit: page until the table is exhausted.
            this.numLastBatch = this.numBatch;
            this.maxDuanNo = Long.MAX_VALUE;
            this.maxBatchNo = Long.MAX_VALUE;
        } else {
            long totalRows = numDump.longValue();
            this.maxBatchNo = ceilDiv(totalRows, this.numBatch);
            long lastBatchRows = totalRows - ((this.maxBatchNo - 1) * this.numBatch);
            long segmentsPerFullBatch = ceilDiv(this.numBatch, this.numDuan);
            this.maxDuanNo = ((this.maxBatchNo - 1) * segmentsPerFullBatch) + ceilDiv(lastBatchRows, this.numDuan);
            this.numLastBatch = lastBatchRows;
        }
        this.startId = dump.getStartId();
        String primaryKey = dump.getPrimarys()[0];
        // NOTE(review): packFromstr() presumably yields the "from ... where ..." part; confirm in Dump.
        this.temp = String.format("select %s %s and %s>=?  order by %s limit ?,?", primaryKey, dump.packFromstr(), primaryKey, primaryKey);
        log.info("--------maxDuanNo={},maxBatchNo={}-------", Long.valueOf(this.maxDuanNo == Long.MAX_VALUE ? -1L : this.maxDuanNo), Long.valueOf(this.maxBatchNo == Long.MAX_VALUE ? -1L : this.maxBatchNo));
    }

    /** Integer division rounded up, for a non-negative dividend and a positive divisor. */
    private static long ceilDiv(long dividend, long divisor) {
        return (dividend / divisor) + (dividend % divisor == 0 ? 0 : 1);
    }

    /**
     * Queries batch after batch and publishes their segments until a query
     * returns no rows or the configured segment limit is reached, then marks
     * the publisher as finished and releases the JDBC resources.
     */
    @Override // java.lang.Runnable
    public void run() {
        Thread.currentThread().setName("PublisherThread");
        // NOTE(review): StringUtil.hasNull presumably returns the first non-null element,
        // i.e. the configured start id or "" - confirm in StringUtil.
        this.cursorId = StringUtil.hasNull(new String[]{this.startId, ""});
        this.timeBegin = System.currentTimeMillis();
        try {
            boolean firstBatch = true;
            while (true) {
                // Evaluated before every query so that a dump that fits into a single
                // batch also gets the reduced limit on its first (and only) query.
                boolean lastBatch = ((long) this.batchNo) == this.maxBatchNo - 1;
                long limit = (lastBatch && this.numLastBatch > 0) ? this.numLastBatch : this.numBatch;
                ResultSet rows = queryWithRetry(firstBatch ? 0 : 1, limit);
                boolean hasRows;
                try {
                    hasRows = publishSegments(rows);
                } finally {
                    rows.close();
                }
                if (!hasRows) {
                    break;
                }
                this.batchNo++;
                if (this.duanNo >= this.maxDuanNo) {
                    break;
                }
                firstBatch = false;
            }
            this.isover = true;
        } catch (Exception e) {
            log.error("生产者执行失败", e);
            // NOTE(review): LoggerUtil.exit presumably terminates the JVM - confirm.
            LoggerUtil.exit(JvmStatus.s15);
        } finally {
            releaseResources();
        }
    }

    /**
     * Runs the paging query, (re)creating connection and statement when needed,
     * and keeps retrying until a result set is obtained.
     *
     * @param offset rows to skip (1 to drop the already published boundary row)
     * @param limit  maximum number of rows to fetch
     * @return the open result set of the batch; the caller must close it
     */
    private ResultSet queryWithRetry(int offset, long limit) {
        while (true) {
            try {
                if (this.connection == null || this.connection.isClosed()) {
                    closeStaleStatement();
                    this.connection = DruidAssit.getConnection("_global");
                    this.stmt = this.connection.prepareStatement(this.temp);
                    this.stmt.setFetchSize(this.numBatch);
                }
                JdbcAssit.setPreParam(this.stmt, new Object[]{this.cursorId, Integer.valueOf(offset), Long.valueOf(limit)});
                ResultSet rows = this.stmt.executeQuery();
                if (rows != null) {
                    return rows;
                }
            } catch (Throwable th) {
                // NOTE(review): reDoWait appears to wait between attempts and to return true
                // once the retry budget (7) is used up - confirm in TimeAssist.
                if (TimeAssist.reDoWait("tams-dump", 7)) {
                    log.error("重试7次都不能拿到链接，退出", th);
                    LoggerUtil.exit(JvmStatus.s15);
                } else {
                    log.error("不能查询数据，重试", th);
                }
            }
        }
    }

    /** Closes a statement left over from a dead connection without failing the reconnect. */
    private void closeStaleStatement() {
        if (this.stmt == null) {
            return;
        }
        try {
            if (!this.stmt.isClosed()) {
                this.stmt.close();
            }
        } catch (Exception e) {
            log.error("回收资源失败", e);
        }
        this.stmt = null;
    }

    /**
     * Cuts the rows of one batch into segments of {@code numDuan} keys and
     * publishes them; a shorter trailing segment is published as well. Stops
     * early once {@code maxDuanNo} segments have been published.
     *
     * @param rows result set of the batch, positioned before its first row
     * @return {@code false} when the batch is empty, {@code true} otherwise
     * @throws SQLException if reading the result set fails
     */
    private boolean publishSegments(ResultSet rows) throws SQLException {
        boolean hasRow = rows.next();
        if (!hasRow) {
            return false;
        }
        String beginId = null;
        String endId = null;
        int rowsInSegment = 0;
        while (hasRow && this.duanNo < this.maxDuanNo) {
            endId = rows.getString(1);
            if (rowsInSegment == 0) {
                beginId = endId;
            }
            rowsInSegment++;
            if (rowsInSegment == this.numDuan) {
                publishSegment(beginId, endId);
                rowsInSegment = 0;
            }
            hasRow = rows.next();
        }
        // The loop can only stop on the segment limit right after a publish, so a
        // non-empty remainder here always means the batch ran out of rows.
        if (rowsInSegment > 0) {
            publishSegment(beginId, endId);
        }
        return true;
    }

    /** Publishes one segment and advances the paging cursor and the segment counter. */
    private void publishSegment(String beginId, String endId) {
        pushData(beginId, endId);
        this.cursorId = endId;
        this.duanNo++;
    }

    /** Closes statement and connection independently so one failure cannot leak the other. */
    private void releaseResources() {
        try {
            if (this.stmt != null) {
                this.stmt.close();
            }
        } catch (Exception e) {
            log.error("回收资源失败", e);
        }
        try {
            if (this.connection != null) {
                this.connection.close();
            }
        } catch (Exception e) {
            log.error("回收资源失败", e);
        }
    }

    /**
     * Claims the next ring-buffer slot, fills it with the segment bounds and publishes it.
     *
     * @param str  first primary-key value of the segment
     * @param str2 last primary-key value of the segment
     */
    private void pushData(String str, String str2) {
        long sequence = this.ringBuffer.next();
        DumpEvent dumpEvent = this.ringBuffer.get(sequence);
        dumpEvent.setBeginId(str);
        dumpEvent.setEndId(str2);
        dumpEvent.setDump(this.dump);
        this.ringBuffer.publish(sequence);
    }

    /** @return number of non-empty batches processed so far */
    public int getBatchNo() {
        return this.batchNo;
    }

    /** @return number of segments published so far */
    public int getDuanNo() {
        return this.duanNo;
    }

    /**
     * @return {@code true} once publishing has finished and the dump's
     *         {@code counter_send_event} metric equals the number of published segments
     */
    public boolean isIsover() {
        return this.isover && MainDump.metricsMap.get(this.dump.getId()).counter_send_event.getCount() == ((long) getDuanNo());
    }

    /** @return wall-clock time in milliseconds at which {@link #run()} started, 0 before that */
    public long getTimeBegin() {
        return this.timeBegin;
    }

    /** @return the dump definition this publisher works on */
    public Dump getDump() {
        return this.dump;
    }
}
