package com.linkedin.davinci.client;

import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import com.linkedin.davinci.DaVinciBackend;
import com.linkedin.davinci.StoreBackend;
import com.linkedin.davinci.VersionBackend;
import com.linkedin.davinci.config.VeniceConfigLoader;
import com.linkedin.davinci.storage.chunking.AbstractAvroChunkingAdapter;
import com.linkedin.davinci.storage.chunking.GenericChunkingAdapter;
import com.linkedin.davinci.storage.chunking.GenericRecordChunkingAdapter;
import com.linkedin.davinci.store.cache.backend.ObjectCacheBackend;
import com.linkedin.davinci.store.cache.backend.ObjectCacheConfig;
import com.linkedin.davinci.store.rocksdb.RocksDBServerConfig;
import com.linkedin.venice.client.exceptions.ServiceDiscoveryException;
import com.linkedin.venice.client.exceptions.VeniceClientException;
import com.linkedin.venice.client.stats.ClientStats;
import com.linkedin.venice.client.store.AvroComputeRequestBuilderV4;
import com.linkedin.venice.client.store.AvroGenericReadComputeStoreClient;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.client.store.ClientFactory;
import com.linkedin.venice.client.store.ComputeGenericRecord;
import com.linkedin.venice.client.store.ComputeRequestBuilder;
import com.linkedin.venice.client.store.D2ServiceDiscovery;
import com.linkedin.venice.client.store.streaming.StreamingCallback;
import com.linkedin.venice.client.store.transport.D2TransportClient;
import com.linkedin.venice.common.VeniceSystemStoreType;
import com.linkedin.venice.compute.ComputeRequestWrapper;
import com.linkedin.venice.controllerapi.D2ServiceDiscoveryResponseV2;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.pubsub.adapter.kafka.admin.ApacheKafkaAdminAdapter;
import com.linkedin.venice.serializer.FastSerializerDeserializerFactory;
import com.linkedin.venice.serializer.RecordDeserializer;
import com.linkedin.venice.serializer.RecordSerializer;
import com.linkedin.venice.service.ICProvider;
import com.linkedin.venice.utils.ComplementSet;
import com.linkedin.venice.utils.ComputeUtils;
import com.linkedin.venice.utils.PropertyBuilder;
import com.linkedin.venice.utils.ReferenceCounted;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/davinci/client/AvroGenericDaVinciClient.class */
public class AvroGenericDaVinciClient<K, V> implements DaVinciClient<K, V>, AvroGenericReadComputeStoreClient<K, V> {
    protected final Logger logger;
    private final DaVinciConfig daVinciConfig;
    private final ClientConfig clientConfig;
    private final VeniceProperties backendConfig;
    private final Optional<Set<String>> managedClients;
    private final ICProvider icProvider;
    private final AtomicBoolean ready;
    private final ComplementSet<Integer> subscription;
    private RecordSerializer<K> keySerializer;
    private RecordDeserializer<K> keyDeserializer;
    private AvroGenericReadComputeStoreClient<K, V> veniceClient;
    private StoreBackend storeBackend;
    private static ReferenceCounted<DaVinciBackend> daVinciBackend;
    private ObjectCacheBackend cacheBackend;
    private static final ThreadLocal<ReusableObjects> threadLocalReusableObjects = ThreadLocal.withInitial(() -> {
        return new ReusableObjects();
    });
    private static final Map<CharSequence, Schema> computeResultSchemaCache = new VeniceConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linkedin/davinci/client/AvroGenericDaVinciClient$ReusableObjects.class */
    public static class ReusableObjects {
        final ByteBuffer rawValue;
        final BinaryDecoder binaryDecoder;
        final BinaryEncoder binaryEncoder;
        final ByteArrayOutputStream byteArrayOutputStream;
        private static final int REUSABLE_MAP_CAPACITY = 100;
        private static final float REUSABLE_MAP_LOAD_FACTOR = 0.75f;
        final LinkedHashMap<Schema, GenericRecord> reuseValueRecordMap;

        private ReusableObjects() {
            this.rawValue = ByteBuffer.allocate(1048576);
            this.binaryDecoder = DecoderFactory.defaultFactory().createBinaryDecoder(new byte[16], (BinaryDecoder) null);
            this.binaryEncoder = AvroCompatibilityHelper.newBinaryEncoder(new ByteArrayOutputStream(), true, (BinaryEncoder) null);
            this.byteArrayOutputStream = new ByteArrayOutputStream();
            this.reuseValueRecordMap = new LinkedHashMap<Schema, GenericRecord>(REUSABLE_MAP_CAPACITY, REUSABLE_MAP_LOAD_FACTOR, true) { // from class: com.linkedin.davinci.client.AvroGenericDaVinciClient.ReusableObjects.1
                @Override // java.util.LinkedHashMap
                protected boolean removeEldestEntry(Map.Entry<Schema, GenericRecord> entry) {
                    return size() > ReusableObjects.REUSABLE_MAP_CAPACITY;
                }
            };
        }
    }

    public AvroGenericDaVinciClient(DaVinciConfig daVinciConfig, ClientConfig clientConfig, VeniceProperties veniceProperties, Optional<Set<String>> optional) {
        this(daVinciConfig, clientConfig, veniceProperties, optional, null);
    }

    public AvroGenericDaVinciClient(DaVinciConfig daVinciConfig, ClientConfig clientConfig, VeniceProperties veniceProperties, Optional<Set<String>> optional, ICProvider iCProvider) {
        this.logger = LogManager.getLogger(getClass());
        this.ready = new AtomicBoolean(false);
        this.subscription = ComplementSet.emptySet();
        this.logger.info("Creating client, storeName={}, daVinciConfig={}", clientConfig.getStoreName(), daVinciConfig);
        this.daVinciConfig = daVinciConfig;
        this.clientConfig = clientConfig;
        this.backendConfig = veniceProperties;
        this.managedClients = optional;
        this.icProvider = iCProvider;
    }

    public String getStoreName() {
        return this.clientConfig.getStoreName();
    }

    public Schema getKeySchema() {
        throwIfNotReady();
        return getBackend().getSchemaRepository().getKeySchema(getStoreName()).getSchema();
    }

    public Schema getLatestValueSchema() {
        throwIfNotReady();
        return getBackend().getSchemaRepository().getSupersetOrLatestValueSchema(getStoreName()).getSchema();
    }

    @Override // com.linkedin.davinci.client.DaVinciClient
    public int getPartitionCount() {
        throwIfNotReady();
        Store storeOrThrow = getBackend().getStoreRepository().getStoreOrThrow(getStoreName());
        Optional map = storeOrThrow.getVersion(storeOrThrow.getCurrentVersion()).map((v0) -> {
            return v0.getPartitionCount();
        });
        Objects.requireNonNull(storeOrThrow);
        return ((Integer) map.orElseGet(storeOrThrow::getPartitionCount)).intValue();
    }

    @Override // com.linkedin.davinci.client.DaVinciClient
    public CompletableFuture<Void> subscribeAll() {
        return subscribe(ComplementSet.universalSet());
    }

    @Override // com.linkedin.davinci.client.DaVinciClient
    public CompletableFuture<Void> subscribe(Set<Integer> set) {
        return subscribe(ComplementSet.wrap(set));
    }

    protected CompletableFuture<Void> subscribe(ComplementSet<Integer> complementSet) {
        throwIfNotReady();
        this.subscription.addAll(complementSet);
        return this.storeBackend.subscribe(complementSet);
    }

    @Override // com.linkedin.davinci.client.DaVinciClient
    public void unsubscribeAll() {
        unsubscribe(ComplementSet.universalSet());
        if (this.daVinciConfig.isCacheEnabled()) {
            dropAllCachePartitions();
        }
    }

    @Override // com.linkedin.davinci.client.DaVinciClient
    public void unsubscribe(Set<Integer> set) {
        unsubscribe(ComplementSet.wrap(set));
    }

    protected void unsubscribe(ComplementSet<Integer> complementSet) {
        throwIfNotReady();
        if (this.daVinciConfig.isIsolated()) {
            ComplementSet newSet = ComplementSet.newSet(complementSet);
            newSet.removeAll(this.subscription);
            if (!newSet.isEmpty()) {
                this.logger.warn("Partitions {} of {} are not subscribed, ignoring unsubscribe request.", newSet, getStoreName());
                complementSet = ComplementSet.newSet(complementSet);
                complementSet.removeAll(newSet);
            }
        }
        this.subscription.removeAll(complementSet);
        this.storeBackend.unsubscribe(complementSet);
    }

    public CompletableFuture<V> get(K k) {
        return get(k, null);
    }

    private CompletableFuture<V> readFromLocalStorage(K k, V v) {
        ReferenceCounted<VersionBackend> daVinciCurrentVersion = this.storeBackend.getDaVinciCurrentVersion();
        try {
            VersionBackend versionBackend = (VersionBackend) daVinciCurrentVersion.get();
            ReusableObjects reusableObjects = threadLocalReusableObjects.get();
            byte[] serialize = this.keySerializer.serialize(k, reusableObjects.binaryEncoder, reusableObjects.byteArrayOutputStream);
            int partition = versionBackend.getPartition(serialize);
            if (isPartitionReadyToServe(versionBackend, partition)) {
                CompletableFuture<V> completedFuture = CompletableFuture.completedFuture(versionBackend.read(partition, serialize, getAvroChunkingAdapter(), reusableObjects.binaryDecoder, reusableObjects.rawValue, v));
                if (daVinciCurrentVersion != null) {
                    daVinciCurrentVersion.close();
                }
                return completedFuture;
            }
            if (isVeniceQueryAllowed()) {
                CompletableFuture<V> completableFuture = this.veniceClient.get(k);
                if (daVinciCurrentVersion != null) {
                    daVinciCurrentVersion.close();
                }
                return completableFuture;
            }
            if (!isPartitionSubscribed(versionBackend, partition)) {
                this.storeBackend.getStats().recordBadRequest();
                throw new NonLocalAccessException(versionBackend.toString(), partition);
            }
            CompletableFuture<V> completedFuture2 = CompletableFuture.completedFuture(null);
            if (daVinciCurrentVersion != null) {
                daVinciCurrentVersion.close();
            }
            return completedFuture2;
        } catch (Throwable th) {
            if (daVinciCurrentVersion != null) {
                try {
                    daVinciCurrentVersion.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    public CompletableFuture<V> get(K k, V v) {
        throwIfNotReady();
        ReferenceCounted<VersionBackend> daVinciCurrentVersion = this.storeBackend.getDaVinciCurrentVersion();
        try {
            VersionBackend versionBackend = (VersionBackend) daVinciCurrentVersion.get();
            if (versionBackend == null) {
                if (!isVeniceQueryAllowed()) {
                    this.storeBackend.getStats().recordBadRequest();
                    throw new VeniceClientException("Da Vinci client is not subscribed, storeName=" + getStoreName());
                }
                CompletableFuture<V> completableFuture = this.veniceClient.get(k);
                if (daVinciCurrentVersion != null) {
                    daVinciCurrentVersion.close();
                }
                return completableFuture;
            }
            if (this.daVinciConfig.isCacheEnabled()) {
                CompletableFuture<V> completableFuture2 = this.cacheBackend.get(k, versionBackend.getVersion(), (obj, executor) -> {
                    return readFromLocalStorage(obj, null);
                });
                if (daVinciCurrentVersion != null) {
                    daVinciCurrentVersion.close();
                }
                return completableFuture2;
            }
            CompletableFuture<V> readFromLocalStorage = readFromLocalStorage(k, v);
            if (daVinciCurrentVersion != null) {
                daVinciCurrentVersion.close();
            }
            return readFromLocalStorage;
        } catch (Throwable th) {
            if (daVinciCurrentVersion != null) {
                try {
                    daVinciCurrentVersion.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    CompletableFuture<Map<K, V>> batchGetFromLocalStorage(Iterable<K> iterable) {
        HashMap hashMap = new HashMap();
        ReferenceCounted<VersionBackend> daVinciCurrentVersion = this.storeBackend.getDaVinciCurrentVersion();
        try {
            VersionBackend versionBackend = (VersionBackend) daVinciCurrentVersion.get();
            if (versionBackend == null) {
                if (!isVeniceQueryAllowed()) {
                    this.storeBackend.getStats().recordBadRequest();
                    throw new VeniceClientException("Da Vinci client is not subscribed, storeName=" + getStoreName());
                }
                CompletableFuture<Map<K, V>> batchGet = this.veniceClient.batchGet(new HashSet((Collection) iterable));
                if (daVinciCurrentVersion != null) {
                    daVinciCurrentVersion.close();
                }
                return batchGet;
            }
            HashSet hashSet = new HashSet();
            ReusableObjects reusableObjects = threadLocalReusableObjects.get();
            for (K k : iterable) {
                byte[] serialize = this.keySerializer.serialize(k, reusableObjects.binaryEncoder, reusableObjects.byteArrayOutputStream);
                int partition = versionBackend.getPartition(serialize);
                if (isPartitionReadyToServe(versionBackend, partition)) {
                    Object read = versionBackend.read(partition, serialize, getAvroChunkingAdapter(), reusableObjects.binaryDecoder, reusableObjects.rawValue, null);
                    if (read != null) {
                        hashMap.put(k, read);
                    }
                } else if (isVeniceQueryAllowed()) {
                    hashSet.add(k);
                } else if (!isPartitionSubscribed(versionBackend, partition)) {
                    this.storeBackend.getStats().recordBadRequest();
                    throw new NonLocalAccessException(versionBackend.toString(), partition);
                }
            }
            if (hashSet.isEmpty()) {
                CompletableFuture<Map<K, V>> completedFuture = CompletableFuture.completedFuture(hashMap);
                if (daVinciCurrentVersion != null) {
                    daVinciCurrentVersion.close();
                }
                return completedFuture;
            }
            CompletableFuture<Map<K, V>> thenApply = this.veniceClient.batchGet(hashSet).thenApply(map -> {
                hashMap.putAll(map);
                return hashMap;
            });
            if (daVinciCurrentVersion != null) {
                daVinciCurrentVersion.close();
            }
            return thenApply;
        } catch (Throwable th) {
            if (daVinciCurrentVersion != null) {
                try {
                    daVinciCurrentVersion.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    public CompletableFuture<Map<K, V>> batchGet(Set<K> set) {
        throwIfNotReady();
        ReferenceCounted<VersionBackend> daVinciCurrentVersion = this.storeBackend.getDaVinciCurrentVersion();
        try {
            VersionBackend versionBackend = (VersionBackend) daVinciCurrentVersion.get();
            if (this.daVinciConfig.isCacheEnabled()) {
                CompletableFuture<Map<K, V>> all = this.cacheBackend.getAll(set, versionBackend.getVersion(), iterable -> {
                    try {
                        return batchGetFromLocalStorage(iterable).get();
                    } catch (InterruptedException | ExecutionException e) {
                        throw new VeniceClientException("Error performing batch get while loading cache!!", e);
                    }
                }, (obj, executor) -> {
                    return readFromLocalStorage(obj, null);
                });
                if (daVinciCurrentVersion != null) {
                    daVinciCurrentVersion.close();
                }
                return all;
            }
            CompletableFuture<Map<K, V>> batchGetFromLocalStorage = batchGetFromLocalStorage(set);
            if (daVinciCurrentVersion != null) {
                daVinciCurrentVersion.close();
            }
            return batchGetFromLocalStorage;
        } catch (Throwable th) {
            if (daVinciCurrentVersion != null) {
                try {
                    daVinciCurrentVersion.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    public ComputeRequestBuilder<K> compute() throws VeniceClientException {
        return compute(Optional.empty(), Optional.empty(), 0L);
    }

    public ComputeRequestBuilder<K> compute(Optional<ClientStats> optional, Optional<ClientStats> optional2, long j) {
        return new AvroComputeRequestBuilderV4(this, getLatestValueSchema()).setStats(optional2);
    }

    private Schema getComputeResultSchema(ComputeRequestWrapper computeRequestWrapper) {
        CharSequence resultSchemaStr = computeRequestWrapper.getResultSchemaStr();
        Schema schema = computeResultSchemaCache.get(resultSchemaStr);
        if (schema == null) {
            schema = Schema.parse(resultSchemaStr.toString());
            ComputeUtils.checkResultSchema(schema, computeRequestWrapper.getValueSchema(), computeRequestWrapper.getComputeRequestVersion(), computeRequestWrapper.getOperations());
            computeResultSchemaCache.putIfAbsent(resultSchemaStr, schema);
        }
        return schema;
    }

    public void compute(ComputeRequestWrapper computeRequestWrapper, Set<K> set, Schema schema, StreamingCallback<K, ComputeGenericRecord> streamingCallback, long j) throws VeniceClientException {
        compute(computeRequestWrapper, set, schema, streamingCallback, j, null, null);
    }

    public void compute(ComputeRequestWrapper computeRequestWrapper, Set<K> set, Schema schema, StreamingCallback<K, ComputeGenericRecord> streamingCallback, long j, BinaryEncoder binaryEncoder, ByteArrayOutputStream byteArrayOutputStream) throws VeniceClientException {
        if (handleCallbackForEmptyKeySet(set, streamingCallback)) {
            return;
        }
        throwIfNotReady();
        ReferenceCounted<VersionBackend> daVinciCurrentVersion = this.storeBackend.getDaVinciCurrentVersion();
        try {
            VersionBackend versionBackend = (VersionBackend) daVinciCurrentVersion.get();
            if (versionBackend == null) {
                if (isVeniceQueryAllowed()) {
                    this.veniceClient.compute(computeRequestWrapper, set, schema, streamingCallback, j, binaryEncoder, byteArrayOutputStream);
                    if (daVinciCurrentVersion != null) {
                        daVinciCurrentVersion.close();
                        return;
                    }
                    return;
                }
                this.storeBackend.getStats().recordBadRequest();
                streamingCallback.onCompletion(Optional.of(new VeniceClientException("Da Vinci client is not subscribed, storeName=" + getStoreName())));
                if (daVinciCurrentVersion != null) {
                    daVinciCurrentVersion.close();
                    return;
                }
                return;
            }
            HashSet hashSet = new HashSet();
            ReusableObjects reusableObjects = threadLocalReusableObjects.get();
            Schema valueSchema = computeRequestWrapper.getValueSchema();
            GenericRecord computeIfAbsent = reusableObjects.reuseValueRecordMap.computeIfAbsent(valueSchema, schema2 -> {
                return new GenericData.Record(valueSchema);
            });
            HashMap hashMap = new HashMap();
            Schema computeResultSchema = getComputeResultSchema(computeRequestWrapper);
            for (K k : set) {
                byte[] serialize = this.keySerializer.serialize(k, reusableObjects.binaryEncoder, reusableObjects.byteArrayOutputStream);
                int partition = versionBackend.getPartition(serialize);
                if (isPartitionReadyToServe(versionBackend, partition)) {
                    GenericRecord compute = versionBackend.compute(partition, serialize, getGenericRecordChunkingAdapter(), reusableObjects.binaryDecoder, reusableObjects.rawValue, computeIfAbsent, hashMap, computeRequestWrapper, computeResultSchema);
                    if (compute != null) {
                        streamingCallback.onRecordReceived(k, new ComputeGenericRecord(compute, computeRequestWrapper.getValueSchema()));
                    } else {
                        streamingCallback.onRecordReceived(k, (Object) null);
                    }
                } else if (isVeniceQueryAllowed()) {
                    hashSet.add(k);
                } else if (!isPartitionSubscribed(versionBackend, partition)) {
                    this.storeBackend.getStats().recordBadRequest();
                    streamingCallback.onCompletion(Optional.of(new NonLocalAccessException(versionBackend.toString(), partition)));
                    if (daVinciCurrentVersion != null) {
                        daVinciCurrentVersion.close();
                        return;
                    }
                    return;
                }
            }
            if (hashSet.isEmpty()) {
                streamingCallback.onCompletion(Optional.empty());
                if (daVinciCurrentVersion != null) {
                    daVinciCurrentVersion.close();
                    return;
                }
                return;
            }
            this.veniceClient.compute(computeRequestWrapper, hashSet, schema, streamingCallback, j, binaryEncoder, byteArrayOutputStream);
            if (daVinciCurrentVersion != null) {
                daVinciCurrentVersion.close();
            }
        } catch (Throwable th) {
            if (daVinciCurrentVersion != null) {
                try {
                    daVinciCurrentVersion.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    public void computeWithKeyPrefixFilter(byte[] bArr, ComputeRequestWrapper computeRequestWrapper, StreamingCallback<GenericRecord, GenericRecord> streamingCallback) {
        throwIfNotReady();
        ReferenceCounted<VersionBackend> daVinciCurrentVersion = this.storeBackend.getDaVinciCurrentVersion();
        try {
            VersionBackend versionBackend = (VersionBackend) daVinciCurrentVersion.get();
            if (versionBackend == null) {
                this.storeBackend.getStats().recordBadRequest();
                streamingCallback.onCompletion(Optional.of(new VeniceClientException("Da Vinci client is not subscribed, storeName=" + getStoreName())));
                if (daVinciCurrentVersion != null) {
                    daVinciCurrentVersion.close();
                    return;
                }
                return;
            }
            if (Schema.Type.RECORD != getKeySchema().getType()) {
                streamingCallback.onCompletion(Optional.of(new VeniceClientException("Key schema must be of type Record to execute with a filter on key fields")));
                if (daVinciCurrentVersion != null) {
                    daVinciCurrentVersion.close();
                    return;
                }
                return;
            }
            ReusableObjects reusableObjects = threadLocalReusableObjects.get();
            Schema valueSchema = computeRequestWrapper.getValueSchema();
            GenericRecord computeIfAbsent = reusableObjects.reuseValueRecordMap.computeIfAbsent(valueSchema, schema -> {
                return new GenericData.Record(valueSchema);
            });
            HashMap hashMap = new HashMap();
            Schema computeResultSchema = getComputeResultSchema(computeRequestWrapper);
            int partitionCount = versionBackend.getPartitionCount();
            for (int i = 0; i < partitionCount; i++) {
                if (isPartitionReadyToServe(versionBackend, i)) {
                    try {
                        versionBackend.computeWithKeyPrefixFilter(bArr, i, streamingCallback, computeRequestWrapper, getGenericRecordChunkingAdapter(), this.keyDeserializer, computeIfAbsent, reusableObjects.binaryDecoder, hashMap, computeResultSchema);
                    } catch (VeniceException e) {
                        streamingCallback.onCompletion(Optional.of(e));
                        if (daVinciCurrentVersion != null) {
                            daVinciCurrentVersion.close();
                            return;
                        }
                        return;
                    }
                }
            }
            streamingCallback.onCompletion(Optional.empty());
            if (daVinciCurrentVersion != null) {
                daVinciCurrentVersion.close();
            }
        } catch (Throwable th) {
            if (daVinciCurrentVersion != null) {
                try {
                    daVinciCurrentVersion.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private boolean handleCallbackForEmptyKeySet(Set<K> set, StreamingCallback streamingCallback) {
        if (!set.isEmpty()) {
            return false;
        }
        streamingCallback.onCompletion(Optional.empty());
        return true;
    }

    public boolean isReady() {
        return this.ready.get();
    }

    protected boolean isVeniceQueryAllowed() {
        return this.daVinciConfig.getNonLocalAccessPolicy().equals(NonLocalAccessPolicy.QUERY_VENICE);
    }

    protected boolean isPartitionReadyToServe(VersionBackend versionBackend, int i) {
        if (!this.daVinciConfig.isIsolated() || this.subscription.contains(Integer.valueOf(i))) {
            return versionBackend.isPartitionReadyToServe(i);
        }
        return false;
    }

    protected boolean isPartitionSubscribed(VersionBackend versionBackend, int i) {
        return this.daVinciConfig.isIsolated() ? this.subscription.contains(Integer.valueOf(i)) : versionBackend.isPartitionSubscribed(i);
    }

    private void dropAllCachePartitions() {
        ReferenceCounted<VersionBackend> daVinciCurrentVersion = this.storeBackend.getDaVinciCurrentVersion();
        try {
            VersionBackend versionBackend = (VersionBackend) daVinciCurrentVersion.get();
            if (versionBackend != null) {
                this.cacheBackend.clearCachedPartitions(versionBackend.getVersion());
            }
            if (daVinciCurrentVersion != null) {
                daVinciCurrentVersion.close();
            }
        } catch (Throwable th) {
            if (daVinciCurrentVersion != null) {
                try {
                    daVinciCurrentVersion.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    protected void throwIfNotReady() {
        if (!isReady()) {
            throw new VeniceClientException("Da Vinci client is not ready, storeName=" + getStoreName());
        }
    }

    protected AbstractAvroChunkingAdapter<V> getAvroChunkingAdapter() {
        return GenericChunkingAdapter.INSTANCE;
    }

    protected GenericRecordChunkingAdapter getGenericRecordChunkingAdapter() {
        return GenericRecordChunkingAdapter.INSTANCE;
    }

    private D2ServiceDiscoveryResponseV2 discoverService() {
        try {
            D2TransportClient transportClient = ClientFactory.getTransportClient(this.clientConfig);
            try {
                if (!(transportClient instanceof D2TransportClient)) {
                    throw new VeniceClientException("Venice service discovery requires D2 client, storeName=" + getStoreName() + ", clientClass=" + transportClient.getClass());
                }
                D2ServiceDiscoveryResponseV2 find = new D2ServiceDiscovery().find(transportClient, getStoreName());
                this.logger.info("Venice service discovered, clusterName={}, zkAddress={}, kafkaBootstrapServers={}", find.getCluster(), find.getZkAddress(), find.getKafkaBootstrapServers());
                if (transportClient != null) {
                    transportClient.close();
                }
                return find;
            } finally {
            }
        } catch (Throwable th) {
            throw new ServiceDiscoveryException("Failed to discover Venice service, storeName=" + getStoreName(), th);
        }
    }

    private VeniceConfigLoader buildVeniceConfig() {
        D2ServiceDiscoveryResponseV2 discoverService = discoverService();
        String cluster = discoverService.getCluster();
        String zkAddress = discoverService.getZkAddress();
        String kafkaBootstrapServers = discoverService.getKafkaBootstrapServers();
        if (zkAddress == null) {
            zkAddress = this.backendConfig.getString("zookeeper.address");
        }
        if (kafkaBootstrapServers == null) {
            kafkaBootstrapServers = this.backendConfig.getString("kafka.bootstrap.servers");
        }
        VeniceProperties build = new PropertyBuilder().put("kafka.admin.class", ApacheKafkaAdminAdapter.class.getName()).put("server.enable.kafka.openssl", false).put(RocksDBServerConfig.ROCKSDB_LEVEL0_FILE_NUM_COMPACTION_TRIGGER, 4).put(RocksDBServerConfig.ROCKSDB_LEVEL0_SLOWDOWN_WRITES_TRIGGER, 20).put(RocksDBServerConfig.ROCKSDB_LEVEL0_STOPS_WRITES_TRIGGER, 36).put(RocksDBServerConfig.ROCKSDB_LEVEL0_FILE_NUM_COMPACTION_TRIGGER_WRITE_ONLY_VERSION, 40).put(RocksDBServerConfig.ROCKSDB_LEVEL0_SLOWDOWN_WRITES_TRIGGER_WRITE_ONLY_VERSION, 60).put(RocksDBServerConfig.ROCKSDB_LEVEL0_STOPS_WRITES_TRIGGER_WRITE_ONLY_VERSION, 80).put(this.backendConfig.toProperties()).put("cluster.name", cluster).put("zookeeper.address", zkAddress).put("kafka.bootstrap.servers", kafkaBootstrapServers).put(RocksDBServerConfig.ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED, Boolean.valueOf(this.daVinciConfig.getStorageClass() == StorageClass.MEMORY_BACKED_BY_DISK)).put("ingestion.use.da.vinci.client", true).build();
        this.logger.info("backendConfig=" + build.toString(true));
        return new VeniceConfigLoader(build, build);
    }

    private void initBackend(ClientConfig clientConfig, VeniceConfigLoader veniceConfigLoader, Optional<Set<String>> optional, ICProvider iCProvider, Optional<ObjectCacheConfig> optional2) {
        synchronized (AvroGenericDaVinciClient.class) {
            if (daVinciBackend == null) {
                this.logger.info("Da Vinci Backend does not exist, creating a new backend for client: " + clientConfig.getStoreName());
                daVinciBackend = new ReferenceCounted<>(new DaVinciBackend(clientConfig, veniceConfigLoader, optional, iCProvider, optional2), daVinciBackend2 -> {
                    synchronized (AvroGenericDaVinciClient.class) {
                        daVinciBackend = null;
                        daVinciBackend2.close();
                    }
                });
            } else if (VeniceSystemStoreType.getSystemStoreType(clientConfig.getStoreName()) != VeniceSystemStoreType.META_STORE) {
                this.logger.info("Da Vinci Backend exists, reusing existing backend for client: " + clientConfig.getStoreName());
                daVinciBackend.retain();
            }
        }
    }

    public static DaVinciBackend getBackend() {
        DaVinciBackend daVinciBackend2;
        synchronized (AvroGenericDaVinciClient.class) {
            daVinciBackend2 = (DaVinciBackend) daVinciBackend.get();
        }
        return daVinciBackend2;
    }

    public synchronized void start() {
        if (isReady()) {
            throw new VeniceClientException("Da Vinci client is already started, storeName=" + getStoreName());
        }
        this.logger.info("Starting client, storeName=" + getStoreName());
        VeniceConfigLoader buildVeniceConfig = buildVeniceConfig();
        Optional<ObjectCacheConfig> ofNullable = Optional.ofNullable(this.daVinciConfig.getCacheConfig());
        initBackend(this.clientConfig, buildVeniceConfig, this.managedClients, this.icProvider, ofNullable);
        try {
            if (!getBackend().compareCacheConfig(ofNullable)) {
                throw new VeniceClientException("Cache config conflicts with existing backend, storeName=" + getStoreName());
            }
            if (this.daVinciConfig.isCacheEnabled()) {
                this.cacheBackend = getBackend().getObjectCache();
            }
            this.storeBackend = getBackend().getStoreOrThrow(getStoreName());
            if (this.managedClients.isPresent()) {
                this.storeBackend.setManaged(this.daVinciConfig.isManaged());
            }
            Schema schema = getBackend().getSchemaRepository().getKeySchema(getStoreName()).getSchema();
            this.keySerializer = FastSerializerDeserializerFactory.getFastAvroGenericSerializer(schema, false);
            this.keyDeserializer = FastSerializerDeserializerFactory.getFastAvroGenericDeserializer(schema, schema);
            if (isVeniceQueryAllowed()) {
                this.veniceClient = ClientFactory.getAndStartAvroClient(this.clientConfig);
            }
            this.ready.set(true);
            this.logger.info("Client is started successfully, storeName=" + getStoreName());
        } catch (Throwable th) {
            String str = "Unable to start Da Vinci client, storeName=" + getStoreName();
            this.logger.error(str, th);
            daVinciBackend.release();
            throw new VeniceClientException(str, th);
        }
    }

    public synchronized void close() {
        throwIfNotReady();
        try {
            this.logger.info("Closing client, storeName=" + getStoreName());
            this.ready.set(false);
            if (this.veniceClient != null) {
                this.veniceClient.close();
            }
            if (this.cacheBackend != null) {
                this.cacheBackend.close();
            }
            daVinciBackend.release();
            this.logger.info("Client is closed successfully, storeName=" + getStoreName());
        } catch (Throwable th) {
            String str = "Unable to close Da Vinci client, storeName=" + getStoreName();
            this.logger.error(str, th);
            throw new VeniceClientException(str, th);
        }
    }
}
