package com.xforceplus.ultraman.extensions.cdc.status.impl;

import com.xforceplus.ultraman.extensions.cdc.status.domain.CurrentStatus;
import com.xforceplus.ultraman.extensions.cdc.status.domain.ProfiledEntityClassStatus;
import com.xforceplus.ultraman.metadata.cdc.OqsEngineEntity;
import com.xforceplus.ultraman.metadata.values.DateTimeValue;
import com.xforceplus.ultraman.sdk.infra.base.id.LongIdGenerator;
import io.vavr.Tuple2;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;

/* loaded from: input_file:com/xforceplus/ultraman/extensions/cdc/status/impl/SimpleDBStatusServiceImpl.class */
public class SimpleDBStatusServiceImpl extends StatusServiceSupport {
    private static final Logger log = LoggerFactory.getLogger(SimpleDBStatusServiceImpl.class);
    private static final String CREATE_SQL = "INSERT INTO CDC_STATUS(ID, UPDATE_TIME) VALUES (?,?)";
    private static final String CLEAN_ALL_SQL = "TRUNCATE TABLE CDC_STATUS";
    private static final String CLEAN_SQL_BY_UPDATE_TIME = "DELETE FROM CDC_STATUS WHERE UPDATE_TIME <= ? ";
    private static final String SELECT_SQL = "SELECT MAX(UPDATE_TIME) AS UPDATE_TIME FROM CDC_STATUS ";
    private final Map<Long, List<CountDownLatch>> lockMap;
    private int WINDOW;
    private boolean enableTimeout;
    private JdbcTemplate jdbcTemplate;

    @Autowired
    private LongIdGenerator longIdGenerator;
    private final ScheduledExecutorService scheduledExecutorService;
    private final ScheduledExecutorService scheduledExecutorServiceNotifier;

    public SimpleDBStatusServiceImpl(ExecutorService executorService, DataSource dataSource, int i, boolean z) {
        super(executorService);
        this.lockMap = new ConcurrentHashMap();
        this.WINDOW = 30;
        this.enableTimeout = false;
        this.scheduledExecutorService = Executors.newScheduledThreadPool(1);
        this.scheduledExecutorServiceNotifier = Executors.newScheduledThreadPool(1);
        this.jdbcTemplate = new JdbcTemplate(dataSource);
        this.WINDOW = i;
        this.enableTimeout = z;
    }

    @Override // com.xforceplus.ultraman.extensions.cdc.status.StatusService
    public void clearStatus(long j) {
    }

    @Override // com.xforceplus.ultraman.extensions.cdc.status.StatusService
    public CurrentStatus getOverview() {
        try {
            ProfiledEntityClassStatus profiledEntityClassStatus = (ProfiledEntityClassStatus) this.jdbcTemplate.queryForObject(SELECT_SQL, (resultSet, i) -> {
                ProfiledEntityClassStatus profiledEntityClassStatus2 = new ProfiledEntityClassStatus();
                long j = resultSet.getLong("UPDATE_TIME");
                profiledEntityClassStatus2.setProfile(null);
                profiledEntityClassStatus2.setEntityClassId(0L);
                profiledEntityClassStatus2.setTimestamp(j);
                return profiledEntityClassStatus2;
            });
            CurrentStatus currentStatus = new CurrentStatus();
            if (profiledEntityClassStatus != null) {
                currentStatus.setTimestamp(Long.valueOf(profiledEntityClassStatus.getTimestamp()));
            } else {
                currentStatus.setTimestamp(Long.valueOf(System.currentTimeMillis()));
            }
            return currentStatus;
        } catch (Throwable th) {
            log.error("", th);
            return null;
        }
    }

    @Override // com.xforceplus.ultraman.extensions.cdc.status.StatusService
    public void query(List<Tuple2<Long, String>> list, long j) {
        log.debug("Query Status at {}", Long.valueOf(j));
        CurrentStatus overview = getOverview();
        log.debug("Current Sync time {}", overview.getTimestamp());
        ArrayList<CountDownLatch> arrayList = new ArrayList();
        if (overview.getTimestamp().longValue() == 0) {
            return;
        }
        if (j >= overview.getTimestamp().longValue()) {
            CountDownLatch countDownLatch = new CountDownLatch(1);
            arrayList.add(countDownLatch);
            this.lockMap.compute(Long.valueOf(j), (l, list2) -> {
                if (list2 == null) {
                    list2 = new ArrayList();
                }
                list2.add(countDownLatch);
                return list2;
            });
        }
        for (CountDownLatch countDownLatch2 : arrayList) {
            try {
                if (this.enableTimeout) {
                    countDownLatch2.await(this.WINDOW - 1, TimeUnit.SECONDS);
                } else {
                    countDownLatch2.await();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    @Override // com.xforceplus.ultraman.extensions.cdc.status.StatusService
    public void onEmpty() {
        this.jdbcTemplate.execute(CLEAN_ALL_SQL);
        this.lockMap.values().stream().flatMap(list -> {
            return list.stream();
        }).forEach(countDownLatch -> {
            countDownLatch.countDown();
        });
        this.lockMap.clear();
    }

    @Override // com.xforceplus.ultraman.extensions.cdc.status.impl.StatusServiceSupport
    protected void save(Map<Tuple2<String, Long>, ?> map, long j) {
    }

    @Override // com.xforceplus.ultraman.extensions.cdc.status.impl.StatusServiceSupport
    protected void clear(Collection<OqsEngineEntity> collection) {
        try {
            OptionalLong max = collection.stream().mapToLong(oqsEngineEntity -> {
                return oqsEngineEntity.getUpdateTime();
            }).max();
            this.jdbcTemplate.update(CREATE_SQL, preparedStatement -> {
                preparedStatement.setLong(1, ((Long) this.longIdGenerator.next()).longValue());
                preparedStatement.setLong(2, max.orElse(System.currentTimeMillis()));
            });
        } catch (Throwable th) {
            log.error("", th);
        }
    }

    /* JADX WARN: Type inference failed for: r0v4, types: [java.time.ZonedDateTime] */
    private void cleanTimeout() {
        try {
            this.jdbcTemplate.update(CLEAN_SQL_BY_UPDATE_TIME, new Object[]{Long.valueOf(LocalDateTime.now().minusSeconds(this.WINDOW).atZone(DateTimeValue.ZONE_ID).toInstant().toEpochMilli())});
            this.lockMap.values().removeIf(list -> {
                list.removeIf(countDownLatch -> {
                    return countDownLatch.getCount() == 0;
                });
                return list.isEmpty();
            });
        } catch (Throwable th) {
            log.error("", th);
        }
    }

    public void startClean() {
        this.scheduledExecutorService.scheduleAtFixedRate(this::cleanTimeout, 100L, this.WINDOW, TimeUnit.SECONDS);
    }

    public void startNotify() {
        this.scheduledExecutorServiceNotifier.scheduleAtFixedRate(() -> {
            try {
                long longValue = getOverview().getTimestamp().longValue();
                if (longValue == 0) {
                    this.lockMap.values().stream().flatMap((v0) -> {
                        return v0.stream();
                    }).forEach((v0) -> {
                        v0.countDown();
                    });
                } else {
                    this.lockMap.forEach((l, list) -> {
                        if (l.longValue() <= longValue) {
                            list.forEach((v0) -> {
                                v0.countDown();
                            });
                        }
                    });
                }
            } catch (Throwable th) {
                log.error("", th);
            }
        }, 20L, 1L, TimeUnit.SECONDS);
    }
}
