package com.xforceplus.ultraman.flows.automaticflow.executor.flow;

import com.google.common.collect.Maps;
import com.xforceplus.ultraman.flows.automaticflow.executor.FlowExecutor;
import com.xforceplus.ultraman.flows.automaticflow.executor.FlowTransactionManager;
import com.xforceplus.ultraman.flows.automaticflow.executor.NodeExecutorSelector;
import com.xforceplus.ultraman.flows.automaticflow.executor.thread.ThreadPoolManager;
import com.xforceplus.ultraman.flows.common.config.setting.FlowBus;
import com.xforceplus.ultraman.flows.common.config.setting.IntegrationFlow;
import com.xforceplus.ultraman.flows.common.constant.NodeType;
import com.xforceplus.ultraman.flows.common.core.FlowContextHolder;
import com.xforceplus.ultraman.flows.common.core.FlowContextSnapshot;
import com.xforceplus.ultraman.flows.common.core.IKeyLocker;
import com.xforceplus.ultraman.flows.common.exception.FlowExecuteException;
import com.xforceplus.ultraman.flows.common.exception.FlowParseException;
import com.xforceplus.ultraman.flows.common.history.FlowInstanceLog;
import com.xforceplus.ultraman.flows.common.history.FlowLogRepository;
import com.xforceplus.ultraman.flows.common.history.FlowReplayLog;
import com.xforceplus.ultraman.flows.common.history.FlowStatus;
import com.xforceplus.ultraman.flows.common.history.SnapshotFormat;
import com.xforceplus.ultraman.flows.common.pojo.flow.FlowType;
import com.xforceplus.ultraman.flows.common.pojo.flow.node.basic.AbstractNode;
import com.xforceplus.ultraman.oqsengine.sdk.business.meta.EntityId;
import com.xforceplus.ultraman.oqsengine.sdk.business.meta.EntityInstance;
import com.xforceplus.ultraman.oqsengine.sdk.business.meta.service.BusinessFacade;
import com.xforceplus.ultraman.oqsengine.sdk.transactional.OqsTransaction;
import com.xplat.ultraman.api.management.convertor.utils.JsonUtils;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.commons.lang3.time.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

/* loaded from: input_file:com/xforceplus/ultraman/flows/automaticflow/executor/flow/FlowExecutorDefaultImpl.class */
public class FlowExecutorDefaultImpl implements FlowExecutor {
    private Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private NodeExecutorSelector nodeExecutorSelector;

    @Autowired
    private FlowContextHolder flowContextHolder;

    @Autowired
    private FlowTransactionManager flowTransactionManager;

    @Autowired
    private IKeyLocker keyLocker;

    @Autowired(required = false)
    private FlowLogRepository flowLogRepository;

    @Autowired
    private BusinessFacade businessFacade;
    private static final int TRANSACTION_TIME_OUT = 300000;

    @Override // com.xforceplus.ultraman.flows.automaticflow.executor.FlowExecutor
    public void execute(String str, Object obj) {
        execute(str, obj, null);
    }

    @Override // com.xforceplus.ultraman.flows.automaticflow.executor.FlowExecutor
    public void execute(String str, Object obj, Consumer consumer) {
        FlowInstanceLog.FlowInstanceLogBuilder builder = FlowInstanceLog.builder();
        builder.flowCode(str);
        IntegrationFlow flow = FlowBus.getFlow(str, FlowType.INTEGRATION);
        builder.flowName(flow.getName());
        if (!Optional.ofNullable(flow).isPresent()) {
            throw new FlowExecuteException(String.format("Can not find any flow with code %s", str));
        }
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        AtomicBoolean atomicBoolean2 = new AtomicBoolean(true);
        CompletableFuture addExecuteTask = ThreadPoolManager.getInstance().addExecuteTask(() -> {
            FlowExecuteException flowExecuteException;
            StopWatch stopWatch = new StopWatch();
            stopWatch.start();
            if (!flow.isAllowConcurrency()) {
                try {
                    boolean tryLock = this.keyLocker.tryLock(flow.getCode(), 300000L, TimeUnit.MILLISECONDS);
                    atomicBoolean.set(tryLock);
                    if (!tryLock) {
                        throw new FlowExecuteException(String.format("wait execute lock time out！ flow : %s", flow.getCode()));
                    }
                } catch (Throwable th) {
                    this.logger.error("Get lock failed!", th);
                }
            }
            this.flowContextHolder.create(obj);
            builder.instanceId(this.flowContextHolder.get().getRequestId());
            this.flowContextHolder.get().setFlowCode(str);
            try {
                try {
                    Optional<AbstractNode> firstNode = getFirstNode(str);
                    if (firstNode.get().isStartTransaction()) {
                        this.flowTransactionManager.createNewTransaction(TRANSACTION_TIME_OUT, "");
                        this.flowContextHolder.get().setTransactionNode(firstNode.get());
                        if (flow.isAllowFailRetry()) {
                            builder.transactionSnapshot(this.flowContextHolder.get().toSnapShot());
                        }
                    }
                    if (flow.isAllowSuccessRetry()) {
                        builder.beginSnapshot(this.flowContextHolder.get().toSnapShot());
                    }
                    Optional<AbstractNode> execute = execute(str, firstNode.get());
                    while (execute.isPresent()) {
                        if (execute.get().isStartTransaction() && !this.flowTransactionManager.getCurrent().isPresent()) {
                            this.flowTransactionManager.createNewTransaction(TRANSACTION_TIME_OUT, "");
                            this.flowContextHolder.get().setTransactionNode(execute.get());
                            if (flow.isAllowFailRetry()) {
                                builder.transactionSnapshot(this.flowContextHolder.get().toSnapShot());
                            }
                        }
                        Optional<AbstractNode> execute2 = execute(str, execute.get());
                        if (execute.get().isCommitTransaction()) {
                            commitTransaction();
                        }
                        execute = execute2;
                    }
                    commitTransaction();
                    Object flowResponse = this.flowContextHolder.get().getFlowResponse();
                    this.flowContextHolder.clear();
                    if (atomicBoolean.get()) {
                        this.keyLocker.unLock(str);
                    }
                    if (atomicBoolean2.get()) {
                        builder.transactionSnapshot((FlowContextSnapshot) null);
                    }
                    try {
                        if (Optional.ofNullable(this.flowLogRepository).isPresent()) {
                            this.flowLogRepository.saveInstance(builder.build());
                        }
                    } catch (Throwable th2) {
                        this.logger.error("Save instance log failed!", th2);
                    }
                    stopWatch.stop();
                    this.logger.info("Flow finished! time-consuming : {} ms", Long.valueOf(stopWatch.getTime(TimeUnit.MILLISECONDS)));
                    return flowResponse;
                } finally {
                }
            } catch (Throwable th3) {
                this.flowContextHolder.clear();
                if (atomicBoolean.get()) {
                    this.keyLocker.unLock(str);
                }
                if (atomicBoolean2.get()) {
                    builder.transactionSnapshot((FlowContextSnapshot) null);
                }
                try {
                    if (Optional.ofNullable(this.flowLogRepository).isPresent()) {
                        this.flowLogRepository.saveInstance(builder.build());
                    }
                } catch (Throwable th4) {
                    this.logger.error("Save instance log failed!", th4);
                }
                stopWatch.stop();
                this.logger.info("Flow finished! time-consuming : {} ms", Long.valueOf(stopWatch.getTime(TimeUnit.MILLISECONDS)));
                throw th3;
            }
        });
        if (Optional.ofNullable(consumer).isPresent()) {
            addExecuteTask.thenAccept(obj2 -> {
                consumer.accept(obj2);
            });
        }
    }

    private void commitTransaction() {
        Optional<OqsTransaction> current = this.flowTransactionManager.getCurrent();
        if (!current.isPresent() || current.get().isRollBack()) {
            return;
        }
        this.flowTransactionManager.commit();
    }

    private void rollBackTransaction() {
        this.flowTransactionManager.rollBack();
    }

    @Override // com.xforceplus.ultraman.flows.automaticflow.executor.FlowExecutor
    public void execute(String str) {
        execute(str, Maps.newHashMap());
    }

    @Override // com.xforceplus.ultraman.flows.automaticflow.executor.FlowExecutor
    public void execute(String str, Consumer consumer) {
        execute(str, Maps.newHashMap(), consumer);
    }

    public Optional<AbstractNode> execute(String str, AbstractNode abstractNode) {
        String str2;
        this.flowContextHolder.get().setCurrentNode(abstractNode.getNodeId());
        Object execute = this.nodeExecutorSelector.select(abstractNode).execute(abstractNode);
        if ((execute instanceof Boolean) && !((Boolean) execute).booleanValue()) {
            return Optional.empty();
        }
        if (abstractNode.getNodeType().equals(NodeType.GATEWAY)) {
            str2 = this.flowContextHolder.get().getOutPut(abstractNode.getNodeId()).toString();
        } else {
            if (!Optional.ofNullable(abstractNode.getNextIds()).isPresent()) {
                return Optional.empty();
            }
            str2 = (String) abstractNode.getNextIds().get(0);
        }
        return getNodeById(str, str2);
    }

    @Override // com.xforceplus.ultraman.flows.automaticflow.executor.FlowExecutor
    public Optional<AbstractNode> getFirstNode(String str) {
        Optional<AbstractNode> findAny = FlowBus.getFlow(str, FlowType.INTEGRATION).getNodes().stream().filter(abstractNode -> {
            return abstractNode.isStartNode();
        }).findAny();
        if (findAny.isPresent()) {
            return findAny;
        }
        throw new FlowParseException(String.format("错误的流配置信息，未发现开始节点！ 流程编码 : %s", str));
    }

    @Override // com.xforceplus.ultraman.flows.automaticflow.executor.FlowExecutor
    public void replay(Long l, boolean z) {
        Optional findOne = this.businessFacade.findOne(new EntityId(this.businessFacade.load("flowInstanceLog"), l.longValue()));
        if (!findOne.isPresent()) {
            throw new FlowExecuteException(String.format("Can not find any flow instance with id %s", l));
        }
        FlowReplayLog.FlowReplayLogBuilder builder = FlowReplayLog.builder();
        FlowInstanceLog flowInstanceLog = (FlowInstanceLog) ((EntityInstance) findOne.get()).into(FlowInstanceLog.class).get();
        IntegrationFlow flow = FlowBus.getFlow(flowInstanceLog.getFlowCode(), FlowType.INTEGRATION);
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        AtomicBoolean atomicBoolean2 = new AtomicBoolean(true);
        if (!flow.isAllowConcurrency()) {
            try {
                boolean tryLock = this.keyLocker.tryLock(flow.getCode(), 300000L, TimeUnit.MILLISECONDS);
                atomicBoolean.set(tryLock);
                if (!tryLock) {
                    throw new FlowExecuteException(String.format("wait execute lock time out！ flow : %s", flow.getCode()));
                }
            } catch (Throwable th) {
                this.logger.error("Get lock failed!", th);
            }
        }
        ThreadPoolManager.getInstance().addExecuteTask(() -> {
            Optional<AbstractNode> nodeById;
            FlowExecuteException flowExecuteException;
            StopWatch stopWatch = new StopWatch();
            stopWatch.start();
            builder.flowCode(flow.getCode()).flowInstanceId(flowInstanceLog.getInstanceId()).deleteFlag("1").status(FlowStatus.SUCCESS.code());
            if (z) {
                this.flowContextHolder.createFromSnapshot(deserializeSnapshot(flowInstanceLog.getSnapshot(), SnapshotFormat.fromCode(flowInstanceLog.getSnapshotFormat())));
                nodeById = flowInstanceLog.getRollback().booleanValue() ? getNodeById(flowInstanceLog.getFlowCode(), flowInstanceLog.getRollbackNodeId()) : getNodeById(flowInstanceLog.getFlowCode(), flowInstanceLog.getErrorNodeId());
            } else {
                this.flowContextHolder.createFromSnapshot(deserializeSnapshot(flowInstanceLog.getStartSnapshot(), SnapshotFormat.fromCode(flowInstanceLog.getStartSnapshotFormat())));
                nodeById = getFirstNode(flowInstanceLog.getFlowCode());
            }
            try {
                try {
                    if (nodeById.get().isStartTransaction()) {
                        this.flowTransactionManager.createNewTransaction(TRANSACTION_TIME_OUT, "");
                        builder.rollbackNodeId(nodeById.get().getNodeId());
                    }
                    Optional<AbstractNode> execute = execute(flow.getCode(), nodeById.get());
                    while (execute.isPresent()) {
                        if (execute.get().isStartTransaction() && !this.flowTransactionManager.getCurrent().isPresent()) {
                            builder.rollbackNodeId(execute.get().getNodeId());
                            this.flowTransactionManager.createNewTransaction(TRANSACTION_TIME_OUT, "");
                        }
                        Optional<AbstractNode> execute2 = execute(flow.getCode(), execute.get());
                        if (execute.get().isCommitTransaction()) {
                            commitTransaction();
                        }
                        execute = execute2;
                    }
                    commitTransaction();
                    this.flowContextHolder.clear();
                    if (atomicBoolean.get()) {
                        this.keyLocker.unLock(flow.getCode());
                    }
                    try {
                        if (Optional.ofNullable(this.flowLogRepository).isPresent()) {
                            this.flowLogRepository.saveInstanceReplay(flowInstanceLog.getInstanceId(), builder.build());
                        }
                    } catch (Throwable th2) {
                        this.logger.error("Save instance replay log failed!", th2);
                    }
                    stopWatch.stop();
                    this.logger.info("Flow replay finished! time-consuming : {} ms", Long.valueOf(stopWatch.getTime(TimeUnit.MILLISECONDS)));
                } finally {
                }
            } catch (Throwable th3) {
                this.flowContextHolder.clear();
                if (atomicBoolean.get()) {
                    this.keyLocker.unLock(flow.getCode());
                }
                try {
                    if (Optional.ofNullable(this.flowLogRepository).isPresent()) {
                        this.flowLogRepository.saveInstanceReplay(flowInstanceLog.getInstanceId(), builder.build());
                    }
                } catch (Throwable th4) {
                    this.logger.error("Save instance replay log failed!", th4);
                }
                stopWatch.stop();
                this.logger.info("Flow replay finished! time-consuming : {} ms", Long.valueOf(stopWatch.getTime(TimeUnit.MILLISECONDS)));
                throw th3;
            }
        });
    }

    private FlowContextSnapshot deserializeSnapshot(String str, SnapshotFormat snapshotFormat) {
        return (FlowContextSnapshot) JsonUtils.json2Object(str, FlowContextSnapshot.class);
    }

    private Optional<AbstractNode> getNodeById(String str, String str2) {
        return FlowBus.getFlow(str, FlowType.INTEGRATION).getNodes().stream().filter(abstractNode -> {
            return abstractNode.getNodeId().equals(str2);
        }).findAny();
    }
}
