package com.xforceplus.ultraman.extensions.cdc.status.impl;

import com.xforceplus.ultraman.extensions.cdc.status.domain.CurrentStatus;
import com.xforceplus.ultraman.extensions.cdc.status.domain.ProfiledEntityClassStatus;
import com.xforceplus.ultraman.metadata.cdc.OqsEngineEntity;
import com.xforceplus.ultraman.metadata.values.DateTimeValue;
import com.xforceplus.ultraman.sdk.infra.base.id.LongIdGenerator;
import io.vavr.Tuple;
import io.vavr.Tuple2;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.ParameterizedPreparedStatementSetter;
import org.springframework.jdbc.core.RowMapper;

/* loaded from: input_file:com/xforceplus/ultraman/extensions/cdc/status/impl/DBStatusServiceImpl.class */
public class DBStatusServiceImpl extends StatusServiceSupport {
    private static final Logger log = LoggerFactory.getLogger(DBStatusServiceImpl.class);
    private static final String CREATE_SQL = "INSERT INTO CDC_STATUS(ID, PROFILE, ENTITYCLASS_ID, UPDATE_TIME) VALUES (?,?,?,?)";
    private static final String CLEAN_SQL = "DELETE FROM CDC_STATUS WHERE UPDATE_TIME <= ? ";
    private static final String CLEAN_SQL_FOR_SPECIAL = "DELETE FROM CDC_STATUS WHERE UPDATE_TIME <= ? AND ENTITYCLASS_ID = ? AND PROFILE = ?";
    private static final String SELECT_SQL = "SELECT PROFILE, ENTITYCLASS_ID, MIN(UPDATE_TIME) AS UPDATE_TIME FROM CDC_STATUS GROUP BY PROFILE, ENTITYCLASS_ID";
    private final Map<Tuple2<Long, String>, List<Tuple2<Long, CountDownLatch>>> lockMap;
    private final ScheduledExecutorService scheduledExecutorService;
    private final ScheduledExecutorService scheduledExecutorServiceNotifier;
    private int WINDOW;
    private boolean enableTimeout;
    private JdbcTemplate jdbcTemplate;

    @Autowired
    private LongIdGenerator longIdGenerator;

    public DBStatusServiceImpl(ExecutorService executorService, DataSource dataSource, int i, boolean z) {
        super(executorService);
        this.lockMap = new ConcurrentHashMap();
        this.scheduledExecutorService = Executors.newScheduledThreadPool(1);
        this.scheduledExecutorServiceNotifier = Executors.newScheduledThreadPool(1);
        this.WINDOW = 30;
        this.enableTimeout = false;
        this.jdbcTemplate = new JdbcTemplate(dataSource);
        this.WINDOW = i;
        this.enableTimeout = z;
    }

    public void setMainJdbcTemplate(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    public void setLongIdGenerator(LongIdGenerator longIdGenerator) {
        this.longIdGenerator = longIdGenerator;
    }

    public void startNotify() {
        this.scheduledExecutorServiceNotifier.scheduleAtFixedRate(() -> {
            try {
                Map<Tuple2<Long, String>, ProfiledEntityClassStatus> overview = getOverview().getOverview();
                this.lockMap.forEach((tuple2, list) -> {
                    ProfiledEntityClassStatus profiledEntityClassStatus = (ProfiledEntityClassStatus) overview.get(Tuple.of(tuple2._1, ""));
                    if (profiledEntityClassStatus == null) {
                        list.forEach(tuple2 -> {
                            ((CountDownLatch) tuple2._2()).countDown();
                        });
                    } else {
                        long timestamp = profiledEntityClassStatus.getTimestamp();
                        list.stream().filter(tuple22 -> {
                            return ((Long) tuple22._1).longValue() < timestamp;
                        }).forEach(tuple23 -> {
                            ((CountDownLatch) tuple23._2()).countDown();
                        });
                    }
                });
            } catch (Throwable th) {
                log.error("", th);
            }
        }, 20L, 1L, TimeUnit.SECONDS);
    }

    @Override // com.xforceplus.ultraman.extensions.cdc.status.impl.StatusServiceSupport
    protected void save(Map<Tuple2<String, Long>, ?> map, long j) {
        try {
            Set<Tuple2<String, Long>> keySet = map.keySet();
            this.jdbcTemplate.batchUpdate(CREATE_SQL, keySet, keySet.size(), (preparedStatement, tuple2) -> {
                preparedStatement.setLong(1, ((Long) this.longIdGenerator.next()).longValue());
                log.debug("Save Status at {}, {}", Long.valueOf(j), tuple2._2);
                preparedStatement.setString(2, "");
                preparedStatement.setLong(3, ((Long) tuple2._2).longValue());
                preparedStatement.setLong(4, j);
            });
        } catch (Throwable th) {
            log.error("", th);
        }
    }

    /* JADX WARN: Type inference failed for: r0v8, types: [java.time.ZonedDateTime] */
    @Override // com.xforceplus.ultraman.extensions.cdc.status.impl.StatusServiceSupport
    protected void clear(Collection<OqsEngineEntity> collection) {
        try {
            final Map map = (Map) collection.stream().collect(Collectors.groupingBy(oqsEngineEntity -> {
                return Tuple.of(oqsEngineEntity.getEntityClassRef().getProfile(), Long.valueOf(oqsEngineEntity.getEntityClassRef().getId()));
            }));
            Set keySet = map.keySet();
            final long epochMilli = LocalDateTime.now().atZone(DateTimeValue.ZONE_ID).toInstant().toEpochMilli();
            this.jdbcTemplate.batchUpdate(CLEAN_SQL_FOR_SPECIAL, keySet, keySet.size(), new ParameterizedPreparedStatementSetter<Tuple2<String, Long>>() { // from class: com.xforceplus.ultraman.extensions.cdc.status.impl.DBStatusServiceImpl.1
                public void setValues(PreparedStatement preparedStatement, Tuple2<String, Long> tuple2) throws SQLException {
                    long findLastUpdateTime = findLastUpdateTime((List) map.get(tuple2));
                    DBStatusServiceImpl.log.debug("Clear Status at {} {}", Long.valueOf(findLastUpdateTime), tuple2._2);
                    preparedStatement.setLong(1, findLastUpdateTime);
                    preparedStatement.setLong(2, ((Long) tuple2._2).longValue());
                    preparedStatement.setString(3, "");
                }

                private long findLastUpdateTime(List<OqsEngineEntity> list) {
                    OptionalLong max = list.stream().mapToLong(oqsEngineEntity2 -> {
                        return oqsEngineEntity2.getUpdateTime();
                    }).max();
                    long j = epochMilli;
                    return max.orElseGet(() -> {
                        DBStatusServiceImpl.log.error("Cannot find UPDATE_TIME in RECORD");
                        return j;
                    });
                }
            });
        } catch (Throwable th) {
            log.error("", th);
        }
    }

    @Override // com.xforceplus.ultraman.extensions.cdc.status.StatusService
    public void clearStatus(long j) {
    }

    @Override // com.xforceplus.ultraman.extensions.cdc.status.StatusService
    public CurrentStatus getOverview() {
        try {
            List query = this.jdbcTemplate.query(SELECT_SQL, new RowMapper<ProfiledEntityClassStatus>() { // from class: com.xforceplus.ultraman.extensions.cdc.status.impl.DBStatusServiceImpl.2
                /* renamed from: mapRow, reason: merged with bridge method [inline-methods] */
                public ProfiledEntityClassStatus m1mapRow(ResultSet resultSet, int i) throws SQLException {
                    ProfiledEntityClassStatus profiledEntityClassStatus = new ProfiledEntityClassStatus();
                    String string = resultSet.getString("PROFILE");
                    long j = resultSet.getLong("ENTITYCLASS_ID");
                    long j2 = resultSet.getLong("UPDATE_TIME");
                    profiledEntityClassStatus.setProfile(string);
                    profiledEntityClassStatus.setEntityClassId(j);
                    profiledEntityClassStatus.setTimestamp(j2);
                    return profiledEntityClassStatus;
                }
            });
            CurrentStatus currentStatus = new CurrentStatus();
            currentStatus.setOverview((Map) query.stream().collect(Collectors.toMap(profiledEntityClassStatus -> {
                return Tuple.of(Long.valueOf(profiledEntityClassStatus.getEntityClassId()), profiledEntityClassStatus.getProfile());
            }, profiledEntityClassStatus2 -> {
                return profiledEntityClassStatus2;
            }, (profiledEntityClassStatus3, profiledEntityClassStatus4) -> {
                return profiledEntityClassStatus3;
            })));
            return currentStatus;
        } catch (Throwable th) {
            log.error("", th);
            return null;
        }
    }

    @Override // com.xforceplus.ultraman.extensions.cdc.status.StatusService
    public void query(List<Tuple2<Long, String>> list, long j) {
        log.debug("Query Status at {}", Long.valueOf(j));
        CurrentStatus overview = getOverview();
        log.debug("Current Pending List {}", overview);
        ArrayList<CountDownLatch> arrayList = new ArrayList();
        for (Tuple2<Long, String> tuple2 : list) {
            ProfiledEntityClassStatus profiledEntityClassStatus = overview.getOverview().get(Tuple.of(tuple2._1, ""));
            if (profiledEntityClassStatus != null) {
                long timestamp = profiledEntityClassStatus.getTimestamp();
                if (timestamp <= j) {
                    CountDownLatch countDownLatch = new CountDownLatch(1);
                    arrayList.add(countDownLatch);
                    this.lockMap.compute(tuple2, (tuple22, list2) -> {
                        if (list2 == null) {
                            list2 = new ArrayList();
                        }
                        list2.add(Tuple.of(Long.valueOf(timestamp), countDownLatch));
                        return list2;
                    });
                }
            }
        }
        for (CountDownLatch countDownLatch2 : arrayList) {
            try {
                if (this.enableTimeout) {
                    countDownLatch2.await(this.WINDOW - 1, TimeUnit.SECONDS);
                } else {
                    countDownLatch2.await();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    @Override // com.xforceplus.ultraman.extensions.cdc.status.StatusService
    public void onEmpty() {
    }

    public void startClean() {
        this.scheduledExecutorService.scheduleAtFixedRate(this::cleanTimeout, 100L, this.WINDOW, TimeUnit.SECONDS);
    }

    public void shutDown() {
    }

    /* JADX WARN: Type inference failed for: r0v4, types: [java.time.ZonedDateTime] */
    private void cleanTimeout() {
        try {
            this.jdbcTemplate.update(CLEAN_SQL, new Object[]{Long.valueOf(LocalDateTime.now().minusSeconds(this.WINDOW).atZone(DateTimeValue.ZONE_ID).toInstant().toEpochMilli())});
            this.lockMap.values().removeIf(list -> {
                list.removeIf(tuple2 -> {
                    return ((CountDownLatch) tuple2._2).getCount() == 0;
                });
                return list.size() == 0;
            });
        } catch (Throwable th) {
            log.error("", th);
        }
    }
}
