package org.jetlinks.supports.ipc;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.jetlinks.core.Payload;
import org.jetlinks.core.codec.defaults.DirectCodec;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.event.Subscription;
import org.jetlinks.core.event.TopicPayload;
import org.jetlinks.core.ipc.IpcCode;
import org.jetlinks.core.ipc.IpcDefinition;
import org.jetlinks.core.ipc.IpcException;
import org.jetlinks.core.ipc.IpcInvoker;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/jetlinks/supports/ipc/EventBusIpcRequester.class */
class EventBusIpcRequester<REQ, RES> implements IpcInvoker<REQ, RES> {
    private final int id;
    private final String name;
    private final EventBus eventBus;
    private final IpcDefinition<REQ, RES> definition;
    private final Disposable.Composite disposable = Disposables.composite();
    private final Map<Integer, IpcRequestHandler<RES>> pending = new ConcurrentHashMap();
    private final RequestIdSupplier requestIdInc = new RequestIdSupplier();
    private final String sendTopic;
    private final Logger log;

    /* JADX INFO: Access modifiers changed from: package-private */
    public EventBusIpcRequester(int i, String str, EventBus eventBus, IpcDefinition<REQ, RES> ipcDefinition) {
        this.id = i;
        this.name = str;
        this.eventBus = eventBus;
        this.definition = ipcDefinition;
        this.sendTopic = "/_ipc/" + ipcDefinition.getAddress().replace("/", "-") + "/" + str;
        this.log = LoggerFactory.getLogger("ipc.requester." + ipcDefinition.getAddress() + "." + str);
        init();
    }

    void init() {
        this.disposable.add(this.eventBus.subscribe(Subscription.builder().subscriberId(String.join("-", "ipc", String.valueOf(this.id), this.name, "handler")).topics(new String[]{this.sendTopic + "/" + this.id + "/_reply"}).broker().local().shared().build()).doOnCancel(() -> {
            this.log.debug("cancel accept ipc[{}] response", Integer.valueOf(this.id));
        }).subscribe(this::handleReply));
    }

    public Mono<RES> request() {
        return doRequestWithHandler(RequestType.noArgRequest, null).flatMap((v0) -> {
            return v0.handleRequest();
        });
    }

    public Mono<RES> request(REQ req) {
        return doRequestWithHandler(RequestType.request, req).flatMap((v0) -> {
            return v0.handleRequest();
        });
    }

    public Flux<RES> requestStream(REQ req) {
        return doRequestWithHandler(RequestType.requestStream, req).flatMapMany((v0) -> {
            return v0.handleStream();
        });
    }

    public Flux<RES> requestStream() {
        return doRequestWithHandler(RequestType.noArgRequestStream, null).flatMapMany((v0) -> {
            return v0.handleStream();
        });
    }

    public Flux<RES> requestChannel(Publisher<REQ> publisher) {
        return doRequestChannel(publisher).handleStream();
    }

    public Mono<Void> fireAndForget() {
        return doRequest(RequestType.noArgFireAndForget, 0, null);
    }

    public Mono<Void> fireAndForget(REQ req) {
        return doRequest(RequestType.fireAndForget, 0, req);
    }

    Mono<Void> doRequest(RequestType requestType, int i, REQ req) {
        return doRequest(requestType, i, -1, req);
    }

    Mono<Void> doRequest(RequestType requestType, int i, int i2, REQ req) {
        this.log.trace("do ipc request {} {}", requestType, Integer.valueOf(i));
        return this.eventBus.publish(this.sendTopic, encodeRequest(requestType, i, i2, req)).doOnNext(l -> {
            if (l.longValue() == 0) {
                throw new IpcException(IpcCode.ipcServiceUnavailable, "Service " + this.name + " Unavailable");
            }
            if (l.longValue() > 1) {
                this.log.warn("service {} request {} has multi({}) producer", new Object[]{requestType, req, l});
            }
        }).then();
    }

    IpcRequestHandler<RES> newHandler(int i) {
        IpcRequestHandler<RES> ipcRequestHandler = new IpcRequestHandler<>();
        IpcRequestHandler<RES> put = this.pending.put(Integer.valueOf(i), ipcRequestHandler);
        if (put != null) {
            this.log.warn("repeat request id :{}", Integer.valueOf(i));
            put.complete();
        }
        return ipcRequestHandler.doOnDispose(() -> {
            this.pending.remove(Integer.valueOf(i));
        });
    }

    Mono<IpcRequestHandler<RES>> doRequestWithHandler(RequestType requestType, REQ req) {
        int nextRequestId = nextRequestId();
        return doRequest(requestType, nextRequestId, req).thenReturn(newHandler(nextRequestId));
    }

    IpcRequestHandler<RES> doRequestChannel(Publisher<REQ> publisher) {
        int nextRequestId = nextRequestId();
        IpcRequestHandler<RES> newHandler = newHandler(nextRequestId);
        AtomicInteger atomicInteger = new AtomicInteger();
        newHandler.doOnDispose(this.eventBus.publish(this.sendTopic, DirectCodec.instance(), Flux.from(publisher).index().map(tuple2 -> {
            int intValue = ((Long) tuple2.getT1()).intValue();
            atomicInteger.set(intValue);
            return encodeRequest(RequestType.requestChannel, nextRequestId, intValue, tuple2.getT2());
        }).doFinally(signalType -> {
            doRequest(RequestType.cancel, nextRequestId, atomicInteger.get(), null).subscribe();
        })).doOnNext(l -> {
            if (l.longValue() == 0) {
                newHandler.error(new IpcException(IpcCode.ipcServiceUnavailable));
            }
        }).subscribe());
        return newHandler;
    }

    Payload encodeRequest(RequestType requestType, int i, int i2, REQ req) {
        return Payload.of(IpcRequest.of(requestType, this.id, i, i2, req).toByteBuf(this.definition.requestCodec()));
    }

    void handleReply(TopicPayload topicPayload) {
        try {
            IpcResponse<RES> decode = IpcResponse.decode(topicPayload, this.definition.responseCodec(), this.definition.errorCodec());
            this.log.trace("handle ipc response {} id:{} seq:{}", new Object[]{decode.getType(), Integer.valueOf(decode.getMessageId()), Integer.valueOf(decode.getSeq())});
            IpcRequestHandler<RES> ipcRequestHandler = this.pending.get(Integer.valueOf(decode.getMessageId()));
            if (ipcRequestHandler == null) {
                this.log.debug("unknown response {}", decode);
            } else {
                ipcRequestHandler.handle(decode);
            }
        } catch (Throwable th) {
            this.log.error("handle response error", th);
        }
    }

    public int nextRequestId() {
        RequestIdSupplier requestIdSupplier = this.requestIdInc;
        Map<Integer, IpcRequestHandler<RES>> map = this.pending;
        map.getClass();
        return requestIdSupplier.nextId((v1) -> {
            return r1.containsKey(v1);
        });
    }

    public void dispose() {
        this.pending.values().forEach((v0) -> {
            v0.dispose();
        });
        this.pending.clear();
        this.disposable.dispose();
    }

    public String getName() {
        return this.name;
    }
}
