package org.jetlinks.rule.engine.defaults.rpc;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.function.Function;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.jetlinks.core.utils.BytesUtils;
import org.jetlinks.rule.engine.api.Payload;
import org.jetlinks.rule.engine.api.codec.Codec;
import org.jetlinks.rule.engine.api.codec.Codecs;
import org.jetlinks.rule.engine.api.rpc.RpcDefinition;
import org.jetlinks.rule.engine.api.rpc.RpcService;
import org.jetlinks.rule.engine.api.rpc.RpcServiceFactory;
import org.jetlinks.rule.engine.defaults.codec.DirectCodec;
import org.reactivestreams.Publisher;
import org.springframework.core.ResolvableType;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

/* loaded from: input_file:org/jetlinks/rule/engine/defaults/rpc/DefaultRpcServiceFactory.class */
public class DefaultRpcServiceFactory implements RpcServiceFactory {
    RpcService rpcService;
    static RpcRequestCodec requestCodec = new RpcRequestCodec();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/jetlinks/rule/engine/defaults/rpc/DefaultRpcServiceFactory$MethodRpcRequest.class */
    public static final class MethodRpcRequest {
        private final String method;
        private final Object[] args;

        public MethodRpcRequest(String str, Object[] objArr) {
            this.method = str;
            this.args = objArr;
        }

        public String getMethod() {
            return this.method;
        }

        public Object[] getArgs() {
            return this.args;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/jetlinks/rule/engine/defaults/rpc/DefaultRpcServiceFactory$MethodRpcRequestCodec.class */
    public class MethodRpcRequestCodec implements Codec<MethodRpcRequest> {
        private final byte[] method;
        private final List<Codec> parameter;

        @Override // org.jetlinks.rule.engine.api.Decoder
        @Nullable
        public MethodRpcRequest decode(@Nonnull Payload payload) {
            ByteBuf skipBytes = payload.getBody().skipBytes(4 + this.method.length);
            Object[] objArr = new Object[this.parameter.size()];
            int i = 0;
            for (Codec codec : this.parameter) {
                byte[] bArr = new byte[4];
                skipBytes.readBytes(bArr);
                int i2 = i;
                i++;
                objArr[i2] = codec.decode(Payload.of(skipBytes.readBytes(BytesUtils.beToInt(bArr))));
            }
            skipBytes.resetReaderIndex();
            return new MethodRpcRequest(new String(this.method), objArr);
        }

        @Override // org.jetlinks.rule.engine.api.Encoder
        public Payload encode(MethodRpcRequest methodRpcRequest) {
            ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(4 + this.method.length);
            buffer.writeBytes(BytesUtils.intToBe(this.method.length));
            buffer.writeBytes(this.method);
            Object[] args = methodRpcRequest.getArgs();
            if (args != null) {
                int length = args.length;
                for (int i = 0; i < length; i++) {
                    ByteBuf body = this.parameter.get(i).encode(args[i]).getBody();
                    buffer.writeBytes(BytesUtils.intToBe(body.capacity()));
                    buffer.writeBytes(body);
                }
            }
            return Payload.of(buffer);
        }

        public MethodRpcRequestCodec(byte[] bArr, List<Codec> list) {
            this.method = bArr;
            this.parameter = list;
        }
    }

    /* loaded from: input_file:org/jetlinks/rule/engine/defaults/rpc/DefaultRpcServiceFactory$RpcRequestCodec.class */
    static class RpcRequestCodec implements Codec<Tuple2<String, Payload>> {
        RpcRequestCodec() {
        }

        @Override // org.jetlinks.rule.engine.api.Decoder
        @Nullable
        public Tuple2<String, Payload> decode(@Nonnull Payload payload) {
            ByteBuf body = payload.getBody();
            byte[] bArr = new byte[4];
            body.readBytes(bArr);
            ByteBuf readBytes = body.readBytes(BytesUtils.beToInt(bArr));
            body.resetReaderIndex();
            return Tuples.of(readBytes.toString(StandardCharsets.UTF_8), payload);
        }

        @Override // org.jetlinks.rule.engine.api.Encoder
        public Payload encode(Tuple2<String, Payload> tuple2) {
            byte[] bytes = ((String) tuple2.getT1()).getBytes();
            ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(4 + bytes.length);
            buffer.writeBytes(BytesUtils.intToBe(bytes.length));
            buffer.writeBytes(bytes);
            return Payload.of(buffer);
        }
    }

    @Override // org.jetlinks.rule.engine.api.rpc.RpcServiceFactory
    public <T> T createProducer(String str, Class<T> cls) {
        if (!cls.isInterface()) {
            throw new UnsupportedOperationException("only support interface class");
        }
        HashMap hashMap = new HashMap();
        for (Method method : cls.getMethods()) {
            hashMap.put(method, createRpcDefinition(cls, method));
        }
        RpcDefinition of = RpcDefinition.of(str, DirectCodec.INSTANCE, DirectCodec.INSTANCE);
        return (T) Proxy.newProxyInstance(getClass().getClassLoader(), new Class[]{cls}, (obj, method2, objArr) -> {
            RpcDefinition rpcDefinition = (RpcDefinition) hashMap.get(method2);
            Flux flatMap = this.rpcService.invoke((RpcDefinition<RpcDefinition, RES>) of, (RpcDefinition) rpcDefinition.requestCodec().encode(new MethodRpcRequest(rpcDefinition.getAddress(), objArr))).flatMap(payload -> {
                return Mono.justOrEmpty(rpcDefinition.responseCodec().decode(payload));
            });
            return method2.getReturnType().isAssignableFrom(Mono.class) ? Mono.from(flatMap) : flatMap;
        });
    }

    @Override // org.jetlinks.rule.engine.api.rpc.RpcServiceFactory
    public <T> Disposable createConsumer(String str, Class<T> cls, final T t) {
        HashMap hashMap = new HashMap();
        for (final Method method : cls.getDeclaredMethods()) {
            final RpcDefinition<MethodRpcRequest, ?> createRpcDefinition = createRpcDefinition(t.getClass(), method);
            hashMap.put(createRpcDefinition.getAddress(), new Function<Payload, Publisher<Payload>>() { // from class: org.jetlinks.rule.engine.defaults.rpc.DefaultRpcServiceFactory.1
                /* JADX WARN: Multi-variable type inference failed */
                @Override // java.util.function.Function
                public Publisher<Payload> apply(Payload payload) {
                    Object invoke = method.invoke(t, ((MethodRpcRequest) createRpcDefinition.requestCodec().decode(payload)).getArgs());
                    Codec responseCodec = createRpcDefinition.responseCodec();
                    return invoke instanceof Mono ? ((Mono) invoke).map(obj -> {
                        return responseCodec.encode(obj);
                    }) : invoke instanceof Flux ? ((Flux) invoke).map(obj2 -> {
                        return responseCodec.encode(obj2);
                    }) : Mono.just(responseCodec.encode(invoke));
                }
            });
        }
        return this.rpcService.listen(RpcDefinition.of(str, requestCodec, DirectCodec.INSTANCE), (str2, tuple2) -> {
            return (Publisher) ((Function) hashMap.get(tuple2.getT1())).apply(tuple2.getT2());
        });
    }

    private RpcDefinition<MethodRpcRequest, ?> createRpcDefinition(Class<?> cls, Method method) {
        int parameterCount = method.getParameterCount();
        ArrayList arrayList = new ArrayList();
        StringBuilder sb = new StringBuilder(method.getName());
        for (int i = 0; i < parameterCount; i++) {
            ResolvableType forMethodParameter = ResolvableType.forMethodParameter(method, i);
            Class resolve = forMethodParameter.resolve(Object.class);
            arrayList.add(Codecs.lookup(forMethodParameter));
            sb.append(resolve.getSimpleName()).append(",");
        }
        MethodRpcRequestCodec methodRpcRequestCodec = new MethodRpcRequestCodec(sb.toString().getBytes(), arrayList);
        ResolvableType forMethodReturnType = ResolvableType.forMethodReturnType(method, cls);
        if (Publisher.class.isAssignableFrom(forMethodReturnType.toClass())) {
            return RpcDefinition.of(sb.toString(), methodRpcRequestCodec, Codecs.lookup(forMethodReturnType));
        }
        throw new UnsupportedOperationException("unsupported return type:" + forMethodReturnType);
    }

    public DefaultRpcServiceFactory(RpcService rpcService) {
        this.rpcService = rpcService;
    }
}
