package com.linkedin.davinci.ingestion.isolated;

import com.linkedin.d2.balancer.D2Client;
import com.linkedin.d2.balancer.D2ClientBuilder;
import com.linkedin.davinci.compression.StorageEngineBackedCompressorFactory;
import com.linkedin.davinci.config.VeniceConfigLoader;
import com.linkedin.davinci.config.VeniceStoreVersionConfig;
import com.linkedin.davinci.helix.LeaderFollowerPartitionStateModel;
import com.linkedin.davinci.ingestion.DefaultIngestionBackend;
import com.linkedin.davinci.ingestion.utils.IsolatedIngestionUtils;
import com.linkedin.davinci.kafka.consumer.KafkaStoreIngestionService;
import com.linkedin.davinci.kafka.consumer.LeaderFollowerStateType;
import com.linkedin.davinci.kafka.consumer.RemoteIngestionRepairService;
import com.linkedin.davinci.repository.VeniceMetadataRepositoryBuilder;
import com.linkedin.davinci.stats.AggVersionedStorageEngineStats;
import com.linkedin.davinci.stats.RocksDBMemoryStats;
import com.linkedin.davinci.storage.StorageEngineMetadataService;
import com.linkedin.davinci.storage.StorageMetadataService;
import com.linkedin.davinci.storage.StorageService;
import com.linkedin.venice.ConfigKeys;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.client.store.ClientFactory;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.helix.HelixReadOnlyZKSharedSchemaRepository;
import com.linkedin.venice.ingestion.protocol.IngestionMetricsReport;
import com.linkedin.venice.ingestion.protocol.IngestionTaskReport;
import com.linkedin.venice.ingestion.protocol.enums.IngestionReportType;
import com.linkedin.venice.kafka.protocol.state.PartitionState;
import com.linkedin.venice.kafka.protocol.state.StoreVersionState;
import com.linkedin.venice.meta.ClusterInfoProvider;
import com.linkedin.venice.meta.ReadOnlyLiveClusterConfigRepository;
import com.linkedin.venice.meta.ReadOnlySchemaRepository;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
import com.linkedin.venice.pubsub.adapter.kafka.producer.ApacheKafkaProducerAdapterFactory;
import com.linkedin.venice.pubsub.api.PubSubClientsFactory;
import com.linkedin.venice.schema.SchemaReader;
import com.linkedin.venice.security.SSLFactory;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.serialization.avro.InternalAvroSpecificSerializer;
import com.linkedin.venice.service.AbstractVeniceService;
import com.linkedin.venice.stats.AbstractVeniceStats;
import com.linkedin.venice.utils.LatencyUtils;
import com.linkedin.venice.utils.RedundantExceptionFilter;
import com.linkedin.venice.utils.ReflectUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.tehuti.metrics.MetricsRepository;
import java.io.FileNotFoundException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bouncycastle.crypto.tls.CipherSuite;

/* loaded from: input_file:com/linkedin/davinci/ingestion/isolated/IsolatedIngestionServer.class */
public class IsolatedIngestionServer extends AbstractVeniceService {
    private static final Logger LOGGER = LogManager.getLogger((Class<?>) IsolatedIngestionServer.class);
    private final ServerBootstrap bootstrap;
    private final EventLoopGroup bossGroup;
    private final EventLoopGroup workerGroup;
    private final int servicePort;
    private final long connectionTimeoutMs;
    private ChannelFuture serverFuture;
    private VeniceConfigLoader configLoader;
    private InternalAvroSpecificSerializer<PartitionState> partitionStateSerializer;
    private InternalAvroSpecificSerializer<StoreVersionState> storeVersionStateSerializer;
    private IsolatedIngestionRequestClient reportClient;
    private IsolatedIngestionRequestClient metricClient;
    private int stopConsumptionWaitRetriesNum;
    private DefaultIngestionBackend ingestionBackend;
    private final RemoteIngestionRepairService repairService;
    private final RedundantExceptionFilter redundantExceptionFilter = new RedundantExceptionFilter(134217728, TimeUnit.MINUTES.toMillis(10));
    private final ExecutorService ingestionExecutor = Executors.newFixedThreadPool(10);
    private final ScheduledExecutorService heartbeatCheckScheduler = Executors.newScheduledThreadPool(1);
    private final ScheduledExecutorService metricsCollectionScheduler = Executors.newScheduledThreadPool(1);
    private final AtomicBoolean isShuttingDown = new AtomicBoolean(false);
    private final ExecutorService longRunningTaskExecutor = Executors.newFixedThreadPool(10);
    private final ExecutorService statusReportingExecutor = Executors.newSingleThreadExecutor();
    private final Map<String, Map<Integer, AtomicLong>> leaderSessionIdMap = new VeniceConcurrentHashMap();
    private final Map<String, Map<Integer, AtomicBoolean>> topicPartitionSubscriptionMap = new VeniceConcurrentHashMap();
    private final Map<String, Double> metricsMap = new VeniceConcurrentHashMap();
    private MetricsRepository metricsRepository = null;
    private ReadOnlyStoreRepository storeRepository = null;
    private ReadOnlyLiveClusterConfigRepository liveConfigRepository = null;
    private StorageService storageService = null;
    private KafkaStoreIngestionService storeIngestionService = null;
    private StorageMetadataService storageMetadataService = null;
    private boolean isInitiated = false;
    private volatile long heartbeatTimeInMs = System.currentTimeMillis();

    public IsolatedIngestionServer(String str) throws FileNotFoundException {
        VeniceProperties loadVenicePropertiesFromFile = IsolatedIngestionUtils.loadVenicePropertiesFromFile(str);
        this.configLoader = new VeniceConfigLoader(loadVenicePropertiesFromFile, loadVenicePropertiesFromFile, IsolatedIngestionUtils.loadForkedIngestionKafkaClusterMapConfig(new VeniceConfigLoader(loadVenicePropertiesFromFile, loadVenicePropertiesFromFile).getVeniceServerConfig().getDataBasePath()));
        this.servicePort = this.configLoader.getVeniceServerConfig().getIngestionServicePort();
        this.connectionTimeoutMs = this.configLoader.getCombinedProperties().getLong(ConfigKeys.SERVER_INGESTION_ISOLATION_CONNECTION_TIMEOUT_SECONDS, 180L) * 1000;
        this.bossGroup = new NioEventLoopGroup();
        this.workerGroup = new NioEventLoopGroup();
        this.bootstrap = new ServerBootstrap();
        this.repairService = new RemoteIngestionRepairService(this.configLoader.getCombinedProperties().getInt(ConfigKeys.SERVER_REMOTE_INGESTION_REPAIR_SLEEP_INTERVAL_SECONDS, RemoteIngestionRepairService.DEFAULT_REPAIR_THREAD_SLEEP_INTERVAL_SECONDS));
        this.bootstrap.group(this.bossGroup, this.workerGroup).channel(NioServerSocketChannel.class).childHandler(new IsolatedIngestionServerChannelInitializer(this)).option(ChannelOption.SO_BACKLOG, 1000).childOption(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.SO_REUSEADDR, true).childOption(ChannelOption.TCP_NODELAY, true);
    }

    /* JADX WARN: Type inference failed for: r1v5, types: [io.netty.channel.ChannelFuture] */
    @Override // com.linkedin.venice.service.AbstractVeniceService
    public boolean startInner() {
        int i = 0;
        while (true) {
            try {
                this.serverFuture = this.bootstrap.bind(this.servicePort).sync2();
                LOGGER.info("Listener service started on port: {}", Integer.valueOf(this.servicePort));
                initializeIsolatedIngestionServer();
                LOGGER.info("All ingestion components are initialized.");
                this.heartbeatCheckScheduler.scheduleAtFixedRate(this::checkHeartbeatTimeout, 0L, 5L, TimeUnit.SECONDS);
                this.metricsCollectionScheduler.scheduleAtFixedRate(this::reportMetricsUpdateToMainProcess, 0L, 1L, TimeUnit.MINUTES);
                this.repairService.start();
                return true;
            } catch (Exception e) {
                i++;
                if (i > 100) {
                    throw new VeniceException("Ingestion Service is unable to bind to target port " + this.servicePort + " after 100 retries.");
                }
                Utils.sleep(500L);
            }
        }
    }

    @Override // com.linkedin.venice.service.AbstractVeniceService
    public void stopInner() throws Exception {
        ChannelFuture closeFuture = this.serverFuture.channel().closeFuture();
        this.workerGroup.shutdownGracefully();
        this.bossGroup.shutdownGracefully();
        closeFuture.sync2();
        this.redundantExceptionFilter.shutdown();
        try {
            if (this.storeIngestionService != null) {
                this.storeIngestionService.stop();
            }
            LOGGER.info("StoreIngestionService has been shutdown.");
            if (this.storageService != null) {
                this.storageService.stop();
            }
            LOGGER.info("StorageService has been shutdown.");
            this.heartbeatCheckScheduler.shutdownNow();
            this.metricsCollectionScheduler.shutdownNow();
            this.ingestionExecutor.shutdown();
            this.longRunningTaskExecutor.shutdown();
            try {
                if (!this.longRunningTaskExecutor.awaitTermination(5L, TimeUnit.SECONDS)) {
                    this.longRunningTaskExecutor.shutdownNow();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            this.statusReportingExecutor.shutdown();
            try {
                if (!this.statusReportingExecutor.awaitTermination(5L, TimeUnit.SECONDS)) {
                    this.statusReportingExecutor.shutdownNow();
                }
            } catch (InterruptedException e2) {
                Thread.currentThread().interrupt();
            }
            this.repairService.stop();
        } catch (Throwable th) {
            throw new VeniceException("Unable to stop Ingestion Service", th);
        }
    }

    public void setConfigLoader(VeniceConfigLoader veniceConfigLoader) {
        this.configLoader = veniceConfigLoader;
    }

    public void setStoreRepository(ReadOnlyStoreRepository readOnlyStoreRepository) {
        this.storeRepository = readOnlyStoreRepository;
    }

    public void setStorageService(StorageService storageService) {
        this.storageService = storageService;
    }

    public void setStorageMetadataService(StorageMetadataService storageMetadataService) {
        this.storageMetadataService = storageMetadataService;
    }

    public void setMetricsRepository(MetricsRepository metricsRepository) {
        this.metricsRepository = metricsRepository;
    }

    public void setPartitionStateSerializer(InternalAvroSpecificSerializer<PartitionState> internalAvroSpecificSerializer) {
        this.partitionStateSerializer = internalAvroSpecificSerializer;
    }

    public void setStoreVersionStateSerializer(InternalAvroSpecificSerializer<StoreVersionState> internalAvroSpecificSerializer) {
        this.storeVersionStateSerializer = internalAvroSpecificSerializer;
    }

    public boolean isInitiated() {
        return this.isInitiated;
    }

    public StorageService getStorageService() {
        return this.storageService;
    }

    public VeniceConfigLoader getConfigLoader() {
        return this.configLoader;
    }

    public KafkaStoreIngestionService getStoreIngestionService() {
        return this.storeIngestionService;
    }

    public StorageMetadataService getStorageMetadataService() {
        return this.storageMetadataService;
    }

    public ReadOnlyStoreRepository getStoreRepository() {
        return this.storeRepository;
    }

    public MetricsRepository getMetricsRepository() {
        return this.metricsRepository;
    }

    public DefaultIngestionBackend getIngestionBackend() {
        return this.ingestionBackend;
    }

    public InternalAvroSpecificSerializer<PartitionState> getPartitionStateSerializer() {
        return this.partitionStateSerializer;
    }

    public InternalAvroSpecificSerializer<StoreVersionState> getStoreVersionStateSerializer() {
        return this.storeVersionStateSerializer;
    }

    public int getStopConsumptionWaitRetriesNum() {
        return this.stopConsumptionWaitRetriesNum;
    }

    public Map<String, Double> getMetricsMap() {
        return this.metricsMap;
    }

    public void updateHeartbeatTime() {
        this.heartbeatTimeInMs = System.currentTimeMillis();
        LOGGER.info("Received heartbeat from main process at: {}", Long.valueOf(this.heartbeatTimeInMs));
    }

    public void cleanupTopicPartitionState(String str, int i) {
        Map<Integer, AtomicBoolean> map = this.topicPartitionSubscriptionMap.get(str);
        if (map != null) {
            map.remove(Integer.valueOf(i));
        }
        Map<Integer, AtomicLong> map2 = this.leaderSessionIdMap.get(str);
        if (map2 != null) {
            map2.remove(Integer.valueOf(i));
        }
    }

    public void cleanupTopicState(String str) {
        this.topicPartitionSubscriptionMap.remove(str);
        this.leaderSessionIdMap.remove(str);
    }

    public void reportIngestionStatus(IngestionTaskReport ingestionTaskReport) {
        IngestionReportType valueOf = IngestionReportType.valueOf(ingestionTaskReport.reportType);
        if (!valueOf.equals(IngestionReportType.COMPLETED) && !valueOf.equals(IngestionReportType.ERROR)) {
            this.statusReportingExecutor.execute(() -> {
                this.reportClient.reportIngestionStatus(ingestionTaskReport);
            });
            return;
        }
        String charSequence = ingestionTaskReport.topicName.toString();
        int i = ingestionTaskReport.partitionId;
        long j = ingestionTaskReport.offset;
        if (valueOf.equals(IngestionReportType.COMPLETED)) {
            getStoreIngestionService().syncTopicPartitionOffset(charSequence, i);
            ingestionTaskReport.offsetRecordArray = getStoreIngestionService().getPartitionOffsetRecords(charSequence, i);
            StoreVersionState storeVersionState = this.storageMetadataService.getStoreVersionState(charSequence);
            if (storeVersionState == null) {
                throw new VeniceException("StoreVersionState does not exist for topic: " + charSequence);
            }
            ingestionTaskReport.storeVersionState = ByteBuffer.wrap(IsolatedIngestionUtils.serializeStoreVersionState(charSequence, storeVersionState));
            LeaderFollowerStateType leaderStateFromPartitionConsumptionState = getStoreIngestionService().getLeaderStateFromPartitionConsumptionState(charSequence, i);
            LOGGER.info("Ingestion completed for topic: {}, partition: {}, offset: {}, leaderState: {}.", charSequence, Integer.valueOf(i), Long.valueOf(j), leaderStateFromPartitionConsumptionState);
            ingestionTaskReport.leaderFollowerState = leaderStateFromPartitionConsumptionState.getValue();
        } else {
            LOGGER.error("Ingestion error for topic: {}, partition: {}, error message: {}", charSequence, Integer.valueOf(i), ingestionTaskReport.message);
        }
        stopConsumptionAndReport(ingestionTaskReport);
    }

    public RedundantExceptionFilter getRedundantExceptionFilter() {
        return this.redundantExceptionFilter;
    }

    public synchronized LeaderFollowerPartitionStateModel.LeaderSessionIdChecker getLeaderSectionIdChecker(String str, int i) {
        this.leaderSessionIdMap.putIfAbsent(str, new VeniceConcurrentHashMap());
        Map<Integer, AtomicLong> map = this.leaderSessionIdMap.get(str);
        map.putIfAbsent(Integer.valueOf(i), new AtomicLong(0L));
        AtomicLong atomicLong = map.get(Integer.valueOf(i));
        return new LeaderFollowerPartitionStateModel.LeaderSessionIdChecker(atomicLong.incrementAndGet(), atomicLong);
    }

    public void setResourceToBeUnsubscribed(String str, int i) {
        getTopicPartitionSubscriptionMap().computeIfAbsent(str, str2 -> {
            return new VeniceConcurrentHashMap();
        }).computeIfAbsent(Integer.valueOf(i), num -> {
            return new AtomicBoolean(false);
        }).set(false);
    }

    public void setResourceToBeSubscribed(String str, int i) {
        getTopicPartitionSubscriptionMap().computeIfAbsent(str, str2 -> {
            return new VeniceConcurrentHashMap();
        }).computeIfAbsent(Integer.valueOf(i), num -> {
            return new AtomicBoolean(true);
        }).set(true);
    }

    public boolean isResourceSubscribed(String str, int i) {
        AtomicBoolean atomicBoolean = getTopicPartitionSubscriptionMap().getOrDefault(str, Collections.emptyMap()).get(Integer.valueOf(i));
        if (atomicBoolean == null) {
            return false;
        }
        return atomicBoolean.get();
    }

    public void maybeSubscribeNewResource(String str, int i) {
        getTopicPartitionSubscriptionMap().computeIfAbsent(str, str2 -> {
            return new VeniceConcurrentHashMap();
        }).computeIfAbsent(Integer.valueOf(i), num -> {
            return new AtomicBoolean(true);
        });
    }

    Map<String, Map<Integer, AtomicBoolean>> getTopicPartitionSubscriptionMap() {
        return this.topicPartitionSubscriptionMap;
    }

    ExecutorService getStatusReportingExecutor() {
        return this.statusReportingExecutor;
    }

    IsolatedIngestionRequestClient getReportClient() {
        return this.reportClient;
    }

    void stopConsumptionAndReport(IngestionTaskReport ingestionTaskReport) {
        String charSequence = ingestionTaskReport.topicName.toString();
        int i = ingestionTaskReport.partitionId;
        Future<?> submitStopConsumptionAndCloseStorageTask = submitStopConsumptionAndCloseStorageTask(ingestionTaskReport);
        getStatusReportingExecutor().execute(() -> {
            try {
                submitStopConsumptionAndCloseStorageTask.get();
            } catch (InterruptedException | ExecutionException e) {
                LOGGER.warn("Encounter exception when trying to stop consumption and close storage for {} of topic: {}", Integer.valueOf(i), charSequence);
            }
            if (getReportClient().reportIngestionStatus(ingestionTaskReport)) {
                setResourceToBeUnsubscribed(charSequence, i);
            }
        });
    }

    Future<?> submitStopConsumptionAndCloseStorageTask(IngestionTaskReport ingestionTaskReport) {
        String charSequence = ingestionTaskReport.topicName.toString();
        int i = ingestionTaskReport.partitionId;
        return this.longRunningTaskExecutor.submit(() -> {
            VeniceStoreVersionConfig storeConfig = getConfigLoader().getStoreConfig(charSequence);
            long currentTimeMillis = System.currentTimeMillis();
            getStoreIngestionService().stopConsumptionAndWait(storeConfig, i, 1, this.stopConsumptionWaitRetriesNum);
            getStorageService().closeStorePartition(storeConfig, i);
            LOGGER.info("Partition: {} of topic: {} closed in {} ms.", Integer.valueOf(i), charSequence, Long.valueOf(LatencyUtils.getElapsedTimeInMs(currentTimeMillis)));
        });
    }

    private void checkHeartbeatTimeout() {
        if (this.isShuttingDown.get()) {
            return;
        }
        long currentTimeMillis = System.currentTimeMillis();
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Checking heartbeat timeout at {}, latest heartbeat on server: {}", Long.valueOf(currentTimeMillis), Long.valueOf(this.heartbeatTimeInMs));
        }
        if (currentTimeMillis - this.heartbeatTimeInMs > this.connectionTimeoutMs) {
            LOGGER.warn("Lost connection to parent process after {} ms, will shutdown the ingestion backend gracefully.", Long.valueOf(this.connectionTimeoutMs));
            this.isShuttingDown.set(true);
            try {
                stop();
                System.exit(0);
            } catch (Exception e) {
                LOGGER.info("Unable to shutdown ingestion service gracefully", (Throwable) e);
            }
        }
    }

    void reportMetricsUpdateToMainProcess() {
        try {
            IngestionMetricsReport ingestionMetricsReport = new IngestionMetricsReport();
            ingestionMetricsReport.aggregatedMetrics = new VeniceConcurrentHashMap();
            if (getMetricsRepository() != null) {
                getMetricsRepository().metrics().forEach((str, metric) -> {
                    if (metric != null) {
                        try {
                            Double d = getMetricsMap().get(str);
                            Double valueOf = Double.valueOf(metric.value());
                            if (d == null || !d.equals(valueOf)) {
                                ingestionMetricsReport.aggregatedMetrics.put(str, valueOf);
                            }
                            getMetricsMap().put(str, valueOf);
                        } catch (Exception e) {
                            String str = "Encounter exception when retrieving value of metric: " + str;
                            if (getRedundantExceptionFilter().isRedundantException(str)) {
                                return;
                            }
                            LOGGER.error(str, (Throwable) e);
                        }
                    }
                });
            }
            getMetricClient().reportMetricUpdate(ingestionMetricsReport);
        } catch (Exception e) {
            LOGGER.warn("Encounter exception when fetching latest metrics and reporting back to main process", (Throwable) e);
        }
    }

    IsolatedIngestionRequestClient getMetricClient() {
        return this.metricClient;
    }

    private void initializeIsolatedIngestionServer() {
        this.stopConsumptionWaitRetriesNum = this.configLoader.getCombinedProperties().getInt(ConfigKeys.SERVER_STOP_CONSUMPTION_WAIT_RETRIES_NUM, CipherSuite.TLS_DHE_PSK_WITH_NULL_SHA256);
        String string = this.configLoader.getCombinedProperties().getString(ConfigKeys.D2_ZK_HOSTS_ADDRESS);
        SSLFactory orElse = IsolatedIngestionUtils.getSSLFactoryForIngestion(this.configLoader).orElse(null);
        D2Client build = orElse != null ? new D2ClientBuilder().setZkHosts(string).setIsSSLEnabled(true).setSSLParameters(orElse.getSSLParameters()).setSSLContext(orElse.getSSLContext()).build() : new D2ClientBuilder().setZkHosts(string).build();
        IsolatedIngestionUtils.startD2Client(build);
        ClientConfig d2ServiceName = new ClientConfig().setD2Client(build).setD2ServiceName(this.configLoader.getCombinedProperties().getString(ConfigKeys.CLUSTER_DISCOVERY_D2_SERVICE, ClientConfig.DEFAULT_CLUSTER_DISCOVERY_D2_SERVICE_NAME));
        this.metricsRepository = new MetricsRepository();
        VeniceMetadataRepositoryBuilder veniceMetadataRepositoryBuilder = new VeniceMetadataRepositoryBuilder(this.configLoader, d2ServiceName, this.metricsRepository, null, true);
        this.storeRepository = veniceMetadataRepositoryBuilder.getStoreRepo();
        this.liveConfigRepository = veniceMetadataRepositoryBuilder.getLiveClusterConfigRepo();
        ReadOnlySchemaRepository schemaRepo = veniceMetadataRepositoryBuilder.getSchemaRepo();
        Optional<HelixReadOnlyZKSharedSchemaRepository> readOnlyZKSharedSchemaRepository = veniceMetadataRepositoryBuilder.getReadOnlyZKSharedSchemaRepository();
        ClusterInfoProvider clusterInfoProvider = veniceMetadataRepositoryBuilder.getClusterInfoProvider();
        SchemaReader schemaReader = ClientFactory.getSchemaReader(ClientConfig.cloneConfig(d2ServiceName).setStoreName(AvroProtocolDefinition.PARTITION_STATE.getSystemStoreName()), null);
        SchemaReader schemaReader2 = ClientFactory.getSchemaReader(ClientConfig.cloneConfig(d2ServiceName).setStoreName(AvroProtocolDefinition.STORE_VERSION_STATE.getSystemStoreName()), null);
        this.partitionStateSerializer = AvroProtocolDefinition.PARTITION_STATE.getSerializer();
        this.partitionStateSerializer.setSchemaReader(schemaReader);
        this.storeVersionStateSerializer = AvroProtocolDefinition.STORE_VERSION_STATE.getSerializer();
        this.storeVersionStateSerializer.setSchemaReader(schemaReader2);
        RocksDBMemoryStats rocksDBMemoryStats = this.configLoader.getVeniceServerConfig().isDatabaseMemoryStatsEnabled() ? new RocksDBMemoryStats(this.metricsRepository, "RocksDBMemoryStats", this.configLoader.getVeniceServerConfig().getRocksDBServerConfig().isRocksDBPlainTableFormatEnabled()) : null;
        for (String str : this.configLoader.getCombinedProperties().getString(ConfigKeys.SERVER_INGESTION_ISOLATION_STATS_CLASS_LIST, "").split(",")) {
            if (str.length() != 0) {
                Class loadClass = ReflectUtils.loadClass(str);
                if (!AbstractVeniceStats.class.isAssignableFrom(loadClass)) {
                    throw new VeniceException("Class: " + str + " does not extends AbstractVeniceStats");
                }
                LOGGER.info("Created Ingestion Isolation stats: {}", ((AbstractVeniceStats) ReflectUtils.callConstructor(loadClass, new Class[]{MetricsRepository.class}, new Object[]{this.metricsRepository})).getName());
            } else {
                LOGGER.info("Ingestion isolation stats class name is empty, will skip it.");
            }
        }
        this.storageService = new StorageService(this.configLoader, new AggVersionedStorageEngineStats(this.metricsRepository, this.storeRepository, this.configLoader.getVeniceServerConfig().isUnregisterMetricForDeletedStoreEnabled()), rocksDBMemoryStats, this.storeVersionStateSerializer, this.partitionStateSerializer, this.storeRepository, false, true);
        this.storageService.start();
        SchemaReader schemaReader3 = ClientFactory.getSchemaReader(ClientConfig.cloneConfig(d2ServiceName).setStoreName(AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE.getSystemStoreName()), null);
        this.storageMetadataService = new StorageEngineMetadataService(this.storageService.getStorageEngineRepository(), this.partitionStateSerializer);
        StorageEngineBackedCompressorFactory storageEngineBackedCompressorFactory = new StorageEngineBackedCompressorFactory(this.storageMetadataService);
        boolean isDaVinciClient = veniceMetadataRepositoryBuilder.isDaVinciClient();
        this.storeIngestionService = new KafkaStoreIngestionService(this.storageService.getStorageEngineRepository(), this.configLoader, this.storageMetadataService, clusterInfoProvider, this.storeRepository, schemaRepo, Optional.empty(), Optional.empty(), this.liveConfigRepository, this.metricsRepository, Optional.of(schemaReader3), isDaVinciClient ? Optional.empty() : Optional.of(d2ServiceName), this.partitionStateSerializer, readOnlyZKSharedSchemaRepository, null, true, storageEngineBackedCompressorFactory, Optional.empty(), isDaVinciClient, this.repairService, new PubSubClientsFactory(new ApacheKafkaProducerAdapterFactory()));
        this.storeIngestionService.start();
        this.storeIngestionService.addIngestionNotifier(new IsolatedIngestionNotifier(this));
        this.ingestionBackend = new DefaultIngestionBackend(this.storageMetadataService, this.storeIngestionService, this.storageService);
        LOGGER.info("Starting report client with target application port: {}", Integer.valueOf(this.configLoader.getVeniceServerConfig().getIngestionApplicationPort()));
        this.reportClient = new IsolatedIngestionRequestClient(this.configLoader);
        this.metricClient = new IsolatedIngestionRequestClient(this.configLoader);
        this.isInitiated = true;
    }

    public static void main(String[] strArr) throws Exception {
        LOGGER.info("Capture arguments: {}", Arrays.toString(strArr));
        if (strArr.length != 1) {
            throw new VeniceException("Expected exactly one argument for config file path. Got " + strArr.length);
        }
        new IsolatedIngestionServer(strArr[0]).start();
    }
}
