/*
 * Decompiled with CFR 0.152.
 */
package com.xforceplus.ultraman.extensions.cdc.status.impl;

import com.xforceplus.ultraman.extensions.cdc.status.domain.CurrentStatus;
import com.xforceplus.ultraman.extensions.cdc.status.domain.ProfiledEntityClassStatus;
import com.xforceplus.ultraman.extensions.cdc.status.impl.StatusServiceSupport;
import com.xforceplus.ultraman.metadata.cdc.OqsEngineEntity;
import com.xforceplus.ultraman.metadata.values.DateTimeValue;
import com.xforceplus.ultraman.sdk.infra.base.id.LongIdGenerator;
import io.vavr.Tuple;
import io.vavr.Tuple2;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.ParameterizedPreparedStatementSetter;
import org.springframework.jdbc.core.RowMapper;

public class DBStatusServiceImpl
extends StatusServiceSupport {
    private static final Logger log = LoggerFactory.getLogger(DBStatusServiceImpl.class);
    private static String CREATE_SQL = "INSERT INTO CDC_STATUS(ID, PROFILE, ENTITYCLASS_ID, UPDATE_TIME) VALUES (?,?,?,?)";
    private static String CLEAN_SQL = "DELETE FROM CDC_STATUS WHERE UPDATE_TIME < ? ";
    private static String CLEAN_SQL_FOR_SPECIAL = "DELETE FROM CDC_STATUS WHERE UPDATE_TIME < ? AND ENTITYCLASS_ID = ? AND PROFILE = ?";
    private static String SELECT_SQL = "SELECT PROFILE, ENTITYCLASS_ID, MAX(UPDATE_TIME) AS UPDATE_TIME FROM CDC_STATUS GROUP BY PROFILE, ENTITYCLASS_ID";
    private int WINDOW = 30;
    private Map<Tuple2<Long, String>, List<Tuple2<Long, CountDownLatch>>> lockMap = new ConcurrentHashMap<Tuple2<Long, String>, List<Tuple2<Long, CountDownLatch>>>();
    @Autowired
    private JdbcTemplate mainJdbcTemplate;
    @Autowired
    private LongIdGenerator longIdGenerator;
    private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
    private ScheduledExecutorService scheduledExecutorServiceNotifier = Executors.newScheduledThreadPool(1);

    public DBStatusServiceImpl(ExecutorService executorService) {
        super(executorService);
    }

    public void startNotify() {
        this.scheduledExecutorServiceNotifier.scheduleAtFixedRate(() -> {
            try {
                CurrentStatus overview = this.getOverview();
                Map<Tuple2<Long, String>, ProfiledEntityClassStatus> overviewMapping = overview.getOverview();
                this.lockMap.forEach((k, v) -> {
                    Tuple2 target = Tuple.of((Object)k._1, (Object)"");
                    ProfiledEntityClassStatus classStatus = (ProfiledEntityClassStatus)overviewMapping.get(target);
                    if (classStatus != null) {
                        long timestamp = classStatus.getTimestamp();
                        v.stream().filter(x -> (Long)x._1 < timestamp).forEach(x -> ((CountDownLatch)x._2()).countDown());
                    } else {
                        v.forEach(x -> ((CountDownLatch)x._2()).countDown());
                    }
                });
            }
            catch (Throwable throwable) {
                log.error("", throwable);
            }
        }, 100L, 1L, TimeUnit.SECONDS);
    }

    @Override
    protected void save(Map<Tuple2<String, Long>, ?> grouped, long timestamp) {
        try {
            Set<Tuple2<String, Long>> entityRefs = grouped.keySet();
            this.mainJdbcTemplate.batchUpdate(CREATE_SQL, entityRefs, entityRefs.size(), (ps, argument) -> {
                ps.setLong(1, (Long)this.longIdGenerator.next());
                ps.setString(2, "");
                ps.setLong(3, (Long)argument._2);
                ps.setLong(4, timestamp);
            });
        }
        catch (Throwable throwable) {
            log.error("", throwable);
        }
    }

    @Override
    protected void clear(List<OqsEngineEntity> oqsEngineEntity, final long timestamp) {
        try {
            Map<Tuple2, List<OqsEngineEntity>> group = oqsEngineEntity.stream().collect(Collectors.groupingBy(x -> Tuple.of((Object)x.getEntityClassRef().getProfile(), (Object)x.getEntityClassRef().getId())));
            Set<Tuple2> entityRefs = group.keySet();
            this.mainJdbcTemplate.batchUpdate(CLEAN_SQL_FOR_SPECIAL, entityRefs, entityRefs.size(), (ParameterizedPreparedStatementSetter)new ParameterizedPreparedStatementSetter<Tuple2<String, Long>>(){

                public void setValues(PreparedStatement ps, Tuple2<String, Long> argument) throws SQLException {
                    ps.setLong(1, timestamp);
                    ps.setLong(2, (Long)argument._2);
                    ps.setString(3, "");
                }
            });
        }
        catch (Throwable throwable) {
            log.error("", throwable);
        }
    }

    @Override
    public void clearStatus(long timestamp) {
    }

    @Override
    public CurrentStatus getOverview() {
        try {
            List query = this.mainJdbcTemplate.query(SELECT_SQL, (RowMapper)new RowMapper<ProfiledEntityClassStatus>(){

                public ProfiledEntityClassStatus mapRow(ResultSet rs, int rowNum) throws SQLException {
                    ProfiledEntityClassStatus statusItem = new ProfiledEntityClassStatus();
                    String profile = rs.getString("PROFILE");
                    long entityClassId = rs.getLong("ENTITYCLASS_ID");
                    long timestamp = rs.getLong("UPDATE_TIME");
                    statusItem.setProfile(profile);
                    statusItem.setEntityClassId(entityClassId);
                    statusItem.setTimestamp(timestamp);
                    return statusItem;
                }
            });
            CurrentStatus currentStatus = new CurrentStatus();
            Map<Tuple2<Long, String>, ProfiledEntityClassStatus> mapped = query.stream().collect(Collectors.toMap(x -> Tuple.of((Object)x.getEntityClassId(), (Object)x.getProfile()), y -> y, (a, b) -> a));
            currentStatus.setOverview(mapped);
            return currentStatus;
        }
        catch (Throwable throwable) {
            log.error("", throwable);
            return null;
        }
    }

    @Override
    public void query(List<Tuple2<Long, String>> profiledEntityClass, long requestTimestamp) {
        CurrentStatus overview = this.getOverview();
        ArrayList<CountDownLatch> countDownLatches = new ArrayList<CountDownLatch>();
        for (Tuple2<Long, String> entityClass : profiledEntityClass) {
            long timestamp;
            Tuple2 target;
            Map<Tuple2<Long, String>, ProfiledEntityClassStatus> overviewMapping = overview.getOverview();
            ProfiledEntityClassStatus classStatus = overviewMapping.get(target = Tuple.of((Object)entityClass._1, (Object)""));
            if (classStatus == null || (timestamp = classStatus.getTimestamp()) >= requestTimestamp) continue;
            CountDownLatch countDownLatch = new CountDownLatch(1);
            countDownLatches.add(countDownLatch);
            this.lockMap.compute(entityClass, (k, v) -> {
                if (v == null) {
                    v = new ArrayList<Tuple2>();
                }
                v.add(Tuple.of((Object)requestTimestamp, (Object)countDownLatch));
                return v;
            });
        }
        for (CountDownLatch countDownLatch : countDownLatches) {
            try {
                countDownLatch.await();
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public void startClean() {
        this.scheduledExecutorService.scheduleAtFixedRate(this::cleanTimeout, 100L, this.WINDOW, TimeUnit.SECONDS);
    }

    public void shutDown() {
    }

    private void cleanTimeout() {
        try {
            LocalDateTime minus = LocalDateTime.now().minus(this.WINDOW, ChronoUnit.SECONDS);
            Instant instant = minus.atZone(DateTimeValue.ZONE_ID).toInstant();
            long currentTime = instant.toEpochMilli();
            this.mainJdbcTemplate.update(CLEAN_SQL, new Object[]{currentTime});
        }
        catch (Throwable throwable) {
            log.error("", throwable);
        }
    }
}

