package com.xforceplus.business.messagebus.bus;

import com.alibaba.fastjson.JSON;
import com.google.common.eventbus.AsyncEventBus;
import com.google.common.eventbus.Subscribe;
import com.xforceplus.business.company.dto.CompanyTaxwareDto;
import com.xforceplus.business.company.service.CompanyService;
import com.xforceplus.janus.message.sdk.MBClient;
import com.xforceplus.janus.message.sdk.ResponseMessage;
import com.xforceplus.janus.message.sdk.request.AckRequest;
import com.xforceplus.janus.message.sdk.response.SubResponse;
import java.util.Collections;
import java.util.concurrent.ThreadPoolExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;

@Component
@ConditionalOnExpression("${xforce.taxpayer.sync.enabled:false}")
/* loaded from: input_file:com/xforceplus/business/messagebus/bus/TaxPayerSyncService.class */
public class TaxPayerSyncService implements InitializingBean {
    private static final Logger log = LoggerFactory.getLogger(TaxPayerSyncService.class);
    private final CompanyService companyService;

    @Value("${remote.message.bus.server.url:}")
    private String url;

    @Value("${remote.message.bus.server.token:}")
    private String token;

    @Value("${xforce.taxpayer.topic.name:taxware_device_qualifications}")
    private String requestName;
    private MBClient client;
    private final ThreadPoolExecutor threadPoolExecutor;
    private AsyncEventBus asyncEventBus;

    /* loaded from: input_file:com/xforceplus/business/messagebus/bus/TaxPayerSyncService$AsyncEventProcessListener.class */
    private class AsyncEventProcessListener {
        private AsyncEventProcessListener() {
        }

        @Subscribe
        public void process(ResponseMessage responseMessage) {
            try {
                if (TaxPayerSyncService.this.handleMessage(responseMessage)) {
                    if (!TaxPayerSyncService.this.client.ack(new AckRequest(Collections.singletonList(responseMessage.getReceiptHandle()))).getSuccess().booleanValue()) {
                        TaxPayerSyncService.log.error("ack message error:[{}]", responseMessage.getId());
                    }
                } else {
                    TaxPayerSyncService.log.info("failed to handle message:{}", JSON.toJSONString(responseMessage));
                }
            } catch (Exception e) {
                TaxPayerSyncService.log.error("error while handling message with messageId:{}, exception: {}", responseMessage.getId(), e.getMessage());
            }
        }
    }

    public TaxPayerSyncService(CompanyService companyService, @Qualifier("asyncThreadPoolExecutor") ThreadPoolExecutor threadPoolExecutor) {
        this.companyService = companyService;
        this.threadPoolExecutor = threadPoolExecutor;
    }

    @Scheduled(fixedRate = 60000)
    public void pullTaxPayerMessage() {
        log.info("polling taxPayer message...");
        SubResponse sub = this.client.sub();
        if (sub.getSuccess().booleanValue()) {
            sub.getResponseMessages().forEach(responseMessage -> {
                if (this.requestName.equalsIgnoreCase(responseMessage.getPubCode())) {
                    log.info("get message id:{},content:{}", responseMessage.getId(), responseMessage.getContent());
                    this.asyncEventBus.post(responseMessage);
                }
            });
        } else {
            log.error("pull taxware message error:{}", sub.getError());
        }
    }

    public void afterPropertiesSet() {
        this.asyncEventBus = new AsyncEventBus(this.threadPoolExecutor, (th, subscriberExceptionContext) -> {
            log.error(th.getMessage(), th);
        });
        this.asyncEventBus.register(new AsyncEventProcessListener());
        try {
            log.info("starting init MBClient...");
            this.client = MBClient.getInstance(this.url, this.token);
        } catch (Exception e) {
            log.error("error while initializing pubsub config:{}", e.getMessage());
        }
    }

    public boolean handleMessage(ResponseMessage responseMessage) {
        if (responseMessage == null) {
            log.error("cannot handle null message");
            return true;
        }
        String content = responseMessage.getContent();
        if (ObjectUtils.isEmpty(content)) {
            log.error("cannot handle null content, messageId {}", responseMessage.getId());
            return true;
        }
        return this.companyService.handleTaxwareMessage((CompanyTaxwareDto) JSON.parseObject(content, CompanyTaxwareDto.class));
    }
}
