/*
 * Decompiled with CFR 0.152.
 */
package com.xforceplus.ultraman.oqsengine.meta;

import com.google.common.util.concurrent.Uninterruptibles;
import com.xforceplus.ultraman.oqsengine.meta.common.config.GRpcParams;
import com.xforceplus.ultraman.oqsengine.meta.common.exception.MetaSyncClientException;
import com.xforceplus.ultraman.oqsengine.meta.common.executor.IBasicSyncExecutor;
import com.xforceplus.ultraman.oqsengine.meta.common.monitor.dto.SyncCode;
import com.xforceplus.ultraman.oqsengine.meta.common.proto.sync.EntityClassSyncRequest;
import com.xforceplus.ultraman.oqsengine.meta.common.proto.sync.EntityClassSyncResponse;
import com.xforceplus.ultraman.oqsengine.meta.common.utils.ThreadUtils;
import com.xforceplus.ultraman.oqsengine.meta.common.utils.TimeWaitUtils;
import com.xforceplus.ultraman.oqsengine.meta.connect.GRpcClient;
import com.xforceplus.ultraman.oqsengine.meta.handler.IRequestHandler;
import com.xforceplus.ultraman.oqsengine.meta.utils.ClientIdUtils;
import io.grpc.stub.StreamObserver;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EntityClassSyncClient
implements IBasicSyncExecutor {
    private final Logger logger = LoggerFactory.getLogger(EntityClassSyncClient.class);
    @Resource(name="grpcClient")
    private GRpcClient client;
    @Resource(name="requestHandler")
    private IRequestHandler requestHandler;
    @Resource
    private GRpcParams grpcParamsConfig;
    private Thread observerStreamMonitorThread;
    private static String CLIENT_ID = ClientIdUtils.generate();
    private static String MONITOR_APP_ID = "MONITOR_APP" + CLIENT_ID;

    @PostConstruct
    public void start() {
        this.client.start();
        if (!this.client.opened()) {
            throw new MetaSyncClientException("client stub create failed.", true);
        }
        this.observerStreamMonitorThread = ThreadUtils.create(this::monitor);
        this.observerStreamMonitorThread.start();
        this.requestHandler.start();
        TimeWaitUtils.wakeupAfter((long)1L, (TimeUnit)TimeUnit.SECONDS);
        this.logger.info("entityClassSyncClient start.");
    }

    public void stop() {
        this.requestHandler.stop();
        if (this.client.opened()) {
            this.client.stop();
        }
        this.logger.info("entityClassSyncClient stop.");
    }

    private boolean monitor() {
        while (!this.requestHandler.isShutDown()) {
            CountDownLatch countDownLatch = new CountDownLatch(1);
            StreamObserver<EntityClassSyncRequest> streamObserver = null;
            try {
                streamObserver = this.responseEvent(countDownLatch);
            }
            catch (Exception e) {
                this.requestHandler.metricsRecorder().error(MONITOR_APP_ID, SyncCode.INIT_OBSERVER_ERROR.name(), String.format("observer init error, message : %s", e.getMessage()));
                TimeWaitUtils.wakeupAfter((long)this.grpcParamsConfig.getReconnectDuration(), (TimeUnit)TimeUnit.MILLISECONDS);
                continue;
            }
            this.requestHandler.initWatcher(CLIENT_ID, UUID.randomUUID().toString(), streamObserver);
            if (this.requestHandler.reRegister()) {
                this.requestHandler.ready();
                Uninterruptibles.awaitUninterruptibly((CountDownLatch)countDownLatch);
            }
            this.requestHandler.notReady();
        }
        return true;
    }

    private StreamObserver<EntityClassSyncRequest> responseEvent(final CountDownLatch countDownLatch) {
        return this.client.channelStub().register((StreamObserver)new StreamObserver<EntityClassSyncResponse>(){

            public void onNext(EntityClassSyncResponse entityClassSyncResponse) {
                if (EntityClassSyncClient.this.requestHandler.watchExecutor().isAlive(entityClassSyncResponse.getUid())) {
                    EntityClassSyncClient.this.requestHandler.invoke(entityClassSyncResponse, null);
                }
            }

            public void onError(Throwable throwable) {
                EntityClassSyncClient.this.logger.error("stream observer on error, message-[{}].", (Object)throwable.getMessage());
                countDownLatch.countDown();
            }

            public void onCompleted() {
                EntityClassSyncClient.this.logger.info("request stream observer completed.");
                countDownLatch.countDown();
            }
        });
    }
}

