/*
 * Decompiled with CFR 0.152.
 */
package com.xforceplus.taxware.architecture.g1.rocketmq.client;

import com.alibaba.fastjson.JSON;
import com.xforceplus.taxware.architecture.g1.core.mq.BaseMessageDTO;
import com.xforceplus.taxware.architecture.g1.core.utils.ExceptionUtil;
import com.xforceplus.taxware.architecture.g1.domain.log.LogContext;
import com.xforceplus.taxware.architecture.g1.domain.log.model.LogEvent;
import com.xforceplus.taxware.architecture.g1.domain.log.model.LogSender;
import com.xforceplus.taxware.architecture.g1.domain.log.model.impl.MqLogEvent;
import com.xforceplus.taxware.architecture.g1.rocketmq.client.RmqMessageListenerConcurrently;
import com.xforceplus.taxware.architecture.g1.rocketmq.client.exception.RmqConsumeException;
import com.xforceplus.taxware.architecture.g1.rocketmq.client.model.RmqMessageExt;
import com.xforceplus.taxware.architecture.g1.rocketmq.client.util.RmqUtils;
import java.io.IOException;
import java.net.URL;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeReturnType;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RmqConsumer {
    private static final Logger log = LoggerFactory.getLogger(RmqConsumer.class);
    private static final int MAX_RECONSUME_TIMES = 4;
    private static final int CONSUME_THREAD_MIN = 4;
    private static final int CONSUME_THREAD_MAX = 8;
    private final DefaultMQPushConsumer defaultMQPushConsumer;
    private LogSender logSender;
    private String indexName;

    public RmqConsumer(String consumerGroup, String accessKey, String secretKey) {
        AclClientRPCHook aclClientRPCHook = new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
        this.defaultMQPushConsumer = new DefaultMQPushConsumer(consumerGroup, (RPCHook)aclClientRPCHook, (AllocateMessageQueueStrategy)new AllocateMessageQueueAveragely());
        this.defaultMQPushConsumer.setMaxReconsumeTimes(4);
        this.defaultMQPushConsumer.setConsumeThreadMin(4);
        this.defaultMQPushConsumer.setConsumeThreadMax(8);
        this.defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
    }

    public void start() {
        try {
            this.defaultMQPushConsumer.start();
        }
        catch (Exception e) {
            throw new RmqConsumeException(String.format("\u6d88\u8d39\u8005\u542f\u52a8\u5f02\u5e38\uff0c%s", Optional.ofNullable(e.getMessage()).orElse(e.getClass().getName())), e);
        }
    }

    public void setNameSrvAddr(String nameSrvAddr) {
        this.defaultMQPushConsumer.setNamesrvAddr(nameSrvAddr);
    }

    public void subscribe(String topic, String subExpression) {
        try {
            this.defaultMQPushConsumer.subscribe(topic, subExpression);
        }
        catch (Exception e) {
            throw new RmqConsumeException(String.format("\u8ba2\u9605\u5f02\u5e38\uff0c%s", Optional.ofNullable(e.getMessage()).orElse(e.getClass().getName())), e);
        }
    }

    public void registerMessageListener(RmqMessageListenerConcurrently messageListener) {
        String consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
        this.defaultMQPushConsumer.setPullInterval(messageListener.getPullInterval());
        this.defaultMQPushConsumer.registerMessageListener((msgs, context) -> {
            LogContext.init();
            MqLogEvent event = this.createEvent(msgs, context, consumerGroup);
            try {
                List<RmqMessageExt> messageList = msgs.stream().map(msg -> {
                    String bodyData = new String(msg.getBody(), StandardCharsets.UTF_8);
                    BaseMessageDTO baseMessageDTO = (BaseMessageDTO)JSON.parseObject((String)bodyData, BaseMessageDTO.class);
                    String body = baseMessageDTO.getBody();
                    BaseMessageDTO.Meta meta = baseMessageDTO.getMeta();
                    if (StringUtils.equals((CharSequence)"2.0", (CharSequence)meta.getVersion())) {
                        try {
                            body = IOUtils.toString((URL)new URL(baseMessageDTO.getBody()), (Charset)StandardCharsets.UTF_8);
                        }
                        catch (IOException e) {
                            log.error("\u6d88\u8d39RMQ\u5927\u62a5\u6587\u5f02\u5e38", (Throwable)e);
                            throw new RmqConsumeException(String.format("\u6d88\u8d39\u5f02\u5e38\uff0c\u8bfb\u53d6\u6570\u636e\u65f6\uff0c%s", Optional.ofNullable(e.getMessage()).orElse(e.getClass().getName())), e);
                        }
                    }
                    RmqMessageExt result = new RmqMessageExt();
                    result.setKeys(msg.getKeys());
                    result.setTopic(msg.getTopic());
                    result.setTags(msg.getTags());
                    result.setDelayTimeLevel(msg.getDelayTimeLevel());
                    result.setBody(body);
                    result.setReconsumeTimes(msg.getReconsumeTimes());
                    result.putAllProperties(baseMessageDTO.getProperties());
                    result.setMsgId(msg.getMsgId());
                    return result;
                }).collect(Collectors.toList());
                this.defaultMQPushConsumer.setPullInterval(messageListener.getPullInterval());
                ConsumeConcurrentlyStatus consumeStatus = messageListener.consumeMessage(messageList, context);
                event.setConsumeStatus(consumeStatus.name());
                ConsumeConcurrentlyStatus consumeConcurrentlyStatus = consumeStatus;
                return consumeConcurrentlyStatus;
            }
            catch (Throwable e) {
                log.error("RmqConsumer\u63a5\u6536\u6d88\u606f\u65f6\u5f02\u5e38\u3002{}", (Object)ExceptionUtil.toDesc((Throwable)e));
                event.setThrowable(e);
                event.setConsumeStatus(ConsumeReturnType.EXCEPTION.name());
                throw e;
            }
            finally {
                event.getExt().putAll(LogContext.getAllPoint());
                if (this.logSender != null) {
                    this.logSender.send((LogEvent)event);
                }
                LogContext.clear();
            }
        });
    }

    public void setConsumeThreadMin(int consumeThreadMin) {
        this.defaultMQPushConsumer.setConsumeThreadMin(consumeThreadMin);
    }

    public void setConsumeThreadMax(int consumeThreadMax) {
        this.defaultMQPushConsumer.setConsumeThreadMax(consumeThreadMax);
    }

    public void setConsumeMessageBatchMaxSize(int consumeMessageBatchMaxSize) {
        this.defaultMQPushConsumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
    }

    public void setPullBatchSize(int pullBatchSize) {
        this.defaultMQPushConsumer.setPullBatchSize(pullBatchSize);
    }

    public void setMaxReconsumeTimes(int maxReconsumeTimes) {
        this.defaultMQPushConsumer.setMaxReconsumeTimes(maxReconsumeTimes);
    }

    public void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) {
        this.defaultMQPushConsumer.setConsumeFromWhere(consumeFromWhere);
    }

    public void setLogSender(LogSender logSender) {
        this.logSender = logSender;
    }

    public void setIndexName(String indexName) {
        this.indexName = indexName;
    }

    public void shutdown() {
        this.defaultMQPushConsumer.shutdown();
    }

    private MqLogEvent createEvent(List<MessageExt> messageList, ConsumeConcurrentlyContext context, String consumerGroup) {
        MqLogEvent event = new MqLogEvent();
        try {
            MessageExt message = messageList.get(0);
            event.setMessageContent(new String(message.getBody(), StandardCharsets.UTF_8));
            event.setMessageContentSize(message.getBody().length);
            event.setMessageProperties(JSON.toJSONString((Object)message.getProperties()));
            event.setReconsumeTimes(message.getReconsumeTimes());
            if (message.getDelayTimeLevel() > 0) {
                event.setDelayLevel(String.format("%s[%d]", RmqUtils.getDelayLevelName(message.getDelayTimeLevel()), message.getDelayTimeLevel()));
            }
            event.setTags(message.getTags());
            event.setKeys(message.getKeys());
            event.setService(this.indexName);
            event.setName(message.getTopic());
            event.setTraceId(LogContext.getTraceId());
            event.setType("RmqConsumer");
            event.setThreadName(Thread.currentThread().getName());
            event.setConsumerGroup(consumerGroup);
            return event;
        }
        catch (Exception e) {
            log.warn("RmqConsumer\u521b\u5efa\u57cb\u70b9\u4e8b\u4ef6\u5bf9\u8c61\u5f02\u5e38", (Throwable)e);
            return event;
        }
    }
}

