/*
 * Decompiled with CFR 0.152.
 */
package com.xforceplus.ultraman.oqsengine.cdc.consumer.impl;

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.xforceplus.ultraman.oqsengine.cdc.consumer.impl.SyncExecutor;
import com.xforceplus.ultraman.oqsengine.cdc.consumer.tools.BinLogParseUtils;
import com.xforceplus.ultraman.oqsengine.common.error.CommonErrors;
import com.xforceplus.ultraman.oqsengine.common.id.LongIdGenerator;
import com.xforceplus.ultraman.oqsengine.devops.cdcerror.CdcErrorStorage;
import com.xforceplus.ultraman.oqsengine.pojo.cdc.dto.RawEntry;
import com.xforceplus.ultraman.oqsengine.pojo.cdc.enums.OqsBigEntityColumns;
import com.xforceplus.ultraman.oqsengine.pojo.cdc.metrics.CDCMetrics;
import com.xforceplus.ultraman.oqsengine.pojo.devops.CdcErrorTask;
import com.xforceplus.ultraman.oqsengine.pojo.dto.entity.IEntityValue;
import com.xforceplus.ultraman.oqsengine.storage.index.IndexStorage;
import com.xforceplus.ultraman.oqsengine.storage.index.sphinxql.command.StorageEntity;
import com.xforceplus.ultraman.oqsengine.storage.master.MasterStorage;
import com.xforceplus.ultraman.oqsengine.storage.master.utils.EntityFieldBuildUtils;
import com.xforceplus.ultraman.oqsengine.storage.utils.IEntityValueBuilder;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link SyncExecutor} implementation that pushes CDC raw entries into the index storage.
 *
 * <p>Entries whose {@code DELETED} column is set are removed from the index one by one;
 * all other entries are converted into {@link StorageEntity} objects and written with a
 * single batch save at the end of {@link #execute(Collection, CDCMetrics)}.
 *
 * <p>Collaborators are injected by name through {@link Resource}.
 */
public class SphinxSyncExecutor implements SyncExecutor {
    /** Pause between two attempts of a retried storage operation, in milliseconds. */
    private static final long RETRY_INTERVAL_MS = 1000L;

    /** Placeholder stored/logged when a failure carries no message. */
    private static final String UNKNOWN_MESSAGE = "unKnow";

    final Logger logger = LoggerFactory.getLogger(SphinxSyncExecutor.class);
    @Resource(name="indexStorage")
    private IndexStorage sphinxQLIndexStorage;
    @Resource(name="masterStorage")
    private MasterStorage masterStorage;
    @Resource(name="cdcErrorStorage")
    private CdcErrorStorage cdcErrorStorage;
    @Resource(name="entityValueBuilder")
    private IEntityValueBuilder<String> entityValueBuilder;
    @Resource(name="snowflakeIdGenerator")
    private LongIdGenerator seqNoGenerator;

    /**
     * Synchronizes a collection of raw entries to the index storage.
     *
     * <p>A failure while handling a single entry does not abort the run: it is logged and
     * persisted through {@link #errorRecord(long, long, long, String)}, then the next entry
     * is processed.
     *
     * @param rawEntries entries to synchronize
     * @param cdcMetrics metrics of the current batch; its max sync use time is updated
     * @return the sum of the values returned by the index delete calls and the batch save
     * @throws SQLException if recording an error or the final batch save fails
     */
    @Override
    public int execute(Collection<RawEntry> rawEntries, CDCMetrics cdcMetrics) throws SQLException {
        int synced = 0;
        List<StorageEntity> storageEntityList = new ArrayList<>();
        // Smallest execute time among the entries queued for replace; 0 means "not set yet".
        long startTime = 0L;
        for (RawEntry rawEntry : rawEntries) {
            try {
                boolean isDelete = BinLogParseUtils.getBooleanFromColumn(rawEntry.getColumns(), OqsBigEntityColumns.DELETED);
                if (isDelete) {
                    synced += this.doDelete(rawEntry.getId(), rawEntry.getCommitId(), cdcMetrics.getBatchId());
                    this.syncMetrics(cdcMetrics, Math.abs(System.currentTimeMillis() - rawEntry.getExecuteTime()));
                } else {
                    if (startTime == 0L || rawEntry.getExecuteTime() < startTime) {
                        startTime = rawEntry.getExecuteTime();
                    }
                    storageEntityList.add(this.prepareForReplace(rawEntry.getColumns(), rawEntry.getId(), rawEntry.getCommitId()));
                }
            } catch (Exception e) {
                // Keep the stack trace in the application log; errorRecord only stores the message.
                this.logger.error("[cdc-sync-executor] batch : {}, sync entry failed, id : {}, commitId : {}",
                    cdcMetrics.getBatchId(), rawEntry.getId(), rawEntry.getCommitId(), e);
                this.errorRecord(cdcMetrics.getBatchId(), rawEntry.getId(), rawEntry.getCommitId(), e.getMessage());
            }
        }
        if (!storageEntityList.isEmpty()) {
            synced += this.sphinxQLIndexStorage.batchSave(storageEntityList, true, true);
            this.syncMetrics(cdcMetrics, Math.abs(System.currentTimeMillis() - startTime));
        }
        return synced;
    }

    /**
     * Logs a consume failure and persists it as a CDC error task.
     *
     * @param batchId  batch in which the failure happened (only used for logging)
     * @param id       id of the entry that failed
     * @param commitId commit id of the entry that failed
     * @param message  failure message; {@code null} is replaced by a placeholder
     * @throws SQLException if the error task cannot be stored
     */
    @Override
    public void errorRecord(long batchId, long id, long commitId, String message) throws SQLException {
        String reason = null == message ? UNKNOWN_MESSAGE : message;
        this.logger.warn("[cdc-sync-executor] batch : {}, sphinx consume error will be record in cdcerrors,  id : {}, commitId : {}, message : {}", batchId, id, commitId, reason);
        long seqNo = (Long) this.seqNoGenerator.next();
        this.cdcErrorStorage.buildCdcError(CdcErrorTask.buildErrorTask(seqNo, id, commitId, reason));
    }

    /**
     * Deletes one entry from the index, retrying indefinitely on failure.
     *
     * <p>The only failure that is not retried is a {@link SQLException} whose SQLState equals
     * {@code CommonErrors.INVALID_ENTITY_ID}; it is rethrown (with the original exception as
     * cause) so the caller can record it.
     *
     * @return the value returned by the index storage delete call
     * @throws SQLException when the index storage reports an invalid entity id
     */
    private int doDelete(long id, long commitId, long batchId) throws SQLException {
        while (true) {
            try {
                return this.sphinxQLIndexStorage.delete(id);
            } catch (Exception e) {
                this.logger.warn("[cdc-sync-executor] batch : {}, delete error, will retry, id : {}, commitId : {}, message : {}", batchId, id, commitId, e.getMessage());
                if (this.isInvalidEntityId(e)) {
                    throw new SQLException(String.format("batch : %d, replace-delete error, id : %d, commitId : %d, message : %s", batchId, id, commitId, e.getMessage()), e);
                }
                this.sleepNoInterrupted(RETRY_INTERVAL_MS);
            }
        }
    }

    /**
     * Tells whether the failure is a {@link SQLException} flagged with the invalid-entity-id state.
     * The comparison is constant-first because {@link SQLException#getSQLState()} may be {@code null}.
     */
    private boolean isInvalidEntityId(Exception e) {
        return e instanceof SQLException
            && CommonErrors.INVALID_ENTITY_ID.name().equals(((SQLException) e).getSQLState());
    }

    /**
     * Raises the recorded max sync use time when {@code useTime} exceeds it.
     */
    private synchronized void syncMetrics(CDCMetrics cdcMetrics, long useTime) {
        if (cdcMetrics.getCdcAckMetrics().getMaxSyncUseTime() < useTime) {
            cdcMetrics.getCdcAckMetrics().setMaxSyncUseTime(useTime);
        }
    }

    /**
     * Builds the {@link StorageEntity} for one binlog row that is to be written to the index.
     *
     * <p>When the row's {@code OQSMAJOR} is not 1 and it has a positive {@code PREF}, the values
     * of the entity referenced by {@code PREF} are loaded from the master storage and added to
     * this entity's values before conversion.
     *
     * @param columns  binlog columns of the row
     * @param id       entity id
     * @param commitId commit id of the row
     * @return the populated storage entity
     * @throws SQLException if a column cannot be parsed or the entity value cannot be built
     */
    private StorageEntity prepareForReplace(List<CanalEntry.Column> columns, long id, long commitId) throws SQLException {
        long cref = BinLogParseUtils.getLongFromColumn(columns, OqsBigEntityColumns.CREF);
        long pref = BinLogParseUtils.getLongFromColumn(columns, OqsBigEntityColumns.PREF);
        int oqsMajor = BinLogParseUtils.getIntegerFromColumn(columns, OqsBigEntityColumns.OQSMAJOR);
        long entity = BinLogParseUtils.getLongFromColumn(columns, OqsBigEntityColumns.ENTITY);
        long tx = BinLogParseUtils.getLongFromColumn(columns, OqsBigEntityColumns.TX);
        long time = BinLogParseUtils.getLongFromColumn(columns, OqsBigEntityColumns.TIME);

        StorageEntity storageEntity = new StorageEntity(id, entity, pref, cref, tx, commitId, null, null, time);
        storageEntity.setOqsmajor(oqsMajor);

        String meta = BinLogParseUtils.getStringFromColumn(columns, OqsBigEntityColumns.META);
        String attribute = BinLogParseUtils.getStringFromColumn(columns, OqsBigEntityColumns.ATTRIBUTE);
        IEntityValue entityValue = this.buildEntityValue(storageEntity.getId(), meta, attribute);
        if (oqsMajor != 1 && pref > 0L) {
            IEntityValue entityValueF = this.queryEntityValue(pref);
            entityValue.addValues(null == entityValueF ? Collections.emptyList() : entityValueF.values());
        }
        this.sphinxQLIndexStorage.entityValueToStorage(storageEntity, entityValue);
        return storageEntity;
    }

    /**
     * Builds an entity value from the raw {@code meta} and {@code attribute} column strings.
     */
    private IEntityValue buildEntityValue(Long id, String meta, String attribute) throws SQLException {
        return this.entityValueBuilder.build(id.longValue(), EntityFieldBuildUtils.metaToFieldTypeMap(meta), attribute);
    }

    /**
     * Loads the entity value for {@code pref} from the master storage, retrying indefinitely
     * on any failure.
     *
     * @return the entity value, or {@code null} when the master storage has none
     */
    private IEntityValue queryEntityValue(long pref) {
        while (true) {
            try {
                return this.masterStorage.selectEntityValue(pref).orElse(null);
            } catch (Exception e) {
                this.logger.warn("[cdc-sync-executor] entityValueGet from master db error, will retry..., id : {}, message : {}", pref, e.getMessage());
                this.sleepNoInterrupted(RETRY_INTERVAL_MS);
            }
        }
    }

    /**
     * Sleeps for {@code interval} milliseconds; an interruption only cuts the pause short.
     */
    private void sleepNoInterrupted(long interval) {
        try {
            Thread.sleep(interval);
        } catch (InterruptedException ignored) {
            // Deliberately swallowed: the callers retry in an endless loop, and restoring the
            // interrupt flag here would make every following Thread.sleep throw immediately,
            // turning the back-off into a busy spin.
        }
    }
}

