/*
 * Decompiled with CFR 0.152.
 */
package com.xforceplus.ultraman.cdc.processor.impl;

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.xforceplus.ultraman.cdc.adapter.EngineAdapterService;
import com.xforceplus.ultraman.cdc.context.ParserContext;
import com.xforceplus.ultraman.cdc.processor.impl.ParseMessageToRowData;
import com.xforceplus.ultraman.cdc.utils.ThreadPoolExecutorUtils;
import com.xforceplus.ultraman.metadata.cdc.OqsEngineEntity;
import com.xforceplus.ultraman.metadata.engine.EntityClassEngine;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BinlogToESUtils {
    private static final Logger log = LoggerFactory.getLogger(BinlogToESUtils.class);
    private static ThreadPoolExecutor executor = ThreadPoolExecutorUtils.executor;
    private final int threadBatchSize = 1024;
    private final int maxThreadNums = 4;
    private static long time_total = 0L;

    private int dynamicThread(int entries, int threadBatchSize) {
        int threadNum;
        if (entries <= threadBatchSize) {
            return 1;
        }
        int n = threadNum = entries % threadBatchSize == 0 ? entries / threadBatchSize : entries / threadBatchSize + 1;
        if (threadNum > 4) {
            threadNum = 4;
        }
        return threadNum;
    }

    public static int parseBinlogMessage(Message message, EntityClassEngine engine, EngineAdapterService engineAdapterService) {
        try {
            long start = System.currentTimeMillis();
            ParserContext parserContext = new ParserContext(message.getId());
            AtomicInteger oqsEngineEntityTotals = new AtomicInteger();
            ParseMessageToRowData messageToRowData = new ParseMessageToRowData(engine);
            Map<CanalEntry.EventType, Map<String, Map<Long, OqsEngineEntity>>> binlogRowDataMap = messageToRowData.parseCanalEntries(message.getEntries(), parserContext);
            long endTime = System.currentTimeMillis() - start;
            log.info("\u8ba1\u7b97\u5355\u4f4d(\u6beb\u79d2) Message\u5f53\u524d\u62c9\u53d6\u6279\u6b21\u5927\u5c0f:{},CDC\u5904\u7406\u603b\u6d88\u8017\u65f6\u95f4:{},CDC\u5f53\u65f6\u5904\u7406\u6279\u6b21\u6d88\u8017\u65f6\u95f4:{}", new Object[]{message.getEntries().size(), time_total += endTime, endTime});
            HashMap<String, Collection<OqsEngineEntity>> oqsEngineEntityMap = new HashMap<String, Collection<OqsEngineEntity>>();
            BinlogToESUtils.parseEventTypeMaps(oqsEngineEntityMap, binlogRowDataMap, CanalEntry.EventType.INSERT);
            BinlogToESUtils.parseEventTypeMaps(oqsEngineEntityMap, binlogRowDataMap, CanalEntry.EventType.UPDATE);
            BinlogToESUtils.parseEventTypeMaps(oqsEngineEntityMap, binlogRowDataMap, CanalEntry.EventType.DELETE);
            ArrayList<Future<Boolean>> futures = new ArrayList<Future<Boolean>>();
            for (Map.Entry entry : oqsEngineEntityMap.entrySet()) {
                Future<Boolean> future = executor.submit(() -> engineAdapterService.batchUpsertOperation((Collection)tableEntry.getValue()));
                futures.add(future);
                oqsEngineEntityTotals.addAndGet(((Collection)entry.getValue()).size());
            }
            for (Future future : futures) {
                boolean statusFlag = (Boolean)future.get();
                if (statusFlag) continue;
                throw new RuntimeException("Adapter Service return False");
            }
            return oqsEngineEntityTotals.get();
        }
        catch (Exception e) {
            throw new RuntimeException("Adapter Service return False");
        }
    }

    private static void parseEventTypeMaps(Map<String, Collection<OqsEngineEntity>> oqsEngineEntityMap, Map<CanalEntry.EventType, Map<String, Map<Long, OqsEngineEntity>>> binlogRowDataMap, CanalEntry.EventType eventType) {
        Map<String, Map<Long, OqsEngineEntity>> eventTypeMap = binlogRowDataMap.get(eventType);
        if (eventTypeMap != null) {
            eventTypeMap.entrySet().forEach(tableOqsEngine -> {
                Collection oqsEngineEntities = (Collection)oqsEngineEntityMap.get(tableOqsEngine.getKey());
                if (oqsEngineEntities != null) {
                    oqsEngineEntities.addAll(((Map)tableOqsEngine.getValue()).values());
                } else {
                    oqsEngineEntityMap.put((String)tableOqsEngine.getKey(), ((Map)tableOqsEngine.getValue()).values());
                }
            });
        }
    }
}

