package com.xforceplus.ultraman.oqsengine.changelog.listener.impl;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.xforceplus.ultraman.oqsengine.changelog.ChangelogHandler;
import com.xforceplus.ultraman.oqsengine.changelog.domain.ChangedEvent;
import com.xforceplus.ultraman.oqsengine.changelog.domain.TransactionalChangelogEvent;
import com.xforceplus.ultraman.oqsengine.changelog.domain.ValueWrapper;
import com.xforceplus.ultraman.oqsengine.changelog.listener.EventLifecycleAware;
import com.xforceplus.ultraman.oqsengine.changelog.listener.flow.FlowRegistry;
import com.xforceplus.ultraman.oqsengine.changelog.listener.flow.QueueFlow;
import com.xforceplus.ultraman.oqsengine.changelog.utils.ChangelogHelper;
import com.xforceplus.ultraman.oqsengine.event.ActualEvent;
import com.xforceplus.ultraman.oqsengine.event.EventBus;
import com.xforceplus.ultraman.oqsengine.event.EventType;
import com.xforceplus.ultraman.oqsengine.event.payload.entity.BuildPayload;
import com.xforceplus.ultraman.oqsengine.event.payload.entity.DeletePayload;
import com.xforceplus.ultraman.oqsengine.event.payload.entity.ReplacePayload;
import com.xforceplus.ultraman.oqsengine.event.payload.transaction.BeginPayload;
import com.xforceplus.ultraman.oqsengine.event.payload.transaction.CommitPayload;
import com.xforceplus.ultraman.oqsengine.event.payload.transaction.RollbackPayload;
import com.xforceplus.ultraman.oqsengine.metadata.MetaManager;
import com.xforceplus.ultraman.oqsengine.pojo.dto.entity.IEntity;
import com.xforceplus.ultraman.oqsengine.pojo.dto.entity.IEntityClass;
import com.xforceplus.ultraman.oqsengine.pojo.dto.values.IValue;
import io.lettuce.core.RedisClient;
import io.lettuce.core.api.sync.RedisCommands;
import io.micrometer.core.instrument.Metrics;
import io.vavr.Tuple;
import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/xforceplus/ultraman/oqsengine/changelog/listener/impl/RedisEventLifecycleHandler.class */
/**
 * Buffers the entity changes of a transaction in a per-transaction Redis list and, once the
 * transaction is committed, merges them and passes them to the {@link ChangelogHandler}.
 *
 * <p>The Redis list lives under {@code QUEUE_PREFIX + txId}. Its first element is the message
 * received with the transaction-begin event (later used as the comment of every change); each
 * following element is one JSON-serialized {@link ChangedEvent}.
 *
 * <p>Every Redis access is wrapped in a task and fed to the {@link QueueFlow} that the
 * {@link FlowRegistry} returns for the transaction id.
 * NOTE(review): execution order and threading of those tasks are defined by QueueFlow, which is
 * not visible from this class - confirm before relying on ordering.
 */
public class RedisEventLifecycleHandler implements EventLifecycleAware {
    private static final String MISSING_PAYLOAD = "Event {} has no payload";
    private static final String NO_OPERATION = "Do nothing in {} stage";
    private static final String QUEUE_PREFIX = "com.xforceplus.ultraman.oqsengine.changelog.tx.";
    private static final String CREATE_USER_FIELD = "create_user_name";
    private static final String UPDATE_USER_FIELD = "update_user_name";

    private static final Logger logger = LoggerFactory.getLogger(RedisEventLifecycleHandler.class);

    private final RedisClient redisClient;
    private final RedisCommands<String, String> syncCommands;
    private final ObjectMapper mapper;
    private final ChangelogHandler changelogHandler;
    private final MetaManager metaManager;
    private final FlowRegistry flowRegistry;

    // Injected by the container; the field name is kept because @Resource may resolve by name.
    @Resource
    private EventBus eventBus;

    // Gauge of transactions that have begun but not yet been committed or rolled back.
    private final AtomicInteger inProgressTX = Metrics.gauge("oqs.changelog.in-progress-tx", new AtomicInteger(0));

    /**
     * Creates the handler and opens a synchronous Redis connection.
     * NOTE(review): the connection opened here is never closed by this class.
     *
     * @param redisClient      client used to open the Redis connection
     * @param changelogHandler receiver of the merged changes of a committed transaction
     * @param objectMapper     JSON mapper used to (de)serialize {@link ChangedEvent}s
     * @param flowRegistry     registry supplying one {@link QueueFlow} per transaction id
     * @param metaManager      metadata source used to resolve user-name fields
     */
    public RedisEventLifecycleHandler(RedisClient redisClient, ChangelogHandler changelogHandler, ObjectMapper objectMapper, FlowRegistry flowRegistry, MetaManager metaManager) {
        this.redisClient = redisClient;
        this.syncCommands = redisClient.connect().sync();
        this.changelogHandler = changelogHandler;
        this.mapper = objectMapper;
        this.flowRegistry = flowRegistry;
        this.metaManager = metaManager;
    }

    /**
     * Subscribes this handler to every entity and transaction event type on the event bus.
     */
    @PostConstruct
    @SuppressWarnings({"unchecked", "rawtypes"})
    public void init() {
        eventBus.watch(EventType.ENTITY_BUILD, event -> onEntityCreate((ActualEvent) event));
        eventBus.watch(EventType.ENTITY_DELETE, event -> onEntityDelete((ActualEvent) event));
        eventBus.watch(EventType.ENTITY_REPLACE, event -> onEntityUpdate((ActualEvent) event));
        eventBus.watch(EventType.TX_PREPAREDNESS_COMMIT, event -> onTxPreCommit((ActualEvent) event));
        eventBus.watch(EventType.TX_BEGIN, event -> onTxCreate((ActualEvent) event));
        eventBus.watch(EventType.TX_COMMITED, event -> onTxCommitted((ActualEvent) event));
        eventBus.watch(EventType.TX_PREPAREDNESS_ROLLBACK, event -> onTxPreRollBack((ActualEvent) event));
        eventBus.watch(EventType.TX_ROLLBACKED, event -> onTxRollBack((ActualEvent) event));
    }

    /**
     * Counts the transaction as in progress and schedules creation of its Redis queue, seeded
     * with the transaction message.
     */
    @Override
    public void onTxCreate(ActualEvent<BeginPayload> actualEvent) {
        logger.debug("Got tx create");
        extract(actualEvent, beginPayload -> {
            inProgressTX.getAndIncrement();
            long txId = beginPayload.getTxId();
            String msg = beginPayload.getMsg();
            logger.debug("Got message {}", msg);
            // NOTE(review): generic arguments of the future are dictated by QueueFlow.feed, whose
            // signature is not visible here; the raw form is kept as found.
            flowRegistry.flow(Long.toString(txId)).feed(Tuple.of(new CompletableFuture(), () -> {
                createQueueIfNotExists(txId, msg);
                return null;
            }));
        });
    }

    /**
     * Schedules the created entity to be appended to the queue of its transaction.
     */
    @Override
    public void onEntityCreate(ActualEvent<BuildPayload> actualEvent) {
        logger.debug("Got entity create");
        extract(actualEvent, buildPayload -> {
            long txId = buildPayload.getTxId();
            flowRegistry.flow(Long.toString(txId)).feed(Tuple.of(new CompletableFuture(), () -> {
                pushQueue(txId, entityToChangedEvent(buildPayload.getEntity()));
                return null;
            }));
        });
    }

    /**
     * For every changed entity in the payload, applies the changed values to the entity and
     * schedules the entity to be appended to the queue of its transaction.
     */
    @Override
    public void onEntityUpdate(ActualEvent<ReplacePayload> actualEvent) {
        logger.debug("Got entity update");
        extract(actualEvent, replacePayload -> {
            long txId = replacePayload.getTxId();
            // NOTE(review): raw types kept as found - the generic signature of QueueFlow is not
            // visible here. The same future instance is shared by all tasks of this event.
            QueueFlow flow = flowRegistry.flow(Long.toString(txId));
            CompletableFuture sharedFuture = new CompletableFuture();
            replacePayload.getChanges().forEach((entity, changedValues) -> {
                // Values are merged immediately; serialization happens later inside the task.
                combineEntityFromEntry(entity, changedValues);
                flow.feed(Tuple.of(sharedFuture, () -> {
                    pushQueue(txId, entityToChangedEvent(entity));
                    return null;
                }));
            });
        });
    }

    /**
     * Adds every given value to the entity's value set.
     */
    private void combineEntityFromEntry(IEntity iEntity, IValue[] iValueArr) {
        for (IValue value : iValueArr) {
            iEntity.entityValue().addValue(value);
        }
    }

    /**
     * Only logs the event; deletions are not written to the transaction queue.
     */
    @Override
    public void onEntityDelete(ActualEvent<DeletePayload> actualEvent) {
        logger.debug("Got entity delete");
    }

    /**
     * Only logs the event; nothing is done before commit.
     */
    @Override
    public void onTxPreCommit(ActualEvent<CommitPayload> actualEvent) {
        logger.debug(NO_OPERATION, actualEvent.type());
    }

    /**
     * Counts the transaction as finished and schedules a task that reads the buffered changes,
     * passes them (merged per entity) to the changelog handler when there are any, and finally
     * deletes the queue.
     */
    @Override
    public void onTxCommitted(ActualEvent<CommitPayload> actualEvent) {
        logger.debug("Got tx committed");
        extract(actualEvent, commitPayload -> {
            long txId = commitPayload.getTxId();
            long commitId = commitPayload.getCommitId();
            long eventTime = actualEvent.time();
            inProgressTX.decrementAndGet();
            flowRegistry.flow(Long.toString(txId)).feed(Tuple.of(new CompletableFuture(), () -> {
                List<ChangedEvent> buffered = popQueue(txId);
                if (!buffered.isEmpty()) {
                    // Every buffered event carries the same comment; fall back to "" when absent.
                    String comment = Optional.ofNullable(buffered.get(0).getComment()).orElse("");
                    changelogHandler.handle(toTransactionalChangelogEvent(commitId, eventTime, comment, buffered));
                }
                dropQueue(txId);
                return null;
            }));
        });
    }

    /**
     * Only logs the event; nothing is done before rollback.
     */
    @Override
    public void onTxPreRollBack(ActualEvent<RollbackPayload> actualEvent) {
        logger.debug(NO_OPERATION, actualEvent.type());
    }

    /**
     * Counts the transaction as finished and schedules deletion of its queue.
     */
    @Override
    public void onTxRollBack(ActualEvent<RollbackPayload> actualEvent) {
        logger.debug("Got tx rollback");
        extract(actualEvent, rollbackPayload -> {
            inProgressTX.decrementAndGet();
            long txId = rollbackPayload.getTxId();
            flowRegistry.flow(Long.toString(txId)).feed(Tuple.of(new CompletableFuture(), () -> {
                dropQueue(txId);
                return null;
            }));
        });
    }

    /**
     * Converts an entity into a {@link ChangedEvent} holding the entity id, entity class id,
     * entity time and one {@link ValueWrapper} per value, keyed by field id.
     */
    private ChangedEvent entityToChangedEvent(IEntity iEntity) {
        ChangedEvent changedEvent = new ChangedEvent();
        changedEvent.setEntityClassId(iEntity.entityClassRef().getId());
        changedEvent.setTimestamp(iEntity.time());
        changedEvent.setId(iEntity.id());
        Map<Long, ValueWrapper> valueMap = new HashMap<>();
        changedEvent.setValueMap(valueMap);
        iEntity.entityValue().values().forEach(value -> {
            Long fieldId = Long.valueOf(value.getField().id());
            valueMap.put(fieldId, new ValueWrapper(ChangelogHelper.serialize(value), value.getField().type(), fieldId));
        });
        return changedEvent;
    }

    /**
     * Seeds the transaction queue with the transaction message when the list is still empty.
     */
    private void createQueueIfNotExists(long txId, String msg) {
        String key = queueKey(txId);
        if (syncCommands.llen(key).longValue() == 0) {
            logger.debug("create tx {} with {}", txId, msg);
            syncCommands.rpush(key, msg);
        }
    }

    /**
     * Builds the event handed to the changelog handler for one committed transaction.
     *
     * <p>Each buffered event first receives the commit id and a resolved user name. Events are
     * then grouped by entity id; several events of the same entity are folded into a single
     * event whose value map is the union of the individual maps, later entries winning.
     *
     * @param commitId  commit id of the transaction
     * @param timestamp time stamped on merged events
     * @param comment   comment stamped on merged events
     * @param events    buffered events of the transaction
     * @return the transactional event containing one changed event per entity id
     */
    private TransactionalChangelogEvent toTransactionalChangelogEvent(long commitId, long timestamp, String comment, List<ChangedEvent> events) {
        TransactionalChangelogEvent txEvent = new TransactionalChangelogEvent();
        txEvent.setCommitId(Long.valueOf(commitId));
        events.forEach(event -> {
            event.setCommitId(commitId);
            event.setUsername(getUsername(event));
        });

        Map<Long, Optional<ChangedEvent>> mergedById = events.stream().collect(
            Collectors.groupingBy(
                ChangedEvent::getId,
                Collectors.reducing((earlier, later) -> mergeChangedEvents(earlier, later, commitId, timestamp, comment))));

        List<ChangedEvent> mergedEvents = mergedById.values().stream()
            .filter(candidate -> {
                if (!candidate.isPresent()) {
                    logger.error("Got corrupted Change log but we can do nothing on it T T");
                }
                return candidate.isPresent();
            })
            .map(Optional::get)
            .collect(Collectors.toList());
        txEvent.setChangedEventList(mergedEvents);
        return txEvent;
    }

    /**
     * Folds two events of the same entity into a new event; values of {@code later} override
     * values of {@code earlier} for the same field id.
     */
    private ChangedEvent mergeChangedEvents(ChangedEvent earlier, ChangedEvent later, long commitId, long timestamp, String comment) {
        ChangedEvent merged = new ChangedEvent();
        // Both operands come from the same group, so they share the entity id.
        merged.setId(earlier.getId());
        merged.setEntityClassId(earlier.getEntityClassId());
        merged.setComment(comment);
        merged.setUsername(getUsername(earlier));
        merged.setTimestamp(timestamp);
        merged.setCommitId(commitId);
        Map<Long, ValueWrapper> values = new HashMap<>(earlier.getValueMap());
        values.putAll(later.getValueMap());
        merged.setValueMap(values);
        return merged;
    }

    /**
     * Resolves the user name of an event.
     *
     * <p>An already present, non-empty user name is returned unchanged. Otherwise the value of the
     * entity class field {@code update_user_name} is used, falling back to
     * {@code create_user_name}.
     *
     * @return the user name, or {@code null} when the entity class is unknown or neither field
     *         has a value in the event
     */
    private String getUsername(ChangedEvent changedEvent) {
        String existing = changedEvent.getUsername();
        if (!StringUtils.isEmpty(existing)) {
            return existing;
        }
        Map<Long, ValueWrapper> valueMap = changedEvent.getValueMap();
        Optional<IEntityClass> loaded = metaManager.load(changedEvent.getEntityClassId(), "");
        if (!loaded.isPresent()) {
            logger.error("Cannot find related entityClass {}", changedEvent.getEntityClassId());
            return null;
        }
        IEntityClass entityClass = loaded.get();
        Optional<String> createUser = readUserName(entityClass, CREATE_USER_FIELD, valueMap);
        Optional<String> updateUser = readUserName(entityClass, UPDATE_USER_FIELD, valueMap);
        if (updateUser.isPresent()) {
            return updateUser.get();
        }
        return createUser.orElse(null);
    }

    /**
     * Reads the string form of the value stored for the named field; empty when the field is
     * not defined on the entity class or the event holds no value for it.
     */
    private Optional<String> readUserName(IEntityClass entityClass, String fieldName, Map<Long, ValueWrapper> valueMap) {
        return entityClass.field(fieldName)
            .map(field -> valueMap.get(Long.valueOf(field.id())))
            .map(ValueWrapper::getValue)
            .map(Object::toString);
    }

    /**
     * Reads (without removing) all entries of the transaction queue. The first entry is taken
     * as the comment and set on every deserialized event; entries that cannot be deserialized
     * are logged and skipped.
     */
    private List<ChangedEvent> popQueue(long txId) {
        List<String> entries = syncCommands.lrange(queueKey(txId), 0L, -1L);
        String comment = entries.isEmpty() ? "" : entries.get(0);
        logger.debug("comment in {} is {}", txId, comment);
        return entries.stream()
            .skip(1L)
            .map(json -> parseChangedEvent(json, comment))
            .filter(Objects::nonNull)
            .collect(Collectors.toList());
    }

    /**
     * Deserializes one queue entry and stamps the comment on it; returns {@code null} when the
     * entry is not valid JSON for a {@link ChangedEvent}.
     */
    private ChangedEvent parseChangedEvent(String json, String comment) {
        try {
            ChangedEvent event = mapper.readValue(json, ChangedEvent.class);
            event.setComment(comment);
            return event;
        } catch (JsonProcessingException e) {
            logger.error("Failed to deserialize a buffered changed event, entry skipped", e);
            return null;
        }
    }

    /**
     * Appends the serialized event to the transaction queue. RPUSHX is used, so nothing is
     * written when the queue has not been created. Serialization failures are logged and the
     * event is dropped.
     */
    private void pushQueue(long txId, ChangedEvent changedEvent) {
        String key = queueKey(txId);
        try {
            syncCommands.rpushx(key, mapper.writeValueAsString(changedEvent));
        } catch (JsonProcessingException e) {
            logger.error("Failed to serialize changed event for tx {}, event dropped", txId, e);
        }
    }

    /**
     * Deletes the queue of the given transaction.
     */
    private void dropQueue(long txId) {
        syncCommands.del(queueKey(txId));
    }

    /**
     * Returns the Redis key of the queue belonging to the given transaction.
     */
    private static String queueKey(long txId) {
        return QUEUE_PREFIX.concat(Long.toString(txId));
    }

    /**
     * Passes the payload of the event to the consumer, or logs an error when the event carries
     * no payload.
     */
    private <T extends Serializable> void extract(ActualEvent<T> actualEvent, Consumer<T> consumer) {
        Optional<T> payload = actualEvent.payload();
        if (payload.isPresent()) {
            consumer.accept(payload.get());
        } else {
            logger.error(MISSING_PAYLOAD, actualEvent);
        }
    }
}
