package com.xforceplus.ultraman.cdc.core;

import com.alibaba.otter.canal.protocol.ClientIdentity;
import com.alibaba.otter.canal.protocol.Message;
import com.xforceplus.ultraman.cdc.context.RunnerContext;
import com.xforceplus.ultraman.cdc.processor.DataProcessor;
import com.xforceplus.ultraman.cdc.reader.CanalPropertiesReader;
import com.xforceplus.ultraman.oqsengine.plus.common.metrics.MetricsDefine;
import io.micrometer.core.annotation.Timed;
import java.sql.SQLException;
import javax.annotation.Resource;

/* loaded from: input_file:BOOT-INF/lib/cdc-2023.6.20-140541-feature-merge.jar:com/xforceplus/ultraman/cdc/core/CanalDataConsumer.class */
/**
 * {@link SourceDataConsumer} implementation that fetches a Canal {@link Message} from the
 * source data producer held by {@link RunnerContext}, passes it to the {@link DataProcessor},
 * and then acknowledges or rolls back the batch on that same producer.
 */
public class CanalDataConsumer implements SourceDataConsumer {

    /** Processor that handles each fetched message; supplied through {@code @Resource} injection. */
    @Resource
    private DataProcessor dataProcessor;

    /** Identity built in {@link #init()} from the Canal properties; {@code null} until then. */
    private ClientIdentity clientIdentity;

    /**
     * Builds the client identity from the configured destination, client id and filter, then
     * asks the source data producer to perform its singleton registration.
     *
     * @throws SQLException if the producer reports that the registration did not succeed
     */
    @Override
    public void init() throws SQLException {
        final CanalPropertiesReader canalProperties = RunnerContext.instance().getPropertiesReader();

        // The identity is assigned before the registration call, matching the original ordering.
        this.clientIdentity = new ClientIdentity(
            canalProperties.getDestination(),
            canalProperties.getClientId(),
            canalProperties.getFilter());

        final boolean registered = RunnerContext.instance().getSourceDataProducer().singletonRegistry();
        if (registered) {
            return;
        }
        throw new SQLException("CDCServer start error, reason : register consumer failed.");
    }

    /** No-op: this class performs no cleanup of its own. */
    @Override
    public void destroy() {
    }

    /**
     * Consumes one message from the source data producer.
     *
     * <p>The message is handed to the {@link DataProcessor} together with its id. When processing
     * and the subsequent ack both complete, the method returns {@code true}. If either of them
     * throws, the batch id is rolled back on the producer and {@code false} is returned.
     *
     * <p>NOTE(review): the caught exception is neither logged nor propagated in this method;
     * confirm that the failure is reported elsewhere.
     *
     * @return {@code true} if a message was processed and acknowledged; {@code false} if the
     *     producer returned no message or the batch was rolled back
     */
    @Override
    @Timed(value = MetricsDefine.PROCESS_DELAY_LATENCY_SECONDS, extraTags = {"initiator", "cdc", "action", "oneBatch"})
    public boolean onConsume() {
        final Message batch = RunnerContext.instance().getSourceDataProducer().onMessage();
        if (batch == null) {
            // The producer had no message to hand over.
            return false;
        }

        final long batchId = batch.getId();
        boolean consumed;
        try {
            this.dataProcessor.onProcess(batch, batchId);
            RunnerContext.instance().getSourceDataProducer().ack(batchId);
            consumed = true;
        } catch (Exception failure) {
            // Any failure in processing or ack rolls the batch back on the producer.
            RunnerContext.instance().getSourceDataProducer().rollback(batchId);
            consumed = false;
        }
        return consumed;
    }

    /**
     * Returns the client identity created by {@link #init()}.
     *
     * @return the client identity, or {@code null} if {@link #init()} has not assigned it yet
     */
    @Override
    public ClientIdentity client() {
        return this.clientIdentity;
    }
}
