/*
 * Decompiled with CFR 0.152.
 */
package com.xforceplus.ultraman.extensions.cdc.status.impl;

import com.xforceplus.ultraman.extensions.cdc.status.domain.CurrentStatus;
import com.xforceplus.ultraman.extensions.cdc.status.domain.ProfiledEntityClassStatus;
import com.xforceplus.ultraman.extensions.cdc.status.impl.StatusServiceSupport;
import com.xforceplus.ultraman.metadata.cdc.OqsEngineEntity;
import com.xforceplus.ultraman.metadata.values.DateTimeValue;
import com.xforceplus.ultraman.sdk.infra.base.id.LongIdGenerator;
import com.xforceplus.ultraman.sdk.infra.logging.LoggingPattern;
import com.xforceplus.ultraman.sdk.infra.logging.LoggingUtils;
import io.vavr.Tuple;
import io.vavr.Tuple2;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.ParameterizedPreparedStatementSetter;
import org.springframework.jdbc.core.RowMapper;

/**
 * {@link StatusServiceSupport} implementation that keeps pending-status markers in the
 * {@code CDC_STATUS} table and lets callers block in {@link #query(List, long)} until the
 * markers relevant to them have been cleared.
 *
 * <p>Rows are always written and matched with an empty {@code PROFILE} value, so the status
 * is effectively tracked per entity class id.
 *
 * <p>Two single-threaded schedulers are owned by this instance: one releases waiting callers
 * ({@link #startNotify()}), the other removes expired rows and released latches
 * ({@link #startClean()}). Call {@link #shutDown()} to stop them.
 *
 * <p>Every access to the waiter lists stored in {@code lockMap} happens inside a per-key atomic
 * {@code compute}/{@code computeIfPresent} call of the backing {@link ConcurrentHashMap}, so the
 * plain {@link ArrayList} values are never read and written concurrently.
 */
public class DBStatusServiceImpl
extends StatusServiceSupport {
    private static final Logger log = LoggerFactory.getLogger(DBStatusServiceImpl.class);
    private static final String CREATE_SQL = "INSERT INTO CDC_STATUS(ID, PROFILE, ENTITYCLASS_ID, UPDATE_TIME) VALUES (?,?,?,?)";
    private static final String CLEAN_SQL = "DELETE FROM CDC_STATUS WHERE UPDATE_TIME <= ? ";
    private static final String CLEAN_SQL_FOR_SPECIAL = "DELETE FROM CDC_STATUS WHERE UPDATE_TIME <= ? AND ENTITYCLASS_ID = ? AND PROFILE = ?";
    private static final String SELECT_SQL = "SELECT PROFILE, ENTITYCLASS_ID, MIN(UPDATE_TIME) AS UPDATE_TIME FROM CDC_STATUS GROUP BY PROFILE, ENTITYCLASS_ID";
    /** Profile value used for every row written, deleted and looked up by this class. */
    private static final String DEFAULT_PROFILE = "";

    /**
     * Waiting callers keyed by the (entityClassId, profile) they asked about. Each waiter is a
     * (pending timestamp observed at registration, latch to release) pair. The map must stay a
     * {@link ConcurrentHashMap}: the list values are only safe because they are touched solely
     * inside its atomic per-key compute functions.
     */
    private final Map<Tuple2<Long, String>, List<Tuple2<Long, CountDownLatch>>> lockMap = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
    private final ScheduledExecutorService scheduledExecutorServiceNotifier = Executors.newScheduledThreadPool(1);
    /** Retention window in seconds; also the clean-up period and the basis of the wait timeout. */
    private final int window;
    /** When {@code true}, {@link #query(List, long)} waits at most {@code window - 1} seconds per latch. */
    private final boolean enableTimeout;
    private JdbcTemplate jdbcTemplate;
    @Autowired
    private LongIdGenerator longIdGenerator;

    /**
     * @param executorService executor handed to {@link StatusServiceSupport}
     * @param dataSource      data source holding the {@code CDC_STATUS} table
     * @param window          retention window in seconds
     * @param enableTimeout   whether {@link #query(List, long)} should give up waiting after
     *                        {@code window - 1} seconds
     */
    public DBStatusServiceImpl(ExecutorService executorService, DataSource dataSource, int window, boolean enableTimeout) {
        super(executorService);
        this.jdbcTemplate = new JdbcTemplate(dataSource);
        this.window = window;
        this.enableTimeout = enableTimeout;
    }

    /** Replaces the {@link JdbcTemplate} created from the constructor's data source. */
    public void setMainJdbcTemplate(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    /** Sets the generator used for the {@code ID} column of new status rows. */
    public void setLongIdGenerator(LongIdGenerator longIdGenerator) {
        this.longIdGenerator = longIdGenerator;
    }

    /**
     * Schedules the notifier: after an initial 20 second delay, once per second, waiting callers
     * whose pending status has been cleared or has moved forward are released.
     */
    public void startNotify() {
        this.scheduledExecutorServiceNotifier.scheduleAtFixedRate(this::notifyWaiters, 20L, 1L, TimeUnit.SECONDS);
    }

    /**
     * One notifier tick. A waiter is released when no status row is left for its entity class,
     * or when the oldest remaining row is newer than the timestamp the waiter registered with.
     * Never throws: an escaping exception would cancel the periodic schedule.
     */
    private void notifyWaiters() {
        try {
            CurrentStatus overview = this.getOverview();
            if (overview == null) {
                // The lookup failed (already logged by getOverview). The real state is unknown,
                // so keep everybody waiting and retry on the next tick.
                return;
            }
            Map<Tuple2<Long, String>, ProfiledEntityClassStatus> pending = overview.getOverview();
            for (Tuple2<Long, String> key : this.lockMap.keySet()) {
                // computeIfPresent serialises this read with additions made by query().
                this.lockMap.computeIfPresent(key, (k, waiters) -> {
                    ProfiledEntityClassStatus status = pending.get(Tuple.of(k._1, DEFAULT_PROFILE));
                    for (Tuple2<Long, CountDownLatch> waiter : waiters) {
                        if (status == null || waiter._1 < status.getTimestamp()) {
                            waiter._2.countDown();
                        }
                    }
                    return waiters;
                });
            }
        }
        catch (Throwable throwable) {
            LoggingUtils.logErrorPattern(log, LoggingPattern.UNKNOWN_ERROR, "StatusServiceSync", throwable);
        }
    }

    /**
     * Inserts one status row per key of {@code grouped}, stamped with {@code timestamp}.
     * Failures are logged and not propagated.
     *
     * @param grouped   map whose keys are (profile, entityClassId) pairs; only the id is stored
     * @param timestamp value written to {@code UPDATE_TIME}
     */
    @Override
    protected void save(Map<Tuple2<String, Long>, ?> grouped, long timestamp) {
        if (grouped == null || grouped.isEmpty()) {
            // Nothing to insert; avoid a pointless database round trip.
            return;
        }
        try {
            Set<Tuple2<String, Long>> entityRefs = grouped.keySet();
            ParameterizedPreparedStatementSetter<Tuple2<String, Long>> setter = (ps, argument) -> {
                ps.setLong(1, (Long)this.longIdGenerator.next());
                log.debug("Save Status at {}, {}", timestamp, argument._2);
                ps.setString(2, DEFAULT_PROFILE);
                ps.setLong(3, argument._2);
                ps.setLong(4, timestamp);
            };
            this.jdbcTemplate.batchUpdate(CREATE_SQL, entityRefs, entityRefs.size(), setter);
        }
        catch (Throwable throwable) {
            LoggingUtils.logErrorPattern(log, LoggingPattern.UNKNOWN_ERROR, "SaveStatus", throwable);
        }
    }

    /**
     * Deletes, for every entity class present in {@code oqsEngineEntity}, the status rows whose
     * {@code UPDATE_TIME} is not later than the newest update time among that class's entities.
     * Failures are logged and not propagated.
     *
     * @param oqsEngineEntity entities that have been processed
     */
    @Override
    protected void clear(Collection<OqsEngineEntity> oqsEngineEntity) {
        if (oqsEngineEntity == null || oqsEngineEntity.isEmpty()) {
            // Nothing to delete; avoid a pointless database round trip.
            return;
        }
        try {
            Map<Tuple2<String, Long>, List<OqsEngineEntity>> group = oqsEngineEntity.stream()
                    .collect(Collectors.groupingBy(x -> Tuple.of(x.getEntityClassRef().getProfile(), x.getEntityClassRef().getId())));
            Set<Tuple2<String, Long>> entityRefs = group.keySet();
            Instant instant = LocalDateTime.now().atZone(DateTimeValue.ZONE_ID).toInstant();
            long currentTime = instant.toEpochMilli();
            ParameterizedPreparedStatementSetter<Tuple2<String, Long>> setter = (ps, argument) -> {
                long lastUpdateTime = this.findLastUpdateTime(group.get(argument), currentTime);
                log.debug("Clear Status at {} {}", lastUpdateTime, argument._2);
                ps.setLong(1, lastUpdateTime);
                ps.setLong(2, argument._2);
                ps.setString(3, DEFAULT_PROFILE);
            };
            this.jdbcTemplate.batchUpdate(CLEAN_SQL_FOR_SPECIAL, entityRefs, entityRefs.size(), setter);
        }
        catch (Throwable throwable) {
            LoggingUtils.logErrorPattern(log, LoggingPattern.UNKNOWN_ERROR, "ClearStatus", throwable);
        }
    }

    /**
     * Returns the largest update time among {@code oqsEngineEntities}, or {@code fallback}
     * (after logging an error) when the list is empty.
     */
    private long findLastUpdateTime(List<OqsEngineEntity> oqsEngineEntities, long fallback) {
        OptionalLong max = oqsEngineEntities.stream().mapToLong(x -> x.getUpdateTime()).max();
        return max.orElseGet(() -> {
            log.error("Cannot find UPDATE_TIME in RECORD");
            return fallback;
        });
    }

    /** No-op in this implementation. */
    @Override
    public void clearStatus(long timestamp) {
    }

    /**
     * Reads the oldest {@code UPDATE_TIME} per (profile, entity class) from {@code CDC_STATUS}.
     *
     * @return the current status keyed by (entityClassId, profile), or {@code null} when the
     *         lookup fails (the failure is logged)
     */
    @Override
    public CurrentStatus getOverview() {
        try {
            RowMapper<ProfiledEntityClassStatus> rowMapper = (rs, rowNum) -> {
                ProfiledEntityClassStatus statusItem = new ProfiledEntityClassStatus();
                statusItem.setProfile(rs.getString("PROFILE"));
                statusItem.setEntityClassId(rs.getLong("ENTITYCLASS_ID"));
                statusItem.setTimestamp(rs.getLong("UPDATE_TIME"));
                return statusItem;
            };
            List<ProfiledEntityClassStatus> rows = this.jdbcTemplate.query(SELECT_SQL, rowMapper);
            Map<Tuple2<Long, String>, ProfiledEntityClassStatus> mapped = rows.stream()
                    .collect(Collectors.toMap(x -> Tuple.of(x.getEntityClassId(), x.getProfile()), y -> y, (a, b) -> a));
            CurrentStatus currentStatus = new CurrentStatus();
            currentStatus.setOverview(mapped);
            return currentStatus;
        }
        catch (Throwable throwable) {
            LoggingUtils.logErrorPattern(log, LoggingPattern.UNKNOWN_ERROR, "GetOverview", throwable);
            return null;
        }
    }

    /**
     * Blocks the calling thread while any of the given entity classes still has a status row
     * whose oldest {@code UPDATE_TIME} is not later than {@code requestTimestamp}. The wait ends
     * when the notifier started by {@link #startNotify()} releases the caller, when the optional
     * timeout of {@code window - 1} seconds per latch elapses, or when the thread is interrupted
     * (the interrupt flag is restored).
     *
     * <p>If the current status cannot be read, the method returns without waiting.
     *
     * @param profiledEntityClass (entityClassId, profile) pairs the caller is about to read
     * @param requestTimestamp    timestamp of the request
     */
    @Override
    public void query(List<Tuple2<Long, String>> profiledEntityClass, long requestTimestamp) {
        log.debug("Query Status at {}", requestTimestamp);
        if (profiledEntityClass == null || profiledEntityClass.isEmpty()) {
            return;
        }
        CurrentStatus overview = this.getOverview();
        log.debug("Current Pending List {}", overview);
        if (overview == null) {
            // getOverview already logged the cause; without a status there is nothing to wait on.
            log.warn("Status overview unavailable, query at {} proceeds without waiting", requestTimestamp);
            return;
        }
        Map<Tuple2<Long, String>, ProfiledEntityClassStatus> pending = overview.getOverview();
        List<CountDownLatch> countDownLatches = new ArrayList<>();
        for (Tuple2<Long, String> entityClass : profiledEntityClass) {
            ProfiledEntityClassStatus classStatus = pending.get(Tuple.of(entityClass._1, DEFAULT_PROFILE));
            if (classStatus == null) {
                continue;
            }
            long timestamp = classStatus.getTimestamp();
            if (timestamp > requestTimestamp) {
                // The oldest pending change is newer than the request: nothing to wait for.
                continue;
            }
            CountDownLatch countDownLatch = new CountDownLatch(1);
            countDownLatches.add(countDownLatch);
            this.lockMap.compute(entityClass, (k, v) -> {
                List<Tuple2<Long, CountDownLatch>> waiters = v == null ? new ArrayList<>() : v;
                waiters.add(Tuple.of(timestamp, countDownLatch));
                return waiters;
            });
        }
        for (CountDownLatch countDownLatch : countDownLatches) {
            try {
                if (this.enableTimeout) {
                    countDownLatch.await(this.window - 1, TimeUnit.SECONDS);
                } else {
                    countDownLatch.await();
                }
            }
            catch (InterruptedException e) {
                LoggingUtils.logErrorPattern(log, LoggingPattern.UNKNOWN_ERROR, "QueryInterceptor", e);
                // Restore the flag for the caller and stop waiting; any further await would
                // fail immediately with the flag set.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** No-op in this implementation. */
    @Override
    public void onEmpty() {
    }

    /**
     * Schedules the periodic clean-up: first run after 100 seconds, then every {@code window}
     * seconds.
     */
    public void startClean() {
        this.scheduledExecutorService.scheduleAtFixedRate(this::cleanTimeout, 100L, this.window, TimeUnit.SECONDS);
    }

    /**
     * Stops both schedulers and releases every caller still blocked in
     * {@link #query(List, long)}; once the notifier is stopped nothing else would release them.
     */
    public void shutDown() {
        this.scheduledExecutorServiceNotifier.shutdown();
        this.scheduledExecutorService.shutdown();
        for (Tuple2<Long, String> key : this.lockMap.keySet()) {
            this.lockMap.computeIfPresent(key, (k, waiters) -> {
                waiters.forEach(waiter -> waiter._2.countDown());
                // Returning null removes the mapping.
                return null;
            });
        }
    }

    /**
     * One clean-up tick: deletes status rows older than the retention window, then drops latches
     * that have already been released. The two steps are guarded separately so a database failure
     * does not prevent the in-memory pruning. Never throws: an escaping exception would cancel
     * the periodic schedule.
     */
    private void cleanTimeout() {
        try {
            LocalDateTime minus = LocalDateTime.now().minusSeconds(this.window);
            Instant instant = minus.atZone(DateTimeValue.ZONE_ID).toInstant();
            long cutoff = instant.toEpochMilli();
            this.jdbcTemplate.update(CLEAN_SQL, new Object[]{cutoff});
        }
        catch (Throwable throwable) {
            LoggingUtils.logErrorPattern(log, LoggingPattern.UNKNOWN_ERROR, "cleanTimeout", throwable);
        }
        try {
            for (Tuple2<Long, String> key : this.lockMap.keySet()) {
                // Prune and (if empty) unmap atomically, so a waiter registered concurrently by
                // query() can never be attached to a list that is being discarded.
                this.lockMap.computeIfPresent(key, (k, waiters) -> {
                    waiters.removeIf(waiter -> waiter._2.getCount() == 0L);
                    return waiters.isEmpty() ? null : waiters;
                });
            }
        }
        catch (Throwable throwable) {
            LoggingUtils.logErrorPattern(log, LoggingPattern.UNKNOWN_ERROR, "cleanTimeout", throwable);
        }
    }
}

