package com.xforceplus.ultraman.oqsengine.status.impl;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.xforceplus.ultraman.oqsengine.pojo.cdc.metrics.CDCAckMetrics;
import com.xforceplus.ultraman.oqsengine.pojo.cdc.metrics.CDCMetrics;
import com.xforceplus.ultraman.oqsengine.status.CDCStatusService;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisCommandExecutionException;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisCommands;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.TimeGauge;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;

/* loaded from: input_file:com/xforceplus/ultraman/oqsengine/status/impl/CDCStatusServiceImpl.class */
/**
 * Redis-backed implementation of {@link CDCStatusService}.
 *
 * <p>Three Redis string keys are used: one holding the JSON form of the un-committed
 * {@link CDCMetrics}, one holding the JSON form of the {@link CDCAckMetrics}, and one holding
 * a heart-beat counter that is advanced with {@code INCR}.
 *
 * <p>The values of the most recently saved ack metrics are also published as Micrometer
 * gauges ({@code oqs.cdc.sync.delay.latency} and {@code oqs.cdc.sync.executed.count}).
 *
 * <p>NOTE(review): {@code lastHeartBeatValue} is read and written without synchronization;
 * confirm that {@link #isAlive()} is only polled from a single thread.
 */
public class CDCStatusServiceImpl implements CDCStatusService {
    private static final String DEFAULT_CDC_METRICS_KEY = "com.xforceplus.ultraman.oqsengine.status.cdc.metrics";
    private static final String DEFAULT_HEART_BEAT_KEY = "com.xforceplus.ultraman.oqsengine.status.cdc.heartBeat";
    private static final String DEFAULT_CDC_ACK_METRICS_KEY = "com.xforceplus.ultraman.oqsengine.status.cdc.ack";

    /** Error message Redis replies with when INCR would exceed the signed 64-bit range. */
    private static final String INCR_OVERFLOW_MESSAGE = "ERR increment or decrement would overflow";

    @Resource
    private RedisClient redisClient;

    @Resource
    private ObjectMapper objectMapper;

    /** Connection opened in {@link #init()} and closed in {@link #destroy()}. */
    private StatefulRedisConnection<String, String> connect;
    private final String metricsKey;
    private final String ackKey;
    private final String heartBeatKey;

    /** Heart-beat counter observed by the previous {@link #isAlive()} call; -1 before the first call. */
    private long lastHeartBeatValue = -1L;

    /** Backing value of the sync-latency gauge, in milliseconds. */
    private final AtomicLong cdcSyncTime = new AtomicLong(0L);
    private TimeGauge.Builder<AtomicLong> cdcSyncTimeGauge;

    /** Backing value of the executed-rows gauge; assigned in {@link #init()}. */
    private AtomicLong cdcExecutedCountGauge;

    /**
     * Creates an instance that uses the default Redis keys.
     */
    public CDCStatusServiceImpl() {
        this(DEFAULT_CDC_METRICS_KEY, DEFAULT_CDC_ACK_METRICS_KEY, DEFAULT_HEART_BEAT_KEY);
    }

    /**
     * Creates an instance that uses the given Redis keys.
     *
     * @param metricsKey   key under which the un-committed metrics are stored
     * @param ackKey       key under which the ack metrics are stored
     * @param heartBeatKey key of the heart-beat counter
     * @throws IllegalArgumentException if any key is {@code null} or empty
     */
    public CDCStatusServiceImpl(String metricsKey, String ackKey, String heartBeatKey) {
        // Validation order (metrics, ack, heart beat) determines which message is reported first.
        this.metricsKey = requireNonEmpty(metricsKey, "The cdc status metrics is invalid.");
        this.ackKey = requireNonEmpty(ackKey, "The ack key is invalid.");
        this.heartBeatKey = requireNonEmpty(heartBeatKey, "The heartBeatKey is invalid.");
    }

    /**
     * Opens the Redis connection, names it {@code oqs.cdc}, and registers the gauges in the
     * global Micrometer registry.
     */
    @PostConstruct
    public void init() {
        StatefulRedisConnection<String, String> connection = this.redisClient.connect();
        try {
            connection.sync().clientSetname("oqs.cdc");
        } catch (RuntimeException e) {
            // Do not leak the freshly opened connection when naming it fails.
            connection.close();
            throw e;
        }
        this.connect = connection;

        this.cdcSyncTimeGauge = TimeGauge.builder(
            "oqs.cdc.sync.delay.latency", this.cdcSyncTime, TimeUnit.MILLISECONDS, AtomicLong::get);
        this.cdcExecutedCountGauge = Metrics.gauge("oqs.cdc.sync.executed.count", new AtomicLong(0L));
        this.cdcSyncTimeGauge.register(Metrics.globalRegistry);
    }

    /**
     * Closes the Redis connection if one was opened.
     */
    @PreDestroy
    public void destroy() {
        // init() may never have run, or may have failed before assigning the connection.
        if (this.connect != null) {
            this.connect.close();
        }
    }

    /**
     * Advances the heart-beat counter by one. When Redis rejects the increment because the
     * counter would overflow, the counter is reset to 0.
     *
     * @return always {@code true}
     */
    @Override // com.xforceplus.ultraman.oqsengine.status.CDCStatusService
    public boolean heartBeat() {
        RedisCommands<String, String> commands = this.connect.sync();
        try {
            commands.incr(this.heartBeatKey);
        } catch (RedisCommandExecutionException e) {
            // Best effort: only the overflow case is recovered; any other command error is
            // intentionally ignored so the caller's heart-beat loop keeps running.
            if (INCR_OVERFLOW_MESSAGE.equals(e.getMessage())) {
                commands.set(this.heartBeatKey, Long.toString(0L));
            }
        }
        return true;
    }

    /**
     * Reports whether the heart-beat counter changed since the previous call.
     *
     * @return {@code true} if no heart-beat value is stored at all, or if the stored value
     *         differs from the one seen by the previous call; {@code false} if it is unchanged
     * @throws NumberFormatException if the stored value is not a valid {@code long}
     */
    @Override // com.xforceplus.ultraman.oqsengine.status.CDCStatusService
    public boolean isAlive() {
        String value = this.connect.sync().get(this.heartBeatKey);
        if (value == null) {
            // No counter stored: treated as alive, and the remembered value is left untouched.
            return true;
        }
        long current = Long.parseLong(value);
        boolean changed = current != this.lastHeartBeatValue;
        this.lastHeartBeatValue = current;
        return changed;
    }

    /**
     * Stores the un-committed metrics as JSON.
     *
     * @param cDCMetrics metrics to store
     * @return {@code true} if Redis acknowledged the write with {@code OK}
     * @throws IllegalArgumentException if the metrics cannot be serialized
     */
    @Override // com.xforceplus.ultraman.oqsengine.status.CDCStatusService
    public boolean saveUnCommit(CDCMetrics cDCMetrics) {
        return save(cDCMetrics, this.metricsKey);
    }

    /**
     * Reads the stored un-committed metrics.
     *
     * @return the stored metrics, or empty if nothing (or a JSON {@code null}) is stored
     * @throws IllegalArgumentException if the stored value cannot be deserialized
     */
    @Override // com.xforceplus.ultraman.oqsengine.status.CDCStatusService
    public Optional<CDCMetrics> getUnCommit() {
        return get(this.metricsKey, CDCMetrics.class);
    }

    /**
     * Stores the ack metrics as JSON and publishes their total use time and executed row
     * count to the gauges. The gauges are updated even when the write to Redis fails.
     *
     * @param cDCAckMetrics metrics to store; must not be {@code null}
     * @return {@code true} if Redis acknowledged the write with {@code OK}
     * @throws NullPointerException     if {@code cDCAckMetrics} is {@code null}
     * @throws IllegalArgumentException if the metrics cannot be serialized
     */
    @Override // com.xforceplus.ultraman.oqsengine.status.CDCStatusService
    public boolean saveAck(CDCAckMetrics cDCAckMetrics) {
        if (cDCAckMetrics == null) {
            // Fail before touching Redis; otherwise the literal "null" would be written to the
            // ack key and the failure would only surface afterwards while updating the gauges.
            throw new NullPointerException("cDCAckMetrics must not be null.");
        }
        try {
            return save(cDCAckMetrics, this.ackKey);
        } finally {
            this.cdcSyncTime.set(cDCAckMetrics.getTotalUseTime());
            this.cdcExecutedCountGauge.set(cDCAckMetrics.getExecuteRows());
        }
    }

    /**
     * Reads the stored ack metrics.
     *
     * @return the stored metrics, or empty if nothing (or a JSON {@code null}) is stored
     * @throws IllegalArgumentException if the stored value cannot be deserialized
     */
    @Override // com.xforceplus.ultraman.oqsengine.status.CDCStatusService
    public Optional<CDCAckMetrics> getAck() {
        return get(this.ackKey, CDCAckMetrics.class);
    }

    /** Returns {@code value} unchanged, or throws with {@code message} when it is null or empty. */
    private static String requireNonEmpty(String value, String message) {
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException(message);
        }
        return value;
    }

    /** Serializes {@code value} to JSON and writes it under {@code key}; true when Redis replies OK. */
    private boolean save(Object value, String key) {
        String json;
        try {
            json = this.objectMapper.writeValueAsString(value);
        } catch (JsonProcessingException e) {
            throw new IllegalArgumentException(e.getMessage(), e);
        }
        return "OK".equals(this.connect.sync().set(key, json));
    }

    /** Reads the JSON stored under {@code key} and deserializes it to {@code type}. */
    private <T> Optional<T> get(String key, Class<T> type) {
        String json = this.connect.sync().get(key);
        if (json == null) {
            return Optional.empty();
        }
        try {
            // readValue yields null for a JSON "null" document; map that to empty instead of
            // letting Optional.of throw a NullPointerException.
            return Optional.ofNullable(this.objectMapper.readValue(json, type));
        } catch (JsonProcessingException e) {
            throw new IllegalArgumentException(e.getMessage(), e);
        }
    }
}
