package com.xforceplus.ultraman.adapter.elasticsearch.service.impl;

import com.xforceplus.ultraman.adapter.elasticsearch.service.ManageBocpMetadataService;
import com.xforceplus.ultraman.adapter.elasticsearch.service.constant.CommonProperty;
import com.xforceplus.ultraman.adapter.elasticsearch.service.constant.FieldMappingType;
import com.xforceplus.ultraman.adapter.elasticsearch.service.entity.FieldMapping;
import com.xforceplus.ultraman.adapter.elasticsearch.service.utils.ElasticSearchMappingBuildUtils;
import com.xforceplus.ultraman.adapter.elasticsearch.service.utils.ThreadPoolExecutorUtils;
import com.xforceplus.ultraman.adapter.elasticsearch.transport.ElasticsearchTransportExecutor;
import com.xforceplus.ultraman.adapter.elasticsearch.utils.DynamicConfigUtils;
import com.xforceplus.ultraman.cdc.adapter.CDCFilter;
import com.xforceplus.ultraman.cdc.adapter.EngineAdapterService;
import com.xforceplus.ultraman.cdc.adapter.IndexUpsertBeforeCallBack;
import com.xforceplus.ultraman.metadata.cdc.OqsEngineEntity;
import com.xforceplus.ultraman.metadata.engine.EntityClassEngine;
import com.xforceplus.ultraman.metadata.entity.FieldType;
import com.xforceplus.ultraman.metadata.entity.IEntityClass;
import com.xforceplus.ultraman.metadata.entity.IEntityField;
import com.xforceplus.ultraman.metadata.entity.IRelation;
import com.xforceplus.ultraman.sdk.core.datasource.route.TransportExecutor;
import com.xforceplus.ultraman.sdk.core.datasource.route.dynamic.config.DynamicConfig;
import io.vavr.Tuple2;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang.StringUtils;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.join.query.JoinQueryBuilders;
import org.elasticsearch.join.query.ParentIdQueryBuilder;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

/* loaded from: input_file:com/xforceplus/ultraman/adapter/elasticsearch/service/impl/ElasticSearchServiceImpl.class */
/**
 * {@link EngineAdapterService} implementation that writes batches of {@link OqsEngineEntity}
 * rows to Elasticsearch: live rows become bulk upserts and deleted rows become bulk deletes,
 * grouped by tenant profile and target index.
 */
public class ElasticSearchServiceImpl implements EngineAdapterService {
    private static final Logger log = LoggerFactory.getLogger(ElasticSearchServiceImpl.class);

    /** Resolves entity class metadata by id and profile. */
    @Autowired
    public EntityClassEngine entityClassEngine;

    /** Passed to {@code DynamicConfigUtils.insulateTenant} when resolving tenant-specific index names. */
    @Autowired
    private DynamicConfig dynamicConfig;

    /** Decides whether an entity class may be synchronised and which index it is written to. */
    @Autowired
    private ManageBocpMetadataService manageBocpMetadataService;

    /** Cast to {@code ElasticsearchTransportExecutor} at each use to obtain the per-profile client. */
    @Autowired
    private TransportExecutor elasticsearchTransportExecutor;
    // shards and replicas are assigned by the constructor; no method in this class reads them.
    private int shards;
    private int replicas;
    /** Value handed to {@code UpdateRequest.retryOnConflict} for every upsert. */
    private int retryOnConflict;
    /** Maximum number of entities handed to a single worker task by {@code sync}. */
    private int threadHandleBatch;

    @Autowired(required = false)
    private List<CDCFilter> filters = new ArrayList();
    /** Seconds that {@code sync} waits for each batch task to complete. */
    private long writeTimeOut = 50;

    @Autowired(required = false)
    private List<IndexUpsertBeforeCallBack> beforeCallBacks = new ArrayList();
    /** Pool in which {@code sumbitBulk} runs the bulk requests of one profile. */
    private ForkJoinPool pool = new ForkJoinPool(4);
    // Running totals reported by sync() in its log line. NOTE(review): they are volatile but
    // updated with "+=", which is not atomic; concurrent sync() calls could lose updates.
    private volatile double sync_time = 0.0d;
    private volatile long sync_docs = 0;
    /** Shared executor on which {@code submitThreadHandler} schedules batch tasks. */
    private ThreadPoolExecutor executor = ThreadPoolExecutorUtils.executor;

    /**
     * Creates the service with its index and batching settings.
     *
     * @param i  value stored as the shard count
     * @param i2 value stored as the replica count
     * @param i3 number of entities each worker task handles; non-positive values fall back to 1000
     * @param i4 retry-on-conflict count applied to every update request
     */
    public ElasticSearchServiceImpl(int i, int i2, int i3, int i4) {
        this.shards = i;
        this.replicas = i2;
        // sync() divides by the batch size, so zero would raise ArithmeticException and a
        // negative value would skip all batches. Keep the 1000 default in those cases.
        this.threadHandleBatch = i3 > 0 ? i3 : 1000;
        this.retryOnConflict = i4;
    }

    /**
     * Replaces the dynamic configuration used for tenant index resolution.
     *
     * @param dynamicConfig configuration to use from now on
     */
    public void setDynamicConfig(DynamicConfig dynamicConfig) {
        this.dynamicConfig = dynamicConfig;
    }

    /**
     * Replaces the CDC filter list returned by {@link #getFilters()}.
     *
     * @param list filters to expose; stored as-is without copying
     */
    public void setFilters(List<CDCFilter> list) {
        this.filters = list;
    }

    /**
     * Replaces the upsert callbacks returned by {@link #getCallBacks()}.
     *
     * @param list callbacks to expose; stored as-is without copying
     */
    public void setBeforeCallBacks(List<IndexUpsertBeforeCallBack> list) {
        this.beforeCallBacks = list;
    }

    /**
     * Replaces the metadata service consulted for passage checks and write-index lookup.
     *
     * @param manageBocpMetadataService metadata service to use from now on
     */
    public void setManageBocpMetadataService(ManageBocpMetadataService manageBocpMetadataService) {
        this.manageBocpMetadataService = manageBocpMetadataService;
    }

    /**
     * Replaces the transport executor. The rest of this class casts it to
     * {@code ElasticsearchTransportExecutor}, so any other implementation fails at first use.
     *
     * @param transportExecutor transport executor to use from now on
     */
    public void setElasticsearchTransportExecutor(TransportExecutor transportExecutor) {
        this.elasticsearchTransportExecutor = transportExecutor;
    }

    /**
     * Replaces the engine used to load entity class metadata.
     *
     * @param entityClassEngine engine to use from now on
     */
    public void setEntityClassEngine(EntityClassEngine entityClassEngine) {
        this.entityClassEngine = entityClassEngine;
    }

    /**
     * Builds a complete mapping document ({@code dynamic} plus {@code properties}) for the
     * given fields.
     *
     * @param collection entity fields to map
     * @param str        optional prefix prepended (with a dot) to every property name
     * @return the builder holding the finished mapping object
     * @throws IOException if the JSON builder fails
     */
    private XContentBuilder getxContentBuilder(Collection<IEntityField> collection, String str) throws IOException {
        XContentBuilder mapping = XContentFactory.jsonBuilder();
        mapping = mapping.startObject();
        mapping = mapping.field("dynamic", "true");
        // "properties" is written as a bare field name and then opened as an object.
        mapping = mapping.field("properties").startObject();
        getxContentBuilder(collection, mapping, str);
        // Close "properties", then the root object.
        mapping.endObject();
        mapping.endObject();
        return mapping;
    }

    /**
     * Builds a mapping document that declares a {@code <str>_join} field of type
     * {@code join}, listing the given names as relations of {@code str}.
     *
     * @param set names written as the relation targets of {@code str}; may be null or empty,
     *            in which case the {@code relations} object is left empty
     * @param str name used both for the join field prefix and as the relation key
     * @return the builder holding the finished mapping object
     * @throws IOException if the JSON builder fails
     */
    private XContentBuilder getxContentBuilder(Set<String> set, String str) throws IOException {
        XContentBuilder properties = XContentFactory.jsonBuilder()
                .startObject()
                .field("dynamic", "true")
                .field("properties")
                .startObject();
        XContentBuilder relations = properties
                .startObject(str.concat("_join"))
                .field("type", "join")
                .startObject("relations");
        boolean hasRelations = set != null && set.size() > 0;
        if (hasRelations) {
            relations = relations.field(str, set.toArray());
        }
        // Close, in order: relations, the join field, properties, root.
        relations.endObject();
        relations.endObject();
        relations.endObject();
        relations.endObject();
        return relations;
    }

    /**
     * Appends one property definition per field mapping to an already-open
     * {@code properties} object.
     *
     * <p>Dots in field names are replaced by underscores. Text fields get their analyzer and
     * {@code fielddata}; keyword fields whose origin type is {@code STRINGS} get an
     * additional {@code <name>@raw} text property; long, double and boolean fields get a
     * plain typed property. Mappings of any other type are ignored.
     *
     * @param collection      entity fields to map
     * @param xContentBuilder builder positioned inside the {@code properties} object
     * @param str             optional prefix prepended (with a dot) to every property name
     * @return the builder after all properties were appended
     * @throws IOException if the JSON builder fails
     */
    private XContentBuilder getxContentBuilder(Collection<IEntityField> collection, XContentBuilder xContentBuilder, String str) throws IOException {
        XContentBuilder builder = xContentBuilder;
        FieldMappingType[] plainTypes = {FieldMappingType.LONG, FieldMappingType.DOUBLE, FieldMappingType.BOOLEAN};
        for (FieldMapping mapping : ElasticSearchMappingBuildUtils.getFieldInfos(collection)) {
            String normalized = mapping.getField().replace(".", "_");
            String property = StringUtils.isNotEmpty(str) ? str.concat(".").concat(normalized) : normalized;
            String declaredType = mapping.getType();

            if (StringUtils.equalsIgnoreCase(declaredType, FieldMappingType.TEXT.getType())) {
                builder = builder.startObject(property)
                        .field("type", FieldMappingType.TEXT.getType())
                        .field("analyzer", mapping.getAnalyzer())
                        .field("fielddata", "true")
                        .endObject();
                continue;
            }
            if (StringUtils.equalsIgnoreCase(declaredType, FieldMappingType.KEYWORD.getType())) {
                builder = builder.startObject(property)
                        .field("type", FieldMappingType.KEYWORD.getType())
                        .endObject();
                if (mapping.getOriginField().type() == FieldType.STRINGS) {
                    builder = builder.startObject(property.concat("@raw"))
                            .field("type", FieldMappingType.TEXT.getType())
                            .endObject();
                }
                continue;
            }
            // LONG, DOUBLE and BOOLEAN share the same single-attribute shape; first match wins.
            for (FieldMappingType plainType : plainTypes) {
                if (StringUtils.equalsIgnoreCase(declaredType, plainType.getType())) {
                    builder = builder.startObject(property)
                            .field("type", plainType.getType())
                            .endObject();
                    break;
                }
            }
        }
        return builder;
    }

    /**
     * Returns the configured CDC filters.
     *
     * @return the internal filter list itself (not a copy)
     */
    public List<CDCFilter> getFilters() {
        return this.filters;
    }

    /**
     * Returns the configured before-upsert callbacks.
     *
     * @return the internal callback list itself (not a copy)
     */
    public List<IndexUpsertBeforeCallBack> getCallBacks() {
        return this.beforeCallBacks;
    }

    /**
     * Reports the load-finished flag of the metadata service.
     *
     * @return the value of {@code manageBocpMetadataService.getLoadFinish()}
     */
    public boolean initMetadataInitStatus() {
        return this.manageBocpMetadataService.getLoadFinish();
    }

    /**
     * Synchronises a batch of entities to Elasticsearch.
     *
     * <p>The list is cut into slices of at most {@code threadHandleBatch} entities, each slice
     * is processed by a worker task, and every task is awaited for up to {@code writeTimeOut}
     * seconds. On success the cumulative statistics are updated and logged.
     *
     * @param list entities to write or delete
     * @return {@code true} if every slice succeeded; {@code false} if any slice failed,
     *         timed out, or the calling thread was interrupted while waiting
     */
    public boolean sync(List<OqsEngineEntity> list) {
        List<Future> pending = new ArrayList<>();
        long startedAt = System.currentTimeMillis();
        int total = list.size();
        // Guard against a non-positive batch size (division by zero in the old loop bound):
        // in that case the whole list is handled as a single slice.
        int batchSize = this.threadHandleBatch > 0 ? this.threadHandleBatch : Math.max(1, total);
        // Step by slice so that no empty trailing slice is submitted when the size is an
        // exact multiple of the batch size (or the list is empty).
        int from = 0;
        while (from < total) {
            int to = from + Math.min(batchSize, total - from);
            submitThreadHandler(list.subList(from, to), pending);
            from = to;
        }
        for (Future task : pending) {
            try {
                if (!Boolean.TRUE.equals(task.get(this.writeTimeOut, TimeUnit.SECONDS))) {
                    return false;
                }
            } catch (InterruptedException e) {
                // Restore the flag so callers further up can observe the interruption.
                Thread.currentThread().interrupt();
                log.error("FAILURE,elasticsearch sync method interrupted while waiting for a batch", e);
                return false;
            } catch (Exception e) {
                // Pass the exception itself: TimeoutException has a null message.
                log.error("FAILURE,elasticsearch sync method execute failed!", e);
                return false;
            }
        }
        long elapsed = System.currentTimeMillis() - startedAt;
        this.sync_time += elapsed;
        this.sync_docs += total;
        log.info("elastc_sync(单位毫秒)-------elastic总共同步时间time:({})----总共同步记录:docs({})-----当前批次同步数docs:({}) 当前批次处理时间time:({})", new Object[]{Double.valueOf(this.sync_time), Long.valueOf(this.sync_docs), Integer.valueOf(total), Long.valueOf(elapsed)});
        return true;
    }

    /**
     * Schedules one slice of entities on the shared executor and records the resulting future.
     *
     * <p>The task splits the slice into deleted and live entities, upserts the live ones
     * first and then deletes the deleted ones. It yields {@code false} as soon as either step
     * fails or throws.
     *
     * @param list  slice of entities to process
     * @param list2 collector that receives the future of the scheduled task
     */
    private void submitThreadHandler(List<OqsEngineEntity> list, List<Future> list2) {
        Future<Boolean> scheduled = this.executor.submit(() -> {
            try {
                List<OqsEngineEntity> removed = new ArrayList<>();
                List<OqsEngineEntity> live = new ArrayList<>();
                for (OqsEngineEntity entity : list) {
                    if (entity.isDeleted()) {
                        removed.add(entity);
                    } else {
                        live.add(entity);
                    }
                }
                // Upserts run before deletes; a failed upsert skips the delete step.
                if (!live.isEmpty() && !saveOrUpdate(live)) {
                    return false;
                }
                if (removed.isEmpty()) {
                    return true;
                }
                return deleteBatch(removed).booleanValue();
            } catch (Exception e) {
                log.error("FAILURE,elasticsearch submitThreadHandler method execute failed!,cause by:{}", e.getMessage());
                return false;
            }
        });
        list2.add(scheduled);
    }

    /**
     * Upserts the given entities.
     *
     * <p>Entities are grouped by profile and target index via
     * {@code handleTenantsBatchRows}; one bulk request is built per index, containing an
     * update-with-upsert action per entity, and the requests are then submitted per profile.
     *
     * @param list entities to upsert
     * @return {@code true} if every bulk request completed without failures
     * @throws IOException declared for callers; not thrown by the current implementation
     */
    public boolean saveOrUpdate(List<OqsEngineEntity> list) throws IOException {
        Map<String, List<BulkRequest>> requestsByProfile = new HashMap<>();
        // Plain typed loops: the previous nested lambdas reused the parameter name "entry",
        // which is not legal Java, and relied on raw casts.
        for (Map.Entry<String, Map<String, List<OqsEngineEntity>>> profileEntry : handleTenantsBatchRows(list).entrySet()) {
            List<BulkRequest> bulkRequests = new ArrayList<>();
            for (Map.Entry<String, List<OqsEngineEntity>> indexEntry : profileEntry.getValue().entrySet()) {
                String indexName = indexEntry.getKey();
                BulkRequest bulkRequest = new BulkRequest();
                for (OqsEngineEntity entity : indexEntry.getValue()) {
                    String documentId = String.valueOf(entity.getId());
                    UpdateRequest updateRequest = new UpdateRequest(indexName, documentId);
                    IndexRequest document = new IndexRequest(indexName).source(entity.getAttributes(), XContentType.JSON);
                    document.id(documentId);
                    if (entity.getRoutingId() != null) {
                        // Routed (child) documents must use the same routing on both requests.
                        document.routing(entity.getRoutingId());
                        updateRequest.routing(entity.getRoutingId());
                    }
                    updateRequest.doc(document);
                    updateRequest.retryOnConflict(this.retryOnConflict);
                    updateRequest.docAsUpsert(true);
                    bulkRequest.add(updateRequest);
                }
                bulkRequests.add(bulkRequest);
            }
            requestsByProfile.put(profileEntry.getKey(), bulkRequests);
        }
        return sumbitBulk(requestsByProfile);
    }

    /**
     * Executes the prepared bulk requests, profile by profile.
     *
     * <p>The requests of one profile are run as a parallel stream inside {@code pool}; the
     * next profile starts only after the previous one finished. Processing stops at the
     * first profile that reports a failure.
     *
     * @param map bulk requests keyed by profile; the default profile is passed to the
     *            transport executor as {@code null}
     * @return {@code true} if every request succeeded; {@code false} on any bulk failure,
     *         I/O error, task exception, or interruption
     */
    private boolean sumbitBulk(Map<String, List<BulkRequest>> map) {
        for (Map.Entry<String, List<BulkRequest>> entry : map.entrySet()) {
            String profile = StringUtils.equalsIgnoreCase(entry.getKey(), CommonProperty.defaultProfile) ? null : entry.getKey();
            List<BulkRequest> requests = entry.getValue();
            try {
                Boolean allSucceeded = this.pool.submit(() -> {
                    // allMatch short-circuits on the first failed request.
                    return requests.parallelStream().allMatch(request -> executeBulk(profile, request));
                }).get();
                if (!allSucceeded.booleanValue()) {
                    return false;
                }
            } catch (InterruptedException e) {
                // Restore the flag so the owning executor/caller can react to the interruption.
                Thread.currentThread().interrupt();
                log.error("FAILURE,elasticsearch sumbitBulk method interrupted", e);
                return false;
            } catch (ExecutionException e) {
                log.error("FAILURE,elasticsearch sumbitBulk method execute failed!", e);
                return false;
            }
        }
        return true;
    }

    /** Sends one bulk request with immediate refresh and reports whether it fully succeeded. */
    private boolean executeBulk(String profile, BulkRequest bulkRequest) {
        bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        try {
            BulkResponse response = ((ElasticsearchTransportExecutor) this.elasticsearchTransportExecutor).m57executor(profile).bulk(bulkRequest, RequestOptions.DEFAULT);
            if (!response.hasFailures()) {
                return true;
            }
            log.error("FAILURE,elasticsearch sumbitBulk method execute failed!,cause by:{}", response.buildFailureMessage());
            return false;
        } catch (IOException e) {
            log.error("FAILURE,elasticsearch sumbitBulk method failed with an I/O error", e);
            return false;
        }
    }

    /**
     * Groups entities by profile and then by target index.
     *
     * <p>For each entity that passes the metadata passage check and has a resolvable write
     * index, the entity is added to its index bucket. If its class has relations, a
     * {@code <code>_join} attribute naming the class is added to the entity first. Father
     * rows and relation ("wide") rows derived from the entity are added to the same profile
     * bucket.
     *
     * @param list entities to group
     * @return profile -> index name -> entities; a profile bucket may be empty when no write
     *         index could be resolved for its entities
     * @throws RuntimeException wrapping any failure raised while grouping
     */
    public Map<String, Map<String, List<OqsEngineEntity>>> handleTenantsBatchRows(List<OqsEngineEntity> list) {
        try {
            Map<String, Map<String, List<OqsEngineEntity>>> rowsByProfile = new ConcurrentHashMap<>();
            for (OqsEngineEntity entity : list) {
                String profile = entity.getEntityClassRef().getProfile();
                String classId = String.valueOf(entity.getEntityClassRef().getId());
                IEntityClass entityClass = (IEntityClass) this.entityClassEngine.load(classId, profile).get();
                if (!this.manageBocpMetadataService.checkPassage(profile, entityClass)) {
                    continue;
                }
                // The profile bucket is registered before the index lookup, so it exists even
                // when the lookup below fails.
                Map<String, List<OqsEngineEntity>> rowsByIndex = rowsByProfile.computeIfAbsent(getProfile(profile), key -> new HashMap<>());
                Tuple2<Boolean, String> target = getWriteSegmentIndex(profile, entity.getAttributes(), entityClass);
                if (!target._1.booleanValue()) {
                    continue;
                }
                List<OqsEngineEntity> rows = rowsByIndex.computeIfAbsent(target._2, key -> new ArrayList<>());
                if (!entityClass.relations().isEmpty()) {
                    Map<String, Object> joinField = new HashMap<>();
                    joinField.put("name", entityClass.code());
                    entity.getAttributes().put(entityClass.code().concat("_join"), joinField);
                }
                rows.add(entity);
                buildFatherAttributes(entity, rowsByIndex);
                buildWideAttributes(entity, entityClass, rowsByIndex);
            }
            return rowsByProfile;
        } catch (Throwable th) {
            log.error("FAILURE,elasticsearch handleTenantsBatchRows method failed,cause by:{}", th.getMessage());
            throw new RuntimeException(th);
        }
    }

    /**
     * Maps an empty or null profile to the default profile name.
     *
     * @param str profile taken from an entity class reference
     * @return {@code str} when it is non-empty, otherwise {@code CommonProperty.defaultProfile}
     */
    @NotNull
    private String getProfile(String str) {
        return StringUtils.isEmpty(str) ? CommonProperty.defaultProfile : str;
    }

    /**
     * Adds a copy of the entity, restricted to the fields of its father class, to the father
     * class's index bucket.
     *
     * <p>Nothing is added when the entity has no father, is deleted, the father class does
     * not pass the metadata passage check, or no write index can be resolved for it.
     *
     * @param oqsEngineEntity entity whose father row should be derived
     * @param map             index name -> entities bucket of the entity's profile
     */
    private void buildFatherAttributes(OqsEngineEntity oqsEngineEntity, Map<String, List<OqsEngineEntity>> map) {
        if (oqsEngineEntity.getFather() == 0 || oqsEngineEntity.isDeleted()) {
            return;
        }
        String profile = oqsEngineEntity.getEntityClassRef().getProfile();
        IEntityClass fatherClass = (IEntityClass) this.entityClassEngine.load(String.valueOf(oqsEngineEntity.getFather()), profile).get();
        // Both conditions are required. The previous "!= null || checkPassage" skipped the
        // passage check for every non-null class and would have dereferenced a null one.
        if (fatherClass == null || !this.manageBocpMetadataService.checkPassage(profile, fatherClass)) {
            return;
        }
        Map<String, Object> fatherAttributes = new HashMap<>();
        for (Object candidate : fatherClass.fields()) {
            String fieldName = ((IEntityField) candidate).name().replace(".", "_");
            if (StringUtils.equalsIgnoreCase(fieldName, "id")) {
                // The id is taken from the entity itself rather than from its attributes.
                fatherAttributes.put(fieldName, Long.valueOf(oqsEngineEntity.getId()));
            } else {
                fatherAttributes.put(fieldName, oqsEngineEntity.getAttributes().get(fieldName));
            }
        }
        OqsEngineEntity fatherRow = new OqsEngineEntity();
        fatherRow.setId(oqsEngineEntity.getId());
        fatherRow.setAttributes(fatherAttributes);
        Tuple2<Boolean, String> target = getWriteSegmentIndex(profile, oqsEngineEntity.getAttributes(), fatherClass);
        if (target._1.booleanValue()) {
            List<OqsEngineEntity> fresh = new ArrayList<>();
            List<OqsEngineEntity> existing = map.putIfAbsent(target._2, fresh);
            (existing == null ? fresh : existing).add(fatherRow);
        }
    }

    /**
     * Adds the relation ("wide") rows derived from the entity to the bucket; delegates
     * entirely to {@code generateRelationOqsEngine}.
     *
     * @param oqsEngineEntity entity to derive rows from
     * @param iEntityClass    class of the entity
     * @param map             index name -> entities bucket of the entity's profile
     * @throws CloneNotSupportedException if the entity cannot be cloned
     */
    private void buildWideAttributes(OqsEngineEntity oqsEngineEntity, IEntityClass iEntityClass, Map<String, List<OqsEngineEntity>> map) throws CloneNotSupportedException {
        generateRelationOqsEngine(oqsEngineEntity, iEntityClass, map);
    }

    /**
     * For every qualifying {@code TO_ONE} relation of the entity's class, derives a child
     * document from the entity and adds it to the related class's index bucket.
     *
     * <p>A relation qualifies when its target class is loaded, passes the metadata passage
     * check, and itself declares a relation back to {@code iEntityClass}. The entity must
     * also carry a non-empty {@code <relationName>_id} attribute, which is used as the
     * parent id and routing id of the derived document.
     *
     * @param oqsEngineEntity source entity
     * @param iEntityClass    class of the source entity
     * @param map             index name -> entities bucket of the entity's profile
     * @throws CloneNotSupportedException if the entity cannot be cloned
     */
    private void generateRelationOqsEngine(OqsEngineEntity oqsEngineEntity, IEntityClass iEntityClass, Map<String, List<OqsEngineEntity>> map) throws CloneNotSupportedException {
        List toOneRelatedList = oqsEngineEntity.getToOneRelatedList();
        String profile = oqsEngineEntity.getEntityClassRef().getProfile();
        // Keyed by related entity class id further down, but never populated in this method.
        HashMap hashMap = new HashMap();
        String code = iEntityClass.code();
        Collection<IRelation> relations = iEntityClass.relations();
        // The lambda body is empty, so this only walks the list (and fails on a null list).
        // NOTE(review): presumably the original populated hashMap here — confirm against the
        // real source before relying on the attribute lookup below.
        toOneRelatedList.forEach(tuple4 -> {
        });
        if (relations.isEmpty()) {
            return;
        }
        for (IRelation iRelation : relations) {
            IEntityClass iEntityClass2 = (IEntityClass) this.entityClassEngine.load(String.valueOf(iRelation.getEntityClassId()), profile).get();
            // Only TO_ONE relations whose target class points back at this class are handled.
            if (StringUtils.equalsIgnoreCase(iRelation.getRelationType(), "TO_ONE") && iEntityClass2 != null && this.manageBocpMetadataService.checkPassage(profile, iEntityClass2) && !((List) iEntityClass2.relations().stream().filter(iRelation2 -> {
                return iRelation2.getEntityClassId() == iEntityClass.id();
            }).collect(Collectors.toList())).isEmpty()) {
                // Foreign key attribute of the relation: "<relationName>_id".
                Object obj = oqsEngineEntity.getAttributes().get(iRelation.getName().concat("_").concat("id"));
                if (obj != null && StringUtils.isNotEmpty(String.valueOf(obj))) {
                    String valueOf = String.valueOf(obj);
                    OqsEngineEntity build = ((OqsEngineEntity.Builder) oqsEngineEntity.clone()).build();
                    // hashMap is empty at this point, so the attribute map passed here is null.
                    Tuple2<Boolean, String> writeSegmentIndex = getWriteSegmentIndex(profile, (Map) hashMap.get(Long.valueOf(iRelation.getEntityClassId())), iEntityClass2);
                    if (((Boolean) writeSegmentIndex._1).booleanValue()) {
                        String str = (String) writeSegmentIndex._2;
                        HashMap hashMap2 = new HashMap();
                        // Drop the source class's own "<code>_join" attribute from the copy.
                        if (build.getAttributes().containsKey(code.concat("_").concat("join"))) {
                            build.getAttributes().remove(code.concat("_").concat("join"));
                        }
                        // Every attribute is copied under "<code>.<name>"; "id" is additionally
                        // kept under its plain name.
                        build.getAttributes().keySet().stream().forEach(str2 -> {
                            if (StringUtils.equalsIgnoreCase("id", str2)) {
                                hashMap2.put(str2, build.getAttributes().get(str2));
                            }
                            hashMap2.put(code.concat(".").concat(str2), build.getAttributes().get(str2));
                        });
                        // Join field of the related class: this document is a child named
                        // after the source class, with the foreign key as its parent.
                        HashMap hashMap3 = new HashMap();
                        hashMap3.put("name", code);
                        hashMap3.put("parent", valueOf);
                        hashMap2.put(iEntityClass2.code().concat("_join"), hashMap3);
                        build.setAttributes(hashMap2);
                        // The document is routed by the same value used as its join parent.
                        build.setRoutingId(valueOf);
                        commondBuildWideRow(build, map, str);
                    }
                }
            }
        }
    }

    /**
     * Resolves the index an entity should be written to, turning lookup failures into a
     * negative result instead of an exception.
     *
     * @param str          profile of the entity
     * @param map          attributes handed to the metadata service for the lookup
     * @param iEntityClass class of the entity
     * @return {@code (true, indexName)} on success, {@code (false, null)} if the metadata
     *         service threw
     */
    private Tuple2<Boolean, String> getWriteSegmentIndex(String str, Map<String, Object> map, IEntityClass iEntityClass) {
        try {
            String indexName = this.manageBocpMetadataService.getWriteSegmentIndex(str, iEntityClass, map);
            return new Tuple2<Boolean, String>(Boolean.TRUE, indexName);
        } catch (Exception e) {
            // Log through a fixed format string: the exception message may be null or contain
            // "{}" placeholders, and on its own it gives no hint where the failure occurred.
            log.warn("elasticsearch getWriteSegmentIndex failed, profile:{}, cause by:{}", str, e.getMessage());
            return new Tuple2<Boolean, String>(Boolean.FALSE, null);
        }
    }

    /**
     * Deletes, by parent-id query, the child documents of the given entity from the
     * tenant-specific index of {@code iEntityClass2}.
     *
     * @param oqsEngineEntity parent entity; its id is used as parent id and routing value
     * @param iEntityClass    class whose code is used as the join type in the query
     * @param iEntityClass2   class whose tenant-specific index is targeted
     * @param str             profile used for index resolution and client lookup
     * @throws RuntimeException wrapping any failure raised during the delete
     */
    private void parentIdQueryDelete(OqsEngineEntity oqsEngineEntity, IEntityClass iEntityClass, IEntityClass iEntityClass2, String str) {
        try {
            String indexName = DynamicConfigUtils.insulateTenant(this.dynamicConfig, str, iEntityClass2.code(), iEntityClass2.ref().getAppCode());
            String parentDocId = String.valueOf(oqsEngineEntity.getId());
            ParentIdQueryBuilder childrenOfParent = JoinQueryBuilders.parentId(iEntityClass.code(), parentDocId);

            DeleteByQueryRequest request = new DeleteByQueryRequest();
            request.setRouting(String.valueOf(oqsEngineEntity.getId()));
            request.setQuery(childrenOfParent);
            request.indices(new String[]{indexName});
            // Version conflicts do not abort the delete.
            request.setConflicts("proceed");

            BulkByScrollResponse response = ((ElasticsearchTransportExecutor) this.elasticsearchTransportExecutor).m57executor(str).deleteByQuery(request, RequestOptions.DEFAULT);
            long deleted = response.getDeleted();
            if (deleted >= 1) {
                log.info("delete parent-child 冗余明细明细文档成立,删除文档条数: " + deleted + ",indexName：" + indexName);
            }
        } catch (Throwable th) {
            log.error("FAILURE,elasticsearch parentIdQueryDelete method failed,cause by:{}", th.getMessage());
            throw new RuntimeException(th);
        }
    }

    /**
     * Appends an entity to the bucket of the given index, creating the bucket on first use.
     *
     * @param oqsEngineEntity entity to append
     * @param map             index name -> entities bucket
     * @param str             target index name
     */
    private void commondBuildWideRow(OqsEngineEntity oqsEngineEntity, Map<String, List<OqsEngineEntity>> map, String str) {
        List<OqsEngineEntity> rows = map.computeIfAbsent(str, key -> new ArrayList<>());
        rows.add(oqsEngineEntity);
    }

    /**
     * Deletes the given entities.
     *
     * <p>Entities are grouped by profile and target index via
     * {@code handleTenantsBatchRows}; one bulk request is built per index, containing a
     * delete action per entity, and the requests are then submitted per profile.
     *
     * @param list entities to delete
     * @return {@code Boolean.TRUE} if every bulk request completed without failures
     * @throws IOException declared for callers; not thrown by the current implementation
     */
    private Boolean deleteBatch(List<OqsEngineEntity> list) throws IOException {
        Map<String, List<BulkRequest>> requestsByProfile = new HashMap<>();
        // Plain typed loops: the previous nested lambdas reused the parameter name "entry",
        // which is not legal Java, and relied on raw casts.
        for (Map.Entry<String, Map<String, List<OqsEngineEntity>>> profileEntry : handleTenantsBatchRows(list).entrySet()) {
            List<BulkRequest> bulkRequests = new ArrayList<>();
            for (Map.Entry<String, List<OqsEngineEntity>> indexEntry : profileEntry.getValue().entrySet()) {
                String indexName = indexEntry.getKey();
                BulkRequest bulkRequest = new BulkRequest();
                for (OqsEngineEntity entity : indexEntry.getValue()) {
                    DeleteRequest deleteRequest = new DeleteRequest(indexName).id(String.valueOf(entity.getId()));
                    if (entity.getRoutingId() != null) {
                        // Routed (child) documents can only be addressed with their routing.
                        deleteRequest.routing(entity.getRoutingId());
                    }
                    bulkRequest.add(deleteRequest);
                }
                bulkRequests.add(bulkRequest);
            }
            requestsByProfile.put(profileEntry.getKey(), bulkRequests);
        }
        return Boolean.valueOf(sumbitBulk(requestsByProfile));
    }
}
