/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.csp.sentinel.dashboard.metric;

import com.alibaba.csp.sentinel.concurrent.NamedThreadFactory;
import com.alibaba.csp.sentinel.config.SentinelConfig;
import com.alibaba.csp.sentinel.dashboard.datasource.entity.MetricEntity;
import com.alibaba.csp.sentinel.dashboard.discovery.AppInfo;
import com.alibaba.csp.sentinel.dashboard.discovery.AppManagement;
import com.alibaba.csp.sentinel.dashboard.discovery.MachineInfo;
import com.alibaba.csp.sentinel.dashboard.repository.metric.MetricsRepository;
import com.alibaba.csp.sentinel.node.metric.MetricNode;
import com.alibaba.csp.sentinel.util.StringUtil;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.nio.charset.Charset;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.RedirectStrategy;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class MetricFetcher {
    public static final String NO_METRICS = "No metrics";
    private static final int HTTP_OK = 200;
    private static final long MAX_LAST_FETCH_INTERVAL_MS = 15000L;
    private static final long FETCH_INTERVAL_SECOND = 6L;
    private static final Charset DEFAULT_CHARSET = Charset.forName(SentinelConfig.charset());
    private static final String METRIC_URL_PATH = "metric";
    private static Logger logger = LoggerFactory.getLogger(MetricFetcher.class);
    private final long intervalSecond = 1L;
    private Map<String, AtomicLong> appLastFetchTime = new ConcurrentHashMap<String, AtomicLong>();
    @Autowired
    private MetricsRepository<MetricEntity> metricStore;
    @Autowired
    private AppManagement appManagement;
    private CloseableHttpAsyncClient httpclient;
    private ScheduledExecutorService fetchScheduleService = Executors.newScheduledThreadPool(1, (ThreadFactory)new NamedThreadFactory("sentinel-dashboard-metrics-fetch-task"));
    private ExecutorService fetchService;
    private ExecutorService fetchWorker;
    private static final Set<String> RES_EXCLUSION_SET = new HashSet<String>(){
        {
            this.add("__total_inbound_traffic__");
            this.add("__system_load__");
            this.add("__cpu_usage__");
        }
    };

    public MetricFetcher() {
        int cores = Runtime.getRuntime().availableProcessors() * 2;
        long keepAliveTime = 0L;
        int queueSize = 2048;
        ThreadPoolExecutor.DiscardPolicy handler = new ThreadPoolExecutor.DiscardPolicy();
        this.fetchService = new ThreadPoolExecutor(cores, cores, keepAliveTime, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(queueSize), (ThreadFactory)new NamedThreadFactory("sentinel-dashboard-metrics-fetchService"), handler);
        this.fetchWorker = new ThreadPoolExecutor(cores, cores, keepAliveTime, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(queueSize), (ThreadFactory)new NamedThreadFactory("sentinel-dashboard-metrics-fetchWorker"), handler);
        IOReactorConfig ioConfig = IOReactorConfig.custom().setConnectTimeout(3000).setSoTimeout(3000).setIoThreadCount(Runtime.getRuntime().availableProcessors() * 2).build();
        this.httpclient = HttpAsyncClients.custom().setRedirectStrategy((RedirectStrategy)new DefaultRedirectStrategy(){

            protected boolean isRedirectable(String method) {
                return false;
            }
        }).setMaxConnTotal(4000).setMaxConnPerRoute(1000).setDefaultIOReactorConfig(ioConfig).build();
        this.httpclient.start();
        this.start();
    }

    private void start() {
        this.fetchScheduleService.scheduleAtFixedRate(() -> {
            try {
                this.fetchAllApp();
            }
            catch (Exception e) {
                logger.info("fetchAllApp error:", (Throwable)e);
            }
        }, 10L, 1L, TimeUnit.SECONDS);
    }

    private void writeMetric(Map<String, MetricEntity> map) {
        if (map.isEmpty()) {
            return;
        }
        Date date = new Date();
        for (MetricEntity entity : map.values()) {
            entity.setGmtCreate(date);
            entity.setGmtModified(date);
        }
        this.metricStore.saveAll(map.values());
    }

    private void fetchAllApp() {
        List<String> apps = this.appManagement.getAppNames();
        if (apps == null) {
            return;
        }
        for (String app : apps) {
            this.fetchService.submit(() -> {
                try {
                    this.doFetchAppMetric(app);
                }
                catch (Exception e) {
                    logger.error("fetchAppMetric error", (Throwable)e);
                }
            });
        }
    }

    private void fetchOnce(String app, long startTime, long endTime, int maxWaitSeconds) {
        if (maxWaitSeconds <= 0) {
            throw new IllegalArgumentException("maxWaitSeconds must > 0, but " + maxWaitSeconds);
        }
        AppInfo appInfo = this.appManagement.getDetailApp(app);
        if (appInfo.isDead()) {
            logger.info("Dead app removed: {}", (Object)app);
            this.appManagement.removeApp(app);
            return;
        }
        Set<MachineInfo> machines = appInfo.getMachines();
        logger.debug("enter fetchOnce(" + app + "), machines.size()=" + machines.size() + ", time intervalMs [" + startTime + ", " + endTime + "]");
        if (machines.isEmpty()) {
            return;
        }
        String msg = "fetch";
        AtomicLong unhealthy = new AtomicLong();
        final AtomicLong success = new AtomicLong();
        final AtomicLong fail = new AtomicLong();
        long start = System.currentTimeMillis();
        final ConcurrentHashMap<String, MetricEntity> metricMap = new ConcurrentHashMap<String, MetricEntity>(16);
        final CountDownLatch latch = new CountDownLatch(machines.size());
        for (final MachineInfo machine : machines) {
            if (machine.isDead()) {
                latch.countDown();
                this.appManagement.getDetailApp(app).removeMachine(machine.getIp(), machine.getPort());
                logger.info("Dead machine removed: {}:{} of {}", new Object[]{machine.getIp(), machine.getPort(), app});
                continue;
            }
            if (!machine.isHealthy()) {
                latch.countDown();
                unhealthy.incrementAndGet();
                continue;
            }
            final String url = "http://" + machine.getIp() + ":" + machine.getPort() + "/" + METRIC_URL_PATH + "?startTime=" + startTime + "&endTime=" + endTime + "&refetch=" + false;
            final HttpGet httpGet = new HttpGet(url);
            httpGet.setHeader("Connection", "Close");
            this.httpclient.execute((HttpUriRequest)httpGet, (FutureCallback)new FutureCallback<HttpResponse>(){

                public void completed(HttpResponse response) {
                    try {
                        MetricFetcher.this.handleResponse(response, machine, metricMap);
                        success.incrementAndGet();
                    }
                    catch (Exception e) {
                        logger.error("fetch metric " + url + " error:", (Throwable)e);
                    }
                    finally {
                        latch.countDown();
                    }
                }

                public void failed(Exception ex) {
                    latch.countDown();
                    fail.incrementAndGet();
                    httpGet.abort();
                    if (ex instanceof SocketTimeoutException) {
                        logger.error("Failed to fetch metric from <{}>: socket timeout", (Object)url);
                    } else if (ex instanceof ConnectException) {
                        logger.error("Failed to fetch metric from <{}> (ConnectionException: {})", (Object)url, (Object)ex.getMessage());
                    } else {
                        logger.error("fetch metric " + url + " error", (Throwable)ex);
                    }
                }

                public void cancelled() {
                    latch.countDown();
                    fail.incrementAndGet();
                    httpGet.abort();
                }
            });
        }
        try {
            latch.await(maxWaitSeconds, TimeUnit.SECONDS);
        }
        catch (Exception e) {
            logger.info("fetch metric, wait http client error:", (Throwable)e);
        }
        long cost = System.currentTimeMillis() - start;
        this.writeMetric(metricMap);
    }

    private void doFetchAppMetric(String app) {
        long endTime;
        long now = System.currentTimeMillis();
        long lastFetchMs = now - 15000L;
        if (this.appLastFetchTime.containsKey(app)) {
            lastFetchMs = Math.max(lastFetchMs, this.appLastFetchTime.get(app).get() + 1000L);
        }
        if ((endTime = (lastFetchMs = lastFetchMs / 1000L * 1000L) + 6000L) > now - 2000L) {
            return;
        }
        this.appLastFetchTime.computeIfAbsent(app, a -> new AtomicLong()).set(endTime);
        long finalLastFetchMs = lastFetchMs;
        long finalEndTime = endTime;
        try {
            this.fetchWorker.submit(() -> {
                try {
                    this.fetchOnce(app, finalLastFetchMs, finalEndTime, 5);
                }
                catch (Exception e) {
                    logger.info("fetchOnce(" + app + ") error", (Throwable)e);
                }
            });
        }
        catch (Exception e) {
            logger.info("submit fetchOnce(" + app + ") fail, intervalMs [" + lastFetchMs + ", " + endTime + "]", (Throwable)e);
        }
    }

    private void handleResponse(HttpResponse response, MachineInfo machine, Map<String, MetricEntity> metricMap) throws Exception {
        int code = response.getStatusLine().getStatusCode();
        if (code != 200) {
            return;
        }
        Charset charset = null;
        try {
            String contentTypeStr = response.getFirstHeader("Content-type").getValue();
            if (StringUtil.isNotEmpty((String)contentTypeStr)) {
                ContentType contentType = ContentType.parse((String)contentTypeStr);
                charset = contentType.getCharset();
            }
        }
        catch (Exception contentTypeStr) {
            // empty catch block
        }
        String body = EntityUtils.toString((HttpEntity)response.getEntity(), (Charset)(charset != null ? charset : DEFAULT_CHARSET));
        if (StringUtil.isEmpty((String)body) || body.startsWith(NO_METRICS)) {
            return;
        }
        String[] lines = body.split("\n");
        this.handleBody(lines, machine, metricMap);
    }

    private void handleBody(String[] lines, MachineInfo machine, Map<String, MetricEntity> map) {
        if (lines.length < 1) {
            return;
        }
        for (String line : lines) {
            try {
                MetricNode node = MetricNode.fromThinString((String)line);
                if (this.shouldFilterOut(node.getResource())) continue;
                String key = this.buildMetricKey(machine.getApp(), node.getResource(), node.getTimestamp());
                MetricEntity entity = map.get(key);
                if (entity != null) {
                    entity.addPassQps(node.getPassQps());
                    entity.addBlockQps(node.getBlockQps());
                    entity.addRtAndSuccessQps(node.getRt(), node.getSuccessQps());
                    entity.addExceptionQps(node.getExceptionQps());
                    entity.addCount(1);
                    continue;
                }
                entity = new MetricEntity();
                entity.setApp(machine.getApp());
                entity.setTimestamp(new Date(node.getTimestamp()));
                entity.setPassQps(node.getPassQps());
                entity.setBlockQps(node.getBlockQps());
                entity.setRtAndSuccessQps(node.getRt(), node.getSuccessQps());
                entity.setExceptionQps(node.getExceptionQps());
                entity.setCount(1);
                entity.setResource(node.getResource());
                map.put(key, entity);
            }
            catch (Exception e) {
                logger.warn("handleBody line exception, machine: {}, line: {}", (Object)machine.toLogString(), (Object)line);
            }
        }
    }

    private String buildMetricKey(String app, String resource, long timestamp) {
        return app + "__" + resource + "__" + timestamp / 1000L;
    }

    private boolean shouldFilterOut(String resource) {
        return RES_EXCLUSION_SET.contains(resource);
    }
}

