package com.mogic.migration.infrastructure.service.rocketmq;

import com.mogic.migration.infrastructure.common.exception.ErrorException;
import com.mogic.migration.infrastructure.vo.rocketmq.RocketProperties;
import com.mogic.migration.infrastructure.vo.rocketmq.TopicOffset;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.annotation.PreDestroy;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:com/mogic/migration/infrastructure/service/rocketmq/RocketMQService.class */
public class RocketMQService {
    private static final Logger log = LoggerFactory.getLogger(RocketMQService.class);
    private final RocketProperties properties;
    private final DefaultMQProducer producer;
    private final String TOPIC_PREFIX = "mogic-migration";
    private final ConcurrentMap<String, DefaultMQPushConsumer> consumers = new ConcurrentHashMap();

    /* loaded from: input_file:com/mogic/migration/infrastructure/service/rocketmq/RocketMQService$Listener.class */
    public static abstract class Listener implements MessageListenerConcurrently {
    }

    public RocketMQService(RocketProperties rocketProperties, DefaultMQProducer defaultMQProducer) {
        this.properties = rocketProperties;
        this.producer = defaultMQProducer;
    }

    public boolean send(String str, String str2) {
        return send(str, "", str2);
    }

    public boolean send(String str, String str2, String str3) {
        ErrorException.assertEq(StringUtils.isNotEmpty(str), "Topic不能为空");
        if (StringUtils.isEmpty(str3)) {
            return false;
        }
        String finalTopic = finalTopic(str);
        AtomicInteger atomicInteger = new AtomicInteger(2);
        do {
            try {
                log.info("发送RocketMQ消息, Topic: [{}]", finalTopic);
                this.producer.send(new Message(finalTopic, (String) Optional.ofNullable(str2).filter((v0) -> {
                    return StringUtils.isNotEmpty(v0);
                }).orElse(""), "", str3.getBytes()));
                return true;
            } catch (Throwable th) {
                log.error("发送RocketMQ消息失败, Topic: [{}], Msg: [{}], Retry: [{}]", new Object[]{finalTopic, str3, Integer.valueOf(atomicInteger.get()), th});
                LockSupport.parkNanos(Duration.ofSeconds(1L).toNanos());
            }
        } while (atomicInteger.incrementAndGet() > 0);
        return false;
    }

    public void listener(String str, String str2, String str3, final Predicate<String> predicate) {
        String finalTopic = finalTopic(str);
        String str4 = (String) Optional.ofNullable(str3).filter((v0) -> {
            return StringUtils.isNotEmpty(v0);
        }).orElse("*");
        try {
            DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer(str2);
            defaultMQPushConsumer.setNamesrvAddr(this.properties.getEndpoints());
            defaultMQPushConsumer.setConsumeThreadMin(5);
            defaultMQPushConsumer.setConsumeThreadMax(5);
            defaultMQPushConsumer.setConsumeMessageBatchMaxSize(1);
            defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
            defaultMQPushConsumer.setMaxReconsumeTimes(6);
            defaultMQPushConsumer.subscribe(finalTopic, str4);
            defaultMQPushConsumer.registerMessageListener(new Listener() { // from class: com.mogic.migration.infrastructure.service.rocketmq.RocketMQService.1
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                    try {
                        MessageExt orElse = list.stream().findFirst().orElse(null);
                        if (Objects.isNull(orElse)) {
                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                        }
                        String str5 = new String(orElse.getBody());
                        if (StringUtils.isEmpty(str5)) {
                            RocketMQService.log.warn("接收到RocketMQ消息, Topic: [{}], Tags: [{}], Key: [{}], Msg: [{}]", new Object[]{orElse.getTopic(), orElse.getTags(), orElse.getKeys(), "空消息"});
                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                        }
                        RocketMQService.log.info("接收到RocketMQ消息, Topic: [{}], Tags: [{}], Key: [{}], Msg: [{}]", new Object[]{orElse.getTopic(), orElse.getTags(), orElse.getKeys(), str5});
                        return predicate.test(str5) ? ConsumeConcurrentlyStatus.CONSUME_SUCCESS : ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    } catch (Throwable th) {
                        RocketMQService.log.error("处理RocketMQ消息失败", th);
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                }
            });
            defaultMQPushConsumer.start();
            this.consumers.put(finalTopic, defaultMQPushConsumer);
        } catch (Throwable th) {
            throw ErrorException.error("创建RocketMQ消费者失败, Topic: [" + finalTopic + "]", th);
        }
    }

    public TopicOffset getOffset(String str) {
        try {
            String finalTopic = finalTopic(str);
            DefaultMQPushConsumer defaultMQPushConsumer = this.consumers.get(finalTopic);
            return TopicOffset.builder().topic(finalTopic).queueOffsets((List) defaultMQPushConsumer.fetchSubscribeMessageQueues(finalTopic).stream().map(messageQueue -> {
                try {
                    return TopicOffset.QueueOffset.builder().queueId(messageQueue.getQueueId()).maxOffset(defaultMQPushConsumer.maxOffset(messageQueue)).readOffset(defaultMQPushConsumer.getOffsetStore().readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE)).build();
                } catch (Throwable th) {
                    throw ErrorException.error("获取RocketMQ消费者偏移量失败, Topic: [" + str + "], QueueId: [" + messageQueue.getQueueId() + "]", th);
                }
            }).collect(Collectors.toList())).build();
        } catch (Throwable th) {
            throw ErrorException.error("获取RocketMQ消费者偏移量失败, Topic: [" + str + "]", th);
        }
    }

    @Deprecated
    public void rebalance(String str) {
        try {
            this.consumers.get(finalTopic(str)).getDefaultMQPushConsumerImpl().getmQClientFactory().rebalanceImmediately();
        } catch (Throwable th) {
            throw ErrorException.error("RocketMQ消费者重新平衡失败, Topic: [" + str + "]", th);
        }
    }

    private String finalTopic(String str) {
        return "mogic-migration-" + str;
    }

    @PreDestroy
    public void shutdown() {
        log.warn("销毁RocketMQ Consumer...");
        for (DefaultMQPushConsumer defaultMQPushConsumer : this.consumers.values()) {
            defaultMQPushConsumer.setAwaitTerminationMillisWhenShutdown(Duration.ofMinutes(1L).toMillis());
            defaultMQPushConsumer.shutdown();
        }
        log.warn("销毁RocketMQ 完成...");
    }
}
