package com.linkedin.venice.listener;

import com.linkedin.davinci.compression.StorageEngineBackedCompressorFactory;
import com.linkedin.davinci.config.VeniceServerConfig;
import com.linkedin.davinci.storage.DiskHealthCheckService;
import com.linkedin.davinci.storage.MetadataRetriever;
import com.linkedin.davinci.storage.StorageEngineRepository;
import com.linkedin.venice.acl.DynamicAccessController;
import com.linkedin.venice.acl.StaticAccessController;
import com.linkedin.venice.authentication.AuthenticationService;
import com.linkedin.venice.authorization.AuthorizerService;
import com.linkedin.venice.cleaner.ResourceReadUsageTracker;
import com.linkedin.venice.helix.HelixCustomizedViewOfflinePushRepository;
import com.linkedin.venice.meta.ReadOnlySchemaRepository;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
import com.linkedin.venice.security.SSLFactory;
import com.linkedin.venice.service.AbstractVeniceService;
import com.linkedin.venice.stats.ThreadPoolStats;
import com.linkedin.venice.utils.concurrent.ThreadPoolFactory;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.tehuti.metrics.MetricsRepository;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/venice/listener/ListenerService.class */
/**
 * Service that owns the Netty server used to serve storage read requests.
 *
 * <p>The constructor builds the request-processing thread pools, the channel initializer and a
 * {@link ServerBootstrap}; {@link #startInner()} binds the bootstrap to the configured listener
 * port and {@link #stopInner()} tears the server down again.
 *
 * <p>NOTE(review): {@link #createThreadPool} and {@link #createRequestHandler} are overridable and
 * are invoked from the constructor, so overriding implementations must not rely on subclass state
 * that is initialized after the super constructor returns.
 */
public class ListenerService extends AbstractVeniceService {
    private static final Logger LOGGER = LogManager.getLogger(ListenerService.class);

    /** Value applied to {@link ChannelOption#SO_BACKLOG} on the server socket. */
    private static final int NETTY_BACKLOG_SIZE = 1000;

    private final ServerBootstrap bootstrap;
    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;
    /** Result of the bind performed in {@link #startInner()}; {@code null} until the bind succeeds. */
    private ChannelFuture serverFuture;
    private final int port;
    private final VeniceServerConfig serverConfig;
    private final ThreadPoolExecutor executor;
    private final ThreadPoolExecutor computeExecutor;
    /** Only created when an SSL factory is present and the configured handshake pool size is positive. */
    private ThreadPoolExecutor sslHandshakeExecutor;

    /**
     * Builds the thread pools, request handler, channel initializer and server bootstrap. Nothing is
     * bound to the network until {@link #startInner()} is called.
     *
     * @param storageEngineRepository forwarded to the request handler
     * @param storeRepository forwarded to the request handler and the channel initializer
     * @param schemaRepository forwarded to the request handler
     * @param customizedViewFuture forwarded to the channel initializer
     * @param metadataRetriever forwarded to the request handler
     * @param veniceServerConfig source of the listener port, pool sizes, queue capacities and transport flags
     * @param metricsRepository repository the thread pool stats are registered with
     * @param sslFactory when present (and the handshake pool size is positive) a dedicated SSL handshake pool is created
     * @param staticAccessController forwarded to the channel initializer
     * @param dynamicAccessController forwarded to the channel initializer
     * @param authenticationService forwarded to the channel initializer
     * @param authorizerService forwarded to the channel initializer
     * @param diskHealthCheckService forwarded to the request handler
     * @param compressorFactory forwarded to the request handler
     * @param resourceReadUsageTracker forwarded to the request handler
     */
    public ListenerService(StorageEngineRepository storageEngineRepository, ReadOnlyStoreRepository storeRepository, ReadOnlySchemaRepository schemaRepository, CompletableFuture<HelixCustomizedViewOfflinePushRepository> customizedViewFuture, MetadataRetriever metadataRetriever, VeniceServerConfig veniceServerConfig, MetricsRepository metricsRepository, Optional<SSLFactory> sslFactory, Optional<StaticAccessController> staticAccessController, Optional<DynamicAccessController> dynamicAccessController, Optional<AuthenticationService> authenticationService, Optional<AuthorizerService> authorizerService, DiskHealthCheckService diskHealthCheckService, StorageEngineBackedCompressorFactory compressorFactory, Optional<ResourceReadUsageTracker> resourceReadUsageTracker) {
        // serverConfig must be assigned first: createThreadPool() reads it.
        this.serverConfig = veniceServerConfig;
        this.port = veniceServerConfig.getListenerPort();

        // The ThreadPoolStats instances are intentionally not retained; they are constructed with the
        // metrics repository and the pool they report on.
        this.executor = createThreadPool(veniceServerConfig.getRestServiceStorageThreadNum(), "StorageExecutionThread", veniceServerConfig.getDatabaseLookupQueueCapacity());
        new ThreadPoolStats(metricsRepository, this.executor, "storage_execution_thread_pool");

        this.computeExecutor = createThreadPool(veniceServerConfig.getServerComputeThreadNum(), "StorageComputeThread", veniceServerConfig.getComputeQueueCapacity());
        new ThreadPoolStats(metricsRepository, this.computeExecutor, "storage_compute_thread_pool");

        if (sslFactory.isPresent() && veniceServerConfig.getSslHandshakeThreadPoolSize() > 0) {
            this.sslHandshakeExecutor = createThreadPool(veniceServerConfig.getSslHandshakeThreadPoolSize(), "SSLHandShakeThread", veniceServerConfig.getSslHandshakeQueueCapacity());
            new ThreadPoolStats(metricsRepository, this.sslHandshakeExecutor, "ssl_handshake_thread_pool");
        }

        StorageReadRequestsHandler requestHandler = createRequestHandler(this.executor, this.computeExecutor, storageEngineRepository, storeRepository, schemaRepository, metadataRetriever, diskHealthCheckService, veniceServerConfig.isComputeFastAvroEnabled(), veniceServerConfig.isEnableParallelBatchGet(), veniceServerConfig.getParallelBatchGetChunkSize(), compressorFactory, resourceReadUsageTracker);
        HttpChannelInitializer channelInitializer = new HttpChannelInitializer(storeRepository, customizedViewFuture, metricsRepository, sslFactory, this.sslHandshakeExecutor, veniceServerConfig, staticAccessController, dynamicAccessController, authenticationService, authorizerService, requestHandler);

        this.bootstrap = new ServerBootstrap();

        // Transport selection: try epoll when configured, otherwise (or on failure) use NIO.
        // The channel class is set directly in each branch so no raw Class variable is needed.
        boolean epollActive = false;
        if (veniceServerConfig.isRestServiceEpollEnabled()) {
            try {
                this.bossGroup = new EpollEventLoopGroup(1);
                this.workerGroup = new EpollEventLoopGroup(veniceServerConfig.getNettyWorkerThreadCount());
                this.bootstrap.channel(EpollServerSocketChannel.class);
                epollActive = true;
                LOGGER.info("Epoll is enabled in Server Rest Service");
            } catch (LinkageError e) {
                // A LinkageError here means the epoll transport could not be loaded; fall through to NIO.
                LOGGER.info("Epoll is only supported on Linux; switching to NIO for Server Rest Service", e);
            }
        }
        if (!epollActive) {
            this.bossGroup = new NioEventLoopGroup(1);
            this.workerGroup = new NioEventLoopGroup(veniceServerConfig.getNettyWorkerThreadCount());
            this.bootstrap.channel(NioServerSocketChannel.class);
        }

        this.bootstrap.group(this.bossGroup, this.workerGroup)
                .childHandler(channelInitializer)
                .option(ChannelOption.SO_BACKLOG, NETTY_BACKLOG_SIZE)
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.SO_REUSEADDR, true)
                .childOption(ChannelOption.TCP_NODELAY, true);
    }

    /**
     * Binds the server bootstrap to the configured port and blocks until the bind completes.
     *
     * @return always {@code true}
     * @throws Exception if the bind fails or the calling thread is interrupted while waiting
     */
    public boolean startInner() throws Exception {
        this.serverFuture = this.bootstrap.bind(this.port).sync();
        LOGGER.info("Listener service started on port: {}", this.port);
        return true;
    }

    /**
     * Stops the server: waits for the configured grace period, shuts down both event loop groups,
     * waits for the server channel to close and finally shuts down the request thread pools.
     *
     * <p>If the server was never successfully bound there is no channel to wait for, so the grace
     * period and the close-future wait are skipped, but the event loop groups and thread pools are
     * still released.
     *
     * @throws Exception if the calling thread is interrupted while sleeping or waiting for the close
     */
    public void stopInner() throws Exception {
        // serverFuture is null when startInner() never ran or the bind failed.
        ChannelFuture closeFuture = this.serverFuture == null ? null : this.serverFuture.channel().closeFuture();
        try {
            try {
                if (closeFuture != null) {
                    // Sleep before shutting the event loops down; presumably this lets requests that
                    // were already accepted finish first (confirm against the server shutdown sequence).
                    Thread.sleep(TimeUnit.SECONDS.toMillis(this.serverConfig.getNettyGracefulShutdownPeriodSeconds()));
                }
            } finally {
                // Runs even when the sleep is interrupted so the event loop threads are not leaked.
                this.workerGroup.shutdownGracefully();
                this.bossGroup.shutdownGracefully();
            }
            if (closeFuture != null) {
                closeFuture.sync();
            }
        } finally {
            // shutdown() (not shutdownNow()) so tasks that were already submitted can still complete.
            shutdownIfPresent(this.executor);
            shutdownIfPresent(this.computeExecutor);
            shutdownIfPresent(this.sslHandshakeExecutor);
        }
    }

    /** Initiates an orderly shutdown of {@code pool} when it is non-null. */
    private static void shutdownIfPresent(ThreadPoolExecutor pool) {
        if (pool != null) {
            pool.shutdown();
        }
    }

    /**
     * Creates a thread pool through {@link ThreadPoolFactory} using the blocking queue type from the
     * server config. Called from the constructor.
     *
     * @param threadCount first argument handed to the factory (the constructor passes the configured thread number)
     * @param threadName second argument handed to the factory (the constructor passes a thread name string)
     * @param capacity third argument handed to the factory (the constructor passes the configured queue capacity)
     * @return the executor produced by the factory
     */
    protected ThreadPoolExecutor createThreadPool(int threadCount, String threadName, int capacity) {
        return ThreadPoolFactory.createThreadPool(threadCount, threadName, capacity, this.serverConfig.getBlockingQueueType());
    }

    /**
     * Creates the handler that is given to the channel initializer. Called from the constructor;
     * all arguments plus the server config are forwarded to {@link StorageReadRequestsHandler}.
     *
     * @param storageExecutor pool created with the "StorageExecutionThread" name
     * @param storageComputeExecutor pool created with the "StorageComputeThread" name
     * @param fastAvroEnabled value of {@code isComputeFastAvroEnabled()} from the server config
     * @param parallelBatchGetEnabled value of {@code isEnableParallelBatchGet()} from the server config
     * @param parallelBatchGetChunkSize value of {@code getParallelBatchGetChunkSize()} from the server config
     * @return a new request handler
     */
    protected StorageReadRequestsHandler createRequestHandler(ThreadPoolExecutor storageExecutor, ThreadPoolExecutor storageComputeExecutor, StorageEngineRepository storageEngineRepository, ReadOnlyStoreRepository storeRepository, ReadOnlySchemaRepository schemaRepository, MetadataRetriever metadataRetriever, DiskHealthCheckService diskHealthCheckService, boolean fastAvroEnabled, boolean parallelBatchGetEnabled, int parallelBatchGetChunkSize, StorageEngineBackedCompressorFactory compressorFactory, Optional<ResourceReadUsageTracker> resourceReadUsageTracker) {
        return new StorageReadRequestsHandler(storageExecutor, storageComputeExecutor, storageEngineRepository, storeRepository, schemaRepository, metadataRetriever, diskHealthCheckService, fastAvroEnabled, parallelBatchGetEnabled, parallelBatchGetChunkSize, this.serverConfig, compressorFactory, resourceReadUsageTracker);
    }
}
