package com.xforceplus.ultraman.flows.automaticflow.listener;

import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.Maps;
import com.google.common.eventbus.Subscribe;
import com.xforceplus.tech.base.core.context.ContextService;
import com.xforceplus.ultraman.action.constant.EventType;
import com.xforceplus.ultraman.flows.common.config.UltramanFlowSetting;
import com.xforceplus.ultraman.flows.common.config.setting.FlowBus;
import com.xforceplus.ultraman.flows.common.config.setting.IntegrationFlow;
import com.xforceplus.ultraman.flows.common.constant.AppEnv;
import com.xforceplus.ultraman.flows.common.constant.NodeType;
import com.xforceplus.ultraman.flows.common.core.EventBusCenter;
import com.xforceplus.ultraman.flows.common.core.FlowBusCompletedEvent;
import com.xforceplus.ultraman.flows.common.core.event.JanusMessageEvent;
import com.xforceplus.ultraman.flows.common.pojo.flow.AbstractFlow;
import com.xforceplus.ultraman.flows.common.pojo.flow.node.basic.AbstractNode;
import com.xforceplus.ultraman.flows.common.pojo.flow.node.basic.AppEventTriggerNode;
import com.xforceplus.ultraman.flows.common.pojo.flow.node.business.InnerMessage;
import com.xforceplus.ultraman.flows.common.sqs.SqsHelper;
import com.xforceplus.ultraman.flows.common.sqs.spring.SqsListenerRegistry;
import com.xforceplus.ultraman.flows.common.sqs.spring.SqsMessageListenerFactory;
import com.xforceplus.ultraman.flows.common.sqs.spring.core.DefaultSqsListenerRegistry;
import com.xforceplus.ultraman.flows.common.sqs.spring.core.SqsQueueAttributes;
import com.xforceplus.ultraman.flows.common.sqs.spring.core.SqsTemplate;
import com.xforceplus.ultraman.flows.common.utils.ContextUtil;
import com.xforceplus.ultraman.flows.common.utils.JsonUtils;
import com.xforceplus.ultraman.oqsengine.sdk.autoconfigurer.SdkConfiguration;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;

/* loaded from: input_file:com/xforceplus/ultraman/flows/automaticflow/listener/SqsMessageFlowListenerRegister.class */
/**
 * Registers one dynamic SQS listener per flow whose start node is an SQS application-event trigger.
 *
 * <p>The instance subscribes itself to the {@link EventBusCenter} once its properties are set and
 * rebuilds the dynamic listeners every time a {@link FlowBusCompletedEvent} is received. Each
 * received SQS message is converted into a {@link JanusMessageEvent} and posted back onto the
 * event bus.
 */
public class SqsMessageFlowListenerRegister implements InitializingBean {
    private static final Logger logger = LoggerFactory.getLogger(SqsMessageFlowListenerRegister.class);

    /** Target type used to deserialize a message body into a generic JSON object map. */
    private static final TypeReference<Map<String, Object>> JSON_OBJECT_TYPE = new TypeReference<Map<String, Object>>() {
    };

    @Autowired
    private SqsMessageListenerFactory sqsMessageListenerFactory;

    @Autowired
    private UltramanFlowSetting ultramanFlowSetting;

    @Autowired
    private SqsListenerRegistry sqsListenerRegistry;

    @Autowired
    private ContextService contextService;

    @Autowired
    private SdkConfiguration sdkConfiguration;

    @Autowired
    private SqsTemplate sqsTemplate;

    /**
     * Creates the register with its mandatory collaborators. The remaining collaborators
     * ({@code contextService}, {@code sdkConfiguration}, {@code sqsTemplate}) are only populated
     * through field injection.
     *
     * @param sqsMessageListenerFactory factory used to create a listener from queue attributes
     * @param ultramanFlowSetting       flow settings providing the SQS region and account id
     * @param sqsListenerRegistry       registry the dynamic listeners are registered with
     */
    public SqsMessageFlowListenerRegister(SqsMessageListenerFactory sqsMessageListenerFactory, UltramanFlowSetting ultramanFlowSetting, SqsListenerRegistry sqsListenerRegistry) {
        this.sqsListenerRegistry = sqsListenerRegistry;
        this.sqsMessageListenerFactory = sqsMessageListenerFactory;
        this.ultramanFlowSetting = ultramanFlowSetting;
    }

    /**
     * Rebuilds the dynamic SQS listeners after flows have been (re)deployed.
     *
     * <p>Does nothing when no SQS-triggered flow exists. Otherwise the current dynamic listeners
     * are stopped, one listener is registered per SQS trigger node, and the registry is started
     * (or only its dynamic listeners, when the registry is already running).
     *
     * @param flowBusCompletedEvent the deployment-completed event; only its message is logged
     * @throws ClassCastException if the injected registry is not a {@link DefaultSqsListenerRegistry}
     */
    @Subscribe
    public void init(FlowBusCompletedEvent flowBusCompletedEvent) {
        String sqsEventType = EventType.SQS.name();
        // NOTE(review): when the last SQS flow is removed this early return leaves previously
        // registered dynamic listeners untouched - confirm that this is intended.
        if (FlowBus.getAppEventTriggerFlows(sqsEventType).isEmpty()) {
            return;
        }
        logger.info("Received flow deploy event {}", flowBusCompletedEvent.getMsg());

        // The dynamic-listener lifecycle methods are only reachable through the concrete registry type.
        DefaultSqsListenerRegistry registry = (DefaultSqsListenerRegistry) this.sqsListenerRegistry;
        registry.stopDynamicListeners();
        getTriggerNodesForEvent(sqsEventType).forEach(this::registerListener);

        if (registry.isRunning()) {
            registry.startDynamicListeners();
        } else {
            registry.start();
        }
    }

    /**
     * Registers a dynamic listener for the queue derived from the given trigger node.
     *
     * <p>The handler returns {@code true} when the message was posted to the event bus or when the
     * inner processing step failed (logged as a non-JSON body), and {@code false} when any earlier
     * step failed. Tenant information placed on the context is always cleared before the handler
     * returns.
     *
     * @param node the SQS trigger node supplying the listener id, event code and SQS configuration
     */
    private void registerListener(AppEventTriggerNode node) {
        this.sqsListenerRegistry.registerDynamicListener(node.getNodeId(), this.sqsMessageListenerFactory.createListener(
            SqsQueueAttributes.builder()
                .concurrency(node.getSqsConfig().getMessageConsumeConcurrency())
                .loopConcurrency(node.getSqsConfig().getMessageRetrieveConcurrency())
                .longPolling(true)
                .handler(message -> {
                    logger.info("Received sqs message {}", message);
                    try {
                        Map<String, String> messageAttributes = message.messageAttributes().entrySet().stream()
                            .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().stringValue()));
                        Map<String, String> systemAttributes = message.attributesAsStrings().entrySet().stream()
                            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
                        ContextUtil.fillTenantInfoByMap(this.contextService, messageAttributes);
                        try {
                            Map<String, Object> body = JsonUtils.json2Object(message.body(), JSON_OBJECT_TYPE);
                            JanusMessageEvent janusMessageEvent = new JanusMessageEvent(new InnerMessage(body, messageAttributes, systemAttributes));
                            janusMessageEvent.setEventType(EventType.SQS);
                            janusMessageEvent.setRequestName(node.getEventCode());
                            EventBusCenter.getInstance().post(janusMessageEvent);
                            return true;
                        } catch (Throwable parseOrPostFailure) {
                            // This catch also covers event construction and posting, so the cause is
                            // logged as well. The message is still reported as handled (true).
                            logger.error("message body is not json format {}", message.body(), parseOrPostFailure);
                            return true;
                        }
                    } catch (Throwable failure) {
                        logger.error("Failed to handle sqs message {}", message, failure);
                        return false;
                    } finally {
                        // Single cleanup point: runs on every exit path of the handler.
                        ContextUtil.clearTenantInfo(this.contextService);
                    }
                })
                .autoAcknowledge(true)
                .maxBatchSize(node.getSqsConfig().getMaxBatchSize())
                .visibilityTimeoutSeconds(node.getSqsConfig().getVisibilityTimeoutSeconds())
                .url(SqsHelper.getQueueUrl(
                    this.ultramanFlowSetting.getQueue().getSqs().getRegion(),
                    this.ultramanFlowSetting.getQueue().getSqs().getAccountId(),
                    AppEnv.fromValue(this.sdkConfiguration.getAuth().getEnv()).desc(),
                    node.getEventCode()))
                .build()));
    }

    /**
     * Collects the start trigger node of every flow registered for the given event type.
     *
     * @param str the event type name passed to {@link FlowBus#getAppEventTriggerFlows}
     * @return the trigger nodes found; flows without a matching start node are skipped
     */
    private List<AppEventTriggerNode> getTriggerNodesForEvent(String str) {
        return FlowBus.getAppEventTriggerFlows(str).stream()
            .map(this::getTriggerNodeFromFlow)
            .filter(Objects::nonNull)
            .collect(Collectors.toList());
    }

    /**
     * Finds the first application-event trigger start node of an integration flow.
     *
     * @param abstractFlow the flow to inspect
     * @return the trigger node, or {@code null} when the flow is not an {@link IntegrationFlow}
     *         or has no such node
     */
    private AppEventTriggerNode getTriggerNodeFromFlow(AbstractFlow abstractFlow) {
        if (!(abstractFlow instanceof IntegrationFlow)) {
            return null;
        }
        return ((IntegrationFlow) abstractFlow).getNodes().stream()
            .filter(this::isAppEventTriggerStartNode)
            .map(abstractNode -> (AppEventTriggerNode) abstractNode)
            .findFirst()
            .orElse(null);
    }

    /**
     * Tells whether a node is a start node of type {@link NodeType#APP_EVENT_TRIGGER}.
     *
     * @param abstractNode the node to test
     * @return {@code true} when both conditions hold
     */
    private boolean isAppEventTriggerStartNode(AbstractNode abstractNode) {
        return abstractNode.isStartNode() && NodeType.APP_EVENT_TRIGGER.equals(abstractNode.getNodeType());
    }

    /**
     * Subscribes this instance to the shared event bus so that {@link #init} receives
     * flow deployment events.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        EventBusCenter.getInstance().register(this);
    }

    /**
     * Attempts to create the given queue through the SQS template; failures are logged and
     * otherwise ignored. Currently not called from within this class.
     *
     * @param str the value passed to {@code SqsTemplate.createQueue}
     */
    private void checkQueue(String str) {
        try {
            this.sqsTemplate.createQueue(str);
        } catch (Exception e) {
            logger.error("create queue error", e);
        }
    }
}
