/*
 * Decompiled with CFR 0.152.
 */
package com.xforceplus.ultraman.cdc.processor.impl;

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.protobuf.ByteString;
import com.xforceplus.ultraman.cdc.adapter.CDCBeforeCallback;
import com.xforceplus.ultraman.cdc.adapter.EngineAdapterService;
import com.xforceplus.ultraman.cdc.context.ParserContext;
import com.xforceplus.ultraman.cdc.dto.ParseResult;
import com.xforceplus.ultraman.cdc.processor.DataProcessor;
import com.xforceplus.ultraman.cdc.processor.EventQueue;
import com.xforceplus.ultraman.cdc.utils.BinLogParseUtils;
import com.xforceplus.ultraman.extensions.cdc.status.StatusService;
import com.xforceplus.ultraman.metadata.cdc.OqsEngineEntity;
import com.xforceplus.ultraman.metadata.engine.EntityClassEngine;
import com.xforceplus.ultraman.metadata.entity.EntityClassRef;
import com.xforceplus.ultraman.metadata.entity.IEntityClass;
import com.xforceplus.ultraman.metadata.entity.IEntityField;
import com.xforceplus.ultraman.metadata.helper.OriginEntityUtils;
import com.xforceplus.ultraman.sdk.core.event.EntityCreated;
import com.xforceplus.ultraman.sdk.core.event.EntityDeleted;
import com.xforceplus.ultraman.sdk.core.event.EntityUpdated;
import com.xforceplus.ultraman.sdk.infra.base.cdc.SystemAttachment;
import com.xforceplus.ultraman.sdk.infra.event.EventPublisher;
import com.xforceplus.ultraman.sdk.infra.utils.JacksonDefaultMapper;
import com.xforceplus.ultraman.sdk.infra.utils.ThreadFactoryHelper;
import io.micrometer.core.annotation.Timed;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer;
import io.vavr.Tuple;
import io.vavr.Tuple2;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import javax.annotation.Resource;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

public class DefaultDataProcessor
implements DataProcessor {
    private static final Logger log = LoggerFactory.getLogger(DefaultDataProcessor.class);
    final Logger logger = LoggerFactory.getLogger(DefaultDataProcessor.class);
    @Resource
    private EntityClassEngine engine;
    @Autowired(required=false)
    private StatusService statusService;
    @Autowired(required=false)
    private List<CDCBeforeCallback> cdcBeforeCallbacks = new ArrayList<CDCBeforeCallback>();
    int handleMessageType;
    @Resource
    private EventPublisher publisher;
    @Resource
    private EngineAdapterService engineAdapterService;
    @Resource
    private EventQueue eventQueue;
    @Resource
    private ExecutorService eventThreadPool;
    @Resource
    private ExecutorService workThreadPool;
    private int chunkSize = 100;
    private boolean usingBatch;
    private ExecutorService threadPool;
    private Timer timer = Metrics.timer((String)"cdc-sync", (String[])new String[0]);
    private Timer cdcIndexParser = Timer.builder((String)"oqs.cdc.parse").tags(new String[]{"initiator", "cdc", "action", "parse", "type", "index"}).publishPercentiles(new double[]{0.5, 0.95, 0.99}).publishPercentileHistogram().register((MeterRegistry)Metrics.globalRegistry);
    private Timer cdcEventParser = Timer.builder((String)"oqs.cdc.parse").tags(new String[]{"initiator", "cdc", "action", "parse", "type", "event"}).publishPercentiles(new double[]{0.5, 0.95, 0.99}).publishPercentileHistogram().register((MeterRegistry)Metrics.globalRegistry);

    public DefaultDataProcessor(int handleMessageType, boolean usingBatch) {
        this.handleMessageType = handleMessageType;
        this.usingBatch = usingBatch;
        this.threadPool = ThreadFactoryHelper.buildThreadPool((int)10, (int)1000, (String)"parser", (boolean)false);
    }

    public ExecutorService getEventThreadPool() {
        return this.eventThreadPool;
    }

    public void setEventThreadPool(ExecutorService eventThreadPool) {
        this.eventThreadPool = eventThreadPool;
    }

    public void setEngine(EntityClassEngine engine) {
        this.engine = engine;
    }

    public void setStatusService(StatusService statusService) {
        this.statusService = statusService;
    }

    public void setCdcBeforeCallbacks(List<CDCBeforeCallback> cdcBeforeCallbacks) {
        this.cdcBeforeCallbacks = cdcBeforeCallbacks;
    }

    public void setPublisher(EventPublisher publisher) {
        this.publisher = publisher;
    }

    public void setEngineAdapterService(EngineAdapterService engineAdapterService) {
        this.engineAdapterService = engineAdapterService;
    }

    public void setEventQueue(EventQueue eventQueue) {
        this.eventQueue = eventQueue;
    }

    @Override
    @Timed(value="oqs.process.delay.latency", percentiles={0.5, 0.9, 0.99}, extraTags={"initiator", "cdc", "action", "consume"})
    public boolean onProcess(Message message) throws Exception {
        return this.messageProcess(message) > 0;
    }

    private int messageProcess(Message message) {
        try {
            if (message.getEntries().isEmpty()) {
                if (this.statusService != null) {
                    this.statusService.onEmpty();
                }
                return 0;
            }
            if (this.handleMessageType == 0 || this.handleMessageType == 2) {
                this.workThreadPool.submit(() -> this.handleMessageEvent(message));
            }
            if (this.handleMessageType == 1 || this.handleMessageType == 2) {
                AtomicInteger count = new AtomicInteger(0);
                this.timer.record(() -> count.set(this.handleWriteIndex(message)));
                return count.get();
            }
            return 1;
        }
        catch (Exception e) {
            this.logger.error("batchId : {}, consume message failed, message : {}", (Object)message.getId(), (Object)e.getMessage());
            throw e;
        }
    }

    private boolean handleMessageEvent(Message message) {
        ParserContext parserContext = new ParserContext(message.getId());
        this.cdcEventParser.record(() -> this.parseCanalEntries(message.getEntries(), parserContext, true));
        return true;
    }

    private int handleWriteIndex(Message message) {
        ParserContext parserContext = new ParserContext(message.getId());
        ParseResult result = (ParseResult)this.cdcIndexParser.record(() -> this.parseCanalEntries(message.getEntries(), parserContext, false));
        if (result != null) {
            result.mergeEntries();
            Map<Tuple2, List<Tuple2>> mapping = result.getFinishEntries().values().stream().flatMap(Collection::stream).collect(Collectors.groupingBy(x -> Tuple.of((Object)((OqsEngineEntity)x._2).getId(), (Object)((OqsEngineEntity)x._2).getFather())));
            ArrayList<OqsEngineEntity> finalResultSet = new ArrayList<OqsEngineEntity>();
            mapping.forEach((k, v) -> {
                if ((Long)k._2 == 0L) {
                    v.forEach(tuple -> finalResultSet.add((OqsEngineEntity)tuple._2));
                } else {
                    Optional<OqsEngineEntity> reduceOp = v.stream().map(x -> (OqsEngineEntity)x._2).reduce((target, entity) -> {
                        target.getAttributes().putAll(entity.getAttributes());
                        if (entity.isDeleted()) {
                            target.setDeleted(true);
                        }
                        if (entity.getFather() > 0L && entity.getFather() != entity.getEntityClassRef().getId()) {
                            target.setFather(entity.getFather());
                            target.setEntityClassRef(entity.getEntityClassRef());
                        }
                        if (entity.getUpdateTime() >= target.getUpdateTime()) {
                            target.setUpdateTime(entity.getUpdateTime());
                        }
                        return target;
                    });
                    finalResultSet.add(reduceOp.get());
                }
            });
            this.mutateEntries(finalResultSet);
            if (this.usingBatch) {
                LinkedHashMap<String, Map> batches = new LinkedHashMap<String, Map>();
                finalResultSet.forEach(v -> {
                    if (v != null) {
                        OqsEngineEntity targetEntity = v;
                        batches.computeIfAbsent(targetEntity.getTable(), k1 -> new LinkedHashMap()).put(targetEntity.getId(), targetEntity);
                    }
                });
                AtomicInteger atomicInteger = new AtomicInteger(0);
                batches.forEach((k, v) -> {
                    boolean r = this.engineAdapterService.batchUpsertOperation(v.values());
                    if (this.statusService != null) {
                        this.statusService.clearStatus(v.values());
                    }
                    atomicInteger.addAndGet(v.size());
                    if (!r) {
                        throw new RuntimeException("Adapter Service return False");
                    }
                });
                return atomicInteger.get();
            }
            ArrayList<OqsEngineEntity> list = new ArrayList<OqsEngineEntity>();
            finalResultSet.forEach(v -> {
                if (v != null) {
                    list.add((OqsEngineEntity)v);
                }
            });
            AtomicInteger atomicInteger = new AtomicInteger(0);
            boolean r = this.engineAdapterService.batchUpsertOperation(list);
            if (this.statusService != null) {
                this.statusService.clearStatus(list);
            }
            atomicInteger.addAndGet(list.size());
            if (!r) {
                throw new RuntimeException("Adapter Service return False");
            }
            return atomicInteger.get();
        }
        return 0;
    }

    public <T> List<List<T>> splitList(List<T> inputList, int chunkSize) {
        ArrayList<List<T>> resultList = new ArrayList<List<T>>();
        for (int i = 0; i < inputList.size(); i += chunkSize) {
            int endIndex = Math.min(i + chunkSize, inputList.size());
            resultList.add(new ArrayList<T>(inputList.subList(i, endIndex)));
        }
        return resultList;
    }

    private ParseResult parseCanalEntries(List<CanalEntry.Entry> entries, ParserContext parserContext, boolean handleEvent) {
        ParseResult parseResult = new ParseResult();
        ArrayList taskList = new ArrayList();
        List<List<CanalEntry.Entry>> tasks = this.splitList(entries, this.chunkSize);
        if (!handleEvent) {
            int level = 0;
            for (List<CanalEntry.Entry> task : tasks) {
                int finalLevel = ++level;
                Future<?> submit = this.threadPool.submit(() -> {
                    for (CanalEntry.Entry entry : task) {
                        switch (entry.getEntryType()) {
                            case TRANSACTIONBEGIN: 
                            case TRANSACTIONEND: {
                                break;
                            }
                            case ROWDATA: {
                                this.rowDataParse(finalLevel, entry, parserContext, parseResult, handleEvent);
                                break;
                            }
                        }
                    }
                });
                taskList.add(submit);
            }
            taskList.forEach(x -> {
                try {
                    x.get();
                }
                catch (InterruptedException | ExecutionException e) {
                    throw new RuntimeException(e);
                }
            });
        } else {
            for (CanalEntry.Entry entry : entries) {
                switch (entry.getEntryType()) {
                    case TRANSACTIONBEGIN: 
                    case TRANSACTIONEND: {
                        break;
                    }
                    case ROWDATA: {
                        this.rowDataParse(0, entry, parserContext, parseResult, handleEvent);
                        break;
                    }
                }
            }
        }
        return parseResult;
    }

    private void mutateEntries(List<OqsEngineEntity> finalList) {
        if (!finalList.isEmpty()) {
            Optional.ofNullable(this.cdcBeforeCallbacks).orElseGet(Collections::emptyList).stream().sorted(Comparator.comparingInt(CDCBeforeCallback::getOrder)).forEach(x -> {
                try {
                    x.mutate(finalList);
                }
                catch (Throwable throwable) {
                    this.logger.error("CDC callback ERROR name:{} , ex:{}", (Object)x.name(), (Object)throwable);
                }
            });
        }
    }

    private OqsEngineEntity cloneEntity(OqsEngineEntity src) {
        OqsEngineEntity target = new OqsEngineEntity();
        target.setEntityClassRef(src.getEntityClassRef());
        target.setId(src.getId());
        target.setVersion(src.getVersion());
        Map attributes = src.getAttributes();
        try {
            String attr = JacksonDefaultMapper.OBJECT_MAPPER.writeValueAsString((Object)attributes);
            Map newAttr = (Map)JacksonDefaultMapper.OBJECT_MAPPER.readValue(attr, Map.class);
            target.setAttributes(newAttr);
        }
        catch (Throwable throwable) {
            this.logger.error("{}", throwable);
        }
        target.setUpdateTime(src.getUpdateTime());
        target.setFather(src.getFather());
        target.setDeleted(src.isDeleted());
        return target;
    }

    private void rowDataParse(int level, CanalEntry.Entry entry, ParserContext parserContext, ParseResult parseResult, boolean handleEvent) {
        String tableName = entry.getHeader().getTableName();
        if (tableName.isEmpty()) {
            this.logger.warn("[CDCBatchProcess]: {}, table name could not be Null, [{}]", (Object)parserContext.getBatchId(), (Object)entry.getStoreValue());
            return;
        }
        Optional<IEntityClass> entityClassOp = this.foundEntityClassFromTableName(tableName);
        if (!entityClassOp.isPresent()) {
            this.logger.warn("[CDCBatchProcess]: {}, tableName : {}, Got unrelated table binlog.", (Object)parserContext.getBatchId(), (Object)tableName);
            return;
        }
        CanalEntry.RowChange rowChange = null;
        try {
            rowChange = CanalEntry.RowChange.parseFrom((ByteString)entry.getStoreValue());
        }
        catch (Exception e) {
            this.logger.error("batch : {}, parse entry value failed, [{}], [{}]", new Object[]{parserContext.getBatchId(), entry.getStoreValue(), e.getMessage()});
            return;
        }
        CanalEntry.EventType eventType = rowChange.getEventType();
        HashMap<String, CanalEntry.Column> beforeColumnsMap = new HashMap<String, CanalEntry.Column>();
        HashMap<String, CanalEntry.Column> afterColumnsMap = new HashMap<String, CanalEntry.Column>();
        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
            beforeColumnsMap.clear();
            rowData.getBeforeColumnsList().forEach(column -> beforeColumnsMap.put(column.getName(), (CanalEntry.Column)column));
            afterColumnsMap.clear();
            rowData.getAfterColumnsList().forEach(column -> afterColumnsMap.put(column.getName(), (CanalEntry.Column)column));
            OqsEngineEntity beforeEntity = null;
            OqsEngineEntity afterEntity = null;
            SystemAttachment systemAttachment = null;
            try {
                switch (eventType) {
                    case INSERT: {
                        afterEntity = this.oneRowParser(tableName, afterColumnsMap, false, entityClassOp.get(), parseResult);
                        if (handleEvent) {
                            systemAttachment = this.getAttachment(afterColumnsMap);
                        }
                        this.mergeEntityToResult(parseResult, afterEntity, level);
                        break;
                    }
                    case UPDATE: {
                        afterEntity = this.oneRowParser(tableName, afterColumnsMap, false, entityClassOp.get(), parseResult);
                        this.mergeEntityToResult(parseResult, afterEntity, level);
                        if (!handleEvent) break;
                        try {
                            beforeEntity = this.oneRowParser(tableName, beforeColumnsMap, false, entityClassOp.get(), parseResult);
                        }
                        catch (Exception e) {
                            this.logger.warn("beforeEntity in update is null, will be ignore, but it will influence event");
                        }
                        systemAttachment = this.getAttachment(afterColumnsMap);
                        break;
                    }
                    case DELETE: {
                        beforeEntity = this.oneRowParser(tableName, beforeColumnsMap, true, entityClassOp.get(), parseResult);
                        if (handleEvent) {
                            systemAttachment = this.getAttachment(beforeColumnsMap);
                        }
                        this.mergeEntityToResult(parseResult, beforeEntity, level);
                    }
                }
                if (!handleEvent) continue;
                OqsEngineEntity beforeClone = null;
                OqsEngineEntity afterClone = null;
                if (beforeEntity != null) {
                    beforeClone = this.cloneEntity(beforeEntity);
                }
                if (afterEntity != null) {
                    afterClone = this.cloneEntity(afterEntity);
                }
                if (systemAttachment == null) {
                    systemAttachment = new SystemAttachment();
                }
                this.eventHandler(eventType, systemAttachment, entityClassOp.get(), beforeClone, afterClone);
            }
            catch (Exception e) {
                this.logger.warn("parse entity error, message : {}", (Object)e.getMessage());
            }
        }
    }

    private SystemAttachment getAttachment(Map<String, CanalEntry.Column> columns) {
        String attrStr = BinLogParseUtils.getStringFromColumn(columns, "_sys_dynamic");
        SystemAttachment attachment = new SystemAttachment();
        try {
            if (attrStr.isEmpty()) {
                return attachment;
            }
            Map map = OriginEntityUtils.attributesToMap((String)attrStr);
            Object o = map.get("#a");
            if (o != null) {
                Object delUname;
                Object delUid;
                Map attachmentMap = (Map)o;
                Object grouped = attachmentMap.get("grouped");
                if (grouped != null) {
                    attachment.setGrouped(Integer.parseInt(grouped.toString()));
                }
                if ((delUid = attachmentMap.get("deluid")) != null) {
                    attachment.setDelUId(Long.parseLong(delUid.toString()));
                }
                if ((delUname = attachmentMap.get("deluname")) != null) {
                    attachment.setDelUname(delUname.toString());
                }
            }
            return attachment;
        }
        catch (Throwable throwable) {
            return attachment;
        }
    }

    private Optional<IEntityClass> foundEntityClassFromTableName(String tableName) {
        String[] tableSplit = tableName.split("_");
        if (tableSplit.length < 3) {
            return Optional.empty();
        }
        String code = tableSplit[2];
        String profile = null;
        if (tableSplit.length > 3) {
            profile = tableSplit[3];
        }
        return this.engine.loadByCode(code, profile);
    }

    private void mergeEntityToResult(ParseResult parseResult, OqsEngineEntity entity, int level) {
        String key = entity.getId() + "@@" + entity.getTable();
        parseResult.getFinishEntries().compute(key, (k, target) -> {
            if (target == null) {
                target = new ArrayList<Tuple2>();
            }
            target.add(Tuple.of((Object)level, (Object)entity));
            return target;
        });
    }

    private OqsEngineEntity oneRowParser(String tableName, Map<String, CanalEntry.Column> columns, boolean isDelete, IEntityClass tableEntityClass, ParseResult parseResult) throws SQLException, JsonProcessingException {
        Collection tableEntityClasses;
        long entityClassId = BinLogParseUtils.getLongFromColumn(columns, "_sys_entityclass");
        String profile = BinLogParseUtils.getStringFromColumn(columns, "_sys_profile");
        long id = BinLogParseUtils.getLongFromColumn(columns, "id");
        IEntityClass entityClass = tableEntityClass;
        if (entityClass.id() != entityClassId) {
            IEntityClass relationTableEntityClass = (IEntityClass)this.engine.load(String.valueOf(entityClassId), profile).get();
            tableEntityClasses = relationTableEntityClass.getEntityTables();
        } else {
            tableEntityClasses = entityClass.getEntityTables();
        }
        OqsEngineEntity.Builder builder = new OqsEngineEntity.Builder();
        builder.withTable(tableName);
        builder.withDeleted(isDelete);
        builder.withEntityClassRef(new EntityClassRef(entityClassId, "", "", profile));
        builder.withId(id);
        builder.withRelationTables(Long.valueOf(tableEntityClass.id()), tableEntityClass.code());
        builder.withTableEntityClass(tableEntityClasses);
        if (entityClassId != tableEntityClass.id()) {
            builder.withFather(tableEntityClass.id());
        } else {
            IEntityClass ec = tableEntityClass.extendEntityClass();
            if (null != ec) {
                builder.withFather(ec.id());
            } else {
                builder.withFather(0L);
            }
        }
        builder.withAttribute("id", (Object)id);
        HashMap iEntityFieldMap = new HashMap();
        this.engine.describe(entityClass, profile).getAllFields().stream().forEach(p -> iEntityFieldMap.put(p.name().replace(".", "_"), p));
        for (CanalEntry.Column column : columns.values()) {
            String columnName = column.getName();
            if (columnName.equals("_sys_operatetime")) {
                long updateTime = Long.parseLong(column.getValue());
                builder.withUpdateTime(updateTime);
                continue;
            }
            if (columnName.equals("_sys_ver")) {
                String columnValue = column.getValue();
                if (columnValue.isEmpty()) continue;
                int version = Integer.parseInt(columnValue);
                builder.withVersion(version);
                continue;
            }
            if (columnName.equals("id") || columnName.equals("_sys_entityclass") || columnName.equals("_sys_profile") || columnName.equals("_sys_deleted")) continue;
            if (columnName.equals("_sys_dynamic")) {
                builder.withAttributes(OriginEntityUtils.parserDynamic((IEntityClass)entityClass, (String)BinLogParseUtils.getStringFromColumn(columns, "_sys_dynamic"), (!StringUtils.isEmpty((String)tableEntityClass.realProfile()) ? 1 : 0) != 0));
                continue;
            }
            String name = column.getName();
            String value = column.getValue();
            if (column.getIsNull()) {
                builder.withAttribute(name, null);
                continue;
            }
            IEntityField field = (IEntityField)iEntityFieldMap.get(name);
            if (null == field) {
                this.logger.warn("entityField can not be null, column name {}", (Object)name);
                continue;
            }
            OriginEntityUtils.formatFiledValues((OqsEngineEntity.Builder)builder, (IEntityField)field, (String)name, (String)value);
        }
        if (parseResult.getStartId() == -1L) {
            parseResult.setStartId(id);
        }
        return builder.build();
    }

    private void eventHandler(CanalEntry.EventType eventType, SystemAttachment attachment, IEntityClass entityClass, OqsEngineEntity before, OqsEngineEntity after) {
        switch (eventType) {
            case INSERT: {
                this.publishCreatedEvent(entityClass, attachment, after);
                break;
            }
            case UPDATE: {
                if (attachment.getDelUname() != null && attachment.getDelUId() > 0L) break;
                this.publishUpdatedEvent(entityClass, attachment, before, after);
                break;
            }
            case DELETE: {
                this.publishDeletedEvent(entityClass, attachment, before);
                break;
            }
        }
    }

    private void fillAttachment(SystemAttachment systemAttachment, Map<String, Object> context) {
        context.put("attachment", systemAttachment);
    }

    private void publishDeletedEvent(IEntityClass entityClass, SystemAttachment attachment, OqsEngineEntity b) {
        this.eventQueue.feedDelete(attachment, b).thenAcceptOnce(x -> {
            try {
                if (this.publisher != null) {
                    HashMap<String, Object> context = new HashMap<String, Object>();
                    this.fillAttachment(attachment, context);
                    this.publisher.publishTransactionEvent((Object)new EntityDeleted(entityClass.code(), Long.valueOf(x.getId()), x.getAttributes(), false, context));
                }
            }
            catch (Throwable throwable) {
                this.logger.error("{}", throwable);
            }
        }, this.eventThreadPool);
    }

    private void publishUpdatedEvent(IEntityClass entityClass, SystemAttachment attachment, OqsEngineEntity b, OqsEngineEntity a) {
        this.eventQueue.feedUpdate(attachment, b, a).thenAcceptOnce(x -> {
            try {
                if (this.publisher != null) {
                    OqsEngineEntity before = (OqsEngineEntity)x._1;
                    OqsEngineEntity after = (OqsEngineEntity)x._2;
                    HashMap<String, Object> context = new HashMap<String, Object>();
                    this.fillAttachment(attachment, context);
                    EntityUpdated entityUpdated = new EntityUpdated(entityClass.code(), Long.valueOf(before.getId()), before.getAttributes(), after.getAttributes(), false, context);
                    int version = after.getVersion();
                    entityUpdated.setVer(version);
                    this.publisher.publishTransactionEvent((Object)entityUpdated);
                }
            }
            catch (Throwable throwable) {
                this.logger.error("{}", throwable);
            }
        }, this.eventThreadPool);
    }

    private void publishCreatedEvent(IEntityClass entityClass, SystemAttachment attachment, OqsEngineEntity after) {
        this.eventQueue.feedCreate(attachment, after).thenAcceptOnce(x -> {
            try {
                if (this.publisher != null) {
                    HashMap<String, Object> context = new HashMap<String, Object>();
                    this.fillAttachment(attachment, context);
                    this.publisher.publishTransactionEvent((Object)new EntityCreated(entityClass.code(), Long.valueOf(x.getId()), x.getAttributes(), false, context));
                }
            }
            catch (Throwable throwable) {
                this.logger.error("{}", throwable);
            }
        }, this.eventThreadPool);
    }
}

