package org.jetlinks.supports.server;

import java.util.function.Function;
import javax.annotation.Nonnull;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.message.ChildDeviceMessage;
import org.jetlinks.core.message.ChildDeviceMessageReply;
import org.jetlinks.core.message.DeviceMessageReply;
import org.jetlinks.core.message.DeviceOfflineMessage;
import org.jetlinks.core.message.DeviceOnlineMessage;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.server.MessageHandler;
import org.jetlinks.core.server.session.DeviceSessionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/jetlinks/supports/server/DefaultDecodedClientMessageHandler.class */
public class DefaultDecodedClientMessageHandler implements DecodedClientMessageHandler {
    private static final Logger log = LoggerFactory.getLogger(DefaultDecodedClientMessageHandler.class);
    private final MessageHandler messageHandler;
    private final FluxProcessor<Message, Message> processor;
    private final FluxSink<Message> sink;
    private final DeviceSessionManager sessionManager;

    public DefaultDecodedClientMessageHandler(MessageHandler messageHandler, DeviceSessionManager deviceSessionManager) {
        this(messageHandler, deviceSessionManager, EmitterProcessor.create(false));
    }

    public DefaultDecodedClientMessageHandler(MessageHandler messageHandler, DeviceSessionManager deviceSessionManager, FluxProcessor<Message, Message> fluxProcessor) {
        this.messageHandler = messageHandler;
        this.processor = fluxProcessor;
        this.sessionManager = deviceSessionManager;
        this.sink = fluxProcessor.sink(FluxSink.OverflowStrategy.BUFFER);
    }

    protected Mono<Boolean> handleChildrenDeviceMessage(DeviceOperator deviceOperator, String str, Message message) {
        return message instanceof DeviceMessageReply ? doReply((DeviceMessageReply) message) : message instanceof DeviceOnlineMessage ? this.sessionManager.registerChildren(deviceOperator.getDeviceId(), str).thenReturn(true).defaultIfEmpty(false) : message instanceof DeviceOfflineMessage ? this.sessionManager.unRegisterChildren(deviceOperator.getDeviceId(), str).thenReturn(true).defaultIfEmpty(false) : Mono.just(true);
    }

    protected Mono<Boolean> handleChildrenDeviceMessageReply(DeviceOperator deviceOperator, ChildDeviceMessage childDeviceMessage) {
        return handleChildrenDeviceMessage(deviceOperator, childDeviceMessage.getChildDeviceId(), childDeviceMessage.getChildDeviceMessage());
    }

    protected Mono<Boolean> handleChildrenDeviceMessageReply(DeviceOperator deviceOperator, ChildDeviceMessageReply childDeviceMessageReply) {
        return handleChildrenDeviceMessage(deviceOperator, childDeviceMessageReply.getChildDeviceId(), childDeviceMessageReply.getChildDeviceMessage());
    }

    public void shutdown() {
    }

    public Flux<Message> subscribe() {
        return this.processor.map(Function.identity()).doOnError(th -> {
            log.error(th.getMessage(), th);
        });
    }

    @Override // org.jetlinks.supports.server.DecodedClientMessageHandler
    public Mono<Boolean> handleMessage(DeviceOperator deviceOperator, @Nonnull Message message) {
        return Mono.defer(() -> {
            if (deviceOperator != null) {
                if (message instanceof ChildDeviceMessageReply) {
                    return handleChildrenDeviceMessageReply(deviceOperator, (ChildDeviceMessageReply) message);
                }
                if (message instanceof ChildDeviceMessage) {
                    return handleChildrenDeviceMessageReply(deviceOperator, (ChildDeviceMessage) message);
                }
            }
            return message instanceof DeviceMessageReply ? doReply((DeviceMessageReply) message) : Mono.just(true);
        }).defaultIfEmpty(false).doFinally(signalType -> {
            if (this.processor.hasDownstreams()) {
                this.sink.next(message);
            }
        }).onErrorContinue((th, obj) -> {
            log.error("handle device[{}] message [{}] error", new Object[]{deviceOperator.getDeviceId(), message, th});
        });
    }

    private Mono<Boolean> doReply(DeviceMessageReply deviceMessageReply) {
        if (log.isDebugEnabled()) {
            log.debug("reply message {}", deviceMessageReply.getMessageId());
        }
        return this.messageHandler.reply(deviceMessageReply).doOnSuccess(bool -> {
            if (log.isDebugEnabled()) {
                log.debug("reply message {} complete", deviceMessageReply.getMessageId());
            }
        }).thenReturn(true).doOnError(th -> {
            log.error("reply message error", th);
        });
    }
}
