package com.xforceplus.ultraman.oqsengine.cdc.core;

import com.alibaba.otter.canal.protocol.ClientIdentity;
import com.alibaba.otter.canal.protocol.Message;
import com.xforceplus.ultraman.oqsengine.cdc.context.RunnerContext;
import com.xforceplus.ultraman.oqsengine.cdc.processor.DataProcessor;
import com.xforceplus.ultraman.oqsengine.cdc.reader.CanalPropertiesReader;
import com.xforceplus.ultraman.oqsengine.inner.pojo.cdc.metrics.CDCMetrics;
import java.sql.SQLException;
import java.util.LinkedHashSet;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Resource;

/* loaded from: input_file:com/xforceplus/ultraman/oqsengine/cdc/core/CanalDataConsumer.class */
/**
 * {@link SourceDataConsumer} that pulls Canal {@link Message} batches from the source data
 * producer registered in {@link RunnerContext}, hands each batch to the injected
 * {@link DataProcessor}, and then acknowledges or rolls back the batch on the producer.
 */
public class CanalDataConsumer implements SourceDataConsumer {

    private static final Logger LOGGER = Logger.getLogger(CanalDataConsumer.class.getName());

    @Resource
    private DataProcessor dataProcessor;

    /** Canal client identity built in {@link #init()}; {@code null} until then. */
    private ClientIdentity clientIdentity;

    /**
     * Builds the client identity from the configured destination, client id and filter, then
     * asks the source data producer for its singleton registration.
     *
     * @throws SQLException if the producer's {@code singletonRegistry()} call returns {@code false}
     */
    @Override
    public void init() throws SQLException {
        CanalPropertiesReader properties = RunnerContext.instance().getPropertiesReader();
        this.clientIdentity = new ClientIdentity(
            properties.getDestination(), properties.getClientId(), properties.getFilter());

        boolean registered = RunnerContext.instance().getSourceDataProducer().singletonRegistry();
        if (!registered) {
            throw new SQLException("CDCServer start error, reason : register consumer failed.");
        }
    }

    /** No resources are held by this consumer, so there is nothing to release. */
    @Override
    public void destroy() {
    }

    /**
     * Fetches one message from the producer, processes it and acknowledges it.
     *
     * <p>When processing or acknowledging throws, the failure is logged and the batch is rolled
     * back on the producer.
     *
     * @return {@code true} if a message was processed and acknowledged; {@code false} if the
     *         producer returned no message or if the batch failed and was rolled back
     */
    @Override
    public boolean onConsume() {
        Message message = RunnerContext.instance().getSourceDataProducer().onMessage();
        if (message == null) {
            return false;
        }

        long batchId = message.getId();
        try {
            CDCMetrics processed = this.dataProcessor.onProcess(message, resetUnCommitMetrics(batchId));
            RunnerContext.instance().setCdcMetrics(processed);
            RunnerContext.instance().getSourceDataProducer().ack(batchId);
            return true;
        } catch (Exception e) {
            // Log before rolling back: if rollback itself throws, the original cause is
            // still on record instead of being lost.
            LOGGER.log(Level.SEVERE, "Consume batch " + batchId + " failed, rolling back.", e);
            RunnerContext.instance().getSourceDataProducer().rollback(batchId);
            return false;
        }
    }

    /**
     * Returns the client identity created by {@link #init()}.
     *
     * @return the client identity, or {@code null} if {@link #init()} has not run yet
     */
    @Override
    public ClientIdentity client() {
        return this.clientIdentity;
    }

    /**
     * Creates the metrics object for a new batch, carrying over the uncommitted ids recorded in
     * the metrics currently held by {@link RunnerContext}.
     *
     * @param batchId id of the batch about to be processed
     * @return fresh metrics with the batch id set; the uncommitted-id set is attached only when
     *         the current one is non-null and non-empty (the same set instance, not a copy)
     */
    private CDCMetrics resetUnCommitMetrics(long batchId) {
        CDCMetrics metrics = new CDCMetrics();
        metrics.setBatchId(batchId);

        // Element type of the set is not visible from this file, so the raw type is kept.
        LinkedHashSet unCommitIds = RunnerContext.instance().getCdcMetrics().getUnCommitIds();
        if (unCommitIds != null && !unCommitIds.isEmpty()) {
            metrics.setUnCommitIds(unCommitIds);
        }
        return metrics;
    }
}
