/*
 * Decompiled with CFR 0.152.
 */
package com.xforceplus.ultraman.oqsengine.cdc.consumer.parser;

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.xforceplus.ultraman.oqsengine.cdc.consumer.dto.ParseResult;
import com.xforceplus.ultraman.oqsengine.cdc.consumer.parser.BinLogParser;
import com.xforceplus.ultraman.oqsengine.cdc.consumer.tools.BinLogParseUtils;
import com.xforceplus.ultraman.oqsengine.cdc.consumer.tools.CommonUtils;
import com.xforceplus.ultraman.oqsengine.cdc.context.ParserContext;
import com.xforceplus.ultraman.oqsengine.pojo.cdc.enums.OqsBigEntityColumns;
import com.xforceplus.ultraman.oqsengine.pojo.define.OperationType;
import com.xforceplus.ultraman.oqsengine.pojo.dto.entity.EntityClassRef;
import com.xforceplus.ultraman.oqsengine.pojo.dto.entity.IEntityClass;
import com.xforceplus.ultraman.oqsengine.storage.master.utils.OriginalEntityUtils;
import com.xforceplus.ultraman.oqsengine.storage.pojo.OqsEngineEntity;
import com.xforceplus.ultraman.oqsengine.storage.transaction.commit.CommitHelper;
import io.vavr.Tuple3;
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link BinLogParser} implementation that works in two phases.
 *
 * <ol>
 *   <li>{@link #merge} is called per binlog row and stores the row's columns in
 *       {@code parserContext.getParseMiddleResult()}, keyed by the row id.</li>
 *   <li>{@link #parser} walks that intermediate map, converts every stored row into an
 *       {@link OqsEngineEntity} and places it in {@code parseResult.getFinishEntries()}.</li>
 * </ol>
 *
 * <p>Failures in either phase never propagate out of the {@code try} blocks: they are recorded
 * through {@code parseResult.addError(...)} unless the commit id of the row equals
 * {@link CommitHelper#getUncommitId()}, in which case they are only logged at debug level.
 */
public class DynamicBinLogParser implements BinLogParser {
    final Logger logger = LoggerFactory.getLogger(DynamicBinLogParser.class);

    /**
     * Converts every row stored in {@code parserContext.getParseMiddleResult()} into an
     * {@link OqsEngineEntity} and stores it in {@code parseResult.getFinishEntries()}.
     *
     * <p>{@code parseResult.finishOne()} is invoked once per entry, whether or not the
     * conversion succeeded.
     *
     * @param parserContext source of the intermediate rows and of the CDC metrics
     * @param parseResult   receives the built entities and any recorded errors
     */
    @Override
    public void parser(ParserContext parserContext, ParseResult parseResult) {
        for (Map.Entry<Long, Tuple3<Long, Boolean, List<CanalEntry.Column>>> entry
                : parserContext.getParseMiddleResult().entrySet()) {
            Tuple3<Long, Boolean, List<CanalEntry.Column>> merged = entry.getValue();
            // Commit id captured at merge time; kept as the fallback when re-reading it below fails.
            long commitId = merged._1();
            boolean isDelete = merged._2();
            List<CanalEntry.Column> columns = merged._3();
            long id = entry.getKey();
            try {
                commitId = BinLogParseUtils.getLongFromColumn(columns, OqsBigEntityColumns.COMMITID);
                IEntityClass entityClass = getEntityClass(id, columns, parserContext);
                OqsEngineEntity oqsEngineEntity = toOriginalEntity(entityClass, id, commitId, columns, isDelete);
                parseResult.getFinishEntries().put(id, oqsEngineEntity);
            } catch (Exception e) {
                if (commitId != CommitHelper.getUncommitId()) {
                    parserContext.getCdcMetrics().getCdcUnCommitMetrics().getUnCommitIds().add(commitId);
                    parseResult.addError(id, commitId, String.format("batch : %d, pos : %d, parser columns failed, message : %s", parserContext.getCdcMetrics().getBatchId(), parseResult.getPos(), e.getMessage()));
                } else {
                    // Deliberately not recorded as an error; leave a trace so the drop is not invisible.
                    logger.debug("[dynamic-binlog-parser] id : {}, parse failure ignored for un-committed row", id, e);
                }
            }
            parseResult.finishOne();
        }
    }

    /**
     * Stores one binlog row in {@code parserContext.getParseMiddleResult()} for later parsing.
     *
     * <p>Unless the row is a physical delete, it is filtered by the consumer type: with
     * {@code DATA_MIGRATION} only rows whose commit id equals
     * {@link CommitHelper#getMaintainCommitId()} are kept, with {@code EXCLUDE_MIGRATION} only
     * rows whose commit id differs from it are kept. A filtered row returns immediately and
     * skips the commit-id bookkeeping of {@code addToReadyChecks}.
     *
     * <p>{@code parserContext.incrementCurrentCheckPos()} runs on every path, including the
     * filtered and the failed ones.
     *
     * @param columns          columns of the binlog row
     * @param isPhysicalDelete whether the row was physically deleted; such rows bypass the
     *                         consumer-type filter
     * @param parserContext    supplies the consumer type, the intermediate map and the metrics
     * @param parseResult      receives the start id, recorded errors and ready commit ids
     */
    @Override
    public void merge(List<CanalEntry.Column> columns, boolean isPhysicalDelete, ParserContext parserContext, ParseResult parseResult) {
        long id = -1L;
        long commitId = -1L;
        try {
            commitId = BinLogParseUtils.getLongFromColumn(columns, OqsBigEntityColumns.COMMITID);
            if (!isPhysicalDelete) {
                switch (parserContext.getConsumerType()) {
                    case DATA_MIGRATION:
                        if (commitId != CommitHelper.getMaintainCommitId()) {
                            return;
                        }
                        break;
                    case EXCLUDE_MIGRATION:
                        if (commitId == CommitHelper.getMaintainCommitId()) {
                            return;
                        }
                        break;
                    default:
                        break;
                }
            }
            id = BinLogParseUtils.getLongFromColumn(columns, OqsBigEntityColumns.ID);
            // The first id merged into this result becomes its start id.
            if (parseResult.getStartId() == -1L) {
                parseResult.setStartId(id);
            }
            parserContext.getParseMiddleResult().put(id, new Tuple3<>(commitId, isPhysicalDelete, columns));
        } catch (Exception e) {
            if (commitId != CommitHelper.getUncommitId()) {
                if (commitId > CommitHelper.getMaintainCommitId()) {
                    parserContext.getCdcMetrics().getCdcUnCommitMetrics().getUnCommitIds().add(commitId);
                }
                parseResult.addError(id, commitId, String.format("batch : %d, pos : %d, merge columns failed, message : %s", parserContext.getCdcMetrics().getBatchId(), parserContext.currentCheckPos(), e.getMessage()));
            } else {
                // Deliberately not recorded as an error; leave a trace so the drop is not invisible.
                logger.debug("[dynamic-binlog-parser] id : {}, merge failure ignored for un-committed row", id, e);
            }
        } finally {
            parserContext.incrementCurrentCheckPos();
        }
        addToReadyChecks(commitId, parserContext, parseResult);
    }

    /**
     * Builds an {@link OqsEngineEntity} from the columns of one row.
     *
     * <p>The entity is marked deleted when {@code isPhysicalDelete} is set or when the
     * {@code DELETED} column is true; the operation is {@code DELETE} in that case and
     * {@code UPDATE} otherwise.
     *
     * @param entityClass      entity class resolved for the row
     * @param id               row id
     * @param commitId         commit id of the row
     * @param columns          columns of the row
     * @param isPhysicalDelete whether the row was physically deleted
     * @return the built entity
     * @throws SQLException if the row has no attributes or they cannot be deserialised
     */
    private OqsEngineEntity toOriginalEntity(IEntityClass entityClass, long id, long commitId, List<CanalEntry.Column> columns, boolean isPhysicalDelete) throws SQLException {
        OqsEngineEntity.Builder builder = OqsEngineEntity.Builder.anOriginalEntity();
        builder.withId(id);
        builder.withEntityClass(entityClass);
        builder.withEntityClassRef(entityClass.ref());

        Map<String, Object> attributes = attrCollection(id, columns);
        if (attributes.isEmpty()) {
            throw new SQLException(String.format("[dynamic-binlog-parser] id [%d], commitId [%d] has no attributes...", id, commitId));
        }
        builder.withAttributes(attributes);

        boolean isDelete = isPhysicalDelete || BinLogParseUtils.getBooleanFromColumn(columns, OqsBigEntityColumns.DELETED);
        builder.withDeleted(isDelete);
        builder.withOp(isDelete ? OperationType.DELETE.getValue() : OperationType.UPDATE.getValue());

        long txId = BinLogParseUtils.getLongFromColumn(columns, OqsBigEntityColumns.TX);
        builder.withTx(txId);
        builder.withCommitid(commitId);
        builder.withVersion(BinLogParseUtils.getIntegerFromColumn(columns, OqsBigEntityColumns.VERSION))
            .withOqsMajor(BinLogParseUtils.getIntegerFromColumn(columns, OqsBigEntityColumns.OQSMAJOR))
            .withCreateTime(BinLogParseUtils.getLongFromColumn(columns, OqsBigEntityColumns.CREATETIME))
            .withUpdateTime(BinLogParseUtils.getLongFromColumn(columns, OqsBigEntityColumns.UPDATETIME));
        return builder.build();
    }

    /**
     * Reads the {@code ATTRIBUTE} column and deserialises it into an attribute map.
     *
     * @param id      row id, used only in error messages
     * @param columns columns of the row
     * @return the attribute map, or an empty map when the column value is null or empty
     * @throws SQLException if deserialisation fails; the original exception is kept as the cause
     */
    private Map<String, Object> attrCollection(long id, List<CanalEntry.Column> columns) throws SQLException {
        String attrStr = BinLogParseUtils.getStringFromColumn(columns, OqsBigEntityColumns.ATTRIBUTE);
        // NOTE(review): whether the helper can return null is not visible here; a null is treated
        // like an empty value so the caller reports "no attributes" instead of failing with an NPE.
        if (attrStr == null || attrStr.isEmpty()) {
            return Collections.emptyMap();
        }
        try {
            return OriginalEntityUtils.attributesToMap(attrStr);
        } catch (Exception e) {
            String error = String.format("[dynamic-binlog-parser] id : %d, jsonToObject error, message : %s, attrStr %s.", id, e.getMessage(), attrStr);
            logger.warn(error, e);
            // Chain the cause so the root failure is not reduced to its message.
            throw new SQLException(error, e);
        }
    }

    /**
     * Resolves the entity class of a row from its entity-class columns and its {@code PROFILE}
     * column.
     *
     * @param id            row id, used only in the error message
     * @param columns       columns of the row
     * @param parserContext context handed to {@code CommonUtils.getEntityClass}
     * @return the entity class returned by {@code CommonUtils.getEntityClass}
     * @throws SQLException if no entity-class column holds a value greater than zero
     */
    private IEntityClass getEntityClass(long id, List<CanalEntry.Column> columns, ParserContext parserContext) throws SQLException {
        long entityClassId = entityClassId(columns);
        if (entityClassId < 0L) {
            throw new SQLException(String.format("[dynamic-binlog-parser] id : %d has no entityClass...", id));
        }
        String profile = BinLogParseUtils.getStringWithoutNullCheck(columns, OqsBigEntityColumns.PROFILE);
        return CommonUtils.getEntityClass(new EntityClassRef(entityClassId, "", profile), parserContext);
    }

    /**
     * Scans the entity-class columns from {@code ENTITYCLASSL4} down to {@code ENTITYCLASSL0}
     * (by enum ordinal) and returns the first value that is greater than zero.
     *
     * <p>NOTE(review): this relies on the ordinals between those two constants all being
     * entity-class columns; confirm against the enum declaration.
     *
     * @param columns columns of the row
     * @return the first entity-class id greater than zero, or {@code -1} when none is found
     */
    private long entityClassId(List<CanalEntry.Column> columns) {
        int lowest = OqsBigEntityColumns.ENTITYCLASSL0.ordinal();
        for (int o = OqsBigEntityColumns.ENTITYCLASSL4.ordinal(); o >= lowest; --o) {
            Optional<OqsBigEntityColumns> column = OqsBigEntityColumns.getByOrdinal(o);
            if (!column.isPresent()) {
                continue;
            }
            long entityClass = BinLogParseUtils.getLongFromColumn(columns, column.get());
            if (entityClass > 0L) {
                return entityClass;
            }
        }
        return -1L;
    }

    /**
     * Registers a commit id with the un-commit metrics and, when commit-ready checking is
     * enabled, with the ready-commit ids of the parse result.
     *
     * <p>Nothing happens when the commit id equals {@link CommitHelper#getUncommitId()}, is
     * {@code 0}, or is not greater than {@link CommitHelper#getMaintainCommitId()}.
     *
     * @param commitId      commit id of the row just merged
     * @param parserContext supplies the check flag, the skip commit id and the metrics
     * @param parseResult   receives the ready commit id
     */
    private void addToReadyChecks(long commitId, ParserContext parserContext, ParseResult parseResult) {
        if (commitId == CommitHelper.getUncommitId() || commitId == 0L) {
            return;
        }
        if (commitId <= CommitHelper.getMaintainCommitId()) {
            return;
        }
        // Marked ready only when checking is on and the id is past the skip id (or no positive skip id is set).
        if (parserContext.isCheckCommitReady() && (commitId > parserContext.getSkipCommitId() || parserContext.getSkipCommitId() <= 0L)) {
            parseResult.isReadyCommitIds().add(commitId);
        }
        parserContext.getCdcMetrics().getCdcUnCommitMetrics().getUnCommitIds().add(commitId);
    }
}

