package com.xforceplus.ultraman.oqsengine.meta.executor;

import com.xforceplus.ultraman.oqsengine.meta.EntityClassSyncClient;
import com.xforceplus.ultraman.oqsengine.meta.common.config.GRpcParamsConfig;
import com.xforceplus.ultraman.oqsengine.meta.common.constant.RequestStatus;
import com.xforceplus.ultraman.oqsengine.meta.common.dto.WatchElement;
import com.xforceplus.ultraman.oqsengine.meta.common.proto.EntityClassSyncRequest;
import com.xforceplus.ultraman.oqsengine.meta.common.utils.ThreadUtils;
import com.xforceplus.ultraman.oqsengine.meta.common.utils.TimeWaitUtils;
import com.xforceplus.ultraman.oqsengine.meta.dto.RequestWatcher;
import com.xforceplus.ultraman.oqsengine.meta.utils.SendUtils;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/xforceplus/ultraman/oqsengine/meta/executor/RequestWatchExecutor.class */
public class RequestWatchExecutor implements IRequestWatchExecutor {
    private RequestWatcher requestWatcher;

    @Resource
    private GRpcParamsConfig gRpcParamsConfig;
    final Logger logger = LoggerFactory.getLogger(RequestWatchExecutor.class);
    private Queue<WatchElement> forgotQueue = new ConcurrentLinkedDeque();
    private List<Thread> executors = new ArrayList(3);

    public void resetHeartBeat(String str) {
        this.requestWatcher.resetHeartBeat();
    }

    @Override // com.xforceplus.ultraman.oqsengine.meta.executor.IRequestWatchExecutor
    public void create(String str, StreamObserver<EntityClassSyncRequest> streamObserver) {
        if (null != this.requestWatcher) {
            this.requestWatcher.reset(str, streamObserver);
        } else {
            this.requestWatcher = new RequestWatcher(str, streamObserver);
            start();
        }
    }

    @Override // com.xforceplus.ultraman.oqsengine.meta.executor.IRequestWatchExecutor
    public void add(WatchElement watchElement) {
        if (null != this.requestWatcher) {
            this.requestWatcher.addWatch(watchElement);
        }
    }

    @Override // com.xforceplus.ultraman.oqsengine.meta.executor.IRequestWatchExecutor
    public synchronized boolean update(WatchElement watchElement) {
        if (!this.requestWatcher.onWatch(watchElement)) {
            return false;
        }
        this.requestWatcher.watches().put(watchElement.getAppId(), watchElement);
        return true;
    }

    @Override // com.xforceplus.ultraman.oqsengine.meta.executor.IRequestWatchExecutor
    public RequestWatcher watcher() {
        return this.requestWatcher;
    }

    @Override // com.xforceplus.ultraman.oqsengine.meta.executor.IRequestWatchExecutor
    public boolean canAccess(String str) {
        if (null == str || null == this.requestWatcher || !this.requestWatcher.isOnServe()) {
            return false;
        }
        try {
            return str.equals(this.requestWatcher.uid());
        } catch (Exception e) {
            return false;
        }
    }

    @Override // com.xforceplus.ultraman.oqsengine.meta.executor.IRequestWatchExecutor
    public void addForgot(WatchElement watchElement) {
        this.forgotQueue.add(watchElement);
    }

    @Override // com.xforceplus.ultraman.oqsengine.meta.executor.IRequestWatchExecutor
    public Queue<WatchElement> forgot() {
        return this.forgotQueue;
    }

    public void start() {
        this.executors.add(ThreadUtils.create(this::keepAliveTask));
        this.executors.add(ThreadUtils.create(this::streamConnectCheckTask));
        this.executors.add(ThreadUtils.create(this::appCheckTask));
        this.executors.forEach((v0) -> {
            v0.start();
        });
    }

    public void stop() {
        if (null != this.requestWatcher) {
            this.requestWatcher.notServer();
            TimeWaitUtils.wakeupAfter(3L, TimeUnit.SECONDS);
            this.requestWatcher.release();
            this.executors.forEach(thread -> {
                ThreadUtils.shutdown(thread, 3L);
            });
        }
    }

    @Override // com.xforceplus.ultraman.oqsengine.meta.executor.IRequestWatchExecutor
    public Function<String, Boolean> accessFunction() {
        return this::canAccess;
    }

    private boolean keepAliveTask() {
        this.logger.info("start keepAlive task ok...");
        while (!EntityClassSyncClient.isShutDown()) {
            if (this.requestWatcher.isOnServe()) {
                try {
                    SendUtils.sendRequest(this.requestWatcher, EntityClassSyncRequest.newBuilder().setUid(this.requestWatcher.uid()).setStatus(RequestStatus.HEARTBEAT.ordinal()).build());
                } catch (Exception e) {
                    this.logger.warn("send keepAlive failed, message [{}], but exception will ignore due to retry...", e.getMessage());
                }
            }
            this.logger.debug("keepAlive ok, next check after duration ({})ms...", Long.valueOf(this.gRpcParamsConfig.getKeepAliveSendDuration()));
            TimeWaitUtils.wakeupAfter(this.gRpcParamsConfig.getKeepAliveSendDuration(), TimeUnit.MILLISECONDS);
        }
        this.logger.info("keepAlive task has quited due to sync-client shutdown...");
        return true;
    }

    private boolean streamConnectCheckTask() {
        this.logger.info("start stream-connect-check task ok...");
        while (!EntityClassSyncClient.isShutDown()) {
            if (this.requestWatcher.isOnServe() && System.currentTimeMillis() - this.requestWatcher.heartBeat() > this.gRpcParamsConfig.getDefaultHeartbeatTimeout()) {
                try {
                    this.requestWatcher.observer().onCompleted();
                    this.logger.warn("last heartbeat time [{}] reaches max timeout [{}]", Long.valueOf(System.currentTimeMillis() - this.requestWatcher.heartBeat()), Long.valueOf(this.gRpcParamsConfig.getDefaultHeartbeatTimeout()));
                } catch (Exception e) {
                }
            }
            this.logger.debug("streamConnect check ok, next check after duration ({})ms...", Long.valueOf(this.gRpcParamsConfig.getMonitorSleepDuration()));
            TimeWaitUtils.wakeupAfter(this.gRpcParamsConfig.getMonitorSleepDuration(), TimeUnit.MILLISECONDS);
        }
        this.logger.info("streamConnect check task has quited due to sync-client shutdown...");
        return true;
    }

    private boolean appCheckTask() {
        this.logger.info("start appCheck task ok...");
        while (!EntityClassSyncClient.isShutDown()) {
            if (this.requestWatcher.isOnServe()) {
                for (int size = this.forgotQueue.size(); size > 0; size--) {
                    try {
                        WatchElement remove = this.forgotQueue.remove();
                        if (!this.requestWatcher.watches().containsKey(remove.getAppId())) {
                            this.requestWatcher.watches().put(remove.getAppId(), remove);
                        }
                    } catch (Exception e) {
                    }
                }
                this.requestWatcher.watches().values().stream().filter(watchElement -> {
                    return watchElement.getStatus().ordinal() == WatchElement.AppStatus.Init.ordinal() || (watchElement.getStatus().ordinal() < WatchElement.AppStatus.Confirmed.ordinal() && System.currentTimeMillis() - watchElement.getRegisterTime() > this.gRpcParamsConfig.getDefaultDelayTaskDuration());
                }).forEach(watchElement2 -> {
                    try {
                        SendUtils.sendRequest(this.requestWatcher, EntityClassSyncRequest.newBuilder().setUid(this.requestWatcher.uid()).setAppId(watchElement2.getAppId()).setVersion(watchElement2.getVersion()).setStatus(RequestStatus.REGISTER.ordinal()).build(), accessFunction(), this.requestWatcher.uid());
                        watchElement2.setRegisterTime(System.currentTimeMillis());
                    } catch (Exception e2) {
                        watchElement2.setStatus(WatchElement.AppStatus.Init);
                    }
                });
                this.logger.debug("app check ok, next check after duration ({})ms...", Long.valueOf(this.gRpcParamsConfig.getMonitorSleepDuration()));
            }
            TimeWaitUtils.wakeupAfter(this.gRpcParamsConfig.getMonitorSleepDuration(), TimeUnit.MILLISECONDS);
        }
        this.logger.info("appCheck task has quited due to sync-client shutdown...");
        return true;
    }
}
