package com.mogic.infra.application.service.xxljob;

import cn.hutool.core.collection.CollectionUtil;
import com.alibaba.schedulerx.worker.domain.JobContext;
import com.alibaba.schedulerx.worker.processor.JavaProcessor;
import com.alibaba.schedulerx.worker.processor.ProcessResult;
import com.google.common.base.Stopwatch;
import com.mogic.infra.domain.entity.DelEnum;
import com.mogic.infra.domain.entity.MessageQueueCheckStatusEnum;
import com.mogic.infra.domain.entity.MessageQueueKindEnum;
import com.mogic.infra.domain.entity.MessageQueueStatusEnum;
import com.mogic.infra.domain.entity.query.MessageQueueRecordQuery;
import com.mogic.infra.domain.repository.IMessageQueueRecordRepository;
import com.mogic.infra.infrastructure.common.thread.BizThreadPool;
import com.mogic.infra.infrastructure.vo.common.MessageQueueProperties;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

/**
 * Scheduled job that looks for producer message records without a matching consumer record.
 *
 * <p>Each run selects non-deleted producer records in {@code SEND} status whose time falls in
 * the last hour, pages through them, and checks every record on the
 * {@code BizThreadPool.BRUSH_MESSAGE_QUEUE_CHECK_TASK_EVENT} executor. A record that is not
 * excluded by {@link MessageQueueProperties} and has no consumer record for its trace id is
 * logged as a warning and its check status is updated to {@code CHECKED}.
 */
@Component
public class MessageQueueCheckJob extends JavaProcessor {
    private static final Logger log = LoggerFactory.getLogger(MessageQueueCheckJob.class);

    /** Number of records fetched and checked per page. */
    private static final int PAGE_SIZE = 500;

    /** Pause, in seconds, inserted after each page has been fully processed. */
    private static final long PAGE_PAUSE_SECONDS = 2L;

    @Resource
    private IMessageQueueRecordRepository messageQueueRecordRepository;

    @Resource
    private MessageQueueProperties messageQueueProperties;

    /**
     * Runs one check pass over the producer records of the last hour.
     *
     * @param jobContext context supplied by the scheduler; not read by this job
     * @return a successful result when the pass ran to completion (including when there was
     *         nothing to check); a failed result when the thread was interrupted while waiting
     *         for a page to finish
     * @throws Exception if the repository or the executor rejects a call
     */
    public ProcessResult process(JobContext jobContext) throws Exception {
        log.info("MessageQueueCheckJob收到任务的触发通知...");
        Stopwatch stopwatch = Stopwatch.createStarted();

        MessageQueueRecordQuery query = buildQuery();
        long totalRecords = this.messageQueueRecordRepository.countRecordByExample(query);

        // Nothing matched: skip paging entirely but still emit the finish log below.
        if (totalRecords > 0 && !checkAllPages(query, totalRecords)) {
            return new ProcessResult(false);
        }

        log.info("定时检查消息超时未返回任务执行结束了...耗时时间为stopwatch={}", stopwatch.stop());
        return new ProcessResult(true);
    }

    /**
     * Builds the query for non-deleted producer records in SEND status from the last hour.
     *
     * <p>NOTE(review): the filter uses {@code CHECKED} as the check status, and unanswered
     * records are later updated to {@code CHECKED} as well. Confirm this is the intended
     * status for the filter.
     */
    private MessageQueueRecordQuery buildQuery() {
        Calendar calendar = Calendar.getInstance();
        Date now = calendar.getTime();
        calendar.add(Calendar.HOUR_OF_DAY, -1);
        Date oneHourAgo = calendar.getTime();

        MessageQueueRecordQuery query = new MessageQueueRecordQuery();
        query.setDelStatus(Integer.valueOf(DelEnum.NORMAL.getCode()));
        query.setStatus(MessageQueueStatusEnum.SEND.getCode());
        query.setKind(MessageQueueKindEnum.PRODUCER.getCode());
        query.setCheckStatus(MessageQueueCheckStatusEnum.CHECKED.getStatus());
        query.setStartTime(oneHourAgo);
        query.setEndTime(now);
        query.setEchoTimeout(now);
        return query;
    }

    /**
     * Pages through the matching records and checks each page concurrently, waiting for a page
     * to finish before fetching the next one.
     *
     * @return {@code false} if the thread was interrupted while waiting, {@code true} otherwise
     */
    private boolean checkAllPages(MessageQueueRecordQuery query, long totalRecords) {
        int totalPages = (int) ((totalRecords + PAGE_SIZE - 1) / PAGE_SIZE);
        ExecutorCompletionService<String> completionService =
                new ExecutorCompletionService<>(BizThreadPool.BRUSH_MESSAGE_QUEUE_CHECK_TASK_EVENT);

        for (int page = 1; page <= totalPages; page++) {
            query.setPageIndex(page);
            query.setPageSize(PAGE_SIZE);
            // NOTE(review): the element type is not visible in this file, so the list is left
            // raw as in the original; give it its real type argument once that type is known.
            List records = this.messageQueueRecordRepository.queryPageRecord(query).getResultObj();
            if (CollectionUtil.isEmpty(records)) {
                continue;
            }
            log.info("总共是totalNum={}条 总页数是totalPage={} 每页大小大小为={} 正在处理第currentPage={}页 ", totalRecords, totalPages, PAGE_SIZE, page);

            CountDownLatch latch = new CountDownLatch(records.size());
            records.forEach(record -> completionService.submit(() -> {
                try {
                    if (this.messageQueueProperties.isExclude(record.getApplicationName(), record.getTopic(), record.getTags())) {
                        return "";
                    }
                    if (this.messageQueueRecordRepository.countRecordByTraceAndKind(record.getTraceId(), MessageQueueKindEnum.CONSUMER.getCode()) > 0) {
                        return "";
                    }
                    log.warn("消息超时未返回任务,appName={}、topic={}、tag={}、messageId={}", record.getApplicationName(), record.getTopic(), record.getTags(), record.getMessageId());
                    this.messageQueueRecordRepository.updateRecordCheckStatusById(record.getId(), MessageQueueCheckStatusEnum.CHECKED.getStatus());
                } catch (Exception e) {
                    log.info("定时检查消息超时未返回任务出错了不予执行messageQueueRecordModel={}", record, e);
                } finally {
                    // Exactly one count-down per task, on every path. Counting down in the
                    // branches as well would release the latch before all tasks had finished.
                    latch.countDown();
                }
                return "";
            }));

            if (!awaitPage(latch)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Blocks until every task of the current page has counted the latch down, then pauses
     * briefly before the next page.
     *
     * @return {@code false} if the thread was interrupted; the interrupt flag is restored so
     *         the caller and the scheduler can observe it
     */
    private boolean awaitPage(CountDownLatch latch) {
        try {
            latch.await();
            TimeUnit.SECONDS.sleep(PAGE_PAUSE_SECONDS);
            return true;
        } catch (InterruptedException e) {
            log.info("MessageQueueCheckJob在countDownLatch等待时出错了", e);
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
