package com.xforceplus.apollo.janus.standalone.sdk.message;

import com.alibaba.fastjson.JSONObject;
import com.xforceplus.apollo.janus.standalone.sdk.beans.AckBaseDTO;
import com.xforceplus.apollo.janus.standalone.sdk.message.messageBus.MBClient;
import com.xforceplus.apollo.janus.standalone.sdk.message.messageBus.request.AckRequest;
import com.xforceplus.apollo.janus.standalone.sdk.message.messageBus.response.AckResponse;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/xforceplus/apollo/janus/standalone/sdk/message/MessageReceiptHandler.class */
public class MessageReceiptHandler {
    private static final Logger log = LoggerFactory.getLogger(MessageReceiptHandler.class);
    private static BlockingQueue<AckBaseDTO> RECEIPTQUEUE = new ArrayBlockingQueue(5000);
    private static MBClient mbClient;

    public static boolean sendACK(AckBaseDTO ackBaseDTO) {
        boolean offer = RECEIPTQUEUE.offer(ackBaseDTO);
        if (RECEIPTQUEUE.size() > 50) {
            ArrayList arrayList = new ArrayList(16);
            RECEIPTQUEUE.drainTo(arrayList, 16);
            if (CollectionUtils.isNotEmpty(arrayList)) {
                ackBatch(arrayList);
            }
        }
        return offer;
    }

    private static void ackBatch(List<AckBaseDTO> list) {
        if (CollectionUtils.isEmpty(list)) {
            return;
        }
        Map map = (Map) list.stream().collect(Collectors.groupingBy((v0) -> {
            return v0.getSource();
        }));
        map.keySet().forEach(messageSourceEnum -> {
            try {
                switch (messageSourceEnum) {
                    case JANUS:
                        return;
                    case MESSAGE_BUS:
                        AckResponse ack = mbClient.ack(new AckRequest((List) ((List) map.get(messageSourceEnum)).stream().map(ackBaseDTO -> {
                            return (String) ackBaseDTO.getAckBody();
                        }).collect(Collectors.toList())));
                        if (!ack.getSuccess().booleanValue()) {
                            log.warn("{}消息回执反馈:[{}]", JSONObject.toJSONString(ack));
                        }
                        return;
                    default:
                        return;
                }
            } catch (Exception e) {
                log.error("发送回执给到{}异常", messageSourceEnum.getName());
            }
        });
    }

    public static void init(MBClient mBClient) {
        mbClient = mBClient;
        new Thread(() -> {
            while (true) {
                try {
                    AckBaseDTO take = RECEIPTQUEUE.take();
                    ArrayList arrayList = new ArrayList(16);
                    RECEIPTQUEUE.drainTo(arrayList, 16);
                    arrayList.add(take);
                    ackBatch(arrayList);
                    if (arrayList.size() <= 16) {
                        TimeUnit.SECONDS.sleep(1L);
                    }
                } catch (Throwable th) {
                    log.warn("auto send ack error:{}", th.getMessage());
                }
            }
        }, "MessageBus-ACK").start();
    }

    public static void setMbClient(MBClient mBClient) {
        mbClient = mBClient;
    }
}
