package com.xforceplus.ultraman.oqsengine.meta;

import com.google.common.util.concurrent.Uninterruptibles;
import com.xforceplus.ultraman.oqsengine.meta.common.config.GRpcParamsConfig;
import com.xforceplus.ultraman.oqsengine.meta.common.constant.RequestStatus;
import com.xforceplus.ultraman.oqsengine.meta.common.dto.WatchElement;
import com.xforceplus.ultraman.oqsengine.meta.common.exception.MetaSyncClientException;
import com.xforceplus.ultraman.oqsengine.meta.common.proto.EntityClassSyncRequest;
import com.xforceplus.ultraman.oqsengine.meta.common.proto.EntityClassSyncResponse;
import com.xforceplus.ultraman.oqsengine.meta.common.utils.TimeWaitUtils;
import com.xforceplus.ultraman.oqsengine.meta.connect.GRpcClient;
import com.xforceplus.ultraman.oqsengine.meta.dto.RequestWatcher;
import com.xforceplus.ultraman.oqsengine.meta.executor.EntityClassExecutor;
import com.xforceplus.ultraman.oqsengine.meta.executor.RequestWatchExecutor;
import com.xforceplus.ultraman.oqsengine.meta.utils.SendUtils;
import io.grpc.stub.StreamObserver;
import java.util.AbstractMap;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/xforceplus/ultraman/oqsengine/meta/OqsEntityClassSyncClient.class */
public class OqsEntityClassSyncClient implements EntityClassSyncClient {

    @Resource(name = "gRpcClient")
    private GRpcClient client;

    @Resource(name = "entityClassExecutor")
    private EntityClassExecutor entityClassExecutor;

    @Resource
    private RequestWatchExecutor requestWatchExecutor;

    @Resource
    private GRpcParamsConfig gRpcParamsConfig;
    private Logger logger = LoggerFactory.getLogger(OqsEntityClassSyncClient.class);
    private volatile boolean isReRegister = false;

    @PostConstruct
    public void start() {
        this.client.create();
        if (!this.client.opened()) {
            throw new MetaSyncClientException("client stub create failed.", true);
        }
        observerStream();
    }

    public void destroy() throws InterruptedException {
        this.requestWatchExecutor.stop();
        if (this.client.opened()) {
            this.client.destroy();
        }
    }

    private void observerStream() {
        new Thread(() -> {
            while (true) {
                CountDownLatch countDownLatch = new CountDownLatch(1);
                String uuid = UUID.randomUUID().toString();
                try {
                    this.requestWatchExecutor.create(uuid, newObserver(uuid, countDownLatch));
                    if (reRegister(this.requestWatchExecutor.watcher())) {
                        this.requestWatchExecutor.watcher().canServer();
                        Uninterruptibles.awaitUninterruptibly(countDownLatch);
                    }
                    this.requestWatchExecutor.watcher().canNotServer();
                    this.logger.warn("stream has broken, will re-create new one after ({})ms...", Long.valueOf(this.gRpcParamsConfig.getReconnectDuration()));
                    TimeWaitUtils.wakeupAfter(this.gRpcParamsConfig.getReconnectDuration(), TimeUnit.MILLISECONDS);
                    this.requestWatchExecutor.release();
                } catch (Exception e) {
                    this.logger.warn("observer init error, message : {}, retry after ({})ms", Long.valueOf(this.gRpcParamsConfig.getReconnectDuration()), e.getMessage());
                    TimeWaitUtils.wakeupAfter(this.gRpcParamsConfig.getReconnectDuration(), TimeUnit.MILLISECONDS);
                }
            }
        }).start();
    }

    @Override // com.xforceplus.ultraman.oqsengine.meta.EntityClassSyncClient
    public boolean register(String str, int i) {
        return register(Collections.singletonList(new AbstractMap.SimpleEntry(str, Integer.valueOf(i))));
    }

    @Override // com.xforceplus.ultraman.oqsengine.meta.EntityClassSyncClient
    public synchronized boolean register(List<AbstractMap.SimpleEntry<String, Integer>> list) {
        RequestWatcher watcher = this.requestWatchExecutor.watcher();
        if (null == watcher) {
            this.logger.warn("current gRpc-client is not init, can't offer appIds:{}.", list.stream().map((v0) -> {
                return v0.getKey();
            }).collect(Collectors.toList()));
            return false;
        }
        list.stream().filter(simpleEntry -> {
            if (watcher.watches().containsKey(simpleEntry.getKey())) {
                this.logger.info("appId : {} is already in watchList, will ignore...", simpleEntry.getKey());
                return false;
            }
            this.logger.info("add appId : {} in watchList", simpleEntry.getKey());
            return true;
        }).forEach(simpleEntry2 -> {
            if (this.isReRegister || watcher.isReleased()) {
                watcher.addWatch(new WatchElement((String) simpleEntry2.getKey(), ((Integer) simpleEntry2.getValue()).intValue(), WatchElement.AppStatus.Init));
                return;
            }
            EntityClassSyncRequest build = EntityClassSyncRequest.newBuilder().setUid(watcher.uid()).setAppId((String) simpleEntry2.getKey()).setVersion(((Integer) simpleEntry2.getValue()).intValue()).setStatus(RequestStatus.REGISTER.ordinal()).build();
            WatchElement.AppStatus appStatus = WatchElement.AppStatus.Register;
            try {
                SendUtils.sendRequest(this.requestWatchExecutor.watcher(), build, this.requestWatchExecutor.canAccessFunction(), build.getUid());
            } catch (Exception e) {
                appStatus = WatchElement.AppStatus.Init;
            }
            watcher.addWatch(new WatchElement((String) simpleEntry2.getKey(), ((Integer) simpleEntry2.getValue()).intValue(), appStatus));
        });
        this.logger.info("current watchList status : {}", watcher.watches().toString());
        return true;
    }

    private StreamObserver<EntityClassSyncRequest> newObserver(final String str, final CountDownLatch countDownLatch) {
        return this.client.channelStub().register(new StreamObserver<EntityClassSyncResponse>() { // from class: com.xforceplus.ultraman.oqsengine.meta.OqsEntityClassSyncClient.1
            public void onNext(EntityClassSyncResponse entityClassSyncResponse) {
                if (OqsEntityClassSyncClient.this.requestWatchExecutor.canAccess(entityClassSyncResponse.getUid())) {
                    OqsEntityClassSyncClient.this.requestWatchExecutor.resetHeartBeat();
                    if (entityClassSyncResponse.getStatus() == RequestStatus.CONFIRM_REGISTER.ordinal()) {
                        OqsEntityClassSyncClient.this.requestWatchExecutor.update(new WatchElement(entityClassSyncResponse.getAppId(), entityClassSyncResponse.getVersion(), WatchElement.AppStatus.Confirmed));
                        return;
                    }
                    try {
                        SendUtils.sendRequest(OqsEntityClassSyncClient.this.requestWatchExecutor.watcher(), OqsEntityClassSyncClient.this.entityClassExecutor.execute(entityClassSyncResponse).setUid(str).build(), OqsEntityClassSyncClient.this.requestWatchExecutor.canAccessFunction(), entityClassSyncResponse.getUid());
                    } catch (Exception e) {
                        OqsEntityClassSyncClient.this.logger.error("stream observer ack error, message-[{}].", e.getMessage());
                        onError(e);
                    }
                }
            }

            public void onError(Throwable th) {
                OqsEntityClassSyncClient.this.logger.error("stream observer on error, message-[{}].", th.getMessage());
                countDownLatch.countDown();
            }

            public void onCompleted() {
                countDownLatch.countDown();
            }
        });
    }

    private boolean reRegister(RequestWatcher requestWatcher) {
        try {
            this.isReRegister = true;
            if (requestWatcher.watches().size() > 0) {
                try {
                    requestWatcher.watches().forEach((str, watchElement) -> {
                        EntityClassSyncRequest build = EntityClassSyncRequest.newBuilder().setAppId(str).setVersion(this.entityClassExecutor.version(str)).setUid(requestWatcher.uid()).setStatus(RequestStatus.REGISTER.ordinal()).build();
                        try {
                            SendUtils.sendRequest(requestWatcher, build);
                        } catch (Exception e) {
                            throw new MetaSyncClientException(String.format("reRegister watchers-[%s] failed.", build.toString()), true);
                        }
                    });
                } catch (Exception e) {
                    this.logger.warn(e.getMessage());
                    requestWatcher.observer().onError(e);
                    this.isReRegister = false;
                    return false;
                }
            }
            return true;
        } finally {
            this.isReRegister = false;
        }
    }
}
