package com.linkedin.venice.system.store;

import com.linkedin.venice.common.VeniceSystemStoreType;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.helix.HelixReadOnlyZKSharedSchemaRepository;
import com.linkedin.venice.kafka.TopicManager;
import com.linkedin.venice.meta.Instance;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.StoreConfig;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.meta.ZKStore;
import com.linkedin.venice.offsets.OffsetRecord;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pushmonitor.ExecutionStatus;
import com.linkedin.venice.schema.GeneratedSchemaID;
import com.linkedin.venice.schema.SchemaEntry;
import com.linkedin.venice.schema.writecompute.WriteComputeSchemaConverter;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer;
import com.linkedin.venice.systemstore.schemas.StoreKeySchemas;
import com.linkedin.venice.systemstore.schemas.StoreMetaKey;
import com.linkedin.venice.systemstore.schemas.StoreMetaValue;
import com.linkedin.venice.systemstore.schemas.StoreMetaValueWriteOpRecord;
import com.linkedin.venice.systemstore.schemas.StoreReplicaStatus;
import com.linkedin.venice.systemstore.schemas.StoreValueSchema;
import com.linkedin.venice.systemstore.schemas.StoreValueSchemas;
import com.linkedin.venice.systemstore.schemas.storeReplicaStatusesMapOps;
import com.linkedin.venice.systemstore.schemas.storeValueSchemaIdsWrittenPerStoreVersionListOps;
import com.linkedin.venice.utils.Timer;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import com.linkedin.venice.writer.VeniceWriter;
import com.linkedin.venice.writer.VeniceWriterFactory;
import com.linkedin.venice.writer.VeniceWriterOptions;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.specific.SpecificRecord;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/venice/system/store/MetaStoreWriter.class */
public class MetaStoreWriter implements Closeable {
    public static final String KEY_STRING_STORE_NAME = "KEY_STORE_NAME";
    public static final String KEY_STRING_CLUSTER_NAME = "KEY_CLUSTER_NAME";
    public static final String KEY_STRING_VERSION_NUMBER = "KEY_VERSION_NUMBER";
    public static final String KEY_STRING_PARTITION_ID = "KEY_PARTITION_ID";
    public static final String KEY_STRING_SCHEMA_ID = "KEY_SCHEMA_ID";
    private static final Logger LOGGER = LogManager.getLogger(MetaStoreWriter.class);
    private final TopicManager topicManager;
    private final VeniceWriterFactory writerFactory;
    private final HelixReadOnlyZKSharedSchemaRepository zkSharedSchemaRepository;
    private final PubSubTopicRepository pubSubTopicRepository;
    private final Map<String, VeniceWriter> metaStoreWriterMap = new VeniceConcurrentHashMap();
    private final Map<String, ReentrantLock> metaStoreWriterLockMap = new VeniceConcurrentHashMap();
    private int derivedComputeSchemaId = -1;
    private final Schema derivedComputeSchema = WriteComputeSchemaConverter.getInstance().convertFromValueRecordSchema(AvroProtocolDefinition.METADATA_SYSTEM_SCHEMA_STORE.getCurrentProtocolVersionSchema());

    public MetaStoreWriter(TopicManager topicManager, VeniceWriterFactory veniceWriterFactory, HelixReadOnlyZKSharedSchemaRepository helixReadOnlyZKSharedSchemaRepository, PubSubTopicRepository pubSubTopicRepository) {
        this.topicManager = topicManager;
        this.writerFactory = veniceWriterFactory;
        this.zkSharedSchemaRepository = helixReadOnlyZKSharedSchemaRepository;
        this.pubSubTopicRepository = pubSubTopicRepository;
    }

    public void writeStoreProperties(String str, Store store) {
        String name = store.getName();
        if (!(store instanceof ZKStore)) {
            throw new IllegalArgumentException("Param 'store' must be an instance of 'ZKStore' for store name: " + name + ", but received: " + store.getClass());
        }
        write(name, MetaStoreDataType.STORE_PROPERTIES, () -> {
            return new HashMap<String, String>() { // from class: com.linkedin.venice.system.store.MetaStoreWriter.1
                {
                    put(MetaStoreWriter.KEY_STRING_STORE_NAME, name);
                    put(MetaStoreWriter.KEY_STRING_CLUSTER_NAME, str);
                }
            };
        }, () -> {
            StoreMetaValue storeMetaValue = new StoreMetaValue();
            storeMetaValue.storeProperties = ((ZKStore) store).dataModel();
            return storeMetaValue;
        });
    }

    public void writeStoreKeySchemas(String str, Collection<SchemaEntry> collection) {
        write(str, MetaStoreDataType.STORE_KEY_SCHEMAS, () -> {
            return new HashMap<String, String>() { // from class: com.linkedin.venice.system.store.MetaStoreWriter.2
                {
                    put(MetaStoreWriter.KEY_STRING_STORE_NAME, str);
                }
            };
        }, () -> {
            StoreMetaValue storeMetaValue = new StoreMetaValue();
            StoreKeySchemas storeKeySchemas = new StoreKeySchemas();
            storeKeySchemas.keySchemaMap = buildSchemaMap(collection);
            storeMetaValue.storeKeySchemas = storeKeySchemas;
            return storeMetaValue;
        });
    }

    public void writeStoreValueSchemas(String str, Collection<SchemaEntry> collection) {
        writeStoreValueSchemasIndividually(str, collection);
        write(str, MetaStoreDataType.STORE_VALUE_SCHEMAS, () -> {
            HashMap hashMap = new HashMap(1);
            hashMap.put(KEY_STRING_STORE_NAME, str);
            return hashMap;
        }, () -> {
            StoreMetaValue storeMetaValue = new StoreMetaValue();
            StoreValueSchemas storeValueSchemas = new StoreValueSchemas();
            storeValueSchemas.valueSchemaMap = buildSchemaIdOnlyMap(collection);
            storeMetaValue.storeValueSchemas = storeValueSchemas;
            return storeMetaValue;
        });
    }

    private void writeStoreValueSchemasIndividually(String str, Collection<SchemaEntry> collection) {
        for (SchemaEntry schemaEntry : collection) {
            write(str, MetaStoreDataType.STORE_VALUE_SCHEMA, () -> {
                HashMap hashMap = new HashMap(2);
                hashMap.put(KEY_STRING_STORE_NAME, str);
                hashMap.put(KEY_STRING_SCHEMA_ID, Integer.toString(schemaEntry.getId()));
                return hashMap;
            }, () -> {
                StoreMetaValue storeMetaValue = new StoreMetaValue();
                StoreValueSchema storeValueSchema = new StoreValueSchema();
                storeValueSchema.valueSchema = schemaEntry.getSchema().toString();
                storeMetaValue.storeValueSchema = storeValueSchema;
                return storeMetaValue;
            });
        }
    }

    public void writeInUseValueSchema(String str, int i, int i2) {
        update(str, MetaStoreDataType.VALUE_SCHEMAS_WRITTEN_PER_STORE_VERSION, () -> {
            HashMap hashMap = new HashMap(2);
            hashMap.put(KEY_STRING_STORE_NAME, str);
            hashMap.put(KEY_STRING_VERSION_NUMBER, Integer.toString(i));
            return hashMap;
        }, () -> {
            StoreMetaValueWriteOpRecord storeMetaValueWriteOpRecord = new StoreMetaValueWriteOpRecord();
            storeMetaValueWriteOpRecord.timestamp = Long.valueOf(System.currentTimeMillis());
            ArrayList arrayList = new ArrayList(1);
            arrayList.add(Integer.valueOf(i2));
            storeValueSchemaIdsWrittenPerStoreVersionListOps storevalueschemaidswrittenperstoreversionlistops = new storeValueSchemaIdsWrittenPerStoreVersionListOps();
            storevalueschemaidswrittenperstoreversionlistops.setUnion = arrayList;
            storevalueschemaidswrittenperstoreversionlistops.setDiff = Collections.emptyList();
            storeMetaValueWriteOpRecord.storeValueSchemaIdsWrittenPerStoreVersion = storevalueschemaidswrittenperstoreversionlistops;
            return storeMetaValueWriteOpRecord;
        });
    }

    public void writeReadyToServerStoreReplicas(String str, String str2, int i, int i2, Collection<Instance> collection) {
        write(str2, MetaStoreDataType.STORE_REPLICA_STATUSES, () -> {
            return new HashMap<String, String>() { // from class: com.linkedin.venice.system.store.MetaStoreWriter.3
                {
                    put(MetaStoreWriter.KEY_STRING_STORE_NAME, str2);
                    put(MetaStoreWriter.KEY_STRING_CLUSTER_NAME, str);
                    put(MetaStoreWriter.KEY_STRING_VERSION_NUMBER, Integer.toString(i));
                    put(MetaStoreWriter.KEY_STRING_PARTITION_ID, Integer.toString(i2));
                }
            };
        }, () -> {
            StoreMetaValue storeMetaValue = new StoreMetaValue();
            HashMap hashMap = new HashMap();
            Iterator it = collection.iterator();
            while (it.hasNext()) {
                Instance instance = (Instance) it.next();
                StoreReplicaStatus storeReplicaStatus = new StoreReplicaStatus();
                storeReplicaStatus.status = ExecutionStatus.COMPLETED.getValue();
                hashMap.put(instance.getUrl(true), storeReplicaStatus);
            }
            storeMetaValue.storeReplicaStatuses = hashMap;
            return storeMetaValue;
        });
    }

    public void writeStoreReplicaStatus(String str, String str2, int i, int i2, Instance instance, ExecutionStatus executionStatus) {
        update(str2, MetaStoreDataType.STORE_REPLICA_STATUSES, () -> {
            return new HashMap<String, String>() { // from class: com.linkedin.venice.system.store.MetaStoreWriter.4
                {
                    put(MetaStoreWriter.KEY_STRING_STORE_NAME, str2);
                    put(MetaStoreWriter.KEY_STRING_CLUSTER_NAME, str);
                    put(MetaStoreWriter.KEY_STRING_VERSION_NUMBER, Integer.toString(i));
                    put(MetaStoreWriter.KEY_STRING_PARTITION_ID, Integer.toString(i2));
                }
            };
        }, () -> {
            StoreMetaValueWriteOpRecord storeMetaValueWriteOpRecord = new StoreMetaValueWriteOpRecord();
            storeMetaValueWriteOpRecord.timestamp = Long.valueOf(System.currentTimeMillis());
            HashMap hashMap = new HashMap();
            StoreReplicaStatus storeReplicaStatus = new StoreReplicaStatus();
            storeReplicaStatus.status = executionStatus.getValue();
            hashMap.put(instance.getUrl(true), storeReplicaStatus);
            storeReplicaStatusesMapOps storereplicastatusesmapops = new storeReplicaStatusesMapOps();
            storereplicastatusesmapops.mapUnion = hashMap;
            storereplicastatusesmapops.mapDiff = Collections.emptyList();
            storeMetaValueWriteOpRecord.storeReplicaStatuses = storereplicastatusesmapops;
            return storeMetaValueWriteOpRecord;
        });
    }

    public void writeStoreClusterConfig(StoreConfig storeConfig) {
        write(storeConfig.getStoreName(), MetaStoreDataType.STORE_CLUSTER_CONFIG, () -> {
            return new HashMap<String, String>() { // from class: com.linkedin.venice.system.store.MetaStoreWriter.5
                {
                    put(MetaStoreWriter.KEY_STRING_STORE_NAME, storeConfig.getStoreName());
                }
            };
        }, () -> {
            StoreMetaValue storeMetaValue = new StoreMetaValue();
            storeMetaValue.storeClusterConfig = storeConfig.dataModel();
            return storeMetaValue;
        });
    }

    public void deleteStoreReplicaStatus(String str, String str2, int i, int i2, Instance instance) {
        update(str2, MetaStoreDataType.STORE_REPLICA_STATUSES, () -> {
            return new HashMap<String, String>() { // from class: com.linkedin.venice.system.store.MetaStoreWriter.6
                {
                    put(MetaStoreWriter.KEY_STRING_STORE_NAME, str2);
                    put(MetaStoreWriter.KEY_STRING_CLUSTER_NAME, str);
                    put(MetaStoreWriter.KEY_STRING_VERSION_NUMBER, Integer.toString(i));
                    put(MetaStoreWriter.KEY_STRING_PARTITION_ID, Integer.toString(i2));
                }
            };
        }, () -> {
            StoreMetaValueWriteOpRecord storeMetaValueWriteOpRecord = new StoreMetaValueWriteOpRecord();
            storeMetaValueWriteOpRecord.timestamp = Long.valueOf(System.currentTimeMillis());
            storeReplicaStatusesMapOps storereplicastatusesmapops = new storeReplicaStatusesMapOps();
            ArrayList arrayList = new ArrayList();
            arrayList.add(instance.getUrl(true));
            storereplicastatusesmapops.mapDiff = arrayList;
            storereplicastatusesmapops.mapUnion = Collections.emptyMap();
            storeMetaValueWriteOpRecord.storeReplicaStatuses = storereplicastatusesmapops;
            return storeMetaValueWriteOpRecord;
        });
    }

    public void deleteStoreReplicaStatus(final String str, final String str2, final int i, final int i2) {
        String systemStoreName = VeniceSystemStoreType.META_STORE.getSystemStoreName(str2);
        StoreMetaKey storeMetaKey = MetaStoreDataType.STORE_REPLICA_STATUSES.getStoreMetaKey(new HashMap<String, String>() { // from class: com.linkedin.venice.system.store.MetaStoreWriter.7
            {
                put(MetaStoreWriter.KEY_STRING_STORE_NAME, str2);
                put(MetaStoreWriter.KEY_STRING_CLUSTER_NAME, str);
                put(MetaStoreWriter.KEY_STRING_VERSION_NUMBER, Integer.toString(i));
                put(MetaStoreWriter.KEY_STRING_PARTITION_ID, Integer.toString(i2));
            }
        });
        writeMessageWithRetry(systemStoreName, veniceWriter -> {
            veniceWriter.delete(storeMetaKey, null);
        });
    }

    public void removeMetaStoreWriter(String str) {
        VeniceWriter remove = getMetaStoreWriterMap().remove(str);
        if (remove != null) {
            closeVeniceWriter(str, remove, true);
            LOGGER.info("Removed the venice writer for meta store: {}", str);
        }
    }

    public VeniceWriter getMetaStoreWriter(String str) {
        return this.metaStoreWriterMap.get(str);
    }

    private void write(String str, MetaStoreDataType metaStoreDataType, Supplier<Map<String, String>> supplier, Supplier<StoreMetaValue> supplier2) {
        String systemStoreName = VeniceSystemStoreType.META_STORE.getSystemStoreName(str);
        StoreMetaKey storeMetaKey = metaStoreDataType.getStoreMetaKey(supplier.get());
        StoreMetaValue storeMetaValue = supplier2.get();
        storeMetaValue.timestamp = System.currentTimeMillis();
        writeMessageWithRetry(systemStoreName, veniceWriter -> {
            veniceWriter.put(storeMetaKey, storeMetaValue, AvroProtocolDefinition.METADATA_SYSTEM_SCHEMA_STORE.currentProtocolVersion.get().intValue());
        });
    }

    private void update(String str, MetaStoreDataType metaStoreDataType, Supplier<Map<String, String>> supplier, Supplier<SpecificRecord> supplier2) {
        String systemStoreName = VeniceSystemStoreType.META_STORE.getSystemStoreName(str);
        if (this.derivedComputeSchemaId == -1) {
            GeneratedSchemaID derivedSchemaId = this.zkSharedSchemaRepository.getDerivedSchemaId(VeniceSystemStoreType.META_STORE.getZkSharedStoreName(), this.derivedComputeSchema.toString());
            if (!derivedSchemaId.isValid()) {
                throw new VeniceException("The derived compute schema for meta system store hasn't been registered to Venice yet");
            }
            this.derivedComputeSchemaId = derivedSchemaId.getGeneratedSchemaVersion();
        }
        StoreMetaKey storeMetaKey = metaStoreDataType.getStoreMetaKey(supplier.get());
        SpecificRecord specificRecord = supplier2.get();
        writeMessageWithRetry(systemStoreName, veniceWriter -> {
            veniceWriter.update(storeMetaKey, specificRecord, AvroProtocolDefinition.METADATA_SYSTEM_SCHEMA_STORE.currentProtocolVersion.get().intValue(), this.derivedComputeSchemaId, null);
        });
    }

    void writeMessageWithRetry(String str, Consumer<VeniceWriter> consumer) {
        ReentrantLock orCreateMetaStoreWriterLock = getOrCreateMetaStoreWriterLock(str);
        int i = 0;
        boolean z = false;
        while (!z) {
            orCreateMetaStoreWriterLock.lock();
            try {
                try {
                    VeniceWriter orCreateMetaStoreWriter = getOrCreateMetaStoreWriter(str);
                    consumer.accept(orCreateMetaStoreWriter);
                    orCreateMetaStoreWriter.flush();
                    z = true;
                    orCreateMetaStoreWriterLock.unlock();
                } catch (Exception e) {
                    i++;
                    if (i >= 3) {
                        LOGGER.error("Fail to write message after {} attempts.", 3, e);
                        orCreateMetaStoreWriterLock.unlock();
                        return;
                    } else {
                        LOGGER.warn("Caught exception while trying to write message, will restart Venice Writer and retry {}/{}", Integer.valueOf(i), 3);
                        try {
                            removeMetaStoreWriter(str);
                        } catch (Exception e2) {
                            LOGGER.warn("Caught exception while trying to close Venice writer", e);
                        }
                        orCreateMetaStoreWriterLock.unlock();
                    }
                    orCreateMetaStoreWriterLock.unlock();
                    throw th;
                }
            } catch (Throwable th) {
                orCreateMetaStoreWriterLock.unlock();
                throw th;
            }
        }
    }

    ReentrantLock getOrCreateMetaStoreWriterLock(String str) {
        return this.metaStoreWriterLockMap.computeIfAbsent(str, str2 -> {
            return new ReentrantLock();
        });
    }

    Map<String, VeniceWriter> getMetaStoreWriterMap() {
        return this.metaStoreWriterMap;
    }

    VeniceWriter getOrCreateMetaStoreWriter(String str) {
        return this.metaStoreWriterMap.computeIfAbsent(str, str2 -> {
            PubSubTopic topic = this.pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(str));
            if (!this.topicManager.containsTopicAndAllPartitionsAreOnline(topic)) {
                throw new VeniceException("Realtime topic: " + topic + " doesn't exist or some partitions are not online");
            }
            return this.writerFactory.createVeniceWriter(new VeniceWriterOptions.Builder(topic.getName()).setKeySerializer(new VeniceAvroKafkaSerializer(AvroProtocolDefinition.METADATA_SYSTEM_SCHEMA_STORE_KEY.getCurrentProtocolVersionSchema())).setValueSerializer(new VeniceAvroKafkaSerializer(AvroProtocolDefinition.METADATA_SYSTEM_SCHEMA_STORE.getCurrentProtocolVersionSchema())).setWriteComputeSerializer(new VeniceAvroKafkaSerializer(this.derivedComputeSchema)).setChunkingEnabled(false).setPartitionCount(1).build());
        });
    }

    private Map<CharSequence, CharSequence> buildSchemaMap(Collection<SchemaEntry> collection) {
        return (Map) collection.stream().collect(Collectors.toMap(schemaEntry -> {
            return Integer.toString(schemaEntry.getId());
        }, schemaEntry2 -> {
            return schemaEntry2.getSchema().toString();
        }));
    }

    private Map<CharSequence, CharSequence> buildSchemaIdOnlyMap(Collection<SchemaEntry> collection) {
        return (Map) collection.stream().collect(Collectors.toMap(schemaEntry -> {
            return Integer.toString(schemaEntry.getId());
        }, schemaEntry2 -> {
            return OffsetRecord.NON_AA_REPLICATION_UPSTREAM_OFFSET_MAP_KEY;
        }));
    }

    private void closeVeniceWriter(String str, VeniceWriter veniceWriter, boolean z) {
        if (z) {
            veniceWriter.close(false);
            return;
        }
        PubSubTopic topic = this.pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(str));
        if (this.topicManager.containsTopicAndAllPartitionsAreOnline(topic)) {
            veniceWriter.close();
        } else {
            LOGGER.info("RT topic: {} for meta system store: {} doesn't exist, will only close the internal producer without sending END_OF_SEGMENT control messages", topic, str);
            veniceWriter.close(false);
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        Timer run = Timer.run(d -> {
            LOGGER.info("MetaStoreWriter takes {}ms to close {} VeniceWriters in parallel", d, Integer.valueOf(this.metaStoreWriterMap.size()));
        });
        try {
            this.metaStoreWriterMap.entrySet().parallelStream().forEach(entry -> {
                closeVeniceWriter((String) entry.getKey(), (VeniceWriter) entry.getValue(), false);
            });
            if (run != null) {
                run.close();
            }
        } catch (Throwable th) {
            if (run != null) {
                try {
                    run.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }
}
