package com.xforceplus.ultraman.flows.common.sqs.spring.core;

import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.xforceplus.ultraman.flows.common.sqs.MessageListener;
import com.xforceplus.ultraman.flows.common.sqs.spring.SqsListenerRegistry;
import com.xforceplus.ultraman.flows.message.util.StringUtils;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.SmartLifecycle;

/* loaded from: input_file:com/xforceplus/ultraman/flows/common/sqs/spring/core/DefaultSqsListenerRegistry.class */
public class DefaultSqsListenerRegistry implements SqsListenerRegistry, SmartLifecycle, ApplicationContextAware {
    private ConfigurableApplicationContext applicationContext;
    private static final String DYNAMIC_LISTENER_PREFIX = "$dynamic_$";
    private static final Logger logger = LoggerFactory.getLogger(DefaultSqsListenerRegistry.class);
    private final Map<String, MessageListener> listeners = new ConcurrentHashMap();
    private final AtomicInteger listenerCounter = new AtomicInteger(1);
    private volatile boolean running = false;

    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        if (applicationContext instanceof ConfigurableApplicationContext) {
            this.applicationContext = (ConfigurableApplicationContext) applicationContext;
        }
    }

    @Override // com.xforceplus.ultraman.flows.common.sqs.spring.SqsListenerRegistry
    public void registerListener(MessageListener messageListener) {
        Objects.requireNonNull(messageListener, "SqsQueueMessageListener must not be null");
        String format = String.format("sqsListener%s", Integer.valueOf(this.listenerCounter.getAndIncrement()));
        if (this.listeners.putIfAbsent(format, messageListener) == null) {
            this.applicationContext.getBeanFactory().registerSingleton(format, messageListener);
        }
    }

    @Override // com.xforceplus.ultraman.flows.common.sqs.spring.SqsListenerRegistry
    public void registerDynamicListener(String str, final MessageListener messageListener) {
        Objects.requireNonNull(messageListener, "SqsQueueMessageListener must not be null");
        Preconditions.checkArgument(StringUtils.isNotBlank(str), "Listener name must not be empty");
        this.listeners.computeIfAbsent(DYNAMIC_LISTENER_PREFIX + str, new Function<String, MessageListener>() { // from class: com.xforceplus.ultraman.flows.common.sqs.spring.core.DefaultSqsListenerRegistry.1
            public MessageListener apply(String str2) {
                return messageListener;
            }
        });
    }

    public void start() {
        logger.info("Start all sqs listeners count: {} ......", Integer.valueOf(this.listeners.size()));
        this.listeners.values().forEach((v0) -> {
            v0.subscribe();
        });
        this.running = true;
        logger.info("Start all sqs listeners...... end!");
    }

    public void stop() {
        logger.info("Stop all sqs listeners count: {}......", Integer.valueOf(this.listeners.size()));
        this.running = false;
        this.listeners.values().forEach((v0) -> {
            v0.destroy();
        });
        Iterator<MessageListener> it = this.listeners.values().iterator();
        while (it.hasNext()) {
            try {
                it.next().awaitTermination(20L, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        logger.info("Stop all sqs listeners...... end!");
    }

    @Override // com.xforceplus.ultraman.flows.common.sqs.spring.SqsListenerRegistry
    public void stopDynamicListeners() {
        logger.info("Stopping dynamic SQS listeners ......");
        this.listeners.entrySet().removeIf(entry -> {
            String str = (String) entry.getKey();
            if (!str.startsWith(DYNAMIC_LISTENER_PREFIX)) {
                return false;
            }
            try {
                MessageListener messageListener = (MessageListener) entry.getValue();
                messageListener.destroy();
                messageListener.awaitTermination(20L, TimeUnit.SECONDS);
                this.applicationContext.getBeanFactory().destroyScopedBean(str);
                return true;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return true;
            }
        });
        logger.info("Stopped dynamic SQS listeners.");
    }

    @Override // com.xforceplus.ultraman.flows.common.sqs.spring.SqsListenerRegistry
    public void startDynamicListeners() {
        logger.info("Start  dynamic sqs listeners count : {}......", Integer.valueOf(getDynamicListeners().size()));
        getDynamicListeners().forEach((v0) -> {
            v0.subscribe();
        });
        logger.info("Start dynamic sqs listeners...... end!");
    }

    @NotNull
    private List<MessageListener> getDynamicListeners() {
        return (List) this.listeners.entrySet().stream().filter(entry -> {
            return ((String) entry.getKey()).startsWith(DYNAMIC_LISTENER_PREFIX);
        }).map((v0) -> {
            return v0.getValue();
        }).collect(Collectors.toList());
    }

    public boolean isRunning() {
        return this.running;
    }

    public boolean isAutoStartup() {
        return false;
    }
}
