package com.xforceplus.apollo.janus.standalone.service.impl;

import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.google.common.util.concurrent.RateLimiter;
import com.xforceplus.apollo.core.utils.DateFormatUtil;
import com.xforceplus.apollo.janus.standalone.cache.LocalExpireCache;
import com.xforceplus.apollo.janus.standalone.cache.MessageCache;
import com.xforceplus.apollo.janus.standalone.cache.ProjectConfigCache;
import com.xforceplus.apollo.janus.standalone.constant.JanusStandaloneConstants;
import com.xforceplus.apollo.janus.standalone.constant.RequestConstants;
import com.xforceplus.apollo.janus.standalone.constant.SplitConstants;
import com.xforceplus.apollo.janus.standalone.constant.TableNameConstants;
import com.xforceplus.apollo.janus.standalone.constant.TcpConstants;
import com.xforceplus.apollo.janus.standalone.dto.ProjectAuthApiDto;
import com.xforceplus.apollo.janus.standalone.dto.ProjectDto;
import com.xforceplus.apollo.janus.standalone.dto.QMessagePageDTO;
import com.xforceplus.apollo.janus.standalone.dto.QueryPage;
import com.xforceplus.apollo.janus.standalone.entity.Content;
import com.xforceplus.apollo.janus.standalone.entity.LogQueryDownRecord;
import com.xforceplus.apollo.janus.standalone.entity.LogQueryTask;
import com.xforceplus.apollo.janus.standalone.entity.Message;
import com.xforceplus.apollo.janus.standalone.entity.MessageNeedPush;
import com.xforceplus.apollo.janus.standalone.entity.ProjectApiInfo;
import com.xforceplus.apollo.janus.standalone.entity.RequestMessage;
import com.xforceplus.apollo.janus.standalone.entity.Result;
import com.xforceplus.apollo.janus.standalone.entity.SystemInfo;
import com.xforceplus.apollo.janus.standalone.enums.JanusInternalCmdTaskStatusEnum;
import com.xforceplus.apollo.janus.standalone.enums.MessageTraceStatusEnum;
import com.xforceplus.apollo.janus.standalone.enums.ProtocolEnum;
import com.xforceplus.apollo.janus.standalone.exception.BusinessException;
import com.xforceplus.apollo.janus.standalone.handler.MessageInsertContentHandler;
import com.xforceplus.apollo.janus.standalone.handler.MessageNeedPushInsertHandler;
import com.xforceplus.apollo.janus.standalone.handler.MessageTraceInsertHandler;
import com.xforceplus.apollo.janus.standalone.handler.RequestRecordContentHandler;
import com.xforceplus.apollo.janus.standalone.mapper.ContentMapper;
import com.xforceplus.apollo.janus.standalone.mapper.LogQueryDownRecordMapper;
import com.xforceplus.apollo.janus.standalone.mapper.LogQueryTaskMapper;
import com.xforceplus.apollo.janus.standalone.mapper.MessageMapper;
import com.xforceplus.apollo.janus.standalone.model.RabbitMqConsumer;
import com.xforceplus.apollo.janus.standalone.model.RabbitMqProducer;
import com.xforceplus.apollo.janus.standalone.service.DiskFileDataService;
import com.xforceplus.apollo.janus.standalone.service.IMessageNeedPushService;
import com.xforceplus.apollo.janus.standalone.service.IMessageOverstockStaticsService;
import com.xforceplus.apollo.janus.standalone.service.IMessageService;
import com.xforceplus.apollo.janus.standalone.service.IMessageTraceService;
import com.xforceplus.apollo.janus.standalone.utils.DateUtils;
import com.xforceplus.apollo.janus.standalone.utils.ErrorUtils;
import com.xforceplus.apollo.janus.standalone.utils.PageUtils;
import com.xforceplus.apollo.janus.standalone.utils.RequestUtils;
import com.xforceplus.apollo.janus.standalone.utils.SliceUtils;
import com.xforceplus.apollo.janus.standalone.utils.SnowflakeIdWorker;
import com.xforceplus.apollo.msg.SealedMessage;
import com.xforceplus.apollo.utils.JacksonUtil;
import com.xforceplus.janus.config.core.monitor.JanusUploader;
import com.xforceplus.janus.framework.event.SealedMessageEvent;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import javax.servlet.http.HttpServletRequest;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;

@Service
/* loaded from: input_file:com/xforceplus/apollo/janus/standalone/service/impl/MessageServiceImpl.class */
public class MessageServiceImpl extends ServiceImpl<MessageMapper, Message> implements IMessageService {
    private static final Logger log = LoggerFactory.getLogger(MessageServiceImpl.class);
    // Key under which validate() stores the resolved ProjectDto in the per-request context map;
    // dispatchLocalSubMessage() reads it back with the same key.
    public static final String PROJECT_DTO = "projectDto";

    // Mapper used for batch inserts, per-table counts, page queries and lookups by id.
    @Autowired
    private MessageMapper messageMapper;

    // Mapper used by queryMessageContent() to load a stored content row by its key.
    @Autowired
    private ContentMapper contentMapper;

    // Used by queryMessageContent() to load content by ossKey when the key is longer than 30 characters.
    @Autowired
    DiskFileDataService diskFileDataService;

    // Receives every non-heartbeat RequestMessage accepted by validate().
    @Autowired
    MessageInsertContentHandler messageInsertContentHandler;

    // Receives a (message, status code) pair whenever a message is dispatched or handed to a subscriber.
    @Autowired
    MessageTraceInsertHandler messageTraceInsertHandler;

    @Autowired
    IMessageTraceService messageTraceService;

    @Autowired
    RequestRecordContentHandler requestRecordContentHandler;

    // Receives a MessageNeedPush entry when a message is not sent to the subscriber queue directly.
    @Autowired
    MessageNeedPushInsertHandler messageNeedPushInsertHandler;

    // Per-project overstock counters (MQ and DB) consulted and updated while dispatching and polling.
    @Autowired
    IMessageOverstockStaticsService messageOverstockStaticsService;

    // NOTE(review): field name is misspelled ("mssage"); it is not referenced in the part of the class reviewed here.
    @Autowired
    IMessageNeedPushService mssageNeedPushService;

    @Autowired
    MCFactoryUtils mCFactoryUtils;

    // Consulted first by subMessage() for a pending message of the calling project.
    @Autowired
    MessageCache messageCache;

    @Autowired
    LogQueryTaskMapper logQueryTaskMapper;

    @Autowired
    LogQueryDownRecordMapper logQueryDownRecordMapper;

    // Used by doPubMessage() to report the result of a log-query command.
    @Autowired
    private JanusUploader monitorUploader;

    // A subscriber is sent to directly only while its MQ overstock is below this value (default 20000).
    @Value("${message-cache.overstock-limit-size:20000}")
    private Integer overstockLimitSize;

    // Enables the per-key rate limit applied by validateLimiter() (default false).
    @Value("${gateway.limit-qps.enabled:false}")
    private boolean limitQpsEnabled;

    // Permits per second for each limiter; validateLimiter() resets values outside 1..50 to 20.
    @Value("${gateway.limit-qps.size:20}")
    private Integer limitQpsSize;
    // Rate limiters keyed by the string passed to validateLimiter() (a project id at the visible call sites).
    private final Map<String, RateLimiter> rateLimiterMap = new ConcurrentHashMap();
    // Name of the message property whose value, when present, is stripped from the front of pubCode / eventType.
    private static final String QUEUE_PREFIX = "xplat.aws.sqs.receiver.queueName.prefix";

    // When true, subMessage() does not fall back to polling the RabbitMQ consumer (default false).
    @Value("${message-cache.enable:false}")
    private boolean messageCacheEnable;

    /**
     * Inserts the given messages, issuing one mapper call per target table.
     *
     * <p>Messages are grouped by the table name computed by {@link #fetchGroupKey(Message)};
     * an empty or {@code null} list is ignored.
     *
     * @param list messages to insert
     */
    @Override // com.xforceplus.apollo.janus.standalone.service.IMessageService
    public void addBatch(List<Message> list) {
        if (CollectionUtils.isEmpty(list)) {
            return;
        }
        Map<String, List<Message>> messagesByTable = list.stream().collect(Collectors.groupingBy(this::fetchGroupKey));
        messagesByTable.forEach((tableName, tableMessages) -> this.messageMapper.addBatch(tableName, tableMessages));
    }

    /**
     * Pages through messages stored in one table per day.
     *
     * <p>Missing start/end values default to "today 00:00:00" and "now"; values in the future are
     * pulled back to "now". A per-day row count is collected (from {@link LocalExpireCache} when
     * available) and handed to {@link SliceUtils#slice} to decide which rows of which day tables
     * make up the requested page.
     *
     * @param qMessagePageDTO query filter and paging parameters; its start/end are normalised in place
     * @return the requested page; an empty page when no rows match
     * @throws BusinessException when the start time is after the end time
     */
    @Override // com.xforceplus.apollo.janus.standalone.service.IMessageService
    public PageUtils queryPage(QMessagePageDTO qMessagePageDTO) {
        List<Object> rows = new ArrayList<>();
        Date now = new Date();
        String today = DateFormatUtil.formatDate(now, "yyyy-MM-dd");
        String nowText = DateUtils.format(now, "yyyy-MM-dd HH:mm:ss");
        // Day suffixes are rendered with DATE_PATTERN_NO, so "today" has to be rendered the same way
        // to be comparable. Comparing against the dashed yyyy-MM-dd form never matched, which meant
        // the still-growing count of the current day was cached like a finished day.
        String todaySuffix = DateUtils.format(now, DateUtils.DATE_PATTERN_NO);

        if (StringUtils.isEmpty(qMessagePageDTO.getStart())) {
            qMessagePageDTO.setStart(today + " 00:00:00");
        }
        if (StringUtils.isEmpty(qMessagePageDTO.getEnd())) {
            qMessagePageDTO.setEnd(nowText);
        }
        if (qMessagePageDTO.getStart().compareTo(qMessagePageDTO.getEnd()) > 0) {
            throw new BusinessException(500, "开始时间不能大于结束时间");
        }
        if (qMessagePageDTO.getStart().compareTo(nowText) > 0) {
            qMessagePageDTO.setStart(nowText);
        }
        if (qMessagePageDTO.getEnd().compareTo(nowText) > 0) {
            qMessagePageDTO.setEnd(nowText);
        }

        // Enumerate every calendar day touched by the range, one table suffix per day.
        Date lastDay = DateUtils.stringToDate(qMessagePageDTO.getEnd().substring(0, 10) + " 23:59:59", "yyyy-MM-dd HH:mm:ss");
        Date day = DateUtils.stringToDate(qMessagePageDTO.getStart().substring(0, 10) + " 00:00:00", "yyyy-MM-dd HH:mm:ss");
        List<String> daySuffixes = new ArrayList<>();
        while (!day.after(lastDay)) {
            daySuffixes.add(DateUtils.format(day, DateUtils.DATE_PATTERN_NO));
            day = DateUtils.addDateDays(day, 1);
        }

        // Natural String ordering keeps the days in chronological order for slicing.
        TreeMap<String, Integer> countsByDay = new TreeMap<>();
        String key = qMessagePageDTO.getKey();
        int total = 0;
        for (String daySuffix : daySuffixes) {
            String cacheKey = TableNameConstants.t_message1 + SplitConstants.splicesOne + daySuffix + SplitConstants.splicesOne + key;
            Integer dayCount = (Integer) LocalExpireCache.get(cacheKey);
            if (dayCount == null) {
                try {
                    dayCount = this.messageMapper.queryCount(TableNameConstants.t_message1 + "_" + daySuffix, qMessagePageDTO);
                } catch (Exception e) {
                    // A failing count (for example a day without a table) is treated as "no rows".
                    log.error(ErrorUtils.getStackMsg(e));
                    dayCount = 0;
                }
                if (dayCount == null) {
                    dayCount = 0;
                }
            }
            // Only days other than today are cached; today's count can still change.
            if (!todaySuffix.equals(daySuffix)) {
                LocalExpireCache.set(cacheKey, dayCount, JanusStandaloneConstants.expireTimesOfThreeDays);
            }
            total += dayCount.intValue();
            countsByDay.put(daySuffix, dayCount);
        }

        Integer limit = qMessagePageDTO.getLimit();
        Integer page = qMessagePageDTO.getPage();
        // Page size must be within 1..50; anything else (including zero/negative) falls back to 10.
        if (limit == null || limit.intValue() < 1 || limit.intValue() > 50) {
            limit = 10;
        }
        if (page == null || page.intValue() < 1) {
            page = 1;
        }
        if (total == 0) {
            return new PageUtils(rows, 0, limit.intValue(), page.intValue());
        }

        Map<String, QueryPage> slice = SliceUtils.slice(countsByDay, page, limit);
        if (slice == null || slice.isEmpty()) {
            return new PageUtils(rows, total, limit.intValue(), page.intValue());
        }
        for (Map.Entry<String, QueryPage> entry : slice.entrySet()) {
            String tableName = TableNameConstants.t_message1 + "_" + entry.getKey();
            rows.addAll(this.messageMapper.queryPage(tableName, qMessagePageDTO, entry.getValue()));
        }
        return new PageUtils(rows, total, limit.intValue(), page.intValue());
    }

    /**
     * Loads a single message by id, using {@link LocalExpireCache} as a short-lived cache.
     *
     * <p>The first eight characters of the id select the message and content tables. The stored
     * content is a JSON list of messages; every message in that list is cached, and the one whose
     * id matches is returned. Misses are cached under the id as the marker string "nodata".
     *
     * @param str message id
     * @return the message, or {@code null} when the id is unusable or no content is found
     */
    @Override // com.xforceplus.apollo.janus.standalone.service.IMessageService
    public Message queryMessageContent(String str) {
        if (str == null) {
            return null;
        }
        Object cached = LocalExpireCache.get(str);
        if (cached != null) {
            // A String entry is the "nodata" marker written on an earlier miss.
            return cached instanceof String ? null : (Message) cached;
        }
        // The table suffix is the first eight characters of the id; shorter ids cannot be resolved.
        if (str.length() < 8) {
            return null;
        }
        String tableSuffix = str.substring(0, 8);
        Message stored = this.messageMapper.queryMessageById(TableNameConstants.t_message + tableSuffix, str);
        if (stored == null || stored.getOssKey() == null) {
            LocalExpireCache.set(str, "nodata", JanusStandaloneConstants.expireTimesOfMinutes);
            return null;
        }

        String ossKey = stored.getOssKey();
        String content = SplitConstants.empty;
        // Keys longer than 30 characters are loaded through the disk file service,
        // shorter ones from the content table of the same day.
        if (ossKey.length() > 30) {
            content = this.diskFileDataService.loadByOssKey(ossKey);
        } else {
            Content row = this.contentMapper.queryContentById(TableNameConstants.t_content + tableSuffix, ossKey);
            if (row != null) {
                content = row.getContent();
            }
        }
        if (StringUtils.isEmpty(content)) {
            LocalExpireCache.set(str, "nodata", JanusStandaloneConstants.expireTimesOfMinutes);
            return null;
        }

        try {
            List<Message> batch = JacksonUtil.getInstance().fromJsonToList(content, Message.class);
            if (CollectionUtils.isNotEmpty(batch)) {
                Message found = null;
                for (Message candidate : batch) {
                    // Entries without an id cannot be cached or matched; skip them instead of aborting the scan.
                    if (candidate == null || candidate.getId() == null) {
                        continue;
                    }
                    String id = candidate.getId();
                    LocalExpireCache.set(id, candidate, JanusStandaloneConstants.expireTimesOfMinutes);
                    if (str.equals(id)) {
                        found = candidate;
                    }
                }
                if (found != null) {
                    return found;
                }
                // The batch does not contain the requested id: fall through and cache the miss,
                // like every other miss path in this method.
            }
        } catch (Exception e) {
            log.error(ErrorUtils.getStackMsg(e));
        }
        LocalExpireCache.set(str, "nodata", JanusStandaloneConstants.expireTimesOfMinutes);
        return null;
    }

    /**
     * Computes the table a message belongs to: the message table prefix followed by the
     * first eight characters of the message id.
     *
     * @param message message whose id is at least eight characters long
     * @return target table name
     */
    private String fetchGroupKey(Message message) {
        String idPrefix = message.getId().substring(0, 8);
        return TableNameConstants.t_message + idPrefix;
    }

    /**
     * Validates a publish request and prepares it for dispatch.
     *
     * <p>Resolves the publishing project from the client flags, parses the request body into a
     * {@link RequestMessage}, and stores both in {@code map2} (keys {@link #PROJECT_DTO} and
     * {@code TcpConstants.requestMessage}). For a "heart-beat-task" the content is parsed as the
     * project's {@link SystemInfo}; for every other pubCode the message's inner properties are
     * filled in, the message is handed to the content insert handler, and businessNo / transNo are
     * copied into {@code map}. The rate limiter is applied last.
     *
     * @param map  request record; read for client flags, body and pubCode, and enriched in place
     * @param map2 per-request context that receives the project and the parsed message
     * @throws IOException       declared for JSON parsing of the body/content
     * @throws BusinessException when the project is unknown, the body or content is empty or larger
     *                           than 1048576 characters, pubCode is blank, businessNo is missing,
     *                           or the rate limit is exceeded
     */
    private void validate(Map<String, String> map, Map<String, Object> map2) throws IOException {
        final int maxLength = 1048576;
        ProjectDto projectDto = ProjectConfigCache.projectCodeCache.get(map.get(RequestConstants.clientFlags));
        if (projectDto == null) {
            throw new BusinessException(HttpStatus.FORBIDDEN.value(), "项目信息不存在！");
        }
        String projectId = projectDto.getId();
        map.put(RequestConstants.providerNo, projectId);
        map2.put(PROJECT_DTO, projectDto);

        String body = map.get(RequestConstants.requestBody);
        String pubCode = map.get(TcpConstants.pubCode);
        if (org.apache.commons.lang3.StringUtils.isEmpty(body) || body.length() > maxLength) {
            throw new BusinessException(HttpStatus.BAD_REQUEST.value(), "消息体不能为空并且大小不能超过 1M");
        }
        RequestMessage requestMessage = (RequestMessage) JacksonUtil.getInstance().fromJson(body, RequestMessage.class);
        // Check the parsed message for null BEFORE reading its content (a body of "null" parses to null).
        String content = requestMessage == null ? null : requestMessage.getContent();
        if (org.apache.commons.lang3.StringUtils.isEmpty(content) || content.length() > maxLength) {
            throw new BusinessException(HttpStatus.BAD_REQUEST.value(), "消息体不能为空并且大小不能超过 1M");
        }
        // pubMessageBefore() already rejects a blank pubCode; enforce the same rule for callers that
        // invoke doPubMessage() directly instead of failing later with a NullPointerException.
        if (org.apache.commons.lang3.StringUtils.isBlank(pubCode)) {
            throw new BusinessException(HttpStatus.FORBIDDEN.value(), "pubCode不能为空！");
        }

        if ("heart-beat-task".equals(pubCode)) {
            projectDto.setSystemInfo((SystemInfo) JacksonUtil.getInstance().fromJson(content, SystemInfo.class));
            log.info("heart-beat-task :projectId: {},content: {}", projectId, content);
        } else {
            Map<String, String> properties = requestMessage.getProperties();
            Map<String, String> innerProps = requestMessage.getInnerProps();
            if (properties == null) {
                throw new BusinessException(HttpStatus.BAD_REQUEST.value(), "businessNo不能为空!");
            }
            // The prefix comes from the client's message properties, so strip it as literal text;
            // String.replaceFirst would interpret it as a regular expression.
            String queuePrefix = properties.get(QUEUE_PREFIX);
            if (queuePrefix != null && pubCode.startsWith(queuePrefix)) {
                pubCode = pubCode.substring(queuePrefix.length());
                map.put(TcpConstants.pubCode, pubCode);
            }
            innerProps.put(TcpConstants.msgId, DateUtils.format(new Date(), DateUtils.DATE_PATTERN_NO) + SnowflakeIdWorker.getId());
            innerProps.put(TcpConstants.pubCode, pubCode);
            innerProps.put(TcpConstants.provideCode, projectDto.getCode());
            innerProps.put(RequestConstants.providerNo, projectDto.getId());
            map.put(TcpConstants.msgId, properties.get(TcpConstants.msgId));
            requestMessage.setCreatedTime(DateUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss"));
            if (!properties.containsKey(TcpConstants.businessNo)) {
                throw new BusinessException(HttpStatus.BAD_REQUEST.value(), "businessNo不能为空!");
            }
            String businessNo = properties.get(TcpConstants.businessNo);
            innerProps.put(TcpConstants.businessNo, businessNo);
            // transNo falls back to businessNo when the client did not supply one.
            String transNo = properties.containsKey(TcpConstants.transNo) ? properties.get(TcpConstants.transNo) : businessNo;
            innerProps.put(TcpConstants.transNo, transNo);
            this.messageInsertContentHandler.add(requestMessage);
            map.put(TcpConstants.businessNo, innerProps.get(TcpConstants.businessNo));
            map.put(TcpConstants.transNo, innerProps.get(TcpConstants.transNo));
        }
        map2.put(TcpConstants.requestMessage, requestMessage);
        // NOTE(review): the limiter runs after the message was handed to the insert handler,
        // so a rate-limited request is still persisted — confirm this ordering is intended.
        validateLimiter(projectId);
    }

    /**
     * Applies the per-key rate limit when it is enabled.
     *
     * <p>One {@link RateLimiter} is kept per key. A configured rate outside 1..50 is replaced by 20.
     *
     * @param str limiter key (a project id at the visible call sites)
     * @throws BusinessException (403) when no permit is available right now
     */
    private void validateLimiter(String str) {
        if (!this.limitQpsEnabled) {
            return;
        }
        if (this.limitQpsSize.intValue() < 1 || this.limitQpsSize.intValue() > 50) {
            this.limitQpsSize = 20;
        }
        // computeIfAbsent creates the limiter atomically; a separate containsKey/put pair lets two
        // concurrent first requests each install their own limiter, losing the permits of one.
        RateLimiter limiter = this.rateLimiterMap.computeIfAbsent(str, key -> RateLimiter.create(this.limitQpsSize.intValue()));
        if (!limiter.tryAcquire()) {
            throw new BusinessException(HttpStatus.FORBIDDEN.value(), "per second max requests limit is :" + this.limitQpsSize);
        }
    }

    /**
     * Publishes the message carried by an HTTP request.
     *
     * <p>The request is first converted into a record map; if that step already produced an
     * error result it is returned as is, otherwise the record is passed to
     * {@link #doPubMessage(Map)}.
     *
     * @param httpServletRequest incoming publish request
     * @return result of the pre-check or of the publish itself
     */
    @Override // com.xforceplus.apollo.janus.standalone.service.IMessageService
    public Result pubMessage(HttpServletRequest httpServletRequest) {
        Map<String, String> record = new HashMap<>();
        Result rejection = pubMessageBefore(httpServletRequest, record);
        if (rejection != null) {
            return rejection;
        }
        return doPubMessage(record);
    }

    /**
     * Extracts the publish parameters from the HTTP request into {@code map}.
     *
     * <p>Stores the start time, the client flags (with the configured prefix removed, or empty
     * when the header does not start with that prefix), the TCP protocol name and the pubCode
     * (header first, request parameter as fallback), then lets {@link RequestUtils} add the
     * remaining request details.
     *
     * @param httpServletRequest incoming publish request
     * @param map                record map to fill
     * @return {@code null} when the request may proceed, otherwise the error result to return
     */
    private Result pubMessageBefore(HttpServletRequest httpServletRequest, Map<String, String> map) {
        try {
            Map<String, ProjectDto> projects = ProjectConfigCache.projectCodeCache;
            if (projects == null || projects.isEmpty()) {
                throw new BusinessException(HttpStatus.FORBIDDEN.value(), "属地集成平台服务器正在启动，请稍后再试！");
            }
            map.put("startTime", System.currentTimeMillis() + SplitConstants.empty);

            String flagsHeader = httpServletRequest.getHeader(RequestConstants.clientFlags);
            if (org.apache.commons.lang3.StringUtils.isBlank(flagsHeader)) {
                throw new BusinessException(HttpStatus.FORBIDDEN.value(), "http请求中 clientFlags不能为空！");
            }
            // A header without the expected prefix is recorded as empty client flags.
            String clientFlags = SplitConstants.empty;
            if (flagsHeader.startsWith(RequestConstants.clientFlagsPrefix)) {
                clientFlags = flagsHeader.replaceFirst(RequestConstants.clientFlagsPrefix, SplitConstants.empty);
            }
            map.put(RequestConstants.clientFlags, clientFlags);
            map.put(RequestConstants.protocol, ProtocolEnum.TCP.getName());

            String pubCode = httpServletRequest.getHeader(TcpConstants.pubCode);
            if (org.apache.commons.lang3.StringUtils.isBlank(pubCode)) {
                pubCode = httpServletRequest.getParameter(TcpConstants.pubCode);
            }
            if (org.apache.commons.lang3.StringUtils.isBlank(pubCode)) {
                throw new BusinessException(HttpStatus.FORBIDDEN.value(), "pubCode不能为空！");
            }
            map.put(TcpConstants.pubCode, pubCode);

            RequestUtils.httpRequestConvertToRecordMap(httpServletRequest, map);
            return null;
        } catch (BusinessException businessError) {
            log.error("request: {} ,BusinessException error: {} ", JacksonUtil.getInstance().toJson(map), ErrorUtils.getStackMsg(businessError));
            return new Result(businessError.getCode(), businessError.getMessage());
        } catch (Exception unexpected) {
            log.error("request: {} ,Exception error: {} ", JacksonUtil.getInstance().toJson(map), ErrorUtils.getStackMsg(unexpected));
            return Result.serverError("redirect error! ");
        }
    }

    /**
     * Validates and publishes the message described by {@code map}, then records the request.
     *
     * <p>A log-query command result is forwarded to the monitor uploader; every other message is
     * dispatched to local subscribers and then to the cloud dispatcher. The response fields are
     * written into {@code map}, and the request record is written exactly once on every path.
     *
     * @param map request record (client flags, pubCode, request body, ...); enriched in place
     * @return OK with the message id from the message properties, the business error raised
     *         during validation, or a server error for any other exception
     */
    @Override // com.xforceplus.apollo.janus.standalone.service.IMessageService
    public Result doPubMessage(Map<String, String> map) {
        try {
            Map<String, Object> context = new HashMap<>();
            validate(map, context);
            RequestMessage requestMessage = (RequestMessage) context.get(TcpConstants.requestMessage);
            Map<String, String> properties = requestMessage.getProperties();
            String msgId = properties.get(TcpConstants.msgId);
            // Constant-first comparison: a missing pubCode must not surface as a NullPointerException.
            if (RequestConstants.logQueryCmd.equals(map.get(TcpConstants.pubCode))) {
                String cmdResultKey = properties.get(TcpConstants.cmdSerialKey) + SplitConstants.one_colon + JanusInternalCmdTaskStatusEnum.FINISHED.name() + SplitConstants.one_colon + SnowflakeIdWorker.getId();
                this.monitorUploader.sendJanusCmdResult(properties.get(TcpConstants.sourceType), cmdResultKey, requestMessage.getContent());
            } else {
                dispatchLocalSubMessage(map, requestMessage, context);
                dispatchCloundMessage(map, requestMessage);
            }
            map.put(RequestConstants.responseCode, "200");
            map.put(RequestConstants.responseBody, msgId);
            map.put(RequestConstants.responseBodySize, String.valueOf(msgId == null ? 0 : msgId.length()));
            return Result.ok(msgId);
        } catch (BusinessException e) {
            ErrorUtils.putErrorToMap(e, map);
            return new Result(e.getCode(), e.getMessage());
        } catch (Exception e) {
            ErrorUtils.putErrorToMap(e, map);
            return Result.serverError("redirect error! ");
        } finally {
            // Recording lives in finally so it runs once per request and a failure while recording
            // is not re-caught above (which would turn a successful publish into an error and
            // attempt the record again).
            addRecord(map);
        }
    }

    /**
     * Forwards a message to the cloud dispatcher when its event type is exposed as the same TCP api
     * by both the project configured in {@code ProjectConfigCache.projectId} and the publishing project.
     *
     * <p>Best effort: any failure is logged and never propagated to the caller.
     *
     * @param map            request record; supplies the publishing project's id
     * @param requestMessage message being published
     */
    private void dispatchCloundMessage(Map<String, String> map, RequestMessage requestMessage) {
        try {
            Map<String, String> properties = requestMessage.getProperties();
            if (properties == null || !properties.containsKey(TcpConstants.eventType)) {
                return;
            }
            String eventType = properties.get(TcpConstants.eventType);
            if (org.apache.commons.lang3.StringUtils.isBlank(eventType)) {
                return;
            }
            // The prefix is client-supplied text: remove it literally. String.replaceFirst would treat
            // it as a regular expression and could strip the wrong text or throw on invalid syntax.
            String queuePrefix = properties.get(QUEUE_PREFIX);
            if (queuePrefix != null && eventType.startsWith(queuePrefix)) {
                eventType = eventType.substring(queuePrefix.length());
            }

            String providerId = map.get(RequestConstants.providerNo);
            if (org.apache.commons.lang3.StringUtils.isBlank(providerId)) {
                return;
            }
            String configuredProjectId = ProjectConfigCache.projectId;
            if (org.apache.commons.lang3.StringUtils.isBlank(configuredProjectId)) {
                return;
            }
            ProjectDto configuredProject = ProjectConfigCache.projectCache.get(configuredProjectId);
            if (configuredProject == null) {
                return;
            }
            ProjectDto providerProject = ProjectConfigCache.projectCache.get(providerId);
            if (providerProject == null) {
                return;
            }

            Map<String, ProjectApiInfo> configuredApis = configuredProject.getApi_all_cache();
            Map<String, ProjectApiInfo> providerApis = providerProject.getApi_all_cache();
            if (configuredApis == null || providerApis == null) {
                return;
            }
            String apiKey = "TCP@@" + eventType;
            ProjectApiInfo providerApi = providerApis.get(apiKey);
            ProjectApiInfo configuredApi = configuredApis.get(apiKey);
            if (providerApi == null || configuredApi == null) {
                return;
            }
            // Both projects must describe the same api (same request path and name).
            if (providerApi.getRequestPath().equals(configuredApi.getRequestPath()) && providerApi.getName().equals(configuredApi.getName())) {
                this.mCFactoryUtils.dispatchCloundMessage(requestMessage, properties, eventType, configuredProject, providerProject);
            }
        } catch (Exception e) {
            log.error("dispatchCloundMessage failed ,error:{}", ErrorUtils.getStackMsg(e));
        }
    }

    /**
     * Delivers a published message to every local subscriber of its pubCode (or, when nobody
     * subscribed to the pubCode, of its event type).
     *
     * <p>Also records the api id of the matching TCP api in {@code map}; an api found for the
     * event type overrides the one found for the pubCode.
     *
     * @param map            request record; supplies pubCode, subProjectCode and the provider id
     * @param requestMessage message being published
     * @param map2           per-request context holding the publishing project under {@link #PROJECT_DTO}
     * @throws IllegalAccessException    propagated from copying the message for a subscriber
     * @throws InvocationTargetException propagated from copying the message for a subscriber
     */
    private void dispatchLocalSubMessage(Map<String, String> map, RequestMessage requestMessage, Map<String, Object> map2) throws IllegalAccessException, InvocationTargetException {
        ProjectDto projectDto = (ProjectDto) map2.get(PROJECT_DTO);
        String pubCode = map.get(TcpConstants.pubCode);
        String subProjectCode = map.get(TcpConstants.subProjectCode);
        String providerId = map.get(RequestConstants.providerNo);
        Map<String, Set<ProjectAuthApiDto>> subKeyMap = projectDto.getSubKeyMap();

        // The api cache is treated as optional elsewhere in this class (dispatchCloundMessage),
        // so a project without one must not fail the whole publish here.
        Map<String, ProjectApiInfo> apiCache = projectDto.getApi_all_cache();
        ProjectApiInfo pubCodeApi = apiCache == null ? null : apiCache.get("TCP@@" + pubCode);
        if (pubCodeApi != null) {
            map.put(RequestConstants.apiId, pubCodeApi.getId());
        }

        String eventType = SplitConstants.empty;
        Map<String, String> properties = requestMessage.getProperties();
        if (properties != null && properties.containsKey(TcpConstants.eventType)) {
            eventType = properties.get(TcpConstants.eventType);
            // The prefix is client-supplied text: strip it literally (String.replaceFirst would
            // interpret it as a regular expression), and tolerate an explicit null event type.
            String queuePrefix = properties.get(QUEUE_PREFIX);
            if (eventType != null && queuePrefix != null && eventType.startsWith(queuePrefix)) {
                eventType = eventType.substring(queuePrefix.length());
            }
        }
        boolean hasEventType = org.apache.commons.lang3.StringUtils.isNotBlank(eventType);
        if (hasEventType && apiCache != null) {
            ProjectApiInfo eventTypeApi = apiCache.get("TCP@@" + eventType);
            if (eventTypeApi != null) {
                map.put(RequestConstants.apiId, eventTypeApi.getId());
            }
        }

        if (subKeyMap == null || subKeyMap.isEmpty()) {
            log.info("无消费者订阅项目[{}]的tcp消息,projectCode :{} ", projectDto.getName(), projectDto.getCode());
            return;
        }
        Set<ProjectAuthApiDto> subscribers = subKeyMap.get(pubCode);
        if (CollectionUtils.isEmpty(subscribers) && hasEventType) {
            subscribers = subKeyMap.get(eventType);
        }
        if (CollectionUtils.isEmpty(subscribers)) {
            log.info("无消费者订阅项目[{}]的tcp消息,projectCode :{},pubCode :{} ", projectDto.getName(), projectDto.getCode(), pubCode);
            return;
        }
        RabbitMqProducer rabbitMqProducer = ProjectConfigCache.rabbitMqProducerMap.get(providerId);
        if (rabbitMqProducer == null) {
            log.error("rabbitMqProducer is null ,无消费者订阅项目[{}]的tcp消息,projectCode :{},pubCode :{} ", projectDto.getName(), projectDto.getCode(), pubCode);
            return;
        }
        for (ProjectAuthApiDto subscriber : subscribers) {
            dispatchLocalSubMessageForConsumer(requestMessage, subProjectCode, rabbitMqProducer, subscriber);
        }
    }

    /**
     * Delivers one message to one subscriber.
     *
     * <p>The subscriber is served only when no target project code was requested or it matches the
     * subscriber's project code. The message is copied, tagged with the subscriber's action, api id
     * and project id, and then either sent to the subscriber's queue (while its MQ overstock is
     * below the configured limit and its DB overstock is below 50) or registered as
     * "needs push". A trace entry is added in both cases.
     *
     * @param requestMessage    message being published
     * @param str               requested subscriber project code; empty/null means "all subscribers"
     * @param rabbitMqProducer  producer used to send to the subscriber's queue
     * @param projectAuthApiDto subscriber description
     * @throws IllegalAccessException    propagated from {@code BeanUtils.copyProperties}
     * @throws InvocationTargetException propagated from {@code BeanUtils.copyProperties}
     */
    private void dispatchLocalSubMessageForConsumer(RequestMessage requestMessage, String str, RabbitMqProducer rabbitMqProducer, ProjectAuthApiDto projectAuthApiDto) throws IllegalAccessException, InvocationTargetException {
        if (!StringUtils.isEmpty(str) && !str.equals(projectAuthApiDto.getProjectCode())) {
            return;
        }
        String subscriberProjectId = projectAuthApiDto.getProjectId();

        RequestMessage subscriberMessage = new RequestMessage();
        BeanUtils.copyProperties(subscriberMessage, requestMessage);
        // copyProperties copies the innerProps reference, not the map. Give the copy its own map so
        // the subscriber-specific keys below do not leak into the source message or into the copies
        // already handed to the trace handler for previous subscribers.
        Map<String, String> subscriberProps = new HashMap<>(requestMessage.getInnerProps());
        subscriberMessage.setInnerProps(subscriberProps);
        subscriberProps.put("action", projectAuthApiDto.getAction());
        subscriberProps.put(RequestConstants.apiId, projectAuthApiDto.getApiId());
        subscriberProps.put(TcpConstants.clientProjectId, subscriberProjectId);
        subscriberProps.put(TcpConstants.subProjectCode, str);
        String json = JacksonUtil.getInstance().toJson(subscriberMessage);

        Integer overstockSizeMq = this.messageOverstockStaticsService.overstockSizeMq(subscriberProjectId);
        Integer overstockSizeDb = this.messageOverstockStaticsService.overstockSizeDb(subscriberProjectId);
        boolean sendDirectly = overstockSizeMq.intValue() < this.overstockLimitSize.intValue() && overstockSizeDb.intValue() < 50;
        if (sendDirectly) {
            rabbitMqProducer.sendMsg(ProjectConfigCache.getQueueName(projectAuthApiDto.getProjectCode()), json);
            this.messageOverstockStaticsService.increaseMq(subscriberProjectId);
            this.messageTraceInsertHandler.add(subscriberMessage, MessageTraceStatusEnum.dlq.getCode());
        } else {
            // Too much is already pending for this subscriber: register the message for a later push.
            MessageNeedPush messageNeedPush = new MessageNeedPush();
            messageNeedPush.setClientProjectId(subscriberProjectId);
            messageNeedPush.setMessageId(requestMessage.getProperties().get(TcpConstants.msgId));
            this.messageNeedPushInsertHandler.add(messageNeedPush);
            this.messageOverstockStaticsService.increaseDb(subscriberProjectId);
            this.messageTraceInsertHandler.add(subscriberMessage, MessageTraceStatusEnum.dff.getCode());
        }
    }

    /**
     * Hands the next pending message to the subscribing project identified by the request's
     * client flags.
     *
     * <p>Order of sources: a pending log-query command, then the message cache, then (only when the
     * message cache is disabled) the project's RabbitMQ consumer, which is subject to the rate
     * limiter. The request record is written exactly once on every path.
     *
     * @param httpServletRequest incoming subscribe request
     * @return an invalid-parameter result when the caller cannot be identified; otherwise OK with
     *         the message JSON (or an empty/absent body when nothing is pending). Any exception,
     *         including a rate-limit rejection, is recorded and answered with OK and whatever
     *         payload had been obtained so far.
     */
    @Override // com.xforceplus.apollo.janus.standalone.service.IMessageService
    public Result subMessage(HttpServletRequest httpServletRequest) {
        Map<String, String> record = new HashMap<>();
        long startMillis = System.currentTimeMillis();
        String payload = SplitConstants.empty;
        try {
            Map<String, ProjectDto> projects = ProjectConfigCache.projectCodeCache;
            if (projects == null) {
                return Result.invalidParam("属地集成平台服务器正在启动，请稍后再试！");
            }
            record.put("startTime", startMillis + SplitConstants.empty);

            String clientFlags = httpServletRequest.getHeader(RequestConstants.clientFlags);
            if (org.apache.commons.lang3.StringUtils.isBlank(clientFlags)) {
                return Result.invalidParam("http请求中 clientFlags不能为空！！");
            }
            // Remove the prefix as literal text; String.replaceFirst would interpret it as a regex.
            if (clientFlags.startsWith(RequestConstants.clientFlagsPrefix)) {
                clientFlags = clientFlags.substring(RequestConstants.clientFlagsPrefix.length());
            }
            ProjectDto projectDto = projects.get(clientFlags);
            if (projectDto == null) {
                return Result.invalidParam("项目信息不存在！");
            }

            String projectId = projectDto.getId();
            record.put(RequestConstants.protocol, ProtocolEnum.TCP.getName());
            record.put(RequestConstants.projectId, projectId);
            RequestUtils.httpRequestConvertToRecordMap(httpServletRequest, record);

            String projectCode = projectDto.getCode();
            Result<String> logQueryResult = logQueryCmdhHandle(httpServletRequest, record, projectCode);
            if (logQueryResult != null) {
                return logQueryResult;
            }

            RequestMessage cachedMessage = this.messageCache.getOneMessages(projectCode);
            if (cachedMessage != null) {
                payload = JacksonUtil.getInstance().toJson(cachedMessage);
            } else if (!this.messageCacheEnable) {
                validateLimiter(projectId);
                Map<String, RabbitMqConsumer> consumers = ProjectConfigCache.rabbitMqConsumerMap;
                RabbitMqConsumer consumer = consumers == null ? null : consumers.get(projectId);
                if (consumer != null) {
                    payload = consumer.poll();
                    if (org.apache.commons.lang3.StringUtils.isNotBlank(payload)) {
                        this.messageOverstockStaticsService.decreaseMq(projectId);
                    }
                }
            }

            record.put(RequestConstants.responseCode, "200");
            record.put(RequestConstants.responseBody, payload);
            record.put(RequestConstants.responseBodySize, String.valueOf(payload == null ? 0 : payload.length()));
            if (org.apache.commons.lang3.StringUtils.isBlank(payload)) {
                return Result.ok(payload);
            }

            RequestMessage requestMessage = (RequestMessage) JacksonUtil.getInstance().fromJson(payload, RequestMessage.class);
            Map<String, String> innerProps = requestMessage.getInnerProps();
            record.put(RequestConstants.apiId, innerProps.get(RequestConstants.apiId));
            record.put("action", innerProps.get("action"));
            record.put(RequestConstants.providerNo, innerProps.get(RequestConstants.providerNo));
            record.put(TcpConstants.businessNo, innerProps.get(TcpConstants.businessNo));
            record.put(TcpConstants.transNo, innerProps.get(TcpConstants.transNo));
            this.messageTraceInsertHandler.add(requestMessage, MessageTraceStatusEnum.dqr.getCode());

            RequestMessage responseMessage = new RequestMessage();
            BeanUtils.copyProperties(responseMessage, requestMessage);
            return Result.ok(JacksonUtil.getInstance().toJson(responseMessage));
        } catch (Exception e) {
            // Keep answering OK with whatever was obtained so an already-polled message is not lost.
            ErrorUtils.putErrorToMap(e, record);
            return Result.ok(payload);
        } finally {
            // Recording lives in finally so it runs once per request and a failure while recording
            // is not re-caught by the handler above and retried.
            addRecord(record);
        }
    }

    /**
     * Answers a request that carries the log-query command header.
     *
     * <p>When the {@code RequestConstants.logQueryCmd} header is absent or is not equal to
     * {@code RequestMessage.COMPRESS_FLAG_TRUE}, the request is not a log-query poll and
     * {@code null} is returned so the caller can continue with its normal flow.
     * Otherwise the pending tasks for the given code and source address are looked up; the
     * first one (if any) is returned as the message content and a download record is inserted.
     *
     * @param httpServletRequest incoming request whose headers are inspected
     * @param map                record map; only its {@code RequestConstants.sourceIp} entry is read
     * @param str                first argument of the task query (the visible caller passes the project code)
     * @return {@code null} when the header flag is not set, otherwise an OK result whose body is
     *         the JSON of a {@link RequestMessage} with the {@code hasTaskCmd} property set
     */
    private Result<String> logQueryCmdhHandle(HttpServletRequest httpServletRequest, Map<String, String> map, String str) {
        String logQueryFlag = httpServletRequest.getHeader(RequestConstants.logQueryCmd);
        boolean logQueryRequested = !StringUtils.isEmpty(logQueryFlag)
                && RequestMessage.COMPRESS_FLAG_TRUE.equals(logQueryFlag);
        if (!logQueryRequested) {
            return null;
        }

        // Fall back to a placeholder so the query and the download record always have an address.
        String clientAddress = map.get(RequestConstants.sourceIp);
        if (StringUtils.isEmpty(clientAddress)) {
            clientAddress = "ip is null";
        }

        List<LogQueryTask> pendingTasks = this.logQueryTaskMapper.queryTasks(str, clientAddress);
        RequestMessage reply = new RequestMessage();

        if (!CollectionUtils.isEmpty(pendingTasks)) {
            // Only the first pending task is delivered per poll.
            LogQueryTask task = pendingTasks.get(0);
            LogQueryDownRecord downRecord = new LogQueryDownRecord();
            String createdTime = DateUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss");
            downRecord.setId(SnowflakeIdWorker.getId());
            downRecord.setQueryTaskId(task.getId());
            downRecord.setAddress(clientAddress);
            downRecord.setCreatedTime(createdTime);
            this.logQueryDownRecordMapper.insert(downRecord);

            reply.setContent(task.getQueryContent());
            reply.getProperties().put("hasTaskCmd", RequestMessage.COMPRESS_FLAG_TRUE);
            return Result.ok(JacksonUtil.getInstance().toJson(reply));
        }

        // No task pending: reply with empty content and the flag cleared.
        reply.setContent(SplitConstants.empty);
        reply.getProperties().put("hasTaskCmd", RequestMessage.COMPRESS_FLAG_FALSE);
        return Result.ok(JacksonUtil.getInstance().toJson(reply));
    }

    /**
     * Handles a client's acknowledgement of a previously delivered message.
     *
     * <p>The project is resolved from the {@code RequestConstants.clientFlags} header (with the
     * {@code RequestConstants.clientFlagsPrefix} stripped when present), the request body is
     * parsed into a {@link RequestMessage}, and a trace entry with status
     * {@code MessageTraceStatusEnum.yqr} is added for it.
     *
     * <p>A request record is added exactly once per call, whatever the outcome. When the body
     * fails with an exception, the error is written into the record map first and the exception
     * is then rethrown to the caller.
     *
     * @param httpServletRequest incoming acknowledgement request
     * @return an invalid-parameter result when the project cache is not loaded, the
     *         {@code clientFlags} header is blank, or no project matches it; otherwise an OK result
     */
    @Override // com.xforceplus.apollo.janus.standalone.service.IMessageService
    public Result ackMessage(HttpServletRequest httpServletRequest) {
        HashMap hashMap = new HashMap();
        hashMap.put("startTime", System.currentTimeMillis() + SplitConstants.empty);
        try {
            Map<String, ProjectDto> projectsByCode = ProjectConfigCache.projectCodeCache;
            if (projectsByCode == null) {
                return Result.invalidParam("属地集成平台服务器正在启动，请稍后再试！");
            }

            String clientFlags = httpServletRequest.getHeader(RequestConstants.clientFlags);
            if (org.apache.commons.lang3.StringUtils.isBlank(clientFlags)) {
                return Result.invalidParam("http请求中 clientFlags不能为空！！");
            }
            if (clientFlags.startsWith(RequestConstants.clientFlagsPrefix)) {
                clientFlags = clientFlags.replaceFirst(RequestConstants.clientFlagsPrefix, SplitConstants.empty);
            }

            ProjectDto projectDto = projectsByCode.get(clientFlags);
            if (projectDto == null) {
                return Result.invalidParam("项目信息不存在！");
            }

            hashMap.put(RequestConstants.protocol, ProtocolEnum.TCP.getName());
            hashMap.put(RequestConstants.projectId, projectDto.getId());

            // The body is read before the request is converted into the record map; this order
            // is kept as-is because both helpers consume the same request.
            RequestMessage requestMessage = (RequestMessage) JacksonUtil.getInstance()
                    .fromJson(RequestUtils.getRequestJsonString(httpServletRequest), RequestMessage.class);
            RequestUtils.httpRequestConvertToRecordMap(httpServletRequest, hashMap);
            this.messageTraceInsertHandler.add(requestMessage, MessageTraceStatusEnum.yqr.getCode());

            hashMap.put(RequestConstants.responseCode, "200");
            hashMap.put(RequestConstants.responseBody, SplitConstants.empty);
            hashMap.put(RequestConstants.responseBodySize, "0");
            return Result.ok();
        } catch (Exception e) {
            // Keep the failure details in the record, then let the caller see the exception.
            ErrorUtils.putErrorToMap(e, hashMap);
            throw e;
        } finally {
            // Single exit point for recording, so no path records twice or not at all.
            addRecord(hashMap);
        }
    }

    /**
     * Distributes a message received from the cloud to every sub-project subscribed to its
     * request name.
     *
     * <p>The sealed message is converted into a {@link RequestMessage} and handed to the content
     * insert handler. It is then processed once per subscriber: when the subscriber's overstock
     * counters have reached their limits a {@link MessageNeedPush} entry is queued, otherwise the
     * message JSON is sent to the subscriber's queue. A trace entry is added for either outcome.
     *
     * <p>Nothing is thrown to the caller. Missing project configuration or missing subscribers
     * are logged and the method returns; any exception raised while processing is caught and
     * logged at error level.
     *
     * @param sealedMessageEvent event carrying the request name and the sealed message
     */
    @Override // com.xforceplus.apollo.janus.standalone.service.IMessageService
    public void handleCloundMessage(SealedMessageEvent sealedMessageEvent) {
        try {
            String requestName = sealedMessageEvent.getRequestName();
            SealedMessage sealedMessage = sealedMessageEvent.getSealedMessage();
            log.info(JacksonUtil.getInstance().toJson(sealedMessage));

            // Stop as soon as the project configuration turns out to be missing; continuing
            // would only end in a NullPointerException further down.
            String baseProjectId = ProjectConfigCache.projectId;
            if (org.apache.commons.lang3.StringUtils.isBlank(baseProjectId)) {
                log.error("项目信息未加载, baseProjectId is null");
                return;
            }
            ProjectDto projectDto = ProjectConfigCache.projectCache.get(baseProjectId);
            if (projectDto == null) {
                log.error("项目信息未加载, projectDto is null ");
                return;
            }

            Map<String, Set<ProjectAuthApiDto>> subKeyMap = projectDto.getSubKeyMap();
            if (subKeyMap == null || subKeyMap.isEmpty()) {
                log.error("消息没有子项目订阅, requestName ： {} ", requestName);
                return;
            }
            Set<ProjectAuthApiDto> subscribers = subKeyMap.get(requestName);
            if (CollectionUtils.isEmpty(subscribers)) {
                log.error("无消费者订阅项目[{}]的tcp消息,projectCode :{},pubCode :{} ", projectDto.getName(), projectDto.getCode(), requestName);
                return;
            }

            // Build the internal message: payload as content, header extras as properties,
            // and the routing/bookkeeping values as inner properties.
            RequestMessage requestMessage = new RequestMessage();
            HashMap hashMap = new HashMap();
            requestMessage.setContent(sealedMessage.getPayload().getObj().toString());
            SealedMessage.Header header = sealedMessage.getHeader();
            hashMap.put(TcpConstants.sourceMsgId, header.getMsgId());
            hashMap.put(TcpConstants.sourceCreateTime, header.getCreateTime());
            hashMap.put(TcpConstants.sourceUserId, header.getUserId());
            hashMap.put("type", header.getType());
            requestMessage.setProperties(header.getOthers());
            requestMessage.setInnerProps(hashMap);

            hashMap.put(TcpConstants.msgId, DateUtils.format(new Date(), DateUtils.DATE_PATTERN_NO) + SnowflakeIdWorker.getId());
            hashMap.put(TcpConstants.pubCode, requestName);
            hashMap.put(TcpConstants.provideCode, projectDto.getCode());
            hashMap.put(RequestConstants.providerNo, projectDto.getId());
            requestMessage.setCreatedTime(DateUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss"));

            if (!requestMessage.getProperties().containsKey(TcpConstants.businessNo)) {
                throw new BusinessException(HttpStatus.BAD_REQUEST.value(), "businessNo不能为空!");
            }
            hashMap.put(TcpConstants.businessNo, requestMessage.getProperties().get(TcpConstants.businessNo));
            // transNo defaults to businessNo when the sender did not supply one.
            if (requestMessage.getProperties().containsKey(TcpConstants.transNo)) {
                hashMap.put(TcpConstants.transNo, requestMessage.getProperties().get(TcpConstants.transNo));
            } else {
                hashMap.put(TcpConstants.transNo, requestMessage.getProperties().get(TcpConstants.businessNo));
            }
            this.messageInsertContentHandler.add(requestMessage);

            RabbitMqProducer rabbitMqProducer = ProjectConfigCache.rabbitMqProducerMap.get(baseProjectId);
            if (rabbitMqProducer == null) {
                log.error("rabbitMqProducer is null ,无消费者订阅项目[{}]的tcp消息,projectCode :{},pubCode :{} ", projectDto.getName(), projectDto.getCode(), requestName);
                return;
            }

            for (ProjectAuthApiDto projectAuthApiDto : subscribers) {
                // NOTE(review): copyProperties presumably copies references only, in which case
                // every copy shares the same innerProps map as requestMessage - confirm.
                RequestMessage outbound = new RequestMessage();
                BeanUtils.copyProperties(outbound, requestMessage);
                outbound.getInnerProps().put("action", projectAuthApiDto.getAction());
                outbound.getInnerProps().put(RequestConstants.apiId, projectAuthApiDto.getApiId());
                outbound.getInnerProps().put(TcpConstants.clientProjectId, projectDto.getId());
                outbound.getInnerProps().put(TcpConstants.subProjectCode, projectAuthApiDto.getProjectCode());

                String subscriberProjectId = projectAuthApiDto.getProjectId();
                Integer overstockSizeMq = this.messageOverstockStaticsService.overstockSizeMq(subscriberProjectId);
                Integer overstockSizeDb = this.messageOverstockStaticsService.overstockSizeDb(subscriberProjectId);
                boolean overstocked = overstockSizeMq.intValue() >= this.overstockLimitSize.intValue()
                        || overstockSizeDb.intValue() >= 10;

                if (overstocked) {
                    // Subscriber is backed up: queue a need-push entry instead of sending now.
                    MessageNeedPush messageNeedPush = new MessageNeedPush();
                    messageNeedPush.setClientProjectId(subscriberProjectId);
                    // NOTE(review): messageId receives the subscriber's project id, the same value
                    // as clientProjectId; it looks like it should be the message's own id - confirm.
                    messageNeedPush.setMessageId(subscriberProjectId);
                    this.messageNeedPushInsertHandler.add(messageNeedPush);
                    this.messageOverstockStaticsService.increaseDb(subscriberProjectId);
                    this.messageTraceInsertHandler.add(outbound, MessageTraceStatusEnum.dff.getCode());
                } else {
                    // Serialize only when the message is actually sent.
                    String json = JacksonUtil.getInstance().toJson(outbound);
                    rabbitMqProducer.sendMsg(ProjectConfigCache.getQueueName(projectAuthApiDto.getProjectCode()), json);
                    this.messageOverstockStaticsService.increaseMq(subscriberProjectId);
                    this.messageTraceInsertHandler.add(outbound, MessageTraceStatusEnum.dlq.getCode());
                }
            }
        } catch (Exception e) {
            // Failures are errors, logged the same way addRecord logs its own failures.
            log.error(ErrorUtils.getStackMsg(e));
        }
    }

    /**
     * Completes a request record with its elapsed time and passes it to the record handler.
     *
     * <p>The {@code "startTime"} entry is parsed as epoch milliseconds. Records whose
     * {@code TcpConstants.pubCode} entry is blank or equal to {@code "heart-beat-task"} are
     * skipped. Any exception raised here is logged and not propagated.
     *
     * @param map record map; receives the {@code RequestConstants.costTime} entry when recorded
     */
    private void addRecord(Map<String, String> map) {
        try {
            String pubCode = map.get(TcpConstants.pubCode);
            // Parsed before the skip check, so a malformed start time is always reported.
            long startedAt = Long.parseLong(String.valueOf(map.get("startTime")));
            boolean skipped = org.apache.commons.lang3.StringUtils.isBlank(pubCode)
                    || "heart-beat-task".equals(pubCode);
            if (skipped) {
                return;
            }
            long elapsedMillis = System.currentTimeMillis() - startedAt;
            map.put(RequestConstants.costTime, String.valueOf(elapsedMillis));
            this.requestRecordContentHandler.addRecord(map);
        } catch (Exception e) {
            log.error(ErrorUtils.getStackMsg(e));
        }
    }
}
