package com.mogic.migration.application.service.schedule;

import com.google.common.collect.ImmutableList;
import com.mogic.migration.domain.service.MigrationService;
import com.mogic.migration.infrastructure.service.mogicbase.ECIService;
import com.mogic.migration.infrastructure.service.redis.RedisService;
import com.mogic.migration.infrastructure.vo.redis.RedisKey;
import com.mogic.migration.infrastructure.vo.rocketmq.TopicOffset;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.annotation.XxlJob;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

/**
 * XXL-JOB handler, registered as {@code scaling_schedule}, that resizes the ECI group based on the
 * message backlog reported by the {@link MigrationService} instances.
 *
 * <p>Each run:
 * <ol>
 *   <li>optionally refreshes the interval (minutes) from the job parameter;</li>
 *   <li>reads the current ECI group size and stops if it is not positive;</li>
 *   <li>increments a check counter stored in Redis;</li>
 *   <li>estimates how many machines are needed to drain the backlog within one interval;</li>
 *   <li>once the counter reaches {@link #CHECKS_BEFORE_SCALING}, asks {@link ECIService} to resize
 *       the group and resets the counter.</li>
 * </ol>
 *
 * <p>NOTE(review): when the backlog is empty the estimate is zero, so the group is resized to 0.
 * If {@code getEciGroupNumber()} then reports 0, later runs stop at the "not positive" guard and
 * this job will not scale the group back up by itself — confirm that this is intended.
 */
@Component
public class ScalingSchedule extends IJobHandler {
    private static final Logger log = LoggerFactory.getLogger(ScalingSchedule.class);

    /** Redis key holding the consumed (read) offset observed at the end of the previous run. */
    private static final RedisKey<String> PRE_READ_OFFSET_KEY = RedisKey.build("PreReadOffset", new String[]{"ScalingSchedule"});

    /** Redis key holding the number of checks performed since the last scaling decision. */
    private static final RedisKey<String> COUNT_KEY = RedisKey.build("Count", new String[]{"ScalingSchedule"});

    /** Number of consecutive checks required before a scaling decision is applied. */
    private static final int CHECKS_BEFORE_SCALING = 3;

    /** Time-to-live applied to every Redis entry written by this job. */
    private static final Duration REDIS_TTL = Duration.ofDays(1L);

    /** Decimal places kept in the intermediate throughput divisions. */
    private static final int RATE_SCALE = 2;

    /** Interval between runs in minutes; overridable through the job parameter. */
    private volatile int interval = 10;

    private final RedisService redisService;
    private final ECIService eciService;
    private final List<MigrationService<?, ?>> migrationServices;

    public ScalingSchedule(RedisService redisService, ECIService eCIService, List<MigrationService<?, ?>> list) {
        this.redisService = redisService;
        this.eciService = eCIService;
        this.migrationServices = list;
    }

    /**
     * Runs one scaling check. Any failure is logged and reported to XXL-JOB as a failed run
     * instead of being swallowed, and no scaling is attempted when the current group size could
     * not be read.
     */
    @XxlJob("scaling_schedule")
    public void execute() {
        try {
            refreshIntervalFromJobParam();

            int eciGroupNumber = this.eciService.getEciGroupNumber();
            log.info("基础机器数: {}", eciGroupNumber);
            if (eciGroupNumber <= 0) {
                log.info("基础机器数小于等于0，拒绝扩缩容");
                return;
            }

            int checkCount = getAndIncrCount();
            log.info("检查次数: {}", checkCount);

            BigDecimal needEciGroupNumber = getNeedEciGroupNumber(eciGroupNumber);
            log.info("基础机器数: {}, 需要机器数: {}", eciGroupNumber, needEciGroupNumber);

            if (checkCount < CHECKS_BEFORE_SCALING) {
                log.info("检查次数小于等于3，不执行扩缩容。Count: {}", checkCount);
            } else {
                applyScaling(eciGroupNumber, needEciGroupNumber);
                resetCount();
            }
            XxlJobHelper.handleSuccess();
        } catch (Throwable th) {
            log.error("扩缩容调度器异常", th);
            XxlJobHelper.handleFail("扩缩容调度器异常: " + th.getMessage());
        }
    }

    /**
     * Updates {@link #interval} from the job parameter when it is a positive integer. Blank,
     * non-numeric, non-integer or non-positive values leave the current interval untouched.
     */
    private void refreshIntervalFromJobParam() {
        String jobParam = XxlJobHelper.getJobParam();
        if (StringUtils.isBlank(jobParam) || !NumberUtils.isCreatable(jobParam)) {
            return;
        }
        try {
            int requested = NumberUtils.createInteger(jobParam);
            if (requested > 0) {
                this.interval = requested;
            }
        } catch (NumberFormatException e) {
            // isCreatable also accepts values such as "1.5" or "10L" that are not valid integers.
            log.warn("任务参数不是合法的整数，沿用当前间隔时间: param={}, interval={}", jobParam, this.interval);
        }
    }

    /**
     * Resizes the ECI group to the required machine count, rounded up to a whole machine.
     *
     * @param eciGroupNumber     current group size
     * @param needEciGroupNumber estimated (possibly fractional) number of machines required
     * @throws ArithmeticException if the rounded value does not fit in an {@code int}
     */
    private void applyScaling(int eciGroupNumber, BigDecimal needEciGroupNumber) {
        // Round up: truncation would request fewer machines than the estimate calls for and could
        // turn a "scale up" decision into a request for the current size.
        int targetGroupNumber = needEciGroupNumber.setScale(0, RoundingMode.CEILING).intValueExact();
        log.info("向上取整后的目标机器数: {}", targetGroupNumber);

        if (targetGroupNumber > eciGroupNumber) {
            log.info("执行扩容，需要机器数: {}", needEciGroupNumber);
            this.eciService.modifyEciGroupNumber(targetGroupNumber);
        } else if (targetGroupNumber < eciGroupNumber) {
            log.info("执行缩容，需要机器数: {}", needEciGroupNumber);
            this.eciService.modifyEciGroupNumber(targetGroupNumber);
        } else {
            log.info("不需要扩缩容");
        }
    }

    /**
     * Estimates how many machines are needed to drain the current backlog within one interval.
     *
     * <p>The per-machine throughput is derived from the read-offset progress since the previous
     * run. Whatever the outcome, the current read offset is stored for the next run.
     *
     * @param eciGroupNumber current group size
     * @return the estimated machine count; {@code eciGroupNumber} itself when it is not positive,
     *         and zero when there is no backlog
     */
    private BigDecimal getNeedEciGroupNumber(int eciGroupNumber) {
        if (eciGroupNumber <= 0) {
            log.info("基础机器数小于等于0，拒绝计算需要的机器数");
            return BigDecimal.valueOf(eciGroupNumber);
        }
        try {
            BigDecimal intervalMinutes = BigDecimal.valueOf(this.interval);
            BigDecimal machines = BigDecimal.valueOf(eciGroupNumber);
            BigDecimal consumed = BigDecimal.valueOf(readOffsetDiff());
            BigDecimal backlog = BigDecimal.valueOf(getUnreadOffsetNumber());
            log.info("间隔时间: {} 分钟, 基础机器数: {}, 消费偏移量差值: {}, 挤压消息数: {}", this.interval, eciGroupNumber, consumed, backlog);

            if (backlog.signum() <= 0) {
                log.info("挤压消息数小于等于0，不需要扩容");
                return BigDecimal.ZERO;
            }
            if (consumed.signum() <= 0) {
                // No measurable progress but a backlog exists: fall back to the machine count as
                // the consumed amount, which also keeps the divisor below strictly positive.
                log.info("消费偏移量差值小于等于0，但挤压消息数大于0，表示此时集群有可能都在处理大文件。所以此时消费偏移量差值等于机器数");
                consumed = machines;
            }
            log.info("间隔时间: {} 分钟, 基础机器数: {}, 消费偏移量差值: {}, 挤压消息数: {}", this.interval, eciGroupNumber, consumed, backlog);

            BigDecimal perMachinePerMinute = consumed
                    .divide(intervalMinutes, RATE_SCALE, RoundingMode.CEILING)
                    .divide(machines, RATE_SCALE, RoundingMode.CEILING);
            log.info("单机每分钟可处理的消息数: {}", perMachinePerMinute);

            BigDecimal perMachinePerInterval = perMachinePerMinute.multiply(intervalMinutes);
            log.info("间隔时间单机可处理的消息数: {}", perMachinePerInterval);

            BigDecimal needed = backlog.divide(perMachinePerInterval, RATE_SCALE, RoundingMode.CEILING);
            log.info("间隔时间内处理完挤压消息需要的机器数: {}", needed);
            return needed;
        } finally {
            // Always record the baseline for the next run, on success and on failure alike.
            setPreReadOffset();
        }
    }

    /** Returns the total unread (backlog) offset across all migration services. */
    private long getUnreadOffsetNumber() {
        long unreadOffset = getTotalOffset().getUnreadOffset();
        log.info("挤压消息数: {}", unreadOffset);
        return unreadOffset;
    }

    /**
     * Returns how far the total read offset has advanced since the value stored in Redis. When no
     * value is stored yet, the current read offset is stored first. A stored value that is empty
     * or not numeric is treated as 0.
     */
    private long readOffsetDiff() {
        String stored = this.redisService.getIfAbsent(PRE_READ_OFFSET_KEY, () -> {
            long initialReadOffset = getTotalOffset().getReadOffset();
            log.info("第一次检查，设置前一次已消费的偏移量: {}", initialReadOffset);
            return String.valueOf(initialReadOffset);
        }, REDIS_TTL);
        log.info("前一次已消费的偏移量（redis中的值）: {}", stored);

        long previousReadOffset = Optional.ofNullable(stored)
                .filter(StringUtils::isNotEmpty)
                .filter(NumberUtils::isCreatable)
                .map(NumberUtils::createLong)
                .orElse(0L);
        log.info("前一次已消费的偏移量: {}", previousReadOffset);

        long currentReadOffset = getTotalOffset().getReadOffset();
        log.info("当前已消费的偏移量: {}", currentReadOffset);

        long diff = currentReadOffset - previousReadOffset;
        log.info("偏移量差值: {}", diff);
        return diff;
    }

    /** Stores the current total read offset in Redis as the baseline for the next run. */
    private void setPreReadOffset() {
        long readOffset = getTotalOffset().getReadOffset();
        log.info("设置前一次已消费的偏移量: {}", readOffset);
        this.redisService.set(PRE_READ_OFFSET_KEY, String.valueOf(readOffset), REDIS_TTL);
    }

    /**
     * Merges the topic offsets of all migration services into one {@link TopicOffset}: topic names
     * are joined with commas and the queue offsets are concatenated. With no migration services an
     * empty {@code TopicOffset} built by its builder is returned.
     */
    private TopicOffset getTotalOffset() {
        log.info("开始获取系统中所有topic的偏移量");
        TopicOffset total = this.migrationServices.stream()
                .map(migrationService -> {
                    TopicOffset offset = migrationService.getTopicOffset();
                    log.info("topic: {}, readOffset: {}, unreadOffset: {}", offset.getTopic(), offset.getReadOffset(), offset.getUnreadOffset());
                    return offset;
                })
                .reduce((left, right) -> left.toBuilder()
                        .topic(left.getTopic().concat(",").concat(right.getTopic()))
                        .queueOffsets(concat(left.getQueueOffsets(), right.getQueueOffsets()))
                        .build())
                .orElse(TopicOffset.builder().build());
        log.info("系统中所有topic的偏移量，topics: {}, readOffset: {}, unreadOffset: {}", total.getTopic(), total.getReadOffset(), total.getUnreadOffset());
        return total;
    }

    /**
     * Concatenates two sequences into an immutable list. Being generic, it lets the element type
     * be inferred at the call site; a chained {@code ImmutableList.builder()} would infer
     * {@code Object}.
     */
    private static <T> ImmutableList<T> concat(Iterable<? extends T> first, Iterable<? extends T> second) {
        return ImmutableList.<T>builder().addAll(first).addAll(second).build();
    }

    /** Increments the check counter in Redis and returns the value read back afterwards. */
    private int getAndIncrCount() {
        this.redisService.incr(COUNT_KEY, () -> 0, REDIS_TTL);
        return getCount();
    }

    /** Deletes the check counter so the next run starts counting again. */
    private void resetCount() {
        log.info("重置计数");
        this.redisService.del(COUNT_KEY);
    }

    /** Reads the check counter from Redis; a missing, empty or non-numeric value counts as 0. */
    private int getCount() {
        return Optional.ofNullable(this.redisService.get(COUNT_KEY))
                .filter(StringUtils::isNotEmpty)
                .filter(NumberUtils::isCreatable)
                .map(NumberUtils::createInteger)
                .orElse(0);
    }
}
