/*
 * Decompiled with CFR 0.152.
 */
package com.xforceplus.ultraman.oqsengine.changelog.impl;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.xforceplus.ultraman.oqsengine.changelog.ChangelogHandler;
import com.xforceplus.ultraman.oqsengine.changelog.command.AddChangelog;
import com.xforceplus.ultraman.oqsengine.changelog.command.ChangelogCommand;
import com.xforceplus.ultraman.oqsengine.changelog.domain.ChangedEvent;
import com.xforceplus.ultraman.oqsengine.changelog.domain.TransactionalChangelogEvent;
import com.xforceplus.ultraman.oqsengine.changelog.event.ChangelogEvent;
import com.xforceplus.ultraman.oqsengine.changelog.gateway.Gateway;
import com.xforceplus.ultraman.oqsengine.common.pool.ExecutorHelper;
import io.lettuce.core.Consumer;
import io.lettuce.core.Range;
import io.lettuce.core.RedisClient;
import io.lettuce.core.ScriptOutputType;
import io.lettuce.core.StreamMessage;
import io.lettuce.core.XGroupCreateArgs;
import io.lettuce.core.XReadArgs;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisCommands;
import io.micrometer.core.instrument.Metrics;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Redis-backed {@link ChangelogHandler}.
 *
 * <p>Data flow, as implemented in this class:
 * <ol>
 *   <li>{@link #handle(TransactionalChangelogEvent)} serializes the transaction to JSON and runs
 *       {@code SEND_SCRIPT}, which {@code ZADD}s the payload into the sorted set {@code queueName}
 *       (score = commit id) and {@code SET}s {@code <queueName>_current} to
 *       {@code "<commitId>:<timestampMillis>"}.</li>
 *   <li>On the leader node, the "mover" task periodically runs {@code MOVE_AND_ADD}, which pops
 *       entries from the sorted set and {@code XADD}s them to the stream
 *       {@code <queueName>_stream}.</li>
 *   <li>On the leader node, the "consumer" task periodically reads the stream through the consumer
 *       group {@code "group"}, converts every changed event into an {@link AddChangelog} command
 *       and hands it to the {@link Gateway}.</li>
 * </ol>
 *
 * <p>Leadership is a lease stored under the Redis key {@code "current"}: a daemon thread writes
 * this node's name with a TTL of {@code expireTime} seconds whenever the key is absent or already
 * holds this node's name.
 *
 * @param <T> source type accepted by {@link #getEvent(Object)}
 */
public class RedisChangelogHandler<T> implements ChangelogHandler<T> {
    public static final ZoneId zoneId = ZoneId.of("Asia/Shanghai");

    /** Consumer group used on the stream. */
    private static final String GROUP_NAME = "group";

    /** Stream entry field that carries the JSON payload (must match the XADD in MOVE_AND_ADD). */
    private static final String PAYLOAD_FIELD = "payload";

    /** Lua template: format args are the sorted-set name and the "current task" key name. */
    private static final String SEND_SCRIPT = "redis.call('ZADD', '%s', KEYS[1], KEYS[2]);redis.call('SET', '%s', KEYS[1]..':'..KEYS[3]);return true;";

    /** Lua helper that splits a string on a delimiter; prepended to MOVE_AND_ADD. */
    private static final String SPLIT_FUNCTION = "local split = function (s, delimiter)\n    local result = {};\n    for match in (s..delimiter):gmatch(\"(.-)\"..delimiter) do\n        table.insert(result, match);\n    end\n    return result;\nend; \n";

    /**
     * Lua script that moves entries from the sorted set to the stream.
     * KEYS: [1] current-task key, [2] now (ms), [3] time threshold (ms), [4] size threshold,
     * [5] sorted-set name, [6] stream name. When the last recorded task is older than the time
     * threshold everything is moved; otherwise only entries whose score is at most
     * (last commit id - size threshold) are moved.
     */
    private static final String MOVE_AND_ADD = SPLIT_FUNCTION + "redis.log(2, 'start');local currentTask = redis.call('GET', KEYS[1]);if currentTask ~= false then \n  redis.log(2, 'got task');  redis.log(2, currentTask);  local splitValue = split(currentTask, ':');  redis.log(2, 'split value');  redis.log(2, splitValue[1]);  redis.log(2, splitValue[2]);  local taskCommitId = tonumber(splitValue[1]);  local taskTimestamp = tonumber(splitValue[2]);  local size;  redis.log(2, KEYS[1]);  redis.log(2, KEYS[2]);  redis.log(2, 'Queue Name');  redis.log(2, KEYS[5]);  if (tonumber(KEYS[2]) - taskTimestamp) > tonumber(KEYS[3])\n  then \n    redis.log(2, 'in here');    size = redis.call('ZCOUNT', KEYS[5], '0', '+inf');\n    redis.log(2, 'here end');  else \n    redis.log(2, 'in there');    local lastFetchCommit = taskCommitId - tonumber(KEYS[4]);    redis.log(2, lastFetchCommit);    size = redis.call('ZCOUNT', KEYS[5], '0', lastFetchCommit)  end \n  redis.log(2, 'GOT SIZE');  redis.log(2, size);  local result = redis.call('ZPOPMIN', KEYS[5], size);  for i = 1, #result do \n    if i % 2 ~= 0 then      redis.call('XADD', KEYS[6], '*', 'payload', result[i]);    end\n   end\nend";

    private final Logger logger = LoggerFactory.getLogger(ChangelogHandler.class);

    private final RedisClient redisClient;
    private final Gateway<ChangelogCommand, ChangelogEvent> gateway;
    private final ObjectMapper mapper;
    private final StatefulRedisConnection<String, String> statefulRedisConnection;
    private final RedisCommands<String, String> syncCommands;

    private Thread daemonThread;
    private ScheduledExecutorService consumer;
    private ScheduledExecutorService mover;
    private final AtomicBoolean isLeader = new AtomicBoolean(false);

    /** Leader lease TTL, in seconds (passed to SETEX). */
    private final long expireTime = 10L;
    /** Initial delay and period of the mover/consumer tasks, in seconds. */
    private final long initDelay = 10L;
    private final long period = 10L;
    /** XREADGROUP block time (ms) and maximum entries per read. */
    private final long blockTime = 3000L;
    private final long count = 100L;
    /** Thresholds handed to MOVE_AND_ADD as KEYS[3] (ms) and KEYS[4]. */
    private final long moveTimeThreshold = 10000L;
    private final long moveSizeThreshold = 10L;

    private final String queueName;
    private final String queueCurrentName;
    private final String queueStreamName;
    private final String leaderKey = "current";
    private final String nodeName;

    private final AtomicLong inQueue = Metrics.gauge("oqs.changelog.in-queue", new AtomicLong(0L));
    private final AtomicLong inStream = Metrics.gauge("oqs.changelog.in-stream", new AtomicLong(0L));

    private final String sendScriptSha;
    private final String moveScriptSha;

    /**
     * Opens a Redis connection and loads both Lua scripts.
     *
     * @param nodeName    name of this node; used as the leader-lease value and stream consumer name
     * @param queueName   name of the sorted set; {@code _current} and {@code _stream} are derived from it
     * @param redisClient client used to open the connection
     * @param gateway     gateway that receives the changelog commands
     * @param mapper      JSON mapper for (de)serializing {@link TransactionalChangelogEvent}
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // raw Gateway parameter kept for caller compatibility
    public RedisChangelogHandler(String nodeName, String queueName, RedisClient redisClient, Gateway gateway, ObjectMapper mapper) {
        this.nodeName = nodeName;
        this.queueName = queueName;
        this.queueCurrentName = queueName.concat("_current");
        this.queueStreamName = queueName.concat("_stream");
        this.redisClient = redisClient;
        this.gateway = gateway;
        this.mapper = mapper;
        // Keep a reference to the connection instead of dropping it right after connect().
        this.statefulRedisConnection = redisClient.connect();
        this.syncCommands = this.statefulRedisConnection.sync();
        this.sendScriptSha = this.syncCommands.scriptLoad(String.format(SEND_SCRIPT, queueName, this.queueCurrentName));
        this.moveScriptSha = this.syncCommands.scriptLoad(MOVE_AND_ADD);
    }

    /**
     * Not implemented by this handler.
     *
     * @return always {@code null}
     */
    @Override
    public TransactionalChangelogEvent getEvent(T source) {
        return null;
    }

    /**
     * Serializes the transaction to JSON and enqueues it via {@code SEND_SCRIPT}.
     * A serialization failure is logged and the event is not enqueued.
     *
     * @param transaction the transaction to enqueue; its commit id becomes the sorted-set score
     */
    @Override
    public void handle(TransactionalChangelogEvent transaction) {
        Long commitId = transaction.getCommitId();
        try {
            String payload = this.mapper.writeValueAsString(transaction);
            String[] keys = new String[]{commitId.toString(), payload, Long.toString(currentEpochMillis())};
            this.syncCommands.evalsha(this.sendScriptSha, ScriptOutputType.BOOLEAN, keys);
        }
        catch (JsonProcessingException e) {
            this.logger.error("Failed to serialize changelog transaction, commitId={}", commitId, e);
        }
    }

    /**
     * Starts the leader-election daemon thread, creates the consumer group and schedules the
     * mover and consumer tasks.
     */
    @Override
    public void prepareConsumer() {
        this.daemonThread = new Thread(this::runLeaderElection, "Changelog-leader");
        this.daemonThread.setDaemon(true);
        this.daemonThread.start();
        // Create the group before any consumer task can be scheduled against it.
        this.initGroup();
        this.mover = new ScheduledThreadPoolExecutor(1, ExecutorHelper.buildNameThreadFactory("Changelog-mover"));
        this.startMover();
        this.consumer = new ScheduledThreadPoolExecutor(1, ExecutorHelper.buildNameThreadFactory("Changelog-consumer"));
        this.startConsumer();
    }

    /**
     * Leader-election loop: claims or refreshes the lease when the key is absent or already ours,
     * otherwise steps down. Runs until the thread is interrupted.
     *
     * <p>NOTE(review): GET followed by SETEX is not atomic, so two nodes may both observe an empty
     * key and both claim leadership for one cycle; kept as in the original protocol.
     */
    private void runLeaderElection() {
        // expireTime is in seconds (SETEX); refresh at half the lease so it does not lapse.
        long sleepMillis = Math.max(1L, TimeUnit.SECONDS.toMillis(this.expireTime) / 2L);
        while (!Thread.currentThread().isInterrupted()) {
            try {
                String current = this.syncCommands.get(this.leaderKey);
                if (current == null || current.equals(this.nodeName)) {
                    this.syncCommands.setex(this.leaderKey, this.expireTime, this.nodeName);
                    this.isLeader.set(true);
                } else {
                    this.isLeader.set(false);
                }
            }
            catch (Exception ex) {
                // Without a working lease we must not act as leader; keep the thread alive and retry.
                this.isLeader.set(false);
                this.logger.error("{}:Leader election cycle failed", this.nodeName, ex);
            }
            try {
                Thread.sleep(sleepMillis);
            }
            catch (InterruptedException e) {
                // Restore the flag so the loop condition terminates the thread.
                Thread.currentThread().interrupt();
            }
        }
        this.isLeader.set(false);
    }

    /** Schedules the periodic consume task; it only does work while this node is leader. */
    private void startConsumer() {
        this.consumer.scheduleAtFixedRate(() -> {
            // An exception escaping a fixed-rate task cancels all further runs, so catch everything.
            try {
                if (this.isLeader.get()) {
                    this.logger.info("{}:Leader is me", this.nodeName);
                    this.doConsume();
                }
            }
            catch (Exception ex) {
                this.logger.error("{}:Changelog consume cycle failed", this.nodeName, ex);
            }
        }, this.initDelay, this.period, TimeUnit.SECONDS);
    }

    /** Schedules the periodic task that refreshes the gauges and, on the leader, runs the move script. */
    private void startMover() {
        this.mover.scheduleAtFixedRate(() -> {
            // An exception escaping a fixed-rate task cancels all further runs, so catch everything.
            try {
                this.updateMetrics();
                if (this.isLeader.get()) {
                    this.logger.info("{}:Leader is me", this.nodeName);
                    this.doMove();
                }
            }
            catch (Exception ex) {
                this.logger.error("{}:Changelog move cycle failed", this.nodeName, ex);
            }
        }, this.initDelay, this.period, TimeUnit.SECONDS);
    }

    /** Publishes the current sorted-set size and stream length to the Micrometer gauges. */
    private void updateMetrics() {
        Long queued = this.syncCommands.zcount(this.queueName, Range.unbounded());
        Long streamed = this.syncCommands.xlen(this.queueStreamName);
        if (queued != null) {
            this.inQueue.set(queued);
        }
        if (streamed != null) {
            this.inStream.set(streamed);
        }
    }

    /** Runs MOVE_AND_ADD with the current time and the configured thresholds. */
    private void doMove() {
        String[] keys = new String[]{
            this.queueCurrentName,
            Long.toString(currentEpochMillis()),
            Long.toString(this.moveTimeThreshold),
            Long.toString(this.moveSizeThreshold),
            this.queueName,
            this.queueStreamName
        };
        this.syncCommands.evalsha(this.moveScriptSha, ScriptOutputType.BOOLEAN, keys);
    }

    private void doConsume() {
        this.consumeAndDeliver();
    }

    /**
     * Creates the consumer group (and the stream, via MKSTREAM). Failure is logged and otherwise
     * ignored because the usual cause is that the group already exists.
     */
    private void initGroup() {
        try {
            this.syncCommands.xgroupCreate(XReadArgs.StreamOffset.latest(this.queueStreamName), GROUP_NAME, XGroupCreateArgs.Builder.mkstream());
        }
        catch (Exception ex) {
            this.logger.info("Consumer group '{}' on stream {} was not created (it probably already exists): {}",
                GROUP_NAME, this.queueStreamName, ex.getMessage());
        }
    }

    /**
     * Converts one stream entry into the changelog commands it carries.
     *
     * @param x stream entry whose {@code payload} field holds a JSON {@link TransactionalChangelogEvent}
     * @return one {@link AddChangelog} per changed event; empty when the payload is missing or unparsable
     */
    private List<ChangelogCommand> toChangeCommandList(StreamMessage<String, String> x) {
        Map<String, String> body = x.getBody();
        String payload = body == null ? null : body.get(PAYLOAD_FIELD);
        if (payload == null) {
            this.logger.error("Stream message {} has no '{}' field", x.getId(), PAYLOAD_FIELD);
            return Collections.emptyList();
        }
        try {
            TransactionalChangelogEvent event = this.mapper.readValue(payload, TransactionalChangelogEvent.class);
            return event.getChangedEventList().stream()
                .<ChangelogCommand>map(evt -> new AddChangelog(evt.getId(), evt.getEntityClassId(), (ChangedEvent)evt))
                .collect(Collectors.toList());
        }
        catch (JsonProcessingException e) {
            this.logger.error("Failed to parse payload of stream message {}", x.getId(), e);
            return Collections.emptyList();
        }
    }

    /**
     * Reads up to {@code count} new entries for this consumer, delivers every contained command to
     * the gateway and acknowledges each entry once, after its commands have been handed over.
     * Delivery failures are logged and do not prevent the acknowledgement.
     */
    private void consumeAndDeliver() {
        List<StreamMessage<String, String>> messages = this.syncCommands.xreadgroup(
            Consumer.from(GROUP_NAME, this.nodeName),
            XReadArgs.Builder.block(this.blockTime).count(this.count),
            XReadArgs.StreamOffset.lastConsumed(this.queueStreamName));
        if (messages == null || messages.isEmpty()) {
            return;
        }
        this.logger.info("{}:Got Message", this.nodeName);
        for (StreamMessage<String, String> message : messages) {
            for (ChangelogCommand command : this.toChangeCommandList(message)) {
                this.logger.info("got changeCommand and deliver to the gateway");
                try {
                    this.gateway.fireAndForget(command, new HashMap<String, Object>());
                }
                catch (Exception ex) {
                    this.logger.error("Failed to deliver changelog command from stream message {}", message.getId(), ex);
                }
            }
            // Ack once, after processing: an entry read by a node that dies mid-delivery stays in
            // the group's pending list instead of being lost.
            this.syncCommands.xack(this.queueStreamName, GROUP_NAME, message.getId());
        }
    }

    /**
     * Current time as epoch milliseconds. Epoch time is zone independent; reinterpreting the
     * host's local wall clock in a fixed zone would skew the value on hosts in any other zone.
     */
    private static long currentEpochMillis() {
        return System.currentTimeMillis();
    }
}

