package com.xforceplus.ultraman.oqsengine.task.queue;

import com.xforceplus.ultraman.oqsengine.common.id.LongIdGenerator;
import com.xforceplus.ultraman.oqsengine.common.lifecycle.Lifecycle;
import com.xforceplus.ultraman.oqsengine.common.pool.ExecutorHelper;
import com.xforceplus.ultraman.oqsengine.common.serializable.SerializeStrategy;
import com.xforceplus.ultraman.oqsengine.lock.ResourceLocker;
import com.xforceplus.ultraman.oqsengine.storage.KeyValueStorage;
import com.xforceplus.ultraman.oqsengine.task.Task;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/xforceplus/ultraman/oqsengine/task/queue/TaskKeyValueQueue.class */
/**
 * A {@link TaskQueue} backed by a {@link KeyValueStorage}.
 *
 * <p>Appended tasks are serialized into an in-memory buffer and written to the
 * key-value store in batches by a single background worker. The store holds
 * three kinds of keys, all prefixed with the queue name: one element key per
 * task, a read pointer ({@code <name>-task-queue-p}) and a counter of tasks
 * that were saved but not yet handed out ({@code <name>-unused}).
 *
 * <p>Consumers serialize on a named lock from the {@link ResourceLocker} while
 * they check and decrement the counter.
 */
public class TaskKeyValueQueue implements TaskQueue, Lifecycle {
    private static final String DEFAULT_NAME = "default";
    private static final String ELEMENT_KEY = "task-queue-e";
    private static final String POINT_KEY = "task-queue-p";
    public static final String UNUSED = "unused";

    /** Upper bound on a single sleep while polling an empty queue in {@link #get(long)}. */
    private static final long POLL_INTERVAL_MS = 5000L;
    /** Number of retries after the first failed attempt in {@link #getTask()} and {@link #ack(Task)}. */
    private static final int MAX_RETRIES = 3;
    /** Pause between two retry attempts. */
    private static final long RETRY_PAUSE_MS = 100L;
    /** Interval between two flushes of the in-memory buffer. */
    private static final long DEFAULT_SYNC_GAP_MS = 30L;
    /** How long {@link #destroy()} waits for the flush worker to finish. */
    private static final long SHUTDOWN_WAIT_SECONDS = 60L;
    /** Capacity of the worker executor's own job queue. */
    private static final int WORKER_QUEUE_CAPACITY = 10000;

    private static final Logger LOGGER = LoggerFactory.getLogger(TaskKeyValueQueue.class);

    @Resource(name = "resourceLocker")
    private ResourceLocker locker;

    @Resource(name = "longContinuousPartialOrderIdGenerator")
    private LongIdGenerator idGenerator;

    @Resource
    private KeyValueStorage kv;

    @Resource
    private SerializeStrategy serializeStrategy;

    private ExecutorService worker;
    private String anyLock;
    private long initPoint;
    /** Serialized tasks accepted by {@link #append(Task)} but not yet written to the store. */
    private ConcurrentHashMap<String, byte[]> unSubmitTask;
    private String name;
    private String pointKey;
    private String elementKeyPrefix;
    private String unusedTaskSize;
    private volatile boolean running;
    private CountDownLatch latch;
    private long syncGapTimeMs;

    /**
     * Background job that periodically flushes the in-memory buffer to the
     * store until the queue stops running.
     */
    private class TaskBatchSave implements Runnable {
        private TaskBatchSave() {
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                while (TaskKeyValueQueue.this.running) {
                    try {
                        TimeUnit.MILLISECONDS.sleep(TaskKeyValueQueue.this.syncGapTimeMs);
                    } catch (InterruptedException e) {
                        if (!TaskKeyValueQueue.this.running) {
                            Thread.currentThread().interrupt();
                            break;
                        }
                        // Still running: treat the interrupt as an early wake-up and flush now.
                    }
                    try {
                        flushUnSubmitTask();
                    } catch (RuntimeException e) {
                        // Entries stay in the buffer and are retried on the next round.
                        LOGGER.error("kv error when batchSave or incr unusedTask.source[{}]", e.getMessage(), e);
                    }
                }
            } finally {
                // Always release destroy(), even if the loop dies unexpectedly.
                TaskKeyValueQueue.this.latch.countDown();
            }
        }
    }

    /** Creates a queue named {@code "default"} whose first element id is 1. */
    public TaskKeyValueQueue() {
        this(DEFAULT_NAME);
    }

    /**
     * Creates a queue whose first element id is 1.
     *
     * @param str queue name, used as prefix for every key and for the lock name
     */
    public TaskKeyValueQueue(String str) {
        this(str, 1L);
    }

    /**
     * Creates a queue; collaborators are expected to be injected afterwards.
     *
     * @param str queue name, used as prefix for every key and for the lock name
     * @param j   id that marks the first element of the queue
     */
    public TaskKeyValueQueue(String str, long j) {
        this.name = str;
        this.anyLock = "anyLock-" + str;
        this.initPoint = j;
    }

    /**
     * Creates a queue with explicitly supplied collaborators.
     *
     * @param resourceLocker    lock provider used to serialize consumers
     * @param longIdGenerator   generator of element ids
     * @param keyValueStorage   store holding elements, pointer and counter
     * @param serializeStrategy task (de)serializer
     * @param j                 id that marks the first element of the queue
     * @param str               queue name
     */
    public TaskKeyValueQueue(ResourceLocker resourceLocker, LongIdGenerator longIdGenerator, KeyValueStorage keyValueStorage, SerializeStrategy serializeStrategy, long j, String str) {
        this.locker = resourceLocker;
        this.idGenerator = longIdGenerator;
        this.kv = keyValueStorage;
        this.serializeStrategy = serializeStrategy;
        this.initPoint = j;
        this.name = str;
        this.anyLock = "anyLock-" + str;
    }

    /**
     * Derives the key names, marks the queue as running and starts the
     * background flush worker.
     */
    @PostConstruct
    public void init() throws Exception {
        this.pointKey = String.format("%s-%s", this.name, POINT_KEY);
        this.unusedTaskSize = String.format("%s-%s", this.name, UNUSED);
        this.elementKeyPrefix = String.format("%s-%s", this.name, ELEMENT_KEY);
        this.running = true;
        this.latch = new CountDownLatch(1);
        this.syncGapTimeMs = DEFAULT_SYNC_GAP_MS;
        this.unSubmitTask = new ConcurrentHashMap<>();
        this.worker = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(WORKER_QUEUE_CAPACITY),
            ExecutorHelper.buildNameThreadFactory("task", false),
            new ThreadPoolExecutor.AbortPolicy());
        this.worker.submit(new TaskBatchSave());
    }

    /**
     * Stops the queue: waits up to 60 seconds for the flush worker to leave
     * its loop, then shuts the executor down. Safe to call when {@link #init()}
     * never ran.
     */
    @PreDestroy
    public void destroy() {
        this.running = false;
        if (this.latch == null || this.worker == null) {
            // init() was never called; nothing to shut down.
            return;
        }
        boolean interrupted = false;
        try {
            this.latch.await(SHUTDOWN_WAIT_SECONDS, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Finish the shutdown first, restore the flag afterwards.
            interrupted = true;
        }
        try {
            if (!(this.worker.isTerminated() && this.worker.isShutdown())) {
                ExecutorHelper.shutdownAndAwaitTermination(this.worker);
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Stops the queue immediately without waiting for the flush worker. */
    private void destroyNow() {
        this.running = false;
        this.worker.shutdownNow();
    }

    /**
     * Assigns the next id to the task and buffers it for the next batch save.
     *
     * @param task task to enqueue; {@code null} is ignored
     * @throws IllegalStateException if the queue is not running
     */
    @Override // com.xforceplus.ultraman.oqsengine.task.queue.TaskQueue
    public void append(Task task) {
        checkRunning();
        if (task == null) {
            return;
        }
        long nextId = nextId();
        if (nextId == this.initPoint) {
            // First element: position the read pointer just before it so the
            // first increment in getTask() lands on initPoint.
            this.kv.incr(this.pointKey, 0L);
            this.kv.incr(this.pointKey, this.initPoint - 1);
        }
        task.setLocation(nextId);
        this.unSubmitTask.put(buildNextElementKey(nextId), this.serializeStrategy.serialize(task));
    }

    /**
     * Waits (practically without limit) for the next task.
     *
     * @return the next task, or {@code null} if none could be obtained
     */
    @Override // com.xforceplus.ultraman.oqsengine.task.queue.TaskQueue
    public Task get() {
        return get(Long.MAX_VALUE);
    }

    /**
     * Waits up to {@code j} milliseconds for the next task.
     *
     * <p>The named lock is held only while the unused-task counter is checked
     * and decremented; the task itself is read after the lock is released.
     *
     * @param j maximum wait in milliseconds
     * @return the next task, or {@code null} on timeout, on shutdown while the
     *         queue is empty, or when any error occurs (the error is logged)
     * @throws IllegalStateException if the queue is not running
     */
    @Override // com.xforceplus.ultraman.oqsengine.task.queue.TaskQueue
    public Task get(long j) {
        checkRunning();
        try {
            if (!reserveTask(j)) {
                return null;
            }
            return getTask();
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
            return null;
        }
    }

    /**
     * Under the consumer lock, waits until the unused-task counter is positive
     * and decrements it.
     *
     * @param awaitTimeMs maximum wait in milliseconds, measured from entry
     * @return {@code true} if one task was reserved for the caller
     */
    private boolean reserveTask(long awaitTimeMs) throws Exception {
        long start = System.currentTimeMillis();
        boolean locked = false;
        boolean interruptedOnShutdown = false;
        try {
            this.locker.lock(this.anyLock);
            locked = true;
            while (true) {
                long elapsed = System.currentTimeMillis() - start;
                if (elapsed >= awaitTimeMs) {
                    return false;
                }
                if (this.kv.incr(this.unusedTaskSize, 0L) > 0) {
                    this.kv.incr(this.unusedTaskSize, -1L);
                    return true;
                }
                if (!this.running) {
                    // Empty and shutting down: nothing will arrive any more.
                    return false;
                }
                // Never sleep past the caller's deadline.
                long remaining = awaitTimeMs - Math.max(elapsed, 0L);
                try {
                    TimeUnit.MILLISECONDS.sleep(Math.min(POLL_INTERVAL_MS, remaining));
                } catch (InterruptedException e) {
                    if (!this.running) {
                        interruptedOnShutdown = true;
                        return false;
                    }
                    // Still running: keep polling until the deadline.
                }
            }
        } finally {
            // Release exactly once, and only if the lock was really acquired.
            if (locked) {
                this.locker.unlock(this.anyLock);
            }
            if (interruptedOnShutdown) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Advances the read pointer and loads the task stored at the new position,
     * retrying the read a few times with a short pause.
     *
     * @return the task at the new pointer position
     * @throws RuntimeException if the pointer cannot be advanced or the task is
     *         not found; the unused-task counter is incremented again first
     */
    private Task getTask() {
        try {
            String elementKey = buildNextElementKey(this.kv.incr(this.pointKey));
            for (int attempt = 0; ; attempt++) {
                Task task = getTask(elementKey);
                if (task != null) {
                    return task;
                }
                if (attempt >= MAX_RETRIES) {
                    throw new RuntimeException(String.format("Task not found where elementKey equals %s .", elementKey));
                }
                LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(RETRY_PAUSE_MS));
            }
        } catch (RuntimeException e) {
            LOGGER.error("incr pointKey {} failed when get task", this.pointKey, e);
            // Give back the slot reserved by the caller.
            this.kv.incr(this.unusedTaskSize);
            throw new RuntimeException(String.format("pointKey %s incr failed. ", this.pointKey), e);
        }
    }

    /**
     * Reads and deserializes the task stored under the given key.
     *
     * @param str element key
     * @return the task, or {@code null} if the key holds no value
     */
    private Task getTask(String str) {
        Optional<byte[]> stored = this.kv.get(str);
        if (!stored.isPresent()) {
            return null;
        }
        return (Task) this.serializeStrategy.unserialize(stored.get(), Task.class);
    }

    @Override
    public String toString() {
        return "TaskKeyValueQueue{name='" + this.name + "'}";
    }

    /**
     * Acknowledges a task by deleting its element from the store, retrying a
     * few times with a short pause.
     *
     * @param task task to acknowledge; {@code null} is ignored
     * @throws IllegalStateException if the queue is not running
     * @throws RuntimeException      if the delete still fails after all retries
     */
    @Override // com.xforceplus.ultraman.oqsengine.task.queue.TaskQueue
    public void ack(Task task) {
        checkRunning();
        if (task == null) {
            return;
        }
        String elementKey = buildNextElementKey(task.location());
        for (int attempt = 0; ; attempt++) {
            try {
                this.kv.delete(elementKey);
                return;
            } catch (Exception e) {
                if (attempt >= MAX_RETRIES) {
                    LOGGER.error(e.getMessage(), e);
                    throw new RuntimeException(String.format("Task ack failed taskLocation = %s", elementKey), e);
                }
                LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(RETRY_PAUSE_MS));
            }
        }
    }

    /**
     * Tells whether the unused-task counter in the store is zero or below.
     * Tasks still waiting in the in-memory buffer are not counted.
     */
    public boolean isEmpty() {
        return this.kv.incr(this.unusedTaskSize, 0L) <= 0;
    }

    /** Writes a snapshot of the buffer to the store, bumps the counter and drops the saved entries. */
    private void flushUnSubmitTask() {
        if (this.unSubmitTask.isEmpty()) {
            return;
        }
        // Work on a snapshot so tasks appended during the save are kept for the next round.
        Map<String, byte[]> snapshot = new HashMap<>(this.unSubmitTask);
        Set<Map.Entry<String, byte[]>> entries = snapshot.entrySet();
        this.kv.save(entries);
        this.kv.incr(this.unusedTaskSize, entries.size());
        for (String savedKey : snapshot.keySet()) {
            this.unSubmitTask.remove(savedKey);
        }
    }

    /** Builds the element key for the given id: {@code <elementKeyPrefix>-<id>}. */
    private String buildNextElementKey(long j) {
        return this.elementKeyPrefix + '-' + j;
    }

    /**
     * Obtains the next id in this queue's namespace.
     *
     * @throws RuntimeException if the generator does not support namespaces or
     *         is not continuous
     */
    private long nextId() {
        if (this.idGenerator.supportNameSpace() && this.idGenerator.isContinuous()) {
            return ((Long) this.idGenerator.next(this.name)).longValue();
        }
        String message = this.idGenerator.getClass().getName() + " do not support namespace ";
        LOGGER.error(message);
        throw new RuntimeException(message);
    }

    /** Rejects calls made before {@link #init()} or after shutdown. */
    private void checkRunning() {
        if (!this.running) {
            throw new IllegalStateException("The task queue has not been initialized.");
        }
    }

    /** Initiates an orderly shutdown of the flush worker's executor. */
    public void shutDownWorker() {
        this.worker.shutdown();
    }
}
