package net.wicp.tams.common.others.kafka;

import java.lang.reflect.ParameterizedType;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import net.wicp.tams.common.Conf;
import net.wicp.tams.common.Result;
import net.wicp.tams.common.apiext.ReflectAssist;
import net.wicp.tams.common.others.constant.SeekPosition;
import org.apache.commons.collections.CollectionUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/common-others-3.5.14.jar:net/wicp/tams/common/others/kafka/KafkaConsumerGroup.class */
/**
 * Consumes one Kafka topic with a set of {@link KafkaConsumer} instances that share a group id,
 * buffering records into batches and handing each batch to an {@link IConsumer}.
 *
 * <p>The number of consumers is {@code ceil(partitionCount / partitionsPerConsumer)}. Batch size and
 * poll timeout are read from {@code common.others.kafka.consumer.batch.num} and
 * {@code common.others.kafka.consumer.batch.timeout}.
 *
 * <p>Concrete subclasses must bind {@code T} to a concrete class (for example
 * {@code class MyGroup extends KafkaConsumerGroup<String>}); the value deserializer is selected from
 * that class via {@link #getTClass()}.
 *
 * @param <T> type of the record values
 */
public abstract class KafkaConsumerGroup<T> {
    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerGroup.class);

    /** One worker per consumer; each worker owns exactly one {@link KafkaConsumer}. */
    private final List<KafkaConsumerGroupThread> consumerThreadList;
    private final String groupId;
    private final String topic;
    private final IConsumer<T> doConsumer;
    /** Number of buffered records that triggers a hand-off to {@link #doConsumer}. */
    private final int batchNum;
    /** Poll timeout in milliseconds; a non-empty buffer is flushed after three times this value. */
    private final long timeout;

    /**
     * A single consumer of the group. {@link #run()} polls forever, so each instance is meant to be
     * executed on its own thread (see {@link KafkaConsumerGroup#start()}).
     */
    public class KafkaConsumerGroupThread implements Runnable {
        private final KafkaConsumer<String, T> kafkaConsumer;

        /**
         * Creates the consumer, subscribes it to the group's topic and applies the initial seek.
         *
         * @param seekPosition where to start reading; {@code SeekPosition.no} keeps the group's
         *                     current position
         * @param l            absolute offset, only used with {@code SeekPosition.user}
         */
        public KafkaConsumerGroupThread(SeekPosition seekPosition, Long l) {
            Properties props = KafkaTools.getProps(false);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaTools.getValueProp(getTClass(), false));
            // NOTE(review): the limit is one more than the batch size; the reason is not evident here.
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, Integer.valueOf(batchNum + 1));
            log.info("kafka consumer 参数:");
            for (Object key : props.keySet()) {
                // Never write credentials (ssl.*.password, sasl.jaas.config, ...) to the log.
                log.info("{}:{}", key, isSensitiveKey(key) ? "******" : props.get(key));
            }
            this.kafkaConsumer = new KafkaConsumer<>(props);
            try {
                this.kafkaConsumer.subscribe(Arrays.asList(topic));
                seekPotion(seekPosition, l);
            } catch (RuntimeException | Error e) {
                // The consumer already holds network resources; release them before failing.
                closeQuietly();
                throw e;
            }
        }

        /**
         * Moves this consumer's position on the partitions currently assigned to it.
         *
         * <p>Does nothing when {@code seekPosition} is {@code null} or {@code SeekPosition.no}, when
         * {@code SeekPosition.user} is combined with a {@code null} or non-positive offset, or when
         * the topic has no partitions.
         *
         * <p>NOTE(review): positions set here are presumably lost again if the group rebalances
         * afterwards (for instance when the next consumer of the group joins) - confirm against the
         * Kafka client version in use.
         *
         * @param seekPosition target position kind
         * @param l            absolute offset, only used with {@code SeekPosition.user}
         */
        public void seekPotion(SeekPosition seekPosition, Long l) {
            if (seekPosition == null || seekPosition == SeekPosition.no) {
                return;
            }
            // Validate before polling: a poll whose records are then dropped must not happen for a no-op.
            if (seekPosition == SeekPosition.user && (l == null || l.longValue() <= 0)) {
                return;
            }
            List<PartitionInfo> partitionInfos = this.kafkaConsumer.partitionsFor(topic);
            if (partitionInfos == null || partitionInfos.isEmpty()) {
                return;
            }
            // poll(0) is used to make the consumer join the group so that it has an assignment to seek on.
            this.kafkaConsumer.poll(0L);
            // Only assigned partitions may be sought; seeking a partition owned by another group
            // member makes the Kafka client throw IllegalStateException.
            List<TopicPartition> assigned = new ArrayList<>(this.kafkaConsumer.assignment());
            if (assigned.isEmpty()) {
                log.warn("no partition of topic {} is assigned to this consumer, seek to {} skipped", topic, seekPosition);
                return;
            }
            switch (seekPosition) {
                case begin:
                    this.kafkaConsumer.seekToBeginning(assigned);
                    break;
                case end:
                    this.kafkaConsumer.seekToEnd(assigned);
                    break;
                case user:
                    long offset = l.longValue();
                    for (TopicPartition partition : assigned) {
                        this.kafkaConsumer.seek(partition, offset);
                    }
                    break;
                default:
                    break;
            }
        }

        /**
         * Creates a consumer that keeps the group's current position.
         *
         * @param kafkaConsumerGroup unused; the enclosing instance is used instead
         */
        public KafkaConsumerGroupThread(KafkaConsumerGroup<?> kafkaConsumerGroup) {
            this(SeekPosition.no, null);
        }

        /**
         * Polls forever, buffering records until either {@code batchNum} records are collected or a
         * non-empty buffer has waited longer than three poll timeouts, then hands the buffer to the
         * business handler. The consumer is closed when the loop is left by an exception.
         */
        @Override
        public void run() {
            List<ConsumerRecord<String, T>> buffer = new ArrayList<>();
            long maxWait = timeout * 3;
            long batchStart = System.currentTimeMillis();
            try {
                while (true) {
                    ConsumerRecords<String, T> polled = this.kafkaConsumer.poll(timeout);
                    for (ConsumerRecord<String, T> record : polled) {
                        buffer.add(record);
                    }
                    long now = System.currentTimeMillis();
                    boolean batchFull = buffer.size() >= batchNum;
                    boolean waitedTooLong = now - batchStart > maxWait && !buffer.isEmpty();
                    if (batchFull || waitedTooLong) {
                        processBatch(polled, buffer, now - batchStart);
                        batchStart = System.currentTimeMillis();
                    }
                }
            } finally {
                closeQuietly();
            }
        }

        /**
         * Hands one batch to the business handler, commits offsets when it succeeded and empties the
         * batch. A failed batch is not committed, but it is still cleared and not retried here.
         */
        private void processBatch(ConsumerRecords<String, T> lastPoll, List<ConsumerRecord<String, T>> batch, long elapsedMs) {
            Result result = null;
            try {
                result = doConsumer.doWithRecords(batch);
                if (result == null) {
                    // A null result used to crash the worker with a NullPointerException.
                    log.error("doWithRecords returned null, the batch is treated as failed");
                    result = Result.getError("doWithRecords returned null");
                }
            } catch (Throwable e) {
                // Best effort by design: a failing handler must not stop the consumer.
                log.error("业务处理失败", e);
                result = Result.getError(e.getMessage());
            } finally {
                KafkaTools.errorlog(lastPoll, result, log);
            }
            if (result.isSuc()) {
                try {
                    this.kafkaConsumer.commitSync();
                } catch (Throwable e) {
                    log.error("commit error", e);
                }
            }
            log.info("from kafka server,the time:{},records:{}", Long.valueOf(elapsedMs), Integer.valueOf(batch.size()));
            batch.clear();
        }

        /** Closes the consumer, logging instead of propagating any failure. */
        private void closeQuietly() {
            try {
                this.kafkaConsumer.close();
            } catch (Throwable e) {
                log.warn("close kafka consumer error", e);
            }
        }
    }

    /**
     * Creates the group and one consumer per {@code i} partitions of the topic (rounded up).
     *
     * @param str       consumer group id
     * @param str2      topic to consume
     * @param iConsumer handler that receives each batch of records
     * @param i         number of partitions served by one consumer; must be positive
     * @throws IllegalArgumentException if {@code i} is not positive
     */
    public KafkaConsumerGroup(String str, String str2, IConsumer<T> iConsumer, int i) {
        if (i <= 0) {
            // Previously 0 failed with ArithmeticException and negatives silently created no consumer.
            throw new IllegalArgumentException("partitions per consumer must be positive, but was " + i);
        }
        this.consumerThreadList = new ArrayList<>();
        this.batchNum = Integer.parseInt(Conf.get("common.others.kafka.consumer.batch.num"));
        this.timeout = Long.parseLong(Conf.get("common.others.kafka.consumer.batch.timeout"));
        this.groupId = str;
        this.topic = str2;
        this.doConsumer = iConsumer;
        List<PartitionInfo> partitionInfos = KafkaAssitInst.getInst().getKafkaProducer(ReflectAssist.getSuperClassGenricType(getClass())).partitionsFor(str2);
        int partitionCount = partitionInfos.size();
        log.info("topic======{},partitions size====={}", str2, Integer.valueOf(partitionCount));
        // Ceiling division: a remainder of partitions still needs one more consumer.
        int consumerNum = (partitionCount / i) + (partitionCount % i > 0 ? 1 : 0);
        log.info("consumerNum====={}", Integer.valueOf(consumerNum));
        try {
            for (int n = 0; n < consumerNum; n++) {
                this.consumerThreadList.add(new KafkaConsumerGroupThread(this));
            }
        } catch (RuntimeException | Error e) {
            // Do not leak the consumers that were already created when a later one fails.
            for (KafkaConsumerGroupThread created : this.consumerThreadList) {
                created.closeQuietly();
            }
            throw e;
        }
    }

    /**
     * Same as the four-argument constructor, with the group id read from
     * {@code common.others.kafka.consumer.group.id}.
     *
     * @param str       topic to consume
     * @param iConsumer handler that receives each batch of records
     * @param i         number of partitions served by one consumer; must be positive
     */
    public KafkaConsumerGroup(String str, IConsumer<T> iConsumer, int i) {
        this(Conf.get("common.others.kafka.consumer.group.id"), str, iConsumer, i);
    }

    /**
     * Applies the seek to every consumer of the group, on the calling thread.
     *
     * <p>{@link KafkaConsumer} is documented as not safe for multi-threaded access, so call this
     * before {@link #start()}.
     *
     * @param seekPosition target position kind
     * @param l            absolute offset, only used with {@code SeekPosition.user}
     */
    public void seekPotion(SeekPosition seekPosition, Long l) {
        for (KafkaConsumerGroupThread worker : this.consumerThreadList) {
            worker.seekPotion(seekPosition, l);
        }
    }

    /**
     * Resolves the class bound to {@code T} by walking up from the runtime class to the first
     * parameterized superclass whose first type argument is a concrete class.
     *
     * @return the class bound to {@code T}
     * @throws IllegalStateException if no superclass declares a concrete first type argument
     */
    @SuppressWarnings("unchecked")
    public Class<T> getTClass() {
        for (Class<?> type = getClass(); type != null; type = type.getSuperclass()) {
            Object parent = type.getGenericSuperclass();
            if (parent instanceof ParameterizedType) {
                Object firstArg = ((ParameterizedType) parent).getActualTypeArguments()[0];
                if (firstArg instanceof Class) {
                    // Safe by construction: subclasses bind T through this type argument.
                    return (Class<T>) firstArg;
                }
            }
        }
        throw new IllegalStateException("cannot resolve the value type of " + getClass().getName()
                + ", the subclass must bind the type parameter to a concrete class");
    }

    /** Starts one named thread per consumer; the threads poll until they fail. */
    public void start() {
        int index = 0;
        for (KafkaConsumerGroupThread worker : this.consumerThreadList) {
            new Thread(worker, "kafka-consumer-" + this.groupId + "-" + this.topic + "-" + index).start();
            index++;
        }
    }

    /** Tells whether the value of a consumer property must be masked in the log. */
    private static boolean isSensitiveKey(Object key) {
        String name = String.valueOf(key);
        return name.contains("password") || name.contains("jaas");
    }
}
