package net.wicp.tams.common.others.kafka;

import java.lang.reflect.ParameterizedType;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import net.wicp.tams.common.Conf;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/common-others-3.5.9.jar:net/wicp/tams/common/others/kafka/KafkaConsumerThread.class */
/**
 * Base class that polls a single Kafka topic, accumulates the records into batches and hands
 * each batch to an {@link IConsumer} on a worker thread pool.
 *
 * <p>The value type {@code T} is resolved reflectively from the concrete subclass (see
 * {@link #getTClass()}), which is why the class is abstract: it must be subclassed with a
 * concrete type argument, e.g. {@code new KafkaConsumerThread<String>(...) {}}.
 *
 * <p>The underlying {@link KafkaConsumer} is only used from the thread that calls
 * {@link #start(int)}; batches are processed on the executor threads.
 *
 * @param <T> type of the record values
 */
public abstract class KafkaConsumerThread<T> {
    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerThread.class);
    private final KafkaConsumer<String, T> consumer;
    private final String topic;
    private final IConsumer<T> doConsumer;
    private ExecutorService executor;
    /** Number of buffered records that triggers a batch dispatch. */
    private final int batchNum;
    /** Poll timeout in milliseconds; three times this value is the maximum batch age. */
    private final long timeout;

    /**
     * Creates the consumer and subscribes it to {@code topic}.
     *
     * <p>Batch size and poll timeout are read from the configuration keys
     * {@code common.others.kafka.consumer.batch.num} and
     * {@code common.others.kafka.consumer.batch.timeout}.
     *
     * @param groupId   Kafka consumer group id
     * @param topic     topic to subscribe to
     * @param iConsumer callback that processes each batch of records
     * @throws NumberFormatException if either configuration value is not a valid number
     */
    public KafkaConsumerThread(String groupId, String topic, IConsumer<T> iConsumer) {
        this.batchNum = Integer.parseInt(Conf.get("common.others.kafka.consumer.batch.num"));
        this.timeout = Long.parseLong(Conf.get("common.others.kafka.consumer.batch.timeout"));
        Properties props = KafkaTools.getProps(false);
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", KafkaTools.getValueProp(getTClass(), false));
        this.consumer = new KafkaConsumer<>(props);
        this.topic = topic;
        this.doConsumer = iConsumer;
        this.consumer.subscribe(Arrays.asList(this.topic));
    }

    /**
     * Creates the consumer using the group id configured under
     * {@code common.others.kafka.consumer.group.id}.
     *
     * @param topic     topic to subscribe to
     * @param iConsumer callback that processes each batch of records
     */
    public KafkaConsumerThread(String topic, IConsumer<T> iConsumer) {
        this(Conf.get("common.others.kafka.consumer.group.id"), topic, iConsumer);
    }

    /**
     * Returns the class bound to {@code T} by the direct subclass.
     *
     * @return the first actual type argument of this object's generic superclass
     * @throws ClassCastException if the superclass is not parameterized or the type argument
     *                            is not a plain class (for example {@code List<String>})
     */
    @SuppressWarnings("unchecked") // the first type argument of the superclass is T by construction
    public Class<T> getTClass() {
        ParameterizedType superType = (ParameterizedType) getClass().getGenericSuperclass();
        return (Class<T>) superType.getActualTypeArguments()[0];
    }

    /**
     * Runs the poll loop on the calling thread. This method does not return normally; it only
     * exits when polling or committing throws.
     *
     * <p>Records are buffered until either {@code batchNum} records are available or more than
     * three poll timeouts have elapsed since the last dispatch. The batch is then submitted to a
     * fixed pool of {@code threadNum} workers and the consumer offsets are committed.
     *
     * <p>NOTE(review): offsets are committed right after the batch is submitted, not after it
     * has been processed, so a crash while a batch is in flight can lose those records. This
     * ordering is kept from the original implementation.
     *
     * @param threadNum number of worker threads used to process batches
     */
    public void start(int threadNum) {
        this.executor = new ThreadPoolExecutor(threadNum, threadNum, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
        final ArrayList<ConsumerRecord<String, T>> buffer = new ArrayList<>();
        final long maxBatchAge = this.timeout * 3;
        long batchStart = System.currentTimeMillis();
        try {
            while (true) {
                final ConsumerRecords<String, T> polled = this.consumer.poll(this.timeout);
                for (ConsumerRecord<String, T> record : polled) {
                    buffer.add(record);
                }
                long now = System.currentTimeMillis();
                if (buffer.size() >= this.batchNum || now - batchStart > maxBatchAge) {
                    // Hand the worker its own copy: the buffer is cleared and refilled on this
                    // thread while the task may still be queued or running.
                    final ArrayList<ConsumerRecord<String, T>> batch = new ArrayList<>(buffer);
                    // As before, a time-triggered flush may dispatch an empty batch.
                    this.executor.submit(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                // NOTE(review): only the most recent poll result is passed to
                                // errorlog, although the batch may span several polls.
                                KafkaTools.errorlog(polled, KafkaConsumerThread.this.doConsumer.doWithRecords(batch), KafkaConsumerThread.log);
                            } catch (RuntimeException e) {
                                // submit() would otherwise hide the failure in an unread Future.
                                KafkaConsumerThread.log.error("failed to process kafka batch, size: " + batch.size(), e);
                            }
                        }
                    });
                    this.consumer.commitSync();
                    buffer.clear();
                    // Report the size of the dispatched batch, not of the just-cleared buffer.
                    log.info("从kafka取数据用时：{},数量：{}", now - batchStart, batch.size());
                    batchStart = System.currentTimeMillis();
                }
            }
        } finally {
            // The pool is created per start() call; release its threads if the loop dies.
            this.executor.shutdown();
        }
    }
}
