package com.mogic.information.infrastructure.consumer.rocketmq;

import com.mogic.information.infrastructure.common.constant.CommonConstant;
import com.mogic.information.infrastructure.common.exception.ErrorException;
import com.mogic.information.infrastructure.vo.rocketmq.RocketProperties;
import com.mogic.information.infrastructure.vo.rocketmq.TopicOffset;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:com/mogic/information/infrastructure/consumer/rocketmq/RocketMQService.class */
public class RocketMQService {
    private static final Logger log = LoggerFactory.getLogger(RocketMQService.class);
    private final RocketProperties properties;
    private final String TOPIC_PREFIX = CommonConstant.APP_NAME;
    private final ConcurrentMap<String, DefaultMQPushConsumer> consumers = new ConcurrentHashMap();
    private final DefaultMQProducer producer = new DefaultMQProducer("mogic-information-producer");

    public RocketMQService(RocketProperties rocketProperties) {
        this.properties = rocketProperties;
        this.producer.setNamesrvAddr(rocketProperties.getEndpoints());
        try {
            this.producer.start();
        } catch (MQClientException e) {
            throw ErrorException.error("RocketMQ生产者启动失败，endpoints: " + rocketProperties.getEndpoints(), e);
        }
    }

    public void send(String str, String str2, String str3) {
        ErrorException.assertEq(StringUtils.isNotEmpty(str), "Topic不能为空");
        ErrorException.assertEq(StringUtils.isNotEmpty(str2), "Key不能为空");
        ErrorException.assertEq(StringUtils.isNotEmpty(str3), "Body不能为空");
        String finalTopic = finalTopic(str);
        try {
            log.info("发送RocketMQ消息, Topic: [{}], Key: [{}], Msg: [{}]", new Object[]{finalTopic, str2, str3});
            Message message = new Message(finalTopic, "", str2, str3.getBytes());
            this.producer.setDefaultTopicQueueNums(50);
            log.info("发送RocketMQ消息成功, Topic: [{}], Key: [{}], Msg: [{}], MsgId: [{}]", new Object[]{finalTopic, str2, str3, this.producer.send(message).getMsgId()});
        } catch (Throwable th) {
            throw ErrorException.error(String.format("发送RocketMQ消息失败, Topic: [%s], Key: [%s], Msg: [%s]", finalTopic, str2, str3), th);
        }
    }

    public void listener(String str, Predicate<String> predicate) {
        String finalTopic = finalTopic(str);
        try {
            DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("mogic-information-group");
            defaultMQPushConsumer.setNamesrvAddr(this.properties.getEndpoints());
            defaultMQPushConsumer.setConsumeThreadMin(5);
            defaultMQPushConsumer.setConsumeThreadMax(5);
            defaultMQPushConsumer.setConsumeMessageBatchMaxSize(1);
            defaultMQPushConsumer.subscribe(finalTopic, "*");
            defaultMQPushConsumer.registerMessageListener((list, consumeConcurrentlyContext) -> {
                MessageExt messageExt = (MessageExt) list.stream().findFirst().orElse(null);
                if (Objects.isNull(messageExt)) {
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
                String str2 = new String(messageExt.getBody());
                if (StringUtils.isEmpty(str2)) {
                    log.warn("接收到RocketMQ消息, Topic: [{}], Key: [{}], Msg: [{}]", new Object[]{messageExt.getTopic(), messageExt.getKeys(), "空消息"});
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
                log.info("接收到RocketMQ消息, Topic: [{}], Key: [{}], Msg: [{}]", new Object[]{messageExt.getTopic(), messageExt.getKeys(), str2});
                return predicate.test(str2) ? ConsumeConcurrentlyStatus.CONSUME_SUCCESS : ConsumeConcurrentlyStatus.RECONSUME_LATER;
            });
            defaultMQPushConsumer.start();
            this.consumers.put(finalTopic, defaultMQPushConsumer);
        } catch (Throwable th) {
            throw ErrorException.error("创建RocketMQ消费者失败, Topic: [" + finalTopic + "]", th);
        }
    }

    public TopicOffset getOffset(String str) {
        try {
            String finalTopic = finalTopic(str);
            DefaultMQPushConsumer defaultMQPushConsumer = this.consumers.get(finalTopic);
            Set<MessageQueue> fetchSubscribeMessageQueues = defaultMQPushConsumer.fetchSubscribeMessageQueues(finalTopic);
            String str2 = (String) fetchSubscribeMessageQueues.stream().map((v0) -> {
                return v0.getQueueId();
            }).map((v0) -> {
                return String.valueOf(v0);
            }).collect(Collectors.joining(","));
            long j = 0;
            long j2 = 0;
            for (MessageQueue messageQueue : fetchSubscribeMessageQueues) {
                j += defaultMQPushConsumer.maxOffset(messageQueue);
                j2 += defaultMQPushConsumer.getOffsetStore().readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE);
            }
            return TopicOffset.builder().queueIds(str2).maxOffset(j).readOffset(j2).build();
        } catch (Throwable th) {
            throw ErrorException.error("获取RocketMQ消费者偏移量失败, Topic: [" + str + "]", th);
        }
    }

    private String finalTopic(String str) {
        return "mogic-information-" + str;
    }
}
