package com.linkedin.venice.router.api;

import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.compression.CompressorFactory;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.meta.Instance;
import com.linkedin.venice.meta.OnlineInstanceFinder;
import com.linkedin.venice.meta.QueryAction;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.StoreDataChangedListener;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.meta.VersionStatus;
import com.linkedin.venice.router.VeniceRouterConfig;
import com.linkedin.venice.router.httpclient.PortableHttpResponse;
import com.linkedin.venice.router.httpclient.StorageNodeClient;
import com.linkedin.venice.router.httpclient.VeniceMetaDataRequest;
import com.linkedin.venice.security.SSLFactory;
import com.linkedin.venice.service.AbstractVeniceService;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/venice/router/api/DictionaryRetrievalService.class */
/**
 * Router-side service that downloads ZSTD compression dictionaries from storage nodes and registers
 * version-specific compressors with the {@link CompressorFactory}.
 *
 * <p>On start-up, dictionaries for all ONLINE versions that use {@link CompressionStrategy#ZSTD_WITH_DICT}
 * are fetched synchronously; a failure aborts start-up. Afterwards a background thread drains
 * {@link #dictionaryDownloadCandidates}, which is fed by a {@link StoreDataChangedListener} and by retries
 * scheduled after failed downloads.
 */
public class DictionaryRetrievalService extends AbstractVeniceService {
    private static final Logger LOGGER = LogManager.getLogger(DictionaryRetrievalService.class);

    /** Delay used both when polling for a future to appear in the map and before re-queueing a failed topic. */
    private static final int DEFAULT_DICTIONARY_DOWNLOAD_INTERVAL_IN_MS = 100;

    private final OnlineInstanceFinder onlineInstanceFinder;
    private final Optional<SSLFactory> sslFactory;
    private final ReadOnlyStoreRepository metadataRepository;
    private final Thread dictionaryRetrieverThread;
    private final ScheduledExecutorService executor;
    private final StorageNodeClient storageNodeClient;
    private final CompressorFactory compressorFactory;
    private final int dictionaryRetrievalTimeMs;

    /** Kafka topic names (store-version resources) whose dictionary still has to be downloaded. */
    private final BlockingQueue<String> dictionaryDownloadCandidates = new LinkedBlockingQueue<>();

    /** Kafka topic name -> future of the download (including compressor initialization) for that topic. */
    private final VeniceConcurrentHashMap<String, CompletableFuture<Void>> downloadingDictionaryFutures =
            new VeniceConcurrentHashMap<>();

    /** Keeps the candidate queue and the compressor cache in sync with store metadata changes. */
    private final StoreDataChangedListener storeChangeListener = new StoreDataChangedListener() {
        @Override
        public void handleStoreCreated(Store store) {
            List<String> topics = store.getVersions().stream()
                    .filter(DictionaryRetrievalService::isOnlineWithZstdDictionary)
                    .map(Version::kafkaTopicName)
                    .collect(Collectors.toList());
            dictionaryDownloadCandidates.addAll(topics);
        }

        @Override
        public void handleStoreDeleted(Store store) {
            store.getVersions().forEach(version -> handleVersionRetirement(version.kafkaTopicName(), "Store deleted."));
        }

        @Override
        public void handleStoreChanged(Store store) {
            List<Version> versions = store.getVersions();

            // 1. Queue ONLINE dictionary-compressed versions that have no download registered yet.
            List<String> newTopics = versions.stream()
                    .filter(DictionaryRetrievalService::isOnlineWithZstdDictionary)
                    .filter(version -> !downloadingDictionaryFutures.containsKey(version.kafkaTopicName()))
                    .map(Version::kafkaTopicName)
                    .collect(Collectors.toList());
            dictionaryDownloadCandidates.addAll(newTopics);

            // 2. Retire dictionary-compressed versions that are still listed but are not ONLINE.
            versions.stream()
                    .filter(version -> version.getCompressionStrategy() == CompressionStrategy.ZSTD_WITH_DICT
                            && version.getStatus() != VersionStatus.ONLINE)
                    .forEach(version -> handleVersionRetirement(
                            version.kafkaTopicName(), "Version status " + version.getStatus()));

            // 3. Retire tracked topics of this store whose version no longer exists in the store at all.
            downloadingDictionaryFutures.keySet().stream()
                    .filter(topic -> Version.parseStoreFromKafkaTopicName(topic).equals(store.getName()))
                    .filter(topic -> !store.getVersion(Version.parseVersionFromKafkaTopicName(topic)).isPresent())
                    .forEach(topic -> handleVersionRetirement(topic, "Version retired"));
        }
    };

    /**
     * Creates the service. No thread is started and no download is triggered until {@link #startInner()}.
     *
     * @param onlineInstanceFinder used to locate ready-to-serve storage instances for a resource
     * @param veniceRouterConfig supplies the retrieval timeout and the processing thread count
     * @param optional SSL factory; its presence selects the SSL URL/request flavor
     * @param readOnlyStoreRepository store metadata source
     * @param storageNodeClient client used to send the dictionary request
     * @param compressorFactory receives the version-specific compressors
     */
    public DictionaryRetrievalService(OnlineInstanceFinder onlineInstanceFinder, VeniceRouterConfig veniceRouterConfig, Optional<SSLFactory> optional, ReadOnlyStoreRepository readOnlyStoreRepository, StorageNodeClient storageNodeClient, CompressorFactory compressorFactory) {
        this.onlineInstanceFinder = onlineInstanceFinder;
        this.sslFactory = optional;
        this.metadataRepository = readOnlyStoreRepository;
        this.storageNodeClient = storageNodeClient;
        this.compressorFactory = compressorFactory;
        this.dictionaryRetrievalTimeMs = veniceRouterConfig.getDictionaryRetrievalTimeMs();
        this.executor = Executors.newScheduledThreadPool(veniceRouterConfig.getRouterDictionaryProcessingThreads());
        this.dictionaryRetrieverThread = new Thread(this::runDictionaryRetrievalLoop);
    }

    /** Returns true for versions that are ONLINE and compressed with a ZSTD dictionary. */
    private static boolean isOnlineWithZstdDictionary(Version version) {
        return version.getCompressionStrategy() == CompressionStrategy.ZSTD_WITH_DICT
                && version.getStatus() == VersionStatus.ONLINE;
    }

    /** Body of the retriever thread: takes candidates one at a time until the thread is interrupted. */
    private void runDictionaryRetrievalLoop() {
        while (true) {
            try {
                String kafkaTopic = this.dictionaryDownloadCandidates.take();
                // Skip topics that already have a compressor or a registered download.
                if (!this.compressorFactory.versionSpecificCompressorExists(kafkaTopic)
                        && !this.downloadingDictionaryFutures.containsKey(kafkaTopic)) {
                    downloadDictionaries(Arrays.asList(kafkaTopic));
                }
            } catch (InterruptedException e) {
                LOGGER.warn("Thread was interrupted while waiting for a candidate to download dictionary.", e);
                return;
            }
        }
    }

    /**
     * Asynchronously fetches the dictionary of one store version from a randomly chosen ready-to-serve instance.
     *
     * @return a future holding the dictionary bytes; it completes exceptionally with a {@link VeniceException}
     *         when no instance is available, the request is interrupted, fails, times out, or the response
     *         is not usable
     */
    private CompletableFuture<byte[]> getDictionary(String storeName, int versionNumber) {
        String kafkaTopic = Version.composeKafkaTopic(storeName, versionNumber);
        Instance instance = getOnlineInstance(kafkaTopic);
        if (instance == null) {
            return CompletableFuture.supplyAsync(() -> {
                throw new VeniceException("No online storage instance for resource: " + kafkaTopic);
            }, this.executor);
        }
        String instanceUrl = instance.getUrl(this.sslFactory.isPresent());
        LOGGER.info("Downloading dictionary for resource: {} from: {}", kafkaTopic, instanceUrl);
        VeniceMetaDataRequest request = new VeniceMetaDataRequest(instance, QueryAction.DICTIONARY.toString().toLowerCase() + "/" + storeName + "/" + versionNumber, "GET", this.sslFactory.isPresent());
        CompletableFuture<PortableHttpResponse> responseFuture = new CompletableFuture<>();
        this.storageNodeClient.sendRequest(request, responseFuture);
        return CompletableFuture.supplyAsync(() -> {
            // Each failure path builds its own exception so the logged/thrown message reflects the real cause.
            VeniceException failure;
            try {
                byte[] dictionary = getDictionaryFromResponse(
                        responseFuture.get(this.dictionaryRetrievalTimeMs, TimeUnit.MILLISECONDS), instanceUrl);
                if (dictionary != null) {
                    return dictionary;
                }
                failure = new VeniceException("Dictionary download for resource: " + kafkaTopic + " from: " + instanceUrl + " returned unexpected response.");
            } catch (InterruptedException e) {
                failure = new VeniceException("Dictionary download for resource: " + kafkaTopic + " from: " + instanceUrl + " was interrupted: " + e.getMessage());
            } catch (ExecutionException e) {
                failure = new VeniceException("ExecutionException encountered when downloading dictionary for resource: " + kafkaTopic + " from: " + instanceUrl + " : " + e.getMessage());
            } catch (TimeoutException e) {
                failure = new VeniceException("Dictionary download for resource: " + kafkaTopic + " from: " + instanceUrl + " timed out : " + e.getMessage());
            }
            LOGGER.warn(failure.getMessage());
            throw failure;
        }, this.executor);
    }

    /**
     * Extracts the dictionary bytes from a storage node response.
     *
     * @return the response body, or {@code null} when the status code is not 200 or reading the response fails
     */
    private byte[] getDictionaryFromResponse(PortableHttpResponse portableHttpResponse, String instanceUrl) {
        try {
            int statusCode = portableHttpResponse.getStatusCode();
            if (statusCode != 200) {
                LOGGER.warn("Dictionary fetch returns {} for {}", statusCode, instanceUrl);
                return null;
            }
            ByteBuf content = portableHttpResponse.getContentInByteBuf();
            byte[] dictionary = new byte[content.readableBytes()];
            content.readBytes(dictionary);
            return dictionary;
        } catch (IOException e) {
            LOGGER.warn("Dictionary fetch HTTP response error: {} for {}", e.getMessage(), instanceUrl);
            return null;
        }
    }

    /**
     * Picks a random instance among the ready-to-serve instances of all partitions of the resource.
     *
     * @return the chosen instance, or {@code null} when none is available or the lookup throws
     */
    private Instance getOnlineInstance(String kafkaTopic) {
        try {
            int partitionCount = this.onlineInstanceFinder.getNumberOfPartitions(kafkaTopic);
            List<Instance> candidates = new ArrayList<>();
            for (int partition = 0; partition < partitionCount; partition++) {
                candidates.addAll(this.onlineInstanceFinder.getReadyToServeInstances(kafkaTopic, partition));
            }
            if (candidates.isEmpty()) {
                return null;
            }
            return candidates.get((int) (Math.random() * candidates.size()));
        } catch (Exception e) {
            LOGGER.warn("Exception caught in getting online instances for resource: {}. {}", kafkaTopic, e.getMessage());
            return null;
        }
    }

    /**
     * Refreshes the metadata repository and downloads the dictionaries of every ONLINE ZSTD_WITH_DICT version
     * that has no download registered yet.
     *
     * @return the result of {@link #downloadDictionaries(Collection)}
     */
    private boolean getAllDictionaries() {
        this.metadataRepository.refresh();
        List<String> topics = this.metadataRepository.getAllStores().stream()
                .flatMap(store -> store.getVersions().stream())
                .filter(DictionaryRetrievalService::isOnlineWithZstdDictionary)
                .filter(version -> !this.downloadingDictionaryFutures.containsKey(version.kafkaTopicName()))
                .map(Version::kafkaTopicName)
                .collect(Collectors.toList());
        return downloadDictionaries(topics);
    }

    /**
     * Starts (or joins) the dictionary download for each given topic and waits for all of them, bounded by
     * the configured retrieval timeout.
     *
     * @param collection Kafka topic names to fetch dictionaries for
     * @return {@code true} when there was nothing to do or every download finished in time; {@code false}
     *         when waiting failed, timed out, or was interrupted
     */
    private boolean downloadDictionaries(Collection<String> collection) {
        String topicList = String.join(",", collection);
        if (topicList.isEmpty()) {
            return true;
        }
        List<Version> versions = collection.stream()
                .map(this::findVersion)
                .filter(Optional::isPresent)
                .map(Optional::get)
                .collect(Collectors.toList());
        LOGGER.info("Beginning dictionary fetch for {}", topicList);
        try {
            CompletableFuture<?>[] downloads = versions.stream()
                    .map(this::fetchCompressionDictionary)
                    // fetchCompressionDictionary may yield null if a registered future is removed concurrently.
                    .filter(Objects::nonNull)
                    .toArray(CompletableFuture[]::new);
            CompletableFuture.allOf(downloads).get(this.dictionaryRetrievalTimeMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (InterruptedException e) {
            LOGGER.warn("Dictionary fetch failed. Store topics were: {}. {}", topicList, e.getMessage());
            // Re-assert the interrupt so the retriever loop's next take() terminates the thread on shutdown.
            Thread.currentThread().interrupt();
            return false;
        } catch (Exception e) {
            LOGGER.warn("Dictionary fetch failed. Store topics were: {}. {}", topicList, e.getMessage());
            return false;
        }
    }

    /**
     * Resolves a Kafka topic name to its version in the metadata repository.
     *
     * @return the version, or empty when the store or the version cannot be found
     */
    private Optional<Version> findVersion(String kafkaTopic) {
        Store store = this.metadataRepository.getStore(Version.parseStoreFromKafkaTopicName(kafkaTopic));
        // NOTE(review): guards against the repository returning null for an unknown store — confirm contract.
        if (store == null) {
            return Optional.empty();
        }
        return store.getVersion(Version.parseVersionFromKafkaTopicName(kafkaTopic));
    }

    /**
     * Returns the download future registered for the version's topic, or starts a new download and registers it.
     *
     * @return the future for the topic; may be {@code null} if a registered future is removed between the
     *         existence check and the lookup
     */
    private CompletableFuture<Void> fetchCompressionDictionary(Version version) {
        String kafkaTopic = version.kafkaTopicName();
        if (this.downloadingDictionaryFutures.containsKey(kafkaTopic)) {
            return this.downloadingDictionaryFutures.get(kafkaTopic);
        }
        CompletableFuture<Void> downloadFuture = getDictionary(version.getStoreName(), version.getNumber())
                .handleAsync((dictionary, failure) -> onDictionaryDownloadCompleted(version, dictionary, failure), this.executor);
        this.downloadingDictionaryFutures.put(kafkaTopic, downloadFuture);
        return downloadFuture;
    }

    /**
     * Completion handler of a dictionary download: initializes the compressor on success, otherwise
     * unregisters the future and schedules the topic to be re-queued.
     *
     * @return always {@code null}
     */
    private Void onDictionaryDownloadCompleted(Version version, byte[] dictionary, Throwable failure) {
        String kafkaTopic = version.kafkaTopicName();
        if (failure == null) {
            initCompressorFromDictionary(version, dictionary);
            LOGGER.info("Dictionary downloaded and compressor is ready for resource: {}", kafkaTopic);
            return null;
        }
        // NOTE(review): failures delivered by CompletableFuture are usually wrapped in CompletionException,
        // so this branch may never match — confirm.
        if (failure instanceof InterruptedException) {
            LOGGER.warn("{}. Will not retry dictionary download.", failure.getMessage());
            return null;
        }
        LOGGER.warn("Exception encountered when asynchronously downloading dictionary for resource: {}. {}", kafkaTopic, failure.getMessage());
        // The handler can run before fetchCompressionDictionary has registered the future, so poll until the
        // entry shows up and can be removed.
        // NOTE(review): if the entry was already removed elsewhere (e.g. version retirement), this loop keeps
        // polling until the thread is interrupted — verify whether that can happen.
        while (this.downloadingDictionaryFutures.remove(kafkaTopic) == null) {
            if (!Utils.sleep(DEFAULT_DICTIONARY_DOWNLOAD_INTERVAL_IN_MS)) {
                LOGGER.warn("Got InterruptedException. Will not retry dictionary download.");
                return null;
            }
        }
        // Re-queue the topic after a short delay so the retriever thread retries the download.
        this.executor.schedule(() -> this.dictionaryDownloadCandidates.add(kafkaTopic),
                DEFAULT_DICTIONARY_DOWNLOAD_INTERVAL_IN_MS, TimeUnit.MILLISECONDS);
        return null;
    }

    /**
     * Registers a version-specific compressor, but only while the version is ONLINE and its download is
     * still tracked in {@link #downloadingDictionaryFutures}.
     */
    private void initCompressorFromDictionary(Version version, byte[] dictionary) {
        String kafkaTopic = version.kafkaTopicName();
        if (version.getStatus() == VersionStatus.ONLINE && this.downloadingDictionaryFutures.containsKey(kafkaTopic)) {
            this.compressorFactory.createVersionSpecificCompressorIfNotExist(version.getCompressionStrategy(), kafkaTopic, dictionary);
        }
    }

    /**
     * Stops tracking a topic: fails its pending download future (if any), removes it from the candidate
     * queue, and removes its version-specific compressor.
     *
     * @param str Kafka topic name of the retired version
     * @param str2 human-readable reason, included in the exception used to fail a pending future
     */
    public void handleVersionRetirement(String str, String str2) {
        InterruptedException retirement = new InterruptedException("Dictionary download for resource " + str + " interrupted: " + str2);
        CompletableFuture<Void> pending = this.downloadingDictionaryFutures.remove(str);
        if (pending != null && !pending.isDone()) {
            pending.completeExceptionally(retirement);
        }
        this.dictionaryDownloadCandidates.remove(str);
        this.compressorFactory.removeVersionSpecificCompressor(str);
    }

    /**
     * Registers the store listener, performs the blocking dictionary warm-up, and starts the retriever thread.
     *
     * @throws VeniceException when the warm-up does not complete successfully
     */
    @Override
    public boolean startInner() {
        this.metadataRepository.registerStoreDataChangedListener(this.storeChangeListener);
        if (!getAllDictionaries()) {
            throw new VeniceException("Dictionary warmup failed! Preventing router start up.");
        }
        this.dictionaryRetrieverThread.start();
        return true;
    }

    /** Interrupts the retriever thread, shuts down the executor, and fails every tracked download future. */
    @Override
    public void stopInner() throws IOException {
        this.dictionaryRetrieverThread.interrupt();
        this.executor.shutdownNow();
        this.downloadingDictionaryFutures.forEach((kafkaTopic, future) ->
                future.completeExceptionally(new InterruptedException("Dictionary download thread stopped")));
    }
}
