package com.linkedin.venice.router;

import com.linkedin.alpini.base.concurrency.AsyncFuture;
import com.linkedin.alpini.base.concurrency.TimeoutProcessor;
import com.linkedin.alpini.base.concurrency.impl.SuccessAsyncFuture;
import com.linkedin.alpini.base.registry.ResourceRegistry;
import com.linkedin.alpini.base.registry.ShutdownableExecutorService;
import com.linkedin.alpini.base.registry.ShutdownableExecutors;
import com.linkedin.alpini.netty4.ssl.SslInitializer;
import com.linkedin.alpini.router.api.LongTailRetrySupplier;
import com.linkedin.alpini.router.api.ScatterGatherHelper;
import com.linkedin.alpini.router.impl.Router;
import com.linkedin.venice.acl.DynamicAccessController;
import com.linkedin.venice.acl.handler.StoreAclHandler;
import com.linkedin.venice.authentication.AuthenticationService;
import com.linkedin.venice.authentication.AuthenticationServiceUtils;
import com.linkedin.venice.authorization.AuthorizerService;
import com.linkedin.venice.authorization.AuthorizerServiceUtils;
import com.linkedin.venice.compression.CompressorFactory;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.helix.HelixAdapterSerializer;
import com.linkedin.venice.helix.HelixBaseRoutingRepository;
import com.linkedin.venice.helix.HelixCustomizedViewOfflinePushRepository;
import com.linkedin.venice.helix.HelixHybridStoreQuotaRepository;
import com.linkedin.venice.helix.HelixInstanceConfigRepository;
import com.linkedin.venice.helix.HelixLiveInstanceMonitor;
import com.linkedin.venice.helix.HelixReadOnlySchemaRepository;
import com.linkedin.venice.helix.HelixReadOnlySchemaRepositoryAdapter;
import com.linkedin.venice.helix.HelixReadOnlyStoreConfigRepository;
import com.linkedin.venice.helix.HelixReadOnlyStoreRepository;
import com.linkedin.venice.helix.HelixReadOnlyStoreRepositoryAdapter;
import com.linkedin.venice.helix.HelixReadOnlyZKSharedSchemaRepository;
import com.linkedin.venice.helix.HelixReadOnlyZKSharedSystemStoreRepository;
import com.linkedin.venice.helix.SafeHelixManager;
import com.linkedin.venice.helix.ZkRoutersClusterManager;
import com.linkedin.venice.meta.ReadOnlySchemaRepository;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
import com.linkedin.venice.read.RequestType;
import com.linkedin.venice.router.api.DictionaryRetrievalService;
import com.linkedin.venice.router.api.MetaStoreShadowReader;
import com.linkedin.venice.router.api.RouterExceptionAndTrackingUtils;
import com.linkedin.venice.router.api.RouterHeartbeat;
import com.linkedin.venice.router.api.RouterKey;
import com.linkedin.venice.router.api.VeniceDelegateMode;
import com.linkedin.venice.router.api.VeniceDispatcher;
import com.linkedin.venice.router.api.VeniceHostFinder;
import com.linkedin.venice.router.api.VeniceHostHealth;
import com.linkedin.venice.router.api.VeniceMetricsProvider;
import com.linkedin.venice.router.api.VeniceMultiKeyRoutingStrategy;
import com.linkedin.venice.router.api.VenicePartitionFinder;
import com.linkedin.venice.router.api.VenicePathParser;
import com.linkedin.venice.router.api.VeniceResponseAggregator;
import com.linkedin.venice.router.api.VeniceRoleFinder;
import com.linkedin.venice.router.api.VeniceVersionFinder;
import com.linkedin.venice.router.api.path.VenicePath;
import com.linkedin.venice.router.api.routing.helix.HelixGroupSelector;
import com.linkedin.venice.router.httpclient.ApacheHttpAsyncStorageNodeClient;
import com.linkedin.venice.router.httpclient.HttpClient5StorageNodeClient;
import com.linkedin.venice.router.httpclient.StorageNodeClient;
import com.linkedin.venice.router.httpclient.StorageNodeClientType;
import com.linkedin.venice.router.stats.AdminOperationsStats;
import com.linkedin.venice.router.stats.AggHostHealthStats;
import com.linkedin.venice.router.stats.AggRouterHttpRequestStats;
import com.linkedin.venice.router.stats.HealthCheckStats;
import com.linkedin.venice.router.stats.LongTailRetryStatsProvider;
import com.linkedin.venice.router.stats.RouteHttpRequestStats;
import com.linkedin.venice.router.stats.RouterStats;
import com.linkedin.venice.router.stats.RouterThrottleStats;
import com.linkedin.venice.router.stats.SecurityStats;
import com.linkedin.venice.router.stats.StaleVersionStats;
import com.linkedin.venice.router.streaming.VeniceChunkedWriteHandler;
import com.linkedin.venice.router.throttle.NoopRouterThrottler;
import com.linkedin.venice.router.throttle.ReadRequestThrottler;
import com.linkedin.venice.router.throttle.RouterThrottler;
import com.linkedin.venice.router.utils.VeniceRouterUtils;
import com.linkedin.venice.security.SSLFactory;
import com.linkedin.venice.service.AbstractVeniceService;
import com.linkedin.venice.servicediscovery.ServiceDiscoveryAnnouncer;
import com.linkedin.venice.stats.TehutiUtils;
import com.linkedin.venice.stats.VeniceJVMStats;
import com.linkedin.venice.stats.ZkClientStatusStats;
import com.linkedin.venice.throttle.EventThrottler;
import com.linkedin.venice.utils.HelixUtils;
import com.linkedin.venice.utils.SslUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.MultithreadEventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.tehuti.metrics.MetricsRepository;
import java.io.Closeable;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import javax.annotation.Nonnull;
import org.apache.helix.InstanceType;
import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/venice/router/RouterServer.class */
public class RouterServer extends AbstractVeniceService {
    private static final Logger LOGGER = LogManager.getLogger(RouterServer.class);

    // ---- Collaborators handed in through the constructors (assigned once in the private constructor) ----
    private final List<ServiceDiscoveryAnnouncer> serviceDiscoveryAnnouncers;
    private final MetricsRepository metricsRepository;
    private final Optional<SSLFactory> sslFactory;
    private final Optional<DynamicAccessController> accessController;
    private final Optional<AuthenticationService> authenticationService;
    private final Optional<AuthorizerService> authorizerService;
    // Parsed from the VeniceProperties passed to the constructor.
    private final VeniceRouterConfig config;
    // Created in the private constructor from config.getZkConnection().
    private final ZkClient zkClient;
    // Only created when the private constructor is asked to (the public production constructors pass true,
    // the repository-injecting constructor passes false), so this may be null.
    private SafeHelixManager manager;

    // ---- Metadata / routing repositories: built by the production constructor or injected by the
    // ---- repository-taking constructor ----
    private ReadOnlySchemaRepository schemaRepository;
    private Optional<MetaStoreShadowReader> metaStoreShadowReader;
    private HelixCustomizedViewOfflinePushRepository routingDataRepository;
    private Optional<HelixHybridStoreQuotaRepository> hybridStoreQuotaRepository;
    private ReadOnlyStoreRepository metadataRepository;
    private RouterStats<AggRouterHttpRequestStats> routerStats;
    private HelixReadOnlyStoreConfigRepository storeConfigRepository;
    private HelixLiveInstanceMonitor liveInstanceMonitor;
    private HelixInstanceConfigRepository instanceConfigRepository;
    private HelixGroupSelector helixGroupSelector;

    // ---- Runtime components; the ones assigned in the visible part of the file are set in startInner() ----
    private VeniceResponseAggregator responseAggregator;
    private TimeoutProcessor timeoutProcessor;
    private AsyncFuture<SocketAddress> serverFuture;
    private AsyncFuture<SocketAddress> secureServerFuture;
    private ResourceRegistry registry;
    private StorageNodeClient storageNodeClient;
    private VeniceDispatcher dispatcher;
    // Stays null unless config.isRouterHeartBeatEnabled() is true when startInner() runs.
    private RouterHeartbeat heartbeat;
    private VeniceDelegateMode scatterGatherMode;
    private final HelixAdapterSerializer adapter;
    private ZkRoutersClusterManager routersClusterManager;
    // Plain-HTTP router; left empty by startInner() when config.isEnforcingSecureOnly() is true.
    private Optional<Router> router;
    private Router secureRouter;
    private DictionaryRetrievalService dictionaryRetrievalService;
    private RouterThrottler readRequestThrottler;
    private RouterThrottler noopRequestThrottler;
    private MultithreadEventLoopGroup workerEventLoopGroup;
    private MultithreadEventLoopGroup serverEventLoopGroup;
    // Only created when an SSLFactory is present and client SSL handshake throttling is enabled.
    private MultithreadEventLoopGroup sslResolverEventLoopGroup;
    private ExecutorService workerExecutor;
    private EventThrottler routerEarlyThrottler;
    // Insertion-ordered (LinkedHashMap) map of extra channel handlers, keyed by name.
    private final Map<String, ChannelHandler> optionalChannelHandlers;

    // Name used when obtaining the default MetricsRepository from TehutiUtils.
    private static final String ROUTER_SERVICE_NAME = "venice-router";
    // Thread count of the server ("boss") event loop group.
    // NOTE(review): this decompiled source also uses the constant wherever a literal 1 appeared
    // (argument-count check, switch-map value, case label); those uses look unrelated to boss threads.
    private static final int ROUTER_BOSS_THREAD_NUM = 1;
    private VeniceJVMStats jvmStats;
    private final AggHostHealthStats aggHostHealthStats;

    /*
     * Decompiler rendering of the synthetic class RouterServer$3 (renamed because "3" is not a valid
     * source-level class name; loaded from com/linkedin/venice/router/RouterServer$3.class).
     *
     * It holds the lookup table behind the switch over StorageNodeClientType in startInner(): the array
     * is indexed by enum ordinal and yields the case label to jump to. APACHE_HTTP_ASYNC_CLIENT maps
     * to 1 and HTTP_CLIENT_5_CLIENT maps to 2; any other constant keeps the int default 0 and therefore
     * reaches the switch's default branch.
     */
    static /* synthetic */ class AnonymousClass3 {
        static final /* synthetic */ int[] $SwitchMap$com$linkedin$venice$router$httpclient$StorageNodeClientType = new int[StorageNodeClientType.values().length];

        static {
            try {
                // The value is 1; it is merely spelled with ROUTER_BOSS_THREAD_NUM, which equals 1.
                $SwitchMap$com$linkedin$venice$router$httpclient$StorageNodeClientType[StorageNodeClientType.APACHE_HTTP_ASYNC_CLIENT.ordinal()] = RouterServer.ROUTER_BOSS_THREAD_NUM;
            } catch (NoSuchFieldError e) {
                // Deliberately ignored: presumably tolerates an enum constant that is absent at runtime.
            }
            try {
                $SwitchMap$com$linkedin$venice$router$httpclient$StorageNodeClientType[StorageNodeClientType.HTTP_CLIENT_5_CLIENT.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
                // Deliberately ignored, same reason as above.
            }
        }
    }

    /**
     * Command-line entry point: expects exactly one argument, the path of the router config file, and
     * runs the router in blocking mode.
     *
     * @param strArr command-line arguments; must contain exactly one element
     * @throws Exception a {@link VeniceException} wrapping whatever made the router fail to start
     */
    public static void main(String[] strArr) throws Exception {
        // Exactly one argument is expected; this count has nothing to do with ROUTER_BOSS_THREAD_NUM.
        if (strArr.length != 1) {
            Utils.exit("USAGE: java -jar venice-router-all.jar <router_config_file_path>");
        }
        try {
            run(strArr[0], true);
        } catch (Exception e) {
            // Any startup failure lands here (bad config, ZooKeeper, SSL, ...), so describe it as a
            // startup failure rather than claiming the config parameter was missing.
            throw new VeniceException(
                "Venice router failed to start; arguments: [" + String.join(", ", strArr) + "]", e);
        }
    }

    /**
     * Builds a {@link RouterServer} from the properties file at {@code str}, starts it, and registers a
     * JVM shutdown hook that stops the server if it is still running.
     *
     * <p>The SSL factory is chosen from the properties: none when {@code router.enable.ssl} is false,
     * the Venice local factory when {@code router.local.ssl} is true (the default), otherwise the class
     * named by {@code ssl.factory.class.name}.
     *
     * @param str path of the router config file
     * @param z when true, this method never returns normally: it sleeps in a loop to keep the calling
     *     thread alive; when false, it returns once the server is started and the hook is registered
     * @throws Exception if the properties cannot be read, the server fails to start, or the sleeping
     *     thread is interrupted
     */
    public static void run(String str, boolean z) throws Exception {
        VeniceProperties parseProperties = Utils.parseProperties(str);
        LOGGER.info("Zookeeper: {}", parseProperties.getString("zookeeper.address"));
        LOGGER.info("Cluster: {}", parseProperties.getString("cluster.name"));
        LOGGER.info("Port: {}", parseProperties.getInt("listener.port"));
        LOGGER.info("SSL Port: {}", parseProperties.getInt("listener.ssl.port"));
        LOGGER.info("IO worker count: {}", parseProperties.getInt("router.io.worker.count"));

        // NOTE(review): raw Optional kept on purpose - the declared return types of these two builder
        // utilities are not visible here; parameterize once confirmed.
        Optional buildAuthenticationService = AuthenticationServiceUtils.buildAuthenticationService(parseProperties);
        Optional buildAuthorizerService = AuthorizerServiceUtils.buildAuthorizerService(parseProperties);

        Optional<SSLFactory> sslFactory;
        if (!parseProperties.getBoolean("router.enable.ssl", true)) {
            sslFactory = Optional.empty();
        } else if (parseProperties.getBoolean("router.local.ssl", true)) {
            sslFactory = Optional.of(SslUtils.getVeniceLocalSslFactory());
        } else {
            sslFactory = Optional.of(SslUtils.getSSLFactory(parseProperties.toProperties(), parseProperties.getString("ssl.factory.class.name", "com.linkedin.venice.security.DefaultSSLFactory")));
        }

        List<ServiceDiscoveryAnnouncer> announcers = new ArrayList<>();
        final RouterServer routerServer = new RouterServer(parseProperties, announcers, Optional.empty(), buildAuthenticationService, buildAuthorizerService, sslFactory);
        routerServer.start();

        // The hook must capture the local server instance: this method is static, so there is no
        // enclosing RouterServer instance ("RouterServer.this") to refer to.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            if (routerServer.isRunning()) {
                try {
                    routerServer.stop();
                } catch (Exception e) {
                    LOGGER.error("Error shutting the server. ", e);
                }
            }
        }));

        if (!z) {
            return;
        }
        // Park the calling thread indefinitely; the process ends through the shutdown hook.
        while (true) {
            Thread.sleep(TimeUnit.HOURS.toMillis(1L));
        }
    }

    /**
     * Creates a router that reports to the metrics repository obtained from
     * {@code TehutiUtils.getMetricsRepository(ROUTER_SERVICE_NAME)}; everything else is delegated to the
     * constructor that takes an explicit {@link MetricsRepository}.
     *
     * @param veniceProperties properties used to build the {@link VeniceRouterConfig}
     * @param list service discovery announcers
     * @param optional optional dynamic access controller
     * @param optional2 optional authentication service
     * @param optional3 optional authorizer service
     * @param optional4 optional SSL factory
     */
    public RouterServer(VeniceProperties veniceProperties, List<ServiceDiscoveryAnnouncer> list, Optional<DynamicAccessController> optional, Optional<AuthenticationService> optional2, Optional<AuthorizerService> optional3, Optional<SSLFactory> optional4) {
        this(veniceProperties, list, optional, optional2, optional3, optional4, TehutiUtils.getMetricsRepository(ROUTER_SERVICE_NAME));
    }

    /**
     * @return the metrics repository this router was constructed with
     */
    public MetricsRepository getMetricsRepository() {
        return this.metricsRepository;
    }

    /**
     * Production constructor: after the shared initialisation (with a Helix manager created), builds
     * all ZooKeeper/Helix backed repositories and the per-request-type router stats.
     *
     * @param veniceProperties properties used to build the {@link VeniceRouterConfig}
     * @param list service discovery announcers
     * @param optional optional dynamic access controller
     * @param optional2 optional authentication service
     * @param optional3 optional authorizer service
     * @param optional4 optional SSL factory
     * @param metricsRepository repository that receives the router's metrics
     */
    public RouterServer(VeniceProperties veniceProperties, List<ServiceDiscoveryAnnouncer> list, Optional<DynamicAccessController> optional, Optional<AuthenticationService> optional2, Optional<AuthorizerService> optional3, Optional<SSLFactory> optional4, MetricsRepository metricsRepository) {
        this(veniceProperties, list, optional, optional2, optional3, optional4, metricsRepository, true);

        // Final fields set by the delegated constructor; aliased to keep the wiring below readable.
        final VeniceRouterConfig routerConfig = this.config;
        final ZkClient zk = this.zkClient;
        final HelixAdapterSerializer serializer = this.adapter;

        // Store metadata: shared system stores plus this cluster's stores behind one adapter.
        HelixReadOnlyZKSharedSystemStoreRepository systemStoreRepository =
            new HelixReadOnlyZKSharedSystemStoreRepository(zk, serializer, routerConfig.getSystemSchemaClusterName());
        HelixReadOnlyStoreRepository clusterStoreRepository =
            new HelixReadOnlyStoreRepository(zk, serializer, routerConfig.getClusterName(), routerConfig.getRefreshAttemptsForZkReconnect(), routerConfig.getRefreshIntervalForZkReconnectInMs());
        this.metadataRepository =
            new HelixReadOnlyStoreRepositoryAdapter(systemStoreRepository, clusterStoreRepository, routerConfig.getClusterName());

        this.routerStats = new RouterStats<>(
            requestType -> new AggRouterHttpRequestStats(metricsRepository, requestType, routerConfig.isKeyValueProfilingEnabled(), this.metadataRepository, routerConfig.isUnregisterMetricForDeletedStoreEnabled()));

        // Schemas: same split between system-store schemas and cluster-store schemas.
        HelixReadOnlyZKSharedSchemaRepository systemSchemaRepository =
            new HelixReadOnlyZKSharedSchemaRepository(systemStoreRepository, zk, serializer, routerConfig.getSystemSchemaClusterName(), routerConfig.getRefreshAttemptsForZkReconnect(), routerConfig.getRefreshIntervalForZkReconnectInMs());
        HelixReadOnlySchemaRepository clusterSchemaRepository =
            new HelixReadOnlySchemaRepository(clusterStoreRepository, zk, serializer, routerConfig.getClusterName(), routerConfig.getRefreshAttemptsForZkReconnect(), routerConfig.getRefreshIntervalForZkReconnectInMs());
        this.schemaRepository = new HelixReadOnlySchemaRepositoryAdapter(systemSchemaRepository, clusterSchemaRepository);

        if (routerConfig.isMetaStoreShadowReadEnabled()) {
            this.metaStoreShadowReader = Optional.of(new MetaStoreShadowReader(this.schemaRepository));
        } else {
            this.metaStoreShadowReader = Optional.empty();
        }

        this.routingDataRepository = new HelixCustomizedViewOfflinePushRepository(this.manager, this.metadataRepository);

        if (routerConfig.isHelixHybridStoreQuotaEnabled()) {
            this.hybridStoreQuotaRepository = Optional.of(new HelixHybridStoreQuotaRepository(this.manager));
        } else {
            this.hybridStoreQuotaRepository = Optional.empty();
        }

        this.storeConfigRepository =
            new HelixReadOnlyStoreConfigRepository(zk, serializer, routerConfig.getRefreshAttemptsForZkReconnect(), routerConfig.getRefreshIntervalForZkReconnectInMs());
        this.liveInstanceMonitor = new HelixLiveInstanceMonitor(zk, routerConfig.getClusterName());
    }

    /**
     * Shared initialisation used by every public constructor: stores the collaborators, parses the
     * config, opens the ZooKeeper client with its status listener and finally runs {@code verifySslOk()}.
     *
     * @param veniceProperties properties used to build the {@link VeniceRouterConfig}
     * @param list service discovery announcers
     * @param optional optional dynamic access controller
     * @param optional2 optional authentication service
     * @param optional3 optional authorizer service
     * @param optional4 optional SSL factory
     * @param metricsRepository repository that receives the router's metrics
     * @param z whether to create the spectator Helix manager; when false {@code manager} stays null
     */
    private RouterServer(VeniceProperties veniceProperties, List<ServiceDiscoveryAnnouncer> list, Optional<DynamicAccessController> optional, Optional<AuthenticationService> optional2, Optional<AuthorizerService> optional3, Optional<SSLFactory> optional4, MetricsRepository metricsRepository, boolean z) {
        // Plain state with no side effects.
        this.serverFuture = null;
        this.secureServerFuture = null;
        this.registry = null;
        this.heartbeat = null;
        this.router = Optional.empty();
        this.metaStoreShadowReader = Optional.empty();
        this.optionalChannelHandlers = new LinkedHashMap<>();
        this.serviceDiscoveryAnnouncers = list;
        this.accessController = optional;
        this.authenticationService = optional2;
        this.authorizerService = optional3;
        this.sslFactory = optional4;
        this.metricsRepository = metricsRepository;

        // Side-effecting construction, kept in its original relative order.
        this.config = new VeniceRouterConfig(veniceProperties);
        this.zkClient = new ZkClient(config.getZkConnection(), 30000, 60000);
        zkClient.subscribeStateChanges(new ZkClientStatusStats(metricsRepository, "router-zk-client"));
        this.adapter = new HelixAdapterSerializer();
        if (z) {
            ZKHelixManager spectator = new ZKHelixManager(config.getClusterName(), (String) null, InstanceType.SPECTATOR, config.getZkConnection());
            this.manager = new SafeHelixManager(spectator);
        }
        this.aggHostHealthStats = new AggHostHealthStats(metricsRepository);

        // Runs last, once every field above has been assigned.
        verifySslOk();
    }

    /**
     * Constructor for callers that supply their own repositories: no Helix manager is created, a fresh
     * {@link MetricsRepository} is used, and no access controller, authentication service or authorizer
     * service is configured.
     *
     * @param veniceProperties properties used to build the {@link VeniceRouterConfig}
     * @param helixCustomizedViewOfflinePushRepository routing data repository to use
     * @param optional optional hybrid store quota repository
     * @param helixReadOnlyStoreRepository store metadata repository to use
     * @param helixReadOnlySchemaRepository schema repository to use
     * @param helixReadOnlyStoreConfigRepository store config repository to use
     * @param list service discovery announcers
     * @param optional2 optional SSL factory
     * @param helixLiveInstanceMonitor live instance monitor to use
     */
    public RouterServer(VeniceProperties veniceProperties, HelixCustomizedViewOfflinePushRepository helixCustomizedViewOfflinePushRepository, Optional<HelixHybridStoreQuotaRepository> optional, HelixReadOnlyStoreRepository helixReadOnlyStoreRepository, HelixReadOnlySchemaRepository helixReadOnlySchemaRepository, HelixReadOnlyStoreConfigRepository helixReadOnlyStoreConfigRepository, List<ServiceDiscoveryAnnouncer> list, Optional<SSLFactory> optional2, HelixLiveInstanceMonitor helixLiveInstanceMonitor) {
        this(veniceProperties, list, Optional.empty(), Optional.empty(), Optional.empty(), optional2, new MetricsRepository(), false);

        // Adopt the injected repositories as they are.
        this.metadataRepository = helixReadOnlyStoreRepository;
        this.schemaRepository = helixReadOnlySchemaRepository;
        this.storeConfigRepository = helixReadOnlyStoreConfigRepository;
        this.routingDataRepository = helixCustomizedViewOfflinePushRepository;
        this.hybridStoreQuotaRepository = optional;
        this.liveInstanceMonitor = helixLiveInstanceMonitor;

        // One aggregated stats object per request type, bound to the injected store repository.
        this.routerStats = new RouterStats<>(
            type -> new AggRouterHttpRequestStats(this.metricsRepository, type, this.config.isKeyValueProfilingEnabled(), helixReadOnlyStoreRepository, this.config.isUnregisterMetricForDeletedStoreEnabled()));
    }

    public boolean startInner() throws Exception {
        Class cls;
        SslInitializer sslInitializer;
        ResourceRegistry.setGlobalShutdownDelayMillis(TimeUnit.SECONDS.toMillis(this.config.getRouterNettyGracefulShutdownPeriodSeconds() * 2));
        this.jvmStats = new VeniceJVMStats(this.metricsRepository, "VeniceJVMStats");
        this.metadataRepository.refresh();
        this.storeConfigRepository.refresh();
        this.registry = new ResourceRegistry();
        this.workerExecutor = this.registry.factory(ShutdownableExecutors.class).newCachedThreadPool(new DefaultThreadFactory("RouterThread", true, 10));
        this.timeoutProcessor = new TimeoutProcessor(this.registry, true, ROUTER_BOSS_THREAD_NUM);
        Optional<SSLFactory> empty = Optional.empty();
        if (this.config.isSslToStorageNodes()) {
            if (!this.sslFactory.isPresent()) {
                throw new VeniceException("SSLFactory is required when enabling ssl to storage nodes");
            }
            empty = this.config.isHttpClientOpensslEnabled() ? Optional.of(SslUtils.toSSLFactoryWithOpenSSLSupport(this.sslFactory.get())) : this.sslFactory;
        }
        VenicePartitionFinder venicePartitionFinder = new VenicePartitionFinder(this.routingDataRepository, this.metadataRepository);
        boolean z = ROUTER_BOSS_THREAD_NUM;
        try {
            this.serverEventLoopGroup = new EpollEventLoopGroup(ROUTER_BOSS_THREAD_NUM);
            this.workerEventLoopGroup = new EpollEventLoopGroup(this.config.getRouterIOWorkerCount(), this.workerExecutor);
            cls = EpollServerSocketChannel.class;
        } catch (LinkageError e) {
            z = false;
            LOGGER.info("Epoll is only supported on Linux; switching to NIO");
            this.serverEventLoopGroup = new NioEventLoopGroup(ROUTER_BOSS_THREAD_NUM);
            this.workerEventLoopGroup = new NioEventLoopGroup(this.config.getRouterIOWorkerCount(), this.workerExecutor);
            cls = NioServerSocketChannel.class;
        }
        switch (AnonymousClass3.$SwitchMap$com$linkedin$venice$router$httpclient$StorageNodeClientType[this.config.getStorageNodeClientType().ordinal()]) {
            case ROUTER_BOSS_THREAD_NUM /* 1 */:
                LOGGER.info("Router will use Apache_Http_Async_Client");
                this.storageNodeClient = new ApacheHttpAsyncStorageNodeClient(this.config, empty, this.metricsRepository, this.liveInstanceMonitor);
                break;
            case 2:
                LOGGER.info("Router will use HTTP CLIENT5");
                this.storageNodeClient = new HttpClient5StorageNodeClient(empty, this.config);
                break;
            default:
                throw new VeniceException("Router client type " + this.config.getStorageNodeClientType().toString() + " is not supported!");
        }
        RouteHttpRequestStats routeHttpRequestStats = new RouteHttpRequestStats(this.metricsRepository, this.storageNodeClient);
        VeniceHostHealth veniceHostHealth = new VeniceHostHealth(this.liveInstanceMonitor, this.storageNodeClient, this.config, routeHttpRequestStats, this.aggHostHealthStats);
        this.dispatcher = new VeniceDispatcher(this.config, this.metadataRepository, this.routerStats, this.metricsRepository, this.storageNodeClient, routeHttpRequestStats, this.aggHostHealthStats, this.routerStats);
        this.scatterGatherMode = new VeniceDelegateMode(this.config, this.routerStats, routeHttpRequestStats);
        if (this.config.isRouterHeartBeatEnabled()) {
            this.heartbeat = new RouterHeartbeat(this.liveInstanceMonitor, veniceHostHealth, this.config, empty, this.storageNodeClient);
            this.heartbeat.startInner();
        }
        CompressorFactory compressorFactory = new CompressorFactory();
        this.dictionaryRetrievalService = new DictionaryRetrievalService(this.routingDataRepository, this.config, empty, this.metadataRepository, this.storageNodeClient, compressorFactory);
        MetaDataHandler metaDataHandler = new MetaDataHandler(this.routingDataRepository, this.schemaRepository, this.storeConfigRepository, this.config.getClusterToD2Map(), this.config.getClusterToServerD2Map(), this.metadataRepository, this.hybridStoreQuotaRepository, this.config.getClusterName(), this.config.getZkConnection(), this.config.getKafkaBootstrapServers());
        VeniceHostFinder veniceHostFinder = new VeniceHostFinder(this.routingDataRepository, this.routerStats, veniceHostHealth);
        VenicePathParser venicePathParser = new VenicePathParser(new VeniceVersionFinder(this.metadataRepository, this.routingDataRepository, new StaleVersionStats(this.metricsRepository, "stale_version"), this.storeConfigRepository, this.config.getClusterToD2Map(), this.config.getClusterName(), compressorFactory), venicePartitionFinder, this.routerStats, this.metadataRepository, this.config, compressorFactory);
        RouterExceptionAndTrackingUtils.setRouterStats(this.routerStats);
        VeniceRouterConfig veniceRouterConfig = this.config;
        Objects.requireNonNull(veniceRouterConfig);
        final SuccessAsyncFuture successAsyncFuture = new SuccessAsyncFuture(veniceRouterConfig::getLongTailRetryForSingleGetThresholdMs);
        LongTailRetrySupplier<VenicePath, RouterKey> longTailRetrySupplier = new LongTailRetrySupplier<VenicePath, RouterKey>() { // from class: com.linkedin.venice.router.RouterServer.2
            private final TreeMap<Integer, Integer> longTailRetryConfigForBatchGet;

            {
                this.longTailRetryConfigForBatchGet = RouterServer.this.config.getLongTailRetryForBatchGetThresholdMs();
            }

            @Nonnull
            public AsyncFuture<LongSupplier> getLongTailRetryMilliseconds(@Nonnull VenicePath venicePath, @Nonnull String str) {
                if (VeniceRouterUtils.isHttpGet(str)) {
                    venicePath.setLongTailRetryThresholdMs(RouterServer.this.config.getLongTailRetryForSingleGetThresholdMs());
                    return successAsyncFuture;
                }
                int size = venicePath.getPartitionKeys().size();
                if (size == 0) {
                    throw new VeniceException("Met scatter-gather request without any keys");
                }
                int intValue = this.longTailRetryConfigForBatchGet.floorEntry(Integer.valueOf(size)).getValue().intValue();
                venicePath.setLongTailRetryThresholdMs(intValue);
                return new SuccessAsyncFuture(() -> {
                    return intValue;
                });
            }
        };
        this.responseAggregator = new VeniceResponseAggregator(this.routerStats, this.metaStoreShadowReader);
        ScatterGatherHelper build = ScatterGatherHelper.builder().roleFinder(new VeniceRoleFinder()).pathParserExtended(venicePathParser).partitionFinder(venicePartitionFinder).hostFinder(veniceHostFinder).dispatchHandler(this.dispatcher).scatterMode(this.scatterGatherMode).responseAggregatorFactory(this.responseAggregator.withSingleGetTardyThreshold(this.config.getSingleGetTardyLatencyThresholdMs(), TimeUnit.MILLISECONDS).withMultiGetTardyThreshold(this.config.getMultiGetTardyLatencyThresholdMs(), TimeUnit.MILLISECONDS).withComputeTardyThreshold(this.config.getComputeTardyLatencyThresholdMs(), TimeUnit.MILLISECONDS)).metricsProvider(new VeniceMetricsProvider()).longTailRetrySupplier(longTailRetrySupplier).scatterGatherStatsProvider(new LongTailRetryStatsProvider(this.routerStats)).enableStackTraceResponseForException(true).enableRetryRequestAlwaysUseADifferentHost(true).build();
        SecurityStats securityStats = new SecurityStats(this.metricsRepository, "security", this.secureRouter != null ? () -> {
            return this.secureRouter.getConnectedCount();
        } : () -> {
            return 0;
        });
        RouterThrottleStats routerThrottleStats = new RouterThrottleStats(this.metricsRepository, "router_throttler_stats");
        this.routerEarlyThrottler = new EventThrottler(this.config.getMaxRouterReadCapacityCu(), this.config.getRouterQuotaCheckWindow(), "router-early-throttler", true, EventThrottler.REJECT_STRATEGY);
        RouterSslVerificationHandler routerSslVerificationHandler = new RouterSslVerificationHandler(securityStats, this.config.isEnforcingSecureOnly());
        HealthCheckStats healthCheckStats = new HealthCheckStats(this.metricsRepository, "healthcheck_stats");
        AdminOperationsHandler adminOperationsHandler = new AdminOperationsHandler(this.accessController.orElse(null), this, new AdminOperationsStats(this.metricsRepository, "admin_stats", this.config));
        StoreAclHandler storeAclHandler = (this.accessController.isPresent() || this.authenticationService.isPresent()) ? new StoreAclHandler(this.accessController, this.authenticationService, this.authorizerService, this.metadataRepository) : null;
        LOGGER.info("StoreAclHandler {} AuthenticationService {} AuthorizerService {}", storeAclHandler, this.authenticationService, this.authorizerService);
        if (!this.config.isEnforcingSecureOnly()) {
            this.router = Optional.of(Router.builder(build).name("VeniceRouterHttp").resourceRegistry(this.registry).serverSocketChannel(cls).bossPoolBuilder(EventLoopGroup.class, executor -> {
                return this.serverEventLoopGroup;
            }).ioWorkerPoolBuilder(EventLoopGroup.class, executor2 -> {
                return this.workerEventLoopGroup;
            }).connectionLimit(this.config.getConnectionLimit()).timeoutProcessor(this.timeoutProcessor).beforeHttpRequestHandler(ChannelPipeline.class, channelPipeline -> {
                channelPipeline.addLast("RouterThrottleHandler", new RouterThrottleHandler(routerThrottleStats, this.routerEarlyThrottler, this.config));
                channelPipeline.addLast("HealthCheckHandler", new HealthCheckHandler(healthCheckStats));
                channelPipeline.addLast("VerifySslHandler", routerSslVerificationHandler);
                channelPipeline.addLast("MetadataHandler", metaDataHandler);
                channelPipeline.addLast("AdminOperationsHandler", adminOperationsHandler);
                if (this.authenticationService.isPresent()) {
                    channelPipeline.addLast("StoreAclHandler", storeAclHandler);
                }
                addStreamingHandler(channelPipeline);
                addOptionalChannelHandlersToPipeline(channelPipeline);
            }).idleTimeout(3L, TimeUnit.HOURS).enableInboundHttp2(this.config.isHttp2InboundEnabled()).build());
        }
        RouterSslVerificationHandler routerSslVerificationHandler2 = new RouterSslVerificationHandler(securityStats);
        if (this.sslFactory.isPresent()) {
            sslInitializer = new SslInitializer(SslUtils.toAlpiniSSLFactory(this.sslFactory.get()), false);
            if (this.config.isThrottleClientSslHandshakesEnabled()) {
                ShutdownableExecutorService newFixedThreadPool = this.registry.factory(ShutdownableExecutors.class).newFixedThreadPool(this.config.getClientSslHandshakeThreads(), new DefaultThreadFactory("RouterSSLHandshakeThread", true, 5));
                int clientSslHandshakeThreads = this.config.getClientSslHandshakeThreads();
                int maxConcurrentClientSslHandshakes = this.config.getMaxConcurrentClientSslHandshakes();
                int clientSslHandshakeAttempts = this.config.getClientSslHandshakeAttempts();
                long clientSslHandshakeBackoffMs = this.config.getClientSslHandshakeBackoffMs();
                if (z) {
                    this.sslResolverEventLoopGroup = new EpollEventLoopGroup(clientSslHandshakeThreads, newFixedThreadPool);
                } else {
                    this.sslResolverEventLoopGroup = new NioEventLoopGroup(clientSslHandshakeThreads, newFixedThreadPool);
                }
                sslInitializer.enableResolveBeforeSSL(this.sslResolverEventLoopGroup, clientSslHandshakeAttempts, clientSslHandshakeBackoffMs, maxConcurrentClientSslHandshakes);
            }
        } else {
            sslInitializer = null;
        }
        Consumer consumer = channelPipeline2 -> {
        };
        SslInitializer sslInitializer2 = sslInitializer;
        Consumer consumer2 = channelPipeline3 -> {
            channelPipeline3.addFirst("SSL Initializer", sslInitializer2);
        };
        HealthCheckHandler healthCheckHandler = new HealthCheckHandler(healthCheckStats);
        RouterThrottleHandler routerThrottleHandler = new RouterThrottleHandler(routerThrottleStats, this.routerEarlyThrottler, this.config);
        this.secureRouter = Router.builder(build).name("SecureVeniceRouterHttps").resourceRegistry(this.registry).serverSocketChannel(cls).bossPoolBuilder(EventLoopGroup.class, executor3 -> {
            return this.serverEventLoopGroup;
        }).ioWorkerPoolBuilder(EventLoopGroup.class, executor4 -> {
            return this.workerEventLoopGroup;
        }).connectionLimit(this.config.getConnectionLimit()).timeoutProcessor(this.timeoutProcessor).beforeHttpServerCodec(ChannelPipeline.class, this.sslFactory.isPresent() ? consumer2 : consumer).beforeHttpRequestHandler(ChannelPipeline.class, (this.accessController.isPresent() || this.authenticationService.isPresent()) ? channelPipeline4 -> {
            channelPipeline4.addLast("HealthCheckHandler", healthCheckHandler);
            channelPipeline4.addLast("VerifySslHandler", routerSslVerificationHandler2);
            channelPipeline4.addLast("MetadataHandler", metaDataHandler);
            channelPipeline4.addLast("AdminOperationsHandler", adminOperationsHandler);
            channelPipeline4.addLast("StoreAclHandler", storeAclHandler);
            channelPipeline4.addLast("RouterThrottleHandler", routerThrottleHandler);
            addStreamingHandler(channelPipeline4);
            addOptionalChannelHandlersToPipeline(channelPipeline4);
        } : channelPipeline5 -> {
            channelPipeline5.addLast("HealthCheckHandler", healthCheckHandler);
            channelPipeline5.addLast("VerifySslHandler", routerSslVerificationHandler2);
            channelPipeline5.addLast("MetadataHandler", metaDataHandler);
            channelPipeline5.addLast("AdminOperationsHandler", adminOperationsHandler);
            channelPipeline5.addLast("RouterThrottleHandler", routerThrottleHandler);
            addStreamingHandler(channelPipeline5);
            addOptionalChannelHandlersToPipeline(channelPipeline5);
        }).idleTimeout(3L, TimeUnit.HOURS).enableInboundHttp2(this.config.isHttp2InboundEnabled()).http2MaxConcurrentStreams(this.config.getHttp2MaxConcurrentStreams()).http2HeaderTableSize(this.config.getHttp2HeaderTableSize()).http2InitialWindowSize(this.config.getHttp2InitialWindowSize()).http2MaxFrameSize(this.config.getHttp2MaxFrameSize()).http2MaxHeaderListSize(this.config.getHttp2MaxHeaderListSize()).build();
        boolean isAsyncStartEnabled = this.config.isAsyncStartEnabled();
        CompletableFuture startServices = startServices(isAsyncStartEnabled);
        if (isAsyncStartEnabled) {
            startServices.whenComplete((obj, th) -> {
                if (th != null) {
                    LOGGER.error("Router has failed to start", th);
                    close();
                }
            });
        } else {
            startServices.get();
            LOGGER.info("All the required services have been started");
        }
        return !isAsyncStartEnabled;
    }

    /**
     * Appends a newly created {@code VeniceChunkedWriteHandler} to the tail of the given pipeline
     * under the name {@code "VeniceChunkedWriteHandler"}.
     *
     * @param channelPipeline pipeline that receives the handler
     */
    private void addStreamingHandler(ChannelPipeline channelPipeline) {
        VeniceChunkedWriteHandler chunkedWriteHandler = new VeniceChunkedWriteHandler();
        channelPipeline.addLast("VeniceChunkedWriteHandler", chunkedWriteHandler);
    }

    /**
     * Appends every handler registered through {@link #addOptionalChannelHandler} to the tail of
     * the given pipeline, using the registration key as the handler name and following the
     * iteration order of the backing map.
     *
     * @param channelPipeline pipeline that receives the optional handlers
     */
    private void addOptionalChannelHandlersToPipeline(ChannelPipeline channelPipeline) {
        this.optionalChannelHandlers.forEach((handlerName, handler) -> channelPipeline.addLast(handlerName, handler));
    }

    /**
     * Registers an extra channel handler under the given name. Registered handlers are appended
     * to a pipeline by {@code addOptionalChannelHandlersToPipeline}; an existing entry with the
     * same key is replaced by the map's {@code put}.
     *
     * @param str            name under which the handler is stored and later added to the pipeline
     * @param channelHandler handler to register
     */
    public void addOptionalChannelHandler(String str, ChannelHandler channelHandler) {
        optionalChannelHandlers.put(str, channelHandler);
    }

    /**
     * Stops the router and releases the resources it holds.
     *
     * <p>The sequence is: unregister from every service discovery announcer, sleep for the
     * configured Netty graceful shutdown period, cancel or await the server futures, close the
     * storage node client, shut down the event loop groups, stop the dispatcher, shut down both
     * routers and the resource registry, clear the repositories, and finally disconnect from
     * Helix / ZooKeeper and close auxiliary services.
     *
     * @throws Exception if the graceful-shutdown sleep is interrupted or any shutdown step throws
     */
    public void stopInner() throws Exception {
        // Unregister first; a failing announcer is logged and must not block the rest of the shutdown.
        for (ServiceDiscoveryAnnouncer serviceDiscoveryAnnouncer : this.serviceDiscoveryAnnouncers) {
            LOGGER.info("Unregistering from service discovery: {}", serviceDiscoveryAnnouncer);
            try {
                serviceDiscoveryAnnouncer.unregister();
            } catch (RuntimeException e) {
                LOGGER.error("Service discovery announcer {} failed to unregister properly", serviceDiscoveryAnnouncer, e);
            }
        }
        // Presumably gives clients time to notice the unregistration before connections are torn down.
        Thread.sleep(TimeUnit.SECONDS.toMillis(this.config.getRouterNettyGracefulShutdownPeriodSeconds()));

        // A future that could not be cancelled is already running/completed, so wait for it instead.
        if (this.serverFuture != null && !this.serverFuture.cancel(false)) {
            this.serverFuture.awaitUninterruptibly();
        }
        if (this.secureServerFuture != null && !this.secureServerFuture.cancel(false)) {
            this.secureServerFuture.awaitUninterruptibly();
        }

        this.storageNodeClient.close();
        this.workerEventLoopGroup.shutdownGracefully();
        this.serverEventLoopGroup.shutdownGracefully();
        if (this.sslResolverEventLoopGroup != null) {
            this.sslResolverEventLoopGroup.shutdownGracefully();
        }
        this.dispatcher.stop();

        // Trigger shutdown on both routers before waiting on either of them.
        this.router.ifPresent(nonSecureRouter -> nonSecureRouter.shutdown());
        this.secureRouter.shutdown();
        if (this.router.isPresent()) {
            this.router.get().waitForShutdown();
        }
        LOGGER.info("Non-secure router has been shutdown completely");
        this.secureRouter.waitForShutdown();
        LOGGER.info("Secure router has been shutdown completely");

        this.registry.shutdown();
        this.registry.waitForShutdown();
        LOGGER.info("Other resources managed by local ResourceRegistry have been shutdown completely");

        // routersClusterManager is only assigned while the services are starting, so it can still be
        // null when the router is stopped after a failed or partial start.
        if (this.routersClusterManager != null) {
            this.routersClusterManager.unregisterRouter(Utils.getHelixNodeIdentifier(this.config.getHostname(), this.config.getPort()));
            this.routersClusterManager.clear();
        }
        this.routingDataRepository.clear();
        this.metadataRepository.clear();
        this.schemaRepository.clear();
        this.storeConfigRepository.clear();
        this.hybridStoreQuotaRepository.ifPresent(helixHybridStoreQuotaRepository -> {
            helixHybridStoreQuotaRepository.clear();
        });
        this.liveInstanceMonitor.clear();
        this.timeoutProcessor.shutdownNow();
        this.dictionaryRetrievalService.stop();
        if (this.instanceConfigRepository != null) {
            this.instanceConfigRepository.clear();
        }
        if (this.manager != null) {
            this.manager.disconnect();
        }
        if (this.zkClient != null) {
            this.zkClient.close();
        }
        if (this.heartbeat != null) {
            this.heartbeat.stopInner();
        }
        // Close the authentication service exactly once.
        // NOTE(review): this block used to appear twice verbatim; the second copy may have been
        // intended to close the authorizer service instead - confirm and add that close if so.
        if (this.authenticationService.isPresent()) {
            Utils.closeQuietlyWithErrorLogged(new Closeable[]{(Closeable) this.authenticationService.get()});
        }
    }

    /**
     * @return the routing data repository held by this router
     */
    public HelixBaseRoutingRepository getRoutingDataRepository() {
        return routingDataRepository;
    }

    /**
     * @return the read-only store repository held by this router
     */
    public ReadOnlyStoreRepository getMetadataRepository() {
        return metadataRepository;
    }

    /**
     * @return the read-only schema repository held by this router
     */
    public ReadOnlySchemaRepository getSchemaRepository() {
        return schemaRepository;
    }

    /**
     * Reacts to a failure raised while the router services are starting.
     *
     * <p>When {@code z} is {@code true}, {@code Utils.exit} is invoked with a message describing
     * the failure (presumably terminating the process - see {@code Utils.exit}); otherwise the
     * given exception is rethrown to the caller. The only caller visible here passes the
     * async-start flag as {@code z}.
     *
     * @param veniceException the failure to report
     * @param z               selects the exit path ({@code true}) or the rethrow path ({@code false})
     * @throws VeniceException the given exception, when {@code z} is {@code false}
     */
    private void handleExceptionInStartServices(VeniceException veniceException, boolean z) throws VeniceException {
        if (z) {
            Utils.exit("Failed to start router services due to " + veniceException);
            return;
        }
        throw veniceException;
    }

    /**
     * Starts the services the router depends on and then the router servers themselves, on a
     * task submitted through {@link CompletableFuture#runAsync(Runnable)}.
     *
     * <p>The task connects the Helix manager (when one is present), refreshes the live instance
     * monitor, starts the storage node client, creates and registers the routers cluster manager,
     * refreshes the routing repositories, builds the throttlers, optionally sets up Helix-assisted
     * routing, starts the dictionary retrieval service, binds the non-secure (if present) and
     * secure routers, registers with service discovery and finally marks the service as started.
     *
     * <p>Failures caught along the way are forwarded to {@code handleExceptionInStartServices}
     * together with {@code z}; any other exception completes the returned future exceptionally.
     *
     * @param z forwarded to {@code handleExceptionInStartServices}; the visible caller passes the
     *          async-start flag
     * @return a future that completes when the start sequence has finished
     */
    private CompletableFuture startServices(boolean z) {
        return CompletableFuture.runAsync(() -> {
            try {
                if (this.manager == null) {
                    LOGGER.info("Not connecting to Helix because the HelixManager is null (the test constructor was used)");
                } else {
                    // NOTE(review): the last argument reuses ROUTER_BOSS_THREAD_NUM; presumably it is a
                    // retry/sleep parameter that merely shares the same value - confirm.
                    HelixUtils.connectHelixManager(this.manager, 30, ROUTER_BOSS_THREAD_NUM);
                    LOGGER.info("{} finished connectHelixManager()", this);
                }
            } catch (VeniceException e) {
                LOGGER.error("{} got an exception while trying to connectHelixManager()", this, e);
                handleExceptionInStartServices(e, z);
            }
            this.liveInstanceMonitor.refresh();
            try {
                this.storageNodeClient.start();
            } catch (VeniceException e2) {
                LOGGER.error("Encountered issue when starting storage node client", e2);
                handleExceptionInStartServices(e2, z);
            }
            // Register this router instance so that it is visible to the routers cluster manager.
            this.routersClusterManager = new ZkRoutersClusterManager(this.zkClient, this.adapter, this.config.getClusterName(), this.config.getRefreshAttemptsForZkReconnect(), this.config.getRefreshIntervalForZkReconnectInMs());
            this.routersClusterManager.refresh();
            this.routersClusterManager.registerRouter(Utils.getHelixNodeIdentifier(this.config.getHostname(), this.config.getPort()));
            this.routingDataRepository.refresh();
            this.hybridStoreQuotaRepository.ifPresent((v0) -> {
                v0.refresh();
            });
            // Both throttlers are built up front; setReadRequestThrottling picks the active one.
            this.readRequestThrottler = new ReadRequestThrottler(this.routersClusterManager, this.metadataRepository, this.routingDataRepository, this.routerStats.getStatsByType(RequestType.SINGLE_GET), this.config);
            this.noopRequestThrottler = new NoopRouterThrottler(this.routersClusterManager, this.metadataRepository, this.routerStats.getStatsByType(RequestType.SINGLE_GET));
            setReadRequestThrottling(this.config.isReadThrottlingEnabled());
            if (this.config.getMultiKeyRoutingStrategy().equals(VeniceMultiKeyRoutingStrategy.HELIX_ASSISTED_ROUTING)) {
                // Helix-assisted routing needs the instance config repository and a group selector.
                this.instanceConfigRepository = new HelixInstanceConfigRepository(this.manager, this.config.isUseGroupFieldInHelixDomain());
                this.instanceConfigRepository.refresh();
                this.helixGroupSelector = new HelixGroupSelector(this.metricsRepository, this.instanceConfigRepository, this.config.getHelixGroupSelectionStrategy(), this.timeoutProcessor);
                this.scatterGatherMode.initHelixGroupSelector(this.helixGroupSelector);
                this.responseAggregator.initHelixGroupSelector(this.helixGroupSelector);
            }
            try {
                this.dictionaryRetrievalService.startInner();
            } catch (VeniceException e3) {
                LOGGER.error("Encountered issue when starting dictionary retriever", e3);
                handleExceptionInStartServices(e3, z);
            }
            // Kick off both servers first, then wait for them to finish starting.
            if (this.router.isPresent()) {
                this.serverFuture = this.router.get().start(new InetSocketAddress(this.config.getPort()));
            }
            this.secureServerFuture = this.secureRouter.start(new InetSocketAddress(this.config.getSslPort()));
            try {
                if (this.router.isPresent()) {
                    this.serverFuture.await();
                }
                this.secureServerFuture.await();
            } catch (InterruptedException e4) {
                // Restore the interrupt status so that code further up the stack can still observe it.
                Thread.currentThread().interrupt();
                handleExceptionInStartServices(new VeniceException(e4), z);
            }
            // Announce the router only after the servers have been awaited.
            for (ServiceDiscoveryAnnouncer serviceDiscoveryAnnouncer : this.serviceDiscoveryAnnouncers) {
                LOGGER.info("Registering to service discovery: {}", serviceDiscoveryAnnouncer);
                serviceDiscoveryAnnouncer.register();
            }
            try {
                if (this.router.isPresent()) {
                    LOGGER.info("{} started on non-ssl port: {}", this, Integer.valueOf(((InetSocketAddress) this.serverFuture.get()).getPort()));
                }
                LOGGER.info("{} started on ssl port: {}", this, Integer.valueOf(((InetSocketAddress) this.secureServerFuture.get()).getPort()));
            } catch (Exception e5) {
                LOGGER.error("Exception while waiting for {} to start", this, e5);
                this.serviceState.set(AbstractVeniceService.ServiceState.STOPPED);
                handleExceptionInStartServices(new VeniceException(e5), z);
            }
            this.serviceState.set(AbstractVeniceService.ServiceState.STARTED);
        });
    }

    /**
     * Validates the SSL configuration: SSL towards storage nodes requires an SSL factory.
     *
     * @throws VeniceException if the config enables SSL to storage nodes but no SSL factory is present
     */
    private void verifySslOk() {
        if (!this.config.isSslToStorageNodes()) {
            return;
        }
        if (this.sslFactory.isPresent()) {
            return;
        }
        throw new VeniceException("Must specify an SSLFactory in order to use SSL in requests to storage nodes");
    }

    /**
     * @return the routers cluster manager; the field is assigned inside {@code startServices}, so
     *         this may be {@code null} until that assignment has run
     */
    public ZkRoutersClusterManager getRoutersClusterManager() {
        return routersClusterManager;
    }

    /**
     * @return the router configuration this instance operates with
     */
    public VeniceRouterConfig getConfig() {
        return config;
    }

    /**
     * Selects which throttler the scatter-gather mode uses for read requests.
     *
     * @param z {@code true} to install the read request throttler, {@code false} to install the
     *          no-op throttler
     */
    public void setReadRequestThrottling(boolean z) {
        if (z) {
            this.scatterGatherMode.initReadRequestThrottler(this.readRequestThrottler);
        } else {
            this.scatterGatherMode.initReadRequestThrottler(this.noopRequestThrottler);
        }
    }

    /**
     * Refreshes, in order, the live instance monitor, the store config, store metadata, schema and
     * routing data repositories, and - when present - the hybrid store quota repository.
     */
    public void refresh() {
        liveInstanceMonitor.refresh();
        storeConfigRepository.refresh();
        metadataRepository.refresh();
        schemaRepository.refresh();
        routingDataRepository.refresh();
        if (hybridStoreQuotaRepository.isPresent()) {
            hybridStoreQuotaRepository.get().refresh();
        }
    }
}
