package com.xforceplus.taxware.architecture.g1.rocketmq.client;

import com.alibaba.fastjson.JSON;
import com.xforceplus.taxware.architecture.g1.core.mq.BaseMessageDTO;
import com.xforceplus.taxware.architecture.g1.domain.log.LogContext;
import com.xforceplus.taxware.architecture.g1.domain.log.model.LogSender;
import com.xforceplus.taxware.architecture.g1.domain.log.model.impl.MqLogEvent;
import com.xforceplus.taxware.architecture.g1.rocketmq.client.exception.RmqConsumeException;
import com.xforceplus.taxware.architecture.g1.rocketmq.client.model.RmqMessageExt;
import com.xforceplus.taxware.architecture.g1.rocketmq.client.util.RmqUtils;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeReturnType;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/xforceplus/taxware/architecture/g1/rocketmq/client/RmqConsumer.class */
public class RmqConsumer {
    private static final Logger log = LoggerFactory.getLogger(RmqConsumer.class);
    private static final int MAX_RECONSUME_TIMES = 4;
    private static final int CONSUME_THREAD_MIN = 4;
    private static final int CONSUME_THREAD_MAX = 8;
    private final DefaultMQPushConsumer defaultMQPushConsumer;
    private LogSender logSender;
    private String indexName;

    public RmqConsumer(String str, String str2, String str3) {
        this.defaultMQPushConsumer = new DefaultMQPushConsumer(str, new AclClientRPCHook(new SessionCredentials(str2, str3)), new AllocateMessageQueueAveragely());
        this.defaultMQPushConsumer.setMaxReconsumeTimes(4);
        this.defaultMQPushConsumer.setConsumeThreadMin(4);
        this.defaultMQPushConsumer.setConsumeThreadMax(8);
        this.defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
        this.defaultMQPushConsumer.setConsumeMessageBatchMaxSize(1);
    }

    public void start() {
        try {
            this.defaultMQPushConsumer.start();
        } catch (Exception e) {
            throw new RmqConsumeException(String.format("消费者启动异常，%s", Optional.ofNullable(e.getMessage()).orElse(e.getClass().getName())), e);
        }
    }

    public void setNameSrvAddr(String str) {
        this.defaultMQPushConsumer.setNamesrvAddr(str);
    }

    public void subscribe(String str, String str2) {
        try {
            this.defaultMQPushConsumer.subscribe(str, str2);
        } catch (Exception e) {
            throw new RmqConsumeException(String.format("订阅异常，%s", Optional.ofNullable(e.getMessage()).orElse(e.getClass().getName())), e);
        }
    }

    public void registerMessageListener(RmqMessageListenerConcurrently rmqMessageListenerConcurrently) {
        this.defaultMQPushConsumer.registerMessageListener((list, consumeConcurrentlyContext) -> {
            LogContext.init();
            MqLogEvent createEvent = createEvent(list, consumeConcurrentlyContext);
            try {
                try {
                    ConsumeConcurrentlyStatus consumeMessage = rmqMessageListenerConcurrently.consumeMessage((List) list.stream().map(messageExt -> {
                        BaseMessageDTO baseMessageDTO = (BaseMessageDTO) JSON.parseObject(new String(messageExt.getBody(), StandardCharsets.UTF_8), BaseMessageDTO.class);
                        RmqMessageExt rmqMessageExt = new RmqMessageExt();
                        rmqMessageExt.setKeys(messageExt.getKeys());
                        rmqMessageExt.setTopic(messageExt.getTopic());
                        rmqMessageExt.setTags(messageExt.getTags());
                        rmqMessageExt.setDelayTimeLevel(messageExt.getDelayTimeLevel());
                        rmqMessageExt.setBody(baseMessageDTO.getBody());
                        rmqMessageExt.setReconsumeTimes(messageExt.getReconsumeTimes());
                        rmqMessageExt.putAllProperties(baseMessageDTO.getProperties());
                        return rmqMessageExt;
                    }).collect(Collectors.toList()), consumeConcurrentlyContext);
                    createEvent.setConsumeStatus(consumeMessage.name());
                    createEvent.getExt().putAll(LogContext.getAllPoint());
                    if (this.logSender != null) {
                        this.logSender.send(createEvent);
                    }
                    LogContext.clear();
                    return consumeMessage;
                } catch (Throwable th) {
                    log.error("RmqConsumer接收消息时异常", th);
                    createEvent.setThrowable(th);
                    createEvent.setConsumeStatus(ConsumeReturnType.EXCEPTION.name());
                    throw th;
                }
            } catch (Throwable th2) {
                createEvent.getExt().putAll(LogContext.getAllPoint());
                if (this.logSender != null) {
                    this.logSender.send(createEvent);
                }
                LogContext.clear();
                throw th2;
            }
        });
    }

    public void setConsumeThreadMin(int i) {
        this.defaultMQPushConsumer.setConsumeThreadMin(i);
    }

    public void setConsumeThreadMax(int i) {
        this.defaultMQPushConsumer.setConsumeThreadMax(i);
    }

    public void setConsumeMessageBatchMaxSize(int i) {
        this.defaultMQPushConsumer.setConsumeMessageBatchMaxSize(i);
    }

    public void setMaxReconsumeTimes(int i) {
        this.defaultMQPushConsumer.setMaxReconsumeTimes(i);
    }

    public void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) {
        this.defaultMQPushConsumer.setConsumeFromWhere(consumeFromWhere);
    }

    public void setLogSender(LogSender logSender) {
        this.logSender = logSender;
    }

    public void setIndexName(String str) {
        this.indexName = str;
    }

    public void shutdown() {
        this.defaultMQPushConsumer.shutdown();
    }

    private MqLogEvent createEvent(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        MqLogEvent mqLogEvent = new MqLogEvent();
        try {
            MessageExt messageExt = list.get(0);
            mqLogEvent.setMessageContent(new String(messageExt.getBody(), StandardCharsets.UTF_8));
            mqLogEvent.setMessageContentSize(messageExt.getBody().length);
            mqLogEvent.setMessageProperties(JSON.toJSONString(messageExt.getProperties()));
            mqLogEvent.setReconsumeTimes(messageExt.getReconsumeTimes());
            if (messageExt.getDelayTimeLevel() > 0) {
                mqLogEvent.setDelayLevel(String.format("%s[%d]", RmqUtils.getDelayLevelName(messageExt.getDelayTimeLevel()), Integer.valueOf(messageExt.getDelayTimeLevel())));
            }
            mqLogEvent.setTags(messageExt.getTags());
            mqLogEvent.setKeys(messageExt.getKeys());
            mqLogEvent.setService(this.indexName);
            mqLogEvent.setName(messageExt.getTopic());
            mqLogEvent.setTraceId(LogContext.getTraceId());
            mqLogEvent.setType("RmqConsumer");
            mqLogEvent.setThreadName(Thread.currentThread().getName());
            return mqLogEvent;
        } catch (Exception e) {
            log.warn("RmqConsumer创建埋点事件对象异常", e);
            return mqLogEvent;
        }
    }
}
