/*
 * Decompiled with CFR 0.152.
 */
package com.xforceplus.ultraman.extensions.alarm.impl;

import akka.japi.function.Function;
import akka.stream.ActorAttributes;
import akka.stream.ActorMaterializer;
import akka.stream.Graph;
import akka.stream.Materializer;
import akka.stream.OverflowStrategy;
import akka.stream.Supervision;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.SourceQueueWithComplete;
import com.xforceplus.apollo.logger.ApolloDdingFactory;
import com.xforceplus.ultraman.extensions.alarm.AlarmService;
import java.io.Serializable;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link AlarmService} implementation that buffers alarm events in an Akka Streams source
 * queue, batches them with {@code groupedWithin}, groups every batch by a caller-supplied
 * key and sends one notice per group through {@link ApolloDdingFactory}.
 */
public class AlarmServiceImpl
implements AlarmService {
    private static final Logger log = LoggerFactory.getLogger(AlarmServiceImpl.class);

    /**
     * Latest queue created for each token. A later {@link #initQueue} call with the same
     * token replaces the entry.
     */
    private final Map<String, SourceQueueWithComplete<?>> mapping = new ConcurrentHashMap<>();

    /** Materializer used to run every alarm stream created by this service. */
    private final ActorMaterializer materializer;

    /**
     * @param materializer materializer the alarm streams are run on
     */
    public AlarmServiceImpl(ActorMaterializer materializer) {
        this.materializer = materializer;
    }

    /**
     * Creates and starts a stream that turns offered events into grouped alarm notices.
     *
     * <p>Events offered to the returned queue are buffered (overflow strategy: backpressure),
     * batched by {@code groupedWithin(groupSize, groupTimeInSec seconds)}, split by
     * {@code groupKey}, converted to a message by {@code messageConverter} and sent with
     * {@code ApolloDdingFactory.getFactory().sendDdingNotice(...)}. Failures while grouping
     * or sending are logged and never terminate the stream.
     *
     * @param token            key under which the created queue is stored in this service
     * @param bufferSize       buffer size handed to {@code Source.queue}
     * @param groupSize        maximum number of events per batch
     * @param groupTimeInSec   maximum time window of a batch, in seconds
     * @param groupKey         derives the grouping key of an event
     * @param messageConverter builds the notice text from a grouping key and its events
     * @param <T>              event type
     * @return the materialized queue that callers offer events to
     */
    @Override
    public <T> SourceQueueWithComplete<T> initQueue(String token, int bufferSize, int groupSize, int groupTimeInSec, java.util.function.Function<T, String> groupKey, BiFunction<String, List<T>, String> messageConverter) {
        SourceQueueWithComplete<T> queue = Source.<T>queue(bufferSize, OverflowStrategy.backpressure())
                .groupedWithin(groupSize, Duration.ofSeconds(groupTimeInSec))
                .map(batch -> {
                    AlarmServiceImpl.dispatch(batch, groupKey, messageConverter);
                    // The emitted value is irrelevant; the stream ends in Sink.ignore().
                    return 1;
                })
                .to(Sink.ignore())
                // Resume on any failure that escapes a stage so the queue stays usable.
                .withAttributes(ActorAttributes.withSupervisionStrategy(cause -> Supervision.resume()))
                .run(this.materializer);
        // NOTE(review): a queue previously registered under the same token is replaced here
        // but not completed; confirm whether callers rely on the old queue staying alive.
        this.mapping.put(token, queue);
        return queue;
    }

    /**
     * Groups one batch by key and sends a notice per group.
     *
     * <p>Each group is sent inside its own try/catch so that a failure for one group does
     * not prevent the notices of the remaining groups from being sent.
     */
    private static <T> void dispatch(List<T> batch, java.util.function.Function<T, String> groupKey, BiFunction<String, List<T>, String> messageConverter) {
        Map<String, List<T>> groups;
        try {
            groups = batch.stream().collect(Collectors.groupingBy(groupKey));
        }
        catch (Throwable t) {
            // Deliberately broad: alarm delivery is best-effort and must not break the stream.
            log.error("Failed to group alarm batch of {} event(s)", batch.size(), t);
            return;
        }
        groups.forEach((key, events) -> {
            try {
                ApolloDdingFactory.getFactory().sendDdingNotice(messageConverter.apply(key, events));
            }
            catch (Throwable t) {
                // Deliberately broad: keep going with the other groups of this batch.
                log.error("Failed to send alarm notice for group [{}] ({} event(s))", key, events.size(), t);
            }
        });
    }
}

