package com.linkedin.davinci.ingestion.main;

import com.linkedin.davinci.config.VeniceConfigLoader;
import com.linkedin.davinci.ingestion.utils.IsolatedIngestionUtils;
import com.linkedin.davinci.stats.MetadataUpdateStats;
import com.linkedin.davinci.storage.StorageMetadataService;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.ingestion.protocol.IngestionStorageMetadata;
import com.linkedin.venice.kafka.protocol.state.PartitionState;
import com.linkedin.venice.kafka.protocol.state.StoreVersionState;
import com.linkedin.venice.meta.IngestionMetadataUpdateType;
import com.linkedin.venice.offsets.OffsetRecord;
import com.linkedin.venice.serialization.avro.InternalAvroSpecificSerializer;
import com.linkedin.venice.service.AbstractVeniceService;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/davinci/ingestion/main/MainIngestionStorageMetadataService.class */
/**
 * {@link StorageMetadataService} implementation that keeps {@link OffsetRecord}s and
 * {@link StoreVersionState}s in in-memory maps and forwards every mutation to a remote peer.
 *
 * <p>Each mutation is wrapped into an {@link IngestionStorageMetadata} message and appended to an
 * in-memory queue. A single background {@link MetadataUpdateWorker} sends the queued messages, in
 * order, through {@link MainIngestionRequestClient#updateMetadata}; a message is only removed from
 * the queue once the client reports success, otherwise it is retried after a pause.
 */
public class MainIngestionStorageMetadataService extends AbstractVeniceService implements StorageMetadataService {
    private static final Logger LOGGER = LogManager.getLogger(MainIngestionStorageMetadataService.class);

    /** Pause before retrying the head of the queue after the client reported a failed update. */
    private static final long FAILED_UPDATE_RETRY_INTERVAL_MS = 5000L;

    /** Pause between two passes over the update queue. */
    private static final long QUEUE_POLL_INTERVAL_MS = 1000L;

    private final MainIngestionRequestClient client;
    private final InternalAvroSpecificSerializer<PartitionState> partitionStateSerializer;
    private final MetadataUpdateStats metadataUpdateStats;
    private final BiConsumer<String, StoreVersionState> storeVersionStateSyncer;
    /** topic name -> (partition id -> latest known offset record). */
    private final Map<String, Map<Integer, OffsetRecord>> topicPartitionOffsetRecordMap = new VeniceConcurrentHashMap<>();
    /** topic name -> latest known store version state. */
    private final Map<String, StoreVersionState> topicStoreVersionStateMap = new VeniceConcurrentHashMap<>();
    private final ExecutorService metadataUpdateService = Executors.newSingleThreadExecutor();
    /** Pending remote updates, consumed in FIFO order by {@link MetadataUpdateWorker}. */
    private final Queue<IngestionStorageMetadata> metadataUpdateQueue = new ConcurrentLinkedDeque<>();
    private final MetadataUpdateWorker metadataUpdateWorker = new MetadataUpdateWorker();

    /**
     * Background task that sends queued {@link IngestionStorageMetadata} updates through the
     * request client until it is closed or its thread is interrupted.
     */
    class MetadataUpdateWorker implements Runnable, Closeable {
        private final AtomicBoolean isRunning = new AtomicBoolean(true);

        MetadataUpdateWorker() {
        }

        /** Asks the worker loop to stop; it does not interrupt a sleep that is in progress. */
        @Override
        public void close() throws IOException {
            isRunning.set(false);
        }

        /**
         * Repeatedly drains the update queue, pausing {@link #QUEUE_POLL_INTERVAL_MS} between
         * passes. Returns when the worker is closed or the thread is interrupted (the interrupt
         * flag is restored before returning).
         */
        @Override
        public void run() {
            while (isRunning.get()) {
                try {
                    drainQueue();
                    // The idle sleep must sit under the InterruptedException handler:
                    // Runnable.run() cannot propagate a checked exception.
                    Thread.sleep(QUEUE_POLL_INTERVAL_MS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                int remaining = metadataUpdateQueue.size();
                if (remaining > 0) {
                    LOGGER.info("Number of remaining metadata update requests in queue: {}", remaining);
                }
            }
        }

        /**
         * Sends queued updates until the queue is empty, or until an update fails after the worker
         * has been closed. The head element is only removed after a successful send, so a failed
         * update is retried on the next iteration.
         *
         * @throws InterruptedException if the thread is interrupted while waiting to retry
         */
        private void drainQueue() throws InterruptedException {
            while (!metadataUpdateQueue.isEmpty()) {
                try {
                    if (client.updateMetadata(metadataUpdateQueue.peek())) {
                        metadataUpdateQueue.remove();
                        metadataUpdateStats.recordMetadataUpdateQueueLength(metadataUpdateQueue.size());
                    } else if (!isRunning.get()) {
                        // Closed while the remote side is failing: stop retrying.
                        return;
                    } else {
                        Thread.sleep(FAILED_UPDATE_RETRY_INTERVAL_MS);
                    }
                } catch (InterruptedException e) {
                    // Let run() restore the interrupt flag and terminate the worker.
                    throw e;
                } catch (Throwable th) {
                    LOGGER.error("Unexpected throwable while running {}", getClass().getSimpleName(), th);
                    metadataUpdateStats.recordMetadataQueueUpdateError(1.0d);
                }
            }
        }
    }

    /**
     * @param i unused by this class; kept so the constructor signature stays compatible with callers
     * @param internalAvroSpecificSerializer serializer handed to the empty {@link OffsetRecord}s
     *        created by {@link #getLastOffset(String, int)}
     * @param metadataUpdateStats sink for queue-length and update-error metrics
     * @param veniceConfigLoader configuration used to build the {@link MainIngestionRequestClient}
     * @param biConsumer callback invoked with (topic name, new state) whenever a store version
     *        state is computed or put
     */
    public MainIngestionStorageMetadataService(int i, InternalAvroSpecificSerializer<PartitionState> internalAvroSpecificSerializer, MetadataUpdateStats metadataUpdateStats, VeniceConfigLoader veniceConfigLoader, BiConsumer<String, StoreVersionState> biConsumer) {
        this.client = new MainIngestionRequestClient(veniceConfigLoader);
        this.partitionStateSerializer = internalAvroSpecificSerializer;
        this.metadataUpdateStats = metadataUpdateStats;
        this.storeVersionStateSyncer = biConsumer;
    }

    /** Submits the update worker to the single-thread executor. */
    @Override
    public boolean startInner() throws Exception {
        metadataUpdateService.execute(metadataUpdateWorker);
        return true;
    }

    /** Stops the update worker, shuts the executor down and closes the request client. */
    @Override
    public void stopInner() throws Exception {
        metadataUpdateWorker.close();
        // shutdownNow() interrupts the worker thread so a sleep in progress ends promptly.
        metadataUpdateService.shutdownNow();
        client.close();
    }

    /**
     * Atomically replaces the state stored for {@code str} with the result of {@code function},
     * notifies the syncer callback and enqueues a PUT_STORE_VERSION_STATE update carrying the
     * serialized new state.
     *
     * @param str topic name
     * @param function maps the previous state (possibly {@code null}) to the new state
     */
    @Override
    public void computeStoreVersionState(String str, Function<StoreVersionState, StoreVersionState> function) throws VeniceException {
        topicStoreVersionStateMap.compute(str, (topic, previousState) -> {
            StoreVersionState newState = function.apply(previousState);
            storeVersionStateSyncer.accept(str, newState);
            IngestionStorageMetadata update = new IngestionStorageMetadata();
            update.metadataUpdateType = IngestionMetadataUpdateType.PUT_STORE_VERSION_STATE.getValue();
            update.topicName = str;
            update.payload = ByteBuffer.wrap(IsolatedIngestionUtils.serializeStoreVersionState(str, newState));
            updateRemoteStorageMetadataService(update);
            return newState;
        });
    }

    /**
     * Removes the locally stored state for {@code str} and enqueues a CLEAR_STORE_VERSION_STATE
     * update with an empty payload.
     *
     * @param str topic name
     */
    @Override
    public void clearStoreVersionState(String str) {
        LOGGER.info("Clearing StoreVersionState for {}", str);
        topicStoreVersionStateMap.remove(str);
        IngestionStorageMetadata update = new IngestionStorageMetadata();
        update.metadataUpdateType = IngestionMetadataUpdateType.CLEAR_STORE_VERSION_STATE.getValue();
        update.topicName = str;
        update.payload = ByteBuffer.wrap(new byte[0]);
        updateRemoteStorageMetadataService(update);
    }

    /**
     * @param str topic name
     * @return the locally stored state for the topic, or {@code null} if there is none
     */
    @Override
    public StoreVersionState getStoreVersionState(String str) throws VeniceException {
        return topicStoreVersionStateMap.get(str);
    }

    /**
     * Stores {@code offsetRecord} locally and enqueues a PUT_OFFSET_RECORD update carrying
     * {@code offsetRecord.toBytes()}.
     *
     * @param str topic name
     * @param i partition id
     * @param offsetRecord record to store
     */
    @Override
    public void put(String str, int i, OffsetRecord offsetRecord) throws VeniceException {
        putOffsetRecord(str, i, offsetRecord);
        IngestionStorageMetadata update = new IngestionStorageMetadata();
        update.metadataUpdateType = IngestionMetadataUpdateType.PUT_OFFSET_RECORD.getValue();
        update.topicName = str;
        update.partitionId = i;
        update.payload = ByteBuffer.wrap(offsetRecord.toBytes());
        updateRemoteStorageMetadataService(update);
    }

    /**
     * Removes the locally stored offset record of the given topic partition, if any, and enqueues
     * a CLEAR_OFFSET_RECORD update with an empty payload.
     *
     * @param str topic name
     * @param i partition id
     */
    @Override
    public void clearOffset(String str, int i) {
        LOGGER.info("Clearing OffsetRecord for {} {}", str, i);
        // Single lookup instead of containsKey + get; the inner map is mutated in place, so it
        // does not need to be put back.
        Map<Integer, OffsetRecord> partitionOffsets = topicPartitionOffsetRecordMap.get(str);
        if (partitionOffsets != null) {
            partitionOffsets.remove(i);
        }
        IngestionStorageMetadata update = new IngestionStorageMetadata();
        update.metadataUpdateType = IngestionMetadataUpdateType.CLEAR_OFFSET_RECORD.getValue();
        update.topicName = str;
        update.partitionId = i;
        update.payload = ByteBuffer.wrap(new byte[0]);
        updateRemoteStorageMetadataService(update);
    }

    /**
     * @param str topic name
     * @param i partition id
     * @return the locally stored offset record, or a fresh {@link OffsetRecord} built with the
     *         partition state serializer when none is stored; never {@code null}
     */
    @Override
    public OffsetRecord getLastOffset(String str, int i) throws VeniceException {
        Map<Integer, OffsetRecord> partitionOffsets = topicPartitionOffsetRecordMap.get(str);
        OffsetRecord offsetRecord = partitionOffsets == null ? null : partitionOffsets.get(i);
        return offsetRecord == null ? new OffsetRecord(partitionStateSerializer) : offsetRecord;
    }

    /**
     * Stores {@code offsetRecord} in the local map only; unlike {@link #put(String, int, OffsetRecord)}
     * this does not enqueue a remote update.
     *
     * @param str topic name
     * @param i partition id
     * @param offsetRecord record to store
     */
    public void putOffsetRecord(String str, int i, OffsetRecord offsetRecord) {
        LOGGER.info("Updating OffsetRecord: {} for topic: {}, partition: {}", offsetRecord.toString(), str, i);
        topicPartitionOffsetRecordMap.computeIfAbsent(str, topic -> new VeniceConcurrentHashMap<>()).put(i, offsetRecord);
    }

    /**
     * Stores {@code storeVersionState} in the local map and notifies the syncer callback; this
     * does not enqueue a remote update.
     *
     * @param str topic name
     * @param storeVersionState state to store
     */
    public void putStoreVersionState(String str, StoreVersionState storeVersionState) {
        LOGGER.info("Updating StoreVersionState for {}", str);
        topicStoreVersionStateMap.put(str, storeVersionState);
        storeVersionStateSyncer.accept(str, storeVersionState);
    }

    /** Appends an update to the outgoing queue and records the resulting queue length. */
    private synchronized void updateRemoteStorageMetadataService(IngestionStorageMetadata ingestionStorageMetadata) {
        metadataUpdateQueue.add(ingestionStorageMetadata);
        metadataUpdateStats.recordMetadataUpdateQueueLength(metadataUpdateQueue.size());
    }
}
