/*
 * Decompiled with CFR 0.152.
 */
package com.xforceplus.ultraman.extensions.cdc.status.impl;

import com.xforceplus.ultraman.extensions.cdc.status.domain.CurrentStatus;
import com.xforceplus.ultraman.extensions.cdc.status.domain.ProfiledEntityClassStatus;
import com.xforceplus.ultraman.extensions.cdc.status.impl.StatusServiceSupport;
import com.xforceplus.ultraman.metadata.cdc.OqsEngineEntity;
import com.xforceplus.ultraman.metadata.values.DateTimeValue;
import com.xforceplus.ultraman.sdk.infra.base.id.LongIdGenerator;
import com.xforceplus.ultraman.sdk.infra.logging.LoggingPattern;
import com.xforceplus.ultraman.sdk.infra.logging.LoggingUtils;
import io.vavr.Tuple2;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;

public class SimpleDBStatusServiceImpl
extends StatusServiceSupport {
    private static final Logger log = LoggerFactory.getLogger(SimpleDBStatusServiceImpl.class);
    private static final String CREATE_SQL = "INSERT INTO CDC_STATUS(ID, UPDATE_TIME) VALUES (?,?)";
    private static final String CLEAN_ALL_SQL = "TRUNCATE TABLE CDC_STATUS";
    private static final String CLEAN_SQL_BY_UPDATE_TIME = "DELETE FROM CDC_STATUS WHERE UPDATE_TIME <= ? ";
    private static final String SELECT_SQL = "SELECT MAX(UPDATE_TIME) AS UPDATE_TIME FROM CDC_STATUS ";
    private final Map<Long, List<CountDownLatch>> lockMap = new ConcurrentHashMap<Long, List<CountDownLatch>>();
    private int WINDOW = 30;
    private boolean enableTimeout = false;
    private boolean enableException = false;
    private JdbcTemplate jdbcTemplate;
    @Autowired
    private LongIdGenerator longIdGenerator;
    private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
    private final ScheduledExecutorService scheduledExecutorServiceNotifier = Executors.newScheduledThreadPool(1);

    public SimpleDBStatusServiceImpl(ExecutorService executorService, DataSource dataSource, int window, boolean enableTimeout, boolean enableException) {
        super(executorService);
        this.jdbcTemplate = new JdbcTemplate(dataSource);
        this.WINDOW = window;
        this.enableTimeout = enableTimeout;
        this.enableException = enableException;
    }

    @Override
    public void clearStatus(long timestamp) {
    }

    @Override
    public CurrentStatus getOverview() {
        try {
            ProfiledEntityClassStatus query = (ProfiledEntityClassStatus)this.jdbcTemplate.queryForObject(SELECT_SQL, (rs, rowNum) -> {
                ProfiledEntityClassStatus statusItem = new ProfiledEntityClassStatus();
                long timestamp = rs.getLong("UPDATE_TIME");
                statusItem.setProfile(null);
                statusItem.setEntityClassId(0L);
                statusItem.setTimestamp(timestamp);
                return statusItem;
            });
            CurrentStatus currentStatus = new CurrentStatus();
            if (query != null) {
                currentStatus.setTimestamp(query.getTimestamp());
            } else {
                currentStatus.setTimestamp(System.currentTimeMillis());
            }
            return currentStatus;
        }
        catch (Throwable throwable) {
            LoggingUtils.logErrorPattern((Logger)log, (LoggingPattern)LoggingPattern.UNKNOWN_ERROR, (String)"GetOverview", (Throwable)throwable);
            return null;
        }
    }

    @Override
    public void query(List<Tuple2<Long, String>> profiledEntityClass, long requestTimestamp) {
        log.debug("Query Status at {}", (Object)requestTimestamp);
        CurrentStatus overview = this.getOverview();
        if (overview != null) {
            log.debug("Current Sync time {}", (Object)overview.getTimestamp());
            ArrayList<CountDownLatch> countDownLatches = new ArrayList<CountDownLatch>();
            if (overview.getTimestamp() == 0L) {
                return;
            }
            if (requestTimestamp >= overview.getTimestamp()) {
                CountDownLatch countDownLatch = new CountDownLatch(1);
                countDownLatches.add(countDownLatch);
                this.lockMap.compute(requestTimestamp, (k, v) -> {
                    if (v == null) {
                        v = new ArrayList<CountDownLatch>();
                    }
                    v.add(countDownLatch);
                    return v;
                });
            }
            for (CountDownLatch countDownLatch : countDownLatches) {
                try {
                    countDownLatch.await(this.WINDOW - 1, TimeUnit.SECONDS);
                }
                catch (InterruptedException e) {
                    LoggingUtils.logErrorPattern((Logger)log, (LoggingPattern)LoggingPattern.UNKNOWN_ERROR, (String)"Query", (Throwable)e);
                    if (!this.enableException) continue;
                    throw new RuntimeException("Wait-For Timeout For " + profiledEntityClass.stream().map(Object::toString).collect(Collectors.joining(",")), e);
                }
            }
        }
    }

    @Override
    public void onEmpty() {
        this.jdbcTemplate.execute(CLEAN_ALL_SQL);
        this.lockMap.values().stream().flatMap(x -> x.stream()).forEach(x -> x.countDown());
        this.lockMap.clear();
    }

    @Override
    protected void save(Map<Tuple2<String, Long>, ?> grouped, long timestamp) {
    }

    @Override
    protected void clear(Collection<OqsEngineEntity> oqsEngineEntity) {
        try {
            OptionalLong maxTime = oqsEngineEntity.stream().mapToLong(x -> x.getUpdateTime()).max();
            this.jdbcTemplate.update(CREATE_SQL, ps -> {
                ps.setLong(1, (Long)this.longIdGenerator.next());
                ps.setLong(2, maxTime.orElse(System.currentTimeMillis()));
            });
        }
        catch (Throwable throwable) {
            LoggingUtils.logErrorPattern((Logger)log, (LoggingPattern)LoggingPattern.UNKNOWN_ERROR, (String)"Clear", (Throwable)throwable);
        }
    }

    private void cleanTimeout() {
        try {
            LocalDateTime minus = LocalDateTime.now().minusSeconds(this.WINDOW);
            Instant instant = minus.atZone(DateTimeValue.ZONE_ID).toInstant();
            long currentTime = instant.toEpochMilli();
            this.jdbcTemplate.update(CLEAN_SQL_BY_UPDATE_TIME, new Object[]{currentTime});
            this.lockMap.values().removeIf(x -> {
                x.removeIf(y -> y.getCount() == 0L);
                return x.isEmpty();
            });
        }
        catch (Throwable throwable) {
            LoggingUtils.logErrorPattern((Logger)log, (LoggingPattern)LoggingPattern.UNKNOWN_ERROR, (String)"CleanTimeout", (Throwable)throwable);
        }
    }

    public void startClean() {
        this.scheduledExecutorService.scheduleAtFixedRate(this::cleanTimeout, 100L, this.WINDOW, TimeUnit.SECONDS);
    }

    public void startNotify() {
        this.scheduledExecutorServiceNotifier.scheduleAtFixedRate(() -> {
            try {
                CurrentStatus overview = this.getOverview();
                long newSyncTime = overview.getTimestamp();
                if (newSyncTime == 0L) {
                    this.lockMap.values().stream().flatMap(Collection::stream).forEach(CountDownLatch::countDown);
                    return;
                }
                this.lockMap.forEach((k, v) -> {
                    if (k <= newSyncTime) {
                        v.forEach(CountDownLatch::countDown);
                    }
                });
            }
            catch (Throwable throwable) {
                LoggingUtils.logErrorPattern((Logger)log, (LoggingPattern)LoggingPattern.UNKNOWN_ERROR, (String)"StarSync", (Throwable)throwable);
            }
        }, 20L, 1L, TimeUnit.SECONDS);
    }
}

