/*
 * Decompiled with CFR 0.152.
 */
package com.xforceplus.ultraman.cdc.processor.impl;

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.protobuf.ByteString;
import com.xforceplus.ultraman.cdc.adapter.CDCBeforeCallback;
import com.xforceplus.ultraman.cdc.adapter.EngineAdapterService;
import com.xforceplus.ultraman.cdc.context.ParserContext;
import com.xforceplus.ultraman.cdc.dto.ParseResult;
import com.xforceplus.ultraman.cdc.processor.DataProcessor;
import com.xforceplus.ultraman.cdc.processor.EventQueue;
import com.xforceplus.ultraman.cdc.processor.impl.BinlogToESUtils;
import com.xforceplus.ultraman.cdc.utils.BinLogParseUtils;
import com.xforceplus.ultraman.cdc.utils.ThreadPoolExecutorUtils;
import com.xforceplus.ultraman.extensions.cdc.status.StatusService;
import com.xforceplus.ultraman.metadata.cdc.OqsEngineEntity;
import com.xforceplus.ultraman.metadata.engine.EntityClassEngine;
import com.xforceplus.ultraman.metadata.entity.EntityClassRef;
import com.xforceplus.ultraman.metadata.entity.FieldType;
import com.xforceplus.ultraman.metadata.entity.IEntityClass;
import com.xforceplus.ultraman.metadata.entity.IEntityField;
import com.xforceplus.ultraman.sdk.core.event.EntityCreated;
import com.xforceplus.ultraman.sdk.core.event.EntityDeleted;
import com.xforceplus.ultraman.sdk.core.event.EntityUpdated;
import com.xforceplus.ultraman.sdk.infra.base.cdc.SystemAttachment;
import com.xforceplus.ultraman.sdk.infra.event.EventPublisher;
import com.xforceplus.ultraman.sdk.infra.utils.JacksonDefaultMapper;
import io.micrometer.core.annotation.Timed;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

public class DefaultDataProcessor
implements DataProcessor {
    final Logger logger = LoggerFactory.getLogger(DefaultDataProcessor.class);
    @Resource
    private EntityClassEngine engine;
    @Autowired(required=false)
    private StatusService statusService;
    @Autowired(required=false)
    private List<CDCBeforeCallback> cdcBeforeCallbacks = new ArrayList<CDCBeforeCallback>();
    @Resource
    private EventPublisher publisher;
    @Resource
    private EngineAdapterService engineAdapterService;
    @Resource
    private EventQueue eventQueue;
    @Resource
    private ExecutorService eventThreadPool;
    private ThreadPoolExecutor executor = ThreadPoolExecutorUtils.executor;
    private long writeTimeOut = 600L;
    private final int minThreadBatchSize = 512;
    private final int maxThreadBatchSize = 2048;

    public static Map<String, Object> attributesToMap(String attrStr) throws JsonProcessingException {
        return (Map)JacksonDefaultMapper.OBJECT_MAPPER.readValue(attrStr, Map.class);
    }

    public ExecutorService getEventThreadPool() {
        return this.eventThreadPool;
    }

    public void setEventThreadPool(ExecutorService eventThreadPool) {
        this.eventThreadPool = eventThreadPool;
    }

    public void setEngine(EntityClassEngine engine) {
        this.engine = engine;
    }

    public void setStatusService(StatusService statusService) {
        this.statusService = statusService;
    }

    public void setCdcBeforeCallbacks(List<CDCBeforeCallback> cdcBeforeCallbacks) {
        this.cdcBeforeCallbacks = cdcBeforeCallbacks;
    }

    public void setPublisher(EventPublisher publisher) {
        this.publisher = publisher;
    }

    public void setEngineAdapterService(EngineAdapterService engineAdapterService) {
        this.engineAdapterService = engineAdapterService;
    }

    public void setEventQueue(EventQueue eventQueue) {
        this.eventQueue = eventQueue;
    }

    @Override
    @Timed(value="oqs.process.delay.latency", percentiles={0.5, 0.9, 0.99}, extraTags={"initiator", "cdc", "action", "consume"})
    public boolean onProcess(Message message, int threadBatchSize) throws Exception {
        return this.messageProcess(message, threadBatchSize) > 0;
    }

    private int dynamicThread(int entries, int threadBatchSize) {
        if (entries <= threadBatchSize) {
            return 1;
        }
        return entries % threadBatchSize == 0 ? entries / threadBatchSize : entries / threadBatchSize + 1;
    }

    private int messageProcess(Message message, int threadBatchSize) throws Exception {
        try {
            if (message.getEntries().size() == 0) {
                return 0;
            }
            return BinlogToESUtils.parseBinlogMessage(message, this.engine, this.engineAdapterService);
        }
        catch (Exception e) {
            this.logger.error("batchId : {}, consume message failed, message : {}", (Object)message.getId(), (Object)e.getMessage());
            throw e;
        }
    }

    private int getThreadBatchSize(int threadBatchSize, int entriesTotal) {
        if (threadBatchSize < 512) {
            threadBatchSize = 512;
        } else if (threadBatchSize > 2048) {
            threadBatchSize = 2048;
        }
        int threadCount = entriesTotal / threadBatchSize;
        if (threadCount > 20) {
            threadCount = 20;
            threadBatchSize = entriesTotal / threadCount;
        }
        return threadBatchSize;
    }

    private void submitThreadHandler(List<CanalEntry.Entry> batchHandlerEntities, Map<Integer, Future> futures, ParserContext parserContext, int i) {
        Future<Map> future = this.executor.submit(() -> this.parseCanalEntries(batchHandlerEntities, parserContext));
        futures.put(i, future);
    }

    private Map<String, OqsEngineEntity> parseCanalEntries(List<CanalEntry.Entry> entries, ParserContext parserContext) {
        ParseResult parseResult = new ParseResult();
        for (CanalEntry.Entry entry : entries) {
            switch (entry.getEntryType()) {
                case TRANSACTIONBEGIN: 
                case TRANSACTIONEND: {
                    this.eventHandler(entry.getEntryType());
                    break;
                }
                case ROWDATA: {
                    this.rowDataParse(entry, parserContext, parseResult);
                    break;
                }
            }
        }
        if (!parseResult.getFinishEntries().isEmpty()) {
            Optional.ofNullable(this.cdcBeforeCallbacks).orElseGet(Collections::emptyList).forEach(x -> {
                try {
                    x.mutate(this.toMutateEntities(parseResult.getFinishEntries()));
                }
                catch (Throwable throwable) {
                    this.logger.error("CDC callback ERROR name:{} , ex:{}", (Object)x.name(), (Object)throwable);
                }
            });
        }
        return parseResult.getFinishEntries();
    }

    private OqsEngineEntity cloneEntity(OqsEngineEntity src) {
        OqsEngineEntity target = new OqsEngineEntity();
        target.setEntityClassRef(src.getEntityClassRef());
        target.setId(src.getId());
        target.setVersion(src.getVersion());
        Map attributes = src.getAttributes();
        try {
            String attr = JacksonDefaultMapper.OBJECT_MAPPER.writeValueAsString((Object)attributes);
            Map newAttr = (Map)JacksonDefaultMapper.OBJECT_MAPPER.readValue(attr, Map.class);
            target.setAttributes(newAttr);
        }
        catch (Throwable throwable) {
            this.logger.error("{}", throwable);
        }
        target.setUpdateTime(src.getUpdateTime());
        target.setFather(src.getFather());
        target.setDeleted(src.isDeleted());
        return target;
    }

    private void rowDataParse(CanalEntry.Entry entry, ParserContext parserContext, ParseResult parseResult) {
        String tableName = entry.getHeader().getTableName();
        if (tableName.isEmpty()) {
            this.logger.error("batch : {}, table name could not be Null, [{}]", (Object)parserContext.getBatchId(), (Object)entry.getStoreValue());
            return;
        }
        Optional<IEntityClass> entityClassOp = this.foundEntityClassFromTableName(tableName);
        if (!entityClassOp.isPresent()) {
            this.logger.error("batch : {}, entityClass could not be Null, [{}]", (Object)parserContext.getBatchId(), (Object)entry.getStoreValue());
            return;
        }
        CanalEntry.RowChange rowChange = null;
        try {
            rowChange = CanalEntry.RowChange.parseFrom((ByteString)entry.getStoreValue());
        }
        catch (Exception e) {
            this.logger.error("batch : {}, parse entry value failed, [{}], [{}]", new Object[]{parserContext.getBatchId(), entry.getStoreValue(), e.getMessage()});
            return;
        }
        CanalEntry.EventType eventType = rowChange.getEventType();
        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
            HashMap<String, CanalEntry.Column> beforeColumnsMap = new HashMap<String, CanalEntry.Column>();
            rowData.getBeforeColumnsList().forEach(column -> beforeColumnsMap.put(column.getName(), (CanalEntry.Column)column));
            HashMap<String, CanalEntry.Column> afterColumnsMap = new HashMap<String, CanalEntry.Column>();
            rowData.getAfterColumnsList().forEach(column -> afterColumnsMap.put(column.getName(), (CanalEntry.Column)column));
            OqsEngineEntity beforeEntity = null;
            OqsEngineEntity afterEntity = null;
            SystemAttachment systemAttachment = null;
            try {
                switch (eventType) {
                    case INSERT: {
                        afterEntity = this.oneRowParser(afterColumnsMap, false, entityClassOp.get(), parseResult);
                        systemAttachment = this.getAttachment(afterColumnsMap);
                        this.mergeEntityToResult(parseResult, afterEntity);
                        break;
                    }
                    case UPDATE: {
                        try {
                            beforeEntity = this.oneRowParser(beforeColumnsMap, false, entityClassOp.get(), parseResult);
                        }
                        catch (Exception e) {
                            this.logger.warn("beforeEntity in update is null, will be ignore, but it will influence event");
                        }
                        afterEntity = this.oneRowParser(afterColumnsMap, false, entityClassOp.get(), parseResult);
                        systemAttachment = this.getAttachment(afterColumnsMap);
                        this.mergeEntityToResult(parseResult, afterEntity);
                        break;
                    }
                    case DELETE: {
                        beforeEntity = this.oneRowParser(beforeColumnsMap, true, entityClassOp.get(), parseResult);
                        systemAttachment = this.getAttachment(beforeColumnsMap);
                        this.mergeEntityToResult(parseResult, beforeEntity);
                    }
                }
                OqsEngineEntity beforeClone = null;
                OqsEngineEntity afterClone = null;
                if (beforeEntity != null) {
                    beforeClone = this.cloneEntity(beforeEntity);
                }
                if (afterEntity != null) {
                    afterClone = this.cloneEntity(afterEntity);
                }
                if (systemAttachment == null) {
                    systemAttachment = new SystemAttachment();
                }
                this.eventHandler(eventType, systemAttachment, entityClassOp.get(), beforeClone, afterClone);
            }
            catch (Exception e) {
                this.logger.warn("parse entity error, message : {}", (Object)e.getMessage());
            }
        }
    }

    private SystemAttachment getAttachment(Map<String, CanalEntry.Column> columns) {
        String attrStr = BinLogParseUtils.getStringFromColumn(columns, "_sys_dynamic");
        SystemAttachment attachment = new SystemAttachment();
        try {
            if (attrStr.isEmpty()) {
                return attachment;
            }
            Map<String, Object> map = DefaultDataProcessor.attributesToMap(attrStr);
            Object o = map.get("#a");
            if (o != null) {
                Object delUname;
                Object delUid;
                Map attachmentMap = (Map)o;
                Object grouped = attachmentMap.get("grouped");
                if (grouped != null) {
                    attachment.setGrouped(Integer.parseInt(grouped.toString()));
                }
                if ((delUid = attachmentMap.get("deluid")) != null) {
                    attachment.setDelUId(Long.parseLong(delUid.toString()));
                }
                if ((delUname = attachmentMap.get("deluname")) != null) {
                    attachment.setDelUname(delUname.toString());
                }
            }
            return attachment;
        }
        catch (Throwable throwable) {
            return attachment;
        }
    }

    private Optional<IEntityClass> foundEntityClassFromTableName(String tableName) {
        String[] tableSplit = tableName.split("_");
        if (tableSplit.length < 3) {
            return Optional.empty();
        }
        String code = tableSplit[2];
        String profile = null;
        if (tableSplit.length > 3) {
            profile = tableSplit[3];
        }
        return this.engine.loadByCode(code, profile);
    }

    private List<OqsEngineEntity> toMutateEntities(Map<String, OqsEngineEntity> entityMap) {
        HashMap mutates = new HashMap();
        entityMap.forEach((k, v) -> {
            OqsEngineEntity engineEntity = (OqsEngineEntity)mutates.get(v.getId());
            if (null != engineEntity) {
                engineEntity.getAttributes().putAll(v.getAttributes());
                if (v.isDeleted()) {
                    engineEntity.setDeleted(true);
                    if (v.getFather() > 0L) {
                        engineEntity.setFather(v.getFather());
                    }
                }
                engineEntity.setUpdateTime(v.getUpdateTime());
            } else {
                mutates.put(v.getId(), v);
            }
        });
        return new ArrayList<OqsEngineEntity>(mutates.values());
    }

    private void mergeEntityToResult(ParseResult parseResult, OqsEngineEntity entity) {
        String key = entity.getId() + "@@" + entity.getTable();
        OqsEngineEntity target = parseResult.getFinishEntries().get(key);
        if (null == target) {
            parseResult.getFinishEntries().put(key, entity);
            return;
        }
        target.getAttributes().putAll(entity.getAttributes());
        if (entity.isDeleted()) {
            target.setDeleted(true);
        }
        if (entity.getFather() > 0L && target.getFather() == 0L) {
            target.setFather(entity.getFather());
        }
        target.setUpdateTime(entity.getUpdateTime());
    }

    private OqsEngineEntity oneRowParser(Map<String, CanalEntry.Column> columns, boolean isDelete, IEntityClass tableEntityClass, ParseResult parseResult) throws SQLException, JsonProcessingException {
        long entityClass = BinLogParseUtils.getLongFromColumn(columns, "_sys_entityclass");
        String profile = BinLogParseUtils.getStringFromColumn(columns, "_sys_profile");
        long id = BinLogParseUtils.getLongFromColumn(columns, "id");
        IEntityClass iEntityClass = tableEntityClass;
        OqsEngineEntity.Builder builder = new OqsEngineEntity.Builder();
        builder.withDeleted(isDelete);
        builder.withEntityClassRef(new EntityClassRef(entityClass, "", "", profile));
        builder.withId(id);
        if (entityClass != tableEntityClass.id()) {
            builder.withFather(tableEntityClass.id());
        } else {
            IEntityClass ec = tableEntityClass.extendEntityClass();
            if (null != ec) {
                builder.withFather(ec.id());
            } else {
                builder.withFather(0L);
            }
        }
        builder.withAttribute("id", (Object)id);
        HashMap iEntityFieldMap = new HashMap();
        this.engine.describe(iEntityClass, profile).getAllFields().stream().forEach(p -> iEntityFieldMap.put(p.name().replace(".", "_"), p));
        for (CanalEntry.Column column : columns.values()) {
            if (column.getName().equals("_sys_operatetime")) {
                long updateTime = Long.parseLong(column.getValue());
                builder.withUpdateTime(updateTime);
                continue;
            }
            if (column.getName().equals("_sys_ver")) {
                if (column.getValue().isEmpty()) continue;
                int version = Integer.parseInt(column.getValue());
                builder.withVersion(version);
                continue;
            }
            if (column.getName().equals("id") || column.getName().equals("_sys_entityclass") || column.getName().equals("_sys_profile") || column.getName().equals("_sys_deleted")) continue;
            if (column.getName().equals("_sys_dynamic")) {
                builder.withAttributes(this.attrCollection(iEntityClass, columns));
                continue;
            }
            String name = column.getName();
            String value = column.getValue();
            IEntityField field = (IEntityField)iEntityFieldMap.get(name);
            if (null == field) {
                this.logger.warn("entityField can not be null, column name {}", (Object)name);
                continue;
            }
            builder.withAttribute(name, (Object)value);
        }
        if (parseResult.getStartId() == -1L) {
            parseResult.setStartId(id);
        }
        return builder.build();
    }

    private void eventHandler(CanalEntry.EntryType entryType) {
    }

    private void eventHandler(CanalEntry.EventType eventType, SystemAttachment attachment, IEntityClass entityClass, OqsEngineEntity before, OqsEngineEntity after) {
        switch (eventType) {
            case INSERT: {
                this.publishCreatedEvent(entityClass, attachment, after);
                break;
            }
            case UPDATE: {
                if (attachment.getDelUname() != null && attachment.getDelUId() > 0L) break;
                this.publishUpdatedEvent(entityClass, attachment, before, after);
                break;
            }
            case DELETE: {
                this.publishDeletedEvent(entityClass, attachment, before);
                break;
            }
        }
    }

    private Map<String, Object> attrCollection(IEntityClass entityClass, Map<String, CanalEntry.Column> columns) throws JsonProcessingException {
        String attrStr = BinLogParseUtils.getStringFromColumn(columns, "_sys_dynamic");
        Map<String, Object> result = attrStr.isEmpty() ? new HashMap<String, Object>() : DefaultDataProcessor.attributesToMap(attrStr);
        entityClass.selfFields().stream().filter(IEntityField::isDynamic).forEach(f -> {
            if (!result.containsKey(f.name())) {
                result.put(f.name(), null);
            } else if (f.type().equals((Object)FieldType.BOOLEAN)) {
                result.computeIfPresent(f.name(), (k, s) -> (Integer)s != 0);
            }
        });
        return result;
    }

    private void fillAttachment(SystemAttachment systemAttachment, Map<String, Object> context) {
        context.put("attachment", systemAttachment);
    }

    private void publishDeletedEvent(IEntityClass entityClass, SystemAttachment attachment, OqsEngineEntity b) {
        this.eventQueue.feedDelete(attachment, b).thenAcceptOnce(x -> {
            if (this.publisher != null) {
                HashMap<String, Object> context = new HashMap<String, Object>();
                this.fillAttachment(attachment, context);
                this.publisher.publishTransactionEvent((Object)new EntityDeleted(entityClass.code(), Long.valueOf(x.getId()), x.getAttributes(), false, context));
            }
        }, this.eventThreadPool);
    }

    private void publishUpdatedEvent(IEntityClass entityClass, SystemAttachment attachment, OqsEngineEntity b, OqsEngineEntity a) {
        this.eventQueue.feedUpdate(attachment, b, a).thenAcceptOnce(x -> {
            if (this.publisher != null) {
                OqsEngineEntity before = (OqsEngineEntity)x._1;
                OqsEngineEntity after = (OqsEngineEntity)x._2;
                HashMap<String, Object> context = new HashMap<String, Object>();
                this.fillAttachment(attachment, context);
                EntityUpdated entityUpdated = new EntityUpdated(entityClass.code(), Long.valueOf(before.getId()), before.getAttributes(), after.getAttributes(), false, context);
                int version = after.getVersion();
                entityUpdated.setVer(version);
                this.publisher.publishTransactionEvent((Object)entityUpdated);
            }
        }, this.eventThreadPool);
    }

    private void publishCreatedEvent(IEntityClass entityClass, SystemAttachment attachment, OqsEngineEntity after) {
        this.eventQueue.feedCreate(attachment, after).thenAcceptOnce(x -> {
            if (this.publisher != null) {
                HashMap<String, Object> context = new HashMap<String, Object>();
                this.fillAttachment(attachment, context);
                this.publisher.publishTransactionEvent((Object)new EntityCreated(entityClass.code(), Long.valueOf(x.getId()), x.getAttributes(), false, context));
            }
        }, this.eventThreadPool);
    }
}

