package com.linkedin.davinci.kafka.consumer;

import com.linkedin.davinci.compression.StorageEngineBackedCompressorFactory;
import com.linkedin.davinci.config.VeniceConfigLoader;
import com.linkedin.davinci.config.VeniceServerConfig;
import com.linkedin.davinci.config.VeniceStoreVersionConfig;
import com.linkedin.davinci.helix.LeaderFollowerPartitionStateModel;
import com.linkedin.davinci.listener.response.AdminResponse;
import com.linkedin.davinci.listener.response.MetadataResponse;
import com.linkedin.davinci.notifier.LogNotifier;
import com.linkedin.davinci.notifier.MetaSystemStoreReplicaStatusNotifier;
import com.linkedin.davinci.notifier.PartitionPushStatusNotifier;
import com.linkedin.davinci.notifier.VeniceNotifier;
import com.linkedin.davinci.stats.AggHostLevelIngestionStats;
import com.linkedin.davinci.stats.AggLagStats;
import com.linkedin.davinci.stats.AggVersionedDIVStats;
import com.linkedin.davinci.stats.AggVersionedIngestionStats;
import com.linkedin.davinci.stats.HostLevelIngestionStats;
import com.linkedin.davinci.stats.ParticipantStoreConsumptionStats;
import com.linkedin.davinci.stats.StoreBufferServiceStats;
import com.linkedin.davinci.storage.StorageEngineRepository;
import com.linkedin.davinci.storage.StorageMetadataService;
import com.linkedin.davinci.store.cache.backend.ObjectCacheBackend;
import com.linkedin.davinci.store.view.VeniceViewWriterFactory;
import com.linkedin.venice.ConfigConstants;
import com.linkedin.venice.SSLConfig;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.common.VeniceSystemStoreType;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.exceptions.VeniceNoStoreException;
import com.linkedin.venice.helix.HelixCustomizedViewOfflinePushRepository;
import com.linkedin.venice.helix.HelixInstanceConfigRepository;
import com.linkedin.venice.helix.HelixReadOnlyZKSharedSchemaRepository;
import com.linkedin.venice.kafka.TopicManagerRepository;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.kafka.protocol.state.PartitionState;
import com.linkedin.venice.meta.ClusterInfoProvider;
import com.linkedin.venice.meta.Instance;
import com.linkedin.venice.meta.Partition;
import com.linkedin.venice.meta.ReadOnlyLiveClusterConfigRepository;
import com.linkedin.venice.meta.ReadOnlySchemaRepository;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
import com.linkedin.venice.meta.ServerAdminAction;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.StoreDataChangedListener;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.meta.VersionStatus;
import com.linkedin.venice.metadata.response.VersionProperties;
import com.linkedin.venice.offsets.OffsetRecord;
import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.adapter.kafka.admin.ApacheKafkaAdminAdapterFactory;
import com.linkedin.venice.pubsub.adapter.kafka.consumer.ApacheKafkaConsumerAdapterFactory;
import com.linkedin.venice.pubsub.adapter.kafka.producer.ApacheKafkaProducerAdapterFactory;
import com.linkedin.venice.pubsub.adapter.kafka.producer.ApacheKafkaProducerConfig;
import com.linkedin.venice.pubsub.adapter.kafka.producer.SharedKafkaProducerAdapterFactory;
import com.linkedin.venice.pubsub.api.PubSubClientsFactory;
import com.linkedin.venice.pubsub.api.PubSubProducerAdapterFactory;
import com.linkedin.venice.pubsub.kafka.KafkaPubSubMessageDeserializer;
import com.linkedin.venice.schema.SchemaEntry;
import com.linkedin.venice.schema.SchemaReader;
import com.linkedin.venice.serialization.avro.InternalAvroSpecificSerializer;
import com.linkedin.venice.serialization.avro.OptimizedKafkaValueSerializer;
import com.linkedin.venice.service.AbstractVeniceService;
import com.linkedin.venice.service.ICProvider;
import com.linkedin.venice.system.store.MetaStoreWriter;
import com.linkedin.venice.throttle.EventThrottler;
import com.linkedin.venice.utils.ComplementSet;
import com.linkedin.venice.utils.DaemonThreadFactory;
import com.linkedin.venice.utils.DiskUsage;
import com.linkedin.venice.utils.HelixUtils;
import com.linkedin.venice.utils.KafkaSSLUtils;
import com.linkedin.venice.utils.LatencyUtils;
import com.linkedin.venice.utils.Pair;
import com.linkedin.venice.utils.PartitionUtils;
import com.linkedin.venice.utils.SystemTime;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.utils.locks.AutoCloseableLock;
import com.linkedin.venice.utils.locks.ResourceAutoClosableLockManager;
import com.linkedin.venice.utils.pools.LandFillObjectPool;
import com.linkedin.venice.writer.VeniceWriterFactory;
import io.tehuti.metrics.MetricsRepository;
import it.unimi.dsi.fastutil.ints.IntListIterator;
import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.class */
public class KafkaStoreIngestionService extends AbstractVeniceService implements StoreIngestionService {
    // Two-placeholder format string; its use is outside this section — presumably builds consumer group ids (TODO confirm).
    private static final String GROUP_ID_FORMAT = "%s_%s";
    private static final Logger LOGGER = LogManager.getLogger(KafkaStoreIngestionService.class);
    private final VeniceConfigLoader veniceConfigLoader;
    private final StorageMetadataService storageMetadataService;
    private final ReadOnlyStoreRepository metadataRepo;
    private final ReadOnlySchemaRepository schemaRepo;
    // The two Helix repositories below are assigned by callbacks on the futures passed to the constructor,
    // so they stay null until (and unless) those futures complete.
    private HelixCustomizedViewOfflinePushRepository customizedViewRepository;
    private HelixInstanceConfigRepository helixInstanceConfigRepository;
    private final AggHostLevelIngestionStats hostLevelIngestionStats;
    private final AggVersionedIngestionStats versionedIngestionStats;
    private final AggLagStats aggLagStats;
    private final AbstractStoreBufferService storeBufferService;
    private final AggKafkaConsumerService aggKafkaConsumerService;
    private final Optional<SchemaReader> kafkaMessageEnvelopeSchemaReader;
    // Both meta-store members are null when no HelixReadOnlyZKSharedSchemaRepository is supplied to the constructor.
    private final MetaStoreWriter metaStoreWriter;
    private final MetaSystemStoreReplicaStatusNotifier metaSystemStoreReplicaStatusNotifier;
    private final StoreIngestionTaskFactory ingestionTaskFactory;
    private final boolean isIsolatedIngestion;
    private final TopicManagerRepository topicManagerRepository;
    // Executors are created in startInner(), not in the constructor.
    private ExecutorService participantStoreConsumerExecutorService;
    private ExecutorService ingestionExecutorService;
    // Only created when a ClientConfig is supplied to the constructor; otherwise stays null.
    private ParticipantStoreConsumptionTask participantStoreConsumptionTask;
    private final Optional<ObjectCacheBackend> cacheBackend;
    private final PubSubProducerAdapterFactory producerAdapterFactory;
    private final InternalAvroSpecificSerializer<PartitionState> partitionStateSerializer;
    private final StorageEngineBackedCompressorFactory compressorFactory;
    // Notifier queue handed to the ingestion task factory; a LogNotifier is always added by the constructor.
    private final Queue<VeniceNotifier> leaderFollowerNotifiers = new ConcurrentLinkedQueue();
    // Set to true once addMetaSystemStoreReplicaStatusNotifier() has queued the notifier; guards against adding it twice.
    private boolean metaSystemStoreReplicaStatusNotifierQueued = false;
    private final PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();
    // Ingestion tasks keyed by the store-version (topic) name returned by VeniceStoreVersionConfig#getStoreVersionName().
    private final NavigableMap<String, StoreIngestionTask> topicNameToIngestionTaskMap = new ConcurrentSkipListMap();
    // One lock per topic name; start/shutdown/promote/demote operations on a topic acquire it first.
    private final ResourceAutoClosableLockManager<String> topicLockManager = new ResourceAutoClosableLockManager<>(ReentrantLock::new);

    /**
     * Wires together everything the ingestion service needs: the producer adapter factory and writer factories,
     * the consumption throttlers, the topic manager repository, the optional meta-store writer/notifier, the
     * stats aggregators, the store buffer service, the optional participant store consumption task, the
     * aggregated Kafka consumer service and, finally, the {@link StoreIngestionTaskFactory}.
     * Nothing is started here; see {@link #startInner()}.
     *
     * @param storageEngineRepository passed to the ingestion task factory
     * @param veniceConfigLoader source of the server config and cluster properties; also kept as a field
     * @param storageMetadataService kept as a field and passed to the ingestion task factory
     * @param clusterInfoProvider passed to the participant store consumption task
     * @param readOnlyStoreRepository store metadata repository; kept as {@code metadataRepo}
     * @param readOnlySchemaRepository schema repository; kept as {@code schemaRepo}
     * @param optional optional future whose result is stored in {@code customizedViewRepository} once it completes
     * @param optional2 optional future whose result is stored in {@code helixInstanceConfigRepository} once it completes
     * @param readOnlyLiveClusterConfigRepository when non-null, supplies per-region record quotas for the per-region throttlers
     * @param metricsRepository metrics sink shared by the stats and factories created here
     * @param optional3 optional schema reader; kept as {@code kafkaMessageEnvelopeSchemaReader} and set on the Kafka value serializer
     * @param optional4 optional client config; when present the participant store consumption task is created
     * @param internalAvroSpecificSerializer partition state serializer; kept as a field and passed to the factory
     * @param optional5 optional shared schema repository; when present the meta store writer and replica status notifier are created
     * @param iCProvider passed to the participant store consumption task
     * @param z stored as {@code isIsolatedIngestion}
     * @param storageEngineBackedCompressorFactory kept as a field and passed to the factory
     * @param optional6 optional object cache backend; kept as {@code cacheBackend}
     * @param z2 forwarded to {@code setIsDaVinciClient} on the factory builder
     * @param remoteIngestionRepairService passed to the ingestion task factory
     * @param pubSubClientsFactory supplies the producer adapter factory when the shared Kafka producer is disabled
     */
    public KafkaStoreIngestionService(StorageEngineRepository storageEngineRepository, VeniceConfigLoader veniceConfigLoader, StorageMetadataService storageMetadataService, ClusterInfoProvider clusterInfoProvider, ReadOnlyStoreRepository readOnlyStoreRepository, ReadOnlySchemaRepository readOnlySchemaRepository, Optional<CompletableFuture<HelixCustomizedViewOfflinePushRepository>> optional, Optional<CompletableFuture<HelixInstanceConfigRepository>> optional2, ReadOnlyLiveClusterConfigRepository readOnlyLiveClusterConfigRepository, MetricsRepository metricsRepository, Optional<SchemaReader> optional3, Optional<ClientConfig> optional4, InternalAvroSpecificSerializer<PartitionState> internalAvroSpecificSerializer, Optional<HelixReadOnlyZKSharedSchemaRepository> optional5, ICProvider iCProvider, boolean z, StorageEngineBackedCompressorFactory storageEngineBackedCompressorFactory, Optional<ObjectCacheBackend> optional6, boolean z2, RemoteIngestionRepairService remoteIngestionRepairService, PubSubClientsFactory pubSubClientsFactory) {
        Map emptyMap;
        this.cacheBackend = optional6;
        this.storageMetadataService = storageMetadataService;
        this.metadataRepo = readOnlyStoreRepository;
        this.schemaRepo = readOnlySchemaRepository;
        this.veniceConfigLoader = veniceConfigLoader;
        this.isIsolatedIngestion = z;
        this.partitionStateSerializer = internalAvroSpecificSerializer;
        this.compressorFactory = storageEngineBackedCompressorFactory;
        // The Helix repositories arrive asynchronously: the fields are only populated when the futures complete.
        optional.ifPresent(completableFuture -> {
            completableFuture.thenApply(helixCustomizedViewOfflinePushRepository -> {
                this.customizedViewRepository = helixCustomizedViewOfflinePushRepository;
                return helixCustomizedViewOfflinePushRepository;
            });
        });
        optional2.ifPresent(completableFuture2 -> {
            completableFuture2.thenApply(helixInstanceConfigRepository -> {
                this.helixInstanceConfigRepository = helixInstanceConfigRepository;
                return helixInstanceConfigRepository;
            });
        });
        VeniceServerConfig veniceServerConfig = veniceConfigLoader.getVeniceServerConfig();
        Properties properties = veniceConfigLoader.getVeniceClusterConfig().getClusterProperties().toProperties();
        if (veniceServerConfig.isKafkaOpenSSLEnabled()) {
            properties.setProperty("ssl.context.provider.class", ConfigConstants.DEFAULT_KAFKA_SSL_CONTEXT_PROVIDER_CLASS_NAME);
        }
        // Apply producer defaults only when the cluster properties do not already set them.
        if (!properties.containsKey("kafka.batch.size")) {
            properties.put("kafka.batch.size", "524288");
        }
        if (!properties.containsKey("kafka.linger.ms")) {
            properties.put("kafka.linger.ms", "1000");
        }
        LOGGER.info("Shared kafka producer service is {}", veniceServerConfig.isSharedKafkaProducerEnabled() ? "enabled" : "disabled");
        if (veniceServerConfig.isSharedKafkaProducerEnabled()) {
            this.producerAdapterFactory = new SharedKafkaProducerAdapterFactory(properties, veniceServerConfig.getSharedProducerPoolSizePerKafkaCluster(), new ApacheKafkaProducerAdapterFactory(), metricsRepository, veniceServerConfig.getKafkaProducerMetrics());
        } else {
            this.producerAdapterFactory = pubSubClientsFactory.getProducerAdapterFactory();
        }
        // Two writer factories: the first uses the producer adapter factory chosen above (for ingestion tasks),
        // the second is built from the properties alone and is only used by the meta store writer below.
        VeniceWriterFactory veniceWriterFactory = new VeniceWriterFactory(properties, this.producerAdapterFactory, metricsRepository);
        VeniceWriterFactory veniceWriterFactory2 = new VeniceWriterFactory(properties);
        // Host-wide throttlers: one on fetched bytes, one on fetched record count.
        EventThrottler eventThrottler = new EventThrottler(veniceServerConfig.getKafkaFetchQuotaBytesPerSecond(), veniceServerConfig.getKafkaFetchQuotaTimeWindow(), "kafka_consumption_bandwidth", false, EventThrottler.BLOCK_STRATEGY);
        EventThrottler eventThrottler2 = new EventThrottler(veniceServerConfig.getKafkaFetchQuotaRecordPerSecond(), veniceServerConfig.getKafkaFetchQuotaTimeWindow(), "kafka_consumption_records_count", false, EventThrottler.BLOCK_STRATEGY);
        // Per-region record throttlers read their quota from the live cluster config on demand; none are created without that repository.
        if (readOnlyLiveClusterConfigRepository != null) {
            Set<String> regionNames = veniceServerConfig.getRegionNames();
            emptyMap = new HashMap(regionNames.size());
            regionNames.forEach(str -> {
                emptyMap.put(str, new EventThrottler(() -> {
                    return readOnlyLiveClusterConfigRepository.getConfigs().getServerKafkaFetchQuotaRecordsPerSecondForRegion(str);
                }, veniceServerConfig.getKafkaFetchQuotaTimeWindow(), "kafka_consumption_records_count_" + str, true, EventThrottler.REJECT_STRATEGY));
            });
        } else {
            emptyMap = Collections.emptyMap();
        }
        KafkaClusterBasedRecordThrottler kafkaClusterBasedRecordThrottler = new KafkaClusterBasedRecordThrottler(emptyMap);
        this.topicManagerRepository = TopicManagerRepository.builder().setPubSubTopicRepository(this.pubSubTopicRepository).setMetricsRepository(metricsRepository).setLocalKafkaBootstrapServers(veniceServerConfig.getKafkaBootstrapServers()).setPubSubConsumerAdapterFactory(new ApacheKafkaConsumerAdapterFactory()).setTopicDeletionStatusPollIntervalMs(2000L).setTopicMinLogCompactionLagMs(86400000L).setKafkaOperationTimeoutMs(30000L).setPubSubProperties(this::getPubSubSSLPropertiesFromServerConfig).setPubSubAdminAdapterFactory(new ApacheKafkaAdminAdapterFactory()).build();
        this.leaderFollowerNotifiers.add(new LogNotifier());
        // The meta store writer and its replica status notifier exist only when the shared schema repository is supplied.
        if (optional5.isPresent()) {
            this.metaStoreWriter = new MetaStoreWriter(this.topicManagerRepository.getTopicManager(), veniceWriterFactory2, optional5.get(), this.pubSubTopicRepository);
            this.metaSystemStoreReplicaStatusNotifier = new MetaSystemStoreReplicaStatusNotifier(veniceServerConfig.getClusterName(), this.metaStoreWriter, readOnlyStoreRepository, Instance.fromHostAndPort(Utils.getHostName(), veniceServerConfig.getListenerPort()));
            LOGGER.info("MetaSystemStoreReplicaStatusNotifier was initialized");
            // When a meta system store is deleted, drop its cached writer.
            readOnlyStoreRepository.registerStoreDataChangedListener(new StoreDataChangedListener() { // from class: com.linkedin.davinci.kafka.consumer.KafkaStoreIngestionService.1
                public void handleStoreDeleted(Store store) {
                    String name = store.getName();
                    if (VeniceSystemStoreType.META_STORE.equals(VeniceSystemStoreType.getSystemStoreType(name))) {
                        KafkaStoreIngestionService.this.metaStoreWriter.removeMetaStoreWriter(name);
                        KafkaStoreIngestionService.LOGGER.info("MetaSystemWriter for meta store: {} got removed.", name);
                    }
                }
            });
        } else {
            this.metaStoreWriter = null;
            this.metaSystemStoreReplicaStatusNotifier = null;
        }
        this.hostLevelIngestionStats = new AggHostLevelIngestionStats(metricsRepository, veniceServerConfig, this.topicNameToIngestionTaskMap, readOnlyStoreRepository, veniceServerConfig.isUnregisterMetricForDeletedStoreEnabled(), SystemTime.INSTANCE);
        AggVersionedDIVStats aggVersionedDIVStats = new AggVersionedDIVStats(metricsRepository, readOnlyStoreRepository, veniceServerConfig.isUnregisterMetricForDeletedStoreEnabled());
        this.versionedIngestionStats = new AggVersionedIngestionStats(metricsRepository, readOnlyStoreRepository, veniceServerConfig);
        if (veniceServerConfig.isDedicatedDrainerQueueEnabled()) {
            this.storeBufferService = new SeparatedStoreBufferService(veniceServerConfig);
        } else {
            this.storeBufferService = new StoreBufferService(veniceServerConfig.getStoreWriterNumber(), veniceServerConfig.getStoreWriterBufferMemoryCapacity(), veniceServerConfig.getStoreWriterBufferNotifyDelta(), veniceServerConfig.isStoreWriterBufferAfterLeaderLogicEnabled());
        }
        this.kafkaMessageEnvelopeSchemaReader = optional3;
        // Constructed for its side effect only; the instance is not retained (presumably it registers metrics — confirm in StoreBufferServiceStats).
        new StoreBufferServiceStats(metricsRepository, this.storeBufferService);
        this.aggLagStats = new AggLagStats(this, metricsRepository);
        if (optional4.isPresent()) {
            this.participantStoreConsumptionTask = new ParticipantStoreConsumptionTask(this, clusterInfoProvider, new ParticipantStoreConsumptionStats(metricsRepository, veniceConfigLoader.getVeniceClusterConfig().getClusterName()), ClientConfig.cloneConfig(optional4.get()).setMetricsRepository(metricsRepository), veniceServerConfig.getParticipantMessageConsumptionDelayMs(), iCProvider);
        } else {
            LOGGER.info("Unable to start participant store consumption task because client config is not provided, jobs may not be killed if admin helix messaging channel is disabled");
        }
        OptimizedKafkaValueSerializer optimizedKafkaValueSerializer = new OptimizedKafkaValueSerializer();
        Objects.requireNonNull(optimizedKafkaValueSerializer);
        optional3.ifPresent(optimizedKafkaValueSerializer::setSchemaReader);
        this.aggKafkaConsumerService = new AggKafkaConsumerService(new ApacheKafkaConsumerAdapterFactory(), this::getPubSubSSLPropertiesFromServerConfig, veniceServerConfig, eventThrottler, eventThrottler2, kafkaClusterBasedRecordThrottler, metricsRepository, new MetadataRepoBasedTopicExistingCheckerImpl(getMetadataRepo()), new KafkaPubSubMessageDeserializer(optimizedKafkaValueSerializer, new LandFillObjectPool(KafkaMessageEnvelope::new), new LandFillObjectPool(KafkaMessageEnvelope::new)));
        // Consumer properties for local consumption: common settings, overlaid with any server-configured overrides.
        Properties commonKafkaConsumerProperties = getCommonKafkaConsumerProperties(veniceServerConfig);
        if (!veniceServerConfig.getKafkaConsumerConfigsForLocalConsumption().isEmpty()) {
            commonKafkaConsumerProperties.putAll(veniceServerConfig.getKafkaConsumerConfigsForLocalConsumption().toProperties());
        }
        this.aggKafkaConsumerService.createKafkaConsumerService(commonKafkaConsumerProperties);
        this.ingestionTaskFactory = StoreIngestionTaskFactory.builder().setVeniceWriterFactory(veniceWriterFactory).setStorageEngineRepository(storageEngineRepository).setStorageMetadataService(storageMetadataService).setLeaderFollowerNotifiersQueue(this.leaderFollowerNotifiers).setSchemaRepository(readOnlySchemaRepository).setMetadataRepository(readOnlyStoreRepository).setTopicManagerRepository(this.topicManagerRepository).setHostLevelIngestionStats(this.hostLevelIngestionStats).setVersionedDIVStats(aggVersionedDIVStats).setVersionedIngestionStats(this.versionedIngestionStats).setStoreBufferService(this.storeBufferService).setServerConfig(veniceServerConfig).setDiskUsage(new DiskUsage(veniceConfigLoader.getVeniceServerConfig().getDataBasePath(), veniceConfigLoader.getVeniceServerConfig().getDiskFullThreshold())).setAggKafkaConsumerService(this.aggKafkaConsumerService).setStartReportingReadyToServeTimestamp(System.currentTimeMillis() + veniceServerConfig.getDelayReadyToServeMS()).setPartitionStateSerializer(internalAvroSpecificSerializer).setIsDaVinciClient(z2).setRemoteIngestionRepairService(remoteIngestionRepairService).setMetaStoreWriter(this.metaStoreWriter).setCompressorFactory(storageEngineBackedCompressorFactory).setVeniceViewWriterFactory(new VeniceViewWriterFactory(veniceConfigLoader)).setPubSubTopicRepository(this.pubSubTopicRepository).build();
    }

    /**
     * Queues the meta system store replica status notifier as an ingestion notifier, exactly once.
     *
     * @throws VeniceException if the notifier has already been queued, or if it was never created
     *         (the constructor leaves it {@code null} when no shared schema repository is supplied)
     */
    public synchronized void addMetaSystemStoreReplicaStatusNotifier() {
        // The "already queued" check deliberately comes first so a repeated call reports the double-add.
        final boolean alreadyQueued = this.metaSystemStoreReplicaStatusNotifierQueued;
        if (alreadyQueued) {
            throw new VeniceException("MetaSystemStoreReplicaStatusNotifier should NOT be added twice");
        }

        final MetaSystemStoreReplicaStatusNotifier notifier = this.metaSystemStoreReplicaStatusNotifier;
        if (notifier == null) {
            throw new VeniceException("MetaSystemStoreReplicaStatusNotifier wasn't initialized properly");
        }

        addIngestionNotifier(notifier);
        this.metaSystemStoreReplicaStatusNotifierQueued = true;
    }

    /**
     * Returns the meta system store replica status notifier, but only once it has been queued via
     * {@link #addMetaSystemStoreReplicaStatusNotifier()}.
     *
     * @return the queued notifier, or {@link Optional#empty()} when it has not been queued yet
     */
    @Override // com.linkedin.davinci.kafka.consumer.StoreIngestionService
    public synchronized Optional<MetaSystemStoreReplicaStatusNotifier> getMetaSystemStoreReplicaStatusNotifier() {
        if (!this.metaSystemStoreReplicaStatusNotifierQueued) {
            return Optional.empty();
        }
        return Optional.of(this.metaSystemStoreReplicaStatusNotifier);
    }

    /**
     * Starts the service: creates the ingestion executor, submits every ingestion task that is already
     * registered, starts the store buffer service and the aggregated Kafka consumer service, and, when a
     * participant store consumption task exists, runs it on its own single daemon thread.
     *
     * @return always {@code true}
     */
    public boolean startInner() {
        this.ingestionExecutorService = Executors.newCachedThreadPool();
        // Tasks registered by startConsumption() while the service was not running were put in the map
        // without being submitted; hand them to the executor now.
        for (StoreIngestionTask ingestionTask : this.topicNameToIngestionTaskMap.values()) {
            this.ingestionExecutorService.submit(ingestionTask);
        }
        this.storeBufferService.start();
        if (this.aggKafkaConsumerService != null) {
            this.aggKafkaConsumerService.start();
        }
        // The participant task is optional: it only exists when a client config was supplied at construction.
        if (this.participantStoreConsumptionTask != null) {
            this.participantStoreConsumerExecutorService = Executors.newSingleThreadExecutor(new DaemonThreadFactory("ParticipantStoreConsumptionTask"));
            this.participantStoreConsumerExecutorService.submit(this.participantStoreConsumptionTask);
        }
        return true;
    }

    /**
     * Builds a new ingestion task for the store version named by the given config.
     * Waits for (or fails on) the store/version pair via {@code Utils.waitStoreVersionOrThrow} before
     * asking the factory for the task.
     *
     * @param veniceStoreVersionConfig config of the store version to ingest
     * @param i partition id forwarded to the factory
     * @return the newly created, not yet submitted, ingestion task
     */
    private StoreIngestionTask createConsumerTask(VeniceStoreVersionConfig veniceStoreVersionConfig, int i) {
        String topicName = veniceStoreVersionConfig.getStoreVersionName();
        String storeName = Version.parseStoreFromKafkaTopicName(topicName);
        int versionNumber = Version.parseVersionFromKafkaTopicName(topicName);
        Pair storeAndVersion = Utils.waitStoreVersionOrThrow(topicName, this.metadataRepo);
        Store store = (Store) storeAndVersion.getFirst();
        Version version = (Version) storeAndVersion.getSecond();
        return this.ingestionTaskFactory.getNewIngestionTask(
                store,
                version,
                getKafkaConsumerProperties(veniceStoreVersionConfig),
                () -> {
                    // Evaluated lazily by the task: is this version the store's current version right now?
                    try {
                        int currentVersion = this.metadataRepo.getStoreOrThrow(storeName).getCurrentVersion();
                        return versionNumber == currentVersion;
                    } catch (VeniceNoStoreException e) {
                        LOGGER.warn("Unable to find store meta-data for {}", veniceStoreVersionConfig.getStoreVersionName(), e);
                        return false;
                    }
                },
                veniceStoreVersionConfig,
                i,
                this.isIsolatedIngestion,
                this.cacheBackend);
    }

    /**
     * Shuts down an executor and logs how long that took.
     * A graceful shutdown waits up to 60 seconds before escalating to {@code shutdownNow()}, which is then
     * given a further 30 seconds. A forced shutdown skips the graceful wait entirely.
     *
     * @param executorService executor to stop; {@code null} is ignored
     * @param str name used in log messages
     * @param z {@code true} to force the shutdown without first waiting for running tasks
     */
    private static void shutdownExecutorService(ExecutorService executorService, String str, boolean z) {
        if (executorService == null) {
            return;
        }
        final long startTimeMs = System.currentTimeMillis();
        try {
            executorService.shutdown();
            // When forced, the graceful wait is short-circuited and never performed.
            boolean terminatedGracefully = !z && executorService.awaitTermination(60L, TimeUnit.SECONDS);
            if (!terminatedGracefully) {
                if (!z) {
                    LOGGER.warn("Failed to gracefully shutdown executor {}", str);
                }
                executorService.shutdownNow();
                boolean terminatedAfterForce = executorService.awaitTermination(30L, TimeUnit.SECONDS);
                if (!terminatedAfterForce) {
                    LOGGER.error("Failed to shutdown executor {}", str);
                }
            }
        } catch (InterruptedException e) {
            LOGGER.warn("Executor shutdown is interrupted");
            executorService.shutdownNow();
            // Restore the interrupt flag for the caller.
            Thread.currentThread().interrupt();
        } finally {
            // Logged on every exit path: success, interruption, or any other failure.
            LOGGER.info("{} shutdown took {} ms.", str, System.currentTimeMillis() - startTimeMs);
        }
    }

    /**
     * Stops the service and releases what it owns, in this order: the participant store consumption task
     * and its executor (forced), every ingestion task followed by the ingestion executor (graceful), the
     * aggregated Kafka consumer service, the notifiers, the meta store writer, the schema reader, the
     * producer adapter factory, the store buffer service and the topic manager repository. All per-topic
     * locks are dropped last.
     */
    public void stopInner() {
        Utils.closeQuietlyWithErrorLogged(new Closeable[]{this.participantStoreConsumptionTask});
        shutdownExecutorService(this.participantStoreConsumerExecutorService, "participantStoreConsumerExecutorService", true);

        // Ask every task to stop before waiting on the executor that runs them.
        for (StoreIngestionTask ingestionTask : this.topicNameToIngestionTaskMap.values()) {
            ingestionTask.close();
        }
        shutdownExecutorService(this.ingestionExecutorService, "ingestionExecutorService", false);

        Utils.closeQuietlyWithErrorLogged(new Closeable[]{this.aggKafkaConsumerService});
        for (VeniceNotifier notifier : this.leaderFollowerNotifiers) {
            notifier.close();
        }
        Utils.closeQuietlyWithErrorLogged(new Closeable[]{this.metaStoreWriter});
        if (this.kafkaMessageEnvelopeSchemaReader.isPresent()) {
            Utils.closeQuietlyWithErrorLogged(new Closeable[]{this.kafkaMessageEnvelopeSchemaReader.get()});
        }
        Utils.closeQuietlyWithErrorLogged(new Closeable[]{this.producerAdapterFactory});
        Utils.closeQuietlyWithErrorLogged(new Closeable[]{this.storeBufferService});
        Utils.closeQuietlyWithErrorLogged(new Closeable[]{this.topicManagerRepository});
        this.topicLockManager.removeAllLocks();
    }

    /**
     * Starts consuming one partition of a store version's topic.
     * While holding the per-topic lock, this creates and registers a new ingestion task if none is
     * running for the topic, refreshes which versions of the store emit metrics, and subscribes the task
     * to the partition.
     *
     * <p>If the service is not running, a newly created task is still registered in the task map but is
     * not submitted for execution, and the method returns without subscribing.
     *
     * @param veniceStoreVersionConfig config of the store version whose topic should be consumed
     * @param i partition id to subscribe
     * @param optional optional leader/follower state forwarded to the task on subscribe
     */
    @Override // com.linkedin.davinci.kafka.consumer.StoreIngestionService
    public void startConsumption(VeniceStoreVersionConfig veniceStoreVersionConfig, int i, Optional<LeaderFollowerStateType> optional) {
        String storeVersionName = veniceStoreVersionConfig.getStoreVersionName();
        // try-with-resources guarantees the topic lock is released exactly once on every exit path.
        try (AutoCloseableLock ignored = this.topicLockManager.getLockForResource(storeVersionName)) {
            StoreIngestionTask storeIngestionTask = this.topicNameToIngestionTaskMap.get(storeVersionName);
            if (storeIngestionTask == null || !storeIngestionTask.isRunning()) {
                storeIngestionTask = createConsumerTask(veniceStoreVersionConfig, i);
                this.topicNameToIngestionTaskMap.put(storeVersionName, storeIngestionTask);
                this.versionedIngestionStats.setIngestionTask(storeVersionName, storeIngestionTask);
                if (!isRunning()) {
                    LOGGER.info("Ignoring Start consumption message as service is stopping. Topic {} Partition {}", storeVersionName, i);
                    return;
                }
                this.ingestionExecutorService.submit(storeIngestionTask);
            }
            String storeName = Version.parseStoreFromKafkaTopicName(storeVersionName);
            int versionFromTopic = Version.parseVersionFromKafkaTopicName(storeVersionName);
            int maxVersionFromRepo = getStoreMaximumVersionNumber(storeName);
            if (versionFromTopic > maxVersionFromRepo) {
                LOGGER.warn("Got stale info from metadataRepo. maxVersionNumberFromTopicName: {}, maxVersionNumberFromMetadataRepo: {}. Will rely on the topic name's version.", versionFromTopic, maxVersionFromRepo);
            }
            // Use whichever version number is newer so a stale metadata repo cannot hide the new version.
            updateStatsEmission(this.topicNameToIngestionTaskMap, storeName, Math.max(maxVersionFromRepo, versionFromTopic));
            storeIngestionTask.subscribePartition(new PubSubTopicPartitionImpl(this.pubSubTopicRepository.getTopic(storeVersionName), i), optional);
        }
        LOGGER.info("Started Consuming - Kafka Partition: {}-{}.", storeVersionName, i);
    }

    /**
     * Removes the ingestion task registered for the given topic, if any, and shuts it down with a
     * timeout argument of 10000 (presumably milliseconds — confirm in {@code StoreIngestionTask#shutdown}).
     * The per-topic lock is held for the removal and shutdown, then discarded from the lock manager.
     *
     * @param str name of the topic whose ingestion task should be shut down
     */
    public void shutdownStoreIngestionTask(String str) {
        // try-with-resources guarantees the topic lock is released exactly once on every exit path.
        try (AutoCloseableLock ignored = this.topicLockManager.getLockForResource(str)) {
            // A single remove() replaces containsKey() + remove(): one lookup, and no null dereference
            // if the entry disappears between the two calls.
            StoreIngestionTask removedTask = this.topicNameToIngestionTaskMap.remove(str);
            if (removedTask != null) {
                removedTask.shutdown(10000);
                LOGGER.info("Successfully shut down ingestion task for {}", str);
            } else {
                LOGGER.info("Ignoring close request for not-existing consumption task {}", str);
            }
        }
        // The lock itself is dropped only after it has been released above.
        this.topicLockManager.removeLockForResource(str);
    }

    /**
     * Forwards a standby-to-leader transition for one partition to the topic's ingestion task.
     * The request is logged and ignored when no running ingestion task exists for the topic.
     *
     * @param veniceStoreVersionConfig config of the store version owning the partition
     * @param i partition id being promoted
     * @param leaderSessionIdChecker checker forwarded to the ingestion task
     */
    @Override // com.linkedin.davinci.kafka.consumer.StoreIngestionService
    public void promoteToLeader(VeniceStoreVersionConfig veniceStoreVersionConfig, int i, LeaderFollowerPartitionStateModel.LeaderSessionIdChecker leaderSessionIdChecker) {
        String storeVersionName = veniceStoreVersionConfig.getStoreVersionName();
        // try-with-resources guarantees the topic lock is released exactly once on every exit path.
        try (AutoCloseableLock ignored = this.topicLockManager.getLockForResource(storeVersionName)) {
            StoreIngestionTask storeIngestionTask = this.topicNameToIngestionTaskMap.get(storeVersionName);
            if (storeIngestionTask == null || !storeIngestionTask.isRunning()) {
                LOGGER.warn("Ignoring standby to leader transition message for Topic {} Partition {}", storeVersionName, i);
            } else {
                storeIngestionTask.promoteToLeader(new PubSubTopicPartitionImpl(this.pubSubTopicRepository.getTopic(storeVersionName), i), leaderSessionIdChecker);
            }
        }
    }

    /**
     * Forwards a leader-to-standby transition for one partition to the topic's ingestion task.
     * The request is logged and ignored when no running ingestion task exists for the topic.
     *
     * @param veniceStoreVersionConfig config of the store version owning the partition
     * @param i partition id being demoted
     * @param leaderSessionIdChecker checker forwarded to the ingestion task
     */
    @Override // com.linkedin.davinci.kafka.consumer.StoreIngestionService
    public void demoteToStandby(VeniceStoreVersionConfig veniceStoreVersionConfig, int i, LeaderFollowerPartitionStateModel.LeaderSessionIdChecker leaderSessionIdChecker) {
        String storeVersionName = veniceStoreVersionConfig.getStoreVersionName();
        // try-with-resources guarantees the topic lock is released exactly once on every exit path.
        try (AutoCloseableLock ignored = this.topicLockManager.getLockForResource(storeVersionName)) {
            StoreIngestionTask storeIngestionTask = this.topicNameToIngestionTaskMap.get(storeVersionName);
            if (storeIngestionTask == null || !storeIngestionTask.isRunning()) {
                LOGGER.warn("Ignoring leader to standby transition message for Topic {} Partition {}", storeVersionName, i);
            } else {
                storeIngestionTask.demoteToStandby(new PubSubTopicPartitionImpl(this.pubSubTopicRepository.getTopic(storeVersionName), i), leaderSessionIdChecker);
            }
        }
    }

    /**
     * @return the config loader this service was constructed with
     */
    @Override // com.linkedin.davinci.kafka.consumer.StoreIngestionService
    public VeniceConfigLoader getVeniceConfigLoader() {
        return veniceConfigLoader;
    }

    /**
     * Turns metrics emission on or off for the ingestion tasks of one store, given a known
     * version number. If the map holds a task for that exact store version, every task of the
     * store with a lower version is disabled and every other task of the store is enabled.
     * Otherwise the decision is delegated to the two-argument overload.
     *
     * @param navigableMap topic name to ingestion task
     * @param str store name
     * @param i version number used as the enable/disable threshold
     */
    protected void updateStatsEmission(NavigableMap<String, StoreIngestionTask> navigableMap, String str, int i) {
        final String thresholdTopic = Version.composeKafkaTopic(str, i);
        if (!navigableMap.containsKey(thresholdTopic)) {
            updateStatsEmission(navigableMap, str);
            return;
        }
        navigableMap.forEach((topic, task) -> {
            if (!Version.parseStoreFromKafkaTopicName(topic).equals(str)) {
                // Task belongs to another store; leave it untouched.
                return;
            }
            if (Version.parseVersionFromKafkaTopicName(topic) >= i) {
                task.enableMetricsEmission();
            } else {
                task.disableMetricsEmission();
            }
        });
    }

    /**
     * Finds the task with the highest version number among the given store's ingestion tasks.
     * If that task does not emit metrics yet, emission is enabled on it and then disabled on the
     * entries that precede it in the map's key order, walking backwards until an entry of a
     * different store (or the start of the map) is reached.
     *
     * @param navigableMap topic name to ingestion task
     * @param str store name
     */
    protected void updateStatsEmission(NavigableMap<String, StoreIngestionTask> navigableMap, String str) {
        int highestVersion = -1;
        StoreIngestionTask highestVersionTask = null;
        for (Map.Entry<String, StoreIngestionTask> candidate : navigableMap.entrySet()) {
            final String topic = candidate.getKey();
            if (!Version.parseStoreFromKafkaTopicName(topic).equals(str)) {
                continue;
            }
            final int version = Version.parseVersionFromKafkaTopicName(topic);
            if (version > highestVersion) {
                highestVersion = version;
                highestVersionTask = candidate.getValue();
            }
        }
        if (highestVersionTask != null && !highestVersionTask.isMetricsEmissionEnabled()) {
            highestVersionTask.enableMetricsEmission();
            Map.Entry<String, StoreIngestionTask> previous = navigableMap.lowerEntry(Version.composeKafkaTopic(str, highestVersion));
            while (previous != null && Version.parseStoreFromKafkaTopicName(previous.getKey()).equals(str)) {
                previous.getValue().disableMetricsEmission();
                previous = navigableMap.lowerEntry(previous.getKey());
            }
        }
    }

    /**
     * Looks up the largest used version number of a store in the metadata repository.
     *
     * @param str store name
     * @return the largest used version number, never 0
     * @throws VeniceException if the repository reports 0 as the largest used version number
     */
    private int getStoreMaximumVersionNumber(String str) {
        final int maxVersion = this.metadataRepo.getStoreOrThrow(str).getLargestUsedVersionNumber();
        if (maxVersion != 0) {
            return maxVersion;
        }
        throw new VeniceException("No version has been created yet for store " + str);
    }

    /**
     * Asks the ingestion task of the given store version to unsubscribe one partition. When no
     * running ingestion task is registered for the topic the request is only logged. The topic's
     * resource lock is acquired for the duration of the call and released on every exit path.
     *
     * @param veniceStoreVersionConfig config supplying the store-version (topic) name
     * @param i partition id
     */
    @Override
    public void stopConsumption(VeniceStoreVersionConfig veniceStoreVersionConfig, int i) {
        final String topicName = veniceStoreVersionConfig.getStoreVersionName();
        try (AutoCloseableLock ignored = this.topicLockManager.getLockForResource(topicName)) {
            final StoreIngestionTask task = this.topicNameToIngestionTaskMap.get(topicName);
            final boolean taskIsRunning = task != null && task.isRunning();
            if (taskIsRunning) {
                PubSubTopicPartitionImpl topicPartition = new PubSubTopicPartitionImpl(this.pubSubTopicRepository.getTopic(topicName), i);
                task.unSubscribePartition(topicPartition);
                return;
            }
            LOGGER.warn("Ignoring stop consumption message for Topic {} Partition {}", topicName, Integer.valueOf(i));
        }
    }

    /**
     * Stops consumption of one partition, polls until the partition is reported as no longer
     * consuming (or the retry budget is used up), then resets the partition's consumption offset.
     * Afterwards, if the ingestion task of the topic has no subscription left, the task is shut
     * down, except when {@code isIsolatedIngestion} is set, in which case it is kept open.
     *
     * <p>If the waiting thread is interrupted, the interrupt flag is restored and the method
     * continues with the offset reset.
     *
     * @param veniceStoreVersionConfig config supplying the store-version (topic) name
     * @param i partition id
     * @param i2 seconds to sleep between two consecutive checks
     * @param i3 maximum number of checks
     */
    @Override
    public void stopConsumptionAndWait(VeniceStoreVersionConfig veniceStoreVersionConfig, int i, int i2, int i3) {
        String storeVersionName = veniceStoreVersionConfig.getStoreVersionName();
        if (isPartitionConsuming(storeVersionName, i)) {
            stopConsumption(veniceStoreVersionConfig, i);
            try {
                long startTimeMs = System.currentTimeMillis();
                boolean stopped = false;
                for (int attempt = 0; attempt < i3; attempt++) {
                    if (!isPartitionConsuming(storeVersionName, i)) {
                        stopped = true;
                        LOGGER.info("Partition: {} of topic: {} has stopped consumption in {} ms.", Integer.valueOf(i), storeVersionName, Long.valueOf(LatencyUtils.getElapsedTimeInMs(startTimeMs)));
                        break;
                    }
                    // Widen to long before multiplying so large sleep intervals cannot overflow int.
                    Thread.sleep(i2 * 1000L);
                }
                // Only report a failure when the partition was never observed as stopped;
                // previously this error was logged even after a successful stop.
                if (!stopped) {
                    LOGGER.error("Partition: {} of store: {} is still consuming after waiting for it to stop for {} seconds.", Integer.valueOf(i), storeVersionName, Long.valueOf((long) i3 * i2));
                }
            } catch (InterruptedException e) {
                LOGGER.warn("Waiting for partition to stop consumption was interrupted", e);
                Thread.currentThread().interrupt();
            }
        } else {
            LOGGER.warn("Partition: {} of topic: {} is not consuming, skipped the stop consumption.", Integer.valueOf(i), storeVersionName);
        }
        resetConsumptionOffset(veniceStoreVersionConfig, i);
        if (ingestionTaskHasAnySubscription(storeVersionName)) {
            return;
        }
        if (this.isIsolatedIngestion) {
            LOGGER.info("Ingestion task for topic {} will be kept open for the access from main process.", storeVersionName);
        } else {
            LOGGER.info("Shutting down ingestion task of topic {}", storeVersionName);
            shutdownStoreIngestionTask(storeVersionName);
        }
    }

    /**
     * Kills and unregisters the ingestion task of a topic.
     *
     * <p>Under the topic's resource lock: a running task is killed and its version-specific
     * compressor removed; a stopped task is only logged. In both cases the task is removed from
     * the task map, its version topic is unsubscribed from the aggregate consumer service (when
     * one is configured), and, if the task was emitting metrics, emission is disabled on it and
     * re-evaluated for the remaining tasks of the store. After the lock is released, the lock
     * entry for the topic is removed from the lock manager.
     *
     * @param str topic name
     * @return {@code true} only if a running task was killed; {@code false} if no task was
     *         registered or the registered task was already stopped
     * @throws VeniceException if this service is not running
     */
    @Override
    public boolean killConsumptionTask(String str) {
        if (!isRunning()) {
            throw new VeniceException("KafkaStoreIngestionService is not running.");
        }
        boolean killed = false;
        try (AutoCloseableLock ignored = this.topicLockManager.getLockForResource(str)) {
            final StoreIngestionTask task = this.topicNameToIngestionTaskMap.get(str);
            if (task == null) {
                LOGGER.info("Ignoring kill request for not-existing consumption task {}", str);
                return false;
            }
            if (!task.isRunning()) {
                LOGGER.warn("Ignoring kill request for stopped consumption task {}", str);
            } else {
                task.kill();
                this.compressorFactory.removeVersionSpecificCompressor(str);
                killed = true;
                LOGGER.info("Killed consumption task for topic {}", str);
            }
            this.topicNameToIngestionTaskMap.remove(str);
            if (this.aggKafkaConsumerService != null) {
                this.aggKafkaConsumerService.unsubscribeAll(task.getVersionTopic());
            }
            if (task.isMetricsEmissionEnabled()) {
                task.disableMetricsEmission();
                updateStatsEmission(this.topicNameToIngestionTaskMap, Version.parseStoreFromKafkaTopicName(str));
            }
        }
        // Reached only when a task was registered; the lock has been released by now.
        this.topicLockManager.removeLockForResource(str);
        return killed;
    }

    /**
     * Appends a notifier to the leader/follower notifier collection of this service.
     *
     * @param veniceNotifier notifier to add
     */
    @Override
    public void addIngestionNotifier(VeniceNotifier veniceNotifier) {
        leaderFollowerNotifiers.add(veniceNotifier);
    }

    /**
     * Removes every {@link PartitionPushStatusNotifier} from the leader/follower notifier
     * collection and then adds the supplied notifier.
     *
     * @param veniceNotifier notifier to add after the removal
     */
    @Override
    public void replaceAndAddTestNotifier(VeniceNotifier veniceNotifier) {
        leaderFollowerNotifiers.removeIf(existing -> existing instanceof PartitionPushStatusNotifier);
        leaderFollowerNotifiers.add(veniceNotifier);
    }

    /**
     * Convenience overload that resolves the topic name from the store-version config and
     * delegates to {@link #containsRunningConsumption(String)}.
     *
     * @param veniceStoreVersionConfig config supplying the store-version (topic) name
     * @return the result of the topic-name based check
     */
    @Override
    public boolean containsRunningConsumption(VeniceStoreVersionConfig veniceStoreVersionConfig) {
        final String topicName = veniceStoreVersionConfig.getStoreVersionName();
        return containsRunningConsumption(topicName);
    }

    /**
     * Tells whether an ingestion task is registered for the topic and is currently running.
     * The check is performed while holding the topic's resource lock, which is released on
     * every exit path.
     *
     * <p>Reconstructed from the bytecode listing emitted by the decompiler, which had replaced
     * the body with an unconditional {@link UnsupportedOperationException}.
     *
     * @param topicName topic (store-version) name
     * @return {@code true} if a task exists for the topic and reports itself as running
     */
    @Override
    public boolean containsRunningConsumption(String topicName) {
        try (AutoCloseableLock ignored = this.topicLockManager.getLockForResource(topicName)) {
            StoreIngestionTask task = this.topicNameToIngestionTaskMap.get(topicName);
            return task != null && task.isRunning();
        }
    }

    /**
     * Tells whether the given partition is being consumed: an ingestion task must be registered
     * for the topic, be running, and report the partition as consuming. The check is performed
     * while holding the topic's resource lock, which is released on every exit path.
     *
     * <p>Reconstructed from the bytecode listing emitted by the decompiler, which had replaced
     * the body with an unconditional {@link UnsupportedOperationException}.
     *
     * @param topicName topic (store-version) name
     * @param partitionId partition id
     * @return {@code true} if all three conditions hold, {@code false} otherwise
     */
    @Override
    public boolean isPartitionConsuming(String topicName, int partitionId) {
        try (AutoCloseableLock ignored = this.topicLockManager.getLockForResource(topicName)) {
            StoreIngestionTask task = this.topicNameToIngestionTaskMap.get(topicName);
            return task != null && task.isRunning() && task.isPartitionConsuming(partitionId);
        }
    }

    /**
     * Collects the topics that currently have an ingestion task but whose store version is not
     * {@code ONLINE}. A topic is included when its store cannot be found, when the store has no
     * such version, or when the version's status differs from {@link VersionStatus#ONLINE}.
     * Any other exception raised while inspecting a topic is logged and that topic is skipped.
     *
     * @return a new mutable set of topic names; never {@code null}
     */
    @Override
    public Set<String> getIngestingTopicsWithVersionStatusNotOnline() {
        Set<String> notOnlineTopics = new HashSet<>();
        for (String topic : this.topicNameToIngestionTaskMap.keySet()) {
            try {
                Store store = this.metadataRepo.getStoreOrThrow(Version.parseStoreFromKafkaTopicName(topic));
                int versionNumber = Version.parseVersionFromKafkaTopicName(topic);
                if (store == null || !store.getVersion(versionNumber).isPresent() || store.getVersion(versionNumber).get().getStatus() != VersionStatus.ONLINE) {
                    notOnlineTopics.add(topic);
                }
            } catch (VeniceNoStoreException e) {
                // The specific handler must come before the generic one, otherwise it is
                // unreachable. A missing store still counts as "not online".
                notOnlineTopics.add(topic);
            } catch (Exception e) {
                LOGGER.error("Unexpected exception while fetching ongoing ingestion topics, topic: {}", topic, e);
            }
        }
        return notOnlineTopics;
    }

    /**
     * Records one ingestion failure on the host-level ingestion stats of the given store.
     *
     * @param str store name used to look up the stats object
     */
    @Override
    public void recordIngestionFailure(String str) {
        HostLevelIngestionStats storeStats = (HostLevelIngestionStats) this.hostLevelIngestionStats.getStoreStats(str);
        storeStats.recordIngestionFailure();
    }

    /**
     * @return the versioned ingestion stats aggregator held by this service
     */
    @Override
    public AggVersionedIngestionStats getAggVersionedIngestionStats() {
        return versionedIngestionStats;
    }

    /**
     * Builds a consumer group id by formatting {@code GROUP_ID_FORMAT} with the given name and
     * the local host name.
     *
     * @param str first format argument (callers in this class pass the store-version name)
     * @return the formatted group id
     */
    private static String getGroupId(String str) {
        final String hostName = Utils.getHostName();
        return String.format(GROUP_ID_FORMAT, str, hostName);
    }

    /**
     * Assembles the consumer properties shared by all consumers created from a server config:
     * SASL settings copied from the cluster properties, the bootstrap servers, and the fetch and
     * poll tuning values exposed by the config.
     *
     * @param veniceServerConfig source of all values
     * @return a fresh {@link Properties} instance
     */
    private Properties getCommonKafkaConsumerProperties(VeniceServerConfig veniceServerConfig) {
        final Properties consumerProps = new Properties();
        ApacheKafkaProducerConfig.copyKafkaSASLProperties(veniceServerConfig.getClusterProperties(), consumerProps, false);

        // Connection and offset handling.
        consumerProps.setProperty("kafka.bootstrap.servers", veniceServerConfig.getKafkaBootstrapServers());
        consumerProps.setProperty("kafka.auto.offset.reset", "earliest");
        consumerProps.setProperty("kafka.enable.auto.commit", "false");

        // Fetch sizing and timing.
        consumerProps.setProperty("kafka.fetch.min.bytes", String.valueOf(veniceServerConfig.getKafkaFetchMinSizePerSecond()));
        consumerProps.setProperty("kafka.fetch.max.bytes", String.valueOf(veniceServerConfig.getKafkaFetchMaxSizePerSecond()));
        consumerProps.setProperty("kafka.max.poll.records", Integer.toString(veniceServerConfig.getKafkaMaxPollRecords()));
        consumerProps.setProperty("kafka.fetch.max.wait.ms", String.valueOf(veniceServerConfig.getKafkaFetchMaxTimeMS()));
        consumerProps.setProperty("kafka.max.partition.fetch.bytes", String.valueOf(veniceServerConfig.getKafkaFetchPartitionMaxSizePerSecond()));

        // Poll retry policy.
        consumerProps.setProperty("kafka.consumer.poll.retry.times", String.valueOf(veniceServerConfig.getKafkaPollRetryTimes()));
        consumerProps.setProperty("kafka.consumer.poll.retry.backoff.ms", String.valueOf(veniceServerConfig.getKafkaPollRetryBackoffMs()));
        return consumerProps;
    }

    /**
     * Derives the pub-sub security properties for the given bootstrap URL.
     *
     * <p>If the URL differs from the server config's own bootstrap servers, a copy of the server
     * config is built with the URL substituted. The result contains the SASL settings, the
     * bootstrap servers (after applying the config's cluster URL resolver, when it yields a
     * non-null value), the security protocol, and, for SSL protocols, the Kafka SSL settings.
     *
     * @param str bootstrap server URL
     * @return the assembled properties
     * @throws VeniceException if an SSL protocol is selected but no SSL config is present
     */
    private VeniceProperties getPubSubSSLPropertiesFromServerConfig(String str) {
        VeniceServerConfig serverConfig = this.veniceConfigLoader.getVeniceServerConfig();
        final boolean isConfiguredCluster = str.equals(serverConfig.getKafkaBootstrapServers());
        if (!isConfiguredCluster) {
            Properties overriddenProps = serverConfig.getClusterProperties().toProperties();
            overriddenProps.setProperty("kafka.bootstrap.servers", str);
            serverConfig = new VeniceServerConfig(new VeniceProperties(overriddenProps), serverConfig.getKafkaClusterMap());
        }

        VeniceProperties clusterProps = serverConfig.getClusterProperties();
        Properties securityProps = new Properties();
        ApacheKafkaProducerConfig.copyKafkaSASLProperties(clusterProps, securityProps, false);

        final String configuredUrl = serverConfig.getKafkaBootstrapServers();
        final String resolvedUrl = serverConfig.getKafkaClusterUrlResolver().apply(configuredUrl);
        // Fall back to the configured URL when the resolver has no mapping.
        final String brokerUrl = resolvedUrl != null ? resolvedUrl : configuredUrl;
        securityProps.setProperty("kafka.bootstrap.servers", brokerUrl);

        final SecurityProtocol protocol = serverConfig.getKafkaSecurityProtocol(brokerUrl);
        if (KafkaSSLUtils.isKafkaSSLProtocol(protocol)) {
            Optional<SSLConfig> maybeSslConfig = serverConfig.getSslConfig();
            if (!maybeSslConfig.isPresent()) {
                throw new VeniceException("SSLConfig should be present when Kafka SSL is enabled");
            }
            securityProps.putAll(maybeSslConfig.get().getKafkaSSLConfig());
            if (serverConfig.isKafkaOpenSSLEnabled()) {
                securityProps.setProperty("ssl.context.provider.class", ConfigConstants.DEFAULT_KAFKA_SSL_CONTEXT_PROVIDER_CLASS_NAME);
            }
        }
        securityProps.setProperty("security.protocol", protocol.name);
        return new VeniceProperties(securityProps);
    }

    /**
     * Builds the consumer properties for one store version: the common consumer properties plus
     * {@code group.id} and {@code client.id}, both set to the group id derived from the
     * store-version name.
     *
     * @param veniceStoreVersionConfig store-version config
     * @return the assembled properties
     */
    private Properties getKafkaConsumerProperties(VeniceStoreVersionConfig veniceStoreVersionConfig) {
        final Properties consumerProps = getCommonKafkaConsumerProperties(veniceStoreVersionConfig);
        final String consumerGroupId = getGroupId(veniceStoreVersionConfig.getStoreVersionName());
        for (String key : new String[] {"group.id", "client.id"}) {
            consumerProps.setProperty(key, consumerGroupId);
        }
        return consumerProps;
    }

    /**
     * Fetches the compression dictionary of a store version from the storage metadata service.
     *
     * @param str topic (store-version) name
     * @return whatever the storage metadata service returns for the topic
     */
    @Override
    public ByteBuffer getStoreVersionCompressionDictionary(String str) {
        final ByteBuffer dictionary = this.storageMetadataService.getStoreVersionCompressionDictionary(str);
        return dictionary;
    }

    /**
     * Looks up the ingestion task registered for a topic.
     *
     * @param str topic (store-version) name
     * @return the registered task, or {@code null} when the map has no entry for the topic
     */
    @Override
    public StoreIngestionTask getStoreIngestionTask(String str) {
        return topicNameToIngestionTaskMap.get(str);
    }

    /**
     * Produces an admin response describing the ingestion state of a topic. When an ingestion
     * task exists, its partition consumption states (restricted by {@code complementSet}) and
     * its store-version state are dumped into the response; otherwise the response carries a
     * message stating that no task exists, which is also logged as a warning.
     *
     * @param str topic (store-version) name
     * @param complementSet partition filter handed to the task
     * @return a new response object
     */
    @Override
    public AdminResponse getConsumptionSnapshots(String str, ComplementSet<Integer> complementSet) {
        final AdminResponse response = new AdminResponse();
        final StoreIngestionTask task = getStoreIngestionTask(str);
        if (task == null) {
            final String message = "Ingestion task for " + str + " doesn't exist for " + ServerAdminAction.DUMP_INGESTION_STATE + " admin command";
            LOGGER.warn(message);
            response.setMessage(message);
            return response;
        }
        task.dumpPartitionConsumptionStates(response, complementSet);
        task.dumpStoreVersionState(response);
        return response;
    }

    /**
     * Builds the metadata response for a store: properties of the current version, the list of
     * version numbers, the key schema, all value schemas keyed by schema id, the latest superset
     * value schema id, routing info (partition id to URLs of ready-to-serve instances) for the
     * current version's resource, and the Helix instance-to-group mapping.
     *
     * <p>Routing info is collected only from the resource whose name equals the current
     * version's topic name for this store. The previous suffix check
     * ({@code endsWith("v" + version)}) also matched resources of other stores that happen to
     * have the same version number.
     *
     * <p>If the store is unknown, the response carries an error flag and message instead.
     * NOTE(review): the unchecked {@code get()} on the current version is kept from the original;
     * presumably it throws when the store has no version object for its current version.
     *
     * @param str store name
     * @return a new response object
     */
    @Override
    public MetadataResponse getMetadata(String str) {
        MetadataResponse response = new MetadataResponse();
        try {
            Store store = this.metadataRepo.getStoreOrThrow(str);
            int currentVersionNumber = store.getCurrentVersion();
            Version currentVersion = store.getVersion(currentVersionNumber).get();
            VersionProperties versionProperties = new VersionProperties(
                    currentVersionNumber,
                    currentVersion.getCompressionStrategy().getValue(),
                    currentVersion.getPartitionCount(),
                    currentVersion.getPartitionerConfig().getPartitionerClass(),
                    new HashMap<>(currentVersion.getPartitionerConfig().getPartitionerParams()),
                    currentVersion.getPartitionerConfig().getAmplificationFactor());

            List<Integer> versionNumbers = new ArrayList<>();
            for (Version version : store.getVersions()) {
                versionNumbers.add(version.getNumber());
            }

            Map<CharSequence, CharSequence> keySchema = Collections.singletonMap(
                    String.valueOf(this.schemaRepo.getKeySchema(str).getId()),
                    this.schemaRepo.getKeySchema(str).getSchema().toString());

            Map<CharSequence, CharSequence> valueSchemas = new HashMap<>();
            for (SchemaEntry schemaEntry : this.schemaRepo.getValueSchemas(str)) {
                String schemaStr = schemaEntry.getSchema().toString();
                valueSchemas.put(String.valueOf(this.schemaRepo.getValueSchemaId(str, schemaStr)), schemaStr);
            }
            int latestSuperSetValueSchemaId = store.getLatestSuperSetValueSchemaId();

            // Match the full resource name so that only this store's current version contributes.
            String currentVersionResource = Version.composeKafkaTopic(str, currentVersionNumber);
            Map<CharSequence, List<CharSequence>> routingInfo = new HashMap<>();
            for (String resource : this.customizedViewRepository.getResourceAssignment().getAssignedResources()) {
                if (!currentVersionResource.equals(resource)) {
                    continue;
                }
                for (Partition partition : this.customizedViewRepository.getPartitionAssignments(resource).getAllPartitions()) {
                    List<CharSequence> instanceUrls = new ArrayList<>();
                    for (Instance instance : this.customizedViewRepository.getReadyToServeInstances(resource, partition.getId())) {
                        instanceUrls.add(instance.getUrl(true));
                    }
                    routingInfo.put(String.valueOf(partition.getId()), instanceUrls);
                }
            }

            Map<CharSequence, Integer> helixGroupInfo = new HashMap<>();
            for (Map.Entry<String, Integer> entry : this.helixInstanceConfigRepository.getInstanceGroupIdMapping().entrySet()) {
                helixGroupInfo.put(HelixUtils.instanceIdToUrl(entry.getKey()), entry.getValue());
            }

            response.setVersionMetadata(versionProperties);
            response.setVersions(versionNumbers);
            response.setKeySchema(keySchema);
            response.setValueSchemas(valueSchemas);
            response.setLatestSuperSetValueSchemaId(latestSuperSetValueSchemaId);
            response.setRoutingInfo(routingInfo);
            response.setHelixGroupInfo(helixGroupInfo);
        } catch (VeniceNoStoreException e) {
            LOGGER.warn("Store {} not found in metadataRepo.", str);
            response.setMessage("Store \"" + str + "\" not found");
            response.setError(true);
        }
        return response;
    }

    /**
     * Applies the given action to every ingestion task currently held in the task map.
     *
     * @param consumer action to run for each task
     */
    @Override
    public void traverseAllIngestionTasksAndApply(Consumer<StoreIngestionTask> consumer) {
        final Collection<StoreIngestionTask> allTasks = this.topicNameToIngestionTaskMap.values();
        allTasks.forEach(consumer);
    }

    /**
     * @return the aggregated lag stats held by this service
     */
    public AggLagStats getAggLagStats() {
        return aggLagStats;
    }

    /**
     * Returns the leader/follower state that the topic's ingestion task reports for a partition.
     * No null check is made on the task lookup, so a topic without a registered task results in
     * a {@link NullPointerException}.
     *
     * @param str topic (store-version) name
     * @param i partition id
     * @return the state reported by the ingestion task
     */
    public LeaderFollowerStateType getLeaderStateFromPartitionConsumptionState(String str, int i) {
        final StoreIngestionTask task = getStoreIngestionTask(str);
        return task.getLeaderState(i);
    }

    /**
     * Deserializes the supplied offset records and stores one per sub-partition of the given
     * partition in the storage metadata service. The record for a sub-partition is taken from
     * {@code list} at index {@code subPartition - amplificationFactor * i}.
     *
     * <p>If building or storing a record fails, the raw bytes are logged and the exception is
     * rethrown; sub-partitions processed before the failure stay updated.
     *
     * @param str topic (store-version) name
     * @param i partition id
     * @param list serialized offset records; each buffer's backing array is read
     */
    public void updatePartitionOffsetRecords(String str, int i, List<ByteBuffer> list) {
        final int amplificationFactor = PartitionUtils.getAmplificationFactor(this.metadataRepo, str);
        final int firstSubPartition = amplificationFactor * i;
        for (int subPartition : PartitionUtils.getSubPartitions(i, amplificationFactor)) {
            final byte[] serializedRecord = list.get(subPartition - firstSubPartition).array();
            try {
                OffsetRecord record = new OffsetRecord(serializedRecord, this.partitionStateSerializer);
                this.storageMetadataService.put(str, subPartition, record);
                LOGGER.info("Updated OffsetRecord: {} for topic: {}, partition: {}", record.toString(), str, Integer.valueOf(i));
            } catch (Exception e) {
                LOGGER.error("Caught exception when deserializing offset record byte array: {} for topic: {}, subPartition: {}.", Arrays.toString(serializedRecord), str, Integer.valueOf(subPartition));
                throw e;
            }
        }
    }

    /**
     * Serializes the offset records of every sub-partition of the given partition, in ascending
     * sub-partition order. Sub-partition ids are {@code amplificationFactor * i + k} for
     * {@code k} in {@code [0, amplificationFactor)}, with the factor taken from the topic's
     * ingestion task.
     *
     * @param str topic (store-version) name
     * @param i partition id
     * @return one wrapped byte buffer per sub-partition
     */
    public List<ByteBuffer> getPartitionOffsetRecords(String str, int i) {
        final List<ByteBuffer> serializedRecords = new ArrayList<>();
        final StoreIngestionTask task = getStoreIngestionTask(str);
        final int amplificationFactor = task.getAmplificationFactor();
        for (int offset = 0; offset < amplificationFactor; offset++) {
            final int subPartition = (amplificationFactor * i) + offset;
            final byte[] recordBytes = task.getPartitionConsumptionState(subPartition).getOffsetRecord().toBytes();
            serializedRecords.add(ByteBuffer.wrap(recordBytes));
        }
        return serializedRecords;
    }

    /**
     * Asks the topic's ingestion task to update and sync offset metadata for every
     * sub-partition of the given partition.
     *
     * @param str topic (store-version) name
     * @param i partition id
     */
    public void syncTopicPartitionOffset(String str, int i) {
        final StoreIngestionTask task = getStoreIngestionTask(str);
        final int amplificationFactor = task.getAmplificationFactor();
        int offset = 0;
        while (offset < amplificationFactor) {
            task.updateOffsetMetadataAndSync(str, (amplificationFactor * i) + offset);
            offset++;
        }
    }

    /**
     * @return the read-only store repository held by this service
     */
    public final ReadOnlyStoreRepository getMetadataRepo() {
        return metadataRepo;
    }

    /**
     * Tells whether an ingestion task is registered for the topic and reports at least one
     * subscription. The check is performed while holding the topic's resource lock, which is
     * released on every exit path.
     *
     * <p>Reconstructed from the bytecode listing emitted by the decompiler, which had replaced
     * the body with an unconditional {@link UnsupportedOperationException}.
     *
     * @param topicName topic (store-version) name
     * @return {@code true} if a task exists and has any subscription, {@code false} otherwise
     */
    private boolean ingestionTaskHasAnySubscription(String topicName) {
        try (AutoCloseableLock ignored = this.topicLockManager.getLockForResource(topicName)) {
            StoreIngestionTask task = this.topicNameToIngestionTaskMap.get(topicName);
            return task != null && task.hasAnySubscription();
        }
    }

    /**
     * Asks the running ingestion task of the given store version, if any, to reset the
     * consumption offset of one partition. The info message is logged whether or not a running
     * task was found.
     *
     * @param veniceStoreVersionConfig config supplying the store-version (topic) name
     * @param i partition id
     */
    private void resetConsumptionOffset(VeniceStoreVersionConfig veniceStoreVersionConfig, int i) {
        final String topicName = veniceStoreVersionConfig.getStoreVersionName();
        final StoreIngestionTask task = this.topicNameToIngestionTaskMap.get(topicName);
        final boolean taskIsRunning = task != null && task.isRunning();
        if (taskIsRunning) {
            PubSubTopicPartitionImpl topicPartition = new PubSubTopicPartitionImpl(this.pubSubTopicRepository.getTopic(topicName), i);
            task.resetPartitionConsumptionOffset(topicPartition);
        }
        LOGGER.info("Offset reset to beginning - Kafka Partition: {}-{}.", topicName, Integer.valueOf(i));
    }
}
