package org.jetlinks.supports.cluster.redis;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nonnull;
import org.jetlinks.core.cluster.ClusterQueue;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.ReactiveRedisOperations;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/jetlinks/supports/cluster/redis/RedisClusterQueue.class */
/**
 * {@link ClusterQueue} backed by a Redis list stored under the queue id.
 *
 * <p>Elements are added with {@code LPUSH}. In {@link ClusterQueue.Mod#FIFO} mode they are
 * taken from the right end of the list, in {@link ClusterQueue.Mod#LIFO} mode from the left.
 * When this instance has local subscribers, added elements may be handed to them directly
 * (round-robin) without going through Redis; the share of such short-circuited elements is
 * controlled by {@link #setLocalConsumerPercent(float)}.
 *
 * @param <T> element type
 */
public class RedisClusterQueue<T> implements ClusterQueue<T> {
    private static final Logger log = LoggerFactory.getLogger(RedisClusterQueue.class);

    // Takes a batch from the head of the list; KEYS[2] carries the requested batch size.
    private static final RedisScript<List<?>> lifoPollScript = RedisScript.of(String.join("\n", "local val = redis.call('lrange',KEYS[1],0,KEYS[2]);", "redis.call('ltrim',KEYS[1],KEYS[2]+1,-1);", "return val;"), List.class);
    // Takes a batch from the tail of the list; KEYS[2] carries the requested batch size.
    private static final RedisScript<List<?>> fifoPollScript = RedisScript.of(String.join("\n", "local size = redis.call('llen',KEYS[1]);", "if size == 0 then", "return nil", "end", "local index = size - KEYS[2];", "if index == 0 then", "return redis.call('lpop',KEYS[1]);", "end", "local val = redis.call('lrange',KEYS[1],index,size);", "redis.call('ltrim',KEYS[1],0,index-1);", "return val;"), List.class);
    // LPUSH one element and publish ARGV[2] on the 'queue:data:produced' channel.
    private static final RedisScript<Long> pushAndPublish = RedisScript.of("local val = redis.call('lpush',KEYS[1],ARGV[1]);redis.call('publish','queue:data:produced',ARGV[2]);return val;", Long.class);

    private final String id;
    protected final ReactiveRedisOperations<String, T> operations;
    // Batch scripts are only used when the connection factory is not cluster aware.
    private final boolean useScript;
    // Guard that lets at most one poll chain run at a time.
    private final AtomicBoolean polling = new AtomicBoolean(false);
    private final int maxBatchSize = 32;
    private volatile float localConsumerPercent = 1.0f;
    // Written from the request path and read by tryPoll(), hence volatile.
    private volatile long lastRequestSize = this.maxBatchSize;
    private volatile ClusterQueue.Mod mod = ClusterQueue.Mod.FIFO;
    private final List<FluxSink<T>> subscribers = new CopyOnWriteArrayList<>();
    // Round-robin cursor: index of the subscriber that received the previous element.
    final AtomicInteger lastPush = new AtomicInteger(0);

    /**
     * Sets the probability that an added element is handed straight to a local subscriber.
     *
     * @param f {@code 1.0f} always prefers local subscribers; smaller values are compared
     *          against a random float in {@code [0, 1)} for every add
     */
    public void setLocalConsumerPercent(float f) {
        this.localConsumerPercent = f;
    }

    /**
     * Creates a queue stored under the given Redis key.
     *
     * @param str                   queue id, used as the Redis list key
     * @param reactiveRedisTemplate template used for all Redis access
     */
    public RedisClusterQueue(String str, ReactiveRedisTemplate<String, T> reactiveRedisTemplate) {
        this.id = str;
        this.operations = reactiveRedisTemplate;
        this.useScript = !reactiveRedisTemplate.getConnectionFactory().isClusterAware();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /** Starts a poll sized by the most recent downstream request. */
    public void tryPoll() {
        doPoll(this.lastRequestSize);
    }

    /**
     * Hands the elements to local subscribers in iteration order.
     *
     * @return the elements that could not be delivered: the first one that failed and every
     *         element after it, so that the relative order of the remainder is preserved
     */
    private List<T> pushAll(Iterable<T> items) {
        List<T> undelivered = new ArrayList<>();
        for (T item : items) {
            if (!undelivered.isEmpty() || !push(item)) {
                undelivered.add(item);
            }
        }
        return undelivered;
    }

    /**
     * Hands one element to a local subscriber, rotating over the subscribers when there is
     * more than one.
     *
     * @return {@code false} when no subscriber could take the element
     */
    private boolean push(T item) {
        int size = this.subscribers.size();
        if (size == 0) {
            return false;
        }
        // Advance the cursor in one atomic step so concurrent pushes can never observe an
        // index outside [0, size).
        int index = size == 1
                ? 0
                : this.lastPush.updateAndGet(prev -> prev + 1 >= size ? 0 : prev + 1);
        FluxSink<T> target;
        try {
            target = this.subscribers.get(index);
        } catch (IndexOutOfBoundsException e) {
            // A subscriber was disposed after size() was read; report "not delivered" so the
            // caller falls back to Redis.
            return false;
        }
        target.next(item);
        return true;
    }

    /**
     * Polls up to {@code requested} elements from Redis in batches of at most
     * {@code maxBatchSize} and hands them to local subscribers. Elements that no subscriber
     * could take are pushed back to the list. Does nothing when there is no demand, no
     * subscriber, or another poll is already running.
     */
    private void doPoll(long requested) {
        if (requested <= 0 || this.subscribers.isEmpty() || !this.polling.compareAndSet(false, true)) {
            return;
        }
        int batchSize = (int) Math.min(requested, this.maxBatchSize);
        pollBatch(batchSize)
                .<T>flatMap(item -> {
                    if (push(item)) {
                        return Mono.just(item);
                    }
                    // Not delivered: put it back; the empty result keeps it out of the count.
                    return this.operations.opsForList().leftPush(this.id, item).then(Mono.<T>empty());
                })
                .count()
                .subscribe(delivered -> {
                    // Release the guard exactly once, before chaining, so the follow-up poll
                    // can acquire it and nothing clears the flag while that poll is running.
                    this.polling.set(false);
                    long remaining = requested - delivered;
                    if (delivered <= 0 || remaining <= 0) {
                        return;
                    }
                    log.trace("poll datas[{}] from redis [{}] ", delivered, this.id);
                    doPoll(remaining);
                }, error -> {
                    this.polling.set(false);
                    log.warn("poll data from redis [{}] failed", this.id, error);
                });
    }

    /** Hook invoked by {@link #stop()}; does nothing in this class. */
    protected void stopPoll() {
    }

    /**
     * Registers a local subscriber. The subscriber is removed when its sink is disposed, and
     * every downstream request triggers a poll for that many elements.
     */
    @Nonnull
    public Flux<T> subscribe() {
        return Flux.<T>create(sink -> {
            this.subscribers.add(sink);
            sink.onDispose(() -> this.subscribers.remove(sink));
            doPoll(sink.requestedFromDownstream());
        }).doOnRequest(requested -> {
            this.lastRequestSize = requested;
            doPoll(requested);
        });
    }

    /** Stops the queue by delegating to {@link #stopPoll()}. */
    public void stop() {
        stopPoll();
    }

    /** Returns the current length of the Redis list, narrowed to {@code int}. */
    public Mono<Integer> size() {
        return this.operations.opsForList().size(this.id).map(Long::intValue);
    }

    /**
     * Selects which end of the list elements are polled from.
     *
     * @param mod {@code LIFO} polls from the left end, any other value from the right end
     */
    public void setPollMod(ClusterQueue.Mod mod) {
        this.mod = mod;
    }

    /** Pops a single element from the end of the list selected by the poll mode. */
    @Nonnull
    public Mono<T> poll() {
        if (this.mod == ClusterQueue.Mod.LIFO) {
            return this.operations.opsForList().leftPop(this.id);
        }
        return this.operations.opsForList().rightPop(this.id);
    }

    /**
     * Polls a batch through the Lua scripts, or a single element when the batch size is 1 or
     * scripts are disabled.
     */
    @SuppressWarnings("unchecked")
    private Flux<T> pollBatch(int size) {
        if (size == 1 || !this.useScript) {
            return poll().flux();
        }
        List<String> keys = Arrays.asList(this.id, String.valueOf(size));
        Flux<List<?>> batches;
        if (this.mod == ClusterQueue.Mod.FIFO) {
            // The FIFO script returns the slice in list order; reverse it before emitting.
            batches = this.operations.execute(fifoPollScript, keys).doOnNext(Collections::reverse);
        } else {
            batches = this.operations.execute(lifoPollScript, keys);
        }
        // The script result is untyped; its elements are the values stored in this queue.
        return batches.flatMap(batch -> Flux.fromIterable((List<T>) batch));
    }

    /**
     * Returns the operations viewed with {@code Object} values, so that the queue id (a
     * {@code String}) can be published through the same template.
     */
    @SuppressWarnings("unchecked")
    private ReactiveRedisOperations<String, Object> getOperations() {
        return (ReactiveRedisOperations<String, Object>) (ReactiveRedisOperations<String, ?>) this.operations;
    }

    /** Decides whether the next added element should be offered to a local subscriber. */
    private boolean isLocalConsumer() {
        if (this.subscribers.isEmpty()) {
            return false;
        }
        float percent = this.localConsumerPercent;
        return percent == 1.0f || ThreadLocalRandom.current().nextFloat() < percent;
    }

    /**
     * Adds every element of the publisher. Each element is either handed to a local
     * subscriber or pushed to Redis together with a {@code queue:data:produced} notification.
     *
     * @return {@code true} once the publisher completes and all elements have been handled
     */
    public Mono<Boolean> add(Publisher<T> publisher) {
        return Flux.from(publisher)
                .<Long>flatMap(item -> {
                    if (isLocalConsumer() && push(item)) {
                        return Mono.just(1L);
                    }
                    return getOperations().execute(pushAndPublish, Arrays.asList(this.id), Arrays.asList(item, this.id));
                })
                .then(Mono.just(true));
    }

    /**
     * Adds every collection of the publisher. Elements that were handed to a local subscriber
     * are not stored; only the undelivered remainder is pushed to Redis, followed by a
     * {@code queue:data:produced} notification. Nothing is sent to Redis when no element
     * remains.
     *
     * @return {@code true} once the publisher completes and all collections have been handled
     */
    public Mono<Boolean> addBatch(Publisher<? extends Collection<T>> publisher) {
        return Flux.from(publisher)
                .<Long>flatMap(batch -> {
                    Collection<T> pending = batch;
                    if (isLocalConsumer()) {
                        pending = pushAll(batch);
                    }
                    if (pending.isEmpty()) {
                        return Mono.just(1L);
                    }
                    return this.operations.opsForList()
                            .leftPushAll(this.id, pending)
                            .then(getOperations().convertAndSend("queue:data:produced", this.id));
                })
                .then(Mono.just(true));
    }
}
