package com.xforceplus.ultraman.oqsengine.plus.event;

import com.xforceplus.ultraman.oqsengine.plus.event.storage.EventStorage;
import java.util.Iterator;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

/* loaded from: input_file:BOOT-INF/lib/event-2023.6.19-141905-feature-merge.jar:com/xforceplus/ultraman/oqsengine/plus/event/DefaultEventBus.class */
public class DefaultEventBus implements EventBus {
    private ConcurrentMap<EventType, Queue<Consumer<Event>>> listeners = new ConcurrentHashMap();
    private EventStorage eventStorage;
    private ExecutorService worker;
    private volatile boolean closed;

    /* loaded from: input_file:BOOT-INF/lib/event-2023.6.19-141905-feature-merge.jar:com/xforceplus/ultraman/oqsengine/plus/event/DefaultEventBus$Distributor.class */
    private class Distributor implements Runnable {
        private Distributor() {
        }

        @Override // java.lang.Runnable
        public void run() {
            while (!DefaultEventBus.this.closed) {
                Optional<Event> pop = DefaultEventBus.this.eventStorage.pop();
                if (pop.isPresent()) {
                    Event event = pop.get();
                    Queue queue = (Queue) DefaultEventBus.this.listeners.get(event.type());
                    if (queue != null && !queue.isEmpty()) {
                        Iterator it = queue.iterator();
                        while (it.hasNext()) {
                            ((Consumer) it.next()).accept(event);
                        }
                    }
                } else {
                    if (DefaultEventBus.this.closed) {
                        return;
                    }
                    try {
                        TimeUnit.MILLISECONDS.sleep(10L);
                    } catch (InterruptedException e) {
                        if (DefaultEventBus.this.closed) {
                            return;
                        }
                    }
                }
            }
        }
    }

    public DefaultEventBus(EventStorage eventStorage, ExecutorService executorService) {
        this.eventStorage = eventStorage;
        this.worker = executorService;
        if (this.eventStorage == null) {
            throw new IllegalArgumentException("Invalid EventStorage instance.");
        }
        if (executorService == null) {
            throw new IllegalArgumentException("Invalid ExecutorService instance.");
        }
    }

    @Override // com.xforceplus.ultraman.sdk.infra.lifecycle.SimpleLifecycle
    @PostConstruct
    public void init() {
        if (this.closed) {
            return;
        }
        this.closed = false;
        this.worker.submit(new Distributor());
    }

    @Override // com.xforceplus.ultraman.sdk.infra.lifecycle.SimpleLifecycle
    @PreDestroy
    public void destroy() {
        if (this.closed) {
            return;
        }
        this.closed = true;
        this.listeners.clear();
    }

    @Override // com.xforceplus.ultraman.oqsengine.plus.event.EventBus
    public void watch(EventType eventType, Consumer<Event> consumer) {
        this.listeners.computeIfAbsent(eventType, eventType2 -> {
            return new ConcurrentLinkedQueue();
        }).offer(consumer);
    }

    @Override // com.xforceplus.ultraman.oqsengine.plus.event.EventBus
    public void notify(Event event) {
        Queue<Consumer<Event>> queue = this.listeners.get(event.type());
        if (queue != null && !queue.isEmpty() && !this.eventStorage.push(event)) {
            throw new IllegalStateException("Can not notify event!");
        }
    }
}
