package com.xforceplus.phoenix.messagebus.model;

import com.xforceplus.janus.message.sdk.MBClient;
import com.xforceplus.janus.message.sdk.ResponseMessage;
import com.xforceplus.janus.message.sdk.response.SubResponse;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;

/* loaded from: input_file:com/xforceplus/phoenix/messagebus/model/MessageFetchThread.class */
/**
 * Background thread that repeatedly calls {@link MBClient#sub()} and hands every
 * returned {@link ResponseMessage} to a shared {@link BlockingQueue}.
 *
 * <p>The loop ends when {@link #setStopFlag(boolean) setStopFlag(true)} has been called
 * or when the thread is interrupted. The stop flag is only checked between iterations,
 * so a stop request takes effect once the current {@code sub()} / {@code put()} call
 * returns; interruption additionally wakes the thread from {@code put()} and from the
 * retry delay.
 */
public class MessageFetchThread extends Thread {
    private static final Logger log = LoggerFactory.getLogger(MessageFetchThread.class);

    /** Pause applied after an unexpected failure before the next {@code sub()} attempt. */
    private static final long RETRY_DELAY_MILLIS = 3000L;

    private final MBClient mbClient;
    private final BlockingQueue<ResponseMessage> messageQueue;

    /**
     * Written by the controlling thread and read by this thread, hence {@code volatile}:
     * without it the loop in {@link #run()} is not guaranteed to ever observe the update.
     */
    private volatile boolean stopFlag = false;

    /**
     * Creates the fetch thread (it is not started here).
     *
     * @param mBClient      client used to pull messages
     * @param blockingQueue queue that receives every pulled message
     */
    public MessageFetchThread(MBClient mBClient, BlockingQueue<ResponseMessage> blockingQueue) {
        this.mbClient = mBClient;
        this.messageQueue = blockingQueue;
        setName("MessageFetchThread");
    }

    /**
     * Pulls messages until the stop flag is set or the thread is interrupted.
     *
     * <p>Any non-interrupt failure is logged and followed by a fixed delay so that a
     * persistently failing {@code sub()} call does not spin.
     */
    @Override
    public void run() {
        while (!this.stopFlag) {
            try {
                SubResponse sub = this.mbClient.sub();
                if (sub.getSuccess()) {
                    // Read the payload once instead of re-invoking the getter for every use.
                    java.util.Collection<?> messages = sub.getResponseMessages();
                    log.info("从消息总线拉取到[{}]条消息", messages == null ? 0 : messages.size());
                    if (!CollectionUtils.isEmpty(messages)) {
                        for (Object message : messages) {
                            // put() blocks while the queue is full.
                            this.messageQueue.put((ResponseMessage) message);
                        }
                    }
                } else {
                    log.error("从消息总线拉取消息失败！");
                }
            } catch (InterruptedException interrupted) {
                // Interruption is a shutdown request: keep the interrupt status for
                // whoever inspects this thread and leave the loop instead of retrying.
                Thread.currentThread().interrupt();
                return;
            } catch (Throwable th) {
                // Deliberately broad so that a single failure never kills the fetch loop.
                log.error("消息总线Sub时发生异常！", th);
                try {
                    Thread.sleep(RETRY_DELAY_MILLIS);
                } catch (InterruptedException interrupted) {
                    log.warn("消息总线Sub失败后延时发生异常！", interrupted);
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    /**
     * Requests (with {@code true}) that the fetch loop end after its current iteration.
     *
     * @param z new value of the stop flag
     */
    public void setStopFlag(boolean z) {
        this.stopFlag = z;
    }

    /**
     * @return the current value of the stop flag
     */
    public boolean isStopFlag() {
        return this.stopFlag;
    }
}
