package com.xforceplus.ultraman.oqsengine.meta;

import com.google.common.util.concurrent.Uninterruptibles;
import com.xforceplus.ultraman.oqsengine.meta.common.config.GRpcParamsConfig;
import com.xforceplus.ultraman.oqsengine.meta.common.constant.RequestStatus;
import com.xforceplus.ultraman.oqsengine.meta.common.dto.WatchElement;
import com.xforceplus.ultraman.oqsengine.meta.common.exception.MetaSyncClientException;
import com.xforceplus.ultraman.oqsengine.meta.common.executor.IBasicSyncExecutor;
import com.xforceplus.ultraman.oqsengine.meta.common.proto.EntityClassSyncRequest;
import com.xforceplus.ultraman.oqsengine.meta.common.proto.EntityClassSyncResponse;
import com.xforceplus.ultraman.oqsengine.meta.common.utils.TimeWaitUtils;
import com.xforceplus.ultraman.oqsengine.meta.connect.GRpcClient;
import com.xforceplus.ultraman.oqsengine.meta.executor.IRequestWatchExecutor;
import com.xforceplus.ultraman.oqsengine.meta.handler.IRequestHandler;
import io.grpc.stub.StreamObserver;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/xforceplus/ultraman/oqsengine/meta/EntityClassSyncClient.class */
public class EntityClassSyncClient implements IBasicSyncExecutor {
    private Logger logger = LoggerFactory.getLogger(EntityClassSyncClient.class);

    @Resource(name = "gRpcClient")
    private GRpcClient client;

    @Resource(name = "requestHandler")
    private IRequestHandler requestHandler;

    @Resource
    private IRequestWatchExecutor requestWatchExecutor;

    @Resource
    private GRpcParamsConfig gRpcParamsConfig;

    @Resource(name = "oqsSyncThreadPool")
    private ExecutorService executorService;
    public static volatile boolean isShutdown = false;

    @PostConstruct
    public void start() {
        this.client.start();
        if (!this.client.opened()) {
            throw new MetaSyncClientException("client stub create failed.", true);
        }
        isShutdown = false;
        this.executorService.submit(this::observerStreamMonitor);
        this.logger.info("entityClassSyncClient start.");
    }

    public void stop() {
        isShutdown = true;
        this.requestWatchExecutor.stop();
        if (this.client.opened()) {
            this.client.stop();
        }
        this.logger.info("entityClassSyncClient stop.");
    }

    public static boolean isShutDown() {
        return isShutdown;
    }

    private boolean observerStreamMonitor() {
        while (!isShutDown()) {
            CountDownLatch countDownLatch = new CountDownLatch(1);
            String uuid = UUID.randomUUID().toString();
            try {
                this.requestWatchExecutor.create(uuid, responseEvent(countDownLatch));
                if (this.requestHandler.reRegister()) {
                    this.requestWatchExecutor.watcher().onServe();
                    Uninterruptibles.awaitUninterruptibly(countDownLatch);
                }
                this.requestWatchExecutor.watcher().notServer();
                if (isShutdown) {
                    this.logger.warn("stream has broken due to client has been shutdown...");
                } else {
                    this.logger.warn("stream [{}] has broken, reCreate new stream after ({})ms...", uuid, Long.valueOf(this.gRpcParamsConfig.getReconnectDuration()));
                    TimeWaitUtils.wakeupAfter(this.gRpcParamsConfig.getReconnectDuration(), TimeUnit.MILLISECONDS);
                    this.requestWatchExecutor.watcher().release();
                }
            } catch (Exception e) {
                this.logger.warn("observer init error, message : {}, retry after ({})ms", Long.valueOf(this.gRpcParamsConfig.getReconnectDuration()), e.getMessage());
                TimeWaitUtils.wakeupAfter(this.gRpcParamsConfig.getReconnectDuration(), TimeUnit.MILLISECONDS);
            }
        }
        return true;
    }

    private StreamObserver<EntityClassSyncRequest> responseEvent(final CountDownLatch countDownLatch) {
        return this.client.channelStub().register(new StreamObserver<EntityClassSyncResponse>() { // from class: com.xforceplus.ultraman.oqsengine.meta.EntityClassSyncClient.1
            public void onNext(EntityClassSyncResponse entityClassSyncResponse) {
                if (EntityClassSyncClient.this.requestWatchExecutor.canAccess(entityClassSyncResponse.getUid())) {
                    EntityClassSyncClient.this.requestWatchExecutor.resetHeartBeat(entityClassSyncResponse.getUid());
                    if (entityClassSyncResponse.getStatus() == RequestStatus.REGISTER_OK.ordinal()) {
                        EntityClassSyncClient.this.requestWatchExecutor.update(new WatchElement(entityClassSyncResponse.getAppId(), entityClassSyncResponse.getEnv(), entityClassSyncResponse.getVersion(), WatchElement.AppStatus.Confirmed));
                    } else if (entityClassSyncResponse.getStatus() == RequestStatus.SYNC.ordinal()) {
                        EntityClassSyncClient.this.executorService.submit(() -> {
                            try {
                                EntityClassSyncClient.this.requestHandler.accept(entityClassSyncResponse);
                            } catch (Exception e) {
                                EntityClassSyncClient.this.logger.warn(e.getMessage());
                                if (EntityClassSyncClient.this.requestWatchExecutor.watcher().isOnServe()) {
                                    EntityClassSyncClient.this.requestWatchExecutor.watcher().observer().onError(e);
                                }
                            }
                        });
                    }
                }
            }

            public void onError(Throwable th) {
                EntityClassSyncClient.this.logger.error("stream observer on error, message-[{}].", th.getMessage());
                countDownLatch.countDown();
            }

            public void onCompleted() {
                EntityClassSyncClient.this.logger.info("stream observer completed.");
                countDownLatch.countDown();
            }
        });
    }
}
