package org.jetlinks.supports.ipc;

import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/jetlinks/supports/ipc/IpcRequestHandler.class */
public class IpcRequestHandler<RES> implements Disposable {
    private static final Logger log = LoggerFactory.getLogger(IpcRequestHandler.class);
    EmitterProcessor<RES> processor = EmitterProcessor.create(Integer.MAX_VALUE);
    Disposable.Composite disposable = Disposables.composite();
    FluxSink<RES> sink = this.processor.sink();
    private final AtomicInteger seqInc = new AtomicInteger();
    private final AtomicInteger totalSeq = new AtomicInteger(-1);

    /* JADX INFO: Access modifiers changed from: package-private */
    public Mono<RES> handleRequest() {
        return this.processor.next().doFinally(signalType -> {
            this.disposable.dispose();
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Flux<RES> handleStream() {
        return this.processor.doFinally(signalType -> {
            this.disposable.dispose();
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void complete() {
        if (this.processor.isDisposed()) {
            log.debug("handler is disposed");
        }
        this.processor.onComplete();
        this.sink.complete();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void handle(IpcResponse<RES> ipcResponse) {
        if (ipcResponse.hasResult()) {
            this.sink.next(ipcResponse.getResult());
        }
        if (ipcResponse.hasError()) {
            error(ipcResponse.getError());
            return;
        }
        int seq = ipcResponse.getSeq();
        int andIncrement = seq < 0 ? -1 : this.seqInc.getAndIncrement();
        if (ipcResponse.getType() != ResponseType.complete) {
            int i = this.totalSeq.get();
            if (i < 0 || andIncrement + 1 < i) {
                return;
            }
            complete();
            return;
        }
        if (seq < 0 || andIncrement > seq || this.totalSeq.get() != -1) {
            complete();
        } else {
            log.debug("ipc response complete early,seq[{}],total[{}]", this.seqInc, Integer.valueOf(seq));
            this.totalSeq.set(seq);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void error(Throwable th) {
        this.sink.error(th);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public IpcRequestHandler<RES> doOnDispose(Disposable disposable) {
        this.disposable.add(disposable);
        return this;
    }

    public void dispose() {
        complete();
    }
}
