package com.xforceplus.ultraman.bocp.metadata.deploy.grpc.support;

import akka.actor.ActorRef;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.Source;
import com.xforceplus.ultraman.bocp.grpc.proto.Base;
import com.xforceplus.ultraman.bocp.grpc.proto.CheckServiceGrpc;
import com.xforceplus.ultraman.bocp.grpc.proto.ModuleUpResult;
import com.xforceplus.xplat.galaxy.grpc.MessageRouter;
import com.xforceplus.xplat.galaxy.grpc.MessageSource;
import com.xforceplus.xplat.galaxy.grpc.actor.ChannelActor;
import io.grpc.Context;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.ExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/xforceplus/ultraman/bocp/metadata/deploy/grpc/support/DefaultCheckServiceSupport.class */
/**
 * Base class for the check service that bridges a {@link MessageRouter} to a gRPC response stream.
 *
 * <p>{@link #checkStreaming} registers the caller with the router and, when the registration
 * yields an {@link ActorRef}, runs a {@link MessageSource} built on that actor into the caller's
 * {@link StreamObserver}. {@link #offer} hands a result to the router via
 * {@code MessageRouter.send}.
 */
public abstract class DefaultCheckServiceSupport extends CheckServiceGrpc.CheckServiceImplBase {
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultCheckServiceSupport.class);

    private final MessageRouter<Base.Authorization, ModuleUpResult> messageRouter;
    private final ActorMaterializer mat;
    private final ExecutorService executorService;

    /**
     * Creates the support object with its collaborators.
     *
     * @param messageRouter router used to register callers and to send results
     * @param actorMaterializer materializer used to run the stream built in {@link #checkStreaming}
     * @param executorService executor on which the gRPC context listener is invoked
     */
    public DefaultCheckServiceSupport(MessageRouter<Base.Authorization, ModuleUpResult> messageRouter, ActorMaterializer actorMaterializer, ExecutorService executorService) {
        this.messageRouter = messageRouter;
        this.mat = actorMaterializer;
        this.executorService = executorService;
    }

    /**
     * Streams {@link ModuleUpResult} messages for the given authorization to the observer.
     *
     * <p>Blocks on {@code messageRouter.register(authorization).join()}. If the result is not an
     * {@link ActorRef}, a warning is logged and the stream is completed immediately. Otherwise a
     * listener is attached to the current gRPC {@link Context}; when it fires, the actor is told
     * {@code ChannelActor.Cancelled.ins} and the observer is signalled with an error.
     *
     * <p>NOTE(review): presumably this implements the {@code checkStreaming} RPC declared on
     * {@code CheckServiceImplBase}; confirm and add {@code @Override} if so.
     *
     * @param authorization identifies the caller; its app id and env are used for logging
     * @param streamObserver gRPC response observer that receives the streamed results
     */
    public void checkStreaming(Base.Authorization authorization, StreamObserver<ModuleUpResult> streamObserver) {
        Object registration = this.messageRouter.register(authorization).join();
        if (!(registration instanceof ActorRef)) {
            LOGGER.warn("Source is empty for {}", authorization);
            streamObserver.onCompleted();
            return;
        }
        // Cast once so the stream source and the cancellation listener share one typed reference.
        ActorRef channel = (ActorRef) registration;
        String appId = authorization.getAppId();
        String env = authorization.getEnv();

        Context.current().addListener(context -> {
            LOGGER.debug("invalidate module stale connection for {}, {}, clean channel", appId, env);
            channel.tell(ChannelActor.Cancelled.ins, ActorRef.noSender());
            try {
                streamObserver.onError(new RuntimeException("Cancelled"));
            } catch (RuntimeException e) {
                // The observer may already be closed when this listener runs (for example if the
                // sink completed the stream first); a second terminal signal is then rejected.
                // Swallow it here so the failure does not escape into the shared executor.
                // NOTE(review): confirm against the gRPC version in use.
                LOGGER.debug("stream for {}, {} was already closed when the cancel listener ran", appId, env, e);
            }
        }, this.executorService);

        // Raw types are kept: the generic signatures of MessageSource and StreamObservableSink
        // are not visible from this file.
        Source fromGraph = Source.fromGraph(new MessageSource(channel, ModuleUpResult.class));
        fromGraph.runWith(new StreamObservableSink(streamObserver), this.mat);
    }

    /**
     * Hands a result to the router by calling {@code messageRouter.send(moduleUpResult)}.
     *
     * @param moduleUpResult the result to send
     */
    public void offer(ModuleUpResult moduleUpResult) {
        this.messageRouter.send(moduleUpResult);
    }
}
