package com.linkedin.venice.utils;

import com.github.luben.zstd.Zstd;
import com.linkedin.davinci.config.VeniceServerConfig;
import com.linkedin.davinci.kafka.consumer.AbstractStoreBufferService;
import com.linkedin.davinci.kafka.consumer.AggKafkaConsumerService;
import com.linkedin.davinci.kafka.consumer.StoreBufferService;
import com.linkedin.davinci.kafka.consumer.StoreIngestionTaskFactory;
import com.linkedin.davinci.stats.AggHostLevelIngestionStats;
import com.linkedin.davinci.stats.AggVersionedDIVStats;
import com.linkedin.davinci.stats.AggVersionedIngestionStats;
import com.linkedin.davinci.storage.StorageEngineRepository;
import com.linkedin.davinci.storage.StorageMetadataService;
import com.linkedin.davinci.store.AbstractStorageEngine;
import com.linkedin.venice.common.VeniceSystemStoreType;
import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.compression.GzipCompressor;
import com.linkedin.venice.compression.NoopCompressor;
import com.linkedin.venice.compression.VeniceCompressor;
import com.linkedin.venice.compression.ZstdWithDictCompressor;
import com.linkedin.venice.controller.VeniceControllerConfig;
import com.linkedin.venice.controller.VeniceControllerMultiClusterConfig;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.ControllerResponse;
import com.linkedin.venice.controllerapi.JobStatusQueryResponse;
import com.linkedin.venice.controllerapi.StoreResponse;
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
import com.linkedin.venice.controllerapi.VersionCreationResponse;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.helix.HelixAdapterSerializer;
import com.linkedin.venice.helix.HelixInstanceConverter;
import com.linkedin.venice.helix.SafeHelixManager;
import com.linkedin.venice.helix.VeniceOfflinePushMonitorAccessor;
import com.linkedin.venice.kafka.TopicManagerRepository;
import com.linkedin.venice.kafka.protocol.state.PartitionState;
import com.linkedin.venice.meta.HybridStoreConfig;
import com.linkedin.venice.meta.IngestionMode;
import com.linkedin.venice.meta.Instance;
import com.linkedin.venice.meta.OfflinePushStrategy;
import com.linkedin.venice.meta.PartitionerConfigImpl;
import com.linkedin.venice.meta.PersistenceType;
import com.linkedin.venice.meta.ReadOnlySchemaRepository;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
import com.linkedin.venice.meta.ReadStrategy;
import com.linkedin.venice.meta.RoutingStrategy;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.meta.VersionImpl;
import com.linkedin.venice.meta.ZKStore;
import com.linkedin.venice.offsets.OffsetRecord;
import com.linkedin.venice.partitioner.DefaultVenicePartitioner;
import com.linkedin.venice.partitioner.VenicePartitioner;
import com.linkedin.venice.pubsub.adapter.kafka.producer.ApacheKafkaProducerAdapterFactory;
import com.linkedin.venice.pubsub.adapter.kafka.producer.SharedKafkaProducerAdapterFactory;
import com.linkedin.venice.pubsub.api.PubSubTopicType;
import com.linkedin.venice.pushmonitor.ExecutionStatus;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.serialization.avro.InternalAvroSpecificSerializer;
import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer;
import com.linkedin.venice.serializer.AvroSerializer;
import com.linkedin.venice.writer.VeniceWriter;
import com.linkedin.venice.writer.VeniceWriterFactory;
import com.linkedin.venice.writer.VeniceWriterOptions;
import io.tehuti.metrics.MetricsRepository;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Paths;
import java.security.Permission;
import java.util.AbstractMap;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.rocksdb.ComparatorOptions;
import org.rocksdb.util.BytewiseComparator;
import org.testng.Assert;

/** Static helper methods shared by Venice tests (decompiled from com/linkedin/venice/utils/TestUtils.class). */
public class TestUtils {
    // Sleep interval, in ms, between polls of a non-deterministic condition; waiting is abandoned once
    // the remaining time is no longer greater than this value.
    private static final long ND_ASSERTION_MIN_WAIT_TIME_MS = 100;
    // Upper bound, in ms, applied to the retry delay when exponential back-off is enabled.
    private static final long ND_ASSERTION_MAX_WAIT_TIME_MS = 3000;
    private static final Logger LOGGER = LogManager.getLogger(TestUtils.class);
    // Serializer passed to every OffsetRecord created by getOffsetRecord.
    private static final InternalAvroSpecificSerializer<PartitionState> partitionStateSerializer = AvroProtocolDefinition.PARTITION_STATE.getSerializer();

    /**
     * A piece of test logic that may fail (by throwing) until some asynchronous condition becomes true.
     * Instances are normally supplied as lambdas to the {@code waitForNonDeterministicAssertion} helpers,
     * which re-run {@link #execute()} until it completes without throwing or the timeout elapses.
     */
    @FunctionalInterface
    public interface NonDeterministicAssertion {
        /**
         * Runs the assertion once.
         *
         * @throws Exception any failure; an {@link AssertionError} signals "condition not met yet"
         */
        void execute() throws Exception;
    }

    /**
     * Returns the part of {@code str} that follows its last {@code '.'}; when there is no dot the whole
     * string is returned unchanged.
     *
     * @param str a (possibly fully-qualified) class name
     * @return the simple name portion of {@code str}
     */
    public static String dequalifyClassName(String str) {
        int lastDot = str.lastIndexOf('.');
        // lastIndexOf yields -1 when absent, so "+ 1" conveniently maps that case to index 0.
        return str.substring(lastDot + 1);
    }

    /**
     * Describes the nearest stack frame that belongs to a {@code com.linkedin.} class other than
     * {@code TestUtils} itself, formatted as {@code SimpleClassName.method.lineNumber}.
     *
     * @return the formatted frame, or {@code "UNKNOWN_METHOD"} when no such frame exists
     */
    private static String getCallingMethod() {
        final String ownClassName = TestUtils.class.getName();
        for (StackTraceElement frame : Thread.currentThread().getStackTrace()) {
            String frameClass = frame.getClassName();
            boolean isLinkedInFrame = frameClass.startsWith("com.linkedin.");
            if (isLinkedInFrame && !frameClass.equals(ownClassName)) {
                return String.format("%s.%s.%d", dequalifyClassName(frameClass), frame.getMethodName(), frame.getLineNumber());
            }
        }
        return "UNKNOWN_METHOD";
    }

    /**
     * Polls {@code booleanSupplier} until it returns {@code true}, sleeping
     * {@code ND_ASSERTION_MIN_WAIT_TIME_MS} between polls, and logs how long the wait took.
     *
     * @param j               timeout amount
     * @param timeUnit        unit of {@code j}
     * @param booleanSupplier condition to wait for
     * @throws AssertionError if the remaining time drops to the polling interval or below before the
     *                        condition holds, or if {@code Utils.sleep} reports failure (per the
     *                        assertion message, an interrupted wait)
     */
    public static void waitForNonDeterministicCompletion(long j, TimeUnit timeUnit, BooleanSupplier booleanSupplier) throws AssertionError {
        final long startMs = System.currentTimeMillis();
        final long deadlineMs = startMs + timeUnit.toMillis(j);
        for (;;) {
            if (booleanSupplier.getAsBoolean()) {
                break;
            }
            try {
                long remainingMs = deadlineMs - System.currentTimeMillis();
                Assert.assertTrue(remainingMs > ND_ASSERTION_MIN_WAIT_TIME_MS, "Non-deterministic condition not met.");
                boolean sleptFully = Utils.sleep(ND_ASSERTION_MIN_WAIT_TIME_MS);
                Assert.assertTrue(sleptFully, "Waiting for non-deterministic condition was interrupted.");
            } catch (Throwable failure) {
                // Report the elapsed time on the failure path too, then propagate unchanged.
                LOGGER.info("{} waiting took {} ms.", getCallingMethod(), System.currentTimeMillis() - startMs);
                throw failure;
            }
        }
        LOGGER.info("{} waiting took {} ms.", getCallingMethod(), System.currentTimeMillis() - startMs);
    }

    /**
     * Sends an update-store request that sets the storage quota to {@code -1}, the hybrid rewind time to
     * 25 seconds and the hybrid offset lag threshold to 1, optionally toggling further flags.
     *
     * @param str              name of the store to update
     * @param controllerClient client used to send the update
     * @param optional         when present, value for the native-replication flag
     * @param optional2        when present, value for the active-active-replication flag
     * @param optional3        when present, value for the chunking flag
     * @return the controller response, already verified to be error-free by {@code assertCommand}
     */
    public static ControllerResponse updateStoreToHybrid(String str, ControllerClient controllerClient, Optional<Boolean> optional, Optional<Boolean> optional2, Optional<Boolean> optional3) {
        UpdateStoreQueryParams params = new UpdateStoreQueryParams().setStorageQuotaInByte(-1L).setHybridRewindSeconds(25L).setHybridOffsetLagThreshold(1L);
        // Each optional flag is applied to the params object only when the caller supplied a value.
        optional.ifPresent(params::setNativeReplicationEnabled);
        optional2.ifPresent(params::setActiveActiveReplicationEnabled);
        optional3.ifPresent(params::setChunkingEnabled);
        return assertCommand(controllerClient.updateStore(str, params));
    }

    /**
     * Retries {@code nonDeterministicAssertion} until it passes or the timeout elapses, with a fixed
     * retry delay (exponential back-off disabled) and retrying only on {@link AssertionError}.
     *
     * @param j                         timeout amount
     * @param timeUnit                  unit of {@code j}
     * @param nonDeterministicAssertion assertion to re-run
     * @throws AssertionError if the assertion still fails when the timeout is reached
     */
    public static void waitForNonDeterministicAssertion(long j, TimeUnit timeUnit, NonDeterministicAssertion nonDeterministicAssertion) throws AssertionError {
        waitForNonDeterministicAssertion(j, timeUnit, false, nonDeterministicAssertion);
    }

    /**
     * Retries {@code nonDeterministicAssertion} until it passes or the timeout elapses, retrying only on
     * {@link AssertionError}.
     *
     * @param j                         timeout amount
     * @param timeUnit                  unit of {@code j}
     * @param z                         whether the retry delay grows exponentially between attempts
     * @param nonDeterministicAssertion assertion to re-run
     * @throws AssertionError if the assertion still fails when the timeout is reached
     */
    public static void waitForNonDeterministicAssertion(long j, TimeUnit timeUnit, boolean z, NonDeterministicAssertion nonDeterministicAssertion) throws AssertionError {
        waitForNonDeterministicAssertion(j, timeUnit, z, false, nonDeterministicAssertion);
    }

    /**
     * Repeatedly runs {@code nonDeterministicAssertion} until it completes without throwing or the
     * timeout elapses, logging the total time spent either way.
     *
     * @param j                         timeout amount
     * @param timeUnit                  unit of {@code j}
     * @param z                         when {@code true}, the delay between attempts doubles each time,
     *                                  capped by both the remaining time and
     *                                  {@code ND_ASSERTION_MAX_WAIT_TIME_MS}
     * @param z2                        when {@code true}, any {@link Throwable} triggers a retry; otherwise
     *                                  only {@link AssertionError} does
     * @param nonDeterministicAssertion assertion to re-run
     * @throws AssertionError the last failure (wrapped if it was not already an {@code AssertionError})
     *                        once retrying is no longer possible, or if the sleep between attempts fails
     */
    public static void waitForNonDeterministicAssertion(long j, TimeUnit timeUnit, boolean z, boolean z2, NonDeterministicAssertion nonDeterministicAssertion) throws AssertionError {
        final long startMs = System.currentTimeMillis();
        final long deadlineMs = startMs + timeUnit.toMillis(j);
        long nextDelayMs = ND_ASSERTION_MIN_WAIT_TIME_MS;
        try {
            for (;;) {
                try {
                    nonDeterministicAssertion.execute();
                    return;
                } catch (Throwable failure) {
                    long remainingMs = deadlineMs - System.currentTimeMillis();
                    boolean retryable = z2 || failure instanceof AssertionError;
                    if (remainingMs < nextDelayMs || !retryable) {
                        // Out of time, or a failure type we must not retry: surface it as an AssertionError.
                        throw failure instanceof AssertionError ? (AssertionError) failure : new AssertionError(failure);
                    }
                    LOGGER.info("Non-deterministic assertion not met: {}. Will retry again in {} ms.", failure, nextDelayMs);
                    Assert.assertTrue(Utils.sleep(nextDelayMs), "Waiting for non-deterministic assertion was interrupted.");
                    if (z) {
                        long doubled = nextDelayMs * 2;
                        long leftAfterSleep = remainingMs - nextDelayMs;
                        nextDelayMs = Math.min(Math.min(doubled, leftAfterSleep), ND_ASSERTION_MAX_WAIT_TIME_MS);
                    }
                }
            }
        } finally {
            // Runs on both the success and the failure path.
            LOGGER.info("{} waiting took {} ms.", getCallingMethod(), System.currentTimeMillis() - startMs);
        }
    }

    /**
     * Creates a new batch version of store {@code str} and writes {@code stream} into it, using
     * {@code 1} as the value schema id passed to the writer.
     *
     * @param controllerClient client used to request the version topic
     * @param str              store name
     * @param str2             key schema string
     * @param str3             value schema string
     * @param stream           key/value entries to write
     * @return the version-creation response returned by the controller
     */
    public static VersionCreationResponse createVersionWithBatchData(ControllerClient controllerClient, String str, String str2, String str3, Stream<Map.Entry> stream) {
        return createVersionWithBatchData(controllerClient, str, str2, str3, stream, 1);
    }

    /**
     * Requests a BATCH-push topic for store {@code str} (with a GUID-based dummy push id) and writes
     * {@code stream} into it via {@code writeBatchData}.
     *
     * @param controllerClient client used to request the version topic
     * @param str              store name
     * @param str2             key schema string
     * @param str3             value schema string
     * @param stream           key/value entries to write
     * @param i                value schema id passed to the writer for every record
     * @return the error-free version-creation response
     */
    public static VersionCreationResponse createVersionWithBatchData(ControllerClient controllerClient, String str, String str2, String str3, Stream<Map.Entry> stream, int i) {
        String pushId = Version.guidBasedDummyPushId();
        VersionCreationResponse versionCreation = assertCommand(
                controllerClient.requestTopicForWrites(str, 1024L, Version.PushType.BATCH, pushId, true, false, false, Optional.empty(), Optional.empty(), Optional.empty(), false, -1L));
        writeBatchData(versionCreation, str2, str3, stream, i);
        return versionCreation;
    }

    /**
     * Writes {@code stream} to the topic described by {@code versionCreationResponse} without
     * compression ({@code CompressionStrategy.NO_OP}, no dictionary generator).
     *
     * @param versionCreationResponse controller response describing the target topic
     * @param str                     key schema string
     * @param str2                    value schema string
     * @param stream                  key/value entries to write
     * @param i                       value schema id passed to the writer for every record
     */
    public static void writeBatchData(VersionCreationResponse versionCreationResponse, String str, String str2, Stream<Map.Entry> stream, int i) {
        writeBatchData(versionCreationResponse, str, str2, stream, i, CompressionStrategy.NO_OP, null);
    }

    /**
     * Writes {@code stream} to the topic described by {@code versionCreationResponse}, compressing
     * values unless {@code compressionStrategy} is {@code NO_OP}.
     *
     * @param versionCreationResponse controller response supplying bootstrap servers, partitioner
     *                                settings, amplification factor, partition count and topic name
     * @param str                     key schema string
     * @param str2                    value schema string
     * @param stream                  key/value entries to write
     * @param i                       value schema id passed to the writer for every record
     * @param compressionStrategy     compression to apply to values
     * @param function                maps the topic name to the dictionary buffer handed to the
     *                                compressed writer; only invoked (and therefore must be non-null)
     *                                when {@code compressionStrategy} is not {@code NO_OP}
     */
    public static void writeBatchData(VersionCreationResponse versionCreationResponse, String str, String str2, Stream<Map.Entry> stream, int i, CompressionStrategy compressionStrategy, Function<String, ByteBuffer> function) {
        final String partitionerClass = versionCreationResponse.getPartitionerClass();
        final int amplificationFactor = versionCreationResponse.getAmplificationFactor();

        Properties writerProps = new Properties();
        writerProps.put("kafka.bootstrap.servers", versionCreationResponse.getKafkaBootstrapServers());
        writerProps.setProperty("partitioner.class", partitionerClass);
        writerProps.putAll(versionCreationResponse.getPartitionerParams());
        writerProps.setProperty("amplification.factor", String.valueOf(amplificationFactor));
        VeniceWriterFactory writerFactory = getVeniceWriterFactory(writerProps);

        Properties partitionerProps = new Properties();
        partitionerProps.putAll(versionCreationResponse.getPartitionerParams());
        VenicePartitioner partitioner = PartitionUtils.getVenicePartitioner(partitionerClass, amplificationFactor, new VeniceProperties(partitionerProps));

        final String topic = versionCreationResponse.getKafkaTopic();
        // The writer addresses every sub-partition, hence partitions * amplification factor.
        final int totalPartitions = versionCreationResponse.getPartitions() * amplificationFactor;
        if (compressionStrategy == CompressionStrategy.NO_OP) {
            writeUncompressed(writerFactory, str, str2, i, topic, totalPartitions, partitioner, stream);
            return;
        }
        writeCompressed(writerFactory, str, str2, i, topic, totalPartitions, partitioner, stream, compressionStrategy, function.apply(topic));
    }

    /**
     * Writes every entry of {@code stream} to topic {@code str3}, Avro-serializing keys and values and
     * compressing the serialized values, bracketed by start-of-push and end-of-push control messages.
     * The writer is always closed, including when a write fails.
     *
     * @param veniceWriterFactory factory used to create the writer
     * @param str                 key schema string
     * @param str2                value schema string
     * @param i                   value schema id passed to the writer for every record
     * @param str3                target topic name
     * @param i2                  partition count configured on the writer
     * @param venicePartitioner   partitioner configured on the writer
     * @param stream              key/value entries to write
     * @param compressionStrategy ZSTD_WITH_DICT and GZIP select the matching compressor; anything else
     *                            falls back to a no-op compressor
     * @param byteBuffer          dictionary buffer; its backing array seeds the ZSTD compressor and it is
     *                            forwarded (nullable) in the start-of-push message
     * @throws VeniceException wrapping any I/O, interruption or execution failure
     */
    private static void writeCompressed(VeniceWriterFactory veniceWriterFactory, String str, String str2, int i, String str3, int i2, VenicePartitioner venicePartitioner, Stream<Map.Entry> stream, CompressionStrategy compressionStrategy, ByteBuffer byteBuffer) {
        // Declared with the common supertype so that all three compressor implementations are assignable.
        final VeniceCompressor compressor;
        if (compressionStrategy == CompressionStrategy.ZSTD_WITH_DICT) {
            compressor = new ZstdWithDictCompressor(byteBuffer.array(), Zstd.maxCompressionLevel());
        } else if (compressionStrategy == CompressionStrategy.GZIP) {
            compressor = new GzipCompressor();
        } else {
            compressor = new NoopCompressor();
        }
        // try-with-resources guarantees the writer is closed even if a put or broadcast throws.
        try (VeniceWriter writer = veniceWriterFactory.createVeniceWriter(new VeniceWriterOptions.Builder(str3).setPartitionCount(i2).setPartitioner(venicePartitioner).build())) {
            writer.broadcastStartOfPush(false, false, compressionStrategy, Optional.ofNullable(byteBuffer), Collections.emptyMap());
            VeniceAvroKafkaSerializer keySerializer = new VeniceAvroKafkaSerializer(str);
            VeniceAvroKafkaSerializer valueSerializer = new VeniceAvroKafkaSerializer(str2);
            List<Future> pendingPuts = new LinkedList<>();
            Iterable<Map.Entry> entries = stream::iterator;
            for (Map.Entry entry : entries) {
                pendingPuts.add(writer.put(keySerializer.serialize(str3, entry.getKey()), compressor.compress(valueSerializer.serialize(str3, entry.getValue())), i));
            }
            // Wait for every record to be acknowledged before announcing end-of-push.
            for (Future pendingPut : pendingPuts) {
                pendingPut.get();
            }
            writer.broadcastEndOfPush(Collections.emptyMap());
        } catch (IOException | InterruptedException | ExecutionException e) {
            throw new VeniceException(e);
        }
    }

    /**
     * Writes every entry of {@code stream} to topic {@code str3} through a writer configured with Avro
     * key/value serializers, bracketed by start-of-push and end-of-push control messages. The writer is
     * always closed, including when a write fails.
     *
     * @param veniceWriterFactory factory used to create the writer
     * @param str                 key schema string
     * @param str2                value schema string
     * @param i                   value schema id passed to the writer for every record
     * @param str3                target topic name
     * @param i2                  partition count configured on the writer
     * @param venicePartitioner   partitioner configured on the writer
     * @param stream              key/value entries to write
     * @throws VeniceException wrapping any interruption or execution failure
     */
    private static void writeUncompressed(VeniceWriterFactory veniceWriterFactory, String str, String str2, int i, String str3, int i2, VenicePartitioner venicePartitioner, Stream<Map.Entry> stream) {
        // try-with-resources guarantees the writer is closed even if a put or broadcast throws.
        try (VeniceWriter writer = veniceWriterFactory.createVeniceWriter(new VeniceWriterOptions.Builder(str3).setKeySerializer(new VeniceAvroKafkaSerializer(str)).setValueSerializer(new VeniceAvroKafkaSerializer(str2)).setPartitionCount(i2).setPartitioner(venicePartitioner).build())) {
            writer.broadcastStartOfPush(Collections.emptyMap());
            List<Future> pendingPuts = new LinkedList<>();
            Iterable<Map.Entry> entries = stream::iterator;
            for (Map.Entry entry : entries) {
                pendingPuts.add(writer.put(entry.getKey(), entry.getValue(), i));
            }
            // Wait for every record to be acknowledged before announcing end-of-push.
            for (Future pendingPut : pendingPuts) {
                pendingPut.get();
            }
            writer.broadcastEndOfPush(Collections.emptyMap());
        } catch (InterruptedException | ExecutionException e) {
            throw new VeniceException(e);
        }
    }

    /**
     * Waits until the push job for topic {@code str} reports {@code COMPLETED}.
     * <p>
     * An {@code ERROR} status raises a {@link VeniceException}; because this uses the overload that
     * retries only on {@link AssertionError}, that exception ends the wait immediately instead of being
     * retried.
     *
     * @param str              version topic whose push status is queried
     * @param controllerClient client used to query the job status
     * @param j                timeout amount
     * @param timeUnit         unit of {@code j}
     */
    public static void waitForNonDeterministicPushCompletion(String str, ControllerClient controllerClient, long j, TimeUnit timeUnit) {
        waitForNonDeterministicAssertion(j, timeUnit, () -> {
            JobStatusQueryResponse jobStatus = assertCommand(controllerClient.queryJobStatus(str, Optional.empty()));
            ExecutionStatus pushStatus = ExecutionStatus.valueOf(jobStatus.getStatus());
            if (ExecutionStatus.ERROR == pushStatus) {
                throw new VeniceException("Unexpected push failure for topic: " + str + ": " + jobStatus);
            }
            Assert.assertEquals(pushStatus, ExecutionStatus.COMPLETED, "Push is yet to complete: " + jobStatus);
        });
    }

    /**
     * Builds an in-memory {@link ZKStore} (consistent-hash routing, any-of-online reads, wait-all-replicas
     * push strategy, final constructor argument {@code 3}) whose latest-version-promotion timestamp is
     * set to {@code -1}.
     *
     * @param str  store name
     * @param str2 store owner
     * @param j    creation time passed to the store constructor
     * @return the new store
     */
    public static Store createTestStore(String str, String str2, long j) {
        ZKStore testStore = new ZKStore(
                str,
                str2,
                j,
                PersistenceType.IN_MEMORY,
                RoutingStrategy.CONSISTENT_HASH,
                ReadStrategy.ANY_OF_ONLINE,
                OfflinePushStrategy.WAIT_ALL_REPLICAS,
                3);
        testStore.setLatestVersionPromoteToCurrentTimestamp(-1L);
        return testStore;
    }

    /**
     * Creates a Helix participant backed by a {@code MockTestStateModelFactory}.
     *
     * @param str  cluster name
     * @param str2 participant (instance) name
     * @param str3 ZooKeeper address
     * @param i    port advertised for the instance
     * @param str4 state model name the factory is registered under
     * @return the configured (not yet connected) participant manager
     * @deprecated prefer the overload that takes an explicit {@code StateModelFactory}
     */
    @Deprecated
    public static SafeHelixManager getParticipant(String str, String str2, String str3, int i, String str4) {
        // NOTE(review): a client is opened and closed right away before the real one is created;
        // purpose is not visible here. The second client is never closed by this method.
        new ZkClient(str3).close();
        ZkClient accessorClient = new ZkClient(str3);
        VeniceOfflinePushMonitorAccessor pushMonitorAccessor =
                new VeniceOfflinePushMonitorAccessor(str, accessorClient, new HelixAdapterSerializer(), 3, 1000L);
        return getParticipant(str, str2, str3, i, new MockTestStateModelFactory(pushMonitorAccessor), str4);
    }

    /**
     * Creates a Helix PARTICIPANT manager, registers {@code stateModelFactory} under state model
     * {@code str4}, and installs a live-instance info provider describing this host.
     *
     * @param str               cluster name
     * @param str2              participant (instance) name
     * @param str3              ZooKeeper address
     * @param i                 port advertised for the instance
     * @param stateModelFactory factory to register with the state machine engine
     * @param str4              state model name the factory is registered under
     * @return the configured participant manager
     */
    public static SafeHelixManager getParticipant(String str, String str2, String str3, int i, StateModelFactory<StateModel> stateModelFactory, String str4) {
        SafeHelixManager participant =
                new SafeHelixManager(HelixManagerFactory.getZKHelixManager(str, str2, InstanceType.PARTICIPANT, str3));
        participant.getStateMachineEngine().registerStateModelFactory(str4, stateModelFactory);
        participant.setLiveInstanceInfoProvider(
                () -> HelixInstanceConverter.convertInstanceToZNRecord(new Instance(str2, Utils.getHostName(), i)));
        return participant;
    }

    /**
     * Builds an {@link OffsetRecord} checkpointed at offset {@code j} with no end-of-push marker.
     *
     * @param j local version-topic offset to checkpoint
     * @return the new offset record
     */
    public static OffsetRecord getOffsetRecord(long j) {
        return getOffsetRecord(j, Optional.<Long>empty());
    }

    /**
     * Builds an {@link OffsetRecord} checkpointed at offset {@code j} that is always marked as having
     * received end-of-push, with a value of {@code 1000} when {@code z} is true and {@code 0} otherwise.
     *
     * @param j local version-topic offset to checkpoint
     * @param z selects which end-of-push value is recorded
     * @return the new offset record
     */
    public static OffsetRecord getOffsetRecord(long j, boolean z) {
        long endOfPushValue = z ? 1000L : 0L;
        return getOffsetRecord(j, Optional.of(endOfPushValue));
    }

    /**
     * Builds an {@link OffsetRecord} checkpointed at offset {@code j}; when {@code optional} holds a
     * value, it is passed to {@code endOfPushReceived}.
     *
     * @param j        local version-topic offset to checkpoint
     * @param optional value for the end-of-push marker, or empty to leave it unset
     * @return the new offset record
     */
    public static OffsetRecord getOffsetRecord(long j, Optional<Long> optional) {
        OffsetRecord record = new OffsetRecord(partitionStateSerializer);
        record.setCheckpointLocalVersionTopicOffset(j);
        optional.ifPresent(record::endOfPushReceived);
        return record;
    }

    /**
     * Wraps a single controller config in a multi-cluster config keyed by the config's cluster name.
     *
     * @param veniceControllerConfig the only cluster config to include
     * @return a multi-cluster config containing exactly that cluster
     */
    public static VeniceControllerMultiClusterConfig getMultiClusterConfigFromOneCluster(VeniceControllerConfig veniceControllerConfig) {
        Map<String, VeniceControllerConfig> configsByCluster = new HashMap<>();
        configsByCluster.put(veniceControllerConfig.getClusterName(), veniceControllerConfig);
        return new VeniceControllerMultiClusterConfig(configsByCluster);
    }

    /**
     * Loads the server {@code cluster.properties} and the controller {@code controller.properties}
     * relative to the current working directory and merges them, controller values winning on
     * conflicts.
     * <p>
     * The base directory is adjusted when the working directory ends with {@code venice-controller}
     * ({@code /..}) or {@code venice-test-common} ({@code /../../services}).
     *
     * @return the merged properties
     * @throws IOException declared by this method; presumably raised when a file cannot be read
     */
    public static Properties getPropertiesForControllerConfig() throws IOException {
        String baseDir = Paths.get("").toAbsolutePath().toString();
        if (baseDir.endsWith("venice-controller")) {
            baseDir += "/..";
        } else if (baseDir.endsWith("venice-test-common")) {
            baseDir += "/../../services";
        }
        VeniceProperties clusterProps = Utils.parseProperties(baseDir + "/venice-server/config/cluster.properties");
        VeniceProperties controllerProps = Utils.parseProperties(baseDir + "/venice-controller/config/controller.properties");

        Properties merged = new Properties();
        merged.putAll(clusterProps.toProperties());
        // Applied second so controller settings override cluster settings with the same key.
        merged.putAll(controllerProps.toProperties());
        return merged;
    }

    /**
     * Renders {@code map} as comma-separated {@code key:value} pairs, in the map's iteration order.
     *
     * @param map cluster name to D2 service name
     * @return e.g. {@code "c1:d2a,c2:d2b"}; the empty string for an empty map
     */
    public static String getClusterToD2String(Map<String, String> map) {
        StringBuilder joined = new StringBuilder();
        for (Map.Entry<String, String> pair : map.entrySet()) {
            // Every rendered pair contains ':', so a non-empty builder means "not the first pair".
            if (joined.length() > 0) {
                joined.append(',');
            }
            joined.append(pair.getKey()).append(':').append(pair.getValue());
        }
        return joined.toString();
    }

    /**
     * Creates a writer factory configured only with the given Kafka bootstrap servers.
     *
     * @param str value for the {@code kafka.bootstrap.servers} property
     * @return a new writer factory
     */
    public static VeniceWriterFactory getVeniceWriterFactory(String str) {
        Properties bootstrapOnly = new Properties();
        bootstrapOnly.setProperty("kafka.bootstrap.servers", str);
        return getVeniceWriterFactory(bootstrapOnly);
    }

    /**
     * Creates a writer factory from a copy of {@code properties}, so the factory does not share the
     * caller's {@link Properties} instance.
     *
     * @param properties writer configuration to copy
     * @return a new writer factory
     */
    public static VeniceWriterFactory getVeniceWriterFactory(Properties properties) {
        Properties properties2 = new Properties();
        properties2.putAll(properties);
        return new VeniceWriterFactory(properties2);
    }

    /**
     * Creates a shared Kafka producer adapter factory from a copy of {@code properties}, with
     * {@code 1} as the second constructor argument, a fresh {@link MetricsRepository}, and a fixed set
     * of producer metric names.
     *
     * @param properties producer configuration to copy
     * @return a new shared producer adapter factory
     */
    public static SharedKafkaProducerAdapterFactory getSharedKafkaProducerService(Properties properties) {
        Properties producerProps = new Properties();
        producerProps.putAll(properties);
        HashSet<String> metricNames = new HashSet<>(Arrays.asList(
                "outgoing-byte-rate",
                "record-send-rate",
                "batch-size-max",
                "batch-size-avg",
                "buffer-available-bytes",
                "buffer-exhausted-rate"));
        return new SharedKafkaProducerAdapterFactory(producerProps, 1, new ApacheKafkaProducerAdapterFactory(), new MetricsRepository(), metricNames);
    }

    /**
     * Creates a writer factory from a copy of {@code properties} that uses the given shared producer
     * adapter factory and that factory's metrics repository.
     *
     * @param properties                        writer configuration to copy
     * @param sharedKafkaProducerAdapterFactory shared producer factory handed to the writer factory
     * @return a new writer factory
     */
    public static VeniceWriterFactory getVeniceWriterFactoryWithSharedProducer(Properties properties, SharedKafkaProducerAdapterFactory sharedKafkaProducerAdapterFactory) {
        Properties properties2 = new Properties();
        properties2.putAll(properties);
        return new VeniceWriterFactory(properties2, sharedKafkaProducerAdapterFactory, sharedKafkaProducerAdapterFactory.getMetricsRepository());
    }

    /**
     * Builds a RocksDB-backed {@link ZKStore} with a unique name and owner and the current time as its
     * creation time.
     *
     * @return the new store
     */
    public static Store getRandomStore() {
        String storeName = Utils.getUniqueString("RandomStore");
        String owner = Utils.getUniqueString("RandomOwner");
        long createdAtMs = System.currentTimeMillis();
        return new ZKStore(
                storeName,
                owner,
                createdAtMs,
                PersistenceType.ROCKS_DB,
                RoutingStrategy.CONSISTENT_HASH,
                ReadStrategy.ANY_OF_ONLINE,
                OfflinePushStrategy.WAIT_ALL_REPLICAS,
                3);
    }

    /**
     * Asserts that {@code t} is not an error response, using {@code "Controller error"} as the failure
     * message prefix.
     *
     * @param t   controller response to check
     * @param <T> concrete response type
     * @return {@code t}, for call chaining
     */
    public static <T extends ControllerResponse> T assertCommand(T t) {
        final String messagePrefix = "Controller error";
        return assertCommand(t, messagePrefix);
    }

    /**
     * Asserts that {@code t} is not an error response; on failure the assertion message is
     * {@code str + ": " + t.getError()}.
     *
     * @param t   controller response to check
     * @param str prefix for the assertion failure message
     * @param <T> concrete response type
     * @return {@code t}, for call chaining
     */
    public static <T extends ControllerResponse> T assertCommand(T t, String str) {
        Assert.assertFalse(t.isError(), str + ": " + t.getError());
        return t;
    }

    /**
     * Installs a {@link SecurityManager} that permits everything except {@code System.exit} with a
     * non-zero status, which is logged and rejected with a {@link SecurityException}.
     */
    public static void preventSystemExit() {
        SecurityManager exitGuard = new SecurityManager() {
            @Override
            public void checkPermission(Permission permission) {
                // Intentionally empty: every permission is granted.
            }

            @Override
            public void checkPermission(Permission permission, Object obj) {
                // Intentionally empty: every permission is granted.
            }

            @Override
            public void checkExit(int i) {
                if (i == 0) {
                    return;
                }
                SecurityException rejection = new SecurityException("System exit requested with error " + i);
                LOGGER.info("checkExit called", rejection);
                throw rejection;
            }
        };
        System.setSecurityManager(exitGuard);
    }

    /**
     * Removes the installed {@link SecurityManager} by setting it to {@code null}; note that this does
     * not reinstate whatever manager was active before {@code preventSystemExit} was called.
     */
    public static void restoreSystemExit() {
        System.setSecurityManager(null);
    }

    /**
     * Creates store {@code str} (owner {@code "owner"}, string key and value schemas) through
     * {@code controllerClient}, then waits up to 60 seconds until every client in {@code list} can
     * fetch the store without error.
     *
     * @param str              store name
     * @param controllerClient client used to create the store
     * @param list             clients that must all see the store
     */
    public static void createAndVerifyStoreInAllRegions(String str, ControllerClient controllerClient, List<ControllerClient> list) {
        Assert.assertFalse(controllerClient.createNewStore(str, "owner", TestWriteUtils.STRING_SCHEMA, TestWriteUtils.STRING_SCHEMA).isError());
        waitForNonDeterministicAssertion(60L, TimeUnit.SECONDS, () -> {
            for (ControllerClient regionClient : list) {
                Assert.assertFalse(regionClient.getStore(str).isError());
            }
        });
    }

    /**
     * Waits up to 60 seconds (with exponential back-off between attempts) until every given client
     * reports the expected native-replication and active-active-replication flags for store {@code str}.
     *
     * @param str                 store name
     * @param z                   expected native-replication flag
     * @param z2                  expected active-active-replication flag
     * @param controllerClientArr clients to check
     */
    public static void verifyDCConfigNativeAndActiveRepl(String str, boolean z, boolean z2, ControllerClient... controllerClientArr) {
        waitForNonDeterministicAssertion(60L, TimeUnit.SECONDS, true, () -> {
            for (ControllerClient regionClient : controllerClientArr) {
                StoreResponse storeResponse = assertCommand(regionClient.getStore(str));
                boolean nativeReplication = storeResponse.getStore().isNativeReplicationEnabled();
                Assert.assertEquals(nativeReplication, z, "The native replication config does not match.");
                boolean activeActiveReplication = storeResponse.getStore().isActiveActiveReplicationEnabled();
                Assert.assertEquals(activeActiveReplication, z2, "The active active replication config does not match.");
            }
        });
    }

    /**
     * Builds a {@link StoreIngestionTaskFactory.Builder} whose collaborators are Mockito mocks, stubbed
     * for a non-hybrid, single-partition store named {@code str} with one version (number 1).
     *
     * @param str store name the mocked store repository answers for
     * @return a builder pre-populated with the mocks
     */
    public static StoreIngestionTaskFactory.Builder getStoreIngestionTaskBuilder(String str) {
        // Server config: hybrid quota disabled, empty local-consumption consumer configs.
        VeniceServerConfig veniceServerConfig = (VeniceServerConfig) Mockito.mock(VeniceServerConfig.class);
        ((VeniceServerConfig) Mockito.doReturn(false).when(veniceServerConfig)).isHybridQuotaEnabled();
        VeniceProperties veniceProperties = (VeniceProperties) Mockito.mock(VeniceProperties.class);
        ((VeniceProperties) Mockito.doReturn(true).when(veniceProperties)).isEmpty();
        ((VeniceServerConfig) Mockito.doReturn(veniceProperties).when(veniceServerConfig)).getKafkaConsumerConfigsForLocalConsumption();
        // Storage engine repository returns a mocked engine for any store name.
        StorageEngineRepository storageEngineRepository = (StorageEngineRepository) Mockito.mock(StorageEngineRepository.class);
        ((StorageEngineRepository) Mockito.doReturn(Mockito.mock(AbstractStorageEngine.class)).when(storageEngineRepository)).getLocalStorageEngine(ArgumentMatchers.anyString());
        // Store repository only answers for the requested store name.
        ReadOnlyStoreRepository readOnlyStoreRepository = (ReadOnlyStoreRepository) Mockito.mock(ReadOnlyStoreRepository.class);
        Store store = (Store) Mockito.mock(Store.class);
        ((ReadOnlyStoreRepository) Mockito.doReturn(store).when(readOnlyStoreRepository)).getStoreOrThrow((String) Mockito.eq(str));
        ((Store) Mockito.doReturn(false).when(store)).isHybridStoreDiskQuotaEnabled();
        ((Store) Mockito.doReturn(0).when(store)).getBootstrapToOnlineTimeoutInHours();
        // Metadata service returns a mocked offset record for partition 0 of version topic 1.
        StorageMetadataService storageMetadataService = (StorageMetadataService) Mockito.mock(StorageMetadataService.class);
        OffsetRecord offsetRecord = (OffsetRecord) Mockito.mock(OffsetRecord.class);
        ((OffsetRecord) Mockito.doReturn(Collections.emptyMap()).when(offsetRecord)).getProducerPartitionStateMap();
        ((StorageMetadataService) Mockito.doReturn(offsetRecord).when(storageMetadataService)).getLastOffset((String) Mockito.eq(Version.composeKafkaTopic(str, 1)), Mockito.eq(0));
        // Real partitioner config and version object; the mocked store is stubbed to agree with them.
        DefaultVenicePartitioner defaultVenicePartitioner = new DefaultVenicePartitioner();
        PartitionerConfigImpl partitionerConfigImpl = new PartitionerConfigImpl();
        partitionerConfigImpl.setPartitionerClass(defaultVenicePartitioner.getClass().getName());
        partitionerConfigImpl.setAmplificationFactor(1);
        VersionImpl versionImpl = new VersionImpl(str, 1, "1", 1);
        versionImpl.setPartitionerConfig(partitionerConfigImpl);
        ((Store) Mockito.doReturn(partitionerConfigImpl).when(store)).getPartitionerConfig();
        versionImpl.setIncrementalPushEnabled(false);
        ((Store) Mockito.doReturn(false).when(store)).isIncrementalPushEnabled();
        versionImpl.setHybridStoreConfig((HybridStoreConfig) null);
        ((Store) Mockito.doReturn((Object) null).when(store)).getHybridStoreConfig();
        ((Store) Mockito.doReturn(false).when(store)).isHybrid();
        versionImpl.setBufferReplayEnabledForHybrid(true);
        versionImpl.setNativeReplicationEnabled(false);
        ((Store) Mockito.doReturn(false).when(store)).isNativeReplicationEnabled();
        versionImpl.setPushStreamSourceAddress("");
        ((Store) Mockito.doReturn("").when(store)).getPushStreamSourceAddress();
        ((Store) Mockito.doReturn(false).when(store)).isWriteComputationEnabled();
        ((Store) Mockito.doReturn(1).when(store)).getPartitionCount();
        ((Store) Mockito.doReturn(-1).when(store)).getCurrentVersion();
        // Any version number resolves to the single version built above.
        ((Store) Mockito.doReturn(Optional.of(versionImpl)).when(store)).getVersion(Mockito.anyInt());
        // NOTE(review): setServerConfig is called twice in the chain below (first with a bare mock, then
        // with the stubbed config); presumably the second call overrides the first — confirm against the builder.
        return new StoreIngestionTaskFactory.Builder().setVeniceWriterFactory((VeniceWriterFactory) Mockito.mock(VeniceWriterFactory.class)).setStorageEngineRepository(storageEngineRepository).setStorageMetadataService(storageMetadataService).setLeaderFollowerNotifiersQueue(new ArrayDeque()).setSchemaRepository((ReadOnlySchemaRepository) Mockito.mock(ReadOnlySchemaRepository.class)).setMetadataRepository(readOnlyStoreRepository).setTopicManagerRepository((TopicManagerRepository) Mockito.mock(TopicManagerRepository.class)).setHostLevelIngestionStats((AggHostLevelIngestionStats) Mockito.mock(AggHostLevelIngestionStats.class)).setVersionedDIVStats((AggVersionedDIVStats) Mockito.mock(AggVersionedDIVStats.class)).setVersionedIngestionStats((AggVersionedIngestionStats) Mockito.mock(AggVersionedIngestionStats.class)).setStoreBufferService((AbstractStoreBufferService) Mockito.mock(StoreBufferService.class)).setDiskUsage((DiskUsage) Mockito.mock(DiskUsage.class)).setAggKafkaConsumerService((AggKafkaConsumerService) Mockito.mock(AggKafkaConsumerService.class)).setServerConfig((VeniceServerConfig) Mockito.mock(VeniceServerConfig.class)).setServerConfig(veniceServerConfig).setPartitionStateSerializer((InternalAvroSpecificSerializer) Mockito.mock(InternalAvroSpecificSerializer.class)).setIsDaVinciClient(false);
    }

    /**
     * Generates {@code i} serialized records {@code "key<n>" -> "value<n>"} for
     * {@code n} in {@code [i2, i2 + i)}.
     *
     * @param i              number of records to generate
     * @param z              when {@code true}, the result is a {@link TreeMap} ordered by RocksDB's
     *                       {@link BytewiseComparator}; otherwise a plain {@link HashMap}
     * @param i2             first index used in the key/value names
     * @param avroSerializer serializer applied to both key and value strings
     * @return the generated records
     */
    public static Map<byte[], byte[]> generateInput(int i, boolean z, int i2, AvroSerializer avroSerializer) {
        final Map<byte[], byte[]> records;
        if (!z) {
            records = new HashMap<>();
        } else {
            BytewiseComparator rocksDbOrder = new BytewiseComparator(new ComparatorOptions());
            records = new TreeMap<byte[], byte[]>(
                    (left, right) -> rocksDbOrder.compare(ByteBuffer.wrap(left), ByteBuffer.wrap(right)));
        }
        final int endExclusive = i + i2;
        for (int index = i2; index < endExclusive; index++) {
            byte[] key = (byte[]) avroSerializer.serialize("key" + index);
            byte[] value = (byte[]) avroSerializer.serialize("value" + index);
            records.put(key, value);
        }
        return records;
    }

    /**
     * Interrupts {@code thread} and asserts that it terminates within 5 seconds.
     *
     * @param thread thread to stop; {@code null} is ignored
     * @throws InterruptedException if the calling thread is interrupted while joining
     */
    public static void shutdownThread(Thread thread) throws InterruptedException {
        shutdownThread(thread, 5L, TimeUnit.SECONDS);
    }

    /**
     * Interrupts {@code thread}, waits up to the given timeout for it to finish, and asserts that it is
     * no longer alive.
     *
     * @param thread   thread to stop; {@code null} is ignored
     * @param j        join timeout amount
     * @param timeUnit unit of {@code j}
     * @throws InterruptedException if the calling thread is interrupted while joining
     */
    public static void shutdownThread(Thread thread, long j, TimeUnit timeUnit) throws InterruptedException {
        if (thread != null) {
            thread.interrupt();
            long joinTimeoutMs = timeUnit.toMillis(j);
            thread.join(joinTimeoutMs);
            Assert.assertFalse(thread.isAlive());
        }
    }

    /**
     * Shuts down {@code executorService} and asserts that it terminates within 5 seconds.
     *
     * @param executorService executor to stop; {@code null} is ignored
     * @throws InterruptedException if the calling thread is interrupted while awaiting termination
     */
    public static void shutdownExecutor(ExecutorService executorService) throws InterruptedException {
        shutdownExecutor(executorService, 5L, TimeUnit.SECONDS);
    }

    /**
     * Calls {@code shutdown()} followed by {@code shutdownNow()} on {@code executorService} and asserts
     * that it terminates within the given timeout.
     *
     * @param executorService executor to stop; {@code null} is ignored
     * @param j               termination timeout amount
     * @param timeUnit        unit of {@code j}
     * @throws InterruptedException if the calling thread is interrupted while awaiting termination
     */
    public static void shutdownExecutor(ExecutorService executorService, long j, TimeUnit timeUnit) throws InterruptedException {
        if (executorService != null) {
            executorService.shutdown();
            executorService.shutdownNow();
            boolean terminated = executorService.awaitTermination(j, timeUnit);
            Assert.assertTrue(terminated);
        }
    }

    /**
     * Triggers an empty push to the meta system store of {@code str} and waits up to one minute for
     * that push to complete, optionally logging once it is done.
     *
     * @param controllerClient client used to issue the empty push and query its status
     * @param str              name of the user store whose meta system store is populated
     * @param optional         logger to notify on completion, if any
     */
    public static void createMetaSystemStore(ControllerClient controllerClient, String str, Optional<Logger> optional) {
        String metaStoreName = VeniceSystemStoreType.META_STORE.getSystemStoreName(str);
        String pushTopic = assertCommand(controllerClient.emptyPush(metaStoreName, "testEmptyPush", 1234321L)).getKafkaTopic();
        waitForNonDeterministicPushCompletion(pushTopic, controllerClient, 1L, TimeUnit.MINUTES);
        optional.ifPresent(logger -> logger.info("System store " + metaStoreName + " is created."));
    }

    /**
     * Adds every entry of {@code getIngestionIsolationPropertyMap()} to {@code properties}, overwriting
     * existing values for the same keys.
     *
     * @param properties properties object to mutate
     */
    public static void addIngestionIsolationToProperties(Properties properties) {
        properties.putAll(getIngestionIsolationPropertyMap());
    }

    /**
     * Returns a fresh, mutable map with the settings for isolated ingestion: the ingestion mode
     * ({@code IngestionMode.ISOLATED}) and the JVM argument list for the forked process.
     *
     * @return property name to value
     */
    public static Map<String, Object> getIngestionIsolationPropertyMap() {
        Map<String, Object> isolationProps = new HashMap<>();
        isolationProps.put("server.ingestion.mode", IngestionMode.ISOLATED);
        isolationProps.put("server.forked.process.jvm.arg.list", "-Xms256M;-Xmx1G");
        return isolationProps;
    }

    /**
     * Produces a unique topic name for a randomly chosen {@link PubSubTopicType}, decorating a unique
     * string derived from {@code str} with that type's naming pattern ({@code _rt}, {@code _v<n>},
     * {@code _v<n>_sr}, {@code _v<n>_cc}, a {@code venice_admin_} prefix, or no decoration).
     * The version number {@code n} is random in {@code [0, number of topic types)}.
     *
     * @param str prefix handed to {@code Utils.getUniqueString}
     * @return the generated topic name
     * @throws VeniceException if the chosen topic type has no naming rule here
     */
    public static String getUniqueTopicString(String str) {
        final int typeCount = PubSubTopicType.values().length;
        // Modulo first, abs second: the remainder's magnitude is below typeCount, so abs cannot overflow.
        final PubSubTopicType chosenType = PubSubTopicType.values()[Math.abs(ThreadLocalRandom.current().nextInt() % typeCount)];
        final int versionNumber = Math.abs(ThreadLocalRandom.current().nextInt() % typeCount);
        switch (chosenType) {
            case REALTIME_TOPIC:
                return Utils.getUniqueString(str) + "_rt";
            case REPROCESSING_TOPIC:
                return Utils.getUniqueString(str) + "_v" + versionNumber + "_sr";
            case VERSION_TOPIC:
                return Utils.getUniqueString(str) + "_v" + versionNumber;
            case ADMIN_TOPIC:
                return "venice_admin_" + Utils.getUniqueString(str);
            case VIEW_TOPIC:
                return Utils.getUniqueString(str) + "_v" + versionNumber + "_cc";
            case UNKNOWN_TYPE_TOPIC:
                return Utils.getUniqueString(str);
            default:
                throw new VeniceException("Unsupported topic type for: " + chosenType);
        }
    }
}
