package org.jetlinks.rule.engine.cluster.worker;

import java.util.List;
import org.jetlinks.core.cluster.ClusterManager;
import org.jetlinks.rule.engine.api.RuleConstants;
import org.jetlinks.rule.engine.api.RuleData;
import org.jetlinks.rule.engine.api.scheduler.ScheduleJob;
import org.jetlinks.rule.engine.api.task.ConditionEvaluator;
import org.jetlinks.rule.engine.api.task.Output;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/jetlinks/rule/engine/cluster/worker/QueueOutput.class */
/**
 * {@link Output} implementation that forwards {@link RuleData} to queues obtained
 * from the {@link ClusterManager}.
 *
 * <p>Queue names are derived from the rule instance id and a target identifier via
 * {@link RuleConstants.Topics#input(String, String)}.
 */
public class QueueOutput implements Output {
    private static final Logger log = LoggerFactory.getLogger(QueueOutput.class);

    /** Rule instance id used as the first component of every queue topic. */
    private final String instanceId;
    /** Source of the queues that data is added to. */
    private final ClusterManager clusterManager;
    /** Candidate outputs; each one is checked against its condition per data item. */
    private final List<ScheduleJob.Output> outputs;
    /** Evaluates an output's condition against a data item. */
    private final ConditionEvaluator evaluator;

    /**
     * Creates an output bound to one rule instance.
     *
     * @param str                rule instance id
     * @param clusterManager     provider of the target queues
     * @param list               configured outputs to dispatch to
     * @param conditionEvaluator evaluator for the outputs' conditions
     */
    public QueueOutput(String str, ClusterManager clusterManager, List<ScheduleJob.Output> list, ConditionEvaluator conditionEvaluator) {
        this.instanceId = str;
        this.clusterManager = clusterManager;
        this.outputs = list;
        this.evaluator = conditionEvaluator;
    }

    /**
     * Dispatches every incoming data item to each configured output whose condition
     * evaluates to {@code true} for that item.
     *
     * @param publisher stream of data items to dispatch
     * @return a {@code Mono} that emits {@code true} once the whole pipeline has completed
     */
    public Mono<Boolean> write(Publisher<RuleData> publisher) {
        return Flux.from(publisher)
                .flatMap(data -> Flux.fromIterable(this.outputs)
                        .filterWhen(candidate -> conditionMatches(candidate, data))
                        // The topic is resolved per accepted output, per data item.
                        .flatMap(accepted -> this.clusterManager
                                .getQueue(createTopic(accepted.getOutput()))
                                .add(Mono.just(data))))
                .then(Mono.just(true));
    }

    /**
     * Adds the given data stream to the queue for a single target, bypassing the
     * configured outputs and their conditions.
     *
     * @param str       target identifier used to build the queue topic
     * @param publisher stream of data items to add
     * @return a {@code Mono} that completes when the add operation completes
     */
    public Mono<Void> write(String str, Publisher<RuleData> publisher) {
        String topic = createTopic(str);
        return this.clusterManager.getQueue(topic).add(publisher).then();
    }

    /**
     * Evaluates one output's condition against one data item.
     *
     * <p>An error raised during evaluation is logged at warn level and reported as
     * {@code false}, so a failing condition excludes that output instead of failing
     * the stream.
     */
    private Mono<Boolean> conditionMatches(ScheduleJob.Output candidate, RuleData data) {
        return Mono
                .fromCallable(() -> Boolean.valueOf(this.evaluator.evaluate(candidate.getCondition(), data)))
                .onErrorResume(failure -> {
                    log.warn(failure.getMessage(), failure);
                    return Mono.just(false);
                });
    }

    /** Builds the input topic name for the given target within this rule instance. */
    private String createTopic(String str) {
        return RuleConstants.Topics.input(this.instanceId, str);
    }
}
