package com.xforceplus.ultraman.cdc.processor.impl;

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.xforceplus.ultraman.cdc.adapter.EngineAdapterService;
import com.xforceplus.ultraman.cdc.context.ParserContext;
import com.xforceplus.ultraman.cdc.dto.constant.CDCConstant;
import com.xforceplus.ultraman.cdc.utils.ThreadPoolExecutorUtils;
import com.xforceplus.ultraman.metadata.cdc.OqsEngineEntity;
import com.xforceplus.ultraman.metadata.engine.EntityClassEngine;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/xforceplus/ultraman/cdc/processor/impl/BinlogToESUtils.class */
public class BinlogToESUtils {
    private final int threadBatchSize = CDCConstant.DEFAULT_BATCH_SIZE;
    private final int maxThreadNums = 4;
    private static final Logger log = LoggerFactory.getLogger(BinlogToESUtils.class);
    private static ThreadPoolExecutor executor = ThreadPoolExecutorUtils.executor;
    private static long time_total = 0;

    private int dynamicThread(int i, int i2) {
        if (i <= i2) {
            return 1;
        }
        int i3 = i % i2 == 0 ? i / i2 : (i / i2) + 1;
        if (i3 > 4) {
            i3 = 4;
        }
        return i3;
    }

    public static int parseBinlogMessage(Message message, EntityClassEngine entityClassEngine, EngineAdapterService engineAdapterService) {
        try {
            ParserContext parserContext = new ParserContext(message.getId());
            AtomicInteger atomicInteger = new AtomicInteger();
            long currentTimeMillis = System.currentTimeMillis();
            Map<CanalEntry.EventType, Map<String, Map<Long, OqsEngineEntity>>> parseCanalEntries = new ParseMessageToRowData(entityClassEngine).parseCanalEntries(message.getEntries(), parserContext);
            time_total += System.currentTimeMillis() - currentTimeMillis;
            log.info("总执行消耗时间:{}", Long.valueOf(time_total));
            HashMap hashMap = new HashMap();
            parseEventTypeMaps(hashMap, parseCanalEntries, CanalEntry.EventType.INSERT);
            parseEventTypeMaps(hashMap, parseCanalEntries, CanalEntry.EventType.UPDATE);
            parseEventTypeMaps(hashMap, parseCanalEntries, CanalEntry.EventType.DELETE);
            ArrayList arrayList = new ArrayList();
            for (Map.Entry entry : hashMap.entrySet()) {
                arrayList.add(executor.submit(() -> {
                    return Boolean.valueOf(engineAdapterService.batchUpsertOperation((Collection) entry.getValue()));
                }));
                atomicInteger.addAndGet(((Collection) entry.getValue()).size());
            }
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                if (!((Boolean) ((Future) it.next()).get()).booleanValue()) {
                    throw new RuntimeException("Adapter Service return False");
                }
            }
            return atomicInteger.get();
        } catch (Exception e) {
            throw new RuntimeException("Adapter Service return False");
        }
    }

    private static void parseEventTypeMaps(Map<String, Collection<OqsEngineEntity>> map, Map<CanalEntry.EventType, Map<String, Map<Long, OqsEngineEntity>>> map2, CanalEntry.EventType eventType) {
        Map<String, Map<Long, OqsEngineEntity>> map3 = map2.get(eventType);
        if (map3 != null) {
            map3.entrySet().forEach(entry -> {
                Collection collection = (Collection) map.get(entry.getKey());
                if (collection != null) {
                    collection.addAll(((Map) entry.getValue()).values());
                } else {
                    map.put(entry.getKey(), ((Map) entry.getValue()).values());
                }
            });
        }
    }
}
