package org.jetlinks.supports.event;

import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.ThreadLocalRandom;
import java.beans.ConstructorProperties;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Predicate;
import javax.validation.constraints.NotNull;
import org.hswebframework.web.dict.EnumDict;
import org.jetlinks.core.Payload;
import org.jetlinks.core.codec.Codecs;
import org.jetlinks.core.codec.Decoder;
import org.jetlinks.core.codec.Encoder;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.event.Subscription;
import org.jetlinks.core.event.TopicPayload;
import org.jetlinks.core.topic.Topic;
import org.jetlinks.supports.event.EventConnection;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:org/jetlinks/supports/event/BrokerEventBus.class */
public class BrokerEventBus implements EventBus {
    /** Root of the topic tree; local and broker subscriptions are both registered beneath it. */
    private final Topic<SubscriptionInfo> root = Topic.createRoot();
    /** Registered brokers, keyed by {@code EventBroker.getId()}. */
    private final Map<String, EventBroker> brokers = new ConcurrentHashMap<>(32);
    /** Accepted broker connections, keyed by {@code brokerId + ":" + connectionId}. */
    private final Map<String, EventConnection> connections = new ConcurrentHashMap<>(512);
    /** Scheduler used for {@code publishOn} in decoded subscriptions and passed to publish calls by default. */
    private Scheduler publishScheduler = Schedulers.immediate();
    /** Logger; replaceable through {@code setLog}. */
    private Logger log = LoggerFactory.getLogger(BrokerEventBus.class);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/jetlinks/supports/event/BrokerEventBus$SubscriptionInfo.class */
    /**
     * One subscriber entry stored on a topic node.
     *
     * <p>An entry is either local ({@code broker == false}) or belongs to a broker connection
     * ({@code broker == true}, with {@code eventBroker}/{@code eventConnection} attached through
     * {@link #connection(EventBroker, EventConnection)}). Clean-up callbacks registered with
     * {@link #onDispose(Disposable)} are run by {@link #dispose()}.
     */
    public static class SubscriptionInfo implements Disposable {
        /** Subscriber identifier. */
        String subscriber;
        /** Bit mask of {@code Subscription.Feature} values. */
        long features;
        /** Sink that receives the payloads published to this subscriber. */
        FluxSink<TopicPayload> sink;
        /** {@code true} when the entry was created for a broker connection. */
        boolean broker;
        /** Lazily created container of clean-up callbacks; written under this object's monitor by onDispose. */
        Disposable.Composite disposable;
        EventBroker eventBroker;
        EventConnection eventConnection;
        /** Bit mask of {@code EventConnection.Feature} values of the attached connection. */
        long connectionFeatures;

        /**
         * Renders {@code subscriber@local} for local entries and
         * {@code subscriber@brokerId:connectionId} for broker entries.
         */
        @Override
        public String toString() {
            if (isLocal()) {
                return this.subscriber + "@local";
            }
            // A broker entry may be printed (e.g. from a log statement) before connection(...)
            // has been called; render "null" instead of throwing a NullPointerException.
            String brokerId = this.eventBroker == null ? null : this.eventBroker.getId();
            String connectionId = this.eventConnection == null ? null : this.eventConnection.getId();
            return this.subscriber + "@" + brokerId + ":" + connectionId;
        }

        /**
         * Builds a {@link Subscription} for a single topic, carrying this entry's subscriber id
         * and the features decoded from the feature mask.
         *
         * @param str the topic to subscribe to
         */
        public Subscription toSubscription(String str) {
            return Subscription.of(this.subscriber, new String[]{str}, (Subscription.Feature[]) EnumDict.getByMask(Subscription.Feature.class, this.features).toArray(new Subscription.Feature[0]));
        }

        /**
         * Attaches the broker and connection this entry belongs to and caches the connection's
         * feature mask.
         *
         * @return this instance, for chaining
         */
        public SubscriptionInfo connection(EventBroker eventBroker, EventConnection eventConnection) {
            this.eventConnection = eventConnection;
            this.eventBroker = eventBroker;
            this.connectionFeatures = EnumDict.toMask(eventConnection.features());
            return this;
        }

        /** Creates a local entry with no features and no sink. */
        public static SubscriptionInfo of(String str) {
            return of(str, 0L, null, false);
        }

        /** Creates an entry from a subscription's subscriber id and features. */
        public static SubscriptionInfo of(Subscription subscription, FluxSink<TopicPayload> fluxSink, boolean z) {
            return of(subscription.getSubscriber(), EnumDict.toMask(subscription.getFeatures()), fluxSink, z);
        }

        /** Creates an entry from explicit values; {@code z} is the broker flag. */
        public static SubscriptionInfo of(String str, long j, FluxSink<TopicPayload> fluxSink, boolean z) {
            return new SubscriptionInfo(str, j, fluxSink, z);
        }

        /**
         * @param str      subscriber identifier
         * @param j        subscription feature mask
         * @param fluxSink sink receiving published payloads
         * @param z        {@code true} for a broker entry, {@code false} for a local one
         */
        public SubscriptionInfo(String str, long j, FluxSink<TopicPayload> fluxSink, boolean z) {
            this.subscriber = str;
            this.features = j;
            this.sink = fluxSink;
            this.broker = z;
        }

        /** Registers a clean-up callback that {@link #dispose()} will run. */
        synchronized void onDispose(Disposable disposable) {
            if (this.disposable == null) {
                this.disposable = Disposables.composite(disposable);
            } else {
                this.disposable.add(disposable);
            }
        }

        /** Runs every clean-up callback registered through {@link #onDispose(Disposable)}. */
        @Override
        public void dispose() {
            // Read the field under the same monitor onDispose writes it under, so a composite
            // created on another thread is visible here; run the callbacks outside the lock.
            Disposable.Composite current;
            synchronized (this) {
                current = this.disposable;
            }
            if (current != null) {
                current.dispose();
            }
        }

        boolean isLocal() {
            return !this.broker;
        }

        boolean hasFeature(Subscription.Feature feature) {
            return feature.in(this.features);
        }

        boolean hasConnectionFeature(EventConnection.Feature feature) {
            return feature.in(this.connectionFeatures);
        }

        @ConstructorProperties({"subscriber", "features", "sink", "broker", "disposable", "eventBroker", "eventConnection", "connectionFeatures"})
        private SubscriptionInfo(String str, long j, FluxSink<TopicPayload> fluxSink, boolean z, Disposable.Composite composite, EventBroker eventBroker, EventConnection eventConnection, long j2) {
            this.subscriber = str;
            this.features = j;
            this.sink = fluxSink;
            this.broker = z;
            this.disposable = composite;
            this.eventBroker = eventBroker;
            this.eventConnection = eventConnection;
            this.connectionFeatures = j2;
        }

        /** Creates an entry with every field supplied explicitly. */
        public static SubscriptionInfo of(String str, long j, FluxSink<TopicPayload> fluxSink, boolean z, Disposable.Composite composite, EventBroker eventBroker, EventConnection eventConnection, long j2) {
            return new SubscriptionInfo(str, j, fluxSink, z, composite, eventBroker, eventConnection, j2);
        }

        public String getSubscriber() {
            return this.subscriber;
        }

        public long getFeatures() {
            return this.features;
        }

        public FluxSink<TopicPayload> getSink() {
            return this.sink;
        }

        public Disposable.Composite getDisposable() {
            return this.disposable;
        }

        public EventBroker getEventBroker() {
            return this.eventBroker;
        }

        public EventConnection getEventConnection() {
            return this.eventConnection;
        }

        public long getConnectionFeatures() {
            return this.connectionFeatures;
        }

        public void setSubscriber(String str) {
            this.subscriber = str;
        }

        public void setFeatures(long j) {
            this.features = j;
        }

        public void setSink(FluxSink<TopicPayload> fluxSink) {
            this.sink = fluxSink;
        }

        public void setBroker(boolean z) {
            this.broker = z;
        }

        public void setDisposable(Disposable.Composite composite) {
            this.disposable = composite;
        }

        public void setEventBroker(EventBroker eventBroker) {
            this.eventBroker = eventBroker;
        }

        public void setEventConnection(EventConnection eventConnection) {
            this.eventConnection = eventConnection;
        }

        public void setConnectionFeatures(long j) {
            this.connectionFeatures = j;
        }

        public boolean isBroker() {
            return this.broker;
        }
    }

    /**
     * Subscribes with the given subscription and decodes every received payload.
     *
     * <p>Each payload is released after the decode attempt, whether it succeeded or failed.
     * A payload that fails to decode is logged and skipped instead of terminating the stream.
     *
     * @param subscription the subscription describing subscriber, topics and features
     * @param decoder      decoder applied to each received payload
     * @return the decoded values, published on the configured publish scheduler
     */
    public <T> Flux<T> subscribe(@NotNull Subscription subscription, @NotNull Decoder<T> decoder) {
        return subscribe(subscription)
            .flatMap(payload -> {
                try {
                    // decode(..., false): the payload is not released by decode itself; finally does it.
                    return Mono.justOrEmpty(payload.decode(decoder, false));
                } catch (Throwable error) {
                    this.log.error("decode message [{}] error", payload.getTopic(), error);
                    return Mono.empty();
                } finally {
                    ReferenceCountUtil.safeRelease(payload);
                }
            })
            .publishOn(this.publishScheduler);
    }

    /**
     * Registers the subscription on every requested topic and streams the payloads published to it.
     *
     * <p>Disposing the returned flux's sink unsubscribes every topic entry. When the subscription
     * has the {@code broker} feature it is also forwarded to the live producer connections, and
     * withdrawn from them again on dispose. The subscription's {@code doOnSubscribe} callback,
     * when present, runs once the (local or broker) subscribe step is done.
     *
     * @param subscription subscriber id, topics and features
     * @return a flux of raw topic payloads
     */
    public Flux<TopicPayload> subscribe(Subscription subscription) {
        return Flux.create(sink -> {
            final Disposable.Composite cleanup = Disposables.composite();
            final String subscriberId = subscription.getSubscriber();

            for (String topic : subscription.getTopics()) {
                final Topic<SubscriptionInfo> node = this.root.append(topic);
                final SubscriptionInfo info =
                    new SubscriptionInfo(subscriberId, EnumDict.toMask(subscription.getFeatures()), sink, false);
                node.subscribe(info);
                cleanup.add(() -> {
                    node.unsubscribe(info);
                    info.dispose();
                });
            }
            sink.onDispose(cleanup);

            if (!subscription.hasFeature(Subscription.Feature.broker)) {
                // Purely local subscription: it is ready as soon as the topic entries exist.
                if (subscription.getDoOnSubscribe() != null) {
                    subscription.getDoOnSubscribe().run();
                }
            } else {
                doSubscribeBroker(subscription)
                    .doOnSuccess(ignored -> {
                        if (subscription.getDoOnSubscribe() != null) {
                            subscription.getDoOnSubscribe().run();
                        }
                    })
                    .subscribe();
                cleanup.add(() -> {
                    doUnsubscribeBroker(subscription).subscribe();
                });
            }

            this.log.debug("local subscriber [{}],features:{},topics: {}", subscriberId, subscription.getFeatures(), subscription.getTopics());
        });
    }

    /**
     * Registers the broker under its id and starts accepting its connections.
     *
     * @param eventBroker the broker to add
     */
    public void addBroker(EventBroker eventBroker) {
        String brokerId = eventBroker.getId();
        this.brokers.put(brokerId, eventBroker);
        startBroker(eventBroker);
    }

    /**
     * Removes the broker registered under the given broker's id.
     * Only the broker registry is modified by this method.
     *
     * @param eventBroker the broker whose id is removed from the registry
     */
    public void removeBroker(EventBroker eventBroker) {
        String brokerId = eventBroker.getId();
        this.brokers.remove(brokerId);
    }

    /**
     * Removes the broker registered under the given id, if any.
     * Only the broker registry is modified by this method.
     *
     * @param str the broker id
     */
    public void removeBroker(String str) {
        this.brokers.remove(str);
    }

    /**
     * Returns a snapshot of the registered brokers.
     *
     * @return a new list; changes to it do not affect the registry
     */
    public List<EventBroker> getBrokers() {
        return new ArrayList<>(this.brokers.values());
    }

    /**
     * Forwards the subscription to every connection that is currently a live producer.
     *
     * @return a mono completing when all producers have handled the subscribe request
     */
    private Mono<Void> doSubscribeBroker(Subscription subscription) {
        Flux<EventProducer> liveProducers = Flux.fromIterable(this.connections.values())
            .filter(connection -> connection.isProducer() && connection.isAlive())
            .cast(EventProducer.class);
        return liveProducers
            .flatMap(producer -> producer.subscribe(subscription))
            .then();
    }

    /**
     * Withdraws the subscription from every connection that is currently a live producer.
     *
     * @return a mono completing when all producers have handled the unsubscribe request
     */
    private Mono<Void> doUnsubscribeBroker(Subscription subscription) {
        Flux<EventProducer> liveProducers = Flux.fromIterable(this.connections.values())
            .filter(connection -> connection.isProducer() && connection.isAlive())
            .cast(EventProducer.class);
        return liveProducers
            .flatMap(producer -> producer.unsubscribe(subscription))
            .then();
    }

    /**
     * Starts handling the connections accepted by the given broker.
     *
     * <p>For every accepted connection this method:
     * <ol>
     *   <li>stores it under {@code brokerId:connectionId}, disposing a different connection
     *       previously stored under the same key;</li>
     *   <li>as a producer: forwards all local subscriptions that have the {@code broker} feature
     *       to it, then dispatches every payload it produces to matching subscribers;</li>
     *   <li>as a consumer: registers/unregisters the subscriptions it announces.</li>
     * </ol>
     *
     * @param eventBroker the broker whose connections are accepted
     */
    private void startBroker(EventBroker eventBroker) {
        eventBroker.accept().subscribe(eventConnection -> {
            String connectionKey = eventBroker.getId().concat(":").concat(eventConnection.getId());
            EventConnection previous = this.connections.put(connectionKey, eventConnection);
            if (previous == eventConnection) {
                // Same connection announced again: everything below is already wired up.
                return;
            }
            if (previous != null) {
                previous.dispose();
            }
            eventConnection.doOnDispose(() -> {
                // Remove only if the key still maps to THIS connection. An unconditional
                // remove(key) would evict a newer connection that replaced this one under
                // the same key when this (older) connection is disposed.
                this.connections.remove(connectionKey, eventConnection);
            });

            // Producer side: replay local broker-enabled subscriptions, then consume its payloads.
            eventConnection.asProducer().flatMap(eventProducer -> {
                return this.root.getAllSubscriber().doOnNext(topic -> {
                    for (SubscriptionInfo localInfo : topic.getSubscribers()) {
                        if (localInfo.isLocal() && localInfo.hasFeature(Subscription.Feature.broker)) {
                            eventProducer.subscribe(localInfo.toSubscription(topic.getTopic())).subscribe();
                        }
                    }
                }).then(Mono.just(eventProducer));
            }).flatMapMany(eventProducer -> {
                return eventProducer.subscribe();
            }).flatMap(topicPayload -> {
                return doPublishFromBroker(topicPayload, target -> {
                    if (target.isLocal()) {
                        return target.hasFeature(Subscription.Feature.broker);
                    }
                    if (!target.isBroker()) {
                        return false;
                    }
                    // Broker-side subscriber: which connection feature applies depends on how the
                    // subscriber's connection relates to the connection the payload arrived on.
                    if (target.getEventBroker() != eventBroker) {
                        return target.hasConnectionFeature(EventConnection.Feature.consumeAnotherBroker);
                    }
                    if (target.getEventConnection() == eventConnection) {
                        return target.hasConnectionFeature(EventConnection.Feature.consumeSameConnection);
                    }
                    return target.hasConnectionFeature(EventConnection.Feature.consumeSameBroker);
                });
            }, Integer.MAX_VALUE).onErrorContinue((th, obj) -> {
                this.log.error(th.getMessage(), th);
            }).subscribe();

            // Consumer side: track the subscriptions announced by the remote end.
            eventConnection.asConsumer().subscribe(eventConsumer -> {
                eventConsumer.handleSubscribe().doOnNext(subscription -> {
                    handleBrokerSubscription(subscription, SubscriptionInfo.of(subscription.getSubscriber(), EnumDict.toMask(subscription.getFeatures()), eventConsumer.sink(), true).connection(eventBroker, eventConnection), eventConnection);
                }).onErrorContinue((th2, obj2) -> {
                    this.log.error(th2.getMessage(), th2);
                }).subscribe();
                eventConsumer.handleUnSubscribe().doOnNext(subscription2 -> {
                    handleBrokerUnsubscription(subscription2, SubscriptionInfo.of(subscription2.getSubscriber()), eventConnection);
                }).onErrorContinue((th3, obj3) -> {
                    this.log.error(th3.getMessage(), th3);
                }).subscribe();
            });
        });
    }

    /**
     * Removes, for each topic of the subscription, at most one entry that belongs to the given
     * connection and has the same subscriber id as {@code subscriptionInfo}.
     *
     * @param subscription     the unsubscribe request announced by the broker connection
     * @param subscriptionInfo carries the subscriber id to match (also used for logging)
     * @param eventConnection  the connection the request arrived on
     */
    private void handleBrokerUnsubscription(Subscription subscription, SubscriptionInfo subscriptionInfo, EventConnection eventConnection) {
        if (this.log.isDebugEnabled()) {
            this.log.debug("broker [{}] unsubscribe : {}", subscriptionInfo, subscription.getTopics());
        }
        for (String topic : subscription.getTopics()) {
            // Flips to true on the first match so later matching entries on this topic are kept.
            final AtomicBoolean removedOne = new AtomicBoolean(false);
            this.root.append(topic).unsubscribe(candidate -> {
                if (candidate.getEventConnection() != eventConnection) {
                    return false;
                }
                if (!candidate.getSubscriber().equals(subscriptionInfo.getSubscriber())) {
                    return false;
                }
                return removedOne.compareAndSet(false, true);
            });
        }
    }

    /**
     * Forwards a broker-originated subscription to other connections' producers.
     *
     * <p>The forwarded copy carries the {@code local} feature, plus {@code shared} when the
     * original subscription had it. The originating connection is included only when it has
     * {@code consumeSameConnection}; other connections of the same broker only when it has
     * {@code consumeSameBroker}; connections of other brokers are always included.
     */
    private void subAnotherBroker(Subscription subscription, SubscriptionInfo subscriptionInfo, EventConnection eventConnection) {
        final Subscription forwarded;
        if (subscription.hasFeature(Subscription.Feature.shared)) {
            forwarded = subscription.copy(new Subscription.Feature[]{Subscription.Feature.shared, Subscription.Feature.local});
        } else {
            forwarded = subscription.copy(new Subscription.Feature[]{Subscription.Feature.local});
        }

        Flux.fromIterable(this.connections.values())
            .filter(candidate -> {
                if (candidate == eventConnection) {
                    return subscriptionInfo.hasConnectionFeature(EventConnection.Feature.consumeSameConnection);
                }
                boolean sameBroker = candidate.getBroker() == eventConnection.getBroker();
                return !sameBroker || subscriptionInfo.hasConnectionFeature(EventConnection.Feature.consumeSameBroker);
            })
            .flatMap(candidate -> candidate.asProducer())
            .flatMap(producer -> producer.subscribe(forwarded))
            .subscribe();
    }

    /**
     * Registers a subscription announced by a broker connection on each of its topics, and
     * forwards it to other connections when both the subscription ({@code broker} feature) and
     * the connection ({@code consumeAnotherBroker} feature) allow it.
     *
     * @param subscription     the subscription announced by the connection
     * @param subscriptionInfo the entry to store on every topic node
     * @param eventConnection  the connection the subscription arrived on
     */
    private void handleBrokerSubscription(Subscription subscription, SubscriptionInfo subscriptionInfo, EventConnection eventConnection) {
        if (this.log.isDebugEnabled()) {
            this.log.debug("broker [{}] subscribe : {}", subscriptionInfo, subscription.getTopics());
        }
        for (String topic : subscription.getTopics()) {
            final Topic<SubscriptionInfo> node = this.root.append(topic);
            node.subscribe(subscriptionInfo);
            // Disposing the entry later removes it from this topic node again.
            subscriptionInfo.onDispose(() -> {
                node.unsubscribe(subscriptionInfo);
            });
        }
        if (!subscription.hasFeature(Subscription.Feature.broker)) {
            return;
        }
        if (!subscriptionInfo.hasConnectionFeature(EventConnection.Feature.consumeAnotherBroker)) {
            return;
        }
        subAnotherBroker(subscription, subscriptionInfo, eventConnection);
    }

    /**
     * Pushes one payload to one subscriber's sink.
     *
     * <p>The payload is retained once for the subscriber before it is handed to the sink.
     * If the hand-off fails, that extra reference is released again. A failure that occurs
     * before the payload was retained does not release anything, so the caller's own
     * reference is never dropped by this method.
     *
     * @param str              the topic (used for logging only)
     * @param subscriptionInfo the receiving subscriber
     * @param topicPayload     the payload to deliver
     * @return {@code true} if the payload was handed to the sink; {@code false} if the sink was
     *         cancelled or an error occurred
     */
    private boolean doPublish(String str, SubscriptionInfo subscriptionInfo, TopicPayload topicPayload) {
        // True only between a successful retain() and a successful sink.next(): the window in
        // which this method holds a reference that nobody else will release.
        boolean holdsReference = false;
        try {
            if (subscriptionInfo.sink.isCancelled()) {
                return false;
            }
            topicPayload.retain();
            holdsReference = true;
            subscriptionInfo.sink.next(topicPayload);
            holdsReference = false;
            if (this.log.isDebugEnabled()) {
                this.log.debug("publish [{}] to [{}] complete", str, subscriptionInfo);
            }
            return true;
        } catch (Throwable th) {
            this.log.error("publish [{}] to [{}] event error", str, subscriptionInfo, th);
            if (holdsReference) {
                ReferenceCountUtil.safeRelease(topicPayload);
            }
            return false;
        }
    }

    /**
     * Resolves the subscribers that should receive a publication on the given topic and hands
     * them to {@code function}.
     *
     * <p>Selection steps:
     * <ol>
     *   <li>collect the subscribers of every topic node matching {@code str};</li>
     *   <li>dispose and skip broker entries whose connection is no longer alive, then apply
     *       {@code predicate};</li>
     *   <li>group by subscriber id; within a group, entries with the {@code shared} feature are
     *       reduced to one randomly chosen entry, the others are all kept;</li>
     *   <li>drop entries that share a sink with an entry already selected.</li>
     * </ol>
     *
     * @param str       the topic being published to
     * @param predicate additional filter on candidate subscribers
     * @param function  consumes the selected subscribers and yields the publish count
     * @return the value produced by {@code function}
     */
    private Mono<Long> doPublish(String str, Predicate<SubscriptionInfo> predicate, Function<Flux<SubscriptionInfo>, Mono<Long>> function) {
        return this.root.findTopic(str)
            .flatMapIterable(topic -> topic.getSubscribers())
            .filter(subscriptionInfo -> {
                if (!subscriptionInfo.isBroker() || subscriptionInfo.getEventConnection().isAlive()) {
                    return predicate.test(subscriptionInfo);
                }
                // Broker entry on a dead connection: clean it up and skip it.
                subscriptionInfo.dispose();
                return false;
            })
            .groupBy(SubscriptionInfo::getSubscriber, Integer.MAX_VALUE)
            .flatMap(bySubscriber -> {
                return bySubscriber
                    .groupBy(info -> info.hasFeature(Subscription.Feature.shared))
                    .flatMap(bySharedFlag -> {
                        return Boolean.TRUE.equals(bySharedFlag.key()) ? selectSharedSubscription(bySharedFlag) : bySharedFlag;
                    });
            }, Integer.MAX_VALUE)
            .distinct(SubscriptionInfo::getSink)
            .as(function);
    }

    /**
     * Picks one subscriber at random from the given shared-subscription candidates.
     *
     * @param flux the candidates
     * @return a flux emitting the single chosen candidate
     */
    private Flux<SubscriptionInfo> selectSharedSubscription(Flux<SubscriptionInfo> flux) {
        return flux.collectList().flatMapMany(candidates -> {
            int chosenIndex = ThreadLocalRandom.current().nextInt(0, candidates.size());
            return Flux.just(candidates.get(chosenIndex));
        });
    }

    /**
     * Dispatches a payload received from a broker connection to the matching subscribers.
     *
     * <p>The payload is retained once per subscriber it is handed to. The reference held by
     * this call is released when the returned mono terminates or is cancelled. A sink that is
     * already cancelled is not given the payload (same guard as the local publish path), and a
     * reference retained for a hand-off that then fails is released again, so neither case
     * leaves a reference behind.
     *
     * @param topicPayload the payload received from the broker
     * @param predicate    selects the subscribers allowed to receive it
     * @return the number of selected subscribers
     */
    private Mono<Long> doPublishFromBroker(TopicPayload topicPayload, Predicate<SubscriptionInfo> predicate) {
        return doPublish(topicPayload.getTopic(), predicate, flux -> {
            return flux.doOnNext(subscriptionInfo -> {
                // True only between a successful retain() and a successful sink.next().
                boolean holdsReference = false;
                try {
                    if (subscriptionInfo.sink.isCancelled()) {
                        return;
                    }
                    topicPayload.retain();
                    holdsReference = true;
                    subscriptionInfo.sink.next(topicPayload);
                    holdsReference = false;
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("broker publish [{}] to [{}] complete", topicPayload.getTopic(), subscriptionInfo);
                    }
                } catch (Throwable th) {
                    this.log.warn("broker publish [{}] to [{}] error", topicPayload.getTopic(), subscriptionInfo, th);
                    if (holdsReference) {
                        ReferenceCountUtil.safeRelease(topicPayload);
                    }
                }
            }).count();
        }).doFinally(signalType -> {
            ReferenceCountUtil.safeRelease(topicPayload);
        });
    }

    /**
     * Publishes the values of {@code publisher} to the topic, encoding each value with the
     * codec looked up for the value's runtime class.
     *
     * @param str       the topic
     * @param publisher the values to publish
     * @return the count reported by the encoder-based publish overload
     */
    public <T> Mono<Long> publish(String str, Publisher<T> publisher) {
        return publish(str, obj -> Codecs.lookup(obj.getClass()).encode(obj), publisher);
    }

    /**
     * Publishes the values of {@code publisher} to the topic using the given encoder,
     * passing the configured publish scheduler to the four-argument overload.
     *
     * @param str       the topic
     * @param encoder   encoder applied to each value
     * @param publisher the values to publish
     * @return the count reported by the four-argument overload
     */
    public <T> Mono<Long> publish(String str, Encoder<T> encoder, Publisher<? extends T> publisher) {
        return publish(str, encoder, publisher, this.publishScheduler);
    }

    /**
     * Publishes the values of {@code publisher} to every eligible subscriber of the topic.
     *
     * <p>Eligible subscribers are broker entries and local entries that have the {@code local}
     * feature. The source values are turned into payloads once and cached, each subscriber is
     * given every payload, and finally, if at least one subscriber was selected, every cached
     * payload is released once.
     *
     * <p>NOTE(review): {@code scheduler} is not used anywhere in this method body; confirm
     * whether that is intended.
     *
     * @param str       the topic
     * @param encoder   encoder used to build each payload
     * @param publisher the values to publish
     * @param scheduler currently unused by this implementation
     * @return the number of selected subscribers
     */
    public <T> Mono<Long> publish(String str, Encoder<T> encoder, Publisher<? extends T> publisher, Scheduler scheduler) {
        Predicate<SubscriptionInfo> eligible =
            info -> !info.isLocal() || info.hasFeature(Subscription.Feature.local);

        Mono<Long> subscriberCount = doPublish(str, eligible, subscribers -> {
            // cache(): the source is consumed once and replayed to every subscriber.
            Flux<TopicPayload> payloads = Flux.from(publisher)
                .map(value -> TopicPayload.of(str, Payload.of(value, encoder)))
                .cache();
            return subscribers
                .flatMap(info -> payloads.map(payload -> doPublish(str, info, payload)).count())
                .count()
                .flatMap(total -> {
                    if (total <= 0) {
                        return Mono.just(total);
                    }
                    // Release each cached payload once after the deliveries above.
                    return payloads
                        .map(payload -> {
                            ReferenceCountUtil.safeRelease(payload);
                            return true;
                        })
                        .then()
                        .thenReturn(total);
                });
        });

        if (!this.log.isTraceEnabled()) {
            return subscriberCount;
        }
        return subscriberCount.doOnNext(total -> {
            this.log.trace("topic [{}] has {} subscriber", str, total);
        });
    }

    /**
     * Replaces the scheduler used for decoded subscriptions and as the default scheduler
     * argument of publish calls.
     *
     * @param scheduler the new scheduler
     */
    public void setPublishScheduler(Scheduler scheduler) {
        this.publishScheduler = scheduler;
    }

    /**
     * Replaces the logger used by this event bus.
     *
     * @param logger the new logger
     */
    public void setLog(Logger logger) {
        this.log = logger;
    }
}
