package org.jetlinks.rule.engine.defaults;

import io.netty.buffer.ByteBuf;
import java.util.Iterator;
import java.util.function.Function;
import org.jetlinks.core.topic.Topic;
import org.jetlinks.rule.engine.api.Decoder;
import org.jetlinks.rule.engine.api.Encoder;
import org.jetlinks.rule.engine.api.EventBus;
import org.jetlinks.rule.engine.api.NativePayload;
import org.jetlinks.rule.engine.api.Payload;
import org.jetlinks.rule.engine.api.SubscribePayload;
import org.jetlinks.rule.engine.api.codec.Codecs;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/jetlinks/rule/engine/defaults/LocalEventBus.class */
public class LocalEventBus implements EventBus {
    private static final Logger log = LoggerFactory.getLogger(LocalEventBus.class);
    private final Topic<FluxSink<SubscribePayload>> subs = Topic.createRoot();

    @Override // org.jetlinks.rule.engine.api.EventBus
    public <T> Flux<T> subscribe(String str, Decoder<T> decoder) {
        return subscribe(str).map(subscribePayload -> {
            Payload payload = subscribePayload.getPayload();
            return payload instanceof NativePayload ? ((NativePayload) payload).getNativeObject() : decoder.decode(subscribePayload.getPayload());
        });
    }

    @Override // org.jetlinks.rule.engine.api.EventBus
    public <T> Mono<Integer> publish(String str, Publisher<T> publisher) {
        return publish(str, obj -> {
            return Codecs.lookup(obj.getClass()).encode(obj).getBody();
        }, publisher);
    }

    public <T> Mono<Integer> publish(String str, Function<T, ByteBuf> function, Publisher<? extends T> publisher) {
        return this.subs.findTopic(str).map((v0) -> {
            return v0.getSubscribers();
        }).flatMap(set -> {
            log.debug("publish topic: {}", str);
            return Flux.from(publisher).doOnNext(obj -> {
                Iterator it = set.iterator();
                while (it.hasNext()) {
                    try {
                        ((FluxSink) it.next()).next(SubscribePayload.of(str, NativePayload.of(obj, () -> {
                            return (ByteBuf) function.apply(obj);
                        })));
                    } catch (Exception e) {
                        log.error(e.getMessage(), e);
                    }
                }
            }).then(Mono.just(Integer.valueOf(set.size())));
        }).reduce((v0, v1) -> {
            return Math.addExact(v0, v1);
        });
    }

    @Override // org.jetlinks.rule.engine.api.EventBus
    public <T> Mono<Integer> publish(String str, Encoder<T> encoder, Publisher<? extends T> publisher) {
        return publish(str, obj -> {
            return encoder.encode(obj).getBody();
        }, publisher);
    }

    @Override // org.jetlinks.rule.engine.api.EventBus
    public Flux<SubscribePayload> subscribe(String str) {
        return Flux.create(fluxSink -> {
            log.debug("subscription topic: {}", str);
            Topic append = this.subs.append(str);
            append.subscribe(new FluxSink[]{fluxSink});
            fluxSink.onDispose(() -> {
                log.debug("unsubscription topic: {}", str);
                append.unsubscribe(new FluxSink[]{fluxSink});
            });
        });
    }
}
