package com.xforceplus.ultraman.cdc.processor.impl;

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.xforceplus.ultraman.cdc.adapter.EngineAdapterService;
import com.xforceplus.ultraman.cdc.context.ParserContext;
import com.xforceplus.ultraman.cdc.utils.ThreadPoolExecutorUtils;
import com.xforceplus.ultraman.metadata.cdc.OqsEngineEntity;
import com.xforceplus.ultraman.metadata.engine.EntityClassEngine;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/xforceplus/ultraman/cdc/processor/impl/BinlogToElasticProcess.class */
public class BinlogToElasticProcess {
    private static final int maxThreadNums = 10;
    private static final Logger log = LoggerFactory.getLogger(BinlogToElasticProcess.class);
    private static ThreadPoolExecutor executor = ThreadPoolExecutorUtils.executor;
    private static long time_total = 0;

    private static int dynamicThread(int i, int i2) {
        if (i <= i2) {
            return 1;
        }
        int i3 = i % i2 == 0 ? i / i2 : (i / i2) + 1;
        if (i3 > 10) {
            i3 = 10;
        }
        return i3;
    }

    public static int parseBinlogMessage(Message message, EntityClassEngine entityClassEngine, EngineAdapterService engineAdapterService, int i) {
        do {
            try {
            } catch (Throwable th) {
                th.printStackTrace();
                throw new RuntimeException("Adapter Service return False");
            }
        } while (!engineAdapterService.initMetedataInitStatus());
        long currentTimeMillis = System.currentTimeMillis();
        ParserContext parserContext = new ParserContext(message.getId());
        AtomicInteger atomicInteger = new AtomicInteger();
        Map<CanalEntry.EventType, Map<String, Map<Long, OqsEngineEntity>>> parseCanalEntries = new ParseMessageToRowData(entityClassEngine).parseCanalEntries(message.getEntries(), parserContext);
        long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
        time_total += currentTimeMillis2;
        log.info("计算单位(毫秒) Message当前拉取批次大小:{},CDC处理总消耗时间:{},CDC当时处理批次消耗时间:{}", new Object[]{Integer.valueOf(message.getEntries().size()), Long.valueOf(time_total), Long.valueOf(currentTimeMillis2)});
        HashMap hashMap = new HashMap();
        mergeEventTypeMaps(hashMap, parseCanalEntries);
        ArrayList arrayList = new ArrayList();
        Iterator it = hashMap.entrySet().iterator();
        while (it.hasNext()) {
            List list = (List) ((Map) ((Map.Entry) it.next()).getValue()).values().stream().collect(Collectors.toList());
            dynamicThread(list.size(), i);
            arrayList.add(executor.submit(() -> {
                return Boolean.valueOf(engineAdapterService.batchUpsertOperation(list));
            }));
            atomicInteger.addAndGet(list.size());
            threadWait(arrayList);
        }
        return atomicInteger.get();
    }

    private static void threadWait(List<Future> list) throws Throwable {
        if (list.size() >= 10) {
            Iterator<Future> it = list.iterator();
            while (it.hasNext()) {
                if (!((Boolean) it.next().get()).booleanValue()) {
                    throw new RuntimeException("Adapter Service return False");
                }
            }
        }
    }

    private static void mergeEventTypeMaps(Map<String, Map<Long, OqsEngineEntity>> map, Map<CanalEntry.EventType, Map<String, Map<Long, OqsEngineEntity>>> map2) {
        for (CanalEntry.EventType eventType : new CanalEntry.EventType[]{CanalEntry.EventType.INSERT, CanalEntry.EventType.UPDATE, CanalEntry.EventType.UPDATE}) {
            Map<String, Map<Long, OqsEngineEntity>> map3 = map2.get(eventType);
            if (map3 != null) {
                map3.entrySet().forEach(entry -> {
                    Map map4 = (Map) map.get(entry.getKey());
                    if (map4 != null) {
                        map4.putAll((Map) entry.getValue());
                    } else {
                        map.put(entry.getKey(), entry.getValue());
                    }
                });
            }
        }
    }
}
