package com.xforceplus.ultraman.oqsengine.cdc.consumer.impl;

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.xforceplus.ultraman.oqsengine.cdc.consumer.tools.BinLogParseUtils;
import com.xforceplus.ultraman.oqsengine.common.error.CommonErrors;
import com.xforceplus.ultraman.oqsengine.common.id.LongIdGenerator;
import com.xforceplus.ultraman.oqsengine.devops.cdcerror.CdcErrorStorage;
import com.xforceplus.ultraman.oqsengine.pojo.cdc.dto.RawEntry;
import com.xforceplus.ultraman.oqsengine.pojo.cdc.enums.OqsBigEntityColumns;
import com.xforceplus.ultraman.oqsengine.pojo.cdc.metrics.CDCMetrics;
import com.xforceplus.ultraman.oqsengine.pojo.devops.CdcErrorTask;
import com.xforceplus.ultraman.oqsengine.pojo.dto.entity.IEntityValue;
import com.xforceplus.ultraman.oqsengine.storage.index.IndexStorage;
import com.xforceplus.ultraman.oqsengine.storage.index.sphinxql.command.StorageEntity;
import com.xforceplus.ultraman.oqsengine.storage.master.MasterStorage;
import com.xforceplus.ultraman.oqsengine.storage.master.utils.EntityFieldBuildUtils;
import com.xforceplus.ultraman.oqsengine.storage.utils.IEntityValueBuilder;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link SyncExecutor} that applies a batch of CDC {@link RawEntry} rows to the index storage.
 *
 * <p>Rows whose {@code DELETED} column is true are removed from the index one by one; all other
 * rows are converted to {@link StorageEntity} instances and written with a single
 * {@code batchSave} call at the end of the batch. A row that fails while being processed is
 * recorded through {@link #errorRecord(long, long, String)} and the remaining rows are still
 * processed.
 */
public class SphinxSyncExecutor implements SyncExecutor {
    /** Text stored/logged in place of a missing (null) error message. */
    private static final String UNKNOWN_MESSAGE = "unKnow";

    /** Pause between two attempts of a retried storage call, in milliseconds. */
    private static final long RETRY_INTERVAL_MS = 1000L;

    final Logger logger = LoggerFactory.getLogger(SphinxSyncExecutor.class);

    @Resource(name = "indexStorage")
    private IndexStorage sphinxQLIndexStorage;

    @Resource(name = "masterStorage")
    private MasterStorage masterStorage;

    @Resource(name = "cdcErrorStorage")
    private CdcErrorStorage cdcErrorStorage;

    @Resource(name = "entityValueBuilder")
    private IEntityValueBuilder<String> entityValueBuilder;

    @Resource(name = "snowflakeIdGenerator")
    private LongIdGenerator seqNoGenerator;

    /**
     * Synchronizes the given raw entries into the index.
     *
     * @param collection the raw entries to apply, processed in iteration order
     * @param cDCMetrics metrics holder whose max sync time is raised when this batch is slower
     * @return the sum of the values returned by the individual deletes and by the batch save
     * @throws SQLException if recording a failed entry or the final batch save fails
     */
    @Override
    public int execute(Collection<RawEntry> collection, CDCMetrics cDCMetrics) throws SQLException {
        int synced = 0;
        List<StorageEntity> replacements = new ArrayList<>();
        // Smallest execute time among the non-deleted entries; 0 means "none seen yet".
        long earliestExecuteTime = 0;

        for (RawEntry rawEntry : collection) {
            try {
                if (BinLogParseUtils.getBooleanFromColumn(rawEntry.getColumns(), OqsBigEntityColumns.DELETED)) {
                    synced += doDelete(rawEntry.getId(), rawEntry.getCommitId());
                    syncMetrics(cDCMetrics, Math.abs(System.currentTimeMillis() - rawEntry.getExecuteTime()));
                } else {
                    if (earliestExecuteTime == 0 || rawEntry.getExecuteTime() < earliestExecuteTime) {
                        earliestExecuteTime = rawEntry.getExecuteTime();
                    }
                    replacements.add(
                        prepareForReplace(rawEntry.getColumns(), rawEntry.getId(), rawEntry.getCommitId()));
                }
            } catch (Exception e) {
                // Keep the stack trace in the application log; only the message is persisted below.
                this.logger.error("[cdc-sync-executor] sphinx consume error, id : {}, commitId : {}",
                    rawEntry.getId(), rawEntry.getCommitId(), e);
                errorRecord(rawEntry.getId(), rawEntry.getCommitId(), e.getMessage());
            }
        }

        if (!replacements.isEmpty()) {
            synced += this.sphinxQLIndexStorage.batchSave(replacements, true, true);
            syncMetrics(cDCMetrics, Math.abs(System.currentTimeMillis() - earliestExecuteTime));
        }
        return synced;
    }

    /**
     * Logs a failed entry and stores it as a {@link CdcErrorTask} in the CDC error storage.
     *
     * @param j   id of the entry that failed
     * @param j2  commit id of the entry that failed
     * @param str error message; {@code null} is replaced by {@code "unKnow"}
     * @throws SQLException if the error storage rejects the record
     */
    @Override
    public void errorRecord(long j, long j2, String str) throws SQLException {
        final String message = (str == null) ? UNKNOWN_MESSAGE : str;
        this.logger.warn("[cdc-sync-executor] sphinx consume error will be record in cdcerrors,  id : {}, commitId : {}, message : {}", j, j2, message);
        final long seqNo = ((Long) this.seqNoGenerator.next()).longValue();
        this.cdcErrorStorage.buildCdcError(CdcErrorTask.buildErrorTask(seqNo, j, j2, message));
    }

    /**
     * Deletes one entity from the index, retrying once per second until the call succeeds.
     *
     * <p>The only failure that ends the retry loop is a {@link SQLException} whose SQL state
     * equals {@code CommonErrors.INVALID_ENTITY_ID}; it is rethrown wrapped in a new
     * {@link SQLException} that carries the original as its cause.
     *
     * @param id       id of the entity to delete
     * @param commitId commit id, used for logging and the error message only
     * @return the value returned by the index storage delete call
     * @throws SQLException if the index storage reports an invalid entity id
     */
    private int doDelete(long id, long commitId) throws SQLException {
        while (true) {
            try {
                return this.sphinxQLIndexStorage.delete(id);
            } catch (Exception e) {
                this.logger.warn("[cdc-sync-executor] delete error, will retry, id : {}, commitId : {}, message : {}", id, commitId, e.getMessage());
                if (isInvalidEntityId(e)) {
                    throw new SQLException(
                        String.format("replace-delete error, id : %d, commitId : %d, message : %s", id, commitId, e.getMessage()),
                        e);
                }
                sleepNoInterrupted(RETRY_INTERVAL_MS);
            }
        }
    }

    /**
     * Tells whether the exception is a {@link SQLException} flagged with the invalid-entity-id
     * SQL state. The constant is the receiver of {@code equals} because the SQL state of a
     * {@link SQLException} may be {@code null}.
     */
    private boolean isInvalidEntityId(Exception e) {
        return (e instanceof SQLException)
            && CommonErrors.INVALID_ENTITY_ID.name().equals(((SQLException) e).getSQLState());
    }

    /**
     * Raises the recorded maximum sync time to {@code useTime} when it is larger than the
     * current maximum.
     */
    private synchronized void syncMetrics(CDCMetrics cDCMetrics, long useTime) {
        if (cDCMetrics.getCdcAckMetrics().getMaxSyncUseTime() < useTime) {
            cDCMetrics.getCdcAckMetrics().setMaxSyncUseTime(useTime);
        }
    }

    /**
     * Builds the {@link StorageEntity} to be written to the index for one non-deleted row.
     *
     * <p>When the {@code OQSMAJOR} column is not 1 and the {@code PREF} column is positive, the
     * values of the entity referenced by {@code PREF} are read from master storage and added to
     * this row's values before they are converted for storage.
     *
     * @param columns  binlog columns of the row
     * @param id       entity id
     * @param commitId commit id of the row
     * @return the populated storage entity
     * @throws SQLException if the entity value cannot be built from the columns
     */
    private StorageEntity prepareForReplace(List<CanalEntry.Column> columns, long id, long commitId) throws SQLException {
        final long cref = BinLogParseUtils.getLongFromColumn(columns, OqsBigEntityColumns.CREF);
        final long pref = BinLogParseUtils.getLongFromColumn(columns, OqsBigEntityColumns.PREF);
        final int oqsMajor = BinLogParseUtils.getIntegerFromColumn(columns, OqsBigEntityColumns.OQSMAJOR);
        final long entity = BinLogParseUtils.getLongFromColumn(columns, OqsBigEntityColumns.ENTITY);
        final long tx = BinLogParseUtils.getLongFromColumn(columns, OqsBigEntityColumns.TX);
        final long time = BinLogParseUtils.getLongFromColumn(columns, OqsBigEntityColumns.TIME);

        StorageEntity storageEntity =
            new StorageEntity(id, entity, pref, cref, tx, commitId, (Map) null, (Set) null, time);
        storageEntity.setOqsmajor(oqsMajor);

        IEntityValue entityValue = buildEntityValue(
            storageEntity.getId(),
            BinLogParseUtils.getStringFromColumn(columns, OqsBigEntityColumns.META),
            BinLogParseUtils.getStringFromColumn(columns, OqsBigEntityColumns.ATTRIBUTE));

        if (oqsMajor != 1 && pref > 0) {
            IEntityValue parentValue = queryEntityValue(pref);
            // A missing parent contributes no values.
            entityValue.addValues(null == parentValue ? Collections.emptyList() : parentValue.values());
        }

        this.sphinxQLIndexStorage.entityValueToStorage(storageEntity, entityValue);
        return storageEntity;
    }

    /**
     * Builds an entity value for {@code id} from the raw {@code META} and {@code ATTRIBUTE}
     * column strings.
     */
    private IEntityValue buildEntityValue(long id, String meta, String attribute) throws SQLException {
        return this.entityValueBuilder.build(id, EntityFieldBuildUtils.metaToFieldTypeMap(meta), attribute);
    }

    /**
     * Reads the entity value for {@code id} from master storage, retrying once per second until
     * the call stops throwing.
     *
     * @return the entity value, or {@code null} when master storage has none for this id
     */
    private IEntityValue queryEntityValue(long id) {
        while (true) {
            try {
                return (IEntityValue) this.masterStorage.selectEntityValue(id).orElse(null);
            } catch (Exception e) {
                this.logger.warn("[cdc-sync-executor] entityValueGet from master db error, will retry..., id : {}, message : {}", id, e.getMessage());
                sleepNoInterrupted(RETRY_INTERVAL_MS);
            }
        }
    }

    /**
     * Sleeps for up to {@code millis} milliseconds; an interrupt ends the sleep early and is
     * otherwise ignored.
     */
    private void sleepNoInterrupted(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException ignored) {
            // Deliberately dropped: callers retry in unbounded loops, and re-asserting the
            // interrupt flag here would make every later Thread.sleep return immediately,
            // turning the back-off into a busy spin.
            // NOTE(review): this also means the retry loops cannot be cancelled by interrupt.
        }
    }
}
