package com.xforceplus.ultraman.oqsengine.meta;

import com.google.common.util.concurrent.Uninterruptibles;
import com.xforceplus.ultraman.oqsengine.meta.common.config.GRpcParams;
import com.xforceplus.ultraman.oqsengine.meta.common.exception.MetaSyncClientException;
import com.xforceplus.ultraman.oqsengine.meta.common.executor.IBasicSyncExecutor;
import com.xforceplus.ultraman.oqsengine.meta.common.proto.sync.EntityClassSyncRequest;
import com.xforceplus.ultraman.oqsengine.meta.common.proto.sync.EntityClassSyncResponse;
import com.xforceplus.ultraman.oqsengine.meta.common.utils.ThreadUtils;
import com.xforceplus.ultraman.oqsengine.meta.common.utils.TimeWaitUtils;
import com.xforceplus.ultraman.oqsengine.meta.connect.GRpcClient;
import com.xforceplus.ultraman.oqsengine.meta.handler.IRequestHandler;
import io.grpc.stub.StreamObserver;
import io.micrometer.core.instrument.Metrics;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/xforceplus/ultraman/oqsengine/meta/EntityClassSyncClient.class */
public class EntityClassSyncClient implements IBasicSyncExecutor {

    @Resource(name = "gRpcClient")
    private GRpcClient client;

    @Resource(name = "requestHandler")
    private IRequestHandler requestHandler;

    @Resource
    private GRpcParams gRpcParamsConfig;
    private Thread observerStreamMonitorThread;
    private Logger logger = LoggerFactory.getLogger(EntityClassSyncClient.class);
    private AtomicInteger clientRebuildStreamCounter = (AtomicInteger) Metrics.gauge("oqs.connector.client.continues-rebuild-stream", new AtomicInteger(0));

    @PostConstruct
    public void start() {
        this.client.start();
        if (!this.client.opened()) {
            throw new MetaSyncClientException("client stub create failed.", true);
        }
        this.observerStreamMonitorThread = ThreadUtils.create(this::monitor);
        this.observerStreamMonitorThread.start();
        this.requestHandler.start();
        this.logger.info("entityClassSyncClient start.");
    }

    public void stop() {
        this.requestHandler.stop();
        if (this.client.opened()) {
            this.client.stop();
        }
        this.logger.info("entityClassSyncClient stop.");
    }

    private boolean monitor() {
        while (!this.requestHandler.isShutDown()) {
            CountDownLatch countDownLatch = new CountDownLatch(1);
            try {
                this.requestHandler.initWatcher(UUID.randomUUID().toString(), responseEvent(countDownLatch));
                if (this.requestHandler.reRegister()) {
                    this.requestHandler.ready();
                    Uninterruptibles.awaitUninterruptibly(countDownLatch);
                }
                this.requestHandler.notReady();
                if (!this.requestHandler.isShutDown()) {
                    this.clientRebuildStreamCounter.incrementAndGet();
                }
            } catch (Exception e) {
                this.logger.warn("observer init error, message : {}, retry after ({})ms", e.getMessage(), Long.valueOf(this.gRpcParamsConfig.getReconnectDuration()));
                TimeWaitUtils.wakeupAfter(this.gRpcParamsConfig.getReconnectDuration(), TimeUnit.MILLISECONDS);
            }
        }
        return true;
    }

    private StreamObserver<EntityClassSyncRequest> responseEvent(final CountDownLatch countDownLatch) {
        return this.client.channelStub().register(new StreamObserver<EntityClassSyncResponse>() { // from class: com.xforceplus.ultraman.oqsengine.meta.EntityClassSyncClient.1
            public void onNext(EntityClassSyncResponse entityClassSyncResponse) {
                if (EntityClassSyncClient.this.requestHandler.watchExecutor().isAlive(entityClassSyncResponse.getUid())) {
                    EntityClassSyncClient.this.clientRebuildStreamCounter.set(0);
                    EntityClassSyncClient.this.requestHandler.invoke(entityClassSyncResponse, null);
                }
            }

            public void onError(Throwable th) {
                EntityClassSyncClient.this.logger.error("stream observer on error, message-[{}].", th.getMessage());
                countDownLatch.countDown();
            }

            public void onCompleted() {
                EntityClassSyncClient.this.logger.info("request stream observer completed.");
                countDownLatch.countDown();
            }
        });
    }
}
