package org.jetlinks.supports.ipc;

import io.netty.buffer.ByteBuf;
import io.netty.util.ReferenceCountUtil;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import org.jetlinks.core.Payload;
import org.jetlinks.core.codec.defaults.DirectCodec;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.event.Subscription;
import org.jetlinks.core.event.TopicPayload;
import org.jetlinks.core.ipc.IpcDefinition;
import org.jetlinks.core.ipc.IpcInvoker;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/jetlinks/supports/ipc/EventBusIpcResponder.class */
/**
 * Serves IPC requests that arrive over the {@link EventBus}.
 *
 * <p>On construction the responder subscribes to the accept topic
 * {@code /_ipc/<address with '/' replaced by '-'>/<invoker name>}, decodes every received
 * payload into an {@link IpcRequest}, dispatches it to the configured {@link IpcInvoker}
 * and publishes the results to {@code <acceptTopic>/<consumerId>/_reply}.
 *
 * @param <REQ> request payload type
 * @param <RES> response payload type
 */
class EventBusIpcResponder<REQ, RES> implements Disposable {
    private static final Logger log = LoggerFactory.getLogger(EventBusIpcResponder.class);

    private final EventBus eventBus;
    private final IpcDefinition<REQ, RES> definition;
    private final IpcInvoker<REQ, RES> invoker;

    /**
     * Open request channels, keyed by message id. An entry is removed when the consumer
     * sends {@code cancel}, when the channel invocation terminates, or on {@link #dispose()}.
     *
     * <p>NOTE(review): the key is the message id alone, so this assumes message ids do not
     * collide across different consumers - confirm against the requester side.
     */
    private final Map<Integer, EmitterProcessor<REQ>> pendingChannel = new ConcurrentHashMap<>();

    private final String acceptTopic;
    private Disposable disposable;

    /**
     * Creates the responder and immediately starts listening for requests.
     *
     * @param eventBus      bus used both to receive requests and to publish replies
     * @param ipcDefinition supplies the address and the request/response/error codecs
     * @param ipcInvoker    the handler that actually serves the requests
     */
    public EventBusIpcResponder(EventBus eventBus, IpcDefinition<REQ, RES> ipcDefinition, IpcInvoker<REQ, RES> ipcInvoker) {
        this.eventBus = eventBus;
        this.definition = ipcDefinition;
        this.invoker = ipcInvoker;
        this.acceptTopic = "/_ipc/" + ipcDefinition.getAddress().replace("/", "-") + "/" + ipcInvoker.getName();
        init();
    }

    /**
     * Subscribes to the accept topic (local + broker, shared) and routes every received
     * payload through {@link #handleRequest(TopicPayload)}.
     */
    void init() {
        this.disposable = this.eventBus
            .subscribe(Subscription.builder()
                           .subscriberId(this.definition.getAddress())
                           .local()
                           .broker()
                           .shared()
                           .topics(new String[]{this.acceptTopic})
                           .build())
            .flatMap(this::handleRequest)
            .subscribe();
    }

    /**
     * Decodes a raw payload and handles it. Any failure (synchronous or asynchronous) is
     * logged and swallowed so that a single bad request does not terminate the subscription.
     */
    private Mono<Void> handleRequest(TopicPayload topicPayload) {
        try {
            return handleRequest(IpcRequest.decode(topicPayload, this.definition.requestCodec()))
                .onErrorResume(err -> {
                    log.error(err.getMessage(), err);
                    return Mono.empty();
                });
        } catch (Throwable err) {
            log.error(err.getMessage(), err);
            return Mono.empty();
        }
    }

    /**
     * Dispatches a decoded request to the invoker method that matches its type.
     *
     * @return a Mono that completes when the request has been handled; channel and cancel
     *         requests complete immediately
     */
    private Mono<Void> handleRequest(IpcRequest<REQ> ipcRequest) {
        int consumerId = ipcRequest.getConsumerId();
        int messageId = ipcRequest.getMessageId();
        log.trace("handle ipc request {} {}", ipcRequest.getType(), messageId);
        switch (ipcRequest.getType()) {
            case fireAndForget:
                return this.invoker.fireAndForget(ipcRequest.getRequest());
            case noArgFireAndForget:
                return this.invoker.fireAndForget();
            case request:
                return handleInvoke(consumerId, messageId, this.invoker.request(ipcRequest.getRequest()));
            case noArgRequest:
                return handleInvoke(consumerId, messageId, this.invoker.request());
            case requestStream:
                return handleInvoke(consumerId, messageId, this.invoker.requestStream(ipcRequest.getRequest()));
            case noArgRequestStream:
                return handleInvoke(consumerId, messageId, this.invoker.requestStream());
            case requestChannel:
                return handleChannelRequest(consumerId, messageId, ipcRequest.getRequest());
            case cancel:
                EmitterProcessor<REQ> cancelled = this.pendingChannel.remove(messageId);
                if (cancelled != null) {
                    cancelled.onComplete();
                }
                return Mono.empty();
            default:
                return Mono.empty();
        }
    }

    /**
     * Feeds one element into the request channel identified by {@code messageId}, opening
     * the channel (and starting the invoker) on the first element.
     *
     * <p>The invocation is subscribed outside of any map compute function: its termination
     * hook removes the entry from {@link #pendingChannel}, and doing that from inside
     * {@code computeIfAbsent} would be a recursive update of the map.
     */
    private Mono<Void> handleChannelRequest(int consumerId, int messageId, REQ request) {
        EmitterProcessor<REQ> channel = this.pendingChannel.get(messageId);
        if (channel == null) {
            EmitterProcessor<REQ> created = EmitterProcessor.create(Integer.MAX_VALUE);
            EmitterProcessor<REQ> raced = this.pendingChannel.putIfAbsent(messageId, created);
            if (raced != null) {
                channel = raced;
            } else {
                channel = created;
                handleInvoke(consumerId, messageId, this.invoker.requestChannel(created))
                    .onErrorResume(err -> {
                        // Without this the error would reach a subscriber that has no error callback.
                        log.warn("ipc channel [{}.{}] terminated with error", consumerId, messageId, err);
                        return Mono.empty();
                    })
                    // Drop the entry once the invocation is over; the two-argument remove
                    // leaves a newer channel registered under the same id untouched.
                    .doFinally(signal -> this.pendingChannel.remove(messageId, created))
                    .subscribe();
            }
        }
        channel.onNext(request);
        return Mono.empty();
    }

    /**
     * Publishes the outcome of an invocation back to the consumer.
     *
     * <p>A {@link Mono} result produces a single {@code complete} reply carrying the value
     * (or {@code null} when empty). Any other publisher is streamed as indexed {@code next}
     * replies followed by a {@code complete} reply carrying the last index (-1 if nothing
     * was emitted). Errors are reported with an {@code error} reply.
     *
     * @param i         consumer id, used to build the reply topic
     * @param i2        message id echoed in every reply
     * @param publisher the invoker's result
     */
    private Mono<Void> handleInvoke(int i, int i2, Publisher<RES> publisher) {
        if (publisher instanceof Mono) {
            return Mono.from(publisher)
                .switchIfEmpty(Mono.defer(() ->
                    doReply(i, i2, -1, ResponseType.complete, null).then(Mono.<RES>empty())))
                .flatMap(result -> doReply(i, i2, -1, ResponseType.complete, result))
                .onErrorResume(err -> doReply(i, i2, err));
        }
        // Index of the last element published, reported in the final "complete" reply.
        AtomicReference<Integer> lastIndex = new AtomicReference<>(-1);
        return this.eventBus
            .publish(this.acceptTopic + "/" + i + "/_reply", DirectCodec.instance(), Flux.from(publisher).index().map(indexed -> {
                int seq = indexed.getT1().intValue();
                lastIndex.set(seq);
                return Payload.of(IpcResponse.of(ResponseType.next, seq, i2, indexed.getT2(), null)
                                      .toByteBuf(this.definition.responseCodec(), this.definition.errorCodec()));
            }))
            .flatMap(published -> doReply(i, i2, lastIndex.get(), ResponseType.complete, null))
            .doOnError(err -> {
                log.warn("reply [{}.{}] error", i, i2, err);
                // Best-effort error notification; the original error still propagates downstream.
                doReply(i, i2, err)
                    .subscribe(null, replyErr -> log.warn("reply [{}.{}] error failed", i, i2, replyErr));
            })
            .then();
    }

    /** Sends an {@code error} reply for message {@code i2} to consumer {@code i}. */
    private Mono<Void> doReply(int i, int i2, Throwable th) {
        return doReply(i, IpcResponse.<RES>of(ResponseType.error, -1, i2, null, th)
            .toByteBuf(this.definition.responseCodec(), this.definition.errorCodec()));
    }

    /**
     * Sends a reply of the given type.
     *
     * @param i            consumer id
     * @param i2           message id
     * @param i3           sequence number carried by the reply
     * @param responseType reply type
     * @param res          result value, may be {@code null}
     */
    private Mono<Void> doReply(int i, int i2, int i3, ResponseType responseType, RES res) {
        return doReply(i, IpcResponse.of(responseType, i3, i2, res, null)
            .toByteBuf(this.definition.responseCodec(), this.definition.errorCodec()));
    }

    /**
     * Publishes an encoded reply to the consumer's reply topic. When the bus reports zero
     * receivers the payload is released here and a warning is logged.
     */
    private Mono<Void> doReply(int i, ByteBuf byteBuf) {
        Payload payload = Payload.of(byteBuf);
        return this.eventBus
            .publish(this.acceptTopic + "/" + i + "/_reply", payload)
            .doOnNext(receivers -> {
                if (receivers.longValue() == 0) {
                    log.warn("reply ipc failed,no consumer[{}] listener", i);
                    ReferenceCountUtil.safeRelease(payload);
                }
            })
            .then();
    }

    /** @return whether the accept-topic subscription has been disposed */
    @Override
    public boolean isDisposed() {
        return this.disposable.isDisposed();
    }

    /**
     * Stops listening for requests and completes every channel that is still open, so the
     * invokers serving them are not left waiting for input that can no longer arrive.
     */
    @Override
    public void dispose() {
        this.disposable.dispose();
        for (Integer messageId : this.pendingChannel.keySet()) {
            EmitterProcessor<REQ> channel = this.pendingChannel.remove(messageId);
            if (channel != null) {
                channel.onComplete();
            }
        }
    }
}
