package com.xforceplus.apollo.janus.standalone.sdk.message.messageBus;

import com.alibaba.fastjson.JSONObject;
import com.xforceplus.apollo.janus.standalone.sdk.config.LocalCLusterProperties;
import com.xforceplus.apollo.janus.standalone.sdk.constants.MessageSourceConstants;
import com.xforceplus.apollo.janus.standalone.sdk.constants.PropertiesConstants;
import com.xforceplus.apollo.janus.standalone.sdk.message.AbsGlobalMessageEventWithResultListener;
import com.xforceplus.apollo.janus.standalone.sdk.message.IGlobalMessageEventListener;
import com.xforceplus.apollo.janus.standalone.sdk.message.MessageEventInitListener;
import com.xforceplus.apollo.janus.standalone.sdk.message.messageBus.dto.MessageAck;
import com.xforceplus.apollo.janus.standalone.sdk.message.messageBus.response.SubResponse;
import com.xforceplus.apollo.janus.standalone.sdk.message.messageBus.utils.ZipUtil;
import com.xforceplus.apollo.utils.ErrorUtil;
import com.xforceplus.apollo.utils.JacksonUtil;
import com.xforceplus.janus.config.core.config.HttpConfig;
import com.xforceplus.janus.config.core.monitor.JanusUploader;
import com.xforceplus.janus.framework.event.AckTuple;
import java.util.HashMap;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.env.Environment;

/* loaded from: input_file:com/xforceplus/apollo/janus/standalone/sdk/message/messageBus/MessageBusInit.class */
public class MessageBusInit implements InitializingBean, DisposableBean {
    private MBClient mbClient;
    private Environment environment;
    private MessageEventInitListener messageEventInitListener;
    private JanusUploader janusUploader;
    private volatile boolean canRun = true;
    private static final Logger log = LoggerFactory.getLogger(MessageBusInit.class);
    public static final ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(2, 8, 60, TimeUnit.SECONDS, new ArrayBlockingQueue(16), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());

    public MessageBusInit(Environment environment, MessageEventInitListener messageEventInitListener) {
        try {
            this.messageEventInitListener = messageEventInitListener;
            this.environment = environment;
            String property = environment.getProperty("message-bus.url");
            String property2 = environment.getProperty("message-bus.token");
            log.info("message-bus-url: {} ,token: {} ", property, property2);
            if (StringUtils.isBlank(property) || StringUtils.isBlank(property2)) {
                throw new IllegalArgumentException("message-bus.url ,message-bus.token 为空 ");
            }
            this.mbClient = MBClient.getInstance(property, property2);
            MessageReceiptHandler.init(this.mbClient);
            log.info("MessageBusInit created");
        } catch (Exception e) {
            log.error("MessageBusInit error :{} ", ErrorUtil.getStackMsg(e));
            this.mbClient = null;
        }
    }

    public boolean pullMessage() {
        try {
            SubResponse sub = this.mbClient.sub();
            log.debug("拉取的消息内容:[{}]", JSONObject.toJSONString(sub));
            List<ResponseMessage> responseMessages = sub.getResponseMessages();
            log.info("拉取的消息个数:[{}]", Integer.valueOf(responseMessages == null ? 0 : responseMessages.size()));
            if (CollectionUtils.isEmpty(responseMessages)) {
                return false;
            }
            LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue(responseMessages);
            CountDownLatch countDownLatch = new CountDownLatch(LocalCLusterProperties.getInstance().getListenerHandleThreadNum());
            for (int i = 0; i < LocalCLusterProperties.getInstance().getListenerHandleThreadNum(); i++) {
                EXECUTOR.execute(() -> {
                    try {
                        call(linkedBlockingQueue);
                    } finally {
                        countDownLatch.countDown();
                    }
                });
            }
            try {
                countDownLatch.await();
                return true;
            } catch (Exception e) {
                return true;
            }
        } catch (Error e2) {
            log.error(ErrorUtil.getStackMsg(e2));
            return false;
        } catch (Exception e3) {
            log.error(ErrorUtil.getStackMsg(e3));
            return false;
        }
    }

    public void call(Queue<ResponseMessage> queue) {
        AckTuple<Boolean, String> ackTuple;
        String str;
        while (true) {
            ResponseMessage poll = queue.poll();
            if (poll == null) {
                return;
            }
            String pubAppKey = poll.getPubAppKey();
            String pubCode = poll.getPubCode();
            log.debug("pubAppKey:[{}],pubCode:[{}],tenantId:[{}]", new Object[]{pubAppKey, pubCode, poll.getProperties().get("tenantId")});
            String str2 = poll.getId() + "";
            try {
                str = poll.getProperties().get("janus_userId");
            } catch (Error e) {
                log.error("msgId:{}处理异常,{}", str2, ErrorUtil.getStackMsg(e));
                ackTuple = new AckTuple<>(false, ErrorUtil.getStackMsg(e));
            } catch (Exception e2) {
                log.error("msgId:{}处理异常,{}", str2, ErrorUtil.getStackMsg(e2));
                ackTuple = new AckTuple<>(false, ErrorUtil.getStackMsg(e2));
            }
            if (RequestMessage.COMPRESS_FLAG_TRUE.equals(poll.getProperties().get("superBigDataCompressFlag"))) {
                if (!Boolean.TRUE.toString().equalsIgnoreCase((String) HttpConfig.getConfig("enableSuperBigData"))) {
                    log.warn("msgid:{},requestName:{} 超大报文处理开关未开启", poll.getId(), poll.getPubCode());
                } else if (!dealSuperBigData(poll)) {
                    MessageReceiptHandler.sendACK(poll.getReceiptHandle());
                }
            }
            if (RequestMessage.COMPRESS_FLAG_TRUE.equals(poll.getProperties().get(RequestMessage.COMPRESS_FLAG))) {
                poll.setContent(ZipUtil.ungzip(poll.getContent()));
                poll.getProperties().remove(RequestMessage.COMPRESS_FLAG);
            }
            Set<IGlobalMessageEventListener> findHandler = MessageEventInitListener.findHandler(pubCode, MessageSourceConstants.MESSAGE_BUS, str);
            if (findHandler == null || findHandler.size() == 0) {
                ackTuple = new AckTuple<>(true, "消息总线下发的消息无消息处理器,pubCode:" + pubCode);
            } else if (findHandler.size() == 1) {
                ackTuple = handlerOne(poll, findHandler.iterator().next());
            } else {
                boolean z = true;
                HashMap hashMap = new HashMap();
                for (IGlobalMessageEventListener iGlobalMessageEventListener : findHandler) {
                    AckTuple<Boolean, String> handlerOne = handlerOne(poll, iGlobalMessageEventListener);
                    String name = iGlobalMessageEventListener.getClass().getName();
                    hashMap.put(name.substring(name.lastIndexOf(".") + 1, name.length()), handlerOne);
                    if (!((Boolean) handlerOne.status).booleanValue()) {
                        z = false;
                    }
                }
                log.info(JacksonUtil.getInstance().toJson(hashMap));
                ackTuple = new AckTuple<>(Boolean.valueOf(z), JacksonUtil.getInstance().toJson(hashMap));
            }
            if (null == ackTuple) {
                log.error("{} 缺失有效回执，请补充回执内容！！！", str2);
                ackTuple = new AckTuple<>(false, "缺失有效回执，请补充回执内容");
            }
            MessageAck messageAck = (MessageAck) JacksonUtil.getInstance().fromJson(poll.getReceiptHandle(), MessageAck.class);
            messageAck.setStatus(((Boolean) ackTuple.status).booleanValue());
            messageAck.setMessage(StringUtils.isBlank((CharSequence) ackTuple.getMessage()) ? null : StringUtils.substring((String) ackTuple.getMessage(), 0, 20));
            MessageReceiptHandler.sendACK(JacksonUtil.getInstance().toJson(messageAck));
        }
    }

    private boolean dealSuperBigData(ResponseMessage responseMessage) {
        try {
            String readFromObjectService = this.janusUploader.readFromObjectService(responseMessage.getContent());
            if (!StringUtils.isNotBlank(readFromObjectService)) {
                return false;
            }
            responseMessage.setContent(readFromObjectService);
            responseMessage.getProperties().remove("superBigDataCompressFlag");
            return true;
        } catch (Exception e) {
            log.warn("msgId:{}获取超大报文失败:{},{}", new Object[]{responseMessage.getId(), e.getMessage(), ErrorUtil.getStackMsg(e)});
            return false;
        }
    }

    private AckTuple<Boolean, String> handlerOne(ResponseMessage responseMessage, IGlobalMessageEventListener iGlobalMessageEventListener) {
        AckTuple<Boolean, String> ackTuple;
        String pubAppKey = responseMessage.getPubAppKey();
        String pubCode = responseMessage.getPubCode();
        String str = responseMessage.getId() + "";
        try {
            HashMap hashMap = new HashMap();
            hashMap.putAll(responseMessage.getProperties());
            hashMap.put("pubAppKey", pubAppKey);
            hashMap.put("pubCode", pubCode);
            hashMap.put(PropertiesConstants.eventType, pubCode);
            hashMap.put("thirdPartyId", responseMessage.getThirdPartyId());
            hashMap.put("consumeTimes", responseMessage.getConsumeTimes());
            hashMap.put("receivedTime", Long.valueOf(responseMessage.getReceivedTime()));
            hashMap.put("receiptHandle", responseMessage.getReceiptHandle());
            if (hashMap.get("msgId") == null) {
                hashMap.put("msgId", responseMessage.getId());
            }
            if (hashMap.get("createTime") == null) {
                hashMap.put("createTime", Long.valueOf(responseMessage.getReceivedTime()));
            }
            if (hashMap.get("userId") == null) {
                hashMap.put("userId", pubAppKey);
            }
            if (hashMap.get("requestName") == null) {
                hashMap.put("requestName", pubCode);
            }
            if (hashMap.get("payLoadId") == null) {
                hashMap.put("payLoadId", responseMessage.getId());
            }
            if (iGlobalMessageEventListener instanceof AbsGlobalMessageEventWithResultListener) {
                com.xforceplus.apollo.janus.standalone.sdk.message.AckTuple onMessageWithResult = ((AbsGlobalMessageEventWithResultListener) iGlobalMessageEventListener).onMessageWithResult(str, hashMap, responseMessage.getContent(), MessageSourceConstants.MESSAGE_BUS);
                ackTuple = new AckTuple<>(onMessageWithResult.getStatus(), onMessageWithResult.getMessage());
            } else {
                boolean onMessage = iGlobalMessageEventListener.onMessage(str, hashMap, responseMessage.getContent(), MessageSourceConstants.MESSAGE_BUS);
                ackTuple = new AckTuple<>(Boolean.valueOf(onMessage), onMessage ? "处理成功" : "处理失败");
            }
        } catch (Error e) {
            log.error("msgId:{}处理异常,{}", str, ErrorUtil.getStackMsg(e));
            ackTuple = new AckTuple<>(false, ErrorUtil.getStackMsg(e));
        } catch (Exception e2) {
            log.error("msgId:{}处理异常,{}", str, ErrorUtil.getStackMsg(e2));
            ackTuple = new AckTuple<>(false, ErrorUtil.getStackMsg(e2));
        }
        return ackTuple;
    }

    public void afterPropertiesSet() {
        new Thread(() -> {
            while (this.canRun && this.mbClient != null) {
                try {
                    if (MessageEventInitListener.EVENT_CLASS_HANDLERS.size() == 0 || !pullMessage()) {
                        try {
                            TimeUnit.MILLISECONDS.sleep(LocalCLusterProperties.getInstance().getFetchMessageInterval());
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                } catch (Exception e2) {
                    log.info(" {}", ErrorUtil.getStackMsg(e2));
                }
            }
        }, "PullMessageFromMessageBus").start();
    }

    public void destroy() throws Exception {
        this.canRun = false;
    }

    public void setJanusUploader(JanusUploader janusUploader) {
        this.janusUploader = janusUploader;
    }
}
