package org.jetlinks.rule.engine.cluster;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Function;
import org.jetlinks.rule.engine.api.EventBus;
import org.jetlinks.rule.engine.api.rpc.RpcServiceFactory;
import org.jetlinks.rule.engine.api.scheduler.Scheduler;
import org.jetlinks.rule.engine.cluster.scheduler.RemoteScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/jetlinks/rule/engine/cluster/ClusterSchedulerRegistry.class */
public class ClusterSchedulerRegistry implements SchedulerRegistry {
    private static final Logger log = LoggerFactory.getLogger(ClusterSchedulerRegistry.class);
    private final Set<Scheduler> localSchedulers = new ConcurrentSkipListSet(Comparator.comparing((v0) -> {
        return v0.getId();
    }));
    private final Set<RemoteScheduler> remoteSchedulers = new ConcurrentSkipListSet(Comparator.comparing((v0) -> {
        return v0.getId();
    }));
    private final EmitterProcessor<Scheduler> joinProcessor = EmitterProcessor.create(false);
    private final EmitterProcessor<Scheduler> leaveProcessor = EmitterProcessor.create(false);
    private final FluxSink<Scheduler> joinSink = this.joinProcessor.sink(FluxSink.OverflowStrategy.BUFFER);
    private final FluxSink<Scheduler> leaveSink = this.leaveProcessor.sink(FluxSink.OverflowStrategy.BUFFER);
    private final List<Disposable> disposables = new CopyOnWriteArrayList();
    private final EventBus eventBus;
    private final RpcServiceFactory serviceFactory;

    public ClusterSchedulerRegistry(EventBus eventBus, RpcServiceFactory rpcServiceFactory) {
        this.eventBus = eventBus;
        this.serviceFactory = rpcServiceFactory;
    }

    public void setup() {
        if (this.disposables.isEmpty()) {
            this.joinProcessor.subscribe(scheduler -> {
                log.debug("remote scheduler join:{}", scheduler.getId());
            });
            this.leaveProcessor.subscribe(scheduler2 -> {
                log.debug("remote scheduler leaved:{}", scheduler2.getId());
            });
            List<Disposable> list = this.disposables;
            Flux doOnNext = this.eventBus.subscribe("/rule-engine/cluster-scheduler/join", String.class).map(str -> {
                return new RemoteScheduler(str, this.serviceFactory);
            }).filter(remoteScheduler -> {
                return (this.localSchedulers.contains(remoteScheduler) || this.remoteSchedulers.contains(remoteScheduler)) ? false : true;
            }).doOnNext(remoteScheduler2 -> {
                remoteScheduler2.init();
                this.joinSink.next(remoteScheduler2);
                publishLocal().subscribe();
            });
            Set<RemoteScheduler> set = this.remoteSchedulers;
            set.getClass();
            list.add(doOnNext.subscribe((v1) -> {
                r2.add(v1);
            }, th -> {
                log.error(th.getMessage(), th);
            }));
            List<Disposable> list2 = this.disposables;
            Flux filter = this.eventBus.subscribe("/rule-engine/cluster-scheduler/leave", String.class).map(str2 -> {
                return new RemoteScheduler(str2, this.serviceFactory);
            }).filter(remoteScheduler3 -> {
                return !this.localSchedulers.contains(remoteScheduler3);
            });
            FluxSink<Scheduler> fluxSink = this.leaveSink;
            fluxSink.getClass();
            Flux doOnNext2 = filter.doOnNext((v1) -> {
                r2.next(v1);
            });
            Set<RemoteScheduler> set2 = this.remoteSchedulers;
            set2.getClass();
            list2.add(doOnNext2.subscribe((v1) -> {
                r2.remove(v1);
            }));
            this.disposables.add(Flux.interval(Duration.ofSeconds(10L)).subscribe(l -> {
                Flux.fromIterable(this.remoteSchedulers).filterWhen((v0) -> {
                    return v0.isNoAlive();
                }).doOnNext(remoteScheduler4 -> {
                    this.remoteSchedulers.remove(remoteScheduler4);
                    this.leaveSink.next(remoteScheduler4);
                }).then(publishLocal()).subscribe();
            }));
            publishLocal().subscribe();
        }
    }

    private Mono<Void> publishLocal() {
        return this.eventBus.publish("/rule-engine/cluster-scheduler/join", Flux.fromIterable(this.localSchedulers).map((v0) -> {
            return v0.getId();
        })).then();
    }

    public void cleanup() {
        this.eventBus.publish("/rule-engine/cluster-scheduler/leave", Flux.fromIterable(this.localSchedulers).map((v0) -> {
            return v0.getId();
        })).subscribe();
        this.disposables.forEach((v0) -> {
            v0.dispose();
        });
        this.disposables.clear();
    }

    @Override // org.jetlinks.rule.engine.cluster.SchedulerRegistry
    public Flux<Scheduler> getSchedulers() {
        return Flux.just(new Set[]{this.localSchedulers, this.remoteSchedulers}).flatMapIterable(Function.identity());
    }

    @Override // org.jetlinks.rule.engine.cluster.SchedulerRegistry
    public Flux<Scheduler> handleSchedulerJoin() {
        return this.joinProcessor;
    }

    @Override // org.jetlinks.rule.engine.cluster.SchedulerRegistry
    public Flux<Scheduler> handleSchedulerLeave() {
        return this.leaveProcessor;
    }

    @Override // org.jetlinks.rule.engine.cluster.SchedulerRegistry
    public void register(Scheduler scheduler) {
        this.localSchedulers.add(scheduler);
        if (this.disposables.isEmpty()) {
            return;
        }
        publishLocal().subscribe();
    }

    @Override // org.jetlinks.rule.engine.cluster.SchedulerRegistry
    public List<Scheduler> getLocalSchedulers() {
        return new ArrayList(this.localSchedulers);
    }
}
