/*
 * Decompiled with CFR 0.152.
 */
package com.xforceplus.taxware.architecture.g1.elk.kafka;

import com.alibaba.ttl.TtlRunnable;
import com.xforceplus.taxware.architecture.g1.domain.log.model.LogEvent;
import com.xforceplus.taxware.architecture.g1.domain.log.model.LogSender;
import com.xforceplus.taxware.architecture.g1.domain.util.ApplicationUtil;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link LogSender} that buffers {@link LogEvent}s in a bounded in-memory queue and publishes
 * them to Kafka from one or more background worker threads.
 *
 * <p>{@link #send(LogEvent)} never blocks: when the queue is full the event is dropped.
 * Worker threads are started by the constructors and run until they are interrupted.
 *
 * <p>NOTE(review): nothing in this class closes the Kafka producer or stops the workers;
 * confirm whether the owning application needs an explicit shutdown path.
 */
public class KafkaLogSender
implements LogSender {
    private static final Logger logger = LoggerFactory.getLogger(KafkaLogSender.class);

    /** Process-wide counter used to give every worker thread a unique name suffix. */
    private static final AtomicInteger seqNo = new AtomicInteger(0);

    /** Topic used whenever the environment reported by {@link ApplicationUtil#getEnv()} is not "pro". */
    private static final String TEST_TOPIC = "taxware-test";

    private BlockingQueue<LogEvent> queue;
    private Producer<String, String> producer;

    /**
     * Topic used when the environment is "pro". Volatile because it is written by callers of
     * {@link #setDefaultTopic(String)} and read by the worker threads.
     */
    private volatile String defaultTopic = "taxware";

    /**
     * Creates a sender with a single worker thread.
     *
     * @param bootstrapServers value for the Kafka {@code bootstrap.servers} setting
     * @param queueSize capacity of the in-memory event queue
     */
    public KafkaLogSender(List<String> bootstrapServers, int queueSize) {
        this(bootstrapServers, queueSize, 1);
    }

    /**
     * Creates a sender with the given number of worker threads.
     *
     * @param bootstrapServers value for the Kafka {@code bootstrap.servers} setting
     * @param queueSize capacity of the in-memory event queue
     * @param threadSize number of worker threads to start; no worker is started when this is
     *     zero or negative
     */
    public KafkaLogSender(List<String> bootstrapServers, int queueSize, int threadSize) {
        Properties prop = new Properties();
        prop.put("bootstrap.servers", bootstrapServers);
        this.init(prop, queueSize, threadSize);
    }

    /**
     * Creates a sender with a single worker thread from caller-supplied producer settings.
     * The {@code retries}, {@code key.serializer}, {@code value.serializer} and
     * {@code compression.type} settings are always overridden; {@code props} itself is not
     * modified.
     *
     * @param props Kafka producer settings
     * @param queueSize capacity of the in-memory event queue
     */
    public KafkaLogSender(Properties props, int queueSize) {
        this.init(props, queueSize, 1);
    }

    /**
     * Builds the queue and the producer, then starts the worker threads.
     *
     * @param props caller-supplied producer settings; copied, never mutated
     * @param queueSize capacity of the in-memory event queue
     * @param threadSize number of worker threads to start
     */
    private void init(Properties props, int queueSize, int threadSize) {
        // Build the queue first: an invalid capacity fails here, before a producer
        // (which would otherwise be left unclosed) has been created.
        this.queue = new LinkedBlockingDeque<LogEvent>(queueSize);

        // Work on a private copy so the caller's Properties object is left untouched,
        // then apply the settings this sender always enforces.
        Properties option = new Properties();
        option.putAll(props);
        option.put("retries", 1);
        option.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        option.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        option.put("compression.type", "gzip");
        this.producer = new KafkaProducer<>(option);

        for (int i = 0; i < threadSize; ++i) {
            this.addThread("KafkaLogSender-" + seqNo.incrementAndGet());
        }
    }

    /**
     * Queues an event for asynchronous delivery without blocking the caller.
     *
     * @param event the event to deliver; a {@code null} event is ignored
     */
    public void send(LogEvent event) {
        if (event == null) {
            // The queue rejects null elements with a NullPointerException; a logging call
            // should not fail its caller, so there is simply nothing to enqueue.
            return;
        }
        // offer() returns false instead of waiting when the queue is full, so the event
        // is dropped in that case (best effort).
        this.queue.offer(event);
    }

    /**
     * Publishes a single event to Kafka immediately, bypassing the queue.
     *
     * <p>The record goes to the configured default topic when
     * {@link ApplicationUtil#getEnv()} equals "pro" (case-insensitive), otherwise to
     * "taxware-test".
     *
     * @param event the event to publish; its {@code toJson()} output is the record value
     * @return the future returned by the Kafka producer for this record
     */
    public Future<RecordMetadata> doSend(LogEvent event) {
        String targetTopic = "pro".equalsIgnoreCase(ApplicationUtil.getEnv()) ? this.defaultTopic : TEST_TOPIC;
        ProducerRecord<String, String> record = new ProducerRecord<>(targetTopic, event.toJson());
        return this.producer.send(record);
    }

    /**
     * Changes the topic used when the environment is "pro".
     *
     * @param defaultTopic the new topic name
     */
    public void setDefaultTopic(String defaultTopic) {
        this.defaultTopic = defaultTopic;
    }

    /**
     * Starts one worker thread that drains the queue and publishes each event.
     *
     * @param threadName name given to the new thread
     */
    private void addThread(String threadName) {
        Runnable worker = () -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    LogEvent log = this.queue.take();
                    this.doSend(log);
                }
                catch (InterruptedException e) {
                    // Interruption is the request to stop: restore the flag and leave
                    // instead of logging and spinning back into take().
                    Thread.currentThread().interrupt();
                    return;
                }
                catch (Exception e) {
                    // Message reads "exception while sending ELK log"; one failed event
                    // must not terminate the worker.
                    logger.error("\u53d1\u9001ELK\u65e5\u5fd7\u5f02\u5e38", (Throwable)e);
                }
            }
        };
        Thread thread = new Thread((Runnable)TtlRunnable.get(worker));
        thread.setName(threadName);
        thread.start();
    }
}

