package org.jetlinks.supports.cluster.redis;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.jetlinks.core.cluster.ClusterManager;
import org.jetlinks.core.cluster.ClusterTopic;
import org.jetlinks.core.cluster.HaManager;
import org.jetlinks.core.cluster.ServerNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.ReactiveHashOperations;
import org.springframework.data.redis.core.ReactiveRedisOperations;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/jetlinks/supports/cluster/redis/RedisHaManager.class */
/**
 * {@link HaManager} implementation that tracks cluster membership through a Redis hash
 * and two cluster topics (keep-alive and offline).
 *
 * <p>Once {@link #startup()} has been called, the current node periodically writes itself
 * into the hash {@code __ha_all_node:<haName>}, publishes itself on the keep-alive topic,
 * and evicts nodes whose last keep-alive seen by this instance is older than
 * {@link #NODE_TIMEOUT_MILLIS}. Nodes seen for the first time on the keep-alive topic are
 * emitted by {@link #subscribeServerOnline()}; nodes announced on the offline topic are
 * emitted by {@link #subscribeServerOffline()}.
 */
public class RedisHaManager implements HaManager {
    private static final Logger log = LoggerFactory.getLogger(RedisHaManager.class);

    /** Period between two keep-alive rounds. */
    private static final Duration KEEPALIVE_INTERVAL = Duration.ofSeconds(5L);

    /** A node whose last keep-alive is older than this is treated as offline. */
    private static final long NODE_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(10L);

    private final ServerNode current;
    private final String haName;
    private final ClusterTopic<ServerNode> offlineTopic;
    private final ClusterManager clusterManager;
    private final ReactiveRedisOperations<String, ServerNode> operations;
    private final ClusterTopic<ServerNode> keepalive;
    private final ReactiveHashOperations<String, String, ServerNode> inRedisNode;
    private final String allNodeHashKey;

    /** Local view of the known nodes, keyed by node id. */
    private final Map<String, ServerNode> allNode = new ConcurrentHashMap<>();
    private final FluxProcessor<ServerNode, ServerNode> onlineProcessor = EmitterProcessor.create(false);
    private final FluxProcessor<ServerNode, ServerNode> offlineProcessor = EmitterProcessor.create(false);
    private volatile boolean started = false;

    /**
     * Creates a manager; nothing is written or subscribed until {@link #startup()} is called.
     *
     * @param str                     HA group name, used as suffix of the topic names and the hash key
     * @param serverNode              the node representing this process
     * @param clusterManager          source of the keep-alive and offline topics
     * @param reactiveRedisOperations Redis operations used for the shared node hash
     */
    public RedisHaManager(String str, ServerNode serverNode, ClusterManager clusterManager, ReactiveRedisOperations<String, ServerNode> reactiveRedisOperations) {
        this.haName = str;
        this.current = serverNode;
        this.clusterManager = clusterManager;
        this.operations = reactiveRedisOperations;
        this.inRedisNode = this.operations.opsForHash();
        this.offlineTopic = clusterManager.getTopic("__ha_offline_topic:".concat(this.haName));
        this.keepalive = clusterManager.getTopic("__ha_keepalive:".concat(this.haName));
        this.allNodeHashKey = "__ha_all_node:".concat(this.haName);
    }

    /**
     * Runs one keep-alive round: refreshes the current node in Redis, publishes it on the
     * keep-alive topic, then removes every other locally known node that has timed out from
     * the Redis hash and announces it on the offline topic.
     *
     * <p>All Redis and topic operations are subscribed fire-and-forget; this method does not
     * wait for them to complete.
     */
    public void checkAlive() {
        long now = System.currentTimeMillis();
        this.current.setLastKeepAlive(now);
        this.inRedisNode.put(this.allNodeHashKey, this.current.getId(), this.current).subscribe();
        this.keepalive.publish(Mono.just(this.current)).subscribe();

        Map<String, ServerNode> expired = getAllNode().stream()
            .filter(node -> now - node.getLastKeepAlive() > NODE_TIMEOUT_MILLIS)
            .filter(node -> !node.isSame(this.current))
            .collect(Collectors.toMap(ServerNode::getId, Function.identity()));
        if (expired.isEmpty()) {
            // Nothing timed out: skip the Redis key scan entirely.
            return;
        }

        // Only nodes that are still present in the Redis hash are removed and announced,
        // so a node already evicted by another instance is not announced again.
        this.inRedisNode.keys(this.allNodeHashKey)
            .filter(expired::containsKey)
            .map(expired::get)
            .collectList()
            .filter(nodes -> !nodes.isEmpty())
            .flatMapMany(nodes -> this.inRedisNode
                .remove(this.allNodeHashKey, nodes.stream().map(ServerNode::getId).toArray())
                .thenMany(Flux.fromIterable(nodes)))
            .as(this.offlineTopic::publish)
            .subscribe();
    }

    /**
     * Announces the current node on the offline topic and blocks until the publish completes.
     */
    public void shutdown() {
        this.offlineTopic.publish(Mono.just(this.current)).block();
    }

    /**
     * Registers the current node, loads the nodes stored in Redis, starts the periodic
     * keep-alive check and subscribes to the offline and keep-alive topics.
     * Calls after the first one are no-ops.
     */
    public synchronized void startup() {
        if (this.started) {
            return;
        }
        this.started = true;
        this.allNode.put(this.current.getId(), this.current);

        // Collect the stored nodes into a single list so that the keep-alive timer is
        // started exactly once, regardless of how many nodes the hash contains.
        this.inRedisNode.put(this.allNodeHashKey, this.current.getId(), this.current)
            .flatMapMany(ignored -> this.inRedisNode.values(this.allNodeHashKey))
            .collectList()
            .subscribe(nodes -> {
                long now = System.currentTimeMillis();
                for (ServerNode node : nodes) {
                    // Give every stored node a fresh timestamp; it expires later if it
                    // sends no keep-alive of its own.
                    node.setLastKeepAlive(now);
                    this.allNode.put(node.getId(), node);
                }
                startKeepAliveTimer();
            });

        this.offlineTopic.subscribe().subscribe(node -> {
            // Ignore announcements about ourselves and about nodes we did not know.
            if (currentServer().isSame(node) || this.allNode.remove(node.getId()) == null) {
                return;
            }
            log.debug("[{}]:server node [{}] offline", this.haName, node.getId());
            this.inRedisNode.remove(this.allNodeHashKey, node.getId()).subscribe();
            this.offlineProcessor.onNext(node);
        });

        this.keepalive.subscribe().subscribe(node -> {
            node.setLastKeepAlive(System.currentTimeMillis());
            if (currentServer().getId().equals(node.getId())) {
                return;
            }
            if (!this.allNode.containsKey(node.getId())) {
                log.debug("[{}]:server node [{}] online", this.haName, node.getId());
                this.onlineProcessor.onNext(node);
            }
            this.allNode.put(node.getId(), node);
        });
    }

    /** Starts the periodic keep-alive round; the first round runs immediately. */
    private void startKeepAliveTimer() {
        Flux.interval(Duration.ZERO, KEEPALIVE_INTERVAL)
            .subscribe(tick -> {
                try {
                    checkAlive();
                } catch (RuntimeException e) {
                    // An exception escaping here would terminate the interval and silently
                    // stop all further keep-alives, so log it and keep the timer running.
                    log.warn("[{}]:keep-alive check failed", this.haName, e);
                }
            });
    }

    /**
     * @return the node representing this process
     */
    public ServerNode currentServer() {
        return this.current;
    }

    /**
     * @return a stream of nodes first seen on the keep-alive topic, excluding the current node
     */
    public Flux<ServerNode> subscribeServerOnline() {
        return this.onlineProcessor.filter(node -> !node.getId().equals(this.current.getId()));
    }

    /**
     * @return a stream of known nodes announced on the offline topic, excluding the current node
     */
    public Flux<ServerNode> subscribeServerOffline() {
        return this.offlineProcessor.filter(node -> !node.getId().equals(this.current.getId()));
    }

    /**
     * @return a snapshot copy of the locally known nodes
     */
    public List<ServerNode> getAllNode() {
        return new ArrayList<>(this.allNode.values());
    }
}
