/*
 * Decompiled with CFR 0.152.
 */
package com.xforceplus.ultraman.oqsengine.event;

import com.xforceplus.ultraman.oqsengine.event.Event;
import com.xforceplus.ultraman.oqsengine.event.EventBus;
import com.xforceplus.ultraman.oqsengine.event.EventType;
import com.xforceplus.ultraman.oqsengine.event.storage.EventStorage;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

/**
 * {@link EventBus} implementation that buffers events in an {@link EventStorage}
 * and delivers them asynchronously on an {@link ExecutorService}.
 *
 * <p>{@link #notify(Event)} pushes an event into the storage. A background
 * {@code Distributor} task, started by {@link #init()}, pops events from the
 * storage and submits one {@code Noticer} task per event to the worker, which
 * in turn invokes the listeners registered for the event's type.
 *
 * <p>{@link #destroy()} marks the bus as closed and drops all listener
 * registrations. It does not shut down the supplied worker. Once closed,
 * {@link #init()} becomes a no-op.
 */
public class DefaultEventBus implements EventBus {
    /** Pause, in milliseconds, between polls of the storage when it holds no event. */
    private static final long IDLE_SLEEP_MILLIS = 10L;

    /** Listeners registered per event type, in registration order. */
    private final ConcurrentMap<EventType, Queue<Consumer<Event>>> listeners = new ConcurrentHashMap<>();
    private final EventStorage eventStorage;
    private final ExecutorService worker;
    /** Set by {@link #destroy()}; read by the distributor loop running on a worker thread. */
    private volatile boolean closed;

    /**
     * Creates a bus on top of the given storage and worker.
     *
     * @param eventStorage storage that events are pushed into and popped from
     * @param worker executor that runs the distributor loop and the listener notifications
     * @throws IllegalArgumentException if either argument is {@code null}
     */
    public DefaultEventBus(EventStorage eventStorage, ExecutorService worker) {
        // Validate before assigning so a rejected instance never holds partial state.
        if (eventStorage == null) {
            throw new IllegalArgumentException("Invalid EventStorage instance.");
        }
        if (worker == null) {
            throw new IllegalArgumentException("Invalid ExecutorService instance.");
        }
        this.eventStorage = eventStorage;
        this.worker = worker;
    }

    /**
     * Starts the background distributor on the worker. Does nothing if the bus
     * has already been destroyed.
     */
    @PostConstruct
    public void init() {
        if (closed) {
            return;
        }
        worker.submit(new Distributor());
    }

    /**
     * Closes the bus: signals the distributor loop to stop and removes all
     * listener registrations. Subsequent calls have no effect.
     */
    @PreDestroy
    public void destroy() {
        if (!closed) {
            closed = true;
            listeners.clear();
        }
    }

    /**
     * Registers a listener for events of the given type.
     *
     * @param type event type to listen for
     * @param listener callback invoked with each event of that type
     */
    @Override
    public void watch(EventType type, Consumer<Event> listener) {
        Queue<Consumer<Event>> typeListeners =
            listeners.computeIfAbsent(type, t -> new ConcurrentLinkedQueue<>());
        typeListeners.offer(listener);
    }

    /**
     * Publishes an event. The event is silently dropped when no listener is
     * currently registered for its type; otherwise it is pushed into the storage
     * for asynchronous delivery.
     *
     * @param event event to publish
     * @throws IllegalStateException if the storage refuses the event
     */
    @Override
    public void notify(Event event) {
        Queue<Consumer<Event>> eventListeners = listeners.get(event.type());
        if (eventListeners == null || eventListeners.isEmpty()) {
            return;
        }
        if (!eventStorage.push(event)) {
            throw new IllegalStateException("Can not notify event!");
        }
    }

    /**
     * Long-running task that drains the storage and hands each event, together
     * with the listeners registered for its type, to a {@link Noticer} on the worker.
     */
    private class Distributor implements Runnable {
        @Override
        public void run() {
            while (!closed) {
                Optional<Event> eventOp = eventStorage.pop();
                if (eventOp.isPresent()) {
                    Event event = eventOp.get();
                    Queue<Consumer<Event>> eventListeners = listeners.get(event.type());
                    // Listeners may have been removed since the event was pushed; drop it then.
                    if (eventListeners != null && !eventListeners.isEmpty()) {
                        worker.submit(new Noticer(event, eventListeners));
                    }
                    continue;
                }
                // Nothing pending: skip the pause if the bus was closed in the meantime.
                if (closed) {
                    break;
                }
                try {
                    TimeUnit.MILLISECONDS.sleep(IDLE_SLEEP_MILLIS);
                } catch (InterruptedException e) {
                    // An interrupt is how the executor asks a task to stop (shutdownNow,
                    // Future.cancel(true)). Restore the flag for the pool and exit instead
                    // of swallowing it, otherwise the task could never be cancelled.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    /**
     * Delivers a single event to every listener in the given queue.
     */
    private static class Noticer implements Runnable {
        private final Event event;
        private final Queue<Consumer<Event>> listeners;

        public Noticer(Event event, Queue<Consumer<Event>> listeners) {
            this.event = event;
            this.listeners = listeners;
        }

        /**
         * Invokes each listener with the event. A listener that throws does not
         * prevent the remaining listeners from being notified; the first failure
         * is rethrown once all listeners have run, with any later failures
         * attached as suppressed exceptions.
         */
        @Override
        public void run() {
            RuntimeException failure = null;
            for (Consumer<Event> consumer : listeners) {
                try {
                    consumer.accept(event);
                } catch (RuntimeException e) {
                    if (failure == null) {
                        failure = e;
                    } else if (failure != e) {
                        // addSuppressed rejects self-suppression, hence the identity check.
                        failure.addSuppressed(e);
                    }
                }
            }
            if (failure != null) {
                throw failure;
            }
        }
    }
}

