package com.xforceplus.ultraman.flows.common.sqs;

import com.google.common.collect.Lists;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.core.exception.AbortedException;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
import software.amazon.awssdk.services.sqs.model.SqsException;

/* loaded from: input_file:com/xforceplus/ultraman/flows/common/sqs/SqsMessageListener.class */
public class SqsMessageListener implements MessageListener {
    private static final Logger log = LoggerFactory.getLogger(SqsMessageListener.class);
    private static final String CONSUMER_LOOP_THREAD_PREFIX = "sqs-consumer-loop";
    private static final String MESSAGE_PROCESSOR_THREAD_PREFIX = "sqs-listener";
    private final SqsQueue queue;
    private final SqsClient sqsClient;
    private final Map<String, Future<?>> consumerMap = new ConcurrentHashMap();
    private final Object monitor = new Object();
    private volatile boolean isRunning = false;
    private ExecutorService taskExecutor;
    private ExecutorService consumerLoopExecutor;

    /* loaded from: input_file:com/xforceplus/ultraman/flows/common/sqs/SqsMessageListener$MessageProcessor.class */
    private static class MessageProcessor implements Runnable {
        private final Message message;
        private final SqsQueue queue;
        private final Consumer<List<Message>> delFun;

        @Override // java.lang.Runnable
        public void run() {
            try {
                this.queue.getInterceptors().forEach(handlerInterceptor -> {
                    handlerInterceptor.beforeHandle(this.message);
                });
                if (this.queue.getHandler().handle(this.message)) {
                    this.delFun.accept(Lists.newArrayList(new Message[]{this.message}));
                }
            } catch (Exception e) {
                ErrorHandler errorHandler = this.queue.getErrorHandler();
                if (errorHandler != null) {
                    errorHandler.onError(this.message, e);
                } else {
                    SqsMessageListener.log.error("{} failed to process message {}", new Object[]{this.queue.getUrl(), this.message.messageId(), e});
                }
            } finally {
                MDC.clear();
            }
        }

        private MessageProcessor(Message message, SqsQueue sqsQueue, Consumer<List<Message>> consumer) {
            this.message = message;
            this.queue = sqsQueue;
            this.delFun = consumer;
        }
    }

    /* loaded from: input_file:com/xforceplus/ultraman/flows/common/sqs/SqsMessageListener$QueueConsumer.class */
    private class QueueConsumer implements Runnable {
        private final String id;

        @Override // java.lang.Runnable
        public void run() {
            while (SqsMessageListener.this.isActive(this.id)) {
                try {
                    for (Message message : SqsMessageListener.this.receive().messages()) {
                        ExecutorService executorService = SqsMessageListener.this.taskExecutor;
                        SqsQueue sqsQueue = SqsMessageListener.this.queue;
                        SqsMessageListener sqsMessageListener = SqsMessageListener.this;
                        executorService.submit(new MessageProcessor(message, sqsQueue, sqsMessageListener::deleteBatch));
                    }
                } catch (Throwable th) {
                    SqsMessageListener.log.error("{} - Unhandled exception in QueueConsumer", SqsMessageListener.this.queue.getUrl(), th);
                }
            }
        }

        private QueueConsumer(String str) {
            this.id = str;
        }
    }

    public static SqsMessageListenerBuilder builder() {
        return new SqsMessageListenerBuilder();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public SqsMessageListener(SqsClient sqsClient, ExecutorService executorService, SqsQueue sqsQueue) {
        this.sqsClient = (SqsClient) Objects.requireNonNull(sqsClient);
        this.queue = (SqsQueue) Objects.requireNonNull(sqsQueue);
        this.taskExecutor = executorService;
    }

    @Override // com.xforceplus.ultraman.flows.common.sqs.MessageListener
    public void subscribe() {
        synchronized (this.monitor) {
            if (this.isRunning) {
                return;
            }
            log.info("{} - starting SqsMessageListener", this.queue.getUrl());
            this.consumerLoopExecutor = createConsumerLoopExecutor();
            if (this.taskExecutor == null) {
                this.taskExecutor = defaultMessageProcessorExecutor();
            }
            scheduleConsumers();
            this.isRunning = true;
            log.info("{} - SqsMessageListener started", this.queue.getUrl());
        }
    }

    @Override // com.xforceplus.ultraman.flows.common.sqs.MessageListener
    public void destroy() {
        synchronized (this.monitor) {
            log.info("{} - shutting down SqsMessageListener", this.queue.getUrl());
            this.isRunning = false;
            this.consumerMap.values().forEach(future -> {
                future.cancel(false);
            });
            this.consumerMap.clear();
            this.taskExecutor.shutdown();
            this.consumerLoopExecutor.shutdown();
        }
    }

    @Override // com.xforceplus.ultraman.flows.common.sqs.MessageListener
    public void awaitTermination(long j, TimeUnit timeUnit) throws InterruptedException {
        if (this.consumerLoopExecutor.awaitTermination(j, timeUnit)) {
            return;
        }
        this.consumerLoopExecutor.shutdownNow();
    }

    SqsQueue getQueue() {
        return this.queue;
    }

    private ExecutorService createConsumerLoopExecutor() {
        return Executors.newFixedThreadPool(this.queue.getLoopConcurrency().intValue(), new PrefixedNamedThreadFactory(CONSUMER_LOOP_THREAD_PREFIX));
    }

    private ExecutorService defaultMessageProcessorExecutor() {
        int intValue = this.queue.getConcurrency().intValue();
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(intValue, intValue, 120L, TimeUnit.SECONDS, new LinkedBlockingQueue(), new PrefixedNamedThreadFactory(MESSAGE_PROCESSOR_THREAD_PREFIX));
        threadPoolExecutor.allowCoreThreadTimeOut(true);
        return threadPoolExecutor;
    }

    private void scheduleConsumers() {
        for (int i = 0; i < this.queue.getLoopConcurrency().intValue(); i++) {
            this.consumerMap.computeIfAbsent(UUID.randomUUID().toString(), str -> {
                return this.consumerLoopExecutor.submit(new QueueConsumer(str));
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean isActive(String str) {
        Future<?> future = this.consumerMap.get(str);
        return (future == null || future.isDone()) ? false : true;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public ReceiveMessageResponse receive() {
        try {
            return this.sqsClient.receiveMessage((ReceiveMessageRequest) ReceiveMessageRequest.builder().queueUrl(this.queue.getUrl()).waitTimeSeconds(Integer.valueOf(this.queue.getLongPolling().booleanValue() ? 20 : 1)).maxNumberOfMessages(this.queue.getMaxBatchSize()).visibilityTimeout(this.queue.getVisibilityTimeoutSeconds()).messageAttributeNames(new String[]{"All"}).build());
        } catch (Exception e) {
            return (ReceiveMessageResponse) ReceiveMessageResponse.builder().messages(Collections.emptyList()).build();
        } catch (SdkClientException | SqsException e2) {
            log.error("{} - SQS sdk receiveMessage error", this.queue.getUrl(), e2);
            try {
                TimeUnit.SECONDS.sleep(5L);
            } catch (InterruptedException e3) {
                Thread.currentThread().interrupt();
            }
            return (ReceiveMessageResponse) ReceiveMessageResponse.builder().messages(Collections.emptyList()).build();
        } catch (AbortedException e4) {
            return (ReceiveMessageResponse) ReceiveMessageResponse.builder().messages(Collections.emptyList()).build();
        }
    }

    public void deleteBatch(List<Message> list) {
        List list2 = (List) list.stream().map(message -> {
            return (DeleteMessageBatchRequestEntry) DeleteMessageBatchRequestEntry.builder().id(message.messageId()).receiptHandle(message.receiptHandle()).build();
        }).collect(Collectors.toList());
        if (list2.isEmpty()) {
            return;
        }
        try {
            this.sqsClient.deleteMessageBatch((DeleteMessageBatchRequest) DeleteMessageBatchRequest.builder().queueUrl(this.queue.getUrl()).entries(list2).build());
        } catch (AwsServiceException | SdkClientException e) {
            log.error("{} - SQS sdk deleteMessageBatch error", this.queue.getUrl(), e);
        }
    }
}
