package org.jetlinks.rule.engine.defaults.rpc;

import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.hswebframework.web.id.IDGenerator;
import org.jetlinks.rule.engine.api.EventBus;
import org.jetlinks.rule.engine.api.Payload;
import org.jetlinks.rule.engine.api.rpc.RpcDefinition;
import org.jetlinks.rule.engine.api.rpc.RpcService;
import org.jetlinks.rule.engine.defaults.rpc.RcpResult;
import org.jetlinks.rule.engine.defaults.rpc.RpcRequest;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/jetlinks/rule/engine/defaults/rpc/EventBusRcpService.class */
public class EventBusRcpService implements RpcService {
    private static final Logger log = LoggerFactory.getLogger(EventBusRcpService.class);
    private final EventBus eventBus;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/jetlinks/rule/engine/defaults/rpc/EventBusRcpService$RunningRequest.class */
    public class RunningRequest<REQ, RES> {
        long requestId;
        String reqTopicRes;
        String reqTopic;
        RpcDefinition<REQ, RES> definition;
        BiFunction<String, Publisher<REQ>, Publisher<RES>> invoker;
        Disposable disposable;
        EmitterProcessor<REQ> processor = EmitterProcessor.create();
        FluxSink<REQ> sink = this.processor.sink(FluxSink.OverflowStrategy.BUFFER);

        public RunningRequest(long j, RpcDefinition<REQ, RES> rpcDefinition, BiFunction<String, Publisher<REQ>, Publisher<RES>> biFunction, Disposable disposable) {
            this.requestId = j;
            this.reqTopic = rpcDefinition.getAddress();
            this.reqTopicRes = rpcDefinition.getAddress() + "/_reply";
            this.definition = rpcDefinition;
            this.invoker = biFunction;
            this.disposable = disposable;
            Flux.from(biFunction.apply(this.reqTopic, this.processor.onBackpressureBuffer())).flatMap(obj -> {
                return EventBusRcpService.this.reply(this.reqTopicRes, RcpResult.result(j, rpcDefinition.responseCodec().encode(obj)));
            }).doOnComplete(() -> {
                EventBusRcpService.this.reply(this.reqTopicRes, RcpResult.complete(j)).subscribe();
            }).doOnError(th -> {
                EventBusRcpService.log.error(th.getMessage(), th);
                EventBusRcpService.this.reply(this.reqTopicRes, RcpResult.error(j, rpcDefinition.errorCodec().encode(th))).subscribe();
            }).subscribe();
            this.sink.onDispose(disposable);
        }

        void next(RpcRequest rpcRequest) {
            try {
                if (rpcRequest.getType() == RpcRequest.Type.COMPLETE) {
                    this.sink.complete();
                    return;
                }
                REQ decode = this.definition.requestCodec().decode(rpcRequest);
                if (decode != null) {
                    this.sink.next(decode);
                }
                if (rpcRequest.getType() == RpcRequest.Type.NEXT_AND_END) {
                    this.sink.complete();
                }
            } catch (Throwable th) {
                EventBusRcpService.log.error(th.getMessage(), th);
                this.sink.error(th);
            }
        }
    }

    /**
     * Registers a handler that is invoked once per decoded request element.
     *
     * <p>The per-element handler is adapted to the stream-based form expected by
     * {@code doListen} by flat-mapping every request element through it.
     *
     * @param rpcDefinition definition passed on to {@code doListen}
     * @param biFunction    handler receiving the topic and one request element
     * @return the {@link Disposable} returned by {@code doListen}
     */
    @Override // org.jetlinks.rule.engine.api.rpc.RpcService
    public <REQ, RES> Disposable listen(RpcDefinition<REQ, RES> rpcDefinition, BiFunction<String, REQ, Publisher<RES>> biFunction) {
        BiFunction<String, Publisher<REQ>, Publisher<RES>> perStream =
            (topic, requests) -> Flux.from(requests).flatMap(request -> biFunction.apply(topic, request));
        return doListen(rpcDefinition, perStream);
    }

    /**
     * Registers a handler for calls that carry no request body.
     *
     * <p>The handler's publisher is obtained as soon as the request stream is handed over and is
     * subscribed once that request stream has completed ({@code thenMany}).
     *
     * @param rpcDefinition definition passed on to {@code doListen}
     * @param function      handler receiving the topic and producing the responses
     * @return the {@link Disposable} returned by {@code doListen}
     */
    @Override // org.jetlinks.rule.engine.api.rpc.RpcService
    public <RES> Disposable listen(RpcDefinition<Void, RES> rpcDefinition, Function<String, Publisher<RES>> function) {
        BiFunction<String, Publisher<Void>, Publisher<RES>> perStream =
            (topic, trigger) -> Flux.from(trigger).thenMany(function.apply(topic));
        return doListen(rpcDefinition, perStream);
    }

    /**
     * Invokes a remote procedure, sending the elements of {@code publisher} as the request.
     *
     * <p>A {@link Mono} request is sent as a single "next and complete" frame. Any other publisher
     * is sent as a stream of "next" frames, followed by a separate "complete" frame once the stream
     * has been published successfully.
     *
     * @param rpcDefinition definition supplying the address and the request codec
     * @param publisher     request element(s) to encode and publish
     * @return the responses, as produced by {@code doInvoke}
     */
    @Override // org.jetlinks.rule.engine.api.rpc.RpcService
    public <REQ, RES> Flux<RES> invoke(RpcDefinition<REQ, RES> rpcDefinition, Publisher<? extends REQ> publisher) {
        return doInvoke(rpcDefinition, (requestId, topic) -> {
            long id = requestId;
            if (publisher instanceof Mono) {
                // The payload is an RpcRequest; the decompiler's "(String)" cast on it was invalid.
                return Mono.from(publisher)
                    .flatMap(request -> this.eventBus.publish(
                        topic,
                        RpcRequest.nextAndComplete(id, rpcDefinition.requestCodec().encode(request))));
            }
            return Flux.from(publisher)
                .map(request -> RpcRequest.next(id, rpcDefinition.requestCodec().encode(request)))
                .as(frames -> this.eventBus.publish(topic, frames))
                // Fire-and-forget: tell the listener the request stream has ended.
                .doOnSuccess(ignored -> this.eventBus.publish(topic, RpcRequest.complete(id)).subscribe());
        });
    }

    /**
     * Invokes a remote procedure that takes no request body.
     *
     * <p>A single "next and complete" frame carrying {@code Payload.voidPayload} is published to
     * the definition's address.
     *
     * @param rpcDefinition definition supplying the address
     * @return the responses, as produced by {@code doInvoke}
     */
    @Override // org.jetlinks.rule.engine.api.rpc.RpcService
    public <RES> Flux<RES> invoke(RpcDefinition<Void, RES> rpcDefinition) {
        // The payload is an RpcRequest; the decompiler's "(String)" cast on it was invalid.
        return doInvoke(rpcDefinition, (requestId, topic) ->
            this.eventBus.publish(topic, RpcRequest.nextAndComplete(requestId.longValue(), Payload.voidPayload)));
    }

    /**
     * Publishes one reply frame to the given topic.
     *
     * @param str       topic to publish to
     * @param rcpResult reply frame to publish
     * @return a {@link Mono} that completes when the publish completes; the publish result is discarded
     */
    protected Mono<Void> reply(String str, RcpResult rcpResult) {
        // The payload is an RcpResult; the decompiler's "(String)" cast on it was invalid.
        return this.eventBus.publish(str, rcpResult).then();
    }

    /**
     * Subscribes to the definition's address and routes every incoming frame to the
     * {@link RunningRequest} that owns its request id, creating one on the first frame.
     *
     * <p>The entry for a request is removed again when that request's input sink is disposed, so
     * the map no longer grows by one entry per call for the lifetime of the listener.
     *
     * @param rpcDefinition definition supplying the address and the codecs
     * @param biFunction    handler receiving the topic and the stream of decoded requests
     * @return the subscription to the event bus; cancelling it clears all tracked requests
     */
    private <REQ, RES> Disposable doListen(RpcDefinition<REQ, RES> rpcDefinition, BiFunction<String, Publisher<REQ>, Publisher<RES>> biFunction) {
        ConcurrentHashMap<Long, RunningRequest<REQ, RES>> running = new ConcurrentHashMap<>();
        return this.eventBus
            .subscribe(rpcDefinition.getAddress())
            .map(RpcRequest::parse)
            .doOnCancel(running::clear)
            .subscribe(request -> running
                .computeIfAbsent(
                    request.getRequestId(),
                    // The cleanup callback is registered on the request sink by RunningRequest.
                    // NOTE(review): assumes the sink is not already disposed while the constructor
                    // runs, otherwise remove() would re-enter computeIfAbsent for the same key.
                    id -> new RunningRequest<>(id, rpcDefinition, biFunction, () -> running.remove(id)))
                .next(request));
    }

    /**
     * Performs one RPC call: listens on {@code <address>/_reply} for frames carrying this call's
     * request id, then sends the request through {@code biFunction}.
     *
     * <p>The request id is generated per subscription, so subscribing to the returned Flux again
     * starts an independent call instead of reusing the id of an earlier one.
     *
     * @param rpcDefinition definition supplying the address and the response/error codecs
     * @param biFunction    sends the request for the given id and address; the emitted number is
     *                      checked against zero to detect that nobody received the request
     * @return the decoded responses; fails with {@link UnsupportedOperationException} when the
     *         request was published to zero receivers
     */
    private <REQ, RES> Flux<RES> doInvoke(RpcDefinition<REQ, RES> rpcDefinition, BiFunction<Long, String, Mono<Integer>> biFunction) {
        String address = rpcDefinition.getAddress();
        String replyAddress = address + "/_reply";
        return Flux.create(fluxSink -> {
            long requestId = IDGenerator.SNOW_FLAKE.generate();

            // Listen for replies before sending, so an immediate reply cannot be missed.
            Disposable replySubscription = this.eventBus
                .subscribe(replyAddress)
                .map(RcpResult::parse)
                .filter(reply -> reply.getRequestId() == requestId)
                .subscribe(reply -> dispatchReply(rpcDefinition, reply, fluxSink), fluxSink::error);
            fluxSink.onDispose(replySubscription);

            log.debug("do invoke rpc:{}", address);
            biFunction.apply(requestId, address)
                .map(receivers -> {
                    if (receivers == 0) {
                        throw new UnsupportedOperationException("no rpc service for:" + address);
                    }
                    return receivers;
                })
                // An explicit error consumer forwards failures to the caller without also
                // tripping Reactor's "error callback not implemented" path.
                .subscribe(ignored -> { }, fluxSink::error);
        });
    }

    /** Translates one reply frame into the matching signal on the caller's sink. */
    private <REQ, RES> void dispatchReply(RpcDefinition<REQ, RES> rpcDefinition, RcpResult reply, FluxSink<RES> sink) {
        RcpResult.Type type = reply.getType();
        if (type == RcpResult.Type.RESULT || type == RcpResult.Type.RESULT_AND_COMPLETE) {
            RES decoded = rpcDefinition.responseCodec().decode(reply);
            if (decoded != null) {
                sink.next(decoded);
            }
            if (type == RcpResult.Type.RESULT_AND_COMPLETE) {
                sink.complete();
            }
        } else if (type == RcpResult.Type.COMPLETE) {
            sink.complete();
        } else if (type == RcpResult.Type.ERROR) {
            Throwable failure = rpcDefinition.errorCodec().decode(reply);
            if (failure != null) {
                sink.error(failure);
            } else {
                // An error frame that decodes to nothing still terminates the call.
                sink.complete();
            }
        }
    }

    /**
     * Creates the service on top of the given event bus.
     *
     * @param eventBus bus used to publish and subscribe request and reply frames
     */
    public EventBusRcpService(EventBus eventBus) {
        this.eventBus = eventBus;
    }
}
