package com.mogic.migration.domain.service;

import com.mogic.migration.domain.entity.migration.MigrationRecord;
import com.mogic.migration.domain.entity.migration.MigrationStatusEnum;
import com.mogic.migration.domain.repository.IMigrationRecordRepo;
import com.mogic.migration.domain.vo.migration.MigrateInfo;
import com.mogic.migration.domain.vo.migration.MigrationProgress;
import com.mogic.migration.domain.vo.migration.MigrationStatusChange;
import com.mogic.migration.infrastructure.common.GsonUtils;
import com.mogic.migration.infrastructure.common.exception.ErrorException;
import com.mogic.migration.infrastructure.common.exception.WarnException;
import com.mogic.migration.infrastructure.service.redis.RedisService;
import com.mogic.migration.infrastructure.service.rocketmq.RocketMQService;
import com.mogic.migration.infrastructure.vo.CloseException;
import com.mogic.migration.infrastructure.vo.redis.RedisKey;
import com.mogic.migration.infrastructure.vo.rocketmq.TopicOffset;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

/**
 * Base service for file migration tasks.
 *
 * <p>It consumes migration ids from a RocketMQ topic, runs each migration while holding a
 * Redis lock, keeps progress counters and stop signals in Redis, and records status
 * transitions through {@link IMigrationRecordRepo}. Subclasses provide the topic, the
 * consumer group, the record lookup and the actual transfer ({@link #migrating}).
 *
 * @param <I> request type accepted by {@link #migrate(List)}
 * @param <R> migration record type processed by this service
 */
@Service
public abstract class MigrationService<I extends MigrateInfo, R extends MigrationRecord> {
    private static final Logger log = LoggerFactory.getLogger(MigrationService.class);
    private static final String MIGRATE_STATUS_TOPIC = "migrate_status_topic";

    /** Expiry passed to Redis when taking the per-migration handle lock. */
    private static final Duration HANDLE_LOCK_TTL = Duration.ofMinutes(10L);

    /** Expiry used for both the progress counter and the stop-signal entries. */
    private static final Duration REDIS_ENTRY_TTL = Duration.ofHours(1L);

    /** Highest rate reported while a record is still in the Ing state; 100 is reserved for Success. */
    private static final int MAX_IN_FLIGHT_RATE = 99;

    private final IMigrationRecordRepo<?> migrationRecordRepo;
    protected final RedisService redisService;
    protected final RocketMQService rocketService;

    /**
     * @param iMigrationRecordRepo repository used to read records and persist status changes
     * @param redisService         Redis access used for locks, progress counters and stop signals
     * @param rocketMQService      RocketMQ access used for consuming and publishing messages
     */
    public MigrationService(IMigrationRecordRepo<?> iMigrationRecordRepo, RedisService redisService, RocketMQService rocketMQService) {
        this.migrationRecordRepo = iMigrationRecordRepo;
        this.redisService = redisService;
        this.rocketService = rocketMQService;
    }

    /** @return the repository this service was constructed with */
    public IMigrationRecordRepo<?> getMigrationRecordRepo() {
        return this.migrationRecordRepo;
    }

    /**
     * Registers a consumer on {@link #getMigrateTopic()} that treats every message body as a
     * migration id and passes it to {@link #handle(long, boolean)} with notification enabled.
     *
     * <p>The callback returns {@code true} for messages that were processed or that carry no
     * usable id, and {@code false} when {@code handle} threw. How the boolean is interpreted
     * (ack vs. redelivery) is decided by {@code RocketMQService} — TODO confirm.
     */
    public void listener() {
        log.info("开始监听转存任务，Topic: {}", getMigrateTopic());
        this.rocketService.listener(getMigrateTopic(), getConsumerGroup(), "*", message -> {
            long migrationId = parseMigrationId(message);
            if (migrationId <= 0) {
                log.warn("转存记录ID为空");
                return true;
            }
            try {
                log.info("开始转存记录ID，Topic: {} Message: {}", getMigrateTopic(), message);
                handle(migrationId, true);
                log.info("完成转存记录ID，Topic: {} Message: {}", getMigrateTopic(), message);
                return true;
            } catch (Throwable th) {
                // Failure with a stack trace: logged at ERROR so it is not lost among INFO traffic.
                log.error("记录转存失败，Topic: {}，MigrationId: {}，Message: {}", getMigrateTopic(), migrationId, message, th);
                return false;
            }
        });
    }

    /**
     * Parses a message body into a migration id.
     *
     * <p>{@code NumberUtils.isCreatable} also accepts forms such as {@code "1.5"} or
     * {@code "12L"} that {@code NumberUtils.createLong} rejects, so the conversion is guarded
     * and a malformed body yields {@code 0} instead of an exception.
     *
     * @param message raw message body, may be {@code null}
     * @return the parsed id, or {@code 0} when the body is empty or not a valid long
     */
    private static long parseMigrationId(String message) {
        if (StringUtils.isEmpty(message) || !NumberUtils.isCreatable(message)) {
            return 0L;
        }
        try {
            return NumberUtils.createLong(message);
        } catch (NumberFormatException e) {
            log.warn("转存记录ID无法解析. Message: [{}]", message, e);
            return 0L;
        }
    }

    /**
     * Runs one migration while holding the per-id Redis lock.
     *
     * <p>Flow: acquire the lock (skip if it cannot be acquired) → load the record (skip if it
     * is missing or already {@code Success}) → mark it {@code Ing} → run {@link #migrating} →
     * mark it {@code Success}. A {@link CloseException} moves the record to {@code Stop};
     * any other throwable moves it to {@code Fail} and is rethrown wrapped by
     * {@code ErrorException.error}. The lock is released in every case where it was acquired.
     *
     * @param j migration id
     * @param z whether status changes should be published (forwarded to
     *          {@link #success}, {@link #fail} and {@link #stop})
     */
    public void handle(long j, boolean z) {
        RedisKey<Long> lockKey = RedisKey.build(Long.valueOf(j), new String[]{"handle", "migrate"});
        // Acquire outside the try/finally: if lock() itself fails we never held the lock and
        // must not release it. The meaning of the trailing 1 is defined by RedisService.lock
        // (presumably an attempt count — TODO confirm).
        if (!this.redisService.lock(lockKey, HANDLE_LOCK_TTL, 1)) {
            log.info("转存中禁止重复转存，跳过当前转存请求. migrationId: [{}]", j);
            return;
        }
        try {
            R migrateRecord = getMigrateRecord(j);
            if (Objects.isNull(migrateRecord)) {
                log.warn("转存记录不存在. migrationId: [{}]", j);
                return;
            }
            if (Objects.equals(MigrationStatusEnum.Success, migrateRecord.getStatus())) {
                log.warn("转存记录已转存完成, 无需重复转存. migrationId: [{}]", j);
                return;
            }
            ing(j);
            log.info("开始转存. migrationId: [{}]", j);
            success(migrating(migrateRecord), z);
            log.info("转存完成. migrationId: [{}]", j);
        } catch (CloseException e) {
            log.warn("文件传输过程终止. ErrMsg: [{}]", e.getMessage(), e);
            stop(j, z);
        } catch (Throwable th) {
            log.error("文件转存失败. migrationId: [{}]", j, th);
            try {
                fail(j, z);
            } catch (Throwable failTh) {
                // Best effort: the original failure below is the one that must reach the caller.
                log.warn("更改转存状态失败. migrationId: [{}]", j, failTh);
            }
            throw ErrorException.error(String.format("文件转存失败. migrationId: [%d] ErrMsg: [%s]", Long.valueOf(j), th.getMessage()), th);
        } finally {
            releaseHandleLock(lockKey, j);
        }
    }

    /** Releases the handle lock; a failure to unlock is logged and never propagated. */
    private void releaseHandleLock(RedisKey<Long> lockKey, long migrationId) {
        log.info("释放转存锁. migrationId: [{}]", migrationId);
        try {
            this.redisService.unlock(lockKey);
        } catch (Throwable th) {
            log.error("释放转存锁失败. migrationId: [{}]", migrationId, th);
        }
    }

    /** Builds the Redis key holding the progress counter of a migration. */
    private RedisKey<Long> progressKey(long migrationId) {
        return RedisKey.build(Long.valueOf(migrationId), new String[]{"progress", "migrate"});
    }

    /** Builds the Redis key holding the stop signal of a migration. */
    private RedisKey<Long> stopSignKey(long migrationId) {
        return RedisKey.build(Long.valueOf(migrationId), new String[]{"stop", "migrate"});
    }

    /**
     * Deletes the progress counter of a migration.
     *
     * @param j migration id
     */
    public void delProgress(long j) {
        this.redisService.del(progressKey(j));
    }

    /**
     * Resets the progress counter of a migration to zero with a one-hour expiry.
     *
     * @param j migration id
     */
    public void initProgress(long j) {
        this.redisService.set(progressKey(j), BigDecimal.ZERO, REDIS_ENTRY_TTL);
    }

    /**
     * Decreases the progress counter; delegates to {@link #incrProgress} with the negated amount.
     *
     * @param j migration id
     * @param i amount to subtract
     */
    public void decrProgress(long j, int i) {
        incrProgress(j, -i);
    }

    /**
     * Adds {@code i} to the progress counter through {@code RedisService.add}, passing the same
     * value both as the supplier and as the delta (presumably initial value and increment —
     * TODO confirm against RedisService), with a one-hour expiry.
     *
     * @param j migration id
     * @param i amount to add; {@link #progress} compares the counter against the record size
     */
    public void incrProgress(long j, int i) {
        this.redisService.add(progressKey(j), () -> BigDecimal.valueOf(i), BigDecimal.valueOf(i), REDIS_ENTRY_TTL);
    }

    /**
     * Performs the actual transfer for one record.
     *
     * @param r record to migrate
     * @return the parameters later handed to {@code IMigrationRecordRepo.toSuccess}
     */
    protected abstract IMigrationRecordRepo.ToSuccessParam migrating(R r);

    /**
     * Entry point for new migration requests; implemented by subclasses.
     *
     * @param list migration requests
     * @return the records produced for the requests
     */
    public abstract List<R> migrate(List<I> list);

    /**
     * Re-queues migrations that are in the {@code Wait} or {@code Fail} state.
     *
     * <p>Eligible records are reset through {@code toWait} and their ids are published again.
     * Records in any other state are logged as unsupported and are <em>not</em> published.
     *
     * @param list migration ids to re-queue; checked with {@code WarnException.assertEq}
     *             (presumably throws when the list is empty — TODO confirm)
     */
    public void remigrate(List<Long> list) {
        WarnException.assertEq(CollectionUtils.isNotEmpty(list), "缺少有效的转存Id");
        List<Long> requeuedIds = new ArrayList<>();
        for (Object item : this.migrationRecordRepo.selectByMigrationIds(list)) {
            MigrationRecord migrationRecord = (MigrationRecord) item;
            boolean eligible = Objects.equals(MigrationStatusEnum.Wait, migrationRecord.getStatus())
                    || Objects.equals(MigrationStatusEnum.Fail, migrationRecord.getStatus());
            if (!eligible) {
                log.warn("转存Id: [{}] 转存状态为: [{}], 不支持重入队列", migrationRecord.getMigrationId(), migrationRecord.getStatus());
                continue;
            }
            log.info("转存Id: [{}] 转存状态为: [{}], 重入队列", migrationRecord.getMigrationId(), migrationRecord.getStatus());
            this.migrationRecordRepo.toWait(migrationRecord.getMigrationId());
            requeuedIds.add(migrationRecord.getMigrationId());
        }
        // Publish only what was actually reset to Wait; ineligible or unknown ids stay untouched.
        send(requeuedIds);
    }

    /**
     * Reports the progress of the given migrations.
     *
     * <p>The rate is 100 for {@code Success}, a value in 0..99 for {@code Ing} (Redis counter
     * divided by record size) and 0 for every other status.
     *
     * @param list migration ids; checked with {@code WarnException.assertEq}
     * @return one progress entry per record returned by the repository
     */
    public List<MigrationProgress> progress(List<Long> list) {
        WarnException.assertEq(CollectionUtils.isNotEmpty(list), "缺少有效的转存Id");
        return this.migrationRecordRepo.selectByMigrationIds(list).stream()
                .map(item -> toProgress((MigrationRecord) item))
                .collect(Collectors.toList());
    }

    /** Maps a record to its progress view, deriving the rate from the record status. */
    private MigrationProgress toProgress(MigrationRecord migrationRecord) {
        int rate = 0;
        if (Objects.equals(MigrationStatusEnum.Ing, migrationRecord.getStatus())) {
            rate = inFlightRate(migrationRecord);
        } else if (Objects.equals(MigrationStatusEnum.Success, migrationRecord.getStatus())) {
            rate = 100;
        }
        return MigrationProgress.builder()
                .migrationId(migrationRecord.getMigrationId())
                .md5(migrationRecord.getMd5())
                .rate(rate)
                .status(migrationRecord.getStatus())
                .fileSubType(migrationRecord.getFileSubType())
                .size(migrationRecord.getSize())
                .mimeType(migrationRecord.getMimeType())
                .cdnDestPath(migrationRecord.destPathWithCDN())
                .filename(migrationRecord.getFilename())
                .origPath(migrationRecord.getOrigPath())
                .destPath(migrationRecord.destPath())
                .build();
    }

    /**
     * Computes the percentage for a record that is currently being migrated.
     *
     * <p>A missing, non-numeric or negative counter counts as zero. The result is capped at 99;
     * a record whose size is not positive is reported as 99 as well.
     */
    private int inFlightRate(MigrationRecord migrationRecord) {
        BigDecimal transferred = Optional.ofNullable(this.redisService.get(progressKey(migrationRecord.getMigrationId())))
                .map(value -> String.valueOf(value))
                .filter(NumberUtils::isCreatable)
                .map(BigDecimal::new)
                .filter(value -> value.signum() >= 0)
                .orElse(BigDecimal.ZERO);
        BigDecimal total = BigDecimal.valueOf(migrationRecord.getSize());
        if (total.compareTo(BigDecimal.ZERO) <= 0) {
            return MAX_IN_FLIGHT_RATE;
        }
        int percent = transferred.divide(total, 2, RoundingMode.HALF_UP)
                .multiply(BigDecimal.valueOf(100L))
                .setScale(0, RoundingMode.HALF_UP)
                .intValue();
        return Math.min(percent, MAX_IN_FLIGHT_RATE);
    }

    /**
     * Loads the record for a migration id.
     *
     * @param j migration id
     * @return the record; {@link #handle} treats {@code null} as "record does not exist"
     */
    protected abstract R getMigrateRecord(long j);

    /**
     * Publishes each id as its own message on {@link #getMigrateTopic()}.
     *
     * @param list migration ids to publish; an empty list publishes nothing
     */
    public void send(List<Long> list) {
        log.info("推送转存记录，转存Id: [{}]", list.stream().map(id -> String.valueOf(id)).collect(Collectors.joining(",")));
        for (Long migrationId : list) {
            this.rocketService.send(getMigrateTopic(), String.valueOf(migrationId));
        }
    }

    /** @return the topic migration ids are published to and consumed from */
    protected abstract String getMigrateTopic();

    /** @return the consumer group used by {@link #listener()} */
    protected abstract String getConsumerGroup();

    /** @return the offset information RocketMQ reports for {@link #getMigrateTopic()} */
    public TopicOffset getTopicOffset() {
        return this.rocketService.getOffset(getMigrateTopic());
    }

    /**
     * Writes a stop signal (the {@code Stop} status code, one-hour expiry) for every record
     * found for the given ids. The signal is read back by {@link #checkStop(long)}.
     *
     * @param list migration ids to stop
     */
    public void close(List<Long> list) {
        for (Object item : this.migrationRecordRepo.selectByMigrationIds(list)) {
            MigrationRecord migrationRecord = (MigrationRecord) item;
            log.info("转存Id: [{}] 转存状态为: [{}], 发送停止信号", migrationRecord.getMigrationId(), migrationRecord.getStatus());
            this.redisService.set(stopSignKey(migrationRecord.getMigrationId()), Integer.valueOf(MigrationStatusEnum.Stop.getCode()), REDIS_ENTRY_TTL);
        }
    }

    /**
     * Reads the stop signal of a migration and hands the result to
     * {@code CloseException.assertNotEq} (presumably throws {@link CloseException} when the
     * signal equals {@code Stop} — TODO confirm). {@link #handle} turns that exception into
     * a {@code Stop} status.
     *
     * @param j migration id
     */
    public void checkStop(long j) {
        MigrationStatusEnum signal = Optional.ofNullable(this.redisService.get(stopSignKey(j)))
                .filter(value -> StringUtils.isNotEmpty(value))
                .filter(NumberUtils::isCreatable)
                .map(NumberUtils::toInt)
                .map(code -> MigrationStatusEnum.of(code))
                .orElse(null);
        CloseException.assertNotEq(Objects.equals(MigrationStatusEnum.Stop, signal), "文件转存过程终止, migrationId: [" + j + "]");
    }

    /** Removes the stop signal of a migration. */
    private void delStopSign(long j) {
        this.redisService.del(stopSignKey(j));
    }

    /**
     * Persists the {@code Success} transition, clears the stop signal and publishes the change.
     *
     * @param toSuccessParam result of {@link #migrating}
     * @param z              whether the status change should be published
     */
    public void success(IMigrationRecordRepo.ToSuccessParam toSuccessParam, boolean z) {
        log.info("转存文件完成. Param: [{}]", toSuccessParam);
        this.migrationRecordRepo.toSuccess(toSuccessParam);
        delStopSign(toSuccessParam.getMigrationId());
        notifyStatusChange(toSuccessParam.getMigrationId(), MigrationStatusEnum.Success, z);
    }

    /**
     * Persists the {@code Fail} transition, clears the stop signal and publishes the change.
     *
     * @param j migration id
     * @param z whether the status change should be published
     */
    public void fail(long j, boolean z) {
        log.info("转存文件失败. MigrationId: [{}]", j);
        this.migrationRecordRepo.toFail(j);
        delStopSign(j);
        notifyStatusChange(j, MigrationStatusEnum.Fail, z);
    }

    /**
     * Persists the {@code Stop} transition, clears the stop signal and publishes the change.
     *
     * @param j migration id
     * @param z whether the status change should be published
     */
    public void stop(long j, boolean z) {
        log.info("转存文件暂停. MigrationId: [{}]", j);
        this.migrationRecordRepo.toStop(j);
        delStopSign(j);
        notifyStatusChange(j, MigrationStatusEnum.Stop, z);
    }

    /**
     * Persists the {@code Ing} transition and resets the progress counter.
     *
     * @param j migration id
     */
    public void ing(long j) {
        log.info("转存文件中. MigrationId: [{}]", j);
        this.migrationRecordRepo.toIng(j);
        initProgress(j);
    }

    /**
     * Publishes a status-change message on {@code migrate_status_topic}, using the record's
     * channel as the second argument of {@code RocketMQService.send}.
     *
     * <p>Nothing is published when {@code z} is {@code false}, when the record is not async,
     * or when the record can no longer be loaded.
     */
    private void notifyStatusChange(long j, MigrationStatusEnum migrationStatusEnum, boolean z) {
        log.info("转存文件状态变更. MigrationId: [{}], Status: [{}]", j, migrationStatusEnum);
        R migrateRecord = getMigrateRecord(j);
        if (Objects.isNull(migrateRecord)) {
            // Without this guard an NPE here would make handle() mark a finished migration as failed.
            log.warn("转存记录不存在，跳过状态变更通知. MigrationId: [{}], Status: [{}]", j, migrationStatusEnum);
            return;
        }
        if (!z || !migrateRecord.isAsync()) {
            log.info("转存文件状态变更，无需推送变更消息. MigrationId: [{}], Status: [{}]，Notify: [{}]，Sync: [{}]", j, migrationStatusEnum, z, migrateRecord.isAsync());
            return;
        }
        MigrationStatusChange change = MigrationStatusChange.builder()
                .migrationId(migrateRecord.getMigrationId())
                .status(migrationStatusEnum.getCode())
                .destPath(migrateRecord.destPath())
                .destPathCDN(migrateRecord.destPathWithCDN())
                .md5(migrateRecord.getMd5())
                .size(migrateRecord.getSize())
                .channel(migrateRecord.getChannel())
                .build();
        this.rocketService.send(MIGRATE_STATUS_TOPIC, change.getChannel(), GsonUtils.gson.toJson(change));
    }
}
