package com.xforceplus.xplat.galaxy.grpc;

import akka.actor.ActorRef;
import akka.actor.Status;
import akka.stream.Attributes;
import akka.stream.Outlet;
import akka.stream.SourceShape;
import akka.stream.stage.AbstractOutHandler;
import akka.stream.stage.GraphStage;
import akka.stream.stage.GraphStageLogic;
import com.xforceplus.xplat.galaxy.grpc.actor.ChannelActor;
import java.util.LinkedList;
import java.util.Queue;
import scala.runtime.BoxedUnit;

/* loaded from: input_file:BOOT-INF/lib/reactive-grpc-server-6.0.2-SNAPSHOT.jar:com/xforceplus/xplat/galaxy/grpc/MessageSource.class */
/**
 * Akka Streams source stage that emits messages delivered to its stage actor.
 *
 * <p>On start, each materialization sends a {@link ChannelActor.Init} carrying its stage-actor
 * reference to the configured channel actor. Messages subsequently received by the stage actor
 * are handled as follows:
 * <ul>
 *   <li>an instance of the element class is buffered and pushed downstream when there is demand;</li>
 *   <li>{@link Status.Success} completes the stream once every buffered element has been emitted;</li>
 *   <li>{@link Status.Failure} fails the stream immediately with the carried cause;</li>
 *   <li>anything else is ignored.</li>
 * </ul>
 *
 * <p>The buffer is unbounded: nothing in this stage signals back-pressure to the sender.
 *
 * @param <T> type of the elements emitted downstream
 */
public class MessageSource<T> extends GraphStage<SourceShape<T>> {
    private final ActorRef channel;
    private final Class<T> clazz;
    public final Outlet<T> out = Outlet.create("MessageSource");
    private final SourceShape<T> shape = SourceShape.of(this.out);

    /**
     * @param actorRef actor that receives the {@link ChannelActor.Init} message when the stage starts
     * @param cls      runtime class used to recognise element messages
     */
    public MessageSource(ActorRef actorRef, Class<T> cls) {
        this.clazz = cls;
        this.channel = actorRef;
    }

    /**
     * Creates the per-materialization logic. All mutable state (the element buffer and the
     * completion flag) lives in the returned logic so that materializing this stage more than
     * once never shares a buffer between running streams.
     */
    @Override // akka.stream.stage.GraphStage
    public GraphStageLogic createLogic(Attributes attributes) {
        return new GraphStageLogic(shape2()) {
            /** Elements received but not yet pushed downstream. */
            private final Queue<T> queue = new LinkedList<>();

            /** Set once Status.Success was received; the stage completes when the queue is drained. */
            private boolean completing;

            {
                setHandler(MessageSource.this.out, new AbstractOutHandler() {
                    @Override // akka.stream.stage.OutHandler
                    public void onPull() {
                        pump();
                    }
                });
            }

            /**
             * Pushes at most one buffered element if downstream has demand, then completes the
             * outlet if completion was requested and nothing is left to emit.
             */
            private void pump() {
                if (isAvailable(MessageSource.this.out) && !queue.isEmpty()) {
                    push(MessageSource.this.out, queue.poll());
                }
                if (completing && queue.isEmpty()) {
                    complete(MessageSource.this.out);
                }
            }

            @Override // akka.stream.stage.GraphStageLogic
            public void preStart() {
                MessageSource.this.channel.tell(new ChannelActor.Init(getStageActor(tuple2 -> {
                    // mo22687_2 is a decompiler-generated name; presumably Tuple2._2(), i.e. the message.
                    Object message = tuple2.mo22687_2();
                    if (MessageSource.this.clazz.isInstance(message)) {
                        // isInstance also accepts subclasses/implementations of the element class.
                        // Elements arriving after Status.Success are dropped.
                        if (!completing) {
                            queue.offer(MessageSource.this.clazz.cast(message));
                            pump();
                        }
                    } else if (message instanceof Status.Success) {
                        // Defer completion until buffered elements are emitted so none are lost.
                        completing = true;
                        pump();
                    } else if (message instanceof Status.Failure) {
                        fail(MessageSource.this.out, ((Status.Failure) message).cause());
                    }
                    return BoxedUnit.UNIT;
                }).ref()), ActorRef.noSender());
            }
        };
    }

    /** Returns the single-outlet shape of this source. */
    @Override // akka.stream.Graph
    /* renamed from: shape */
    public SourceShape<T> shape2() {
        return this.shape;
    }
}
