package com.xforceplus.janus.pubsub.sdk;

import com.alibaba.fastjson.JSONObject;
import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQConsumer;
import com.aliyun.mq.http.MQProducer;
import com.aliyun.mq.http.common.AckMessageException;
import com.aliyun.mq.http.common.ServiceException;
import com.aliyun.mq.http.model.Message;
import com.aliyun.mq.http.model.TopicMessage;
import com.xforceplus.apollo.pool.thread.ApolloThreadPool;
import com.xforceplus.apollo.utils.ErrorUtil;
import com.xforceplus.apollo.utils.JacksonUtil;
import com.xforceplus.janus.pubsub.sdk.cache.TokenThread;
import com.xforceplus.janus.pubsub.sdk.msg.SealedMessage;
import com.xforceplus.janus.pubsub.sdk.utils.HttpUtil;
import com.xforceplus.janus.pubsub.sdk.utils.TokenGenerator;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/xforceplus/janus/pubsub/sdk/PubSubClient.class */
/**
 * Singleton client for the pub/sub service built on the Aliyun MQ HTTP SDK.
 *
 * <p>On construction the client logs in to the ops endpoint to obtain a token, decrypts the token
 * into MQ credentials, builds the shared {@link MQClient} and submits the heartbeat, ack and
 * token-refresh workers to {@link ApolloThreadPool}. Producers and consumers are cached per topic
 * (and per subscription) until {@link #rebuildClient(String)} replaces the underlying client.
 *
 * <p>The MQ client and both caches are static, i.e. shared by the whole JVM.
 */
public class PubSubClient {
    /** Separator placed between multiple message tags in a consumer tag expression. */
    protected static final String TAG_SPLIT_CHAR = "||";
    /** Path of the login endpoint that returns the encrypted token. */
    private static final String USERPUBS_URL = "/pubsub-ops/sys/login";
    private static final Logger log = LoggerFactory.getLogger(PubSubClient.class);

    /** Total publish attempts made by {@link #sendMessage} (first try plus three retries). */
    private static final int MAX_SEND_ATTEMPTS = 4;
    /** Pause between two publish attempts. */
    private static final long SEND_RETRY_DELAY_MILLIS = 100L;
    /** Pause before the second token fetch in {@link #refreshToken()}. */
    private static final long TOKEN_RETRY_DELAY_MILLIS = 5000L;
    /** First argument passed to {@code MQConsumer.consumeMessage} (presumably the batch size). */
    private static final int PULL_BATCH_SIZE = 10;
    /** Second argument passed to {@code MQConsumer.consumeMessage} (presumably polling seconds). */
    private static final int PULL_WAIT_SECONDS = 10;
    /** Minimum number of fields a decrypted token must contain. */
    private static final int TOKEN_PART_COUNT = 7;

    private static volatile MQClient mqClient;
    /** Consumers keyed by subscription name and topic; cleared whenever the client is rebuilt. */
    private static final Map<String, MQConsumer> subMap = new ConcurrentHashMap<>();
    /** Producers keyed by topic; cleared whenever the client is rebuilt. */
    private static final Map<String, MQProducer> pubMap = new ConcurrentHashMap<>();
    /** Lazily created singleton; volatile so the double-checked reads below are safely published. */
    static volatile PubSubClient instance = null;

    private volatile String token;
    private volatile String instanceId;
    private volatile String userId;
    private volatile String subName;
    private final String userName;
    private final String password;
    private final String opsAddress;
    private final int port;

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the singleton, creating it on first use.
     *
     * <p>The arguments are only used by the call that actually creates the instance; later calls
     * return the existing instance and ignore them.
     *
     * @param str  user name used to log in to the ops endpoint
     * @param str2 password used to log in to the ops endpoint
     * @param str3 address of the ops endpoint (prepended to {@code :port/pubsub-ops/sys/login})
     * @param i    port of the ops endpoint
     * @return the shared client
     * @throws IllegalArgumentException if the user name, password or address is blank
     * @throws IllegalStateException    if no token could be fetched
     */
    public static PubSubClient getInstance(String str, String str2, String str3, int i) {
        if (null == instance) {
            synchronized (PubSubClient.class) {
                if (null == instance) {
                    instance = new PubSubClient(str, str2, str3, i);
                }
            }
        }
        return instance;
    }

    /**
     * Returns the singleton created earlier by {@link #getInstance(String, String, String, int)}.
     *
     * @return the shared client
     * @throws IllegalStateException if the client has not been created yet
     */
    public static PubSubClient getInstance() {
        if (null == instance) {
            // Same monitor as the creating overload: a caller that arrives while construction is
            // still in progress waits here and then sees the finished instance instead of failing.
            synchronized (PubSubClient.class) {
                if (null == instance) {
                    throw new IllegalStateException("请先进行有参构造！！！");
                }
            }
        }
        return instance;
    }

    /**
     * Validates the credentials, fetches a token (two attempts), builds the MQ client and, when
     * the token could be decrypted, submits the background workers.
     *
     * <p>If the token is fetched but cannot be decrypted, construction still succeeds; in that
     * case no MQ client exists and no workers are started.
     */
    private PubSubClient(String userName, String password, String opsAddress, int port) {
        if (StringUtils.isBlank(userName) || StringUtils.isBlank(password) || StringUtils.isBlank(opsAddress)) {
            log.error("userName 、password、 opsAddress 都不能为空");
            // Fail the caller instead of terminating the whole JVM from library code.
            throw new IllegalArgumentException("userName 、password、 opsAddress 都不能为空");
        }
        this.userName = userName;
        this.password = password;
        this.opsAddress = opsAddress;
        this.port = port;

        String initialToken = fetchToken();
        if (StringUtils.isEmpty(initialToken)) {
            // One immediate retry before giving up.
            initialToken = fetchToken();
        }
        if (StringUtils.isEmpty(initialToken)) {
            log.error("pubsub客户端初始化创建失败 token 获取失败");
            throw new IllegalStateException("pubsub 客户端初始化创建失败 token 获取失败");
        }
        this.token = initialToken;

        if (rebuildClient(initialToken)) {
            HeartBeatThread heartBeatThread = new HeartBeatThread();
            heartBeatThread.setThreadName("psHeartBeatThrd");
            ApolloThreadPool.getInstance().submit(heartBeatThread);

            AckThread ackThread = new AckThread();
            ackThread.setThreadName("ackTrhead");
            ApolloThreadPool.getInstance().submit(ackThread);

            TokenThread tokenThread = new TokenThread();
            // The name was previously applied to heartBeatThread by mistake.
            // NOTE(review): assumes TokenThread exposes setThreadName like the other workers - confirm.
            tokenThread.setThreadName("tokenRefreshThread");
            ApolloThreadPool.getInstance().submit(tokenThread);
        }
    }

    /**
     * Decrypts {@code str} and replaces the shared MQ client with one built from it.
     *
     * <p>The decrypted token must contain at least seven fields: fields 0-2 (and field 6 when it
     * is not blank) are handed to the {@link MQClient} constructor, fields 3-5 become the instance
     * id, user id and default subscription name. Both producer and consumer caches are cleared so
     * that they are recreated against the new client.
     *
     * @param str encrypted token
     * @return {@code true} if the client was rebuilt, {@code false} if the token could not be parsed
     */
    public boolean rebuildClient(String str) {
        synchronized (PubSubClient.class) {
            String[] tokenParts = TokenGenerator.decryptToken(str);
            if (ArrayUtils.isEmpty(tokenParts) || tokenParts.length < TOKEN_PART_COUNT) {
                log.error("token 非法解析失败");
                return false;
            }
            this.token = str;
            this.instanceId = tokenParts[3];
            this.userId = tokenParts[4];
            this.subName = tokenParts[5];
            if (StringUtils.isNotBlank(tokenParts[6])) {
                mqClient = new MQClient(tokenParts[0], tokenParts[1], tokenParts[2], tokenParts[6]);
            } else {
                mqClient = new MQClient(tokenParts[0], tokenParts[1], tokenParts[2]);
            }
            // NOTE(review): the previous MQClient (if any) is not closed here - confirm whether it should be.
            subMap.clear();
            pubMap.clear();
            return true;
        }
    }

    /**
     * Returns the cached consumer for the subscription/topic pair, creating it on first use.
     *
     * <p>The cache key does not include the tags, so the tags supplied by whichever call creates
     * the consumer stay in effect until the cache is cleared.
     */
    private MQConsumer getConsumer(String topicName, String consumerName, String... tags) {
        // The separator keeps ("ab", "c") and ("a", "bc") from sharing a cache entry.
        String cacheKey = StringUtils.join(new String[]{consumerName, topicName}, TAG_SPLIT_CHAR);
        // The map is a ConcurrentHashMap, so computeIfAbsent creates each consumer at most once
        // per key without locking on interned strings.
        return getSubMap().computeIfAbsent(cacheKey,
                key -> getMqClient().getConsumer(this.instanceId, topicName, consumerName, buildTagExpression(tags)));
    }

    /**
     * Joins the tags into a single tag expression.
     *
     * @return {@code null} when no tags are given, the tag itself when there is exactly one,
     *         otherwise the tags joined with the URL-encoded {@link #TAG_SPLIT_CHAR}
     */
    private static String buildTagExpression(String... tags) {
        if (ArrayUtils.isEmpty(tags)) {
            // NOTE(review): assumes the SDK treats a null tag as "no filter" - confirm.
            return null;
        }
        if (tags.length == 1) {
            return tags[0];
        }
        String separator;
        try {
            separator = URLEncoder.encode(TAG_SPLIT_CHAR, "utf-8");
        } catch (UnsupportedEncodingException e) {
            // Fall back to the raw separator when the encoding is unavailable.
            separator = TAG_SPLIT_CHAR;
        }
        return StringUtils.join(tags, separator);
    }

    /** Returns the cached producer for the topic, creating it on first use. */
    private MQProducer getProducer(String topicName) {
        return getPubMap().computeIfAbsent(topicName,
                key -> getMqClient().getProducer(this.instanceId, key));
    }

    /**
     * Publishes a message to a topic, retrying on failure.
     *
     * <p>Up to four attempts are made with a 100 ms pause in between. When a failure message
     * contains {@code "is forbidden"} the token is refreshed before the next attempt. If the
     * calling thread is interrupted while pausing, the interrupt flag is restored and no further
     * attempts are made.
     *
     * @param str           topic name
     * @param sealedMessage message to publish; its header user id is filled in when blank
     * @param strArr        optional tags; only the first one is attached to the message
     * @return the message id assigned by the broker, or {@code null} if the client is closed,
     *         the body could not be encoded, or every attempt failed
     */
    public String sendMessage(String str, SealedMessage sealedMessage, String... strArr) {
        MQClient client = mqClient;
        if (client == null || !client.isOpen()) {
            log.warn("client  closed");
            return null;
        }
        TopicMessage topicMessage = new TopicMessage();
        if (null != strArr && strArr.length >= 1) {
            topicMessage.setMessageTag(strArr[0]);
        }
        try {
            if (StringUtils.isBlank(sealedMessage.getHeader().getUserId())) {
                sealedMessage.getHeader().setUserId(this.userId);
            }
            topicMessage.setMessageBody(JacksonUtil.getInstance().toJson(sealedMessage));

            int attempts = 0;
            while (attempts < MAX_SEND_ATTEMPTS) {
                attempts++;
                try {
                    TopicMessage published = getProducer(str).publishMessage(topicMessage);
                    log.info("原消息ID：{}发送成功,MessageId:{},RequestId:{}",
                            sealedMessage.getHeader().getMsgId(), published.getMessageId(), published.getRequestId());
                    return published.getMessageId();
                } catch (Exception e) {
                    log.error(ErrorUtil.getStackMsg(e));
                    log.warn("原消息ID：{}发送失败,将重试", sealedMessage.getHeader().getMsgId());
                    if (e.getMessage() != null && e.getMessage().contains("is forbidden")) {
                        refreshToken();
                    }
                    if (attempts >= MAX_SEND_ATTEMPTS) {
                        // Nothing left to retry, so there is no point in pausing.
                        break;
                    }
                    try {
                        Thread.sleep(SEND_RETRY_DELAY_MILLIS);
                    } catch (InterruptedException interrupted) {
                        log.error(ErrorUtil.getStackMsg(interrupted));
                        // Preserve the interrupt for the caller and stop retrying.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
            log.error("原消息ID：{}发送失败,将重试，topic:{}，次数：{}",
                    sealedMessage.getHeader().getMsgId(), str, attempts);
            return null;
        } catch (UnsupportedEncodingException e) {
            log.error(ErrorUtil.getStackMsg(e));
            return null;
        }
    }

    /**
     * Pulls one batch of messages from a topic.
     *
     * <p>Each returned message has the receipt handle, topic name, subscription name, broker
     * message id and message tag stored in its header's {@code others} map; {@link #acknowlege}
     * reads them back from there.
     *
     * @param str    topic name
     * @param str2   subscription name; when blank, the subscription name from the token is used
     * @param strArr optional tags used when the consumer for this subscription/topic is created
     * @return the decoded messages, or {@code null} when the client is closed, nothing was
     *         received, or the broker reported {@code MessageNotExist}
     * @throws ServiceException for any other broker-side failure
     */
    public List<SealedMessage> pullMessages(String str, String str2, String... strArr) {
        MQClient client = mqClient;
        if (client == null || !client.isOpen()) {
            log.warn("client  closed");
            return null;
        }
        String effectiveSubName = StringUtils.isNotBlank(str2) ? str2 : this.subName;
        try {
            List<Message> received = getConsumer(str, effectiveSubName, strArr)
                    .consumeMessage(PULL_BATCH_SIZE, PULL_WAIT_SECONDS);
            if (CollectionUtils.isEmpty(received)) {
                return null;
            }
            log.info("pubsub receive msgIds:{}",
                    received.stream().map(Message::getMessageId).collect(Collectors.toList()));

            List<SealedMessage> result = new ArrayList<>();
            for (Message message : received) {
                if (null == message) {
                    continue;
                }
                SealedMessage sealedMessage = (SealedMessage) JacksonUtil.getInstance()
                        .fromJson(message.getMessageBodyString(), SealedMessage.class);
                if (null == sealedMessage) {
                    continue;
                }
                // Carry the delivery metadata along with the message so it can be acknowledged later.
                sealedMessage.getHeader().getOthers().put("receiptHandler", message.getReceiptHandle());
                sealedMessage.getHeader().getOthers().put("topicName", str);
                sealedMessage.getHeader().getOthers().put("subName", effectiveSubName);
                sealedMessage.getHeader().getOthers().put(SealedMessage.OTHERS_KEY_PS_MSGID, message.getMessageId());
                sealedMessage.getHeader().getOthers().put(SealedMessage.OTHERS_KEY_TAG, message.getMessageTag());
                result.add(sealedMessage);
            }
            return result;
        } catch (ServiceException e) {
            // An empty topic is reported as an error by the broker; treat it as "no messages".
            if (e.getLocalizedMessage() == null || !e.getLocalizedMessage().contains("MessageNotExist")) {
                throw e;
            }
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Acknowledges a batch of previously pulled messages.
     *
     * <p>The consumer is looked up from the topic and subscription name recorded on the first
     * message, so all messages in the batch are acknowledged through that one consumer. An
     * {@link AckMessageException} is logged and not retried; any other failure is retried once.
     * Failures are logged and never propagated.
     *
     * @param list messages returned by {@link #pullMessages}; {@code null} or empty is a no-op
     */
    public void acknowlege(List<SealedMessage> list) {
        if (CollectionUtils.isEmpty(list)) {
            return;
        }
        log.info("批量回执{}", list.size());
        List<String> msgIds = list.stream()
                .map(message -> message.getHeader().getOthers().get(SealedMessage.OTHERS_KEY_PS_MSGID))
                .collect(Collectors.toList());
        MQConsumer consumer = null;
        List<String> receiptHandles = null;
        try {
            SealedMessage first = list.get(0);
            consumer = getConsumer(first.getHeader().getOthers().get("topicName"),
                    first.getHeader().getOthers().get("subName"), "");
            receiptHandles = list.stream()
                    .map(message -> message.getHeader().getOthers().get("receiptHandler"))
                    .collect(Collectors.toList());
            consumer.ackMessage(receiptHandles);
            // Successful acknowledgement is routine, so it is logged at INFO rather than ERROR.
            log.info("pubsub Ack 条数:{},MSGID:{}", msgIds.size(), msgIds);
        } catch (Throwable th) {
            if (th instanceof AckMessageException) {
                log.error("pubsubMsgId{} Ack message fail, requestId is: {}, fail handles",
                        msgIds, ((AckMessageException) th).getRequestId());
                return;
            }
            if (consumer == null || receiptHandles == null) {
                // The failure happened before an ack could be attempted; there is nothing to retry.
                log.error("pubsubMsgId{} 回执处理异常:{}", msgIds, ErrorUtil.getStackMsg(th));
                return;
            }
            try {
                consumer.ackMessage(receiptHandles);
            } catch (Exception e) {
                log.error("pubsubMsgId{} 回执处理异常:{}", msgIds, ErrorUtil.getStackMsg(th));
                // Also record why the retry failed instead of dropping it.
                log.error("pubsubMsgId{} 回执处理异常:{}", msgIds, ErrorUtil.getStackMsg(e));
            }
        }
    }

    /**
     * Fetches a fresh token and rebuilds the MQ client from it.
     *
     * <p>If the first fetch returns nothing, a second fetch is made after a five second pause.
     * When both fail, a warning is logged and the current client is left untouched.
     */
    public void refreshToken() {
        String freshToken = fetchToken();
        if (StringUtils.isEmpty(freshToken)) {
            try {
                Thread.sleep(TOKEN_RETRY_DELAY_MILLIS);
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the caller; the second attempt is still made.
                Thread.currentThread().interrupt();
            }
            freshToken = fetchToken();
        }
        if (StringUtils.isEmpty(freshToken)) {
            log.warn("fetch token failed");
            return;
        }
        log.info("fetch token success refresh client");
        rebuildClient(freshToken);
    }

    /**
     * Posts the user name and password as JSON to the login endpoint and extracts the
     * {@code token} field from the response.
     *
     * @return the token, or {@code null} if the response was empty or the request failed
     */
    private String fetchToken() {
        String loginUrl = this.opsAddress + ":" + this.port + USERPUBS_URL;
        try {
            Map<String, String> credentials = new HashMap<>();
            credentials.put("userName", this.userName);
            credentials.put("password", this.password);
            String response = new HttpUtil().postJson(loginUrl, JacksonUtil.getInstance().toJson(credentials), false);
            if (StringUtils.isEmpty(response)) {
                log.warn("fetchToken  failed ");
                return null;
            }
            return ((JSONObject) JacksonUtil.getInstance().fromJson(response, JSONObject.class)).getString("token");
        } catch (Exception e) {
            log.error("请求失败", e);
            return null;
        }
    }

    private static Map<String, MQConsumer> getSubMap() {
        return subMap;
    }

    private static Map<String, MQProducer> getPubMap() {
        return pubMap;
    }

    /** Returns the shared MQ client, or {@code null} if none has been built yet. */
    protected static MQClient getMqClient() {
        return mqClient;
    }

    /** Closes the shared MQ client; does nothing if no client has been built. */
    public void close() {
        MQClient client = mqClient;
        if (client != null) {
            client.close();
        }
    }

    /** Returns the token the current MQ client was built from. */
    public String getToken() {
        return this.token;
    }

    /** Returns the ops endpoint address supplied at construction. */
    public String getOpsAddress() {
        return this.opsAddress;
    }

    /** Returns the ops endpoint port supplied at construction. */
    public int getPort() {
        return this.port;
    }
}
