/*
 * Decompiled with CFR 0.152.
 */
package com.xforceplus.ultraman.cdc.processor.impl;

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.protobuf.ByteString;
import com.xforceplus.ultraman.cdc.adapter.CDCBeforeCallback;
import com.xforceplus.ultraman.cdc.adapter.EngineAdapterService;
import com.xforceplus.ultraman.cdc.context.ParserContext;
import com.xforceplus.ultraman.cdc.dto.ParseResult;
import com.xforceplus.ultraman.cdc.processor.DataProcessor;
import com.xforceplus.ultraman.cdc.processor.EventQueue;
import com.xforceplus.ultraman.cdc.utils.BinLogParseUtils;
import com.xforceplus.ultraman.extensions.cdc.status.StatusService;
import com.xforceplus.ultraman.metadata.cdc.OqsEngineEntity;
import com.xforceplus.ultraman.metadata.engine.EntityClassEngine;
import com.xforceplus.ultraman.metadata.entity.EntityClassRef;
import com.xforceplus.ultraman.metadata.entity.FieldType;
import com.xforceplus.ultraman.metadata.entity.IEntityClass;
import com.xforceplus.ultraman.metadata.entity.IEntityField;
import com.xforceplus.ultraman.metadata.values.DateTimeValue;
import com.xforceplus.ultraman.sdk.core.event.EntityCreated;
import com.xforceplus.ultraman.sdk.core.event.EntityDeleted;
import com.xforceplus.ultraman.sdk.core.event.EntityUpdated;
import com.xforceplus.ultraman.sdk.infra.base.cdc.SystemAttachment;
import com.xforceplus.ultraman.sdk.infra.event.EventPublisher;
import com.xforceplus.ultraman.sdk.infra.utils.JacksonDefaultMapper;
import io.micrometer.core.annotation.Timed;
import java.math.BigDecimal;
import java.sql.SQLException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

/**
 * Default {@link DataProcessor} implementation for Canal binlog batches.
 *
 * <p>For every {@code ROWDATA} entry of a message it parses the row images into
 * {@link OqsEngineEntity} objects, merges entities with the same id inside the batch, lets the
 * registered {@link CDCBeforeCallback}s mutate the merged list, upserts it through the
 * {@link EngineAdapterService}, clears status through the optional {@link StatusService} and feeds
 * create/update/delete events into the {@link EventQueue}, whose completion callbacks publish
 * through the {@link EventPublisher}.
 */
public class DefaultDataProcessor implements DataProcessor {
    /** Names of the system columns that get special treatment while parsing a row. */
    private static final String COLUMN_ID = "id";
    private static final String COLUMN_ENTITY_CLASS = "_sys_entityclass";
    private static final String COLUMN_PROFILE = "_sys_profile";
    private static final String COLUMN_DELETED = "_sys_deleted";
    private static final String COLUMN_DYNAMIC = "_sys_dynamic";
    private static final String COLUMN_OPERATE_TIME = "_sys_operatetime";
    private static final String COLUMN_VERSION = "_sys_ver";
    /** Key inside the {@code _sys_dynamic} JSON document under which the attachment map is stored. */
    private static final String ATTACHMENT_KEY = "#a";

    final Logger logger = LoggerFactory.getLogger(DefaultDataProcessor.class);
    @Resource
    private EntityClassEngine engine;
    @Autowired(required=false)
    private StatusService statusService;
    @Autowired(required=false)
    private List<CDCBeforeCallback> cdcBeforeCallbacks = new ArrayList<CDCBeforeCallback>();
    @Resource
    private EventPublisher publisher;
    @Resource
    private EngineAdapterService engineAdapterService;
    @Resource
    private EventQueue eventQueue;

    /**
     * Deserializes a JSON object string into a map using the shared object mapper.
     *
     * @param attrStr JSON text to parse
     * @return the parsed map (whatever the mapper returns for the given text)
     * @throws JsonProcessingException if the text is not valid JSON
     */
    @SuppressWarnings("unchecked")
    public static Map<String, Object> attributesToMap(String attrStr) throws JsonProcessingException {
        return (Map<String, Object>) JacksonDefaultMapper.OBJECT_MAPPER.readValue(attrStr, Map.class);
    }

    public void setEngine(EntityClassEngine engine) {
        this.engine = engine;
    }

    public void setStatusService(StatusService statusService) {
        this.statusService = statusService;
    }

    public void setCdcBeforeCallbacks(List<CDCBeforeCallback> cdcBeforeCallbacks) {
        this.cdcBeforeCallbacks = cdcBeforeCallbacks;
    }

    public void setPublisher(EventPublisher publisher) {
        this.publisher = publisher;
    }

    public void setEngineAdapterService(EngineAdapterService engineAdapterService) {
        this.engineAdapterService = engineAdapterService;
    }

    public void setEventQueue(EventQueue eventQueue) {
        this.eventQueue = eventQueue;
    }

    /**
     * Consumes one Canal message.
     *
     * @param message the Canal message holding the binlog entries
     * @param batchId id of the batch the message belongs to
     * @return {@code true} when at least one entity was parsed from the message
     * @throws SQLException if persisting the parsed entities fails
     */
    @Override
    @Timed(value="oqs.process.delay.latency", percentiles={0.5, 0.9, 0.99}, extraTags={"initiator", "cdc", "action", "consume"})
    public boolean onProcess(Message message, long batchId) throws SQLException {
        try {
            return this.messageProcess(message, batchId) > 0;
        }
        catch (Exception e) {
            // The throwable is passed as the trailing argument so SLF4J prints its stack trace
            // instead of leaving a placeholder unfilled.
            Object messageId = message == null ? null : (Object) message.getId();
            this.logger.error("batchId : {}, consume message failed, message id : {}", batchId, messageId, e);
            throw e;
        }
    }

    /** Processes all entries of the message and returns the number of merged entities. */
    private int messageProcess(Message message, long batchId) throws SQLException {
        return this.parseCanalEntries(message.getEntries(), batchId);
    }

    /**
     * Parses the given entries, then persists the merged entities if there are any.
     *
     * @param entries Canal entries of one message
     * @param batchId id of the batch being processed
     * @return number of distinct entities collected in the parse result
     * @throws SQLException wrapping any failure raised while persisting
     */
    private int parseCanalEntries(List<CanalEntry.Entry> entries, long batchId) throws SQLException {
        ParserContext parserContext = new ParserContext(batchId);
        ParseResult parseResult = new ParseResult();
        for (CanalEntry.Entry entry : entries) {
            switch (entry.getEntryType()) {
                case TRANSACTIONBEGIN:
                case TRANSACTIONEND: {
                    this.eventHandler(entry.getEntryType());
                    break;
                }
                case ROWDATA: {
                    this.rowDataParse(entry, parserContext, parseResult);
                    break;
                }
                default: {
                    // Other entry types carry nothing this processor handles.
                    break;
                }
            }
        }
        if (!parseResult.getFinishEntries().isEmpty()) {
            this.persist(parseResult);
        }
        return parseResult.getFinishEntries().size();
    }

    /**
     * Runs the before-callbacks, upserts the merged entities and clears their status.
     *
     * @throws SQLException wrapping any exception, including the adapter reporting {@code false}
     */
    private void persist(ParseResult parseResult) throws SQLException {
        try {
            List<OqsEngineEntity> entities = new ArrayList<OqsEngineEntity>(parseResult.getFinishEntries().values());
            this.applyBeforeCallbacks(entities);
            if (!this.engineAdapterService.batchUpsertOperation(entities)) {
                throw new RuntimeException("Adapter Service return False");
            }
            if (this.statusService != null) {
                // NOTE(review): LocalDateTime.now() reads the system default zone and the result is
                // then interpreted in DateTimeValue.ZONE_ID; if the two zones differ this is not the
                // real epoch time. Left unchanged - confirm whether this convention is intended.
                Instant instant = LocalDateTime.now().atZone(DateTimeValue.ZONE_ID).toInstant();
                this.statusService.clearStatus(entities, instant.toEpochMilli());
            }
        }
        catch (Exception e) {
            throw new SQLException(e);
        }
    }

    /** Lets every registered callback mutate the entity list; a failing callback is logged and skipped. */
    private void applyBeforeCallbacks(List<OqsEngineEntity> entities) {
        if (this.cdcBeforeCallbacks == null) {
            return;
        }
        for (CDCBeforeCallback callback : this.cdcBeforeCallbacks) {
            try {
                callback.mutate(entities);
            }
            catch (Throwable throwable) {
                // Callbacks are isolated on purpose: one broken callback must not fail the batch.
                this.logger.error("CDC callback ERROR name:{}", callback.name(), throwable);
            }
        }
    }

    /**
     * Creates a copy of the entity whose attribute map is detached from the source by a JSON
     * round trip. If the round trip fails the attributes are shallow-copied instead.
     */
    private OqsEngineEntity cloneEntity(OqsEngineEntity src) {
        OqsEngineEntity target = new OqsEngineEntity();
        target.setEntityClassRef(src.getEntityClassRef());
        target.setId(src.getId());
        target.setVersion(src.getVersion());
        target.setUpdateTime(src.getUpdateTime());
        target.setFather(src.getFather());
        target.setDeleted(src.isDeleted());
        try {
            String json = JacksonDefaultMapper.OBJECT_MAPPER.writeValueAsString(src.getAttributes());
            target.setAttributes(DefaultDataProcessor.attributesToMap(json));
        }
        catch (Exception e) {
            this.logger.error("clone attributes of entity {} failed, falling back to a shallow copy", src.getId(), e);
            if (src.getAttributes() != null) {
                target.setAttributes(new HashMap<String, Object>(src.getAttributes()));
            }
        }
        return target;
    }

    /**
     * Parses one {@code ROWDATA} entry: resolves the entity class from the table name, decodes the
     * row change and processes every row. A row that fails is logged and skipped so the remaining
     * rows are still handled.
     */
    private void rowDataParse(CanalEntry.Entry entry, ParserContext parserContext, ParseResult parseResult) {
        String tableName = entry.getHeader().getTableName();
        if (tableName.isEmpty()) {
            this.logger.error("batch : {}, table name could not be Null, [{}]", parserContext.getBatchId(), entry.getStoreValue());
            return;
        }
        Optional<IEntityClass> entityClassOp = this.foundEntityClassFromTableName(tableName);
        if (!entityClassOp.isPresent()) {
            this.logger.error("batch : {}, entityClass could not be Null, [{}]", parserContext.getBatchId(), entry.getStoreValue());
            return;
        }
        IEntityClass entityClass = entityClassOp.get();
        CanalEntry.RowChange rowChange;
        try {
            rowChange = CanalEntry.RowChange.parseFrom((ByteString)entry.getStoreValue());
        }
        catch (Exception e) {
            this.logger.error("batch : {}, parse entry value failed, [{}]", parserContext.getBatchId(), entry.getStoreValue(), e);
            return;
        }
        CanalEntry.EventType eventType = rowChange.getEventType();
        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
            try {
                this.processRow(eventType, rowData, entityClass, parseResult);
            }
            catch (Exception e) {
                this.logger.warn("parse entity error, message : {}", e.getMessage(), e);
            }
        }
    }

    /**
     * Parses the before/after images of a single row according to the event type, merges the
     * resulting entity into the parse result and hands detached copies to the event handler.
     */
    private void processRow(CanalEntry.EventType eventType, CanalEntry.RowData rowData, IEntityClass entityClass, ParseResult parseResult) throws SQLException, JsonProcessingException {
        List<CanalEntry.Column> beforeColumns = rowData.getBeforeColumnsList();
        List<CanalEntry.Column> afterColumns = rowData.getAfterColumnsList();
        OqsEngineEntity beforeEntity = null;
        OqsEngineEntity afterEntity = null;
        SystemAttachment systemAttachment = null;
        switch (eventType) {
            case INSERT: {
                afterEntity = this.oneRowParser(afterColumns, false, entityClass, parseResult);
                systemAttachment = this.getAttachment(afterColumns);
                this.mergeEntityToResult(parseResult, afterEntity);
                break;
            }
            case UPDATE: {
                try {
                    beforeEntity = this.oneRowParser(beforeColumns, false, entityClass, parseResult);
                }
                catch (Exception e) {
                    // A missing before-image only degrades the published event, so carry on.
                    this.logger.warn("beforeEntity in update is null, will be ignore, but it will influence event", e);
                }
                afterEntity = this.oneRowParser(afterColumns, false, entityClass, parseResult);
                systemAttachment = this.getAttachment(afterColumns);
                this.mergeEntityToResult(parseResult, afterEntity);
                break;
            }
            case DELETE: {
                beforeEntity = this.oneRowParser(beforeColumns, true, entityClass, parseResult);
                systemAttachment = this.getAttachment(beforeColumns);
                this.mergeEntityToResult(parseResult, beforeEntity);
                break;
            }
            default: {
                break;
            }
        }
        // Events get their own copies because the merged entities may still be mutated later
        // (by further rows with the same id and by the before-callbacks).
        OqsEngineEntity beforeClone = beforeEntity == null ? null : this.cloneEntity(beforeEntity);
        OqsEngineEntity afterClone = afterEntity == null ? null : this.cloneEntity(afterEntity);
        if (systemAttachment == null) {
            systemAttachment = new SystemAttachment();
        }
        this.eventHandler(eventType, systemAttachment, entityClass, beforeClone, afterClone);
    }

    /**
     * Extracts the attachment stored under {@code "#a"} in the {@code _sys_dynamic} column.
     * Parsing is best effort: on any failure the attachment filled so far is returned.
     */
    private SystemAttachment getAttachment(List<CanalEntry.Column> columns) {
        String attrStr = BinLogParseUtils.getStringFromColumn(columns, COLUMN_DYNAMIC);
        SystemAttachment attachment = new SystemAttachment();
        if (attrStr == null || attrStr.isEmpty()) {
            return attachment;
        }
        try {
            Map<String, Object> dynamic = DefaultDataProcessor.attributesToMap(attrStr);
            Object raw = dynamic == null ? null : dynamic.get(ATTACHMENT_KEY);
            if (raw instanceof Map) {
                Map<?, ?> attachmentMap = (Map<?, ?>) raw;
                Object grouped = attachmentMap.get("grouped");
                if (grouped != null) {
                    attachment.setGrouped(Integer.parseInt(grouped.toString()));
                }
                Object delUid = attachmentMap.get("deluid");
                if (delUid != null) {
                    attachment.setDelUId(Long.parseLong(delUid.toString()));
                }
                Object delUname = attachmentMap.get("deluname");
                if (delUname != null) {
                    attachment.setDelUname(delUname.toString());
                }
            }
        }
        catch (Exception e) {
            this.logger.warn("parse system attachment failed, a partially filled attachment is used", e);
        }
        return attachment;
    }

    /**
     * Resolves the entity class from a table name. The name is split on {@code '_'}; the third
     * segment is used as the entity code and the fourth, when present, as the profile.
     *
     * @return the loaded entity class, or empty when the name has fewer than three segments
     */
    private Optional<IEntityClass> foundEntityClassFromTableName(String tableName) {
        String[] segments = tableName.split("_");
        if (segments.length < 3) {
            return Optional.empty();
        }
        String code = segments[2];
        String profile = segments.length > 3 ? segments[3] : null;
        return this.engine.loadByCode(code, profile);
    }

    /**
     * Adds the entity to the parse result, or merges it into the entity already registered under
     * the same id: attributes are overlaid, the deleted flag is sticky, the father is only filled
     * when still unset and the update time is overwritten.
     */
    private void mergeEntityToResult(ParseResult parseResult, OqsEngineEntity entity) {
        OqsEngineEntity existing = parseResult.getFinishEntries().get(entity.getId());
        if (existing == null) {
            parseResult.getFinishEntries().put(entity.getId(), entity);
            return;
        }
        existing.getAttributes().putAll(entity.getAttributes());
        if (entity.isDeleted()) {
            existing.setDeleted(true);
        }
        if (entity.getFather() > 0L && existing.getFather() == 0L) {
            existing.setFather(entity.getFather());
        }
        existing.setUpdateTime(entity.getUpdateTime());
    }

    /**
     * Builds an entity from one row image.
     *
     * @param columns          columns of the row image
     * @param isDelete         value of the entity's deleted flag
     * @param tableEntityClass entity class resolved from the table name
     * @param parseResult      parse result whose start id is initialised from the first parsed row
     * @return the parsed entity
     * @throws SQLException            declared for the column access helpers
     * @throws JsonProcessingException if the {@code _sys_dynamic} column is not valid JSON
     */
    private OqsEngineEntity oneRowParser(List<CanalEntry.Column> columns, boolean isDelete, IEntityClass tableEntityClass, ParseResult parseResult) throws SQLException, JsonProcessingException {
        long entityClassId = BinLogParseUtils.getLongFromColumn(columns, COLUMN_ENTITY_CLASS);
        String profile = BinLogParseUtils.getStringFromColumn(columns, COLUMN_PROFILE);
        long id = BinLogParseUtils.getLongFromColumn(columns, COLUMN_ID);
        OqsEngineEntity.Builder builder = new OqsEngineEntity.Builder();
        builder.withDeleted(isDelete);
        builder.withEntityClassRef(new EntityClassRef(entityClassId, "", "", profile));
        builder.withId(id);
        builder.withFather(this.resolveFather(entityClassId, tableEntityClass));
        builder.withAttribute(COLUMN_ID, Long.valueOf(id));
        for (CanalEntry.Column column : columns) {
            String name = column.getName();
            String value = column.getValue();
            if (COLUMN_OPERATE_TIME.equals(name)) {
                builder.withUpdateTime(Long.parseLong(value));
                continue;
            }
            if (COLUMN_VERSION.equals(name)) {
                if (!value.isEmpty()) {
                    builder.withVersion(Integer.parseInt(value));
                }
                continue;
            }
            if (COLUMN_ID.equals(name) || COLUMN_ENTITY_CLASS.equals(name) || COLUMN_PROFILE.equals(name) || COLUMN_DELETED.equals(name)) {
                // Already consumed above or not exposed as an attribute.
                continue;
            }
            if (COLUMN_DYNAMIC.equals(name)) {
                builder.withAttributes(this.attrCollection(tableEntityClass, columns));
                continue;
            }
            IEntityField field = this.findField(tableEntityClass, profile, name);
            if (field == null) {
                this.logger.warn("entityField can not be null, column name {}", name);
                continue;
            }
            builder.withAttribute(name, this.convertColumnValue(field, name, value));
        }
        if (parseResult.getStartId() == -1L) {
            parseResult.setStartId(id);
        }
        return builder.build();
    }

    /**
     * Determines the father id of a row: the table's entity class when the row belongs to a
     * different class, otherwise the class the table's entity class extends (0 when none).
     */
    private long resolveFather(long entityClassId, IEntityClass tableEntityClass) {
        if (entityClassId != tableEntityClass.id()) {
            return tableEntityClass.id();
        }
        IEntityClass parent = tableEntityClass.extendEntityClass();
        return parent != null ? parent.id() : 0L;
    }

    /**
     * Finds the field whose name, with dots replaced by underscores, equals the column name
     * ignoring case; returns {@code null} when no field matches.
     */
    private IEntityField findField(IEntityClass entityClass, String profile, String columnName) {
        return this.engine.describe(entityClass, profile).getAllFields().stream()
                .filter(f -> f.name().replace(".", "_").equalsIgnoreCase(columnName))
                .findFirst()
                .orElse(null);
    }

    /**
     * Converts the textual column value to the attribute value for the given field: {@code null}
     * for an empty value, {@code Long} for LONG/DATETIME, {@code Integer} 1/0 for BOOLEAN,
     * {@code BigDecimal} for DECIMAL, the parsed list for STRINGS (the raw text when it is not a
     * JSON list) and the raw text otherwise.
     */
    private Object convertColumnValue(IEntityField field, String name, String value) {
        if (value.isEmpty()) {
            return null;
        }
        FieldType type = field.type();
        if (type == FieldType.LONG || type == FieldType.DATETIME) {
            return Long.valueOf(Long.parseLong(value));
        }
        if (type == FieldType.BOOLEAN) {
            return Integer.valueOf(Long.parseLong(value) != 0L ? 1 : 0);
        }
        if (type == FieldType.DECIMAL) {
            return new BigDecimal(value);
        }
        if (type == FieldType.STRINGS) {
            try {
                return JacksonDefaultMapper.OBJECT_MAPPER.readValue(value, JacksonDefaultMapper.LIST_TYPE_REFERENCE);
            }
            catch (Exception e) {
                this.logger.error("parse multi-value column {} failed, the raw value is kept", name, e);
                return value;
            }
        }
        return value;
    }

    /** Hook for transaction begin/end entries; intentionally does nothing. */
    private void eventHandler(CanalEntry.EntryType entryType) {
    }

    /**
     * Dispatches a row event to the matching publish method. An UPDATE whose attachment carries a
     * delete user name and a positive delete user id is not published as an update
     * (presumably it represents a logical delete - confirm with the writer side).
     */
    private void eventHandler(CanalEntry.EventType eventType, SystemAttachment attachment, IEntityClass entityClass, OqsEngineEntity before, OqsEngineEntity after) {
        switch (eventType) {
            case INSERT: {
                this.publishCreatedEvent(entityClass, attachment, after);
                break;
            }
            case UPDATE: {
                boolean markedDeleted = attachment.getDelUname() != null && attachment.getDelUId() > 0L;
                if (!markedDeleted) {
                    this.publishUpdatedEvent(entityClass, attachment, before, after);
                }
                break;
            }
            case DELETE: {
                this.publishDeletedEvent(entityClass, attachment, before);
                break;
            }
            default: {
                break;
            }
        }
    }

    /** Stores the attachment in the event context under the key {@code "attachment"}. */
    private void fillAttachment(SystemAttachment systemAttachment, Map<String, Object> context) {
        context.put("attachment", systemAttachment);
    }

    /** Feeds a delete into the event queue and publishes {@link EntityDeleted} once it is accepted. */
    private void publishDeletedEvent(IEntityClass entityClass, SystemAttachment attachment, OqsEngineEntity b) {
        this.eventQueue.feedDelete(attachment, b).thenAcceptOnce(x -> {
            if (this.publisher != null) {
                HashMap<String, Object> context = new HashMap<String, Object>();
                this.fillAttachment(attachment, context);
                this.publisher.publishTransactionEvent((Object)new EntityDeleted(entityClass.code(), Long.valueOf(x.getId()), x.getAttributes(), false, context));
            }
        });
    }

    /**
     * Feeds an update into the event queue and publishes {@link EntityUpdated} once it is accepted.
     * When no before-image is available an empty one carrying the after-image id is substituted so
     * the event can still be published.
     */
    private void publishUpdatedEvent(IEntityClass entityClass, SystemAttachment attachment, OqsEngineEntity b, OqsEngineEntity a) {
        this.eventQueue.feedUpdate(attachment, b, a).thenAcceptOnce(x -> {
            if (this.publisher != null) {
                OqsEngineEntity before = (OqsEngineEntity)x._1;
                OqsEngineEntity after = (OqsEngineEntity)x._2;
                if (before == null) {
                    // The before-image may be missing because its parse failure is tolerated upstream.
                    before = new OqsEngineEntity();
                    before.setId(after.getId());
                    before.setAttributes(new HashMap<>());
                }
                HashMap<String, Object> context = new HashMap<String, Object>();
                this.fillAttachment(attachment, context);
                EntityUpdated entityUpdated = new EntityUpdated(entityClass.code(), Long.valueOf(before.getId()), before.getAttributes(), after.getAttributes(), false, context);
                entityUpdated.setVer(after.getVersion());
                this.publisher.publishTransactionEvent((Object)entityUpdated);
            }
        });
    }

    /** Feeds a create into the event queue and publishes {@link EntityCreated} once it is accepted. */
    private void publishCreatedEvent(IEntityClass entityClass, SystemAttachment attachment, OqsEngineEntity after) {
        this.eventQueue.feedCreate(attachment, after).thenAcceptOnce(x -> {
            if (this.publisher != null) {
                HashMap<String, Object> context = new HashMap<String, Object>();
                this.fillAttachment(attachment, context);
                this.publisher.publishTransactionEvent((Object)new EntityCreated(entityClass.code(), Long.valueOf(x.getId()), x.getAttributes(), false, context));
            }
        });
    }

    /**
     * Builds the attribute map from the {@code _sys_dynamic} JSON column. Every dynamic self field
     * of the entity class that is missing from the document is added with a {@code null} value, and
     * present BOOLEAN fields are normalised to {@code Boolean}.
     *
     * @throws JsonProcessingException if the column content is not valid JSON
     */
    private Map<String, Object> attrCollection(IEntityClass entityClass, List<CanalEntry.Column> columns) throws JsonProcessingException {
        String attrStr = BinLogParseUtils.getStringFromColumn(columns, COLUMN_DYNAMIC);
        Map<String, Object> parsed = attrStr == null || attrStr.isEmpty() ? null : DefaultDataProcessor.attributesToMap(attrStr);
        // A JSON "null" document parses to null; treat it like an empty document.
        Map<String, Object> result = parsed != null ? parsed : new HashMap<String, Object>();
        entityClass.selfFields().stream().filter(IEntityField::isDynamic).forEach(f -> {
            if (!result.containsKey(f.name())) {
                result.put(f.name(), null);
            } else if (f.type().equals(FieldType.BOOLEAN)) {
                result.computeIfPresent(f.name(), (k, v) -> DefaultDataProcessor.toBooleanAttribute(v));
            }
        });
        return result;
    }

    /**
     * Normalises a dynamic boolean value: a {@code Boolean} is kept, any number becomes
     * {@code true} when it is non-zero, and every other value is returned unchanged.
     */
    private static Object toBooleanAttribute(Object value) {
        if (value instanceof Boolean) {
            return value;
        }
        if (value instanceof Number) {
            return ((Number) value).longValue() != 0L;
        }
        return value;
    }
}

