package org.jetlinks.rule.engine.defaults;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.jetlinks.rule.engine.api.Decoder;
import org.jetlinks.rule.engine.api.Encoder;
import org.jetlinks.rule.engine.api.EventBus;
import org.jetlinks.rule.engine.api.SubscribePayload;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/jetlinks/rule/engine/defaults/CompositeEventBus.class */
/**
 * An {@link EventBus} that delegates to other {@link EventBus} instances.
 *
 * <p>Two independent delegate lists are kept: one consulted by the
 * {@code subscribe} methods and one consulted by the {@code publish} methods.
 * Delegates are registered through {@link #addForSubscribe(EventBus...)} and
 * {@link #addForPublish(EventBus...)}. Both lists are
 * {@link CopyOnWriteArrayList}s.
 */
public class CompositeEventBus implements EventBus {

    /** Delegates that {@code subscribe} calls are forwarded to. */
    private final List<EventBus> subBus = new CopyOnWriteArrayList<>();

    /** Delegates that {@code publish} calls are forwarded to. */
    private final List<EventBus> pubBus = new CopyOnWriteArrayList<>();

    /**
     * Registers one or more delegates to be used by the {@code subscribe} methods.
     *
     * @param eventBusArr the delegates to append to the subscribe list
     * @return this instance, to allow call chaining
     */
    public CompositeEventBus addForSubscribe(EventBus... eventBusArr) {
        subBus.addAll(Arrays.asList(eventBusArr));
        return this;
    }

    /**
     * Registers one or more delegates to be used by the {@code publish} methods.
     *
     * @param eventBusArr the delegates to append to the publish list
     * @return this instance, to allow call chaining
     */
    public CompositeEventBus addForPublish(EventBus... eventBusArr) {
        pubBus.addAll(Arrays.asList(eventBusArr));
        return this;
    }

    /**
     * Calls {@code subscribe(str, decoder)} on every subscribe delegate and
     * merges the resulting streams into one.
     *
     * @param str     the first argument forwarded unchanged to each delegate
     * @param decoder the decoder forwarded unchanged to each delegate
     * @param <T>     element type produced by the decoder
     * @return the merged stream of all delegates' subscriptions
     */
    @Override
    public <T> Flux<T> subscribe(String str, Decoder<T> decoder) {
        return Flux.fromIterable(subBus)
                .map(bus -> bus.subscribe(str, decoder))
                .as(Flux::merge);
    }

    /**
     * Calls {@code publish(str, encoder, ...)} on every publish delegate and
     * adds up the integers they report.
     *
     * @param str       the first argument forwarded unchanged to each delegate
     * @param encoder   the encoder forwarded unchanged to each delegate
     * @param publisher the source of elements to publish
     * @param <T>       element type accepted by the encoder
     * @return the sum (via {@link Math#addExact(int, int)}) of every delegate's
     *         result, or {@code 0} when no result is produced, e.g. when no
     *         publish delegate is registered
     */
    @Override
    public <T> Mono<Integer> publish(String str, Encoder<T> encoder, Publisher<? extends T> publisher) {
        // The same cached Flux is handed to every delegate.
        // NOTE(review): presumably so the upstream is consumed once and replayed to each bus - confirm.
        Flux<? extends T> cached = Flux.from(publisher).cache();
        return Flux.fromIterable(pubBus)
                .map(bus -> bus.publish(str, encoder, cached))
                .as(Flux::merge)
                .reduce(Math::addExact)
                .defaultIfEmpty(0);
    }

    /**
     * Calls {@code subscribe(str)} on every subscribe delegate and merges the
     * resulting payload streams into one.
     *
     * @param str the argument forwarded unchanged to each delegate
     * @return the merged stream of all delegates' payloads
     */
    @Override
    public Flux<SubscribePayload> subscribe(String str) {
        return Flux.fromIterable(subBus)
                .map(bus -> bus.subscribe(str))
                .as(Flux::merge);
    }

    /**
     * Calls {@code publish(str, ...)} on every publish delegate and adds up
     * the integers they report.
     *
     * @param str       the first argument forwarded unchanged to each delegate
     * @param publisher the source of elements to publish
     * @param <T>       element type of the source
     * @return the sum (via {@link Math#addExact(int, int)}) of every delegate's
     *         result, or {@code 0} when no result is produced, e.g. when no
     *         publish delegate is registered
     */
    @Override
    public <T> Mono<Integer> publish(String str, Publisher<T> publisher) {
        // The same cached Flux is handed to every delegate.
        // NOTE(review): presumably so the upstream is consumed once and replayed to each bus - confirm.
        Flux<T> cached = Flux.from(publisher).cache();
        return Flux.fromIterable(pubBus)
                .map(bus -> bus.publish(str, cached))
                .as(Flux::merge)
                .reduce(Math::addExact)
                .defaultIfEmpty(0);
    }
}
