/*
 * Decompiled with CFR 0.152.
 */
package com.xforceplus.ultraman.oqsengine.cdc.consumer.impl;

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.xforceplus.ultraman.oqsengine.cdc.cdcerror.CdcErrorStorage;
import com.xforceplus.ultraman.oqsengine.cdc.cdcerror.condition.CdcErrorQueryCondition;
import com.xforceplus.ultraman.oqsengine.cdc.cdcerror.dto.ErrorType;
import com.xforceplus.ultraman.oqsengine.cdc.cdcerror.tools.CdcErrorUtils;
import com.xforceplus.ultraman.oqsengine.cdc.consumer.impl.SyncExecutor;
import com.xforceplus.ultraman.oqsengine.cdc.consumer.tools.BinLogParseUtils;
import com.xforceplus.ultraman.oqsengine.cdc.consumer.tools.CommonUtils;
import com.xforceplus.ultraman.oqsengine.common.id.LongIdGenerator;
import com.xforceplus.ultraman.oqsengine.metadata.MetaManager;
import com.xforceplus.ultraman.oqsengine.pojo.cdc.dto.RawEntry;
import com.xforceplus.ultraman.oqsengine.pojo.cdc.enums.OqsBigEntityColumns;
import com.xforceplus.ultraman.oqsengine.pojo.cdc.metrics.CDCMetrics;
import com.xforceplus.ultraman.oqsengine.pojo.devops.CdcErrorTask;
import com.xforceplus.ultraman.oqsengine.pojo.devops.DevOpsCdcMetrics;
import com.xforceplus.ultraman.oqsengine.pojo.devops.FixedStatus;
import com.xforceplus.ultraman.oqsengine.pojo.dto.entity.IEntityClass;
import com.xforceplus.ultraman.oqsengine.storage.define.OperationType;
import com.xforceplus.ultraman.oqsengine.storage.index.IndexStorage;
import com.xforceplus.ultraman.oqsengine.storage.master.MasterStorage;
import com.xforceplus.ultraman.oqsengine.storage.master.utils.OriginalEntityUtils;
import com.xforceplus.ultraman.oqsengine.storage.pojo.OriginalEntity;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SphinxSyncExecutor
implements SyncExecutor {
    final Logger logger = LoggerFactory.getLogger(SphinxSyncExecutor.class);
    @Resource(name="indexStorage")
    private IndexStorage sphinxQLIndexStorage;
    @Resource(name="masterStorage")
    private MasterStorage masterStorage;
    @Resource(name="cdcErrorStorage")
    private CdcErrorStorage cdcErrorStorage;
    @Resource(name="longNoContinuousPartialOrderIdGenerator")
    private LongIdGenerator seqNoGenerator;
    @Resource
    private MetaManager metaManager;

    @Override
    public int execute(Collection<RawEntry> rawEntries, CDCMetrics cdcMetrics) throws SQLException {
        int synced = 0;
        ArrayList<OriginalEntity> storageEntityList = new ArrayList<OriginalEntity>();
        long startTime = 0L;
        RawEntry start = null;
        HashMap<String, IEntityClass> entityClassMap = new HashMap<String, IEntityClass>();
        for (RawEntry rawEntry : rawEntries) {
            if (null == start) {
                start = rawEntry;
            }
            Long txId = null;
            boolean success = false;
            try {
                txId = BinLogParseUtils.getLongFromColumn(rawEntry.getColumns(), OqsBigEntityColumns.TX);
                OriginalEntity entity = this.prepareForUpdateDelete(rawEntry.getColumns(), rawEntry.getId(), rawEntry.getCommitId(), entityClassMap);
                storageEntityList.add(entity);
                success = true;
            }
            catch (Exception e) {
                this.logger.warn("add to storageEntityList error, message : {}", (Object)e.toString());
                this.formatErrorHandle(rawEntry.getColumns(), rawEntry.getUniKeyPrefix(), rawEntry.getPos(), cdcMetrics.getBatchId(), e.getMessage());
            }
            if (null == txId || !CommonUtils.isMaintainRecord(rawEntry.getCommitId())) continue;
            cdcMetrics.getDevOpsMetrics().computeIfAbsent(txId, f -> new DevOpsCdcMetrics()).incrementByStatus(success);
        }
        if (!storageEntityList.isEmpty()) {
            block9: {
                try {
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("[cdc-sync-executor] Prepare to persist {} objects to the index.", (Object)storageEntityList.size());
                    }
                    this.sphinxQLIndexStorage.saveOrDeleteOriginalEntities(storageEntityList);
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("[cdc-sync-executor] Succeeded in persisting {} objects to the index.", (Object)storageEntityList.size());
                    }
                }
                catch (Exception e) {
                    this.logger.error(e.getMessage(), (Throwable)e);
                    cdcMetrics.getDevOpsMetrics().forEach((k, v) -> v.allFails());
                    OriginalEntity originalEntity = (OriginalEntity)storageEntityList.get(0);
                    String uniKey = CdcErrorUtils.uniKeyGenerate(start.getUniKeyPrefix(), start.getPos(), ErrorType.DATA_INSERT_ERROR);
                    if (this.recordOrRecover(cdcMetrics.getBatchId(), uniKey, originalEntity.getId(), originalEntity.getEntityClass().id(), originalEntity.getVersion(), originalEntity.getOp(), originalEntity.getCommitid(), ErrorType.DATA_INSERT_ERROR, e.getMessage(), storageEntityList)) break block9;
                    throw e;
                }
            }
            synced += storageEntityList.size();
            this.syncMetrics(cdcMetrics, Math.abs(System.currentTimeMillis() - startTime));
        }
        return synced;
    }

    @Override
    public boolean formatErrorHandle(List<CanalEntry.Column> columns, String uniKeyPrefix, int pos, Long batchId, String message) throws SQLException {
        Long id = BinLogParseUtils.getLongFromColumn(columns, OqsBigEntityColumns.ID, -1L);
        Long commitId = BinLogParseUtils.getLongFromColumn(columns, OqsBigEntityColumns.COMMITID);
        Integer version = BinLogParseUtils.getIntegerFromColumn(columns, OqsBigEntityColumns.VERSION, -1);
        Integer op = BinLogParseUtils.getIntegerFromColumn(columns, OqsBigEntityColumns.OP, 0);
        Long entity = -1L;
        try {
            entity = this.getEntity(columns);
        }
        catch (Exception exception) {
            // empty catch block
        }
        String uniKey = CdcErrorUtils.uniKeyGenerate(uniKeyPrefix, pos, ErrorType.DATA_FORMAT_ERROR);
        this.recordOrRecover(batchId, uniKey, id, entity, version, op, commitId, ErrorType.DATA_FORMAT_ERROR, message, new ArrayList<OriginalEntity>());
        return true;
    }

    private boolean recordOrRecover(long batchId, String uniKey, long id, long entity, int version, int op, long commitId, ErrorType errorType, String message, List<OriginalEntity> entities) throws SQLException {
        this.logger.warn("[cdc-sync-executor] batchId : {}, cdc-consume error will be record in cdcErrors,  id : {}, commitId : {}, message : {}", new Object[]{batchId, id, commitId, null == message ? "unKnow" : message});
        try {
            CdcErrorTask cdcErrorTask;
            CdcErrorQueryCondition cdcErrorQueryCondition = new CdcErrorQueryCondition();
            cdcErrorQueryCondition.setUniKey(uniKey);
            Collection<CdcErrorTask> errorTasks = this.cdcErrorStorage.queryCdcErrors(cdcErrorQueryCondition);
            if (null == errorTasks || errorTasks.isEmpty()) {
                this.cdcErrorStorage.buildCdcError(CdcErrorTask.buildErrorTask((long)((Long)this.seqNoGenerator.next()), (String)uniKey, (long)batchId, (long)id, (long)entity, (int)version, (int)op, (long)commitId, (int)errorType.getType(), (String)(null == entities ? "{}" : OriginalEntityUtils.toOriginalEntityStr(entities)), (String)(null == message ? errorType.name() : message)));
                return false;
            }
            if (errorType.equals((Object)ErrorType.DATA_INSERT_ERROR) && (cdcErrorTask = errorTasks.iterator().next()).getStatus() == FixedStatus.SUBMIT_FIX_REQ.getStatus()) {
                try {
                    List originalEntities = OriginalEntityUtils.toOriginalEntity((MetaManager)this.metaManager, (String)cdcErrorTask.getOperationObject());
                    this.sphinxQLIndexStorage.saveOrDeleteOriginalEntities((Collection)originalEntities);
                }
                catch (Exception e) {
                    this.logger.warn("[cdc-sync-executor] fixed error, seqNo : [{}], batchId : [{}], message : [{}]", new Object[]{cdcErrorTask.getSeqNo(), cdcErrorTask.getBatchId(), e.getMessage()});
                    this.cdcErrorStorage.submitRecover(cdcErrorTask.getSeqNo(), FixedStatus.FIX_ERROR, OriginalEntityUtils.toOriginalEntityStr(entities));
                    return false;
                }
                try {
                    this.cdcErrorStorage.updateCdcError(cdcErrorTask.getSeqNo(), FixedStatus.FIXED);
                }
                catch (Exception exception) {
                    // empty catch block
                }
                return true;
            }
            return false;
        }
        catch (Exception e) {
            throw new SQLException(e.getMessage());
        }
    }

    private synchronized void syncMetrics(CDCMetrics cdcMetrics, long useTime) {
        if (cdcMetrics.getCdcAckMetrics().getMaxSyncUseTime() < useTime) {
            cdcMetrics.getCdcAckMetrics().setMaxSyncUseTime(useTime);
        }
    }

    private long getEntity(List<CanalEntry.Column> columns) throws SQLException {
        for (int o = OqsBigEntityColumns.ENTITYCLASSL4.ordinal(); o >= OqsBigEntityColumns.ENTITYCLASSL0.ordinal(); --o) {
            long entity;
            Optional op = OqsBigEntityColumns.getByOrdinal((int)o);
            if (!op.isPresent() || (entity = BinLogParseUtils.getLongFromColumn(columns, (OqsBigEntityColumns)op.get())) <= 0L) continue;
            return entity;
        }
        return -1L;
    }

    private IEntityClass getEntityClass(long id, Map<String, IEntityClass> entityClassMap, List<CanalEntry.Column> columns) throws SQLException {
        long entityId = this.getEntity(columns);
        if (entityId > 0L) {
            String profile = BinLogParseUtils.getStringWithoutNullCheck(columns, OqsBigEntityColumns.PROFILE);
            String key = this.toClassKeyWithProfile(id, profile);
            IEntityClass entityClass = entityClassMap.get(key);
            if (null != entityClass) {
                return entityClass;
            }
            Optional entityClassOptional = this.metaManager.load(entityId, profile);
            if (entityClassOptional.isPresent()) {
                IEntityClass finalClass = (IEntityClass)entityClassOptional.get();
                entityClassMap.put(key, finalClass);
                return finalClass;
            }
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("[cdc-sync-executor] id [{}], entityClassId [{}] has no entityClass in meta.", (Object)id, (Object)entityId);
            }
        }
        return null;
    }

    private Map<String, Object> attrCollection(long id, List<CanalEntry.Column> columns) throws SQLException {
        String attrStr = BinLogParseUtils.getStringFromColumn(columns, OqsBigEntityColumns.ATTRIBUTE);
        if (null == attrStr || attrStr.isEmpty()) {
            return Collections.emptyMap();
        }
        try {
            return OriginalEntityUtils.attributesToMap((String)attrStr);
        }
        catch (Exception e) {
            String error = String.format("[cdc-sync-executor] id [%d], jsonToObject error, message : [%s], attrStr [%s] ", id, e.getMessage(), attrStr);
            this.logger.warn(error);
            throw new SQLException(error);
        }
    }

    private OriginalEntity prepareForUpdateDelete(List<CanalEntry.Column> columns, long id, long commitId, Map<String, IEntityClass> entityClassMap) throws SQLException {
        IEntityClass entityClass = this.getEntityClass(id, entityClassMap, columns);
        if (null == entityClass) {
            throw new SQLException(String.format("[cdc-sync-executor] id [%d], commitId [%d] has no entityClass...", id, commitId));
        }
        Map<String, Object> attributes = this.attrCollection(id, columns);
        if (attributes.isEmpty()) {
            throw new SQLException(String.format("[cdc-sync-executor] id [%d], commitId [%d] has no attributes...", id, commitId));
        }
        boolean isDelete = BinLogParseUtils.getBooleanFromColumn(columns, OqsBigEntityColumns.DELETED);
        long tx = BinLogParseUtils.getLongFromColumn(columns, OqsBigEntityColumns.TX);
        OriginalEntity.Builder builder = OriginalEntity.Builder.anOriginalEntity().withId(id).withDeleted(isDelete).withOp(isDelete ? OperationType.DELETE.getValue() : OperationType.UPDATE.getValue()).withVersion(BinLogParseUtils.getIntegerFromColumn(columns, OqsBigEntityColumns.VERSION)).withOqsMajor(BinLogParseUtils.getIntegerFromColumn(columns, OqsBigEntityColumns.OQSMAJOR)).withCreateTime(BinLogParseUtils.getLongFromColumn(columns, OqsBigEntityColumns.CREATETIME)).withUpdateTime(BinLogParseUtils.getLongFromColumn(columns, OqsBigEntityColumns.UPDATETIME)).withTx(tx).withCommitid(commitId).withEntityClass(entityClass).withAttributes(attributes);
        if (CommonUtils.isMaintainRecord(commitId)) {
            builder.withMaintainid(tx);
        }
        return builder.build();
    }

    private String toClassKeyWithProfile(long id, String profile) {
        return id + "_" + (null == profile ? "" : profile);
    }
}

