package org.jetlinks.supports.cluster.event;

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.jetlinks.core.Payload;
import org.jetlinks.core.cluster.ClusterManager;
import org.jetlinks.core.cluster.ServerNode;
import org.jetlinks.core.codec.Codec;
import org.jetlinks.core.codec.Codecs;
import org.jetlinks.core.event.Subscription;
import org.jetlinks.core.event.TopicPayload;
import org.jetlinks.supports.event.EventBroker;
import org.jetlinks.supports.event.EventConnection;
import org.jetlinks.supports.event.EventConsumer;
import org.jetlinks.supports.event.EventProducer;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.core.ReactiveRedisOperations;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.RedisSerializer;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/jetlinks/supports/cluster/event/AbstractClusterEventBroker.class */
public abstract class AbstractClusterEventBroker implements EventBroker {
    private static final Logger log = LoggerFactory.getLogger(AbstractClusterEventBroker.class);
    protected final ReactiveRedisOperations<String, byte[]> redis;
    private final String id;
    protected final ClusterManager clusterManager;
    private final EmitterProcessor<EventConnection> processor = EmitterProcessor.create(false);
    private final Map<String, ClusterConnecting> connections = new ConcurrentHashMap();
    private final FluxSink<EventConnection> sink = this.processor.sink(FluxSink.OverflowStrategy.BUFFER);
    private boolean started = false;
    protected final Codec<Subscription> subscriptionCodec = Codecs.lookup(Subscription.class);
    protected final Disposable.Composite disposable = Disposables.composite();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/jetlinks/supports/cluster/event/AbstractClusterEventBroker$ClusterConnecting.class */
    public class ClusterConnecting implements EventProducer, EventConsumer {
        private final String brokerId;
        private final String localId;
        FluxSink<TopicPayload> output;
        private final String allSubsInfoKey;
        private final EmitterProcessor<TopicPayload> processor = EmitterProcessor.create(Integer.MAX_VALUE, false);
        private final FluxSink<TopicPayload> input = this.processor.sink(FluxSink.OverflowStrategy.BUFFER);
        EmitterProcessor<Subscription> subProcessor = EmitterProcessor.create(Integer.MAX_VALUE, false);
        FluxSink<Subscription> subSink = this.subProcessor.sink(FluxSink.OverflowStrategy.BUFFER);
        EmitterProcessor<Subscription> unsubProcessor = EmitterProcessor.create(Integer.MAX_VALUE, false);
        FluxSink<Subscription> unsubSink = this.unsubProcessor.sink(FluxSink.OverflowStrategy.BUFFER);
        Disposable.Composite disposable = Disposables.composite();

        public ClusterConnecting(String str, String str2) {
            this.brokerId = str2;
            this.localId = str;
            this.allSubsInfoKey = "/broker/" + str + "/" + str2 + "/subs";
            Disposable.Composite composite = this.disposable;
            EmitterProcessor<Subscription> emitterProcessor = this.subProcessor;
            emitterProcessor.getClass();
            composite.add(emitterProcessor::onComplete);
            Disposable.Composite composite2 = this.disposable;
            EmitterProcessor<Subscription> emitterProcessor2 = this.unsubProcessor;
            emitterProcessor2.getClass();
            composite2.add(emitterProcessor2::onComplete);
            Disposable.Composite composite3 = this.disposable;
            EmitterProcessor<TopicPayload> emitterProcessor3 = this.processor;
            emitterProcessor3.getClass();
            composite3.add(emitterProcessor3::onComplete);
            this.disposable.add(AbstractClusterEventBroker.this.listen(str, str2).doOnNext(topicPayload -> {
                if (!this.processor.hasDownstreams()) {
                    topicPayload.release();
                } else {
                    AbstractClusterEventBroker.log.trace("{} handle cluster [{}] event {}", new Object[]{str, str2, topicPayload.getTopic()});
                    this.input.next(topicPayload);
                }
            }).onErrorContinue((th, obj) -> {
                AbstractClusterEventBroker.log.error(th.getMessage(), th);
            }).subscribe());
            this.disposable.add(AbstractClusterEventBroker.this.redis.listenToPattern(new String[]{"/broker/" + str2 + "/" + str + "/*"}).subscribe(message -> {
                Subscription subscription = (Subscription) Payload.of((byte[]) message.getMessage()).decode(AbstractClusterEventBroker.this.subscriptionCodec);
                if (subscription != null) {
                    if (((String) message.getChannel()).endsWith("unsub") && this.unsubProcessor.hasDownstreams()) {
                        this.unsubSink.next(subscription);
                    } else if (((String) message.getChannel()).endsWith("sub") && this.subProcessor.hasDownstreams()) {
                        this.subSink.next(subscription);
                    }
                }
            }));
            this.disposable.add(AbstractClusterEventBroker.this.redis.opsForSet().members("/broker/" + str2 + "/" + str + "/subs").doOnNext(bArr -> {
                this.subSink.next((Subscription) Payload.of(bArr).decode(AbstractClusterEventBroker.this.subscriptionCodec));
            }).onErrorContinue((th2, obj2) -> {
                AbstractClusterEventBroker.log.warn(th2.getMessage(), th2);
            }).subscribe());
            this.disposable.add(Flux.create(fluxSink -> {
                this.output = fluxSink;
            }).flatMap(topicPayload2 -> {
                return AbstractClusterEventBroker.this.dispatch(str, str2, topicPayload2).onErrorResume(th3 -> {
                    AbstractClusterEventBroker.log.error(th3.getMessage(), th3);
                    return Mono.empty();
                });
            }).onErrorContinue((th3, obj3) -> {
                AbstractClusterEventBroker.log.error(th3.getMessage(), th3);
            }).subscribe());
        }

        /* JADX WARN: Type inference failed for: r2v1, types: [byte[], java.lang.Object[]] */
        @Override // org.jetlinks.supports.event.EventProducer
        public Mono<Void> subscribe(Subscription subscription) {
            byte[] bytes = AbstractClusterEventBroker.this.subscriptionCodec.encode(subscription).getBytes(true);
            return AbstractClusterEventBroker.this.redis.opsForSet().add(this.allSubsInfoKey, (Object[]) new byte[]{bytes}).then(AbstractClusterEventBroker.this.redis.convertAndSend("/broker/" + this.localId + "/" + this.brokerId + "/sub", bytes)).then();
        }

        @Override // org.jetlinks.supports.event.EventProducer
        public Mono<Void> unsubscribe(Subscription subscription) {
            byte[] bytes = AbstractClusterEventBroker.this.subscriptionCodec.encode(subscription).getBytes(true);
            return AbstractClusterEventBroker.this.redis.opsForSet().remove(this.allSubsInfoKey, new Object[]{bytes}).then(AbstractClusterEventBroker.this.redis.convertAndSend("/broker/" + this.localId + "/" + this.brokerId + "/unsub", bytes)).then();
        }

        @Override // org.jetlinks.supports.event.EventProducer
        public Flux<TopicPayload> subscribe() {
            return this.processor;
        }

        @Override // org.jetlinks.supports.event.EventConnection
        public String getId() {
            return this.brokerId;
        }

        @Override // org.jetlinks.supports.event.EventConnection
        public boolean isAlive() {
            return true;
        }

        @Override // org.jetlinks.supports.event.EventConnection
        public void doOnDispose(Disposable disposable) {
            this.disposable.add(disposable);
        }

        @Override // org.jetlinks.supports.event.EventConnection
        public EventBroker getBroker() {
            return AbstractClusterEventBroker.this;
        }

        @Override // org.jetlinks.supports.event.EventConnection
        public EventConnection.Feature[] features() {
            return new EventConnection.Feature[]{EventConnection.Feature.consumeAnotherBroker};
        }

        @Override // org.jetlinks.supports.event.EventConsumer
        public Flux<Subscription> handleSubscribe() {
            return this.subProcessor;
        }

        @Override // org.jetlinks.supports.event.EventConsumer
        public Flux<Subscription> handleUnSubscribe() {
            return this.unsubProcessor;
        }

        @Override // org.jetlinks.supports.event.EventConsumer
        public FluxSink<TopicPayload> sink() {
            return this.output;
        }

        public void dispose() {
            this.disposable.dispose();
        }

        public boolean isDisposed() {
            return false;
        }

        public String getBrokerId() {
            return this.brokerId;
        }
    }

    public AbstractClusterEventBroker(ClusterManager clusterManager, ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
        this.id = clusterManager.getClusterName();
        this.redis = new ReactiveRedisTemplate(reactiveRedisConnectionFactory, RedisSerializationContext.newSerializationContext().key(RedisSerializer.string()).hashKey(RedisSerializer.string()).value(RedisSerializer.byteArray()).hashValue(RedisSerializer.byteArray()).build());
        this.clusterManager = clusterManager;
        startup();
    }

    public void startup() {
        if (this.started) {
            return;
        }
        this.started = true;
        this.clusterManager.getHaManager().getAllNode().forEach(serverNode -> {
            if (serverNode.getId().equals(this.clusterManager.getCurrentServerId())) {
                return;
            }
            handleServerNodeJoin(serverNode);
            handleRemoteConnection(this.clusterManager.getCurrentServerId(), serverNode.getId());
        });
        this.disposable.add(this.clusterManager.getHaManager().subscribeServerOnline().subscribe(serverNode2 -> {
            handleServerNodeJoin(serverNode2);
            handleRemoteConnection(this.clusterManager.getCurrentServerId(), serverNode2.getId());
        }));
        this.disposable.add(this.clusterManager.getHaManager().subscribeServerOffline().subscribe(this::handleServerNodeLeave));
    }

    public void shutdown() {
        Iterator<ClusterConnecting> it = this.connections.values().iterator();
        while (it.hasNext()) {
            it.next().disposable.dispose();
        }
        this.disposable.dispose();
    }

    protected void handleServerNodeJoin(ServerNode serverNode) {
    }

    protected void handleServerNodeLeave(ServerNode serverNode) {
    }

    protected void handleRemoteConnection(String str, String str2) {
        this.connections.computeIfAbsent(str2, str3 -> {
            log.debug("handle redis connection:{}", str2);
            ClusterConnecting clusterConnecting = new ClusterConnecting(str, str3);
            this.sink.next(onConnectionCreated(clusterConnecting));
            return clusterConnecting;
        });
    }

    protected ClusterConnecting onConnectionCreated(ClusterConnecting clusterConnecting) {
        return clusterConnecting;
    }

    @Override // org.jetlinks.supports.event.EventBroker
    public String getId() {
        return this.id;
    }

    @Override // org.jetlinks.supports.event.EventBroker
    public Flux<EventConnection> accept() {
        return Flux.concat(new Publisher[]{Flux.fromIterable(this.connections.values()), this.processor}).distinct();
    }

    protected abstract Flux<TopicPayload> listen(String str, String str2);

    protected abstract Mono<Void> dispatch(String str, String str2, TopicPayload topicPayload);
}
