/*
 * Decompiled with CFR 0.152.
 */
package com.xforceplus.business.messagebus.bus;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.eventbus.AsyncEventBus;
import com.google.common.eventbus.Subscribe;
import com.xforceplus.business.company.dto.CompanyTaxwareDto;
import com.xforceplus.business.company.service.CompanyService;
import com.xforceplus.janus.message.sdk.MBClient;
import com.xforceplus.janus.message.sdk.ResponseMessage;
import com.xforceplus.janus.message.sdk.request.AckRequest;
import com.xforceplus.janus.message.sdk.response.AckResponse;
import com.xforceplus.janus.message.sdk.response.SubResponse;
import java.util.Collections;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;

@Component
@ConditionalOnExpression(value="${xforce.taxpayer.sync.enabled:false}")
public class TaxPayerSyncService
implements InitializingBean,
DisposableBean {
    private static final Logger log = LoggerFactory.getLogger(TaxPayerSyncService.class);
    @Autowired
    private CompanyService companyService;
    @Value(value="${remote.message.bus.server.url:}")
    private String url;
    @Value(value="${remote.message.bus.server.token:}")
    private String token;
    @Value(value="${xforce.taxpayer.topic.name:taxware_device_qualifications}")
    private String requestName;
    private MBClient client;
    @Autowired
    private ThreadPoolExecutor threadPoolExecutor;
    private AsyncEventBus asyncEventBus;
    private static final String NORMAL_TAX_PAYER = "NormalTaxPayer";
    private static final String SMALL_TAX_PAYER = "SmallTaxPayer";

    @Scheduled(fixedRate=60000L)
    public void pullTaxPayerMessage() {
        log.info("polling taxPayer message...");
        SubResponse subResponse = this.client.sub();
        if (subResponse.getSuccess().booleanValue()) {
            subResponse.getResponseMessages().forEach(message -> {
                if (this.requestName.equalsIgnoreCase(message.getPubCode())) {
                    log.info("get message id:{},content:{}", (Object)message.getId(), (Object)message.getContent());
                    this.asyncEventBus.post(message);
                }
            });
        } else {
            log.error("pull taxware message error:{}", (Object)subResponse.getError());
        }
    }

    public void destroy() throws Exception {
        this.threadPoolExecutor.shutdown();
    }

    public void afterPropertiesSet() throws Exception {
        this.asyncEventBus = new AsyncEventBus((Executor)this.threadPoolExecutor, (exception, context) -> log.error(exception.getMessage(), exception));
        this.asyncEventBus.register((Object)new AsyncEventProcessListener());
    }

    @PostConstruct
    public void initPubsubConf() {
        try {
            log.info("starting init MBClient...");
            this.client = MBClient.getInstance((String)this.url, (String)this.token);
        }
        catch (Exception e) {
            log.error("error while initializing pubsub config:{}", (Object)e.getMessage());
        }
    }

    public boolean handleMessage(ResponseMessage message) {
        if (message == null) {
            log.error("cannot handle null message");
            return true;
        }
        String content = message.getContent();
        if (ObjectUtils.isEmpty((Object)content)) {
            log.error("cannot handle null content, messageId {}", (Object)message.getId());
            return true;
        }
        CompanyTaxwareDto companyTaxwareDto = (CompanyTaxwareDto)JSONObject.parseObject((String)content, CompanyTaxwareDto.class);
        return this.companyService.handleTaxwareMessage(companyTaxwareDto);
    }

    private class AsyncEventProcessListener {
        private AsyncEventProcessListener() {
        }

        @Subscribe
        public void process(ResponseMessage responseMessage) {
            try {
                boolean result = TaxPayerSyncService.this.handleMessage(responseMessage);
                if (result) {
                    AckRequest ackRequest = new AckRequest(Collections.singletonList(responseMessage.getReceiptHandle()));
                    AckResponse ackResponse = TaxPayerSyncService.this.client.ack(ackRequest);
                    if (!ackResponse.getSuccess().booleanValue()) {
                        log.error("ack message error:[{}]", (Object)responseMessage.getId());
                    }
                } else {
                    log.error("failed to handle message:{}", (Object)JSON.toJSONString((Object)responseMessage));
                }
            }
            catch (Exception e) {
                log.error("error while handling message with messageId:{}, exception: {}", (Object)responseMessage.getId(), (Object)e.getMessage());
            }
        }
    }
}

