package com.xforceplus.tech.base.pubsub.contrib.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.xforceplus.tech.base.pubsub.PubSub;
import com.xforceplus.tech.base.pubsub.PubSubError;
import com.xforceplus.tech.base.pubsub.dispatcher.PubSubDispatcher;
import com.xforceplus.tech.base.pubsub.domain.NewMessage;
import com.xforceplus.tech.base.pubsub.domain.PublishRequest;
import com.xforceplus.tech.base.pubsub.domain.SubscribeRequest;
import com.xforceplus.tech.metadata.spec.Metadata;
import io.vavr.control.Either;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/xforceplus/tech/base/pubsub/contrib/rabbitmq/RabbitmqPubSub.class */
public class RabbitmqPubSub implements PubSub {
    private String name;
    private Metadata metadata;
    private PubSubDispatcher pubSubDispatcher;
    private static final String HOST = "host";
    private static final String CONSUMER_ID = "consumerID";
    private static final String DURABLE = "durable";
    private static final String DELETED_WHEN_UNUSED = "deletedWhenUnused";
    private static final String AUTO_ACK = "autoAck";
    private static final String DELIVERY_MODE = "deliveryMode";
    private static final String REQUEUE_IN_FAILURE = "requeueInFailure";
    private static final String PREFETCH_COUNT = "prefetchCount";
    private static final String RECONNECT_WAIT = "reconnectWait";
    private static final String CONCURRENCY_MODE = "concurrencyMode";
    private static final String BACKOFF_POLICY = "backOffPolicy";
    private static final String BACKOFF_DURATION = "backOffDuration";
    private static final String BACKOFF_INITIAL_INTERVAL = "backOffInitialInterval";
    private static final String BACKOFF_MAX_INTERVAL = "backOffMaxInterval";
    private static final String BACKOFF_MAX_RETRIES = "backOffMaxRetries";
    private static final String BACKOFF_RANDOMIZATION_FACTOR = "backOffRandomizationFactor";
    private static final String BACKOFF_MULTIPLIER = "backOffMultiplier";
    private static final String BACKOFF_MAX_ELAPSED_TIME = "backOffMaxElapsedTime";
    private static final String ENABLE_DEAD_LETTER = "enableDeadLetter";
    private static final String MAX_LEN = "maxLen";
    private static final String MAX_LEN_BYTES = "maxLenBytes";
    private static final String EXCHANGE_KIND = "exchangeKind";
    private static final String reqMetadataRoutingKey = "routingKey";
    private static final String errorChannelNotInitialized = "channel not initialized";
    private static final String defaultDeadLetterExchangeFormat = "dlx-%s";
    private static final String defaultDeadLetterQueueFormat = "dlq-%s";
    private Logger logger = LoggerFactory.getLogger(RabbitmqPubSub.class);
    private Connection conn = null;
    private Channel channel = null;
    private Map<String, Boolean> declaredExchange = new HashMap();
    private Map<String, String> consumerRegistry = new ConcurrentHashMap();
    private int connectionCount = 0;

    public PubSubError publish(PublishRequest publishRequest) {
        if (this.channel == null) {
            throw new RuntimeException(errorChannelNotInitialized);
        }
        publishSync(publishRequest);
        return null;
    }

    private synchronized Either<Throwable, String> publishSync(PublishRequest publishRequest) {
        String topic = publishRequest.getTopic();
        Either<Throwable, String> ensureExchangeDeclared = ensureExchangeDeclared(topic, this.metadata.getString(EXCHANGE_KIND, "fanout"));
        if (!ensureExchangeDeclared.isRight()) {
            return ensureExchangeDeclared;
        }
        Optional flatMap = Optional.ofNullable(publishRequest.getMetadata()).flatMap(map -> {
            return Optional.ofNullable(map.get(reqMetadataRoutingKey));
        });
        try {
            this.channel.basicPublish(topic, flatMap.isPresent() ? (String) flatMap.get() : "", true, buildProperties(publishRequest), publishRequest.getData());
            return Either.right("ok");
        } catch (Throwable th) {
            return Either.left(th);
        }
    }

    private AMQP.BasicProperties buildProperties(PublishRequest publishRequest) {
        return new AMQP.BasicProperties().builder().contentType("application/cloudevents+json").build();
    }

    private synchronized Either<Throwable, String> ensureExchangeDeclared(String str, String str2) {
        if (((Boolean) Optional.ofNullable(this.declaredExchange.get(str)).orElse(false)).booleanValue()) {
            try {
                this.channel.exchangeDeclare(str, str2, true, false, false, Collections.emptyMap());
                this.declaredExchange.put(str, true);
            } catch (IOException e) {
                return Either.left(e);
            }
        }
        return Either.right("ok");
    }

    public String[] features() {
        return null;
    }

    public PubSubError subscribe(SubscribeRequest subscribeRequest, Consumer<NewMessage> consumer) {
        subscribeForever(subscribeRequest, String.format("%s-%s", this.metadata.getString(CONSUMER_ID, UUID.randomUUID().toString()), subscribeRequest.getTopic()), consumer);
        return null;
    }

    private String subscribeForever(final SubscribeRequest subscribeRequest, final String str, final Consumer<NewMessage> consumer) {
        Either<Throwable, String> ensureSubscription = ensureSubscription(subscribeRequest, str);
        if (!ensureSubscription.isRight()) {
            return null;
        }
        try {
            return this.channel.basicConsume((String) ensureSubscription.get(), this.metadata.getBool(AUTO_ACK, true).booleanValue(), str, false, false, Collections.emptyMap(), new DefaultConsumer(this.channel) { // from class: com.xforceplus.tech.base.pubsub.contrib.rabbitmq.RabbitmqPubSub.1
                public void handleDelivery(String str2, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
                    NewMessage newMessage = new NewMessage();
                    newMessage.setMetadata(subscribeRequest.getMetadata());
                    newMessage.setData(bArr);
                    newMessage.setSource(RabbitmqPubSub.this.metadata.getString(RabbitmqPubSub.HOST));
                    newMessage.setContentType(new String[]{basicProperties.getContentType()});
                    newMessage.setTopic(str);
                    if ("single".equals(RabbitmqPubSub.this.metadata.getString(RabbitmqPubSub.CONCURRENCY_MODE, "single"))) {
                        consumer.accept(newMessage);
                    }
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }

    private synchronized Either<Throwable, String> ensureSubscription(SubscribeRequest subscribeRequest, String str) {
        return this.channel == null ? Either.left(new RuntimeException(errorChannelNotInitialized)) : prepareSubscription(subscribeRequest, str);
    }

    private Either<Throwable, String> prepareSubscription(SubscribeRequest subscribeRequest, String str) {
        Either<Throwable, String> ensureExchangeDeclared = ensureExchangeDeclared(subscribeRequest.getTopic(), this.metadata.getString(EXCHANGE_KIND, "fanout"));
        if (ensureExchangeDeclared.isLeft()) {
            return ensureExchangeDeclared.map(str2 -> {
                return null;
            });
        }
        try {
            AMQP.Queue.DeclareOk queueDeclare = this.channel.queueDeclare(str, this.metadata.getBool(DURABLE, false).booleanValue(), false, this.metadata.getBool(DELETED_WHEN_UNUSED, true).booleanValue(), Collections.emptyMap());
            Optional flatMap = Optional.ofNullable(subscribeRequest.getMetadata()).flatMap(map -> {
                return Optional.ofNullable(map.get(reqMetadataRoutingKey));
            });
            String str3 = flatMap.isPresent() ? (String) flatMap.get() : "";
            for (String str4 : str3.split(",")) {
                this.channel.queueBind(queueDeclare.getQueue(), subscribeRequest.getTopic(), str3, Collections.emptyMap());
            }
            return Either.right(queueDeclare.getQueue());
        } catch (Throwable th) {
            return Either.left(th);
        }
    }

    public String kind() {
        return "PubSub";
    }

    public String name() {
        return this.name;
    }

    public void name(String str) {
        this.name = str;
    }

    public void init(Metadata metadata) {
        this.metadata = metadata;
        dial(metadata.getString(HOST)).ifPresent(channel -> {
            this.channel = channel;
        });
    }

    private Optional<Channel> dial(String str) {
        Connection connection = null;
        try {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setAutomaticRecoveryEnabled(true);
            connectionFactory.setUri(this.metadata.getString(HOST));
            connection = connectionFactory.newConnection("");
            return connection.openChannel();
        } catch (Throwable th) {
            this.logger.error("{}", th);
            if (connection != null) {
                try {
                    connection.close();
                } catch (IOException e) {
                }
            }
            return Optional.empty();
        }
    }

    public Metadata currentMetadata() {
        return this.metadata;
    }
}
