package org.jetlinks.rule.engine.cluster.redis;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import java.util.Iterator;
import org.jetlinks.core.topic.Topic;
import org.jetlinks.rule.engine.api.Decoder;
import org.jetlinks.rule.engine.api.Encoder;
import org.jetlinks.rule.engine.api.EventBus;
import org.jetlinks.rule.engine.api.Payload;
import org.jetlinks.rule.engine.api.SubscribePayload;
import org.jetlinks.rule.engine.api.codec.Codecs;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.core.ReactiveRedisOperations;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.SerializationException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:org/jetlinks/rule/engine/cluster/redis/RedisEventBus.class */
public class RedisEventBus implements EventBus {
    private final ReactiveRedisOperations<String, ByteBuf> redisTemplate;
    private final Topic<FluxSink<SubscribePayload>> topic;
    private final String prefix;
    private static final Logger log = LoggerFactory.getLogger(RedisEventBus.class);
    private static final RedisSerializer<ByteBuf> serializer = new RedisSerializer<ByteBuf>() { // from class: org.jetlinks.rule.engine.cluster.redis.RedisEventBus.1
        public byte[] serialize(ByteBuf byteBuf) throws SerializationException {
            return ByteBufUtil.getBytes(byteBuf);
        }

        /* renamed from: deserialize, reason: merged with bridge method [inline-methods] */
        public ByteBuf m3deserialize(byte[] bArr) throws SerializationException {
            return Unpooled.wrappedBuffer(bArr);
        }
    };

    public RedisEventBus(ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
        this("/event-bus", reactiveRedisConnectionFactory);
    }

    public RedisEventBus(String str, ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
        this.topic = Topic.createRoot();
        this.prefix = str;
        this.redisTemplate = new ReactiveRedisTemplate(reactiveRedisConnectionFactory, RedisSerializationContext.newSerializationContext().key(RedisSerializer.string()).value(serializer).hashKey(RedisSerializer.string()).hashValue(serializer).build());
        init();
    }

    public void init() {
        this.redisTemplate.listenToPattern(new String[]{this.prefix + "*"}).publishOn(Schedulers.parallel()).subscribe(message -> {
            String substring = ((String) message.getChannel()).substring(this.prefix.length());
            this.topic.findTopic(substring).map((v0) -> {
                return v0.getSubscribers();
            }).distinct().subscribe(set -> {
                Iterator it = set.iterator();
                while (it.hasNext()) {
                    try {
                        ((FluxSink) it.next()).next(SubscribePayload.of(substring, Payload.of((ByteBuf) message.getMessage())));
                    } catch (Throwable th) {
                        log.error(th.getMessage(), th);
                    }
                }
            });
        });
    }

    private String wrapRedisTopic(String str) {
        return this.prefix + str;
    }

    public <T> Flux<T> subscribe(String str, Decoder<T> decoder) {
        return subscribe(str).flatMap(subscribePayload -> {
            return Mono.justOrEmpty(decoder.decode(subscribePayload));
        });
    }

    public <T> Mono<Integer> publish(String str, Encoder<T> encoder, Publisher<? extends T> publisher) {
        return Flux.from(publisher).flatMap(obj -> {
            return this.redisTemplate.convertAndSend(wrapRedisTopic(str), encoder.encode(obj).getBody());
        }).reduce((v0, v1) -> {
            return Math.addExact(v0, v1);
        }).map((v0) -> {
            return v0.intValue();
        });
    }

    public Flux<SubscribePayload> subscribe(String str) {
        return Flux.create(fluxSink -> {
            log.debug("subscribe topic {}", str);
            Topic append = this.topic.append(str);
            append.subscribe(new FluxSink[]{fluxSink});
            fluxSink.onDispose(() -> {
                append.unsubscribe(new FluxSink[]{fluxSink});
            });
        }, FluxSink.OverflowStrategy.BUFFER);
    }

    public <T> Mono<Integer> publish(String str, Publisher<T> publisher) {
        return Flux.from(publisher).flatMap(obj -> {
            return this.redisTemplate.convertAndSend(wrapRedisTopic(str), Codecs.lookup(obj.getClass()).encode(obj).getBody());
        }).reduce((v0, v1) -> {
            return Math.addExact(v0, v1);
        }).map((v0) -> {
            return v0.intValue();
        });
    }
}
