package com.xforceplus.ultraman.flows.common.messagebus;

import cn.hutool.json.JSONUtil;
import com.xforceplus.janus.message.sdk.MBClient;
import com.xforceplus.janus.message.sdk.request.AckRequest;
import com.xforceplus.janus.message.sdk.response.SubResponse;
import com.xforceplus.ultraman.flows.common.config.setting.FlowBus;
import com.xforceplus.ultraman.flows.common.constant.Constant;
import com.xforceplus.ultraman.flows.common.exception.FlowQueueFullException;
import com.xforceplus.ultraman.flows.common.exception.MessageBusPubCodeNotMatchException;
import com.xforceplus.ultraman.sdk.infra.base.thread.ExecutorHelper;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationEventPublisher;

/* loaded from: input_file:com/xforceplus/ultraman/flows/common/messagebus/MessageBusListener.class */
/**
 * Polls the message bus through {@link MBClient} on a single scheduled worker thread and
 * re-publishes every received message as a Spring application event.
 *
 * <p>Polling is scheduled from the constructor. Each poll subscribes once, publishes the
 * returned messages one by one, and then acknowledges the receipt handles that were collected
 * during that poll.
 */
public class MessageBusListener {
    private static final Logger log = LoggerFactory.getLogger(MessageBusListener.class);

    /** Delay before the first poll, in seconds. */
    private static final long INITIAL_DELAY_SECONDS = 10L;

    /** Delay between the end of one poll and the start of the next, in seconds. */
    private static final long POLL_DELAY_SECONDS = 3L;

    private final MBClient mbClient;
    private final ScheduledExecutorService worker = new ScheduledThreadPoolExecutor(1, ExecutorHelper.buildNameThreadFactory("flow-message-bus"));
    private final ApplicationEventPublisher publisher;

    /**
     * The periodic polling task.
     *
     * <p>Note that the simple name shadows {@code java.util.function.Consumer} inside this file.
     */
    class Consumer implements Runnable {
        Consumer() {
        }

        /**
         * Runs one poll if {@link FlowBus#isLoadFinished()} reports true, otherwise does nothing.
         *
         * <p>Nothing is allowed to escape: a scheduled fixed-delay task that throws is never run
         * again, so every failure (including {@link Error}) is logged and swallowed here.
         */
        @Override
        public void run() {
            try {
                if (FlowBus.isLoadFinished()) {
                    pollOnce();
                }
            } catch (Throwable th) {
                log.error("Polling the message bus failed", th);
            }
        }

        /** Subscribes once, publishes each non-empty message, then acknowledges the collected handles. */
        private void pollOnce() {
            SubResponse sub = mbClient.sub();
            // Boolean.TRUE.equals treats a null success flag as "not successful" instead of throwing.
            if (!Boolean.TRUE.equals(sub.getSuccess())) {
                if (log.isDebugEnabled()) {
                    log.debug(JSONUtil.toJsonStr(sub));
                }
                return;
            }
            if (CollectionUtils.isEmpty(sub.getResponseMessages())) {
                return;
            }

            // NOTE(review): assumes receipt handles are Strings and AckRequest accepts List<String> - confirm against the SDK.
            List<String> receiptHandles = new ArrayList<>();
            sub.getResponseMessages().forEach(responseMessage -> {
                if (StringUtils.isEmpty(responseMessage.getContent())) {
                    // Messages without content are neither published nor acknowledged.
                    return;
                }
                log.debug("MessageBus消息处理");
                if (log.isDebugEnabled()) {
                    log.debug(JSONUtil.toJsonStr(responseMessage));
                }
                try {
                    publisher.publishEvent(responseMessage);
                } catch (FlowQueueFullException e) {
                    // Not acknowledged: the receipt handle is deliberately left out of the ack request.
                    log.error("FlowQueueFullException", e);
                    return;
                } catch (MessageBusPubCodeNotMatchException ignored) {
                    // Not acknowledged either; logged at debug level only so the skip stays traceable.
                    log.debug("MessageBusPubCodeNotMatchException, message left unacknowledged");
                    return;
                } catch (Throwable th) {
                    // Any other failure is logged and the message is still acknowledged below.
                    handleException(th);
                }
                receiptHandles.add(responseMessage.getReceiptHandle());
            });

            if (CollectionUtils.isNotEmpty(receiptHandles)) {
                // The ack itself must always run; only the JSON rendering of its result is debug-only.
                Object ackResult = mbClient.ack(new AckRequest(receiptHandles));
                if (log.isDebugEnabled()) {
                    log.debug(JSONUtil.toJsonStr(ackResult));
                }
            }
        }
    }

    /**
     * Creates the listener and schedules the polling task.
     *
     * @param str first argument handed to {@code MBClient.getInstance}; its meaning is not visible in this file
     * @param str2 second argument handed to {@code MBClient.getInstance}; its meaning is not visible in this file
     * @param applicationEventPublisher publisher that receives every message pulled from the bus
     */
    public MessageBusListener(String str, String str2, ApplicationEventPublisher applicationEventPublisher) {
        this.mbClient = MBClient.getInstance(str, str2);
        this.publisher = applicationEventPublisher;
        this.worker.scheduleWithFixedDelay(new Consumer(), INITIAL_DELAY_SECONDS, POLL_DELAY_SECONDS, TimeUnit.SECONDS);
    }

    /**
     * Stops the polling worker so its thread does not outlive the listener.
     *
     * <p>Delegates to {@link ScheduledExecutorService#shutdown()}; it does not wait for a poll
     * that is currently running.
     */
    public void shutdown() {
        this.worker.shutdown();
    }

    /**
     * Logs a failure raised while publishing a single message.
     *
     * @param th the failure to log
     */
    public void handleException(Throwable th) {
        log.error("Handle  message bus message failed!", th);
    }
}
