/*
 * Decompiled with CFR 0.152.
 */
package com.xforceplus.ultraman.oqsengine.sdk.service.impl;

import akka.NotUsed;
import akka.japi.function.Function;
import akka.stream.javadsl.Source;
import com.xforceplus.ultraman.oqsengine.pojo.dto.entity.IEntityClass;
import com.xforceplus.ultraman.oqsengine.pojo.reader.record.GeneralRecord;
import com.xforceplus.ultraman.oqsengine.pojo.reader.record.Record;
import com.xforceplus.ultraman.oqsengine.sdk.service.EntityService;
import com.xforceplus.ultraman.oqsengine.sdk.service.ExportSource;
import com.xforceplus.ultraman.oqsengine.sdk.vo.dto.ConditionQueryRequest;
import io.vavr.Tuple2;
import io.vavr.control.Either;
import java.io.Serializable;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link ExportSource} that exports records by querying the entity service one page
 * after another and concatenating the pages into a single stream.
 *
 * <p>Paging stops when a page comes back with fewer than {@code step} records, or when
 * more than {@code maxRetryTimes} consecutive queries have failed. In both cases an
 * empty sentinel record ({@link GeneralRecord#empty()}) is pushed into the stream and
 * the trailing {@code takeWhile(Record::nonEmpty)} completes the stream on it, so the
 * sentinel itself is never emitted downstream.
 */
public class SequenceExportSource
implements ExportSource {
    // The logger category is ExportSource (not this class), kept as in the original
    // so the emitted logger name does not change.
    private static final Logger LOGGER = LoggerFactory.getLogger(ExportSource.class);

    /** Page size passed to every query; also the threshold used to detect the last page. */
    private final int step;

    /** Number of consecutive failed queries that are retried before the export gives up. */
    private final int maxRetryTimes = 5;

    private final EntityService entityService;

    /**
     * @param entityService service used to run the paged condition queries
     * @param step          page size requested from the service on every query
     */
    public SequenceExportSource(EntityService entityService, int step) {
        this.entityService = entityService;
        this.step = step;
    }

    /**
     * Builds a stream of all records matching {@code queryRequest}, fetched page by page.
     *
     * <p>Note that {@code queryRequest} is mutated: its page number and page size are
     * overwritten before every query. The page cursor and the error counter are created
     * once per call of this method, so they are shared by every materialization of the
     * returned source.
     *
     * @param entityClass  entity class whose records are exported
     * @param queryRequest query condition; its paging fields are overwritten
     * @return a source emitting the matching records in page order
     */
    @Override
    public Source<Record, NotUsed> source(IEntityClass entityClass, ConditionQueryRequest queryRequest) {
        // The first requested page number is 0.
        // NOTE(review): confirm the service treats page numbers as 0-based.
        AtomicInteger cursor = new AtomicInteger(0);
        AtomicInteger error = new AtomicInteger(0);
        // An endless tick source drives the paging; each tick triggers one query and the
        // sentinel record produced by fetchPage terminates the stream via takeWhile.
        return Source.repeat(1)
                .flatMapConcat(tick -> this.fetchPage(entityClass, queryRequest, cursor, error))
                .takeWhile(Record::nonEmpty);
    }

    /**
     * Queries the page the cursor currently points at and wraps the outcome in a source.
     *
     * <ul>
     *   <li>Success, full page: emits the page and advances the cursor.</li>
     *   <li>Success, short page: emits the page followed by the empty sentinel record.</li>
     *   <li>Failure within the retry budget: emits nothing and leaves the cursor where it
     *       is, so the next tick retries the same page.</li>
     *   <li>Failure after the retry budget is used up: emits only the sentinel record.</li>
     * </ul>
     */
    private Source<Record, NotUsed> fetchPage(IEntityClass entityClass, ConditionQueryRequest queryRequest,
                                              AtomicInteger cursor, AtomicInteger error) {
        int pageNo = cursor.get();
        LOGGER.info("-----Export {}:{} ---- query {} times ----", new Object[]{entityClass.code(), entityClass.id(), pageNo});
        Either<String, Tuple2<Integer, List<Record>>> byCondition = this.entityService.findRecordsByCondition(
                entityClass, null, this.toQueryCondition(queryRequest, pageNo, this.step));

        if (byCondition.isLeft()) {
            LOGGER.warn("-----Export {}:{} ---- found error {} ----", new Object[]{entityClass.code(), entityClass.id(), byCondition.getLeft()});
            if (error.getAndIncrement() < this.maxRetryTimes) {
                // The cursor was not advanced, so the failed page is queried again.
                return Source.empty();
            }
            // Retry budget exhausted: end the stream with the sentinel record.
            return Source.single(GeneralRecord.empty());
        }

        // Advance only after a successful fetch; advancing before the query would make a
        // retry skip the failed page and silently drop its records from the export.
        cursor.incrementAndGet();
        error.set(0);
        List<Record> page = byCondition.get()._2();
        LOGGER.info("-----Export {}:{} ---- clear error and found size is {} ----", new Object[]{entityClass.code(), entityClass.id(), page.size()});
        if (page.size() < this.step) {
            // Short page means this was the last one: append the sentinel to stop paging.
            LinkedList<Record> lastPage = new LinkedList<Record>(page);
            lastPage.addLast(GeneralRecord.empty());
            return Source.from(lastPage);
        }
        return Source.from(page);
    }

    /**
     * Writes the paging parameters into {@code request} and returns the same instance
     * (the request is modified in place, not copied).
     */
    private ConditionQueryRequest toQueryCondition(ConditionQueryRequest request, int pageNo, int pageSize) {
        request.setPageSize(pageSize);
        request.setPageNo(pageNo);
        return request;
    }
}

