package com.xforceplus.ultraman.bocp.metadata.config;

import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import com.xforceplus.ultraman.bocp.grpc.proto.Base;
import com.xforceplus.ultraman.bocp.grpc.proto.DictUpResult;
import com.xforceplus.ultraman.bocp.grpc.proto.ModuleUpResult;
import com.xforceplus.ultraman.bocp.metadata.service.IDictUpgradeService;
import com.xforceplus.ultraman.bocp.metadata.service.IModuleUpgradeService;
import com.xforceplus.ultraman.bocp.metadata.util.ExecutorHelper;
import com.xforceplus.xplat.galaxy.grpc.GrpcLongConnectionListener;
import com.xforceplus.xplat.galaxy.grpc.MessageRouter;
import com.xforceplus.xplat.galaxy.grpc.MessageRouterBuilder;
import com.xplat.ultraman.api.management.convertor.pojo.constant.Symbol;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that declares the thread pools and the gRPC message routers
 * used to push module ("metadata") and dictionary upgrade results to connected clients.
 *
 * <p>Both routers address a connection by a key built from the application id, the
 * {@link Symbol#OPERATION_KEY_LINKER} separator and the environment.
 */
@Configuration
public class MessageSourceService {
    private static final Logger logger = LoggerFactory.getLogger(MessageSourceService.class);

    /**
     * Builds a fixed-size thread pool backed by a bounded queue.
     *
     * <p>Core and maximum pool size are both {@code poolSize}. Once all threads are busy and
     * the queue holds {@code queueCapacity} tasks, further submissions are rejected by
     * {@link ThreadPoolExecutor.AbortPolicy} (i.e. with a {@code RejectedExecutionException}).
     *
     * @param poolSize      number of threads in the pool
     * @param queueCapacity capacity of the bounded work queue
     * @param namePrefix    passed to {@code ExecutorHelper.buildNameThreadFactory};
     *                      presumably the thread-name prefix - TODO confirm
     * @param daemon        passed to {@code ExecutorHelper.buildNameThreadFactory};
     *                      presumably the daemon flag - TODO confirm
     * @return the newly created executor
     */
    private ExecutorService buildThreadPool(int poolSize, int queueCapacity, String namePrefix, boolean daemon) {
        return new ThreadPoolExecutor(
                poolSize,
                poolSize,
                0L,
                TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                ExecutorHelper.buildNameThreadFactory(namePrefix, daemon),
                new ThreadPoolExecutor.AbortPolicy());
    }

    /**
     * Builds the routing key shared by both routers: app id, linker symbol, environment.
     *
     * @param authorization the authorization whose app id and environment form the key
     * @return the routing key for that authorization
     */
    private static String routerKey(Base.Authorization authorization) {
        return authorization.getAppId() + Symbol.OPERATION_KEY_LINKER + authorization.getEnv();
    }

    /** General-purpose pool: 50 threads, queue capacity 1000, threads named with "Blocking". */
    @Bean
    ExecutorService executorService() {
        return buildThreadPool(50, 1000, "Blocking", false);
    }

    /**
     * Pool registered under the bean name "ConnectionListener": 10 threads, queue capacity 1000.
     * It is not referenced elsewhere in this class.
     */
    @Bean({"ConnectionListener"})
    ExecutorService connectionListener() {
        return buildThreadPool(10, 1000, "ConnectionListener", false);
    }

    /**
     * Creates the router that delivers {@link ModuleUpResult} messages.
     *
     * <p>An outgoing result is routed to one key per app id in the result, each combined with
     * the environment taken from the result's authorization. When a client connects, the
     * module upgrade service is asked for the initial messages; a failure there is logged and
     * an empty list is sent instead so the connection is still established.
     *
     * @param actorSystem           actor system handed to the router builder
     * @param actorMaterializer     materializer handed to the router builder
     * @param iModuleUpgradeService supplies the initial messages for a new connection
     * @param executorService       executor handed to the router builder
     *                              NOTE(review): two ExecutorService beans are declared in this
     *                              class; confirm the container injects the intended one
     * @return the configured router
     */
    @Bean({"MetadataServiceMessageRouter"})
    public MessageRouter<Base.Authorization, ModuleUpResult> metadataServiceMessageRouter(ActorSystem actorSystem, ActorMaterializer actorMaterializer, final IModuleUpgradeService iModuleUpgradeService, ExecutorService executorService) {
        return MessageRouterBuilder.newBuilder(actorSystem, actorMaterializer)
                .typed(Base.Authorization.class, ModuleUpResult.class)
                .routerKeyExtractor(moduleUpResult -> {
                    String env = moduleUpResult.getAuth().getEnv();
                    return moduleUpResult.getAppIdsList().stream()
                            .map(appId -> appId + Symbol.OPERATION_KEY_LINKER + env)
                            .collect(Collectors.toList());
                })
                .routerKeyGenerator(authorization -> routerKey(authorization))
                .connectionListener(new GrpcLongConnectionListener<Base.Authorization, ModuleUpResult>() {
                    @Override
                    public List<ModuleUpResult> onConnected(Base.Authorization authorization) {
                        try {
                            return iModuleUpgradeService.init(authorization);
                        } catch (Exception e) {
                            // Best effort: a failed init must not break the connection itself.
                            logger.error("Meta Init is Error", e);
                            return Collections.emptyList();
                        }
                    }

                    @Override
                    public void onDisConnected() {
                        logger.info("this channel is over");
                    }
                })
                .routerName("MetadataServiceMessageRouter")
                .withExecutorService(executorService)
                .build();
    }

    /**
     * Creates the router that delivers {@link DictUpResult} messages.
     *
     * <p>An outgoing result is routed to one key per authorization in its authorization list.
     * When a client connects, the dict upgrade service is asked for an initial message: a
     * non-null result is sent as a single-element list, a null result or a failure yields an
     * empty list (failures are logged).
     *
     * @param actorSystem         actor system handed to the router builder
     * @param actorMaterializer   materializer handed to the router builder
     * @param iDictUpgradeService supplies the initial message for a new connection
     * @param executorService     executor handed to the router builder
     *                            NOTE(review): two ExecutorService beans are declared in this
     *                            class; confirm the container injects the intended one
     * @return the configured router
     */
    @Bean({"DictServiceMessageRouter"})
    public MessageRouter<Base.Authorization, DictUpResult> dictServiceMessageRouter(ActorSystem actorSystem, ActorMaterializer actorMaterializer, final IDictUpgradeService iDictUpgradeService, ExecutorService executorService) {
        return MessageRouterBuilder.newBuilder(actorSystem, actorMaterializer)
                .typed(Base.Authorization.class, DictUpResult.class)
                .routerKeyExtractor(dictUpResult -> dictUpResult.getAuthorizationList().stream()
                        .map(authorization -> routerKey(authorization))
                        .collect(Collectors.toList()))
                .routerKeyGenerator(authorization -> routerKey(authorization))
                .connectionListener(new GrpcLongConnectionListener<Base.Authorization, DictUpResult>() {
                    @Override
                    public List<DictUpResult> onConnected(Base.Authorization authorization) {
                        try {
                            return Optional.ofNullable(iDictUpgradeService.init(authorization))
                                    .map(Collections::singletonList)
                                    .orElseGet(Collections::emptyList);
                        } catch (Exception e) {
                            // Best effort: a failed init must not break the connection itself.
                            logger.error("Dict Init is Error", e);
                            return Collections.emptyList();
                        }
                    }

                    @Override
                    public void onDisConnected() {
                        logger.info("this channel is over");
                    }
                })
                .withExecutorService(executorService)
                .routerName("DictServiceMessageRouter")
                .build();
    }
}
