package com.xforceplus.phoenix.messagebus.model;

import com.xforceplus.janus.message.sdk.MBClient;
import com.xforceplus.janus.message.sdk.ResponseMessage;
import com.xforceplus.janus.message.sdk.request.AckRequest;
import com.xforceplus.janus.message.sdk.response.AckResponse;
import java.util.Collections;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/xforceplus/phoenix/messagebus/model/MessageConsumerThread.class */
public class MessageConsumerThread extends Thread {
    private static final Logger log = LoggerFactory.getLogger(MessageConsumerThread.class);
    private static final AtomicInteger seqNo = new AtomicInteger(1);
    private final MBClient mbClient;
    private final MessageListenerManager manager;
    private final BlockingQueue<ResponseMessage> messageQueue;

    public MessageConsumerThread(MBClient mBClient, MessageListenerManager messageListenerManager, BlockingQueue<ResponseMessage> blockingQueue) {
        this.mbClient = mBClient;
        this.manager = messageListenerManager;
        this.messageQueue = blockingQueue;
        setName("MessageConsumerThread" + seqNo.getAndIncrement());
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        while (true) {
            try {
                ResponseMessage take = this.messageQueue.take();
                if (this.manager.containsPubCode(take.getPubCode())) {
                    this.manager.call(take);
                    AckResponse ack = this.mbClient.ack(new AckRequest(Collections.singletonList(take.getReceiptHandle())));
                    if (!ack.getSuccess().booleanValue()) {
                        log.error(String.format("消息总线消息回执失败! Id=%s, PubCode=%s, 回执=%s, 原因=%s", take.getId(), take.getPubCode(), take.getReceiptHandle(), ack.getError()));
                    }
                } else {
                    log.error(String.format("Id=%s, PubCode=%s，没有找到对应的Listener！", take.getId(), take.getPubCode()));
                }
            } catch (Throwable th) {
                log.error("消费消息总线的消息时发生异常!", th);
            }
        }
    }
}
