/*
 * Decompiled with CFR 0.152.
 */
package com.xforceplus.ultraman.oqsengine.changelog.listener.impl;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.xforceplus.ultraman.oqsengine.changelog.ChangelogHandler;
import com.xforceplus.ultraman.oqsengine.changelog.domain.ChangedEvent;
import com.xforceplus.ultraman.oqsengine.changelog.domain.TransactionalChangelogEvent;
import com.xforceplus.ultraman.oqsengine.changelog.domain.ValueWrapper;
import com.xforceplus.ultraman.oqsengine.changelog.listener.EventLifecycleAware;
import com.xforceplus.ultraman.oqsengine.changelog.listener.flow.FlowRegistry;
import com.xforceplus.ultraman.oqsengine.changelog.listener.flow.QueueFlow;
import com.xforceplus.ultraman.oqsengine.changelog.utils.ChangelogHelper;
import com.xforceplus.ultraman.oqsengine.event.ActualEvent;
import com.xforceplus.ultraman.oqsengine.event.EventBus;
import com.xforceplus.ultraman.oqsengine.event.EventType;
import com.xforceplus.ultraman.oqsengine.event.payload.entity.BuildPayload;
import com.xforceplus.ultraman.oqsengine.event.payload.entity.DeletePayload;
import com.xforceplus.ultraman.oqsengine.event.payload.entity.ReplacePayload;
import com.xforceplus.ultraman.oqsengine.event.payload.transaction.BeginPayload;
import com.xforceplus.ultraman.oqsengine.event.payload.transaction.CommitPayload;
import com.xforceplus.ultraman.oqsengine.event.payload.transaction.RollbackPayload;
import com.xforceplus.ultraman.oqsengine.metadata.MetaManager;
import com.xforceplus.ultraman.oqsengine.pojo.dto.entity.IEntity;
import com.xforceplus.ultraman.oqsengine.pojo.dto.entity.IEntityClass;
import com.xforceplus.ultraman.oqsengine.pojo.dto.values.IValue;
import io.lettuce.core.RedisClient;
import io.lettuce.core.api.sync.RedisCommands;
import io.micrometer.core.instrument.Metrics;
import io.vavr.Tuple;
import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RedisEventLifecycleHandler
implements EventLifecycleAware {
    private RedisClient redisClient;
    private RedisCommands<String, String> syncCommands;
    private ObjectMapper mapper;
    private Logger logger = LoggerFactory.getLogger(RedisEventLifecycleHandler.class);
    private static final String MISSING_PAYLOAD = "Event {} has no payload";
    private static final String NO_OPERATION = "Do nothing in {} stage";
    private static final String QUEUE_PREFIX = "com.xforceplus.ultraman.oqsengine.changelog.tx.";
    private ChangelogHandler changelogHandler;
    private MetaManager metaManager;
    private FlowRegistry flowRegistry;
    private AtomicInteger inProgressTX = (AtomicInteger)Metrics.gauge((String)"oqs.changelog.in-progress-tx", (Number)new AtomicInteger(0));
    @Resource
    private EventBus eventBus;

    public RedisEventLifecycleHandler(RedisClient redisClient, ChangelogHandler changelogHandler, ObjectMapper mapper, FlowRegistry flowRegistry, MetaManager manager) {
        this.redisClient = redisClient;
        this.syncCommands = redisClient.connect().sync();
        this.changelogHandler = changelogHandler;
        this.mapper = mapper;
        this.flowRegistry = flowRegistry;
        this.metaManager = manager;
    }

    @PostConstruct
    public void init() {
        this.eventBus.watch(EventType.ENTITY_BUILD, x -> this.onEntityCreate((ActualEvent<BuildPayload>)((ActualEvent)x)));
        this.eventBus.watch(EventType.ENTITY_DELETE, x -> this.onEntityDelete((ActualEvent<DeletePayload>)((ActualEvent)x)));
        this.eventBus.watch(EventType.ENTITY_REPLACE, x -> this.onEntityUpdate((ActualEvent<ReplacePayload>)((ActualEvent)x)));
        this.eventBus.watch(EventType.TX_PREPAREDNESS_COMMIT, x -> this.onTxPreCommit((ActualEvent<CommitPayload>)((ActualEvent)x)));
        this.eventBus.watch(EventType.TX_BEGIN, x -> this.onTxCreate((ActualEvent<BeginPayload>)((ActualEvent)x)));
        this.eventBus.watch(EventType.TX_COMMITED, x -> this.onTxCommitted((ActualEvent<CommitPayload>)((ActualEvent)x)));
        this.eventBus.watch(EventType.TX_PREPAREDNESS_ROLLBACK, x -> this.onTxPreRollBack((ActualEvent<RollbackPayload>)((ActualEvent)x)));
        this.eventBus.watch(EventType.TX_ROLLBACKED, x -> this.onTxRollBack((ActualEvent<RollbackPayload>)((ActualEvent)x)));
    }

    @Override
    public void onTxCreate(ActualEvent<BeginPayload> begin) {
        this.logger.debug("Got tx create");
        this.extract(begin, payload -> {
            this.inProgressTX.getAndIncrement();
            long txId = payload.getTxId();
            String msg = payload.getMsg();
            this.logger.debug("Got message {}", (Object)msg);
            QueueFlow flow = this.flowRegistry.flow(Long.toString(txId));
            CompletableFuture future = new CompletableFuture();
            flow.feed(Tuple.of(future, () -> {
                this.createQueueIfNotExists(txId, msg);
                return null;
            }));
        });
    }

    @Override
    public void onEntityCreate(ActualEvent<BuildPayload> create) {
        this.logger.debug("Got entity create");
        this.extract(create, createPayload -> {
            long txId = createPayload.getTxId();
            QueueFlow flow = this.flowRegistry.flow(Long.toString(txId));
            CompletableFuture future = new CompletableFuture();
            flow.feed(Tuple.of(future, () -> {
                this.pushQueue(txId, this.entityToChangedEvent(createPayload.getEntity()));
                return null;
            }));
        });
    }

    @Override
    public void onEntityUpdate(ActualEvent<ReplacePayload> update) {
        this.logger.debug("Got entity update");
        this.extract(update, updatePayload -> {
            long txId = updatePayload.getTxId();
            QueueFlow flow = this.flowRegistry.flow(Long.toString(txId));
            CompletableFuture future = new CompletableFuture();
            updatePayload.getChanges().forEach((k, v) -> {
                this.combineEntityFromEntry((IEntity)k, (IValue[])v);
                flow.feed(Tuple.of((Object)future, () -> {
                    this.pushQueue(txId, this.entityToChangedEvent((IEntity)k));
                    return null;
                }));
            });
        });
    }

    private void combineEntityFromEntry(IEntity rawEntity, IValue[] changes) {
        for (IValue value : changes) {
            rawEntity.entityValue().addValue(value);
        }
    }

    @Override
    public void onEntityDelete(ActualEvent<DeletePayload> delete) {
        this.logger.debug("Got entity delete");
    }

    @Override
    public void onTxPreCommit(ActualEvent<CommitPayload> preCommit) {
        this.logger.debug(NO_OPERATION, (Object)preCommit.type());
    }

    @Override
    public void onTxCommitted(ActualEvent<CommitPayload> committed) {
        this.logger.debug("Got tx committed");
        this.extract(committed, commitPayload -> {
            long txId = commitPayload.getTxId();
            long commitId = commitPayload.getCommitId();
            long time = committed.time();
            this.inProgressTX.decrementAndGet();
            QueueFlow flow = this.flowRegistry.flow(Long.toString(txId));
            CompletableFuture future = new CompletableFuture();
            flow.feed(Tuple.of(future, () -> {
                List<ChangedEvent> changedEvents = this.popQueue(txId);
                if (changedEvents.size() > 0) {
                    String comment = changedEvents.get(0).getComment();
                    TransactionalChangelogEvent changelogEvent = this.toTransactionalChangelogEvent(commitId, time, Optional.ofNullable(comment).orElse(""), changedEvents);
                    this.changelogHandler.handle(changelogEvent);
                }
                this.dropQueue(txId);
                return null;
            }));
        });
    }

    @Override
    public void onTxPreRollBack(ActualEvent<RollbackPayload> preRollBack) {
        this.logger.debug(NO_OPERATION, (Object)preRollBack.type());
    }

    @Override
    public void onTxRollBack(ActualEvent<RollbackPayload> preRollBack) {
        this.logger.debug("Got tx rollback");
        this.extract(preRollBack, payload -> {
            this.inProgressTX.decrementAndGet();
            long txId = payload.getTxId();
            QueueFlow flow = this.flowRegistry.flow(Long.toString(txId));
            CompletableFuture future = new CompletableFuture();
            flow.feed(Tuple.of(future, () -> {
                this.dropQueue(txId);
                return null;
            }));
        });
    }

    private ChangedEvent entityToChangedEvent(IEntity entity) {
        ChangedEvent changedEvent = new ChangedEvent();
        changedEvent.setEntityClassId(entity.entityClassRef().getId());
        changedEvent.setTimestamp(entity.time());
        changedEvent.setId(entity.id());
        HashMap<Long, ValueWrapper> valueMap = new HashMap<Long, ValueWrapper>();
        changedEvent.setValueMap(valueMap);
        entity.entityValue().values().stream().forEach(x -> {
            ValueWrapper valueWrapper = new ValueWrapper(ChangelogHelper.serialize(x), x.getField().type(), x.getField().id());
            valueMap.put(x.getField().id(), valueWrapper);
        });
        return changedEvent;
    }

    private void createQueueIfNotExists(long txId, String msg) {
        String queue = QUEUE_PREFIX.concat(Long.toString(txId));
        if (this.syncCommands.llen((Object)queue) == 0L) {
            this.logger.debug("create tx {} with {}", (Object)txId, (Object)msg);
            this.syncCommands.rpush((Object)queue, (Object[])new String[]{msg});
        }
    }

    private TransactionalChangelogEvent toTransactionalChangelogEvent(long commitId, long timestamp, String comment, List<ChangedEvent> changedEventList) {
        TransactionalChangelogEvent transactionalChangelogEvent = new TransactionalChangelogEvent();
        transactionalChangelogEvent.setCommitId(commitId);
        changedEventList.forEach(x -> {
            x.setCommitId(commitId);
            x.setUsername(this.getUsername((ChangedEvent)x));
        });
        Map groupedEvent = changedEventList.stream().collect(Collectors.groupingBy(x -> x.getId(), Collectors.reducing((prev, next) -> {
            ChangedEvent changedEvent = new ChangedEvent();
            changedEvent.setEntityClassId(prev.getEntityClassId());
            changedEvent.setComment(comment);
            changedEvent.setUsername(this.getUsername((ChangedEvent)prev));
            changedEvent.setTimestamp(timestamp);
            changedEvent.setCommitId(commitId);
            Map<Long, ValueWrapper> prevValueMap = prev.getValueMap();
            Map<Long, ValueWrapper> nextValueMap = next.getValueMap();
            HashMap<Long, ValueWrapper> newValueMap = new HashMap<Long, ValueWrapper>(prevValueMap);
            newValueMap.putAll(nextValueMap);
            changedEvent.setValueMap(newValueMap);
            return next;
        })));
        List<ChangedEvent> mergedChangeEvent = groupedEvent.values().stream().peek(x -> {
            if (!x.isPresent()) {
                this.logger.error("Got corrupted Change log but we can do nothing on it T T");
            }
        }).filter(Optional::isPresent).map(Optional::get).collect(Collectors.toList());
        transactionalChangelogEvent.setChangedEventList(mergedChangeEvent);
        return transactionalChangelogEvent;
    }

    private String getUsername(ChangedEvent prev) {
        if (!StringUtils.isEmpty((CharSequence)prev.getUsername())) {
            return prev.getUsername();
        }
        Map<Long, ValueWrapper> valueMap = prev.getValueMap();
        Optional targetEntityClass = this.metaManager.load(prev.getEntityClassId(), "");
        if (targetEntityClass.isPresent()) {
            IEntityClass entityClass = (IEntityClass)targetEntityClass.get();
            Optional createUserNameField = entityClass.field("create_user_name");
            Optional<Object> createUserName = createUserNameField.map(x -> {
                ValueWrapper valueWrapper = (ValueWrapper)valueMap.get(x.id());
                if (valueWrapper != null) {
                    return valueWrapper.getValue();
                }
                return null;
            });
            Optional updateUserNameField = entityClass.field("update_user_name");
            Optional<Object> updateUserName = updateUserNameField.map(x -> {
                ValueWrapper valueWrapper = (ValueWrapper)valueMap.get(x.id());
                if (valueWrapper != null) {
                    return valueWrapper.getValue();
                }
                return null;
            });
            if (updateUserName.isPresent()) {
                return updateUserName.get().toString();
            }
            if (createUserName.isPresent()) {
                return createUserName.get().toString();
            }
        } else {
            this.logger.error("Cannot find related entityClass {}", (Object)prev.getEntityClassId());
        }
        return null;
    }

    private List<ChangedEvent> popQueue(long txId) {
        String queue = QUEUE_PREFIX.concat(Long.toString(txId));
        List orderedList = this.syncCommands.lrange((Object)queue, 0L, -1L);
        String comment = "";
        if (orderedList.size() > 0) {
            comment = (String)orderedList.get(0);
        }
        this.logger.debug("comment in {} is {}", (Object)txId, (Object)comment);
        String finalComment = comment;
        List<ChangedEvent> changedEvents = orderedList.stream().skip(1L).map(x -> {
            try {
                ChangedEvent changedEvent = (ChangedEvent)this.mapper.readValue(x, ChangedEvent.class);
                changedEvent.setComment(finalComment);
                return changedEvent;
            }
            catch (JsonProcessingException e) {
                this.logger.error("{}", (Throwable)e);
                return null;
            }
        }).filter(Objects::nonNull).collect(Collectors.toList());
        return changedEvents;
    }

    private void pushQueue(long txId, ChangedEvent changedEvent) {
        String queue = QUEUE_PREFIX.concat(Long.toString(txId));
        try {
            String payloadStr = this.mapper.writeValueAsString((Object)changedEvent);
            this.syncCommands.rpushx((Object)queue, (Object[])new String[]{payloadStr});
        }
        catch (JsonProcessingException e) {
            this.logger.error("{}", (Throwable)e);
        }
    }

    private void dropQueue(long txId) {
        String queue = QUEUE_PREFIX.concat(Long.toString(txId));
        this.syncCommands.del((Object[])new String[]{queue});
    }

    private <T extends Serializable> void extract(ActualEvent<T> evt, Consumer<T> consumer) {
        Optional payloadOp = evt.payload();
        if (payloadOp.isPresent()) {
            consumer.accept((Serializable)payloadOp.get());
        } else {
            this.logger.error(MISSING_PAYLOAD, evt);
        }
    }
}

