package org.jetlinks.rule.engine.cluster.scheduler;

import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Function;
import org.jetlinks.core.rpc.RpcServiceFactory;
import org.jetlinks.rule.engine.api.scheduler.ScheduleJob;
import org.jetlinks.rule.engine.api.scheduler.Scheduler;
import org.jetlinks.rule.engine.api.task.Task;
import org.jetlinks.rule.engine.api.worker.Worker;
import org.jetlinks.rule.engine.api.worker.WorkerSelector;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/jetlinks/rule/engine/cluster/scheduler/ClusterLocalScheduler.class */
/**
 * {@link Scheduler} implementation that keeps its workers and scheduled tasks in
 * in-memory concurrent collections owned by this instance.
 *
 * <p>Tasks are indexed first by rule instance id and then by node id. On construction
 * a {@code LocalSchedulerRpcService} is created for this scheduler; it is shut down
 * again by {@link #cleanup()}.
 */
public class ClusterLocalScheduler implements Scheduler {
    /** Default selection strategy: keep only the first candidate worker. */
    static final WorkerSelector defaultSelector = (candidates, job) -> candidates.take(1L);

    private final String id;
    private WorkerSelector workerSelector = defaultSelector;
    private final List<Disposable> disposables = new CopyOnWriteArrayList<>();
    // Workers are ordered (and de-duplicated) by their id.
    private final Set<Worker> localWorkers = new ConcurrentSkipListSet<>(Comparator.comparing(Worker::getId));
    // instanceId -> nodeId -> tasks created for that node.
    private final Map<String, Map<String, List<Task>>> localTasks = new ConcurrentHashMap<>();
    private final LocalSchedulerRpcService rpcService;

    /**
     * Creates a scheduler with the given id.
     *
     * @param str               identifier of this scheduler, returned by {@link #getId()}
     * @param rpcServiceFactory factory handed to the {@code LocalSchedulerRpcService}
     */
    public ClusterLocalScheduler(String str, RpcServiceFactory rpcServiceFactory) {
        this.id = str;
        this.rpcService = new LocalSchedulerRpcService(this, rpcServiceFactory);
    }

    /**
     * Disposes every registered {@link Disposable}, forgets them, and shuts down the
     * RPC service created in the constructor.
     */
    public void cleanup() {
        this.disposables.forEach(Disposable::dispose);
        this.disposables.clear();
        this.rpcService.shutdown();
    }

    /**
     * Registers a worker with this scheduler. A worker whose id compares equal to an
     * already registered one is not added again (set semantics keyed by id).
     *
     * @param worker the worker to register
     */
    public void addWorker(Worker worker) {
        this.localWorkers.add(wrapLocalWorker(worker));
    }

    /** Extension point for decorating local workers; currently returns the worker unchanged. */
    private Worker wrapLocalWorker(Worker worker) {
        return worker;
    }

    /**
     * @return all workers registered through {@link #addWorker(Worker)}
     */
    public Flux<Worker> getWorkers() {
        return Flux.fromIterable(this.localWorkers);
    }

    /**
     * Looks up a registered worker by id.
     *
     * @param str the worker id
     * @return the first worker whose id equals {@code str}, or empty when none matches
     */
    public Mono<Worker> getWorker(String str) {
        return getWorkers()
                .filter(worker -> worker.getId().equals(str))
                .next();
    }

    /**
     * Schedules a job. When tasks already exist for the job's instance/node pair they
     * are given the new job and reloaded; otherwise new tasks are created on the
     * selected worker(s).
     *
     * @param scheduleJob the job to schedule
     * @return the tasks now associated with the job
     */
    public Flux<Task> schedule(ScheduleJob scheduleJob) {
        List<Task> existing = getTasks(scheduleJob.getInstanceId(), scheduleJob.getNodeId());
        if (existing.isEmpty()) {
            return createExecutor(scheduleJob);
        }
        return Flux.fromIterable(existing)
                .flatMap(task -> task.setJob(scheduleJob).then(task.reload()).thenReturn(task));
    }

    /**
     * Creates a task on every worker chosen by the selector and records it under the
     * job's instance/node pair.
     *
     * <p>Emits {@link UnsupportedOperationException} when no worker supports the
     * job's executor.
     */
    private Flux<Task> createExecutor(ScheduleJob scheduleJob) {
        return findWorker(scheduleJob.getExecutor(), scheduleJob)
                .switchIfEmpty(Mono.error(
                        () -> new UnsupportedOperationException("unsupported executor:" + scheduleJob.getExecutor())))
                .flatMap(worker -> worker.createTask(this.id, scheduleJob))
                .doOnNext(task -> getTasks(scheduleJob.getInstanceId(), scheduleJob.getNodeId()).add(task));
    }

    /**
     * Shuts down every task of the given instance and then drops the instance's
     * bookkeeping entry. The entry is removed only after all shutdowns complete
     * successfully.
     *
     * @param str the rule instance id
     * @return completion signal
     */
    public Mono<Void> shutdown(String str) {
        return getSchedulingTask(str)
                .flatMap(Task::shutdown)
                // Remove the whole entry instead of clearing it so no empty map is left behind.
                .then(Mono.fromRunnable(() -> this.localTasks.remove(str)));
    }

    /**
     * Lists the tasks recorded for one instance. The lookup is performed on
     * subscription and never creates an entry for an unknown instance id.
     *
     * @param str the rule instance id
     * @return the instance's tasks, or an empty flux when the instance is unknown
     */
    public Flux<Task> getSchedulingTask(String str) {
        return Flux.defer(() -> {
            Map<String, List<Task>> tasksByNode = this.localTasks.get(str);
            if (tasksByNode == null) {
                return Flux.empty();
            }
            return Flux.fromIterable(tasksByNode.values()).flatMapIterable(Function.identity());
        });
    }

    /**
     * @return every task recorded by this scheduler, across all instances and nodes
     */
    public Flux<Task> getSchedulingTasks() {
        return Flux.fromIterable(this.localTasks.values())
                .flatMapIterable(Map::values)
                .flatMapIterable(Function.identity());
    }

    /**
     * @return the number of tasks emitted by {@link #getSchedulingTasks()}
     */
    public Mono<Long> totalTask() {
        return getSchedulingTasks().count();
    }

    /**
     * @param scheduleJob the job to test
     * @return {@code true} when the selector yields at least one worker for the job's executor
     */
    public Mono<Boolean> canSchedule(ScheduleJob scheduleJob) {
        return findWorker(scheduleJob.getExecutor(), scheduleJob).hasElements();
    }

    /**
     * Filters the local workers down to those whose supported executors contain
     * {@code str} and passes the candidates to the configured {@link WorkerSelector}.
     * A worker that reports no supported executors at all is treated as not matching.
     *
     * @param str         the executor name to look for
     * @param scheduleJob the job handed to the selector
     * @return the workers chosen by the selector
     */
    protected Flux<Worker> findWorker(String str, ScheduleJob scheduleJob) {
        Flux<Worker> candidates = Flux.fromIterable(this.localWorkers)
                .filterWhen(worker -> worker.getSupportExecutors()
                        .map(executors -> executors.contains(str))
                        .defaultIfEmpty(false));
        return this.workerSelector.select(candidates, scheduleJob);
    }

    /** Returns the task list for an instance/node pair, creating it when absent. */
    private List<Task> getTasks(String str, String str2) {
        return getTasks(str).computeIfAbsent(str2, nodeId -> new CopyOnWriteArrayList<>());
    }

    /** Returns the node-to-tasks map for an instance, creating it when absent. */
    private Map<String, List<Task>> getTasks(String str) {
        return this.localTasks.computeIfAbsent(str, instanceId -> new ConcurrentHashMap<>());
    }

    /**
     * @return the id passed to the constructor
     */
    public String getId() {
        return this.id;
    }

    /**
     * Replaces the strategy used to pick workers for a job.
     *
     * @param workerSelector the new selector; it is stored without a null check, so
     *                       passing {@code null} makes later worker lookups fail
     */
    public void setWorkerSelector(WorkerSelector workerSelector) {
        this.workerSelector = workerSelector;
    }
}
