package com.xforceplus.ultraman.oqsengine.cdc.consumer.impl;

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.xforceplus.ultraman.oqsengine.cdc.cdcerror.CdcErrorStorage;
import com.xforceplus.ultraman.oqsengine.cdc.cdcerror.condition.CdcErrorQueryCondition;
import com.xforceplus.ultraman.oqsengine.cdc.cdcerror.dto.ErrorType;
import com.xforceplus.ultraman.oqsengine.cdc.cdcerror.tools.CdcErrorUtils;
import com.xforceplus.ultraman.oqsengine.cdc.consumer.tools.BinLogParseUtils;
import com.xforceplus.ultraman.oqsengine.common.id.LongIdGenerator;
import com.xforceplus.ultraman.oqsengine.metadata.MetaManager;
import com.xforceplus.ultraman.oqsengine.pojo.cdc.dto.RawEntry;
import com.xforceplus.ultraman.oqsengine.pojo.cdc.enums.OqsBigEntityColumns;
import com.xforceplus.ultraman.oqsengine.pojo.cdc.metrics.CDCMetrics;
import com.xforceplus.ultraman.oqsengine.pojo.devops.CdcErrorTask;
import com.xforceplus.ultraman.oqsengine.pojo.devops.FixedStatus;
import com.xforceplus.ultraman.oqsengine.pojo.dto.entity.IEntityClass;
import com.xforceplus.ultraman.oqsengine.storage.define.OperationType;
import com.xforceplus.ultraman.oqsengine.storage.index.IndexStorage;
import com.xforceplus.ultraman.oqsengine.storage.master.MasterStorage;
import com.xforceplus.ultraman.oqsengine.storage.master.utils.OriginalEntityUtils;
import com.xforceplus.ultraman.oqsengine.storage.pojo.OriginalEntity;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/xforceplus/ultraman/oqsengine/cdc/consumer/impl/SphinxSyncExecutor.class */
/**
 * {@link SyncExecutor} that turns raw CDC entries into {@link OriginalEntity} objects and writes
 * them to the index storage.
 *
 * <p>Entries that cannot be converted are recorded as {@link ErrorType#DATA_FORMAT_ERROR}; a failed
 * index write is recorded as {@link ErrorType#DATA_INSERT_ERROR}. Both go through
 * {@link #recordOrRecover}, which persists the failure via the {@link CdcErrorStorage} or replays a
 * previously recorded task whose status is {@link FixedStatus#SUBMIT_FIX_REQ}.
 */
public class SphinxSyncExecutor implements SyncExecutor {
    final Logger logger = LoggerFactory.getLogger(SphinxSyncExecutor.class);

    @Resource(name = "indexStorage")
    private IndexStorage sphinxQLIndexStorage;

    @Resource(name = "masterStorage")
    private MasterStorage masterStorage;

    @Resource(name = "cdcErrorStorage")
    private CdcErrorStorage cdcErrorStorage;

    @Resource(name = "snowflakeIdGenerator")
    private LongIdGenerator seqNoGenerator;

    @Resource
    private MetaManager metaManager;

    /**
     * Converts every raw entry and writes the successfully converted ones to the index in one call.
     *
     * @param collection raw entries of the current batch
     * @param cDCMetrics metrics of the current batch; supplies the batch id and receives the
     *                   elapsed time of this call
     * @return number of entities handed to the index storage, {@code 0} when none could be converted
     * @throws SQLException when an error cannot be recorded, or when the index write failed and
     *                      {@link #recordOrRecover} did not recover it
     */
    @Override
    public int execute(Collection<RawEntry> collection, CDCMetrics cDCMetrics) throws SQLException {
        // Captured up front so the reported duration is a real elapsed time.
        final long startTime = System.currentTimeMillis();

        List<OriginalEntity> entities = new ArrayList<>();
        // The first raw entry provides the uniKey prefix/position used if the batch write fails.
        RawEntry firstEntry = null;
        for (RawEntry rawEntry : collection) {
            if (null == firstEntry) {
                firstEntry = rawEntry;
            }
            try {
                entities.add(prepareForUpdateDelete(rawEntry.getColumns(), rawEntry.getId(), rawEntry.getCommitId()));
            } catch (Exception e) {
                // A single malformed entry must not stop the batch: record it and continue.
                this.logger.warn("add to storageEntityList error, message : {}", e.toString());
                formatErrorHandle(rawEntry.getColumns(), rawEntry.getUniKeyPrefix(), rawEntry.getPos(),
                    cDCMetrics.getBatchId(), e.getMessage());
            }
        }

        if (entities.isEmpty()) {
            return 0;
        }

        try {
            this.sphinxQLIndexStorage.saveOrDeleteOriginalEntities(entities);
        } catch (Exception e) {
            OriginalEntity head = entities.get(0);
            String uniKey = CdcErrorUtils.uniKeyGenerate(
                firstEntry.getUniKeyPrefix(), firstEntry.getPos(), ErrorType.DATA_INSERT_ERROR);
            boolean recovered = recordOrRecover(cDCMetrics.getBatchId(), uniKey, head.getId(),
                head.getEntityClass().id(), head.getVersion(), head.getOp(), head.getCommitid(),
                ErrorType.DATA_INSERT_ERROR, e.getMessage(), entities);
            if (!recovered) {
                throw e;
            }
        }

        syncMetrics(cDCMetrics, Math.abs(System.currentTimeMillis() - startTime));
        return entities.size();
    }

    /**
     * Records a {@link ErrorType#DATA_FORMAT_ERROR} for a raw row that could not be converted.
     *
     * @param list columns of the failing row
     * @param str  uniKey prefix of the row
     * @param i    position of the row, used together with the prefix to build the uniKey
     * @param l    batch id
     * @param str2 error message, may be {@code null}
     * @return always {@code true}
     * @throws SQLException when a required column cannot be read or the error cannot be recorded
     */
    @Override
    public boolean formatErrorHandle(List<CanalEntry.Column> list, String str, int i, Long l, String str2) throws SQLException {
        long id = BinLogParseUtils.getLongFromColumn(list, OqsBigEntityColumns.ID, -1L);
        long commitId = BinLogParseUtils.getLongFromColumn(list, OqsBigEntityColumns.COMMITID);
        int version = BinLogParseUtils.getIntegerFromColumn(list, OqsBigEntityColumns.VERSION, -1);
        int op = BinLogParseUtils.getIntegerFromColumn(list, OqsBigEntityColumns.OP, 0);

        long entityId = -1L;
        try {
            entityId = getEntity(list);
        } catch (Exception ignored) {
            // Best effort: the row is already known to be malformed, so fall back to -1.
        }

        recordOrRecover(l.longValue(), CdcErrorUtils.uniKeyGenerate(str, i, ErrorType.DATA_FORMAT_ERROR),
            id, entityId, version, op, commitId, ErrorType.DATA_FORMAT_ERROR, str2, new ArrayList<>());
        return true;
    }

    /**
     * Records a new CDC error, or replays an already recorded one that was submitted for fixing.
     *
     * <p>If no error task exists for {@code uniKey}, a new one is built and {@code false} is returned.
     * If one exists, only a {@link ErrorType#DATA_INSERT_ERROR} whose first stored task has status
     * {@link FixedStatus#SUBMIT_FIX_REQ} is replayed against the index; every other case returns
     * {@code false}.
     *
     * @return {@code true} only when the stored operation object was written to the index successfully
     * @throws SQLException wrapping any failure raised while querying or recording the error
     */
    private boolean recordOrRecover(long batchId, String uniKey, long id, long entityId, int version, int op,
                                    long commitId, ErrorType errorType, String message,
                                    List<OriginalEntity> entities) throws SQLException {
        this.logger.warn("[cdc-sync-executor] batchId : {}, cdc-consume error will be record in cdcErrors,  id : {}, commitId : {}, message : {}",
            batchId, id, commitId, null == message ? "unKnow" : message);
        try {
            CdcErrorQueryCondition condition = new CdcErrorQueryCondition();
            condition.setUniKey(uniKey);
            Collection<CdcErrorTask> recorded = this.cdcErrorStorage.queryCdcErrors(condition);

            if (null == recorded || recorded.isEmpty()) {
                this.cdcErrorStorage.buildCdcError(CdcErrorTask.buildErrorTask(
                    ((Long) this.seqNoGenerator.next()).longValue(), uniKey, batchId, id, entityId, version, op,
                    commitId, errorType.getType(), operationObjectOf(entities),
                    null == message ? errorType.name() : message));
                return false;
            }

            if (ErrorType.DATA_INSERT_ERROR != errorType) {
                return false;
            }

            CdcErrorTask task = recorded.iterator().next();
            if (task.getStatus() != FixedStatus.SUBMIT_FIX_REQ.getStatus()) {
                return false;
            }

            try {
                this.sphinxQLIndexStorage.saveOrDeleteOriginalEntities(
                    OriginalEntityUtils.toOriginalEntity(this.metaManager, task.getOperationObject()));
            } catch (Exception e) {
                this.logger.warn("[cdc-sync-executor] fixed error, seqNo : [{}], batchId : [{}], message : [{}]",
                    task.getSeqNo(), task.getBatchId(), e.getMessage());
                this.cdcErrorStorage.submitRecover(task.getSeqNo(), FixedStatus.FIX_ERROR, operationObjectOf(entities));
                return false;
            }

            try {
                this.cdcErrorStorage.updateCdcError(task.getSeqNo(), FixedStatus.FIXED);
            } catch (Exception e) {
                // The index write already succeeded, so the recovery still counts; only the
                // bookkeeping failed. Log it instead of hiding it.
                this.logger.warn("[cdc-sync-executor] recovered but failed to mark task as fixed, seqNo : [{}], message : [{}]",
                    task.getSeqNo(), e.getMessage());
            }
            return true;
        } catch (Exception e) {
            // Keep the original exception as the cause so its stack trace is not lost.
            throw new SQLException(e.getMessage(), e);
        }
    }

    /** Serialises the entities for storage in an error task; a {@code null} list becomes {@code "{}"}. */
    private String operationObjectOf(List<OriginalEntity> entities) throws Exception {
        return null == entities ? "{}" : OriginalEntityUtils.toOriginalEntityStr(entities);
    }

    /** Raises the recorded maximum sync time when {@code useTime} exceeds the current value. */
    private synchronized void syncMetrics(CDCMetrics cDCMetrics, long useTime) {
        if (cDCMetrics.getCdcAckMetrics().getMaxSyncUseTime() < useTime) {
            cDCMetrics.getCdcAckMetrics().setMaxSyncUseTime(useTime);
        }
    }

    /**
     * Scans the entity-class columns from {@code ENTITYCLASSL4} down to {@code ENTITYCLASSL0} and
     * returns the first value greater than zero.
     *
     * @return the entity class id, or {@code -1} when no column holds a positive value
     */
    private long getEntity(List<CanalEntry.Column> columns) throws SQLException {
        int lowest = OqsBigEntityColumns.ENTITYCLASSL0.ordinal();
        for (int ordinal = OqsBigEntityColumns.ENTITYCLASSL4.ordinal(); ordinal >= lowest; ordinal--) {
            Optional<OqsBigEntityColumns> column = OqsBigEntityColumns.getByOrdinal(ordinal);
            if (column.isPresent()) {
                long entityId = BinLogParseUtils.getLongFromColumn(columns, column.get());
                if (entityId > 0) {
                    return entityId;
                }
            }
        }
        return -1L;
    }

    /**
     * Loads the entity class for the row from the meta manager, using the row's entity id and profile.
     *
     * @return the entity class, or {@code null} when the row has no positive entity id or the meta
     *         manager has no matching class
     */
    private IEntityClass getEntityClass(long id, List<CanalEntry.Column> columns) throws SQLException {
        long entityId = getEntity(columns);
        String profile = BinLogParseUtils.getStringWithoutNullCheck(columns, OqsBigEntityColumns.PROFILE);
        if (entityId <= 0) {
            return null;
        }
        Optional<IEntityClass> entityClass = this.metaManager.load(entityId, profile);
        if (entityClass.isPresent()) {
            return entityClass.get();
        }
        this.logger.warn("[cdc-sync-executor] id [{}], entityClassId [{}] has no entityClass in meta.", id, entityId);
        return null;
    }

    /**
     * Parses the attribute column of the row.
     *
     * @return the parsed attributes, or an empty collection when the column is null or empty
     * @throws SQLException when the attribute string cannot be parsed
     */
    private Collection<Object> attrCollection(long id, List<CanalEntry.Column> columns) throws SQLException {
        String attrStr = BinLogParseUtils.getStringFromColumn(columns, OqsBigEntityColumns.ATTRIBUTE);
        if (null == attrStr || attrStr.isEmpty()) {
            return new ArrayList<>();
        }
        try {
            return OriginalEntityUtils.attributesToList(attrStr);
        } catch (Exception e) {
            String error = String.format("[cdc-sync-executor] id [%d], jsonToObject error, message : [%s], attrStr [%s] ", id, e.getMessage(), attrStr);
            this.logger.warn(error);
            throw new SQLException(error, e);
        }
    }

    /**
     * Builds the {@link OriginalEntity} for a row. The operation is {@code DELETE} when the row's
     * deleted flag is set and {@code UPDATE} otherwise.
     *
     * @throws SQLException when the row has no resolvable entity class or no attributes
     */
    private OriginalEntity prepareForUpdateDelete(List<CanalEntry.Column> columns, long id, long commitId) throws SQLException {
        IEntityClass entityClass = getEntityClass(id, columns);
        if (null == entityClass) {
            throw new SQLException(String.format("[cdc-sync-executor] id [%d], commitId [%d] has no entityClass...", id, commitId));
        }
        Collection<Object> attributes = attrCollection(id, columns);
        if (attributes.isEmpty()) {
            throw new SQLException(String.format("[cdc-sync-executor] id [%d], commitId [%d] has no attributes...", id, commitId));
        }
        boolean deleted = BinLogParseUtils.getBooleanFromColumn(columns, OqsBigEntityColumns.DELETED);
        return OriginalEntity.Builder.anOriginalEntity()
            .withId(id)
            .withDeleted(deleted)
            .withOp(deleted ? OperationType.DELETE.getValue() : OperationType.UPDATE.getValue())
            .withVersion(BinLogParseUtils.getIntegerFromColumn(columns, OqsBigEntityColumns.VERSION))
            .withOqsMajor(BinLogParseUtils.getIntegerFromColumn(columns, OqsBigEntityColumns.OQSMAJOR))
            .withCreateTime(BinLogParseUtils.getLongFromColumn(columns, OqsBigEntityColumns.CREATETIME))
            .withUpdateTime(BinLogParseUtils.getLongFromColumn(columns, OqsBigEntityColumns.UPDATETIME))
            .withTx(BinLogParseUtils.getLongFromColumn(columns, OqsBigEntityColumns.TX))
            .withCommitid(commitId)
            .withEntityClass(entityClass)
            .withAttributes(attributes)
            .build();
    }
}
