/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.proxy.server;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundInvoker;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.BinaryProtoLookupService;
import org.apache.pulsar.common.api.proto.CommandGetSchema;
import org.apache.pulsar.common.api.proto.CommandGetSchemaResponse;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.api.proto.CommandLookupTopic;
import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse;
import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadata;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.netty.NettyChannelUtil;
import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
import org.apache.pulsar.proxy.server.BrokerDiscoveryProvider;
import org.apache.pulsar.proxy.server.LookupProxyHandler;
import org.apache.pulsar.proxy.server.ProxyConnection;
import org.apache.pulsar.proxy.server.ProxyService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultLookupProxyHandler
implements LookupProxyHandler {
    /**
     * Throttling message. The same text appears in the rejection paths of the request handlers,
     * which run when no permit can be taken from {@link #lookupRequestSemaphore}.
     */
    protected final String throttlingErrorMessage = "Too many concurrent lookup and partitionsMetadata requests";
    /** Proxy connection this handler serves; responses are written to its channel context. Assigned in {@code initialize}. */
    protected ProxyConnection proxyConnection;
    /** Provider asked for the next broker when {@link #brokerServiceURL} is blank. Assigned in {@code initialize}. */
    protected BrokerDiscoveryProvider discoveryProvider;
    /** Value of {@code isTlsEnabledWithBroker()} from the proxy configuration; selects TLS vs. plain broker URLs. */
    protected boolean connectWithTLS;
    /** Client address as reported by the proxy connection; used in log messages. */
    protected SocketAddress clientAddress;
    /** Broker service URL taken from the proxy configuration (TLS or plain variant); may be blank. */
    protected String brokerServiceURL;
    /** Semaphore obtained from the proxy service; request handlers take a permit with {@code tryAcquire()}. */
    protected Semaphore lookupRequestSemaphore;
    // NOTE(review): the logger is created for the LookupProxyHandler interface, not for this class,
    // so log output appears under the interface's category.
    private static final Logger log = LoggerFactory.getLogger(LookupProxyHandler.class);

    /**
     * Binds this handler to a proxy service and to a single client connection.
     *
     * <p>Copies the discovery provider and the lookup semaphore from the service, remembers the
     * connection and its client address, and selects the TLS or plain broker service URL from the
     * proxy configuration depending on {@code isTlsEnabledWithBroker()}.
     *
     * @param proxy           proxy service supplying configuration, discovery and the semaphore
     * @param proxyConnection connection whose client requests this handler will serve
     */
    @Override
    public void initialize(ProxyService proxy, ProxyConnection proxyConnection) {
        this.discoveryProvider = proxy.getDiscoveryProvider();
        this.lookupRequestSemaphore = proxy.getLookupRequestSemaphore();
        this.proxyConnection = proxyConnection;
        this.clientAddress = proxyConnection.clientAddress();
        this.connectWithTLS = proxy.getConfiguration().isTlsEnabledWithBroker();
        if (this.connectWithTLS) {
            this.brokerServiceURL = proxy.getConfiguration().getBrokerServiceURLTLS();
        } else {
            this.brokerServiceURL = proxy.getConfiguration().getBrokerServiceURL();
        }
    }

    /**
     * Handles a topic lookup command received from the client.
     *
     * <p>A permit is requested from the lookup semaphore with {@code tryAcquire()}. When none is
     * available the request is rejected with {@code TooManyRequests}. Otherwise a broker service URL
     * is obtained and the lookup is started; when the returned future completes, either the
     * resolved broker URL or an error response is written to the client.
     *
     * <p>The permit is released in the {@code finally} block as soon as the asynchronous lookup has
     * been started, not when it completes.
     *
     * @param lookup lookup command sent by the client
     */
    @Override
    public void handleLookup(CommandLookupTopic lookup) {
        if (log.isDebugEnabled()) {
            log.debug("Received Lookup from {}", this.clientAddress);
        }
        final long clientRequestId = lookup.getRequestId();

        final boolean permitted = this.lookupRequestSemaphore.tryAcquire();
        if (!permitted) {
            REJECTED_LOOKUP_REQUESTS.inc();
            if (log.isDebugEnabled()) {
                log.debug("Lookup Request ID {} from {} rejected - {}.",
                        clientRequestId, this.clientAddress, "Too many concurrent lookup and partitionsMetadata requests");
            }
            this.writeAndFlush(Commands.newLookupErrorResponse(ServerError.TooManyRequests,
                    "Too many concurrent lookup and partitionsMetadata requests", clientRequestId));
            return;
        }

        try {
            LOOKUP_REQUESTS.inc();
            final String serviceUrl = this.getBrokerServiceUrl(clientRequestId);
            if (serviceUrl != null) {
                this.performLookup(clientRequestId, lookup.getTopic(), serviceUrl, false, 10)
                        .whenComplete((brokerUrl, ex) -> {
                            if (ex == null) {
                                // Both URL slots carry the same resolved URL.
                                this.proxyConnection.ctx().writeAndFlush(Commands.newLookupResponse(brokerUrl, brokerUrl,
                                        true, CommandLookupTopicResponse.LookupType.Connect, clientRequestId, true));
                                return;
                            }
                            final ServerError serverError;
                            if (ex instanceof LookupException) {
                                serverError = ((LookupException) ex).getServerError();
                            } else {
                                serverError = this.getServerError(ex);
                            }
                            this.proxyConnection.ctx().writeAndFlush(
                                    Commands.newLookupErrorResponse(serverError, ex.getMessage(), clientRequestId));
                        });
            }
        } finally {
            this.lookupRequestSemaphore.release();
        }
    }

    /**
     * Looks up the broker that serves {@code topic}, following broker redirects.
     *
     * <p>The outcome is reported only through the returned future. This method does not write any
     * response to the client channel itself; the caller (see {@code handleLookup}) turns a failed
     * future into exactly one error response. Writing an error here as well would send the client
     * two responses for the same request id.
     *
     * <p>Failures reported through the future:
     * <ul>
     *   <li>{@link LookupException} with {@code ServiceNotReady} when {@code numberOfRetries} is 0;</li>
     *   <li>{@link LookupException} with {@code MetadataError} when {@code brokerServiceUrl} is
     *       {@code null}, cannot be parsed, or has no usable host/port;</li>
     *   <li>the exception raised while obtaining the broker connection or by the broker lookup.</li>
     * </ul>
     *
     * @param clientRequestId  request id of the client command, used for logging
     * @param topic            topic to look up
     * @param brokerServiceUrl service URL of the broker to ask
     * @param authoritative    authoritative flag forwarded in the lookup command
     * @param numberOfRetries  remaining number of lookups, decremented on every redirect
     * @return future completed with the broker URL (TLS or plain, see
     *         {@link #resolveBrokerUrlFromLookupDataResult}) or completed exceptionally
     */
    protected CompletableFuture<String> performLookup(long clientRequestId, String topic, String brokerServiceUrl, boolean authoritative, int numberOfRetries) {
        if (numberOfRetries == 0) {
            return FutureUtil.failedFuture(
                    new LookupException(ServerError.ServiceNotReady, "Reached max number of redirections"));
        }
        if (brokerServiceUrl == null) {
            // e.g. a redirect that carries no URL for the selected (TLS/plain) transport
            return FutureUtil.failedFuture(
                    new LookupException(ServerError.MetadataError, "Broker service URL is not available"));
        }

        final InetSocketAddress addr;
        try {
            URI brokerURI = new URI(brokerServiceUrl);
            addr = InetSocketAddress.createUnresolved(brokerURI.getHost(), brokerURI.getPort());
        } catch (URISyntaxException e) {
            return FutureUtil.failedFuture(new LookupException(ServerError.MetadataError, e.getMessage()));
        } catch (IllegalArgumentException e) {
            // createUnresolved rejects a null host or an out-of-range port (URI without host/port)
            return FutureUtil.failedFuture(new LookupException(ServerError.MetadataError,
                    "Invalid broker service URL '" + brokerServiceUrl + "': " + e.getMessage()));
        }

        if (log.isDebugEnabled()) {
            log.debug("Getting connections to '{}' for Looking up topic '{}' with clientReq Id '{}'",
                    addr, topic, clientRequestId);
        }

        CompletableFuture<String> brokerUrlFuture = new CompletableFuture<>();
        this.proxyConnection.getConnectionPool().getConnection(addr).thenAccept(clientCnx -> {
            long requestId = this.proxyConnection.newRequestId();
            ByteBuf command = Commands.newLookup(topic, authoritative, requestId);
            clientCnx.newLookup(command, requestId).whenComplete((r, t) -> {
                try {
                    if (t != null) {
                        log.warn("[{}] Failed to lookup topic {}: {}", this.clientAddress, topic, t.getMessage());
                        brokerUrlFuture.completeExceptionally(t);
                        return;
                    }
                    String brokerUrl = this.resolveBrokerUrlFromLookupDataResult(r);
                    if (r.redirect) {
                        // Ask the broker we were redirected to and forward its outcome.
                        this.performLookup(clientRequestId, topic, brokerUrl, r.authoritative, numberOfRetries - 1)
                                .whenComplete((result, ex) -> {
                                    if (ex != null) {
                                        brokerUrlFuture.completeExceptionally(ex);
                                    } else {
                                        brokerUrlFuture.complete(result);
                                    }
                                });
                    } else {
                        if (log.isDebugEnabled()) {
                            log.debug("Successfully perform lookup '{}' for topic '{}' with clientReq Id '{}' and lookup-broker {}",
                                    addr, topic, clientRequestId, brokerUrl);
                        }
                        brokerUrlFuture.complete(brokerUrl);
                    }
                } catch (RuntimeException e) {
                    // Never leave the caller's future pending if result handling itself fails.
                    brokerUrlFuture.completeExceptionally(e);
                } finally {
                    this.proxyConnection.getConnectionPool().releaseConnection(clientCnx);
                }
            });
        }).exceptionally(ex -> {
            // Connection could not be obtained, or sending the lookup failed synchronously.
            brokerUrlFuture.completeExceptionally(ex);
            return null;
        });
        return brokerUrlFuture;
    }

    /**
     * Picks the broker URL to use from a lookup result.
     *
     * @param r lookup result returned by the broker
     * @return {@code r.brokerUrlTls} when this handler connects with TLS, otherwise {@code r.brokerUrl}
     */
    protected String resolveBrokerUrlFromLookupDataResult(BinaryProtoLookupService.LookupDataResult r) {
        if (this.connectWithTLS) {
            return r.brokerUrlTls;
        }
        return r.brokerUrl;
    }

    /**
     * Handles a partitioned-topic metadata command received from the client.
     *
     * <p>Increments the request counter, then requests a permit from the lookup semaphore with
     * {@code tryAcquire()}. Without a permit the request is rejected with {@code ServiceNotReady};
     * with one, the request is forwarded to a broker. The permit is released as soon as the
     * asynchronous request has been started.
     *
     * @param partitionMetadata partition metadata command sent by the client
     */
    @Override
    public void handlePartitionMetadataResponse(CommandPartitionedTopicMetadata partitionMetadata) {
        PARTITIONS_METADATA_REQUESTS.inc();
        if (log.isDebugEnabled()) {
            log.debug("[{}] Received PartitionMetadataLookup", this.clientAddress);
        }
        final long clientRequestId = partitionMetadata.getRequestId();

        final boolean permitted = this.lookupRequestSemaphore.tryAcquire();
        if (!permitted) {
            REJECTED_PARTITIONS_METADATA_REQUESTS.inc();
            if (log.isDebugEnabled()) {
                log.debug("PartitionMetaData Request ID {} from {} rejected - {}.",
                        clientRequestId, this.clientAddress, "Too many concurrent lookup and partitionsMetadata requests");
            }
            this.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.ServiceNotReady,
                    "Too many concurrent lookup and partitionsMetadata requests", clientRequestId));
            return;
        }

        try {
            this.handlePartitionMetadataResponse(partitionMetadata, clientRequestId);
        } finally {
            this.lookupRequestSemaphore.release();
        }
    }

    /**
     * Forwards a partitioned-topic metadata request to a broker and relays the answer to the client.
     *
     * <p>Every failure of the broker request is answered with a partition-metadata response
     * carrying the error, so the response type always matches the request type. Returns without
     * sending the request when no broker service URL or address could be determined; in those
     * cases the helper that failed has already written an error to the client.
     *
     * @param partitionMetadata partition metadata command sent by the client
     * @param clientRequestId   request id to echo in the response
     */
    private void handlePartitionMetadataResponse(CommandPartitionedTopicMetadata partitionMetadata, long clientRequestId) {
        TopicName topicName = TopicName.get(partitionMetadata.getTopic());
        String serviceUrl = this.getBrokerServiceUrl(clientRequestId);
        if (serviceUrl == null) {
            log.warn("No available broker for {} to lookup partition metadata", topicName);
            return;
        }
        InetSocketAddress addr = this.getAddr(serviceUrl, clientRequestId);
        if (addr == null) {
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("Getting connections to '{}' for Looking up topic '{}' with clientReq Id '{}'",
                    addr, topicName.getPartitionedTopicName(), clientRequestId);
        }
        this.proxyConnection.getConnectionPool().getConnection(addr).thenAccept(clientCnx -> {
            long requestId = this.proxyConnection.newRequestId();
            ByteBuf command = Commands.newPartitionMetadataRequest(topicName.toString(), requestId, true);
            clientCnx.newLookup(command, requestId).whenComplete((r, t) -> {
                if (t != null) {
                    log.warn("[{}] failed to get Partitioned metadata : {}", topicName.toString(), t.getMessage(), t);
                    // Reply with the partition-metadata response type (not a lookup response),
                    // the same type the connection-failure branch below uses.
                    this.writeAndFlush(Commands.newPartitionMetadataResponse(
                            this.getServerError(t), t.getMessage(), clientRequestId));
                } else {
                    this.writeAndFlush(Commands.newPartitionMetadataResponse(r.partitions, clientRequestId));
                }
                this.proxyConnection.getConnectionPool().releaseConnection(clientCnx);
            });
        }).exceptionally(ex -> {
            this.writeAndFlush(Commands.newPartitionMetadataResponse(
                    this.getServerError(ex), ex.getMessage(), clientRequestId));
            return null;
        });
    }

    /**
     * Handles a get-topics-of-namespace command received from the client.
     *
     * <p>Increments the request counter, then requests a permit from the lookup semaphore with
     * {@code tryAcquire()}. Without a permit the request is rejected with a {@code ServiceNotReady}
     * error; with one, the request is forwarded to a broker. The permit is released as soon as the
     * asynchronous request has been started.
     *
     * @param commandGetTopicsOfNamespace command sent by the client
     */
    @Override
    public void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGetTopicsOfNamespace) {
        GET_TOPICS_OF_NAMESPACE_REQUESTS.inc();
        if (log.isDebugEnabled()) {
            log.debug("[{}] Received GetTopicsOfNamespace", this.clientAddress);
        }
        final long requestId = commandGetTopicsOfNamespace.getRequestId();

        final boolean permitted = this.lookupRequestSemaphore.tryAcquire();
        if (!permitted) {
            REJECTED_GET_TOPICS_OF_NAMESPACE_REQUESTS.inc();
            if (log.isDebugEnabled()) {
                log.debug("GetTopicsOfNamespace Request ID {} from {} rejected - {}.",
                        requestId, this.clientAddress, "Too many concurrent lookup and partitionsMetadata requests");
            }
            this.writeAndFlush(Commands.newError(requestId, ServerError.ServiceNotReady,
                    "Too many concurrent lookup and partitionsMetadata requests"));
            return;
        }

        try {
            this.handleGetTopicsOfNamespace(commandGetTopicsOfNamespace, requestId);
        } finally {
            this.lookupRequestSemaphore.release();
        }
    }

    /**
     * Extracts the request parameters from the command and starts the broker request.
     *
     * <p>Does nothing when no non-blank broker service URL is available. The optional topics
     * pattern and topics hash are passed as {@code null} when the command does not carry them.
     *
     * @param commandGetTopicsOfNamespace command sent by the client
     * @param clientRequestId             request id to echo in the response
     */
    private void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGetTopicsOfNamespace, long clientRequestId) {
        final String serviceUrl = this.getBrokerServiceUrl(clientRequestId);
        if (StringUtils.isNotBlank(serviceUrl)) {
            String topicsPattern = null;
            if (commandGetTopicsOfNamespace.hasTopicsPattern()) {
                topicsPattern = commandGetTopicsOfNamespace.getTopicsPattern();
            }
            String topicsHash = null;
            if (commandGetTopicsOfNamespace.hasTopicsHash()) {
                topicsHash = commandGetTopicsOfNamespace.getTopicsHash();
            }
            this.performGetTopicsOfNamespace(clientRequestId, commandGetTopicsOfNamespace.getNamespace(),
                    serviceUrl, 10, topicsPattern, topicsHash, commandGetTopicsOfNamespace.getMode());
        }
    }

    /**
     * Sends a get-topics-of-namespace request to a broker and relays the answer to the client.
     *
     * <p>The broker connection is released only after the broker has answered (or the request has
     * failed), matching the other request handlers in this class. Releasing it right after the
     * request is sent would hand the connection back to the pool while the response is still
     * outstanding.
     *
     * @param clientRequestId  request id to echo in the response
     * @param namespaceName    namespace whose topics are requested
     * @param brokerServiceUrl service URL of the broker to ask
     * @param numberOfRetries  remaining attempts; 0 is answered with {@code ServiceNotReady}
     * @param topicsPattern    optional topics pattern, may be {@code null}
     * @param topicsHash       optional topics hash, may be {@code null}
     * @param mode             mode forwarded from the client command
     */
    private void performGetTopicsOfNamespace(long clientRequestId, String namespaceName, String brokerServiceUrl, int numberOfRetries, String topicsPattern, String topicsHash, CommandGetTopicsOfNamespace.Mode mode) {
        if (numberOfRetries == 0) {
            this.writeAndFlush(Commands.newError(clientRequestId, ServerError.ServiceNotReady,
                    "Reached max number of redirections"));
            return;
        }
        InetSocketAddress addr = this.getAddr(brokerServiceUrl, clientRequestId);
        if (addr == null) {
            // getAddr has already written the error response
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("Getting connections to '{}' for getting TopicsOfNamespace '{}' with clientReq Id '{}'",
                    addr, namespaceName, clientRequestId);
        }
        this.proxyConnection.getConnectionPool().getConnection(addr).thenAccept(clientCnx -> {
            long requestId = this.proxyConnection.newRequestId();
            ByteBuf command = Commands.newGetTopicsOfNamespaceRequest(namespaceName, requestId, mode,
                    topicsPattern, topicsHash);
            clientCnx.newGetTopicsOfNamespace(command, requestId).whenComplete((r, t) -> {
                if (t != null) {
                    log.warn("[{}] Failed to get TopicsOfNamespace {}: {}", this.clientAddress, namespaceName, t.getMessage());
                    this.writeAndFlush(Commands.newError(clientRequestId, this.getServerError(t), t.getMessage()));
                } else {
                    this.writeAndFlush(Commands.newGetTopicsOfNamespaceResponse(r.getTopics(), r.getTopicsHash(),
                            r.isFiltered(), r.isChanged(), clientRequestId));
                }
                // Release only once the broker has answered, like the other handlers do.
                this.proxyConnection.getConnectionPool().releaseConnection(clientCnx);
            });
        }).exceptionally(ex -> {
            this.writeAndFlush(Commands.newError(clientRequestId, this.getServerError(ex), ex.getMessage()));
            return null;
        });
    }

    /**
     * Handles a get-schema command by forwarding it to a broker and relaying the answer.
     *
     * <p>Topic and optional schema version are read from the command before any asynchronous work
     * starts. Returns without sending the request when no non-blank broker service URL or no
     * broker address could be determined. Unlike the lookup handlers, this method does not take a
     * permit from the lookup semaphore.
     *
     * @param commandGetSchema get-schema command sent by the client
     */
    @Override
    public void handleGetSchema(CommandGetSchema commandGetSchema) {
        GET_SCHEMA_REQUESTS.inc();
        if (log.isDebugEnabled()) {
            log.debug("[{}] Received GetSchema {}", (Object)this.clientAddress, (Object)commandGetSchema);
        }
        long clientRequestId = commandGetSchema.getRequestId();
        String serviceUrl = this.getBrokerServiceUrl(clientRequestId);
        String topic = commandGetSchema.getTopic();
        Optional<Object> schemaVersion = commandGetSchema.hasSchemaVersion() ? Optional.of(commandGetSchema.getSchemaVersion()).map(BytesSchemaVersion::of) : Optional.empty();
        if (!StringUtils.isNotBlank((CharSequence)serviceUrl)) {
            return;
        }
        InetSocketAddress addr = this.getAddr(serviceUrl, clientRequestId);
        if (addr == null) {
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("Getting connections to '{}' for getting schema of topic '{}' with clientReq Id '{}'",
                    addr, topic, clientRequestId);
        }
        this.proxyConnection.getConnectionPool().getConnection(addr).thenAccept(clientCnx -> {
            long requestId = this.proxyConnection.newRequestId();
            ByteBuf command = Commands.newGetSchema(requestId, topic, (Optional)schemaVersion);
            clientCnx.sendGetRawSchema(command, requestId).whenComplete((r, t) -> {
                if (t != null) {
                    // Fill the third placeholder with the message and pass the throwable as the
                    // trailing argument, so the output does not depend on how the logging
                    // binding treats a trailing Throwable.
                    log.warn("[{}] Failed to get schema {}: {}", this.clientAddress, topic, t.getMessage(), t);
                    this.writeAndFlush(Commands.newError(clientRequestId, this.getServerError(t), t.getMessage()));
                } else {
                    this.writeAndFlush(Commands.newGetSchemaResponse(clientRequestId, r));
                }
                this.proxyConnection.getConnectionPool().releaseConnection(clientCnx);
            });
        }).exceptionally(ex -> {
            this.writeAndFlush(Commands.newError(clientRequestId, this.getServerError(ex), ex.getMessage()));
            return null;
        });
    }

    /**
     * Determines the service URL of the broker to contact.
     *
     * <p>The configured broker service URL wins when it is not blank. Otherwise the discovery
     * provider is asked for the next broker and its TLS or plain service URL is returned,
     * depending on {@code connectWithTLS}. When discovery throws, a {@code ServiceNotReady} error
     * is written to the client and {@code null} is returned.
     *
     * @param clientRequestId request id used for the error response
     * @return broker service URL, or {@code null} when discovery failed
     */
    protected String getBrokerServiceUrl(long clientRequestId) {
        final boolean hasConfiguredUrl = StringUtils.isNotBlank(this.brokerServiceURL);
        if (hasConfiguredUrl) {
            return this.brokerServiceURL;
        }

        final LoadManagerReport broker;
        try {
            broker = this.discoveryProvider.nextBroker();
        } catch (Exception e) {
            log.warn("[{}] Failed to get next active broker {}", this.clientAddress, e.getMessage(), e);
            this.writeAndFlush(Commands.newError(clientRequestId, ServerError.ServiceNotReady, e.getMessage()));
            return null;
        }

        if (this.connectWithTLS) {
            return broker.getPulsarServiceUrlTls();
        }
        return broker.getPulsarServiceUrl();
    }

    /**
     * Converts a broker service URL into an unresolved socket address.
     *
     * <p>Any URL that cannot be turned into an address is answered with a {@code MetadataError}
     * written to the client, and {@code null} is returned. This covers a {@code null} URL, a
     * syntactically invalid URL, and a URL that parses but has no host or no valid port
     * (for example {@code host:6650} without a scheme).
     *
     * @param brokerServiceUrl broker service URL to convert
     * @param clientRequestId  request id used for the error response
     * @return unresolved address of the broker, or {@code null} after an error was written
     */
    private InetSocketAddress getAddr(String brokerServiceUrl, long clientRequestId) {
        if (brokerServiceUrl == null) {
            this.writeAndFlush(Commands.newError(clientRequestId, ServerError.MetadataError,
                    "Broker service URL is not available"));
            return null;
        }
        try {
            URI brokerURI = new URI(brokerServiceUrl);
            return InetSocketAddress.createUnresolved(brokerURI.getHost(), brokerURI.getPort());
        } catch (URISyntaxException e) {
            this.writeAndFlush(Commands.newError(clientRequestId, ServerError.MetadataError, e.getMessage()));
            return null;
        } catch (IllegalArgumentException e) {
            // createUnresolved rejects a null host or an out-of-range port
            this.writeAndFlush(Commands.newError(clientRequestId, ServerError.MetadataError,
                    "Invalid broker service URL '" + brokerServiceUrl + "': " + e.getMessage()));
            return null;
        }
    }

    /**
     * Maps a failure to the {@link ServerError} code reported to the client.
     *
     * <p>{@link CompletionException} and {@link ExecutionException} wrappers are unwrapped first.
     * Callbacks registered with {@code exceptionally} on a dependent stage receive the original
     * failure wrapped in a {@code CompletionException}; without unwrapping, authentication and
     * authorization failures would be reported as {@code ServiceNotReady}.
     *
     * @param error failure to classify, possibly wrapped; may be {@code null}
     * @return {@code AuthorizationError} or {@code AuthenticationError} for the matching client
     *         exceptions, otherwise {@code ServiceNotReady}
     */
    protected ServerError getServerError(Throwable error) {
        Throwable cause = error;
        while ((cause instanceof CompletionException || cause instanceof ExecutionException)
                && cause.getCause() != null) {
            cause = cause.getCause();
        }
        if (cause instanceof PulsarClientException.AuthorizationException) {
            return ServerError.AuthorizationError;
        }
        if (cause instanceof PulsarClientException.AuthenticationException) {
            return ServerError.AuthenticationError;
        }
        return ServerError.ServiceNotReady;
    }

    /**
     * Writes a command to the client through the proxy connection's channel context, using
     * {@code NettyChannelUtil.writeAndFlushWithVoidPromise}.
     *
     * @param cmd serialized command to send
     */
    private void writeAndFlush(ByteBuf cmd) {
        NettyChannelUtil.writeAndFlushWithVoidPromise(this.proxyConnection.ctx(), cmd);
    }

    /**
     * Unchecked exception that carries the {@link ServerError} code to report to the client
     * together with the failure message.
     */
    protected static class LookupException extends RuntimeException {
        /** Error code to put into the response sent to the client. */
        private final ServerError serverError;

        /**
         * @param serverError error code to report
         * @param message     failure description, available through {@code getMessage()}
         */
        public LookupException(ServerError serverError, String message) {
            super(message);
            this.serverError = serverError;
        }

        /** @return the error code given at construction time */
        public ServerError getServerError() {
            return serverError;
        }
    }
}

