/*
 * Decompiled with CFR 0.152.
 */
package com.xforceplus.apollo.janus.standalone.sdk.message.sqs;

import com.amazonaws.services.sqs.model.AmazonSQSException;
import com.amazonaws.services.sqs.model.QueueDoesNotExistException;
import com.xforceplus.apollo.janus.standalone.sdk.message.sqs.DefaultSQSListener;
import com.xforceplus.xplat.aws.sqs.SqsService;
import com.xforceplus.xplat.aws.sqs.bean.MessageXplat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.FutureTask;
import net.wicp.tams.common.Conf;
import net.wicp.tams.common.apiext.TimeAssist;
import net.wicp.tams.common.thread.ThreadPool;
import net.wicp.tams.common.thread.threadlocal.PerThreadValue;
import net.wicp.tams.common.thread.threadlocal.PerthreadManager;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PullSqsMessageTask
extends Thread {
    private static final Logger log = LoggerFactory.getLogger(PullSqsMessageTask.class);
    private String queueName;
    private DefaultSQSListener sqsListerer;
    private SqsService sqs;
    private volatile boolean canRun = true;

    public PullSqsMessageTask(String queueName, DefaultSQSListener sqsListerer, SqsService sqs) {
        this.queueName = queueName;
        this.sqsListerer = sqsListerer;
        this.sqs = sqs;
    }

    public boolean isCanRun() {
        return this.canRun;
    }

    public void setCanRun(boolean canRun) {
        this.canRun = canRun;
    }

    @Override
    public void run() {
        final String namespace = "default";
        Properties props = Conf.copyProperties();
        int waitTimeSeconds = 0;
        String attributeNames = ".*";
        int maxNumberOfMessagesTrue = 4;
        log.info("the namespace:{} queueName:{} maxNumberOfMessages:{}", new Object[]{namespace, this.queueName, 4});
        ExecutorService threadPool = ThreadPool.getThreadPoolByName((String)namespace, (Properties)props);
        while (this.canRun) {
            try {
                List queueRecievList = this.sqs.queueReceiver(this.queueName, 4, waitTimeSeconds, new String[]{attributeNames});
                TimeAssist.reDoWaitInit((String)"aws-sqs");
                if (CollectionUtils.isNotEmpty((Collection)queueRecievList)) {
                    ArrayList<FutureTask<Boolean>> futureTasks = new ArrayList<FutureTask<Boolean>>();
                    for (final MessageXplat messageXplat : queueRecievList) {
                        FutureTask<Boolean> futureTask = new FutureTask<Boolean>(new Callable<Boolean>(){

                            @Override
                            public Boolean call() {
                                PerThreadValue createValue = PerthreadManager.getInstance().createValue((Object)"tams-namespace", String.class);
                                createValue.set((Object)namespace);
                                return PullSqsMessageTask.this.sqsListerer.doListener(messageXplat);
                            }
                        });
                        futureTasks.add(futureTask);
                        threadPool.submit(futureTask);
                    }
                    if (futureTasks.size() <= 0) continue;
                    for (FutureTask futureTask : futureTasks) {
                        futureTask.get();
                    }
                    continue;
                }
                try {
                    Thread.sleep(3000L);
                }
                catch (InterruptedException futureTasks) {
                }
            }
            catch (QueueDoesNotExistException e) {
                log.error("\u961f\u5217\uff1a" + this.queueName + "\u4e0d\u5b58\u5728");
                log.error("---  ", (Throwable)e);
                break;
            }
            catch (AmazonSQSException e) {
                log.error("\u6b64\u5e10\u53f7\u6709\u592a\u591a\u7684\u8bf7\u6c42\uff0c\u961f\u5217\uff1a" + this.queueName, (Throwable)e);
                boolean reDoWait = TimeAssist.reDoWait((String)"aws-sqs", (int)5);
                if (!reDoWait) continue;
                try {
                    Thread.sleep(32000L);
                }
                catch (InterruptedException interruptedException) {}
            }
            catch (Throwable e) {
                log.error("\u62c9\u53d6\u8bb0\u5f55\u65f6\u5931\u8d25", e);
                try {
                    Thread.sleep(60000L);
                }
                catch (InterruptedException interruptedException) {}
            }
        }
    }
}

