package com.xforceplus.ultraman.flows.automaticflow.shedule;

import com.github.kagkarlsson.scheduler.Scheduler;
import com.github.kagkarlsson.scheduler.serializer.JacksonSerializer;
import com.github.kagkarlsson.scheduler.task.Task;
import com.github.kagkarlsson.scheduler.task.helper.Tasks;
import com.github.kagkarlsson.scheduler.task.schedule.Schedules;
import com.google.common.collect.Maps;
import com.google.common.eventbus.Subscribe;
import com.xforceplus.ultraman.flows.automaticflow.executor.FlowExecutor;
import com.xforceplus.ultraman.flows.common.config.setting.FlowBus;
import com.xforceplus.ultraman.flows.common.config.setting.IntegrationFlow;
import com.xforceplus.ultraman.flows.common.constant.NodeType;
import com.xforceplus.ultraman.flows.common.core.EventBusCenter;
import com.xforceplus.ultraman.flows.common.core.FlowBusCompletedEvent;
import com.xforceplus.ultraman.flows.common.exception.FlowParseException;
import com.xforceplus.ultraman.flows.common.pojo.flow.node.basic.ScheduleTriggerNode;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TimeZone;
import java.util.concurrent.ScheduledFuture;
import java.util.stream.Collectors;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.support.CronTrigger;

/* loaded from: input_file:com/xforceplus/ultraman/flows/automaticflow/shedule/ScheduleManager.class */
public class ScheduleManager implements InitializingBean {

    @Autowired(required = false)
    private DataSource dataSource;

    @Autowired
    private FlowExecutor flowExecutor;

    @Autowired
    private TaskScheduler taskScheduler;
    private static final Logger logger = LoggerFactory.getLogger(ScheduleManager.class);
    Map<String, ScheduledFuture<?>> jobsMap = Maps.newConcurrentMap();

    @Subscribe
    public void init(FlowBusCompletedEvent flowBusCompletedEvent) {
        if (!Optional.ofNullable(this.dataSource).isPresent()) {
            FlowBus.getSheduleTriggerFlow().stream().forEach(abstractFlow -> {
                Optional findAny = ((IntegrationFlow) abstractFlow).getNodes().stream().filter(abstractNode -> {
                    return abstractNode.getNodeType().equals(NodeType.SHEDULE_TRIGGER);
                }).findAny();
                if (!findAny.isPresent()) {
                    throw new FlowParseException("Can not find any schedule trigger node!");
                }
                this.jobsMap.put(abstractFlow.getCode(), this.taskScheduler.schedule(() -> {
                    this.flowExecutor.execute(abstractFlow.getCode());
                }, new CronTrigger(((ScheduleTriggerNode) findAny.get()).getCron(), TimeZone.getTimeZone(TimeZone.getDefault().getID()))));
            });
            return;
        }
        List list = (List) FlowBus.getSheduleTriggerFlow().stream().map(abstractFlow2 -> {
            Optional findAny = ((IntegrationFlow) abstractFlow2).getNodes().stream().filter(abstractNode -> {
                return abstractNode.getNodeType().equals(NodeType.SHEDULE_TRIGGER);
            }).findAny();
            if (!findAny.isPresent()) {
                throw new FlowParseException("Can not find any schedule trigger node!");
            }
            ScheduleTriggerNode scheduleTriggerNode = (ScheduleTriggerNode) findAny.get();
            return Tasks.recurring(scheduleTriggerNode.getNodeId(), Schedules.cron(scheduleTriggerNode.getCron()), IntegrationFlow.class).initialData((IntegrationFlow) abstractFlow2).execute((taskInstance, executionContext) -> {
                logger.info("flow schedule! flowCode : {}", ((IntegrationFlow) taskInstance.getData()).getCode());
                this.flowExecutor.execute(abstractFlow2.getCode());
            });
        }).collect(Collectors.toList());
        if (list.isEmpty()) {
            return;
        }
        Scheduler build = Scheduler.create(this.dataSource, new Task[0]).startTasks(list).serializer(new JacksonSerializer()).pollingInterval(Duration.ofSeconds(1L)).build();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            LoggerFactory.getLogger(ScheduleManager.class).info("Received shutdown signal.");
            build.stop();
        }));
        build.start();
    }

    public void afterPropertiesSet() throws Exception {
        EventBusCenter.getInstance().register(this);
    }

    public void removeScheduledTask(String str) {
        ScheduledFuture<?> scheduledFuture = this.jobsMap.get(str);
        if (scheduledFuture != null) {
            scheduledFuture.cancel(true);
            this.jobsMap.put(str, null);
        }
    }
}
