package org.jetlinks.supports.cluster;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.jetlinks.core.cluster.ClusterManager;
import org.jetlinks.core.cluster.ClusterTopic;
import org.jetlinks.core.device.DeviceOperationBroker;
import org.jetlinks.core.device.DeviceStateInfo;
import org.jetlinks.core.device.ReplyFailureHandler;
import org.jetlinks.core.enums.ErrorCode;
import org.jetlinks.core.exception.DeviceOperationException;
import org.jetlinks.core.message.BroadcastMessage;
import org.jetlinks.core.message.DeviceMessageReply;
import org.jetlinks.core.message.Headers;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.server.MessageHandler;
import org.jetlinks.supports.cluster.redis.DeviceCheckRequest;
import org.jetlinks.supports.cluster.redis.DeviceCheckResponse;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.Mono;
import reactor.core.publisher.UnicastProcessor;

/* loaded from: input_file:org/jetlinks/supports/cluster/ClusterDeviceOperationBroker.class */
public class ClusterDeviceOperationBroker implements DeviceOperationBroker, MessageHandler {
    private static final Logger log = LoggerFactory.getLogger(ClusterDeviceOperationBroker.class);
    private ClusterManager clusterManager;
    private String serverId;
    private Function<Publisher<String>, Flux<DeviceStateInfo>> localStateChecker;
    private Map<String, FluxProcessor<DeviceMessageReply, DeviceMessageReply>> replyProcessor = new ConcurrentHashMap();
    private Map<String, FluxProcessor<DeviceCheckResponse, DeviceCheckResponse>> checkRequests = new ConcurrentHashMap();
    private Map<String, AtomicInteger> fragmentCounter = new ConcurrentHashMap();
    private ReplyFailureHandler replyFailureHandler = (th, deviceMessageReply) -> {
        log.warn("unhandled reply message:{}", deviceMessageReply, th);
    };

    public ClusterDeviceOperationBroker(ClusterManager clusterManager) {
        this.clusterManager = clusterManager;
        this.serverId = clusterManager.getCurrentServerId();
        init();
    }

    public void init() {
        this.clusterManager.getTopic("device:state:check:result:".concat(this.serverId)).subscribe().subscribe(deviceCheckResponse -> {
            Optional.ofNullable(this.checkRequests.remove(deviceCheckResponse.getRequestId())).ifPresent(fluxProcessor -> {
                fluxProcessor.onNext(deviceCheckResponse);
                fluxProcessor.onComplete();
            });
        });
        this.clusterManager.getTopic("device:msg:reply").subscribe().subscribe(obj -> {
            if (obj instanceof DeviceMessageReply) {
                handleReply((DeviceMessageReply) obj);
            }
        });
    }

    public Flux<DeviceStateInfo> getDeviceState(String str, Collection<String> collection) {
        return Flux.defer(() -> {
            if (this.serverId.equals(str) && this.localStateChecker != null) {
                return this.localStateChecker.apply(Flux.fromIterable(collection));
            }
            String uuid = UUID.randomUUID().toString();
            DeviceCheckRequest deviceCheckRequest = new DeviceCheckRequest(this.serverId, uuid, new ArrayList(collection));
            FluxProcessor<DeviceCheckResponse, DeviceCheckResponse> create = EmitterProcessor.create(true);
            this.checkRequests.put(uuid, create);
            return this.clusterManager.getTopic("device:state:checker:".concat(str)).publish(Mono.just(deviceCheckRequest)).flatMapMany(num -> {
                return create.flatMap(deviceCheckResponse -> {
                    return Flux.fromIterable(deviceCheckResponse.getStateInfoList());
                });
            }).timeout(Duration.ofSeconds(5L), Flux.empty());
        });
    }

    public void handleGetDeviceState(String str, Function<Publisher<String>, Flux<DeviceStateInfo>> function) {
        this.localStateChecker = function;
        this.clusterManager.getTopic("device:state:checker:".concat(str)).subscribe().subscribe(deviceCheckRequest -> {
            Mono map = ((Flux) function.apply(Flux.fromIterable(deviceCheckRequest.getDeviceId()))).collectList().map(list -> {
                return new DeviceCheckResponse(list, deviceCheckRequest.getRequestId());
            });
            ClusterTopic topic = this.clusterManager.getTopic("device:state:check:result:".concat(deviceCheckRequest.getFrom()));
            topic.getClass();
            ((Mono) map.as((v1) -> {
                return r1.publish(v1);
            })).subscribe(num -> {
                if (num.intValue() <= 0) {
                    log.warn("device check reply fail");
                }
            });
        });
    }

    public Flux<DeviceMessageReply> handleReply(String str, Duration duration) {
        return this.replyProcessor.computeIfAbsent(str, str2 -> {
            return UnicastProcessor.create();
        }).timeout(duration, Mono.error(() -> {
            return new DeviceOperationException(ErrorCode.TIME_OUT);
        })).doFinally(signalType -> {
            this.replyProcessor.remove(str);
            this.fragmentCounter.remove(str);
        });
    }

    public Mono<Integer> send(String str, Publisher<? extends Message> publisher) {
        return Flux.from(publisher).map(message -> {
            return message.addHeader(Headers.sendFrom, this.clusterManager.getCurrentServerId());
        }).flatMap(deviceMessage -> {
            return this.clusterManager.getTopic("device:msg:p2p:".concat(str)).publish(Mono.just(deviceMessage));
        }).takeWhile(num -> {
            return num.intValue() > 0;
        }).last(0);
    }

    public Mono<Integer> send(Publisher<? extends BroadcastMessage> publisher) {
        return this.clusterManager.getTopic("device:msg:broadcast").publish(publisher);
    }

    public Flux<Message> handleSendToDeviceMessage(String str) {
        return this.clusterManager.getTopic("device:msg:p2p:".concat(str)).subscribe().cast(Message.class);
    }

    public Mono<Boolean> reply(DeviceMessageReply deviceMessageReply) {
        return Mono.defer(() -> {
            deviceMessageReply.addHeader(Headers.replyFrom, this.serverId);
            if (!this.replyProcessor.containsKey(deviceMessageReply.getMessageId())) {
                return this.clusterManager.getTopic("device:msg:reply").publish(Mono.just(deviceMessageReply)).map(num -> {
                    return Boolean.valueOf(num.intValue() > 0);
                }).switchIfEmpty(Mono.just(false));
            }
            handleReply(deviceMessageReply);
            return Mono.just(true);
        });
    }

    private void handleReply(DeviceMessageReply deviceMessageReply) {
        try {
            String messageId = deviceMessageReply.getMessageId();
            String str = (String) deviceMessageReply.getHeader(Headers.fragmentBodyMessageId).orElse(null);
            if (str == null) {
                FluxProcessor<DeviceMessageReply, DeviceMessageReply> fluxProcessor = this.replyProcessor.get(messageId);
                if (fluxProcessor == null || fluxProcessor.isDisposed()) {
                    this.replyProcessor.remove(messageId);
                } else {
                    fluxProcessor.onNext(deviceMessageReply);
                    fluxProcessor.onComplete();
                }
                return;
            }
            FluxProcessor<DeviceMessageReply, DeviceMessageReply> orDefault = this.replyProcessor.getOrDefault(str, this.replyProcessor.get(messageId));
            if (orDefault == null || orDefault.isDisposed()) {
                this.replyProcessor.remove(str);
                return;
            }
            int intValue = ((Integer) deviceMessageReply.getHeader(Headers.fragmentNumber).orElse(1)).intValue();
            AtomicInteger computeIfAbsent = this.fragmentCounter.computeIfAbsent(str, str2 -> {
                return new AtomicInteger(intValue);
            });
            try {
                orDefault.onNext(deviceMessageReply);
                if (computeIfAbsent.decrementAndGet() <= 0 || ((Boolean) deviceMessageReply.getHeader(Headers.fragmentLast).orElse(false)).booleanValue()) {
                    try {
                        orDefault.onComplete();
                        this.replyProcessor.remove(str);
                        this.fragmentCounter.remove(str);
                    } finally {
                    }
                }
            } catch (Throwable th) {
                if (computeIfAbsent.decrementAndGet() <= 0 || ((Boolean) deviceMessageReply.getHeader(Headers.fragmentLast).orElse(false)).booleanValue()) {
                    try {
                        orDefault.onComplete();
                        this.replyProcessor.remove(str);
                        this.fragmentCounter.remove(str);
                    } finally {
                    }
                }
                throw th;
            }
        } catch (Exception e) {
            this.replyFailureHandler.handle(e, deviceMessageReply);
        }
    }

    public void setReplyFailureHandler(ReplyFailureHandler replyFailureHandler) {
        this.replyFailureHandler = replyFailureHandler;
    }
}
