package com.linkedin.davinci;

import com.linkedin.davinci.config.VeniceStoreVersionConfig;
import com.linkedin.davinci.storage.chunking.AbstractAvroChunkingAdapter;
import com.linkedin.davinci.store.AbstractStorageEngine;
import com.linkedin.venice.ConfigKeys;
import com.linkedin.venice.VeniceConstants;
import com.linkedin.venice.client.store.streaming.StreamingCallback;
import com.linkedin.venice.compression.VeniceCompressor;
import com.linkedin.venice.compute.ComputeOperationUtils;
import com.linkedin.venice.compute.ComputeRequestWrapper;
import com.linkedin.venice.compute.ReadComputeOperator;
import com.linkedin.venice.compute.protocol.request.ComputeOperation;
import com.linkedin.venice.compute.protocol.request.enums.ComputeOperationType;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.meta.IngestionMode;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.partitioner.VenicePartitioner;
import com.linkedin.venice.serializer.RecordDeserializer;
import com.linkedin.venice.utils.ComplementSet;
import com.linkedin.venice.utils.ExceptionUtils;
import com.linkedin.venice.utils.PartitionUtils;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import com.linkedin.venice.utils.lazy.Lazy;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/davinci/VersionBackend.class */
public class VersionBackend {
    private static final Logger LOGGER = LogManager.getLogger((Class<?>) VersionBackend.class);
    private static final int DEFAULT_PUSH_STATUS_HEARTBEAT_INTERVAL_IN_SECONDS = 10;
    private final DaVinciBackend backend;
    private final Version version;
    private final VeniceStoreVersionConfig config;
    private final VenicePartitioner partitioner;
    private final boolean reportPushStatus;
    private final boolean suppressLiveUpdates;
    private final AtomicReference<AbstractStorageEngine> storageEngine = new AtomicReference<>();
    private final Map<Integer, CompletableFuture<Void>> partitionFutures = new VeniceConcurrentHashMap();
    private final int stopConsumptionWaitRetriesNum;
    private final StoreBackendStats storeBackendStats;
    private final Lazy<VeniceCompressor> compressor;
    private Future heartbeat;
    private final int heartbeatInterval;

    /* JADX INFO: Access modifiers changed from: package-private */
    public VersionBackend(DaVinciBackend daVinciBackend, Version version, StoreBackendStats storeBackendStats) {
        this.backend = daVinciBackend;
        this.version = version;
        this.config = daVinciBackend.getConfigLoader().getStoreConfig(version.kafkaTopicName());
        if (this.config.getIngestionMode().equals(IngestionMode.ISOLATED)) {
            this.config.setRestoreDataPartitions(false);
            this.config.setRestoreMetadataPartition(false);
        }
        this.partitioner = PartitionUtils.getUserPartitionLevelVenicePartitioner(version.getPartitionerConfig());
        this.suppressLiveUpdates = this.config.freezeIngestionIfReadyToServeOrLocalDataExists();
        this.storageEngine.set(daVinciBackend.getStorageService().getStorageEngine(version.kafkaTopicName()));
        this.backend.getIngestionBackend().setStorageEngineReference(version.kafkaTopicName(), this.storageEngine);
        Store storeOrThrow = daVinciBackend.getStoreRepository().getStoreOrThrow(version.getStoreName());
        this.storeBackendStats = storeBackendStats;
        this.reportPushStatus = storeOrThrow.isDaVinciPushStatusStoreEnabled() && this.config.getClusterProperties().getBoolean(ConfigKeys.PUSH_STATUS_STORE_ENABLED, false);
        this.heartbeatInterval = this.config.getClusterProperties().getInt(ConfigKeys.PUSH_STATUS_STORE_HEARTBEAT_INTERVAL_IN_SECONDS, 10);
        this.stopConsumptionWaitRetriesNum = daVinciBackend.getConfigLoader().getCombinedProperties().getInt(ConfigKeys.SERVER_STOP_CONSUMPTION_WAIT_RETRIES_NUM, 60);
        this.compressor = Lazy.of(() -> {
            return daVinciBackend.getCompressorFactory().getCompressor(version.getCompressionStrategy(), version.kafkaTopicName());
        });
        daVinciBackend.getVersionByTopicMap().put(version.kafkaTopicName(), this);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void close() {
        LOGGER.info("Closing local version {}", this);
        this.backend.getVersionByTopicMap().remove(this.version.kafkaTopicName(), this);
        this.backend.getIngestionBackend().setStorageEngineReference(this.version.kafkaTopicName(), null);
        if (this.heartbeat != null) {
            this.heartbeat.cancel(true);
        }
        Iterator<Map.Entry<Integer, CompletableFuture<Void>>> it2 = this.partitionFutures.entrySet().iterator();
        while (it2.hasNext()) {
            it2.next().getValue().cancel(true);
        }
        try {
            this.backend.getIngestionBackend().shutdownIngestionTask(this.version.kafkaTopicName());
        } catch (VeniceException e) {
            LOGGER.error("Encounter exception when killing consumption task: {}", this.version.kafkaTopicName(), e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void delete() {
        LOGGER.info("Deleting local version {}", this);
        close();
        String kafkaTopicName = this.version.kafkaTopicName();
        try {
            this.backend.getIngestionBackend().removeStorageEngine(kafkaTopicName);
            this.backend.getCompressorFactory().removeVersionSpecificCompressor(kafkaTopicName);
        } catch (VeniceException e) {
            LOGGER.error("Encounter exception when removing version storage of topic {}", kafkaTopicName, e);
        }
    }

    public String toString() {
        return this.version.kafkaTopicName();
    }

    public Version getVersion() {
        return this.version;
    }

    private AbstractStorageEngine getStorageEngineOrThrow() {
        AbstractStorageEngine abstractStorageEngine = this.storageEngine.get();
        if (abstractStorageEngine == null) {
            throw new VeniceException("Storage engine is not ready, version=" + this);
        }
        return abstractStorageEngine;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean isReportingPushStatus() {
        return this.reportPushStatus;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void tryStartHeartbeat() {
        if (isReportingPushStatus() && this.heartbeat == null) {
            this.heartbeat = this.backend.getExecutor().scheduleAtFixedRate(() -> {
                try {
                    this.backend.getPushStatusStoreWriter().writeHeartbeat(this.version.getStoreName());
                } catch (Throwable th) {
                    LOGGER.error("Unable to send heartbeat for {}", this);
                }
            }, 0L, this.heartbeatInterval, TimeUnit.SECONDS);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void tryStopHeartbeat() {
        if (this.heartbeat == null || !this.partitionFutures.values().stream().allMatch((v0) -> {
            return v0.isDone();
        })) {
            return;
        }
        this.heartbeat.cancel(true);
        this.heartbeat = null;
    }

    public <V> V read(int i, byte[] bArr, AbstractAvroChunkingAdapter<V> abstractAvroChunkingAdapter, BinaryDecoder binaryDecoder, ByteBuffer byteBuffer, V v) {
        return abstractAvroChunkingAdapter.get(this.version.getStoreName(), getStorageEngineOrThrow(), i, this.partitioner, this.version.getPartitionerConfig(), bArr, byteBuffer, v, binaryDecoder, this.version.isChunkingEnabled(), this.version.getCompressionStrategy(), true, this.backend.getSchemaRepository(), null, this.compressor.get());
    }

    public GenericRecord compute(int i, byte[] bArr, AbstractAvroChunkingAdapter<GenericRecord> abstractAvroChunkingAdapter, BinaryDecoder binaryDecoder, ByteBuffer byteBuffer, GenericRecord genericRecord, Map<String, Object> map, ComputeRequestWrapper computeRequestWrapper, Schema schema) {
        return getResultOfComputeOperations(computeRequestWrapper.getOperations(), computeRequestWrapper.getValueSchema(), abstractAvroChunkingAdapter.get(this.version.getStoreName(), getStorageEngineOrThrow(), i, this.partitioner, this.version.getPartitionerConfig(), bArr, byteBuffer, genericRecord, binaryDecoder, this.version.isChunkingEnabled(), this.version.getCompressionStrategy(), true, this.backend.getSchemaRepository(), null, this.compressor.get()), map, computeRequestWrapper.getComputeRequestVersion(), schema);
    }

    public void computeWithKeyPrefixFilter(byte[] bArr, int i, final StreamingCallback<GenericRecord, GenericRecord> streamingCallback, final ComputeRequestWrapper computeRequestWrapper, AbstractAvroChunkingAdapter<GenericRecord> abstractAvroChunkingAdapter, RecordDeserializer<GenericRecord> recordDeserializer, GenericRecord genericRecord, BinaryDecoder binaryDecoder, final Map<String, Object> map, final Schema schema) {
        abstractAvroChunkingAdapter.getByPartialKey(this.version.getStoreName(), getStorageEngineOrThrow(), i, this.version.getPartitionerConfig(), bArr, genericRecord, binaryDecoder, recordDeserializer, this.version.isChunkingEnabled(), this.version.getCompressionStrategy(), true, this.backend.getSchemaRepository(), null, this.compressor.get(), new StreamingCallback<GenericRecord, GenericRecord>() { // from class: com.linkedin.davinci.VersionBackend.1
            @Override // com.linkedin.venice.client.store.streaming.StreamingCallback
            public void onRecordReceived(GenericRecord genericRecord2, GenericRecord genericRecord3) {
                streamingCallback.onRecordReceived(genericRecord2, VersionBackend.this.getResultOfComputeOperations(computeRequestWrapper.getOperations(), computeRequestWrapper.getValueSchema(), genericRecord3, map, computeRequestWrapper.getComputeRequestVersion(), schema));
            }

            @Override // com.linkedin.venice.client.store.streaming.StreamingCallback
            public void onCompletion(Optional<Exception> optional) {
                if (optional.isPresent()) {
                    throw new VeniceException(ExceptionUtils.compactExceptionDescription(optional.get()));
                }
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public GenericRecord getResultOfComputeOperations(List<ComputeOperation> list, Schema schema, GenericRecord genericRecord, Map<String, Object> map, int i, Schema schema2) {
        if (genericRecord == null) {
            return null;
        }
        HashMap hashMap = new HashMap();
        GenericData.Record record = new GenericData.Record(schema2);
        for (ComputeOperation computeOperation : list) {
            ReadComputeOperator operator = ComputeOperationType.valueOf(computeOperation).getOperator();
            String orElse = ComputeOperationUtils.validateNullableFieldAndGetErrorMsg(operator, genericRecord, operator.getOperatorFieldName(computeOperation)).orElse(null);
            if (orElse != null) {
                operator.putDefaultResult(record, operator.getResultFieldName(computeOperation));
                hashMap.put(operator.getResultFieldName(computeOperation), orElse);
            } else {
                operator.compute(i, computeOperation, genericRecord, record, hashMap, map);
            }
        }
        Schema.Field field = schema2.getField(VeniceConstants.VENICE_COMPUTATION_ERROR_MAP_FIELD_NAME);
        if (field != null && record.get(field.pos()) == null) {
            record.put(field.pos(), hashMap);
        }
        for (Schema.Field field2 : schema2.getFields()) {
            if (record.get(field2.pos()) == null) {
                record.put(field2.pos(), genericRecord.get(field2.name()));
            }
        }
        return record;
    }

    public int getPartitionCount() {
        return this.version.getPartitionCount();
    }

    public int getPartition(byte[] bArr) {
        return this.partitioner.getPartitionId(bArr, this.version.getPartitionCount());
    }

    public boolean isPartitionSubscribed(int i) {
        return this.partitionFutures.containsKey(Integer.valueOf(i));
    }

    public boolean isPartitionReadyToServe(int i) {
        CompletableFuture<Void> completableFuture = this.partitionFutures.get(Integer.valueOf(i));
        return (completableFuture == null || !completableFuture.isDone() || completableFuture.isCompletedExceptionally()) ? false : true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized boolean isReadyToServe(ComplementSet<Integer> complementSet) {
        return getPartitions(complementSet).stream().allMatch((v1) -> {
            return isPartitionReadyToServe(v1);
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized CompletableFuture<Void> subscribe(ComplementSet<Integer> complementSet) {
        Instant now = Instant.now();
        List<Integer> partitions = getPartitions(complementSet);
        LOGGER.info("Subscribing to partitions {} of {}", partitions, this);
        ArrayList arrayList = new ArrayList(partitions.size());
        Iterator<Integer> it2 = partitions.iterator();
        while (it2.hasNext()) {
            int intValue = it2.next().intValue();
            AbstractStorageEngine abstractStorageEngine = this.storageEngine.get();
            if (this.partitionFutures.containsKey(Integer.valueOf(intValue))) {
                LOGGER.info("Partition {} of {}  is subscribed, ignoring subscribe request.", Integer.valueOf(intValue), this);
            } else if (this.suppressLiveUpdates && abstractStorageEngine != null && abstractStorageEngine.containsPartition(intValue, this.version.getPartitionerConfig())) {
                this.partitionFutures.computeIfAbsent(Integer.valueOf(intValue), num -> {
                    return CompletableFuture.completedFuture(null);
                });
            } else {
                this.partitionFutures.computeIfAbsent(Integer.valueOf(intValue), num2 -> {
                    return new CompletableFuture();
                });
                this.backend.getIngestionBackend().startConsumption(this.config, intValue);
                tryStartHeartbeat();
            }
            arrayList.add(this.partitionFutures.get(Integer.valueOf(intValue)));
        }
        return CompletableFuture.allOf((CompletableFuture[]) arrayList.toArray(new CompletableFuture[0])).whenComplete((r6, th) -> {
            this.storeBackendStats.recordSubscribeDuration(Duration.between(now, Instant.now()));
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void unsubscribe(ComplementSet<Integer> complementSet) {
        List<Integer> partitions = getPartitions(complementSet);
        LOGGER.info("Unsubscribing from partitions {} of {}", complementSet, this);
        Iterator<Integer> it2 = partitions.iterator();
        while (it2.hasNext()) {
            int intValue = it2.next().intValue();
            if (!this.partitionFutures.containsKey(Integer.valueOf(intValue))) {
                LOGGER.warn("Partition {} of {} is not subscribed, ignoring unsubscribe request.", Integer.valueOf(intValue), this);
                return;
            } else {
                completePartition(intValue);
                this.backend.getIngestionBackend().dropStoragePartitionGracefully(this.config, intValue, this.stopConsumptionWaitRetriesNum);
                this.partitionFutures.remove(Integer.valueOf(intValue));
            }
        }
        tryStopHeartbeat();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void completePartition(int i) {
        LOGGER.info("Partition {} of {} is ready to serve.", Integer.valueOf(i), this);
        this.partitionFutures.computeIfAbsent(Integer.valueOf(i), num -> {
            return new CompletableFuture();
        }).complete(null);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void completePartitionExceptionally(int i, Throwable th) {
        LOGGER.warn("Failed to subscribe to partition {} of {}", Integer.valueOf(i), this, th);
        this.partitionFutures.computeIfAbsent(Integer.valueOf(i), num -> {
            return new CompletableFuture();
        }).completeExceptionally(th);
    }

    private List<Integer> getPartitions(ComplementSet<Integer> complementSet) {
        IntStream range = IntStream.range(0, this.version.getPartitionCount());
        Objects.requireNonNull(complementSet);
        return (List) range.filter((v1) -> {
            return r1.contains(v1);
        }).boxed().collect(Collectors.toList());
    }
}
