package com.xforceplus.ultraman.oqsengine.meta.executor;

import com.xforceplus.ultraman.oqsengine.meta.common.dto.WatchElement;
import com.xforceplus.ultraman.oqsengine.meta.common.proto.sync.EntityClassSyncResponse;
import com.xforceplus.ultraman.oqsengine.meta.dto.ResponseWatcher;
import com.xforceplus.ultraman.oqsengine.meta.dto.ServerConnectorInfo;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/xplat-meta-oqsengine-meta-server-2.0.0-SNAPSHOT.jar:com/xforceplus/ultraman/oqsengine/meta/executor/ResponseWatchExecutor.class */
public class ResponseWatchExecutor implements IResponseWatchExecutor {
    private final Logger logger = LoggerFactory.getLogger((Class<?>) ResponseWatchExecutor.class);
    private static final Map<String, Integer> appVersions = new HashMap();
    private static final Map<String, Set<String>> appWatchers = new HashMap();
    private static final Map<String, ResponseWatcher> uidWatchers = new ConcurrentHashMap();

    /* loaded from: input_file:BOOT-INF/lib/xplat-meta-oqsengine-meta-server-2.0.0-SNAPSHOT.jar:com/xforceplus/ultraman/oqsengine/meta/executor/ResponseWatchExecutor$Operation.class */
    public enum Operation {
        NEW,
        RELEASE
    }

    @Override // com.xforceplus.ultraman.oqsengine.meta.common.executor.IBasicSyncExecutor
    public void start() {
        this.logger.debug("responseWatchExecutor start.");
    }

    @Override // com.xforceplus.ultraman.oqsengine.meta.common.executor.IBasicSyncExecutor
    public void stop() {
        uidWatchers.forEach((str, responseWatcher) -> {
            if (responseWatcher.isActive()) {
                responseWatcher.release();
            }
        });
        uidWatchers.clear();
        appWatchers.clear();
        appVersions.clear();
        this.logger.debug("responseWatchExecutor stop.");
    }

    @Override // com.xforceplus.ultraman.oqsengine.meta.executor.IResponseWatchExecutor
    public void keepAliveCheck(long j) {
        long currentTimeMillis = System.currentTimeMillis();
        uidWatchers.forEach((str, responseWatcher) -> {
            if (currentTimeMillis - responseWatcher.heartBeat() >= j) {
                release(str);
                this.logger.warn("release broken stream, [client-uid : {}-{}]", responseWatcher.clientId(), str);
            }
        });
    }

    @Override // com.xforceplus.ultraman.oqsengine.meta.executor.IResponseWatchExecutor
    public Integer version(String str, String str2) {
        return appVersions.get(keyAppWithEnv(str, str2));
    }

    @Override // com.xforceplus.ultraman.oqsengine.meta.executor.IResponseWatchExecutor
    public boolean addVersion(String str, String str2, int i) {
        return addVersionWithLock(keyAppWithEnv(str, str2), i);
    }

    @Override // com.xforceplus.ultraman.oqsengine.meta.executor.IResponseWatchExecutor
    public void add(String str, String str2, StreamObserver<EntityClassSyncResponse> streamObserver, WatchElement watchElement, boolean z) {
        uidWatchers.computeIfAbsent(str2, str3 -> {
            return new ResponseWatcher(str, str2, streamObserver);
        }).addWatch(watchElement, z);
        operationWithLock(keyAppWithEnv(watchElement.getAppId(), watchElement.getEnv()), str2, Operation.NEW);
    }

    @Override // com.xforceplus.ultraman.oqsengine.meta.common.executor.IWatchExecutor
    public void resetHeartBeat(String str) {
        ResponseWatcher responseWatcher = uidWatchers.get(str);
        if (null != responseWatcher) {
            responseWatcher.resetHeartBeat();
        }
    }

    @Override // com.xforceplus.ultraman.oqsengine.meta.executor.IResponseWatchExecutor
    public synchronized boolean update(String str, WatchElement watchElement) {
        ResponseWatcher responseWatcher = uidWatchers.get(str);
        if (null == responseWatcher || !canUpdate(responseWatcher, watchElement)) {
            return false;
        }
        responseWatcher.watches().put(watchElement.getAppId(), watchElement);
        return true;
    }

    @Override // com.xforceplus.ultraman.oqsengine.meta.common.executor.IWatchExecutor
    public void release(String str) {
        ResponseWatcher remove = uidWatchers.remove(str);
        if (null == remove || !remove.isActive()) {
            return;
        }
        remove.inActive();
        remove.release(() -> {
            remove.watches().forEach((str2, watchElement) -> {
                operationWithLock(keyAppWithEnv(str2, watchElement.getEnv()), str, Operation.RELEASE);
            });
            return true;
        });
    }

    @Override // com.xforceplus.ultraman.oqsengine.meta.executor.IResponseWatchExecutor
    public List<ResponseWatcher> need(WatchElement watchElement) {
        Set<String> set = appWatchers.get(keyAppWithEnv(watchElement.getAppId(), watchElement.getEnv()));
        ArrayList arrayList = new ArrayList();
        if (null != set) {
            set.forEach(str -> {
                ResponseWatcher responseWatcher = uidWatchers.get(str);
                if (null == responseWatcher || !responseWatcher.onWatch(watchElement)) {
                    return;
                }
                arrayList.add(responseWatcher);
            });
        }
        return arrayList;
    }

    @Override // com.xforceplus.ultraman.oqsengine.meta.executor.IResponseWatchExecutor
    public ResponseWatcher watcher(String str) {
        return uidWatchers.get(str);
    }

    @Override // com.xforceplus.ultraman.oqsengine.meta.executor.IResponseWatchExecutor
    public Optional<ServerConnectorInfo> connectorInfo() {
        ArrayList arrayList = new ArrayList();
        uidWatchers.forEach((str, responseWatcher) -> {
            ArrayList arrayList2 = new ArrayList();
            responseWatcher.watches().values().forEach(watchElement -> {
                arrayList2.add(new WatchElement(watchElement.getAppId(), watchElement.getEnv(), watchElement.getVersion(), watchElement.getStatus()));
            });
            arrayList.add(new ServerConnectorInfo.ClientWatches(responseWatcher.clientId(), arrayList2, responseWatcher.heartBeat()));
        });
        return Optional.of(new ServerConnectorInfo(arrayList));
    }

    @Override // com.xforceplus.ultraman.oqsengine.meta.executor.IResponseWatchExecutor
    public Set<String> appWatchers(String str, String str2) {
        return appWatchers.get(keyAppWithEnv(str, str2));
    }

    private boolean canUpdate(ResponseWatcher responseWatcher, WatchElement watchElement) {
        WatchElement watchElement2 = responseWatcher.watches().get(watchElement.getAppId());
        return null != watchElement2 && (watchElement2.getVersion() < watchElement.getVersion() || watchElement2.getStatus().ordinal() < watchElement.getStatus().ordinal());
    }

    private synchronized boolean addVersionWithLock(String str, int i) {
        Integer num = appVersions.get(str);
        if (null != num && num.intValue() >= i) {
            return false;
        }
        appVersions.put(str, Integer.valueOf(i));
        return true;
    }

    private synchronized void operationWithLock(String str, String str2, Operation operation) {
        this.logger.debug("appWatcher [{}], key [{}], value [{}]", operation, str, str2);
        switch (operation) {
            case NEW:
                appWatchers.computeIfAbsent(str, str3 -> {
                    return new HashSet();
                }).add(str2);
                return;
            case RELEASE:
                Set<String> set = appWatchers.get(str);
                if (null != set) {
                    if (!set.isEmpty()) {
                        set.remove(str2);
                    }
                    if (set.isEmpty()) {
                        appWatchers.remove(str);
                        return;
                    }
                    return;
                }
                return;
            default:
                return;
        }
    }

    public static String keyAppWithEnv(String str, String str2) {
        return str + "_" + str2;
    }
}
