package com.xforceplus.ultraman.oqsengine.changelog.impl;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.xforceplus.ultraman.oqsengine.changelog.ChangelogHandler;
import com.xforceplus.ultraman.oqsengine.changelog.command.AddChangelog;
import com.xforceplus.ultraman.oqsengine.changelog.command.ChangelogCommand;
import com.xforceplus.ultraman.oqsengine.changelog.domain.TransactionalChangelogEvent;
import com.xforceplus.ultraman.oqsengine.changelog.event.ChangelogEvent;
import com.xforceplus.ultraman.oqsengine.changelog.gateway.Gateway;
import com.xforceplus.ultraman.oqsengine.common.pool.ExecutorHelper;
import io.lettuce.core.Consumer;
import io.lettuce.core.Range;
import io.lettuce.core.RedisClient;
import io.lettuce.core.ScriptOutputType;
import io.lettuce.core.StreamMessage;
import io.lettuce.core.XGroupCreateArgs;
import io.lettuce.core.XReadArgs;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisCommands;
import io.micrometer.core.instrument.Metrics;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/xforceplus/ultraman/oqsengine/changelog/impl/RedisChangelogHandler.class */
/**
 * Redis-backed {@link ChangelogHandler}.
 *
 * <p>Producer side: {@link #handle(TransactionalChangelogEvent)} serializes the event to JSON and
 * runs a Lua script that adds it to a sorted set (scored by commit id) and records the latest
 * {@code commitId:timestamp} pair under {@code <queue>_current}.
 *
 * <p>Consumer side: {@link #prepareConsumer()} starts
 * <ul>
 *   <li>a daemon thread that tries to hold a lease on the leader key,</li>
 *   <li>a "mover" task that, on the leader, runs a Lua script moving entries from the sorted set
 *       into the Redis stream {@code <queue>_stream},</li>
 *   <li>a "consumer" task that, on the leader, reads the stream through a consumer group and
 *       forwards one {@link AddChangelog} per changed event to the gateway.</li>
 * </ul>
 *
 * @param <T> source type accepted by {@link #getEvent(Object)}
 */
public class RedisChangelogHandler<T> implements ChangelogHandler<T> {
    /** Kept for API compatibility; timestamps are now taken from the zone-independent epoch clock. */
    public static final ZoneId zoneId = ZoneId.of("Asia/Shanghai");

    /** Name of the stream consumer group used by every node. */
    private static final String CONSUMER_GROUP = "group";

    /** Stream entry field holding the JSON event; must match the field name used in MOVE_AND_ADD. */
    private static final String PAYLOAD_FIELD = "payload";

    /**
     * Lua template (queue name, current-key name). KEYS[1] = commit id, KEYS[2] = JSON payload,
     * KEYS[3] = timestamp in epoch millis.
     */
    private static final String SEND_SCRIPT = "redis.call('ZADD', '%s', KEYS[1], KEYS[2]);redis.call('SET', '%s', KEYS[1]..':'..KEYS[3]);return true;";

    /** Lua helper that splits a string on a delimiter. */
    private static final String SPLIT_FUNCTION = "local split = function (s, delimiter)\n    local result = {};\n    for match in (s..delimiter):gmatch(\"(.-)\"..delimiter) do\n        table.insert(result, match);\n    end\n    return result;\nend; \n";

    /**
     * Lua script moving entries from the sorted set to the stream.
     * KEYS[1] = current-key, KEYS[2] = now (epoch millis), KEYS[3] = time threshold,
     * KEYS[4] = size (commit id distance) threshold, KEYS[5] = sorted set, KEYS[6] = stream.
     * If the last recorded task is older than the time threshold every entry is moved, otherwise
     * only entries scored up to {@code lastCommitId - KEYS[4]}.
     */
    private static final String MOVE_AND_ADD = SPLIT_FUNCTION + "redis.log(2, 'start');local currentTask = redis.call('GET', KEYS[1]);if currentTask ~= false then \n  redis.log(2, 'got task');  redis.log(2, currentTask);  local splitValue = split(currentTask, ':');  redis.log(2, 'split value');  redis.log(2, splitValue[1]);  redis.log(2, splitValue[2]);  local taskCommitId = tonumber(splitValue[1]);  local taskTimestamp = tonumber(splitValue[2]);  local size;  redis.log(2, KEYS[1]);  redis.log(2, KEYS[2]);  redis.log(2, 'Queue Name');  redis.log(2, KEYS[5]);  if (tonumber(KEYS[2]) - taskTimestamp) > tonumber(KEYS[3])\n  then \n    redis.log(2, 'in here');    size = redis.call('ZCOUNT', KEYS[5], '0', '+inf');\n    redis.log(2, 'here end');  else \n    redis.log(2, 'in there');    local lastFetchCommit = taskCommitId - tonumber(KEYS[4]);    redis.log(2, lastFetchCommit);    size = redis.call('ZCOUNT', KEYS[5], '0', lastFetchCommit)  end \n  redis.log(2, 'GOT SIZE');  redis.log(2, size);  local result = redis.call('ZPOPMIN', KEYS[5], size);  for i = 1, #result do \n    if i % 2 ~= 0 then      redis.call('XADD', KEYS[6], '*', 'payload', result[i]);    end\n   end\nend";

    private final Logger logger = LoggerFactory.getLogger(RedisChangelogHandler.class);

    private final RedisClient redisClient;
    private final Gateway<ChangelogCommand, ChangelogEvent> gateway;
    private final ObjectMapper mapper;
    /** Kept so the connection backing {@link #syncCommands} stays referenced by this handler. */
    private final StatefulRedisConnection<String, String> statefulRedisConnection;
    private final RedisCommands<String, String> syncCommands;

    private final String queueName;
    private final String queueCurrentName;
    private final String queueStreamName;
    private final String nodeName;
    private final String sendScriptSha;
    private final String moveScriptSha;

    private Thread daemonThread;
    private ScheduledExecutorService consumer;
    private ScheduledExecutorService mover;

    private final AtomicBoolean isLeader = new AtomicBoolean(false);
    // Metrics.gauge hands back the state object it was given; keep it so the tasks can update it.
    private final AtomicLong inQueue = Metrics.gauge("oqs.changelog.in-queue", new AtomicLong(0));
    private final AtomicLong inStream = Metrics.gauge("oqs.changelog.in-stream", new AtomicLong(0));

    /** Leader lease length, in seconds (passed to SETEX). */
    private final long expireTime = 10;
    /** Initial delay of the scheduled tasks, in seconds. */
    private final long initDelay = 10;
    /** Period of the scheduled tasks, in seconds. */
    private final long period = 10;
    /** XREADGROUP block time, in milliseconds. */
    private final long blockTime = 3000;
    /** Maximum number of stream entries fetched per read. */
    private final long count = 100;
    /** Passed to the move script as KEYS[3] (milliseconds). */
    private final long moveTimeThreshold = 10000;
    /** Passed to the move script as KEYS[4]. */
    private final long moveSizeThreshold = 10;
    // NOTE(review): the leader key is not namespaced by queue name, so handlers of different
    // queues sharing one Redis also share one leader lease — confirm this is intended.
    private final String leaderKey = "current";

    /**
     * Connects to Redis and pre-loads both Lua scripts.
     *
     * @param str          name of this node (leader-lease value and stream consumer name)
     * @param str2         base queue name; {@code _current} and {@code _stream} keys derive from it
     * @param redisClient  Lettuce client used to open the connection
     * @param gateway      gateway that receives the produced changelog commands
     * @param objectMapper JSON mapper used to (de)serialize events
     */
    @SuppressWarnings("unchecked") // the raw Gateway parameter is kept for caller compatibility
    public RedisChangelogHandler(String str, String str2, RedisClient redisClient, Gateway gateway, ObjectMapper objectMapper) {
        this.nodeName = str;
        this.queueName = str2;
        this.queueCurrentName = str2.concat("_current");
        this.queueStreamName = str2.concat("_stream");
        this.redisClient = redisClient;
        this.gateway = gateway;
        this.mapper = objectMapper;
        this.statefulRedisConnection = redisClient.connect();
        this.syncCommands = this.statefulRedisConnection.sync();
        this.sendScriptSha = this.syncCommands.scriptLoad(String.format(SEND_SCRIPT, this.queueName, this.queueCurrentName));
        this.moveScriptSha = this.syncCommands.scriptLoad(MOVE_AND_ADD);
    }

    /**
     * Not implemented by this handler.
     *
     * @return always {@code null}
     */
    @Override // com.xforceplus.ultraman.oqsengine.changelog.ChangelogHandler
    public TransactionalChangelogEvent getEvent(T t) {
        return null;
    }

    /**
     * Serializes the event and enqueues it through the send script.
     * An event that cannot be serialized is logged and dropped.
     *
     * @param transactionalChangelogEvent event to enqueue
     */
    @Override // com.xforceplus.ultraman.oqsengine.changelog.ChangelogHandler
    public void handle(TransactionalChangelogEvent transactionalChangelogEvent) {
        final String payload;
        try {
            payload = this.mapper.writeValueAsString(transactionalChangelogEvent);
        } catch (JsonProcessingException e) {
            this.logger.error("Failed to serialize changelog event, the event is dropped", e);
            return;
        }
        this.syncCommands.evalsha(this.sendScriptSha, ScriptOutputType.BOOLEAN, new String[]{
            transactionalChangelogEvent.getCommitId().toString(),
            payload,
            Long.toString(currentEpochMillis())
        });
    }

    /**
     * Creates the consumer group, then starts the leader-election thread and the mover and
     * consumer scheduled tasks.
     */
    @Override // com.xforceplus.ultraman.oqsengine.changelog.ChangelogHandler
    public void prepareConsumer() {
        // Make sure the group exists before any read against it can be scheduled.
        initGroup();

        this.daemonThread = new Thread(this::electLeaderLoop, "Changelog-leader-election");
        this.daemonThread.setDaemon(true);
        this.daemonThread.start();

        this.mover = new ScheduledThreadPoolExecutor(1, ExecutorHelper.buildNameThreadFactory("Changelog-mover"));
        startMover();
        this.consumer = new ScheduledThreadPoolExecutor(1, ExecutorHelper.buildNameThreadFactory("Changelog-consumer"));
        startConsumer();
    }

    /**
     * Body of the election thread: claims or refreshes the lease when the leader key is absent or
     * already holds this node's name, otherwise steps down. Runs until the thread is interrupted.
     */
    private void electLeaderLoop() {
        // The lease is expressed in seconds; refresh at half of it, converted to milliseconds.
        final long refreshIntervalMillis = TimeUnit.SECONDS.toMillis(this.expireTime) / 2;
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // NOTE(review): GET followed by SETEX is not atomic, so two nodes may both claim
                // the lease for one round; an atomic SET NX / script would close that window.
                String currentLeader = this.syncCommands.get(this.leaderKey);
                if (currentLeader == null || currentLeader.equals(this.nodeName)) {
                    this.syncCommands.setex(this.leaderKey, this.expireTime, this.nodeName);
                    this.isLeader.set(true);
                } else {
                    this.isLeader.set(false);
                }
            } catch (RuntimeException e) {
                // Leadership cannot be confirmed; step down and retry on the next round.
                this.isLeader.set(false);
                this.logger.error("{}: leader election round failed", this.nodeName, e);
            }
            try {
                Thread.sleep(refreshIntervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** Schedules the consume task; it only does work while this node is the leader. */
    private void startConsumer() {
        this.consumer.scheduleAtFixedRate(() -> {
            // An exception escaping a fixed-rate task cancels every later run, so contain it here.
            try {
                if (this.isLeader.get()) {
                    this.logger.info("{}:Leader is me", this.nodeName);
                    doConsume();
                }
            } catch (RuntimeException e) {
                this.logger.error("{}: changelog consume round failed", this.nodeName, e);
            }
        }, this.initDelay, this.period, TimeUnit.SECONDS);
    }

    /** Schedules the task that refreshes the gauges and, on the leader, runs the move script. */
    private void startMover() {
        this.mover.scheduleAtFixedRate(() -> {
            // An exception escaping a fixed-rate task cancels every later run, so contain it here.
            try {
                updateMetrics();
                if (this.isLeader.get()) {
                    this.logger.info("{}:Leader is me", this.nodeName);
                    doMove();
                }
            } catch (RuntimeException e) {
                this.logger.error("{}: changelog move round failed", this.nodeName, e);
            }
        }, this.initDelay, this.period, TimeUnit.SECONDS);
    }

    /** Publishes the sorted-set size and the stream length to the two gauges. */
    private void updateMetrics() {
        Long queued = this.syncCommands.zcount(this.queueName, Range.unbounded());
        Long streamed = this.syncCommands.xlen(this.queueStreamName);
        if (this.inQueue != null && queued != null) {
            this.inQueue.set(queued);
        }
        if (this.inStream != null && streamed != null) {
            this.inStream.set(streamed);
        }
    }

    /** Runs the move script with the current time and both thresholds. */
    private void doMove() {
        this.syncCommands.evalsha(this.moveScriptSha, ScriptOutputType.BOOLEAN, new String[]{
            this.queueCurrentName,
            Long.toString(currentEpochMillis()),
            Long.toString(this.moveTimeThreshold),
            Long.toString(this.moveSizeThreshold),
            this.queueName,
            this.queueStreamName
        });
    }

    private void doConsume() {
        consumeAndDeliver();
    }

    /** Creates the consumer group (and the stream if missing); an existing group is not an error. */
    private void initGroup() {
        try {
            this.syncCommands.xgroupCreate(XReadArgs.StreamOffset.latest(this.queueStreamName), CONSUMER_GROUP, XGroupCreateArgs.Builder.mkstream());
        } catch (Exception e) {
            String reason = e.getMessage();
            if (reason != null && reason.contains("BUSYGROUP")) {
                this.logger.info("Consumer group '{}' already exists on stream {}", CONSUMER_GROUP, this.queueStreamName);
            } else {
                this.logger.warn("Failed to create consumer group '{}' on stream {}", CONSUMER_GROUP, this.queueStreamName, e);
            }
        }
    }

    /**
     * Converts one stream entry into gateway commands, one {@link AddChangelog} per changed event.
     *
     * @param streamMessage stream entry whose {@code payload} field holds the JSON event
     * @return the commands, or an empty list when the payload is missing or cannot be parsed
     */
    private List<ChangelogCommand> toChangeCommandList(StreamMessage<String, String> streamMessage) {
        String payload = streamMessage.getBody().get(PAYLOAD_FIELD);
        if (payload == null) {
            this.logger.warn("Stream message {} has no '{}' field, skipped", streamMessage.getId(), PAYLOAD_FIELD);
            return Collections.emptyList();
        }
        try {
            TransactionalChangelogEvent event = this.mapper.readValue(payload, TransactionalChangelogEvent.class);
            if (event == null || event.getChangedEventList() == null) {
                return Collections.emptyList();
            }
            return event.getChangedEventList().stream()
                .<ChangelogCommand>map(changedEvent ->
                    new AddChangelog(changedEvent.getId(), changedEvent.getEntityClassId(), changedEvent))
                .collect(Collectors.toList());
        } catch (JsonProcessingException e) {
            this.logger.error("Failed to parse changelog payload of stream message {}", streamMessage.getId(), e);
            return Collections.emptyList();
        }
    }

    /**
     * Reads a batch of new entries for this node from the consumer group, forwards the commands
     * of each entry to the gateway and acknowledges the entry once its delivery was attempted.
     * A failing delivery is logged and does not stop the remaining commands.
     */
    private void consumeAndDeliver() {
        List<StreamMessage<String, String>> messages = this.syncCommands.xreadgroup(
            Consumer.from(CONSUMER_GROUP, this.nodeName),
            XReadArgs.Builder.block(this.blockTime).count(this.count),
            XReadArgs.StreamOffset.lastConsumed(this.queueStreamName));
        if (messages == null || messages.isEmpty()) {
            return;
        }
        this.logger.info("{}:Got Message", this.nodeName);
        for (StreamMessage<String, String> message : messages) {
            for (ChangelogCommand command : toChangeCommandList(message)) {
                this.logger.info("got changeCommand and deliver to the gateway");
                try {
                    this.gateway.fireAndForget(command, new HashMap<>());
                } catch (Exception e) {
                    this.logger.error("Failed to deliver changelog command to the gateway", e);
                }
            }
            // One acknowledgement per entry, after every command of it has been handed over.
            this.syncCommands.xack(this.queueStreamName, CONSUMER_GROUP, message.getId());
        }
    }

    /**
     * Current time in epoch milliseconds. Taken from the system clock directly so the value does
     * not depend on the JVM default time zone.
     */
    private static long currentEpochMillis() {
        return System.currentTimeMillis();
    }
}
