package com.xforceplus.ultraman.flows.automaticflow.executor.flow;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.xforceplus.tech.base.core.context.ContextKeys;
import com.xforceplus.tech.base.core.context.ContextService;
import com.xforceplus.ultraman.extensions.business.EntityId;
import com.xforceplus.ultraman.extensions.business.EntityInstance;
import com.xforceplus.ultraman.extensions.business.service.BusinessFacade;
import com.xforceplus.ultraman.flows.automaticflow.api.IFlowLogService;
import com.xforceplus.ultraman.flows.automaticflow.executor.FlowExecutor;
import com.xforceplus.ultraman.flows.automaticflow.executor.FlowTransactionManager;
import com.xforceplus.ultraman.flows.automaticflow.executor.NodeExecutorSelector;
import com.xforceplus.ultraman.flows.automaticflow.executor.impl.QueryDataNodeExecutor;
import com.xforceplus.ultraman.flows.automaticflow.executor.thread.NamedThreadFactory;
import com.xforceplus.ultraman.flows.automaticflow.executor.thread.ThreadPoolManager;
import com.xforceplus.ultraman.flows.common.config.UltramanFlowSetting;
import com.xforceplus.ultraman.flows.common.config.setting.FlowBus;
import com.xforceplus.ultraman.flows.common.config.setting.IntegrationFlow;
import com.xforceplus.ultraman.flows.common.constant.Constant;
import com.xforceplus.ultraman.flows.common.constant.NodeType;
import com.xforceplus.ultraman.flows.common.core.FlowContextHolder;
import com.xforceplus.ultraman.flows.common.core.FlowContextSnapshot;
import com.xforceplus.ultraman.flows.common.core.IKeyLocker;
import com.xforceplus.ultraman.flows.common.exception.FlowExecuteException;
import com.xforceplus.ultraman.flows.common.exception.FlowParseException;
import com.xforceplus.ultraman.flows.common.exception.FlowQueueFullException;
import com.xforceplus.ultraman.flows.common.history.FlowInstanceLog;
import com.xforceplus.ultraman.flows.common.history.FlowLogRepository;
import com.xforceplus.ultraman.flows.common.history.FlowReplayLog;
import com.xforceplus.ultraman.flows.common.history.FlowStatus;
import com.xforceplus.ultraman.flows.common.pojo.flow.FlowQueueMessage;
import com.xforceplus.ultraman.flows.common.pojo.flow.FlowType;
import com.xforceplus.ultraman.flows.common.pojo.flow.node.FlowDebugRequest;
import com.xforceplus.ultraman.flows.common.pojo.flow.node.FlowDebugResponse;
import com.xforceplus.ultraman.flows.common.pojo.flow.node.basic.AbstractNode;
import com.xforceplus.ultraman.flows.common.utils.ContextUtil;
import com.xforceplus.ultraman.flows.common.utils.FlowUtils;
import com.xforceplus.ultraman.flows.common.utils.JsonUtils;
import io.vavr.Function5;
import io.vavr.Function6;
import java.io.InputStream;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;

/* loaded from: input_file:com/xforceplus/ultraman/flows/automaticflow/executor/flow/FlowExecutorDefaultImpl.class */
public class FlowExecutorDefaultImpl implements FlowExecutor, InitializingBean {
    /** Capacity used when constructing the in-memory {@code flowQueue} of this executor. */
    private static final Integer FLOW_QUEUE_CAPACITY = 10000;
    /**
     * Timeout value passed to {@code FlowTransactionManager#createNewTransaction}.
     * NOTE(review): the time unit is not visible from this file — confirm against the transaction manager.
     */
    private static final int TRANSACTION_TIME_OUT = 300000;

    /** Selects the node executor that runs a given {@code AbstractNode}. */
    @Autowired
    private NodeExecutorSelector nodeExecutorSelector;

    /** Holder through which the current flow context is created, read and cleared. */
    @Autowired
    private FlowContextHolder flowContextHolder;

    /** Creates, commits and rolls back the transaction used while executing nodes. */
    @Autowired
    private FlowTransactionManager flowTransactionManager;

    /** Lock keyed by flow code; used for flows that do not allow concurrent execution. */
    @Autowired
    private IKeyLocker keyLocker;

    /** Optional log repository (injection is not required); every use is guarded by a null check. */
    @Autowired(required = false)
    private FlowLogRepository flowLogRepository;

    /** Used by {@code replay} to load the stored flow instance log entity. */
    @Autowired
    private BusinessFacade businessFacade;

    /** Context store read for the tenant id and passed to snapshot/cleanup helpers. */
    @Autowired
    private ContextService contextService;

    /** Used by {@code replay} to deserialize the snapshot stored in a flow instance log. */
    @Autowired
    private IFlowLogService flowLogService;

    /** Flow settings; the thread-pool sizes are read from {@code getFlow().getPool()}. */
    @Autowired
    private UltramanFlowSetting ultramanFlowSetting;
    /** Logger named after the runtime class of this instance. */
    private Logger logger = LoggerFactory.getLogger(getClass());
    /**
     * Runs one integration flow on the calling thread and returns the flow response.
     *
     * <p>Arguments, in order: flow code, flow payload, completion callback (unused here),
     * context values to restore, the async flag, and a holder that receives the request id
     * as soon as the flow context has been created.
     *
     * <p>Steps: look up the flow, optionally take the per-flow lock (only for flows that do not
     * allow concurrency and are not async), create the flow context, walk the nodes from the start
     * node while opening/committing transactions as the nodes request, then always run the cleanup
     * in {@code finally}.
     *
     * <p>Compared with the previous version, the cleanup (context clearing and lock release) now
     * runs on every path, and a failing flow is marked as failed: its transaction is rolled back,
     * the instance log status becomes FAILED and the exception-trigger nodes are executed.
     * The original throwable is rethrown unchanged.
     *
     * NOTE(review): {@code replay} reads rollback/error node ids from the instance log; this block
     * does not populate them because the builder calls for that are not visible in this file.
     */
    private Function6<String, Object, Consumer, Map<String, Object>, Boolean, AtomicReference<String>, Object> innerExecute = (str, obj, consumer, map, bool, atomicReference) -> {
        IntegrationFlow integrationFlow = (IntegrationFlow) Optional.ofNullable(FlowBus.getFlow(str, FlowType.INTEGRATION)).orElseThrow(() -> {
            return new FlowExecuteException(FlowUtils.buildFailedMessage(str, "can not find any flow!"));
        });
        FlowInstanceLog.FlowInstanceLogBuilder builder = FlowInstanceLog.builder();
        builder.flowCode(str);
        builder.flowName(integrationFlow.getName());
        builder.tenantId(Long.valueOf((String) Optional.ofNullable(this.contextService.get(ContextKeys.StringKeys.TENANTID_KEY)).orElse("0")));
        boolean lockHeld = false;
        boolean succeeded = true;
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        if (!integrationFlow.isAllowConcurrency() && !bool.booleanValue()) {
            try {
                lockHeld = this.keyLocker.tryLock(integrationFlow.getCode(), 300000L, TimeUnit.MILLISECONDS);
                if (!lockHeld) {
                    throw new FlowExecuteException(FlowUtils.buildFailedMessage(str, "wait execute lock time out！"));
                }
            } catch (Throwable th) {
                // NOTE(review): a lock failure (including the time-out above) is only logged and the
                // flow still runs without the lock; kept as-is because callers may rely on it.
                this.logger.error("get lock failed!", th);
            }
        }
        try {
            ContextUtil.fillContextFromEvent(this.contextService, map);
            this.flowContextHolder.create(str, integrationFlow.getName(), obj);
            this.flowContextHolder.get().setFlowCode(str);
            atomicReference.set(this.flowContextHolder.get().getRequestId());
            builder.instanceId(this.flowContextHolder.get().getRequestId()).parentInstanceId(this.flowContextHolder.get().getParentRequestId()).parentNodeId(this.flowContextHolder.get().getParentNodeId()).status(FlowStatus.SUCCESS.code()).deleteFlag("1");
            // getFirstNode throws when no start node exists, so get() is safe here.
            Optional<AbstractNode> firstNode = getFirstNode(str);
            if (firstNode.get().isStartTransaction()) {
                this.flowTransactionManager.createNewTransaction(TRANSACTION_TIME_OUT, "");
                if (integrationFlow.isAllowFailRetry() && integrationFlow.isAllowExecuteLog()) {
                    builder.transactionSnapshot(this.flowContextHolder.get().toSnapShot(this.contextService));
                }
            }
            if (integrationFlow.isAllowSuccessRetry() && integrationFlow.isAllowExecuteLog()) {
                builder.beginSnapshot(this.flowContextHolder.get().toSnapShot(this.contextService));
            }
            Optional<AbstractNode> current = execute(str, firstNode.get());
            while (current.isPresent()) {
                // Open a transaction lazily when a node asks for one and none is active.
                if (current.get().isStartTransaction() && !this.flowTransactionManager.getCurrent().isPresent()) {
                    this.flowTransactionManager.createNewTransaction(TRANSACTION_TIME_OUT, "");
                    if (integrationFlow.isAllowFailRetry() && integrationFlow.isAllowExecuteLog()) {
                        builder.transactionSnapshot(this.flowContextHolder.get().toSnapShot(this.contextService));
                    }
                }
                Optional<AbstractNode> following = execute(str, current.get());
                if (current.get().isCommitTransaction()) {
                    commitTransaction();
                }
                current = following;
            }
            commitTransaction();
            return this.flowContextHolder.get().getFlowResponse();
        } catch (Throwable failure) {
            // Record the failure so the exception flow runs and the log is not stored as SUCCESS.
            succeeded = false;
            builder.status(FlowStatus.FAILED.code());
            try {
                if (this.flowTransactionManager.getCurrent().isPresent()) {
                    rollBackTransaction();
                }
            } catch (Throwable rollbackFailure) {
                // Never let a rollback problem hide the real cause of the failure.
                this.logger.error("Rollback transaction failed!", rollbackFailure);
                failure.addSuppressed(rollbackFailure);
            }
            throw failure;
        } finally {
            // Fallback id in case the context lookup below fails.
            String requestId = atomicReference.get();
            try {
                if (succeeded) {
                    // The transaction snapshot is only needed to retry a failed run.
                    builder.transactionSnapshot((FlowContextSnapshot) null);
                }
                requestId = this.flowContextHolder.get().getRequestId();
                try {
                    if (!succeeded && integrationFlow.getExceptionNodes() != null && !integrationFlow.getExceptionNodes().isEmpty()) {
                        try {
                            Optional<AbstractNode> exceptionNode = execute(str, (AbstractNode) integrationFlow.getExceptionNodes().stream().filter(candidate -> {
                                return candidate.getNodeType().equals(NodeType.EXCEPTION_TRIGGER);
                            }).findFirst().get());
                            while (exceptionNode.isPresent()) {
                                exceptionNode = execute(str, exceptionNode.get());
                            }
                        } catch (Throwable exceptionFlowFailure) {
                            this.logger.error("执行异常流发生异常", exceptionFlowFailure);
                        }
                    }
                    if (this.flowLogRepository != null && integrationFlow.isAllowExecuteLog()) {
                        builder.nodeHistory(JsonUtils.object2Json(this.flowContextHolder.get().getNodeHistory()));
                        builder.finishSnapshot(this.flowContextHolder.get().toSnapShot(this.contextService));
                        builder.extendLog(this.flowContextHolder.get().getExtendLog());
                        try {
                            this.flowLogRepository.saveInstance(builder.build());
                        } catch (RuntimeException e) {
                            this.logger.error("Save flow instance log failed!", e);
                        }
                    }
                } catch (Throwable logFailure) {
                    this.logger.error(String.format("Save flow with requestId : %s instance failed!", requestId), logFailure);
                }
                if (this.flowContextHolder.get().isOwner().booleanValue()) {
                    ContextUtil.clear(this.contextService);
                }
                this.flowContextHolder.clear();
            } finally {
                // Release with the same key that was used for tryLock, whatever happened above.
                if (lockHeld) {
                    this.keyLocker.unLock(integrationFlow.getCode());
                }
                stopWatch.stop();
                this.logger.info("流执行完成。流代码: [{}]，请求Id: [{}], 耗时: {} ms", new Object[]{str, requestId, Long.valueOf(stopWatch.getTime(TimeUnit.MILLISECONDS))});
            }
        }
    };
    /**
     * Submits a flow run to the shared thread pool.
     *
     * <p>Arguments, in order: flow code, flow payload, optional completion callback, context values
     * and the async flag. When the async flag is false the caller blocks for the result and gets it
     * back; when it is true {@code null} is returned immediately. A non-null callback is attached to
     * the task in both modes and receives the flow result.
     *
     * <p>Throws {@code FlowExecuteException} when waiting is interrupted, times out or the task
     * fails; the thread's interrupt flag is restored on interruption.
     */
    private Function5<String, Object, Consumer, Map<String, Object>, Boolean, Object> flowTaskFun = (str, obj, consumer, map, bool) -> {
        AtomicReference<String> requestIdRef = new AtomicReference<>();
        // Pool sizes are optional; null lets ThreadPoolManager decide.
        boolean poolConfigured = this.ultramanFlowSetting.getFlow().getPool() != null;
        Integer corePoolSize = poolConfigured ? Integer.valueOf(this.ultramanFlowSetting.getFlow().getPool().getCorePoolSize()) : null;
        Integer maximumPoolSize = poolConfigured ? Integer.valueOf(this.ultramanFlowSetting.getFlow().getPool().getMaximumPoolSize()) : null;
        CompletableFuture<?> task = ThreadPoolManager.getInstance(corePoolSize, maximumPoolSize).addExecuteTask(() -> {
            return this.innerExecute.apply(str, obj, consumer, map, bool, requestIdRef);
        });
        Object result = null;
        if (!bool.booleanValue()) {
            try {
                // NOTE(review): 300000 SECONDS is roughly 3.5 days; the lock wait elsewhere in this
                // class uses 300000 MILLISECONDS. Left unchanged — confirm the intended unit.
                result = task.get(300000L, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                // Preserve the interruption for code further up the stack.
                Thread.currentThread().interrupt();
                throw new FlowExecuteException(String.format("Flow with requestId : %s was interrupted! %s", requestIdRef.get(), e.getMessage()), e);
            } catch (ExecutionException | TimeoutException e) {
                throw new FlowExecuteException(String.format("Flow with requestId : %s was interrupted! %s", requestIdRef.get(), e.getMessage()), e);
            }
        }
        if (consumer != null) {
            task.thenAccept(value -> {
                consumer.accept(value);
            });
        }
        return result;
    };
    /** Bounded queue that {@code checkQueueRun} fills with requests for flows that must not run concurrently. */
    private BlockingQueue<FlowQueueMessage> flowQueue = new ArrayBlockingQueue<>(FLOW_QUEUE_CAPACITY.intValue());
    /**
     * Single-threaded executor with a ten-slot work queue that rejects (aborts) when full.
     * After each task it forwards the task and its throwable to {@code ThreadPoolManager.printException}.
     * NOTE(review): the code that submits work to this pool is not visible in this part of the file.
     */
    private ThreadPoolExecutor flowQueuePool = new ThreadPoolExecutor(1, 1, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new NamedThreadFactory("flowQueueExecutor-"), new ThreadPoolExecutor.AbortPolicy()) { // from class: com.xforceplus.ultraman.flows.automaticflow.executor.flow.FlowExecutorDefaultImpl.1
        @Override // java.util.concurrent.ThreadPoolExecutor
        public void afterExecute(Runnable runnable, Throwable th) {
            super.afterExecute(runnable, th);
            ThreadPoolManager.printException(runnable, th);
        }
    };

    /**
     * Executes a flow without a completion callback.
     *
     * @param str flow code
     * @param obj flow payload
     * @param map context values handed to the flow
     * @param z   async flag, forwarded unchanged
     * @return whatever the five-argument overload returns
     */
    @Override // com.xforceplus.ultraman.flows.automaticflow.executor.FlowExecutor
    public Object execute(String str, Object obj, Map<String, Object> map, boolean z) {
        final Consumer noCallback = null;
        return execute(str, obj, noCallback, map, z);
    }

    /**
     * Executes a flow, either by placing it on the internal queue or by submitting it as a task.
     *
     * @param str      flow code
     * @param obj      flow payload
     * @param consumer optional callback that receives the flow result
     * @param map      context values handed to the flow
     * @param z        async flag
     * @return {@code null} when the request was queued, otherwise the value produced by the task function
     */
    @Override // com.xforceplus.ultraman.flows.automaticflow.executor.FlowExecutor
    public Object execute(String str, Object obj, Consumer consumer, Map<String, Object> map, boolean z) {
        boolean queued = checkQueueRun(str, obj, consumer, map, z);
        if (!queued) {
            return this.flowTaskFun.apply(str, obj, consumer, map, Boolean.valueOf(z));
        }
        return null;
    }

    /**
     * Runs a flow synchronously on the calling thread, bypassing the queue and the thread pool.
     *
     * @param str flow code
     * @param obj flow payload
     * @param map context values handed to the flow
     * @return the flow response
     * @throws FlowExecuteException wrapping any throwable raised while the flow runs
     */
    @Override // com.xforceplus.ultraman.flows.automaticflow.executor.FlowExecutor
    public Object currentThreadExecute(String str, Object obj, Map<String, Object> map) {
        AtomicReference<String> requestIdRef = new AtomicReference<>();
        try {
            // No callback; the async flag is false so the per-flow lock rules apply.
            return this.innerExecute.apply(str, obj, null, map, Boolean.FALSE, requestIdRef);
        } catch (Throwable th) {
            throw new FlowExecuteException(String.format("Flow with requestId : %s was interrupted! %s", requestIdRef.get(), th.getMessage()), th);
        }
    }

    /**
     * Decides whether a request has to go through the internal queue and, if so, enqueues it.
     *
     * <p>Only async requests for flows that do not allow concurrency are queued.
     *
     * @return {@code true} when the request was queued, {@code false} when the caller should run it directly
     * @throws FlowExecuteException   when the flow is unknown or the thread is interrupted while waiting
     * @throws FlowQueueFullException when the queue stays full for the whole wait time
     */
    private boolean checkQueueRun(String str, Object obj, Consumer consumer, Map<String, Object> map, boolean z) {
        IntegrationFlow integrationFlow = (IntegrationFlow) Optional.ofNullable(FlowBus.getFlow(str, FlowType.INTEGRATION)).orElseThrow(() -> {
            return new FlowExecuteException(FlowUtils.buildFailedMessage(str, "can not find any flow!"));
        });
        if (integrationFlow.isAllowConcurrency() || !z) {
            return false;
        }
        boolean accepted;
        try {
            accepted = this.flowQueue.offer(FlowQueueMessage.builder().flowData(obj).consumer(consumer).context(map).flow(integrationFlow).async(true).build(), Constant.OFFER_WAIT_TIME.intValue(), TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new FlowExecuteException(FlowUtils.buildFailedMessage(str, "Interrupted exception"), e);
        }
        if (!accepted) {
            throw new FlowQueueFullException(FlowUtils.buildFailedMessage(str, "Flow queue is full!"));
        }
        return true;
    }

    /**
     * Commits the current transaction, but only when this flow context is the owner
     * and a transaction is actually active; otherwise a warning is logged.
     */
    private void commitTransaction() {
        boolean owner = this.flowContextHolder.get().isOwner().booleanValue();
        if (!owner) {
            this.logger.warn("Is not owner,so not commit transaction");
            return;
        }
        if (this.flowTransactionManager.getCurrent().isPresent()) {
            this.flowTransactionManager.commit();
        }
    }

    /**
     * Rolls back the current transaction, but only when this flow context is the owner
     * and a transaction is actually active; otherwise a warning is logged.
     */
    private void rollBackTransaction() {
        boolean owner = this.flowContextHolder.get().isOwner().booleanValue();
        if (!owner) {
            this.logger.warn("Is not owner,so not rollback transaction");
            return;
        }
        if (this.flowTransactionManager.getCurrent().isPresent()) {
            this.flowTransactionManager.rollBack();
        }
    }

    /**
     * Starts a flow asynchronously with an empty payload and the complete current context.
     *
     * @param str flow code
     */
    @Override // com.xforceplus.ultraman.flows.automaticflow.executor.FlowExecutor
    public void execute(String str) {
        Object emptyPayload = Maps.newHashMap();
        execute(str, emptyPayload, this.contextService.getAll(), true);
    }

    /**
     * Starts a flow asynchronously with an empty payload and the complete current context,
     * handing the result to the given callback.
     *
     * @param str      flow code
     * @param consumer callback that receives the flow result
     */
    @Override // com.xforceplus.ultraman.flows.automaticflow.executor.FlowExecutor
    public void execute(String str, Consumer consumer) {
        Object emptyPayload = Maps.newHashMap();
        execute(str, emptyPayload, consumer, this.contextService.getAll(), true);
    }

    /**
     * Executes a single node and resolves the node that should run next.
     *
     * <p>The node is recorded as the current node and appended to the node history before it runs.
     * A node result of {@code Boolean.FALSE} rolls back the transaction and ends the flow.
     * For gateway nodes the next node id is read from the {@code "result"} entry of the node's
     * output; for all other nodes it is the first entry of {@code getNextIds()}.
     *
     * @param str          flow code
     * @param abstractNode node to execute
     * @return the next node, or empty when the flow ends here (including a null or empty next-id list)
     * @throws FlowExecuteException when a gateway node has no output or no {@code "result"} entry
     */
    public Optional<AbstractNode> execute(String str, AbstractNode abstractNode) {
        this.flowContextHolder.get().setCurrentNodeId(abstractNode.getNodeId());
        this.flowContextHolder.get().setCurrentNodeName(abstractNode.getName());
        this.flowContextHolder.get().addNodeHistory(abstractNode.getNodeId());
        Object nodeResult = this.nodeExecutorSelector.select(abstractNode).execute(abstractNode);
        if (Boolean.FALSE.equals(nodeResult)) {
            // NOTE(review): rolls back directly, without the ownership check of rollBackTransaction().
            this.flowTransactionManager.rollBack();
            return Optional.empty();
        }
        String nextNodeId;
        if (abstractNode.getNodeType().equals(NodeType.GATEWAY)) {
            Object gatewayOutput = Optional.ofNullable(this.flowContextHolder.get().getOutPut(abstractNode.getNodeId())).orElseThrow(() -> {
                return new FlowExecuteException(String.format("无法获取到前序节点的输出! SourceNodeId: %s", abstractNode.getNodeId()));
            });
            Object chosenBranch = Optional.ofNullable(((Map) gatewayOutput).get("result")).orElseThrow(() -> {
                return new FlowExecuteException("无法根据sourceKey->[result]获取到节点输出!");
            });
            nextNodeId = chosenBranch.toString();
        } else {
            // A terminal node may carry either no list or an empty list; both mean "nothing follows".
            if (abstractNode.getNextIds() == null || abstractNode.getNextIds().isEmpty()) {
                return Optional.empty();
            }
            nextNodeId = (String) abstractNode.getNextIds().get(0);
        }
        return getNodeById(str, nextNodeId);
    }

    /**
     * Finds the start node of an integration flow.
     *
     * @param str flow code
     * @return the start node; the returned Optional is never empty
     * @throws FlowParseException when the flow defines no start node
     */
    @Override // com.xforceplus.ultraman.flows.automaticflow.executor.FlowExecutor
    public Optional<AbstractNode> getFirstNode(String str) {
        IntegrationFlow flow = FlowBus.getFlow(str, FlowType.INTEGRATION);
        Optional<AbstractNode> startNode = flow.getNodes().stream().filter(node -> node.isStartNode()).findAny();
        if (!startNode.isPresent()) {
            throw new FlowParseException(String.format("错误的流配置信息，未发现开始节点！ 流程编码 : %s", str));
        }
        return startNode;
    }

    /**
     * Finds any node of the flow that starts a transaction.
     *
     * @param str flow code
     * @return a transaction-starting node; the returned Optional is never empty
     * @throws FlowParseException when the flow has no such node
     */
    private Optional<AbstractNode> getRollbackNode(String str) {
        IntegrationFlow flow = FlowBus.getFlow(str, FlowType.INTEGRATION);
        Optional<AbstractNode> transactionNode = flow.getNodes().stream().filter(node -> node.isStartTransaction()).findAny();
        if (!transactionNode.isPresent()) {
            throw new FlowParseException(String.format("错误的流配置信息，未发现配置的事务节点！ 流程编码 : %s", str));
        }
        return transactionNode;
    }

    /**
     * Re-runs a previously logged flow instance on the shared thread pool.
     *
     * <p>With {@code z == true} the run is a retry: it needs the stored snapshot and resumes at the
     * logged rollback node (when the log is flagged as rolled back) or at the logged error node.
     * With {@code z == false} it is a replay from the start node and needs the stored start snapshot.
     * The outcome is stored through the log repository when one is configured.
     *
     * <p>Cleanup (context clearing, lock release, replay log, timing) runs in {@code finally};
     * the lock is released even if clearing the context fails.
     *
     * @param l id of the flow instance log entity
     * @param z {@code true} to retry from the failure point, {@code false} to replay from the start
     * @throws FlowExecuteException when the log entry, the required snapshot or the flow is missing
     */
    @Override // com.xforceplus.ultraman.flows.automaticflow.executor.FlowExecutor
    public void replay(Long l, boolean z) {
        Optional findOne = this.businessFacade.findOne(new EntityId(this.businessFacade.load("flowInstanceLog"), l.longValue()));
        if (!findOne.isPresent()) {
            throw new FlowExecuteException(String.format("Can not find any flow instance with id %s", l));
        }
        FlowReplayLog.FlowReplayLogBuilder builder = FlowReplayLog.builder();
        FlowInstanceLog flowInstanceLog = (FlowInstanceLog) ((EntityInstance) findOne.get()).into(FlowInstanceLog.class).get();
        if (z) {
            if (StringUtils.isBlank(flowInstanceLog.getSnapshot()) || !Optional.ofNullable(flowInstanceLog.getSnapshotFormat()).isPresent()) {
                throw new FlowExecuteException("Can not retry flow without snapshot!");
            }
        } else if (StringUtils.isBlank(flowInstanceLog.getStartSnapshot()) || !Optional.ofNullable(flowInstanceLog.getStartSnapshotFormat()).isPresent()) {
            throw new FlowExecuteException("Can not replay flow without start snapshot");
        }
        // Fail fast with a clear message instead of a NullPointerException inside the async task.
        IntegrationFlow flow = (IntegrationFlow) Optional.ofNullable(FlowBus.getFlow(flowInstanceLog.getFlowCode(), FlowType.INTEGRATION)).orElseThrow(() -> {
            return new FlowExecuteException(FlowUtils.buildFailedMessage(flowInstanceLog.getFlowCode(), "can not find any flow!"));
        });
        // Pool sizes are optional, as in the regular execution path; null lets ThreadPoolManager decide.
        boolean poolConfigured = this.ultramanFlowSetting.getFlow().getPool() != null;
        Integer corePoolSize = poolConfigured ? Integer.valueOf(this.ultramanFlowSetting.getFlow().getPool().getCorePoolSize()) : null;
        Integer maximumPoolSize = poolConfigured ? Integer.valueOf(this.ultramanFlowSetting.getFlow().getPool().getMaximumPoolSize()) : null;
        ThreadPoolManager.getInstance(corePoolSize, maximumPoolSize).addExecuteTask(() -> {
            boolean lockHeld = false;
            if (!flow.isAllowConcurrency()) {
                try {
                    lockHeld = this.keyLocker.tryLock(flow.getCode(), 300000L, TimeUnit.MILLISECONDS);
                    if (!lockHeld) {
                        throw new FlowExecuteException(String.format("wait execute lock time out！ flow : %s", flow.getCode()));
                    }
                } catch (Throwable th) {
                    // NOTE(review): a lock failure is only logged and the replay still runs; kept as-is.
                    this.logger.error("Get lock failed!", th);
                }
            }
            StopWatch stopWatch = new StopWatch();
            stopWatch.start();
            try {
                builder.flowCode(flow.getCode()).flowInstanceId(flowInstanceLog.getInstanceId()).deleteFlag("1").status(FlowStatus.SUCCESS.code());
                Optional<AbstractNode> startNode;
                if (!z) {
                    startNode = getFirstNode(flowInstanceLog.getFlowCode());
                } else if (Boolean.TRUE.equals(flowInstanceLog.getRollback())) {
                    startNode = getNodeById(flowInstanceLog.getFlowCode(), flowInstanceLog.getRollbackNodeId());
                } else {
                    startNode = getNodeById(flowInstanceLog.getFlowCode(), flowInstanceLog.getErrorNodeId());
                }
                if (!startNode.isPresent()) {
                    throw new FlowExecuteException("Can not find any start node!");
                }
                this.flowContextHolder.createFromSnapshot(this.flowLogService.deserializeSnapshot(flowInstanceLog, z));
                if (startNode.get().isStartTransaction()) {
                    this.flowTransactionManager.createNewTransaction(TRANSACTION_TIME_OUT, "");
                    builder.rollbackNodeId(startNode.get().getNodeId());
                }
                Optional<AbstractNode> current = execute(flow.getCode(), startNode.get());
                while (current.isPresent()) {
                    // Open a transaction lazily when a node asks for one and none is active.
                    if (current.get().isStartTransaction() && !this.flowTransactionManager.getCurrent().isPresent()) {
                        builder.rollbackNodeId(current.get().getNodeId());
                        this.flowTransactionManager.createNewTransaction(TRANSACTION_TIME_OUT, "");
                    }
                    Optional<AbstractNode> following = execute(flow.getCode(), current.get());
                    if (current.get().isCommitTransaction()) {
                        commitTransaction();
                    }
                    current = following;
                }
                commitTransaction();
                return this.flowContextHolder.get().getFlowResponse();
            } catch (Throwable failure) {
                this.logger.error("Flow replay failed!", failure);
                if (this.flowTransactionManager.getCurrent().isPresent()) {
                    this.logger.info("Rollback current transaction with id {}", this.flowTransactionManager.getCurrent().get());
                    rollBackTransaction();
                    builder.rollback(true);
                }
                builder.errorMsg("Flow replay failed!").exceptionTrace(ExceptionUtils.getStackTrace(failure)).errorNodeId(this.flowContextHolder.get().getCurrentNodeId()).status(FlowStatus.FAILED.code());
                throw new FlowExecuteException("Flow replay failed!", failure);
            } finally {
                try {
                    if (this.flowContextHolder.get().isOwner().booleanValue()) {
                        ContextUtil.clear(this.contextService);
                    }
                    this.flowContextHolder.clear();
                } finally {
                    if (lockHeld) {
                        this.keyLocker.unLock(flow.getCode());
                    }
                    try {
                        if (this.flowLogRepository != null) {
                            this.flowLogRepository.saveInstanceReplay(flowInstanceLog.getInstanceId(), builder.build());
                        }
                    } catch (Throwable logFailure) {
                        this.logger.error("Save instance replay log failed!", logFailure);
                    }
                    stopWatch.stop();
                    this.logger.info("Flow replay finished! time-consuming : {} ms", Long.valueOf(stopWatch.getTime(TimeUnit.MILLISECONDS)));
                }
            }
        }).thenAccept(obj -> {
            this.logger.info("Flow finish!");
        });
    }

    /**
     * Collects the ids of all nodes reachable from the given node by following next-ids.
     * The ids of a node's direct successors are appended first, then each successor is expanded in turn.
     *
     * @param str  flow code
     * @param str2 id of the node to start from
     * @param list output list that receives the reachable node ids
     */
    public void getRightNodes(String str, String str2, List<String> list) {
        List<String> successors = getNodeById(str, str2).get().getNextIds();
        if (successors != null && !successors.isEmpty()) {
            list.addAll(successors);
            for (String successorId : successors) {
                getRightNodes(str, successorId, list);
            }
        }
    }

    /*
     * Debug entry point of the flow executor.
     * The decompiler could not reconstruct this method body, so this source only contains
     * a placeholder that always throws UnsupportedOperationException. The real behavior
     * has to be taken from the original class file.
     */
    /* JADX WARN: Removed duplicated region for block: B:93:0x03ae  */
    /* JADX WARN: Removed duplicated region for block: B:96:0x03ec  */
    /* JADX WARN: Removed duplicated region for block: B:99:0x0402  */
    @Override // com.xforceplus.ultraman.flows.automaticflow.executor.FlowExecutor
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public com.xforceplus.ultraman.flows.common.pojo.flow.node.FlowDebugResponse debug(com.xforceplus.ultraman.flows.common.pojo.flow.node.FlowDebugRequest r8, java.util.Map<java.lang.String, java.lang.Object> r9) {
        /*
            Method dump skipped, instructions count: 1068
            To view this dump add '--comments-level debug' option
        */
        // Placeholder emitted by the decompiler; not the original implementation.
        throw new UnsupportedOperationException("Method not decompiled: com.xforceplus.ultraman.flows.automaticflow.executor.flow.FlowExecutorDefaultImpl.debug(com.xforceplus.ultraman.flows.common.pojo.flow.node.FlowDebugRequest, java.util.Map):com.xforceplus.ultraman.flows.common.pojo.flow.node.FlowDebugResponse");
    }

    /**
     * Returns the ids of the nodes located after the request's current node that also carry a break point.
     * The result keeps the traversal order produced by {@code getRightNodes}.
     *
     * @param flowDebugRequest debug request providing the current node id and the break points
     * @param str              flow code
     * @return downstream node ids that are break points
     */
    public List<String> getUnReachBreakPoints(FlowDebugRequest flowDebugRequest, String str) {
        List<String> downstreamIds = Lists.newArrayList();
        getRightNodes(str, flowDebugRequest.getCurrentNodeId(), downstreamIds);
        List<?> breakPointIds = flowDebugRequest.getBreakPoints().stream().map(breakPoint -> breakPoint.getNodeId()).collect(Collectors.toList());
        List<String> matches = new ArrayList<>();
        for (String nodeId : downstreamIds) {
            if (breakPointIds.contains(nodeId)) {
                matches.add(nodeId);
            }
        }
        return matches;
    }

    /**
     * Looks up a node of an integration flow by id, first among the regular nodes and then
     * among the exception nodes.
     *
     * @param str  flow code
     * @param str2 node id to look for
     * @return the node, or empty when neither collection contains it (also when the flow has no exception nodes)
     */
    private Optional<AbstractNode> getNodeById(String str, String str2) {
        IntegrationFlow flow = FlowBus.getFlow(str, FlowType.INTEGRATION);
        Optional<AbstractNode> regularNode = flow.getNode(str2);
        // Flows without an exception branch may expose null here; treat that as "not found".
        if (regularNode.isPresent() || flow.getExceptionNodes() == null) {
            return regularNode;
        }
        return flow.getExceptionNodes().stream().filter(candidate -> {
            return str2 != null && str2.equals(candidate.getNodeId());
        }).findAny();
    }

    /**
     * Replaces {@link InputStream} values with an empty map, descending into nested maps.
     *
     * <p>A map argument is modified in place (each entry value is rewritten through
     * {@code setValue}) and the same map instance is returned. Any other non-stream value is
     * returned untouched.
     *
     * @param obj value to sanitize
     * @return a new empty map when {@code obj} is an {@link InputStream}, otherwise {@code obj}
     */
    @SuppressWarnings("unchecked")
    private Object excludeStream(Object obj) {
        if (obj instanceof InputStream) {
            return Maps.newHashMap();
        }
        if (!(obj instanceof Map)) {
            return obj;
        }
        // Unchecked view is fine here: values are only read and written back as Object.
        Map<Object, Object> nested = (Map<Object, Object>) obj;
        for (Map.Entry<Object, Object> entry : nested.entrySet()) {
            entry.setValue(excludeStream(entry.getValue()));
        }
        return obj;
    }

    /**
     * Builds the debug response from a snapshot of the current flow context.
     *
     * <p>Snapshot context entries whose key starts with {@code node_input_}, {@code node_output_}
     * or {@code node_variable_} are grouped per node id (the key text after the prefix) into
     * {@code FlowDebugResponse.NodeResult} objects. Values pass through {@code excludeStream}
     * before being stored. Other keys are ignored.
     *
     * @param str node id reported as the response's current node
     * @return the response carrying flow code, current node id, per-node results, user context
     *         and node history
     */
    private FlowDebugResponse buildNodeResult(String str) {
        FlowContextSnapshot snapShot = this.flowContextHolder.get().toSnapShot(this.contextService);
        HashMap<String, FlowDebugResponse.NodeResult> nodeResults = Maps.newHashMap();
        snapShot.getContext().forEach((key, value) -> {
            // startsWith (not contains): the node id is cut off by prefix length, so the prefix
            // must really be at the start of the key for the id to be correct.
            if (key.startsWith("node_input_")) {
                nodeResultFor(nodeResults, key.substring("node_input_".length())).setInput(excludeStream(value));
            } else if (key.startsWith("node_output_")) {
                nodeResultFor(nodeResults, key.substring("node_output_".length())).setOutput(excludeStream(value));
            } else if (key.startsWith("node_variable_")) {
                // NOTE(review): variables are written to the output slot, exactly as before;
                // confirm this is intended rather than a dedicated variable field.
                nodeResultFor(nodeResults, key.substring("node_variable_".length())).setOutput(excludeStream(value));
            }
        });
        return FlowDebugResponse.builder()
                .flowCode(this.flowContextHolder.get().getFlowCode())
                .currentNodeId(str)
                .nodeResults(nodeResults)
                .userContext(snapShot.getUserContext())
                .nodeHistory(this.flowContextHolder.get().getNodeHistory())
                .build();
    }

    /** Returns the result holder registered for {@code nodeId}, creating and registering it on first use. */
    private static FlowDebugResponse.NodeResult nodeResultFor(Map<String, FlowDebugResponse.NodeResult> nodeResults, String nodeId) {
        return nodeResults.computeIfAbsent(nodeId, id -> FlowDebugResponse.NodeResult.builder().nodeId(id).build());
    }

    /**
     * Starts the background consumer of the flow task queue on {@code flowQueuePool}.
     *
     * <p>The consumer loops: it polls the queue (waiting up to {@code Constant.POLL_WAIT_TIME}
     * seconds), runs a polled message through {@code flowTaskFun}, and logs a warning while the
     * queue still holds entries. A failure while handling one message is logged and the loop
     * continues. The loop ends only when the consumer thread is interrupted.
     */
    public void afterPropertiesSet() {
        CompletableFuture.runAsync(() -> {
            while (true) {
                FlowQueueMessage flowQueueMessage = null;
                try {
                    flowQueueMessage = this.flowQueue.poll(Constant.POLL_WAIT_TIME.intValue(), TimeUnit.SECONDS);
                    if (flowQueueMessage != null) {
                        this.flowTaskFun.apply(flowQueueMessage.getFlow().getCode(), flowQueueMessage.getFlowData(), flowQueueMessage.getConsumer(), flowQueueMessage.getContext(), false);
                    }
                    int pending = this.flowQueue.size();
                    if (pending > 0) {
                        this.logger.warn("flow task queue size : {}", Integer.valueOf(pending));
                    }
                } catch (Throwable th) {
                    if (th instanceof InterruptedException) {
                        // Restore the flag and stop deliberately; previously this path died with
                        // an unlogged NullPointerException raised inside the handler.
                        Thread.currentThread().interrupt();
                        this.logger.warn("flow task queue consumer was interrupted, stop polling", th);
                        return;
                    }
                    // No message is available when the poll itself failed, so the flow code
                    // cannot be read unconditionally.
                    String flowCode = "unknown";
                    if (flowQueueMessage != null && flowQueueMessage.getFlow() != null) {
                        flowCode = flowQueueMessage.getFlow().getCode();
                    }
                    this.logger.error(FlowUtils.buildFailedMessage(flowCode, th.getClass().getName() + ":" + th.getMessage()), th);
                }
            }
        }, this.flowQueuePool);
    }

    /**
     * Synthetic hook that rebuilds a lambda of this class from its serialized form.
     *
     * <p>Dispatches on {@code serializedLambda.getImplMethodName()}:
     * {@code lambda$new$a6de2c18$1} is rebuilt as an {@code io/vavr/Function5} (submit a flow
     * execution task and optionally wait for its result) and {@code lambda$new$aadf6bf8$1} as an
     * {@code io/vavr/Function6} (run one integration flow node by node). An unknown name or a
     * signature mismatch ends in {@link IllegalArgumentException}.
     *
     * <p>NOTE(review): this is decompiler output and not valid Java as written — the lambda bodies
     * use {@code this} inside a static method, {@code obj} is declared twice, and a
     * {@code boolean} is switched on. The bodies look like inlined copies of the lambdas named
     * above; treat them as a reading aid and confirm details against the original source.
     *
     * @param serializedLambda serialized description of the lambda to rebuild
     * @return the rebuilt lambda
     * @throws IllegalArgumentException when the description matches none of the known lambdas
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -765551204:
                if (implMethodName.equals("lambda$new$a6de2c18$1")) {
                    z = false;
                    break;
                }
                break;
            case -174006098:
                if (implMethodName.equals("lambda$new$aadf6bf8$1")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("io/vavr/Function5") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/xforceplus/ultraman/flows/automaticflow/executor/flow/FlowExecutorDefaultImpl") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;Ljava/lang/Object;Ljava/util/function/Consumer;Ljava/util/Map;Ljava/lang/Boolean;)Ljava/lang/Object;")) {
                    FlowExecutorDefaultImpl flowExecutorDefaultImpl = (FlowExecutorDefaultImpl) serializedLambda.getCapturedArg(0);
                    // Task lambda: hands the flow to the thread pool; waits for the result only when bool is false.
                    return (str, obj, consumer, map, bool) -> {
                        AtomicReference atomicReference = new AtomicReference();
                        Object obj = null;
                        CompletableFuture addExecuteTask = ThreadPoolManager.getInstance(Optional.ofNullable(this.ultramanFlowSetting.getFlow().getPool()).isPresent() ? Integer.valueOf(this.ultramanFlowSetting.getFlow().getPool().getCorePoolSize()) : null, Optional.ofNullable(this.ultramanFlowSetting.getFlow().getPool()).isPresent() ? Integer.valueOf(this.ultramanFlowSetting.getFlow().getPool().getMaximumPoolSize()) : null).addExecuteTask(() -> {
                            return this.innerExecute.apply(str, obj, consumer, map, bool, atomicReference);
                        });
                        if (!bool.booleanValue()) {
                            try {
                                // NOTE(review): the wait is 300000 SECONDS here, while the lock wait further down
                                // uses 300000 MILLISECONDS — confirm the unit is intended.
                                obj = addExecuteTask.get(300000L, TimeUnit.SECONDS);
                            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                                throw new FlowExecuteException(String.format("Flow with requestId : %s was interrupted! %s", atomicReference.get(), e.getMessage()), e);
                            }
                        }
                        // The consumer, when given, receives the task result once the task completes.
                        if (Optional.ofNullable(consumer).isPresent()) {
                            addExecuteTask.thenAccept(obj2 -> {
                                consumer.accept(obj2);
                            });
                        }
                        return obj;
                    };
                }
                break;
            case QueryDataNodeExecutor.PAGE_START /* 1 */:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("io/vavr/Function6") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/xforceplus/ultraman/flows/automaticflow/executor/flow/FlowExecutorDefaultImpl") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;Ljava/lang/Object;Ljava/util/function/Consumer;Ljava/util/Map;Ljava/lang/Boolean;Ljava/util/concurrent/atomic/AtomicReference;)Ljava/lang/Object;")) {
                    FlowExecutorDefaultImpl flowExecutorDefaultImpl2 = (FlowExecutorDefaultImpl) serializedLambda.getCapturedArg(0);
                    // Execution lambda: resolves the flow, runs its nodes in sequence, stores the instance log and cleans up.
                    return (str2, obj2, consumer2, map2, bool2, atomicReference) -> {
                        FlowExecuteException flowExecuteException;
                        IntegrationFlow integrationFlow = (IntegrationFlow) Optional.ofNullable(FlowBus.getFlow(str2, FlowType.INTEGRATION)).orElseThrow(() -> {
                            return new FlowExecuteException(FlowUtils.buildFailedMessage(str2, "can not find any flow!"));
                        });
                        FlowInstanceLog.FlowInstanceLogBuilder builder = FlowInstanceLog.builder();
                        builder.flowCode(str2);
                        builder.flowName(integrationFlow.getName());
                        builder.tenantId(Long.valueOf((String) Optional.ofNullable(this.contextService.get(ContextKeys.StringKeys.TENANTID_KEY)).orElse("0")));
                        // atomicBoolean records whether the per-flow lock was acquired (checked before unLock).
                        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
                        // NOTE(review): atomicBoolean2 starts as true and is never set to false in this decompiled body,
                        // so the "!atomicBoolean2.get()" exception-flow branches below cannot run as written —
                        // presumably a write was lost in decompilation; confirm.
                        AtomicBoolean atomicBoolean2 = new AtomicBoolean(true);
                        StopWatch stopWatch = new StopWatch();
                        stopWatch.start();
                        if (!integrationFlow.isAllowConcurrency() && !bool2.booleanValue()) {
                            try {
                                boolean tryLock = this.keyLocker.tryLock(integrationFlow.getCode(), 300000L, TimeUnit.MILLISECONDS);
                                atomicBoolean.set(tryLock);
                                if (!tryLock) {
                                    throw new FlowExecuteException(FlowUtils.buildFailedMessage(str2, "wait execute lock time out！"));
                                }
                            } catch (Throwable th) {
                                // This also catches the lock-timeout exception thrown just above: it is only logged
                                // and execution carries on without the lock.
                                this.logger.error("get lock failed!", th);
                            }
                        }
                        try {
                            try {
                                ContextUtil.fillContextFromEvent(this.contextService, map2);
                                this.flowContextHolder.create(str2, integrationFlow.getName(), obj2);
                                this.flowContextHolder.get().setFlowCode(str2);
                                // Publishes the request id to the caller-supplied reference.
                                atomicReference.set(this.flowContextHolder.get().getRequestId());
                                builder.instanceId(this.flowContextHolder.get().getRequestId()).parentInstanceId(this.flowContextHolder.get().getParentRequestId()).parentNodeId(this.flowContextHolder.get().getParentNodeId()).status(FlowStatus.SUCCESS.code()).deleteFlag("1");
                                Optional<AbstractNode> firstNode = getFirstNode(str2);
                                if (firstNode.get().isStartTransaction()) {
                                    this.flowTransactionManager.createNewTransaction(TRANSACTION_TIME_OUT, "");
                                    if (integrationFlow.isAllowFailRetry() && integrationFlow.isAllowExecuteLog()) {
                                        builder.transactionSnapshot(this.flowContextHolder.get().toSnapShot(this.contextService));
                                    }
                                }
                                if (integrationFlow.isAllowSuccessRetry() && integrationFlow.isAllowExecuteLog()) {
                                    builder.beginSnapshot(this.flowContextHolder.get().toSnapShot(this.contextService));
                                }
                                // Node loop: each execute(...) call returns the next node to run, if any.
                                Optional<AbstractNode> execute = execute(str2, firstNode.get());
                                while (execute.isPresent()) {
                                    if (execute.get().isStartTransaction() && !this.flowTransactionManager.getCurrent().isPresent()) {
                                        this.flowTransactionManager.createNewTransaction(TRANSACTION_TIME_OUT, "");
                                        if (integrationFlow.isAllowFailRetry() && integrationFlow.isAllowExecuteLog()) {
                                            builder.transactionSnapshot(this.flowContextHolder.get().toSnapShot(this.contextService));
                                        }
                                    }
                                    Optional<AbstractNode> execute2 = execute(str2, execute.get());
                                    if (execute.get().isCommitTransaction()) {
                                        commitTransaction();
                                    }
                                    execute = execute2;
                                }
                                commitTransaction();
                                Object flowResponse = this.flowContextHolder.get().getFlowResponse();
                                // From here to the return: cleanup for the normal-completion path.
                                if (atomicBoolean2.get()) {
                                    builder.transactionSnapshot((FlowContextSnapshot) null);
                                }
                                String requestId = this.flowContextHolder.get().getRequestId();
                                try {
                                    if (!atomicBoolean2.get() && Optional.ofNullable(integrationFlow.getExceptionNodes()).isPresent() && !integrationFlow.getExceptionNodes().isEmpty()) {
                                        try {
                                            Optional<AbstractNode> execute3 = execute(str2, (AbstractNode) integrationFlow.getExceptionNodes().stream().filter(abstractNode2 -> {
                                                return abstractNode2.getNodeType().equals(NodeType.EXCEPTION_TRIGGER);
                                            }).findFirst().get());
                                            while (execute3.isPresent()) {
                                                execute3 = execute(str2, execute3.get());
                                            }
                                        } catch (Throwable th2) {
                                            this.logger.error("执行异常流发生异常", th2);
                                        }
                                    }
                                    if (Optional.ofNullable(this.flowLogRepository).isPresent() && integrationFlow.isAllowExecuteLog()) {
                                        builder.nodeHistory(JsonUtils.object2Json(this.flowContextHolder.get().getNodeHistory()));
                                        builder.finishSnapshot(this.flowContextHolder.get().toSnapShot(this.contextService));
                                        builder.extendLog(this.flowContextHolder.get().getExtendLog());
                                        try {
                                            this.flowLogRepository.saveInstance(builder.build());
                                        } catch (RuntimeException e) {
                                            this.logger.error("Save flow instance log failed!", e);
                                        }
                                    }
                                } catch (Throwable th3) {
                                    this.logger.error(String.format("Save flow with requestId : %s instance failed!", requestId), th3);
                                }
                                if (this.flowContextHolder.get().isOwner().booleanValue()) {
                                    ContextUtil.clear(this.contextService);
                                }
                                this.flowContextHolder.clear();
                                if (atomicBoolean.get()) {
                                    this.keyLocker.unLock(str2);
                                }
                                stopWatch.stop();
                                this.logger.info("流执行完成。流代码: [{}]，请求Id: [{}], 耗时: {} ms", new Object[]{str2, requestId, Long.valueOf(stopWatch.getTime(TimeUnit.MILLISECONDS))});
                                return flowResponse;
                            } catch (Throwable th4) {
                                // Failure path: runs the same cleanup sequence as above, then rethrows the original failure.
                                if (atomicBoolean2.get()) {
                                    builder.transactionSnapshot((FlowContextSnapshot) null);
                                }
                                String requestId2 = this.flowContextHolder.get().getRequestId();
                                try {
                                    if (!atomicBoolean2.get() && Optional.ofNullable(integrationFlow.getExceptionNodes()).isPresent() && !integrationFlow.getExceptionNodes().isEmpty()) {
                                        try {
                                            Optional<AbstractNode> execute4 = execute(str2, (AbstractNode) integrationFlow.getExceptionNodes().stream().filter(abstractNode22 -> {
                                                return abstractNode22.getNodeType().equals(NodeType.EXCEPTION_TRIGGER);
                                            }).findFirst().get());
                                            while (execute4.isPresent()) {
                                                execute4 = execute(str2, execute4.get());
                                            }
                                        } catch (Throwable th5) {
                                            this.logger.error("执行异常流发生异常", th5);
                                        }
                                    }
                                    if (Optional.ofNullable(this.flowLogRepository).isPresent() && integrationFlow.isAllowExecuteLog()) {
                                        builder.nodeHistory(JsonUtils.object2Json(this.flowContextHolder.get().getNodeHistory()));
                                        builder.finishSnapshot(this.flowContextHolder.get().toSnapShot(this.contextService));
                                        builder.extendLog(this.flowContextHolder.get().getExtendLog());
                                        try {
                                            this.flowLogRepository.saveInstance(builder.build());
                                        } catch (RuntimeException e2) {
                                            this.logger.error("Save flow instance log failed!", e2);
                                            if (this.flowContextHolder.get().isOwner().booleanValue()) {
                                                ContextUtil.clear(this.contextService);
                                            }
                                            this.flowContextHolder.clear();
                                            if (atomicBoolean.get()) {
                                                this.keyLocker.unLock(str2);
                                            }
                                            stopWatch.stop();
                                            this.logger.info("流执行完成。流代码: [{}]，请求Id: [{}], 耗时: {} ms", new Object[]{str2, requestId2, Long.valueOf(stopWatch.getTime(TimeUnit.MILLISECONDS))});
                                            throw th4;
                                        }
                                    }
                                } catch (Throwable th6) {
                                    this.logger.error(String.format("Save flow with requestId : %s instance failed!", requestId2), th6);
                                    // NOTE(review): the two if-bodies below are empty, although the sibling cleanup paths
                                    // call ContextUtil.clear(...) and keyLocker.unLock(...) at this point — presumably
                                    // dropped by the decompiler; confirm.
                                    if (this.flowContextHolder.get().isOwner().booleanValue()) {
                                    }
                                    this.flowContextHolder.clear();
                                    if (atomicBoolean.get()) {
                                    }
                                    stopWatch.stop();
                                    this.logger.info("流执行完成。流代码: [{}]，请求Id: [{}], 耗时: {} ms", new Object[]{str2, requestId2, Long.valueOf(stopWatch.getTime(TimeUnit.MILLISECONDS))});
                                    throw th4;
                                }
                                // NOTE(review): same empty if-bodies as above; as written, this path neither clears the
                                // context service nor releases the lock — confirm against the original source.
                                if (this.flowContextHolder.get().isOwner().booleanValue()) {
                                }
                                this.flowContextHolder.clear();
                                if (atomicBoolean.get()) {
                                }
                                stopWatch.stop();
                                this.logger.info("流执行完成。流代码: [{}]，请求Id: [{}], 耗时: {} ms", new Object[]{str2, requestId2, Long.valueOf(stopWatch.getTime(TimeUnit.MILLISECONDS))});
                                throw th4;
                            }
                        } finally {
                            // Empty in the decompiled output.
                        }
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
