package com.xforceplus.ultraman.flows.automaticflow.shedule;

import com.github.kagkarlsson.scheduler.Scheduler;
import com.github.kagkarlsson.scheduler.serializer.JacksonSerializer;
import com.github.kagkarlsson.scheduler.task.Task;
import com.github.kagkarlsson.scheduler.task.helper.RecurringTask;
import com.github.kagkarlsson.scheduler.task.helper.Tasks;
import com.github.kagkarlsson.scheduler.task.schedule.Schedules;
import com.google.common.collect.Maps;
import com.google.common.eventbus.Subscribe;
import com.xforceplus.ultraman.flows.automaticflow.executor.FlowExecutor;
import com.xforceplus.ultraman.flows.common.config.setting.FlowBus;
import com.xforceplus.ultraman.flows.common.config.setting.IntegrationFlow;
import com.xforceplus.ultraman.flows.common.constant.NodeType;
import com.xforceplus.ultraman.flows.common.core.EventBusCenter;
import com.xforceplus.ultraman.flows.common.core.FlowBusCompletedEvent;
import com.xforceplus.ultraman.flows.common.exception.FlowParseException;
import com.xforceplus.ultraman.flows.common.pojo.flow.node.basic.AbstractNode;
import com.xforceplus.ultraman.flows.common.pojo.flow.node.basic.ScheduleTriggerNode;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TimeZone;
import java.util.concurrent.ScheduledFuture;
import java.util.stream.Collectors;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.support.CronTrigger;

/* loaded from: input_file:com/xforceplus/ultraman/flows/automaticflow/shedule/ScheduleManager.class */
public class ScheduleManager implements InitializingBean {
    private DataSource dataSource;
    private FlowExecutor flowExecutor;
    private TaskScheduler taskScheduler;
    private Map<String, ScheduledFuture> scheduledFutureMap = Maps.newConcurrentMap();
    private volatile Scheduler scheduler;
    private static final String CHECK_JOB_TABLE = "SELECT count(*) as num FROM information_schema.TABLES WHERE table_name ='scheduled_tasks';";
    private static final String CREATE_JOB_TABLE = "create table scheduled_tasks (\n  task_name varchar(40) not null,\n  task_instance varchar(40) not null,\n  task_data blob,\n  execution_time timestamp(6) not null,\n  picked BOOLEAN not null,\n  picked_by varchar(50),\n  last_success timestamp(6) null,\n  last_failure timestamp(6) null,\n  consecutive_failures INT,\n  last_heartbeat timestamp(6) null,\n  version BIGINT not null,\n  PRIMARY KEY (task_name, task_instance)\n);";
    private static final Integer MIN_POLL_INTERVAL = 5;
    private static final Logger logger = LoggerFactory.getLogger(ScheduleManager.class);

    public ScheduleManager(DataSource dataSource, FlowExecutor flowExecutor, TaskScheduler taskScheduler) {
        this.dataSource = dataSource;
        this.flowExecutor = flowExecutor;
        this.taskScheduler = taskScheduler;
    }

    private boolean jobTableExists() {
        try {
            Connection connection = this.dataSource.getConnection();
            try {
                ResultSet executeQuery = connection.createStatement().executeQuery(CHECK_JOB_TABLE);
                if (!executeQuery.next()) {
                    if (connection != null) {
                        connection.close();
                    }
                    return true;
                }
                boolean z = executeQuery.getInt("num") != 0;
                if (connection != null) {
                    connection.close();
                }
                return z;
            } catch (Throwable th) {
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        } catch (SQLException e) {
            logger.error("Init job conig failed!", e);
            return true;
        }
    }

    private void createJobTable() {
        try {
            Connection connection = this.dataSource.getConnection();
            try {
                connection.createStatement().execute(CREATE_JOB_TABLE);
                if (connection != null) {
                    connection.close();
                }
            } finally {
            }
        } catch (SQLException e) {
            logger.error("Create job table failed", e);
        }
    }

    @Subscribe
    public void init(FlowBusCompletedEvent flowBusCompletedEvent) {
        if (FlowBus.getSheduleTriggerFlow().isEmpty()) {
            return;
        }
        if (!this.scheduledFutureMap.isEmpty()) {
            this.scheduledFutureMap.forEach((str, scheduledFuture) -> {
                scheduledFuture.cancel(true);
                if (scheduledFuture.isDone()) {
                    logger.info("Cancel the flow {}", str);
                }
            });
        }
        if (!Optional.ofNullable(this.dataSource).isPresent()) {
            FlowBus.getSheduleTriggerFlow().forEach(abstractFlow -> {
                this.scheduledFutureMap.put(abstractFlow.getCode(), this.taskScheduler.schedule(() -> {
                    try {
                        this.flowExecutor.execute(abstractFlow.getCode());
                    } catch (Exception e) {
                        logger.error("Schedule failed!", e);
                    }
                }, new CronTrigger(findTriggerNode((IntegrationFlow) abstractFlow).get().getCron(), TimeZone.getTimeZone(TimeZone.getDefault().getID()))));
            });
            return;
        }
        if (!jobTableExists()) {
            createJobTable();
        }
        List list = (List) FlowBus.getSheduleTriggerFlow().stream().map(abstractFlow2 -> {
            ScheduleTriggerNode orElseThrow = findTriggerNode((IntegrationFlow) abstractFlow2).orElseThrow(() -> {
                return new FlowParseException("Can not find any schedule trigger node!");
            });
            return Tasks.recurring(orElseThrow.getNodeId(), Schedules.cron(orElseThrow.getCron()), IntegrationFlow.class).initialData((IntegrationFlow) abstractFlow2).execute((taskInstance, executionContext) -> {
                logger.info("flow schedule! flowCode : {}", ((IntegrationFlow) taskInstance.getData()).getCode());
                this.flowExecutor.execute(abstractFlow2.getCode());
            });
        }).collect(Collectors.toList());
        if (list.isEmpty()) {
            return;
        }
        if (this.scheduler != null) {
            try {
                this.scheduler.stop();
            } catch (Exception e) {
                logger.error("Stop scheduler failed!", e);
                return;
            }
        }
        this.scheduler = Scheduler.create(this.dataSource, new Task[0]).startTasks(list).serializer(new JacksonSerializer()).threads(20).pollingInterval(Duration.ofSeconds(MIN_POLL_INTERVAL.intValue())).registerShutdownHook().build();
        this.scheduler.start();
    }

    private Optional<AbstractNode> findTriggerNode(IntegrationFlow integrationFlow) {
        Optional<AbstractNode> findAny = integrationFlow.getNodes().stream().filter(abstractNode -> {
            return abstractNode.getNodeType().equals(NodeType.SHEDULE_TRIGGER);
        }).findAny();
        if (findAny.isPresent()) {
            return findAny;
        }
        throw new FlowParseException("Can not find any schedule trigger node!");
    }

    public void afterPropertiesSet() throws Exception {
        EventBusCenter.getInstance().register(this);
    }
}
