package com.xforceplus.tech.admin.server.socket;

import com.xforceplus.tech.admin.domain.ClientResponse;
import com.xforceplus.tech.admin.domain.HeaderConstants;
import com.xforceplus.tech.admin.server.domain.AppNode;
import com.xforceplus.tech.admin.server.socket.handler.ConnectionHandler;
import com.xforceplus.tech.admin.server.socket.handler.PayloadHandler;
import com.xforceplus.tech.admin.server.utils.PayloadUtils;
import io.rsocket.ConnectionSetupPayload;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.metadata.WellKnownMimeType;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.MediaType;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.stereotype.Service;
import org.springframework.util.MimeType;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * RSocket acceptor that keeps one {@link RSocketRequester} per connected client and lets the
 * admin server talk back to those clients.
 *
 * <p>Every accepted connection is stored in {@link #mapping} under the key
 * {@code app^env^node}, built from the {@link HeaderConstants#APP}, {@link HeaderConstants#ENV}
 * and {@link HeaderConstants#NODE} entries of the connection setup metadata. The registry is a
 * {@link ConcurrentHashMap}; an entry is dropped again once its connection signals
 * {@code onClose}.
 */
@Service
/* loaded from: input_file:BOOT-INF/classes/com/xforceplus/tech/admin/server/socket/AdminSocketAcceptor.class */
public class AdminSocketAcceptor implements AdminServiceSocketAcceptor {

    private static final Logger log = LoggerFactory.getLogger(AdminSocketAcceptor.class);

    /**
     * Key prefix shared by every node of one app/env pair. The trailing separator is required:
     * without it the prefix for env {@code prod} would also match keys of env {@code prod2}.
     */
    private static final String APP_ENV_PREFIX = "%s^%s^";

    /** Full registry key of a single node: {@code app^env^node}. */
    private static final String NODE_PREFIX = "%s^%s^%s";

    @Autowired
    private List<ConnectionHandler> connectionHandlers;

    @Autowired
    private List<PayloadHandler> handlers;

    @Autowired
    private RSocketStrategies rSocketStrategies;

    /** Live requesters keyed by {@code app^env^node}. */
    private final Map<String, RSocketRequester> mapping = new ConcurrentHashMap<>();

    /** Type token for {@code Map<String, Object>}; not referenced anywhere else in this class. */
    private final ParameterizedTypeReference<Map<String, Object>> mapType =
            new ParameterizedTypeReference<Map<String, Object>>() {
            };

    /**
     * Builds the registry key from already-extracted setup metadata.
     *
     * <p>A header that is absent from the metadata shows up as the literal text {@code null} in
     * the corresponding key segment.
     *
     * @param metadata metadata extracted from the connection setup payload
     * @return the key in {@code app^env^node} form
     */
    private String toKey(Map<String, Object> metadata) {
        return NODE_PREFIX.formatted(
                metadata.get(HeaderConstants.APP),
                metadata.get(HeaderConstants.ENV),
                metadata.get(HeaderConstants.NODE));
    }

    /**
     * Sends a request to the node described by {@code appNode} and returns its single response.
     *
     * @param appNode identifies the target by app id, env and node code
     * @param str     RSocket route to invoke on the client
     * @param obj     request data
     * @param cls     expected response type
     * @param <T>     response type
     * @return the client's response, or an empty {@code Mono} when no live channel matches
     */
    @Override // com.xforceplus.tech.admin.server.socket.AdminServerService
    public <T> Mono<T> query(AppNode appNode, String str, Object obj, Class<T> cls) {
        String nodeKey = NODE_PREFIX.formatted(appNode.getAppId(), appNode.getEnv(), appNode.getNodeCode());
        RSocketRequester target = findNodeChannel(nodeKey);
        if (target == null) {
            return Mono.empty();
        }
        return target.route(str).data(obj).retrieveMono(cls);
    }

    /**
     * Same as {@link #query(AppNode, String, Object, Class)} with a {@code String} response.
     */
    @Override // com.xforceplus.tech.admin.server.socket.AdminServerService
    public Mono<String> query(AppNode appNode, String str, Object obj) {
        return query(appNode, str, obj, String.class);
    }

    /**
     * Pushes a single-entry map ({@code str3 -> str4}) as JSON metadata to every live channel
     * whose key belongs to the given app/env pair.
     *
     * <p>The pushes are fire-and-forget; a failed push is logged and does not affect the others.
     *
     * @param str  app segment of the channel key
     * @param str2 env segment of the channel key
     * @param str3 key of the entry to send
     * @param str4 value of the entry to send
     */
    @Override // com.xforceplus.tech.admin.server.socket.AdminServerService
    public void deployConfiguration(String str, String str2, String str3, String str4) {
        Map<String, String> configuration = new HashMap<>();
        configuration.put(str3, str4);
        for (RSocketRequester requester : findChannel(APP_ENV_PREFIX.formatted(str, str2))) {
            requester.metadata(PayloadUtils.toJson(configuration), MediaType.APPLICATION_JSON)
                    .sendMetadata()
                    // Observe the error signal so a failed push is reported instead of dropped.
                    // Only the entry key is logged; the value is deliberately left out.
                    .subscribe(
                            unused -> { },
                            error -> log.warn("Failed to push configuration entry '{}' to a node of {}^{}",
                                    str3, str, str2, error));
        }
    }

    /**
     * Accepts a client connection: decodes the setup payload, notifies every
     * {@link ConnectionHandler}, registers the requester and returns the responder socket.
     *
     * <p>If the setup metadata cannot be decoded no registry key can be built, so the connection
     * is rejected with an error signal before any handler is notified. A setup body that cannot
     * be turned into a {@link ClientResponse} is tolerated: handlers then receive {@code null}
     * for it.
     *
     * @param connectionSetupPayload setup frame sent by the client
     * @param rSocket                sending side of the new connection
     * @return the responder socket, or an error when the setup metadata is undecodable
     */
    @Override // io.rsocket.SocketAcceptor
    public Mono<RSocket> accept(ConnectionSetupPayload connectionSetupPayload, RSocket rSocket) {
        RSocketRequester requester = RSocketRequester.wrap(
                rSocket,
                MediaType.APPLICATION_JSON,
                MimeType.valueOf(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString()),
                this.rSocketStrategies);

        // Extract once; the same map feeds both the handlers and the registry key.
        Map<String, Object> setupMetadata;
        try {
            setupMetadata = this.rSocketStrategies.metadataExtractor()
                    .extract(connectionSetupPayload, MimeType.valueOf(connectionSetupPayload.metadataMimeType()));
        } catch (Exception e) {
            log.error("Rejecting connection: setup metadata could not be decoded", e);
            return Mono.error(e);
        }

        Map<String, String> headers = new HashMap<>();
        setupMetadata.forEach((name, value) -> headers.put(name, value == null ? null : value.toString()));

        ClientResponse clientResponse = decodeSetupResponse(connectionSetupPayload);

        // Each handler is isolated so one failure does not keep the others from running.
        for (ConnectionHandler handler : this.connectionHandlers) {
            try {
                handler.handle(connectionSetupPayload, clientResponse, headers, requester);
            } catch (Exception e) {
                log.error("Connection handler {} failed", handler.getClass().getName(), e);
            }
        }

        String key = toKey(setupMetadata);
        registerChannel(key, requester);
        // Drop the entry when the connection ends. remove(key, value) leaves a newer requester
        // registered under the same key (a reconnect) untouched.
        rSocket.onClose()
                .doFinally(signal -> this.mapping.remove(key, requester))
                .subscribe(
                        unused -> { },
                        error -> log.debug("Connection {} closed with an error", key, error));

        return Mono.just(createResponder());
    }

    /**
     * Decodes the setup payload body.
     *
     * @return the decoded response, or {@code null} when decoding fails (the failure is logged)
     */
    private ClientResponse decodeSetupResponse(ConnectionSetupPayload connectionSetupPayload) {
        try {
            return PayloadUtils.toClientResponse(connectionSetupPayload);
        } catch (Exception e) {
            log.warn("Could not decode the client response of the setup payload; handlers receive null", e);
            return null;
        }
    }

    /**
     * Creates the responder for one connection. Only fire-and-forget frames are processed; every
     * other interaction completes empty.
     */
    private RSocket createResponder() {
        return new RSocket() {
            @Override // io.rsocket.RSocket
            public Mono<Payload> requestResponse(Payload payload) {
                return Mono.empty();
            }

            @Override // io.rsocket.RSocket
            public Flux<Payload> requestStream(Payload payload) {
                return Flux.empty();
            }

            @Override // io.rsocket.RSocket
            public Flux<Payload> requestChannel(Publisher<Payload> publisher) {
                return Flux.empty();
            }

            @Override // io.rsocket.RSocket
            public Mono<Void> metadataPush(Payload payload) {
                return Mono.empty();
            }

            /**
             * Decodes the payload and hands it to every {@link PayloadHandler}. Failures are
             * logged and never propagated to the client.
             */
            @Override // io.rsocket.RSocket
            public Mono<Void> fireAndForget(Payload payload) {
                try {
                    ClientResponse response = PayloadUtils.toClientResponse(payload);
                    Map<String, String> metadata = PayloadUtils.getMetadata(payload);
                    for (PayloadHandler handler : AdminSocketAcceptor.this.handlers) {
                        try {
                            handler.handle(payload, response, metadata);
                        } catch (Exception e) {
                            // Isolated per handler, matching how connection handlers are treated.
                            log.error("Payload handler {} failed", handler.getClass().getName(), e);
                        }
                    }
                } catch (Throwable th) {
                    log.error("Failed to process fire-and-forget payload", th);
                }
                return Mono.empty();
            }
        };
    }

    /** Stores {@code rSocketRequester} under {@code str}, replacing any previous entry. */
    private void registerChannel(String str, RSocketRequester rSocketRequester) {
        this.mapping.put(str, rSocketRequester);
    }

    /**
     * Resolves the channel for one node key.
     *
     * <p>An exact, live match always wins, so node {@code n1} is never answered by {@code n10}
     * while {@code n1} is connected. The prefix scan is kept only as a fallback so that lookups
     * which relied on prefix matching keep resolving.
     *
     * @param nodeKey key in {@code app^env^node} form
     * @return a live requester, or {@code null} when none matches
     */
    private RSocketRequester findNodeChannel(String nodeKey) {
        RSocketRequester exact = this.mapping.get(nodeKey);
        if (exact != null && !exact.isDisposed()) {
            return exact;
        }
        List<RSocketRequester> candidates = findChannel(nodeKey);
        return candidates.isEmpty() ? null : candidates.get(0);
    }

    /**
     * Returns every registered requester whose key starts with {@code str} and which is not
     * disposed.
     */
    private List<RSocketRequester> findChannel(String str) {
        return this.mapping.entrySet().stream()
                .filter(entry -> entry.getKey().startsWith(str))
                .map(Map.Entry::getValue)
                .filter(requester -> !requester.isDisposed())
                .collect(Collectors.toList());
    }
}
