/*
 * Decompiled with CFR 0.152.
 */
package com.xforceplus.ultraman.oqsengine.meta.executor;

import com.xforceplus.ultraman.oqsengine.meta.common.dto.WatchElement;
import com.xforceplus.ultraman.oqsengine.meta.common.proto.sync.EntityClassSyncResponse;
import com.xforceplus.ultraman.oqsengine.meta.dto.ResponseWatcher;
import com.xforceplus.ultraman.oqsengine.meta.dto.ServerConnectorInfo;
import com.xforceplus.ultraman.oqsengine.meta.executor.IResponseWatchExecutor;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ResponseWatchExecutor
implements IResponseWatchExecutor {
    private final Logger logger = LoggerFactory.getLogger(ResponseWatchExecutor.class);
    private static final Map<String, Integer> appVersions = new HashMap<String, Integer>();
    private static final Map<String, Set<String>> appWatchers = new HashMap<String, Set<String>>();
    private static final Map<String, ResponseWatcher> uidWatchers = new ConcurrentHashMap<String, ResponseWatcher>();

    public void start() {
        this.logger.debug("responseWatchExecutor start.");
    }

    public void stop() {
        uidWatchers.forEach((k, v) -> {
            if (v.isActive()) {
                v.release();
            }
        });
        uidWatchers.clear();
        appWatchers.clear();
        appVersions.clear();
        this.logger.debug("responseWatchExecutor stop.");
    }

    @Override
    public void keepAliveCheck(long heartbeatTimeout) {
        long current = System.currentTimeMillis();
        uidWatchers.forEach((k, v) -> {
            if (current - v.heartBeat() >= heartbeatTimeout) {
                this.release((String)k);
                this.logger.warn("release broken stream, [client-uid : {}-{}]", (Object)v.clientId(), k);
            }
        });
    }

    @Override
    public Integer version(String appId, String env) {
        return appVersions.get(ResponseWatchExecutor.keyAppWithEnv(appId, env));
    }

    @Override
    public boolean addVersion(String appId, String env, int version) {
        return this.addVersionWithLock(ResponseWatchExecutor.keyAppWithEnv(appId, env), version);
    }

    @Override
    public void add(String clientId, String uid, StreamObserver<EntityClassSyncResponse> observer, WatchElement watchElement, boolean force) {
        uidWatchers.computeIfAbsent(uid, v -> new ResponseWatcher(clientId, uid, observer)).addWatch(watchElement, force);
        this.operationWithLock(ResponseWatchExecutor.keyAppWithEnv(watchElement.getAppId(), watchElement.getEnv()), uid, Operation.NEW);
    }

    public void resetHeartBeat(String uid) {
        ResponseWatcher w = uidWatchers.get(uid);
        if (null != w) {
            w.resetHeartBeat();
        }
    }

    @Override
    public synchronized boolean update(String uid, WatchElement watchElement) {
        ResponseWatcher watcher = uidWatchers.get(uid);
        if (null != watcher && this.canUpdate(watcher, watchElement)) {
            watcher.watches().put(watchElement.getAppId(), watchElement);
            return true;
        }
        return false;
    }

    public void release(String uid) {
        ResponseWatcher watcher = uidWatchers.remove(uid);
        if (null != watcher && watcher.isActive()) {
            watcher.inActive();
            watcher.release(() -> {
                watcher.watches().forEach((k, v) -> this.operationWithLock(ResponseWatchExecutor.keyAppWithEnv(k, v.getEnv()), uid, Operation.RELEASE));
                return true;
            });
        }
    }

    @Override
    public List<ResponseWatcher> need(WatchElement watchElement) {
        Set<String> res = appWatchers.get(ResponseWatchExecutor.keyAppWithEnv(watchElement.getAppId(), watchElement.getEnv()));
        ArrayList<ResponseWatcher> needList = new ArrayList<ResponseWatcher>();
        if (null != res) {
            res.forEach(r -> {
                ResponseWatcher watcher = uidWatchers.get(r);
                if (null != watcher && watcher.onWatch(watchElement)) {
                    needList.add(watcher);
                }
            });
        }
        return needList;
    }

    @Override
    public ResponseWatcher watcher(String uid) {
        return uidWatchers.get(uid);
    }

    @Override
    public Optional<ServerConnectorInfo> connectorInfo() {
        ArrayList<ServerConnectorInfo.ClientWatches> clientWatches = new ArrayList<ServerConnectorInfo.ClientWatches>();
        uidWatchers.forEach((k, v) -> {
            ArrayList<WatchElement> watchElements = new ArrayList<WatchElement>();
            v.watches().values().forEach(w -> watchElements.add(new WatchElement(w.getAppId(), w.getEnv(), w.getVersion(), w.getStatus())));
            clientWatches.add(new ServerConnectorInfo.ClientWatches(v.clientId(), watchElements, v.heartBeat()));
        });
        return Optional.of(new ServerConnectorInfo(clientWatches));
    }

    @Override
    public Set<String> appWatchers(String appId, String env) {
        return appWatchers.get(ResponseWatchExecutor.keyAppWithEnv(appId, env));
    }

    private boolean canUpdate(ResponseWatcher watcher, WatchElement watchElement) {
        WatchElement current = (WatchElement)watcher.watches().get(watchElement.getAppId());
        return null != current && (current.getVersion() < watchElement.getVersion() || current.getStatus().ordinal() < watchElement.getStatus().ordinal());
    }

    private synchronized boolean addVersionWithLock(String key, int version) {
        Integer v = appVersions.get(key);
        if (null == v || v < version) {
            appVersions.put(key, version);
            return true;
        }
        return false;
    }

    private synchronized void operationWithLock(String key, String value, Operation operation) {
        this.logger.debug("appWatcher [{}], key [{}], value [{}]", new Object[]{operation, key, value});
        switch (operation) {
            case NEW: {
                appWatchers.computeIfAbsent(key, k -> new HashSet()).add(value);
                break;
            }
            case RELEASE: {
                Set<String> v = appWatchers.get(key);
                if (null == v) break;
                if (!v.isEmpty()) {
                    v.remove(value);
                }
                if (!v.isEmpty()) break;
                appWatchers.remove(key);
                break;
            }
        }
    }

    public static String keyAppWithEnv(String appId, String env) {
        return appId + "_" + env;
    }

    public static enum Operation {
        NEW,
        RELEASE;

    }
}

