package com.xforceplus.ultraman.oqsengine.meta.handler;

import com.xforceplus.ultraman.oqsengine.meta.common.config.GRpcParams;
import com.xforceplus.ultraman.oqsengine.meta.common.constant.RequestStatus;
import com.xforceplus.ultraman.oqsengine.meta.common.dto.WatchElement;
import com.xforceplus.ultraman.oqsengine.meta.common.exception.MetaSyncClientException;
import com.xforceplus.ultraman.oqsengine.meta.common.monitor.MetricsRecorder;
import com.xforceplus.ultraman.oqsengine.meta.common.monitor.dto.SyncCode;
import com.xforceplus.ultraman.oqsengine.meta.common.proto.sync.EntityClassSyncRequest;
import com.xforceplus.ultraman.oqsengine.meta.common.proto.sync.EntityClassSyncResponse;
import com.xforceplus.ultraman.oqsengine.meta.common.proto.sync.EntityClassSyncRspProto;
import com.xforceplus.ultraman.oqsengine.meta.common.utils.MD5Utils;
import com.xforceplus.ultraman.oqsengine.meta.common.utils.ThreadUtils;
import com.xforceplus.ultraman.oqsengine.meta.common.utils.TimeWaitUtils;
import com.xforceplus.ultraman.oqsengine.meta.dto.RequestWatcher;
import com.xforceplus.ultraman.oqsengine.meta.executor.IRequestWatchExecutor;
import com.xforceplus.ultraman.oqsengine.meta.provider.outter.SyncExecutor;
import com.xforceplus.ultraman.oqsengine.meta.utils.SendUtils;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/xforceplus/ultraman/oqsengine/meta/handler/SyncRequestHandler.class */
public class SyncRequestHandler implements IRequestHandler {

    @Resource(name = "grpcSyncExecutor")
    private SyncExecutor syncExecutor;

    @Resource
    private IRequestWatchExecutor requestWatchExecutor;

    @Resource
    private GRpcParams grpcParams;

    @Resource(name = "grpcTaskExecutor")
    private ExecutorService executorService;

    @Resource
    private MetricsRecorder metricsRecorder;
    private static final int PRINT_CHECK_DURATION = 10;
    private Logger logger = LoggerFactory.getLogger(SyncRequestHandler.class);
    private Queue<WatchElement> forgotQueue = new ConcurrentLinkedDeque();
    private List<Thread> longRunTasks = new ArrayList(2);
    private volatile boolean isShutdown = false;

    public void start() {
        this.isShutdown = false;
        this.longRunTasks.add(ThreadUtils.create(this::keepAlive));
        this.longRunTasks.add(ThreadUtils.create(this::watchElementCheck));
        this.longRunTasks.forEach((v0) -> {
            v0.start();
        });
        this.logger.info("requestWatchExecutor start.");
    }

    public void stop() {
        this.isShutdown = true;
        this.requestWatchExecutor.stop();
    }

    @Override // com.xforceplus.ultraman.oqsengine.meta.handler.IRequestHandler
    public synchronized boolean register(WatchElement watchElement) {
        RequestWatcher watcher = this.requestWatchExecutor.watcher();
        if (null == watcher) {
            this.logger.warn("current gRpc-client is not init, can't offer appIds:{}.", watchElement);
            addToForgotQueue(watchElement);
            return false;
        }
        WatchElement watchElement2 = (WatchElement) watcher.watches().get(watchElement.getAppId());
        if (null == watchElement2) {
            if (!this.requestWatchExecutor.isAlive(watcher.uid()) || send(watcher.clientId(), watcher.uid(), false, true, RequestStatus.REGISTER, watchElement)) {
                return true;
            }
            this.metricsRecorder.error(watchElement.getAppId(), SyncCode.REGISTER_ERROR.name(), String.format("send register failed, env %s", watchElement.getEnv()));
            return false;
        }
        if (!watchElement2.getEnv().equals(watchElement.getEnv())) {
            this.metricsRecorder.error(watchElement2.getAppId(), SyncCode.REGISTER_ERROR.name(), String.format("can't register same appId [%s] with another env [%s], env [%s] already registered.", watchElement2.getAppId(), watchElement.getEnv(), watchElement2.getEnv()));
            return false;
        }
        if (watchElement.getVersion() == -1 || watchElement.getVersion() > watchElement2.getVersion()) {
            if (send(watcher.clientId(), watcher.uid(), true, true, RequestStatus.REGISTER, watchElement2)) {
                return true;
            }
            this.metricsRecorder.error(watchElement2.getAppId(), SyncCode.REGISTER_ERROR.name(), String.format("send register failed, env %s", watchElement2.getEnv()));
            return false;
        }
        if (!this.logger.isDebugEnabled()) {
            return true;
        }
        this.logger.debug("current watchList has this watchElement, appId [{}], ignore register...", watchElement.getAppId());
        return true;
    }

    @Override // com.xforceplus.ultraman.oqsengine.meta.handler.IRequestHandler
    public boolean reRegister() {
        RequestWatcher watcher = this.requestWatchExecutor.watcher();
        if (null == watcher || watcher.watches().size() <= 0) {
            return true;
        }
        for (Map.Entry entry : watcher.watches().entrySet()) {
            if (!send(watcher.clientId(), watcher.uid(), false, false, RequestStatus.REGISTER, (WatchElement) entry.getValue())) {
                String format = String.format("reRegister failed, env : %s", ((WatchElement) entry.getValue()).getEnv());
                this.metricsRecorder.error(((WatchElement) entry.getValue()).getAppId(), SyncCode.REGISTER_ERROR.name(), format);
                watcher.observer().onError(new Throwable(format));
                return false;
            }
            ((WatchElement) entry.getValue()).setRegisterTime(System.currentTimeMillis());
            this.logger.info("reRegister success uid [{}], appId [{}], env [{}], version [{}].", new Object[]{watcher.uid(), entry.getKey(), ((WatchElement) entry.getValue()).getEnv(), Integer.valueOf(((WatchElement) entry.getValue()).getVersion())});
        }
        return true;
    }

    @Override // com.xforceplus.ultraman.oqsengine.meta.handler.IRequestHandler
    public void initWatcher(String str, String str2, StreamObserver<EntityClassSyncRequest> streamObserver) {
        this.requestWatchExecutor.create(str, str2, streamObserver);
    }

    public void invoke(EntityClassSyncResponse entityClassSyncResponse, Void r11) {
        this.requestWatchExecutor.resetHeartBeat(entityClassSyncResponse.getUid());
        if (entityClassSyncResponse.getStatus() == RequestStatus.REGISTER_OK.ordinal()) {
            if (this.requestWatchExecutor.update(new WatchElement(entityClassSyncResponse.getAppId(), entityClassSyncResponse.getEnv(), entityClassSyncResponse.getVersion(), WatchElement.ElementStatus.Confirmed))) {
                this.metricsRecorder.info(entityClassSyncResponse.getAppId(), SyncCode.REGISTER_OK.name(), String.format("register success, uid : %s, env : %s, version : %s success.", entityClassSyncResponse.getUid(), entityClassSyncResponse.getEnv(), Integer.valueOf(entityClassSyncResponse.getVersion())));
            }
        } else if (entityClassSyncResponse.getStatus() == RequestStatus.SYNC.ordinal()) {
            this.executorService.submit(() -> {
                try {
                    accept(entityClassSyncResponse);
                } catch (Exception e) {
                    if (this.requestWatchExecutor.watcher().isActive()) {
                        this.requestWatchExecutor.watcher().observer().onError(e);
                    }
                }
            });
        }
    }

    @Override // com.xforceplus.ultraman.oqsengine.meta.handler.IRequestHandler
    public MetricsRecorder metricsRecorder() {
        return this.metricsRecorder;
    }

    public boolean isShutDown() {
        return this.isShutdown;
    }

    @Override // com.xforceplus.ultraman.oqsengine.meta.handler.IRequestHandler
    public IRequestWatchExecutor watchExecutor() {
        return this.requestWatchExecutor;
    }

    @Override // com.xforceplus.ultraman.oqsengine.meta.handler.IRequestHandler
    public void notReady() {
        this.requestWatchExecutor.inActive();
        if (this.isShutdown) {
            this.logger.warn("stream has broken due to client has been shutdown...");
            return;
        }
        String uid = null != this.requestWatchExecutor.watcher() ? this.requestWatchExecutor.watcher().uid() : "unKnow-stream";
        this.logger.warn("stream [{}] has broken, reCreate new stream after ({})ms...", uid, Long.valueOf(this.grpcParams.getReconnectDuration()));
        TimeWaitUtils.wakeupAfter(this.grpcParams.getReconnectDuration(), TimeUnit.MILLISECONDS);
        this.requestWatchExecutor.release(uid);
    }

    @Override // com.xforceplus.ultraman.oqsengine.meta.handler.IRequestHandler
    public void ready() {
        this.requestWatchExecutor.active();
    }

    @Override // com.xforceplus.ultraman.oqsengine.meta.handler.IRequestHandler
    public boolean reset(WatchElement watchElement) {
        WatchElement watchElement2 = (WatchElement) this.requestWatchExecutor.watcher().watches().get(watchElement.getAppId());
        try {
            watchElement.setStatus(WatchElement.ElementStatus.Register);
            this.requestWatchExecutor.add(watchElement, true);
            return send(this.requestWatchExecutor.watcher().clientId(), this.requestWatchExecutor.watcher().uid(), false, true, RequestStatus.RESET, watchElement);
        } catch (Exception e) {
            this.requestWatchExecutor.add(watchElement2, true);
            this.metricsRecorder.error(watchElement.getAppId(), SyncCode.RESET_ENV_ERROR.name(), String.format("reset failed, env : %s, cause : %s", watchElement.getEnv(), e.getMessage()));
            return false;
        }
    }

    private boolean send(String str, String str2, boolean z, boolean z2, RequestStatus requestStatus, WatchElement watchElement) {
        EntityClassSyncRequest build = EntityClassSyncRequest.newBuilder().setUid(str2).setAppId(watchElement.getAppId()).setEnv(watchElement.getEnv()).setVersion(watchElement.getVersion()).setForce(z).setStatus(requestStatus.ordinal()).setClientId(str).build();
        this.requestWatchExecutor.add(watchElement, false);
        try {
            SendUtils.sendRequest(this.requestWatchExecutor.watcher(), build, z2);
            this.logger.info("register success uid [{}], appId [{}], env [{}], version [{}].", new Object[]{str2, watchElement.getAppId(), watchElement.getEnv(), Integer.valueOf(watchElement.getVersion())});
            return true;
        } catch (Exception e) {
            watchElement.setStatus(WatchElement.ElementStatus.Init);
            return false;
        }
    }

    private void accept(EntityClassSyncResponse entityClassSyncResponse) {
        EntityClassSyncRequest.Builder status;
        try {
            status = execute(entityClassSyncResponse);
        } catch (Exception e) {
            status = EntityClassSyncRequest.newBuilder().setStatus(RequestStatus.SYNC_FAIL.ordinal());
            this.metricsRecorder.error(entityClassSyncResponse.getAppId(), SyncCode.SYNC_DATA_ERROR.name(), String.format("parse sync-data failed, env :%s, version : %s, cause : %s", entityClassSyncResponse.getEnv(), Integer.valueOf(entityClassSyncResponse.getVersion()), e.getMessage()));
        }
        if (!entityClassSyncResponse.getForce()) {
            EntityClassSyncRequest build = status.setClientId(this.requestWatchExecutor.watcher().clientId()).setUid(entityClassSyncResponse.getUid()).build();
            try {
                SendUtils.sendRequest(this.requestWatchExecutor.watcher(), build, true);
            } catch (Exception e2) {
                this.metricsRecorder.error(build.getAppId(), SyncCode.SEND_REQUEST_ERROR.name(), String.format("send sync result failed, env :%s, version : %s, cause : %s", entityClassSyncResponse.getEnv(), Integer.valueOf(entityClassSyncResponse.getVersion()), e2.getMessage()));
                throw e2;
            }
        }
        if (status.getStatus() == RequestStatus.SYNC_OK.ordinal()) {
            this.metricsRecorder.info(entityClassSyncResponse.getAppId(), SyncCode.SYNC_DATA_OK.name(), String.format("sync-data ok, uid : %s, env : %s, version : %s", entityClassSyncResponse.getUid(), entityClassSyncResponse.getEnv(), Integer.valueOf(entityClassSyncResponse.getVersion())));
        }
    }

    private EntityClassSyncRequest.Builder execute(EntityClassSyncResponse entityClassSyncResponse) {
        EntityClassSyncRequest.Builder newBuilder = EntityClassSyncRequest.newBuilder();
        if (entityClassSyncResponse.getAppId().isEmpty() || entityClassSyncResponse.getAppId().isEmpty() || -1 == entityClassSyncResponse.getVersion()) {
            throw new MetaSyncClientException("appId/version/env could not be null...", false);
        }
        newBuilder.setAppId(entityClassSyncResponse.getAppId()).setVersion(entityClassSyncResponse.getVersion()).setEnv(entityClassSyncResponse.getEnv());
        EntityClassSyncRspProto entityClassSyncRspProto = entityClassSyncResponse.getEntityClassSyncRspProto();
        if (!md5Check(entityClassSyncResponse.getMd5(), entityClassSyncRspProto)) {
            throw new MetaSyncClientException("checkMD5 failed.", false);
        }
        WatchElement watchElement = new WatchElement(entityClassSyncResponse.getAppId(), entityClassSyncResponse.getEnv(), entityClassSyncResponse.getVersion(), WatchElement.ElementStatus.Confirmed);
        if (entityClassSyncResponse.getForce() || this.requestWatchExecutor.watcher().onWatch(watchElement)) {
            try {
                this.syncExecutor.sync(entityClassSyncResponse.getAppId(), entityClassSyncResponse.getEnv(), entityClassSyncResponse.getVersion(), entityClassSyncRspProto);
                this.requestWatchExecutor.update(watchElement);
            } catch (Exception e) {
                throw new MetaSyncClientException(e.getMessage(), false);
            }
        } else {
            this.logger.debug(String.format("sync data error, current oqs not watch this version or sync version is less than service-version, env : %s, version : %s", entityClassSyncResponse.getEnv(), Integer.valueOf(entityClassSyncResponse.getVersion())));
        }
        return newBuilder.setStatus(RequestStatus.SYNC_OK.ordinal());
    }

    private boolean md5Check(String str, EntityClassSyncRspProto entityClassSyncRspProto) {
        if (null == str || str.isEmpty() || null == entityClassSyncRspProto) {
            return false;
        }
        return str.equals(MD5Utils.getMD5(entityClassSyncRspProto.toByteArray()));
    }

    private boolean keepAlive() {
        this.logger.debug("start keepAlive task ok...");
        while (!isShutDown()) {
            RequestWatcher watcher = this.requestWatchExecutor.watcher();
            if (null != watcher) {
                try {
                    if (watcher.isActive() && System.currentTimeMillis() - watcher.heartBeat() > this.grpcParams.getDefaultHeartbeatTimeout()) {
                        watcher.observer().onCompleted();
                        this.logger.warn("last heartbeat time [{}] reaches max timeout [{}]", Long.valueOf(System.currentTimeMillis() - watcher.heartBeat()), Long.valueOf(this.grpcParams.getDefaultHeartbeatTimeout()));
                    }
                    SendUtils.sendRequest(watcher, EntityClassSyncRequest.newBuilder().setClientId(watcher.clientId()).setUid(watcher.uid()).setStatus(RequestStatus.HEARTBEAT.ordinal()).build(), true);
                } catch (Exception e) {
                    this.logger.warn("send keepAlive failed, message [{}], but exception will ignore due to retry...", e.getMessage());
                }
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("keepAlive ok, print next check after ({})ms...", Long.valueOf(this.grpcParams.getKeepAliveSendDuration()));
                }
            }
            TimeWaitUtils.wakeupAfter(this.grpcParams.getKeepAliveSendDuration(), TimeUnit.MILLISECONDS);
        }
        if (!this.logger.isDebugEnabled()) {
            return true;
        }
        this.logger.debug("keepAlive task has quited due to sync-client shutdown...");
        return true;
    }

    private boolean watchElementCheck() {
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("start appCheck task ok...");
        }
        long j = 0;
        while (!isShutDown()) {
            RequestWatcher watcher = this.requestWatchExecutor.watcher();
            if (null != watcher && watcher.isActive()) {
                for (int size = this.forgotQueue.size(); size > 0; size--) {
                    try {
                        WatchElement remove = this.forgotQueue.remove();
                        if (!watcher.watches().containsKey(remove.getAppId())) {
                            watcher.watches().put(remove.getAppId(), remove);
                        }
                    } catch (Exception e) {
                    }
                }
                watcher.watches().values().stream().filter(watchElement -> {
                    return watchElement.getStatus().ordinal() == WatchElement.ElementStatus.Init.ordinal() || (watchElement.getStatus().ordinal() < WatchElement.ElementStatus.Confirmed.ordinal() && System.currentTimeMillis() - watchElement.getRegisterTime() > this.grpcParams.getDefaultDelayTaskDuration());
                }).forEach(watchElement2 -> {
                    try {
                        SendUtils.sendRequest(watcher, EntityClassSyncRequest.newBuilder().setUid(watcher.uid()).setClientId(watcher.clientId()).setAppId(watchElement2.getAppId()).setEnv(watchElement2.getEnv()).setVersion(watchElement2.getVersion()).setStatus(RequestStatus.REGISTER.ordinal()).build(), true);
                        watchElement2.setRegisterTime(System.currentTimeMillis());
                    } catch (Exception e2) {
                        watchElement2.setStatus(WatchElement.ElementStatus.Init);
                    }
                });
                if (j % 10 == 0 && this.logger.isDebugEnabled()) {
                    this.logger.debug("app check ok, print next check after ({})ms...", Long.valueOf(this.grpcParams.getMonitorSleepDuration() * 10));
                }
                j++;
            }
            TimeWaitUtils.wakeupAfter(this.grpcParams.getMonitorSleepDuration(), TimeUnit.MILLISECONDS);
        }
        if (!this.logger.isDebugEnabled()) {
            return true;
        }
        this.logger.debug("appCheck task has quited due to sync-client shutdown...");
        return true;
    }

    public Queue<WatchElement> getForgotQueue() {
        return this.forgotQueue;
    }

    private void addToForgotQueue(WatchElement watchElement) {
        Stream<WatchElement> stream = this.forgotQueue.stream();
        Objects.requireNonNull(watchElement);
        if (stream.noneMatch(watchElement::logicEquals)) {
            this.forgotQueue.add(watchElement);
        }
    }
}
