package com.linkedin.venice.controller.kafka.consumer;

import com.linkedin.venice.admin.InMemoryAdminTopicMetadataAccessor;
import com.linkedin.venice.admin.InMemoryExecutionIdAccessor;
import com.linkedin.venice.common.VeniceSystemStoreType;
import com.linkedin.venice.controller.AdminTopicMetadataAccessor;
import com.linkedin.venice.controller.ExecutionIdAccessor;
import com.linkedin.venice.controller.VeniceHelixAdmin;
import com.linkedin.venice.controller.kafka.AdminTopicUtils;
import com.linkedin.venice.controller.kafka.protocol.admin.AddVersion;
import com.linkedin.venice.controller.kafka.protocol.admin.AdminOperation;
import com.linkedin.venice.controller.kafka.protocol.admin.DerivedSchemaCreation;
import com.linkedin.venice.controller.kafka.protocol.admin.ETLStoreConfigRecord;
import com.linkedin.venice.controller.kafka.protocol.admin.HybridStoreConfigRecord;
import com.linkedin.venice.controller.kafka.protocol.admin.KillOfflinePushJob;
import com.linkedin.venice.controller.kafka.protocol.admin.PartitionerConfigRecord;
import com.linkedin.venice.controller.kafka.protocol.admin.SchemaMeta;
import com.linkedin.venice.controller.kafka.protocol.admin.StoreCreation;
import com.linkedin.venice.controller.kafka.protocol.admin.UpdateStore;
import com.linkedin.venice.controller.kafka.protocol.enums.AdminMessageType;
import com.linkedin.venice.controller.kafka.protocol.enums.SchemaType;
import com.linkedin.venice.controller.kafka.protocol.serializer.AdminOperationSerializer;
import com.linkedin.venice.controller.stats.AdminConsumptionStats;
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.guid.GuidUtils;
import com.linkedin.venice.kafka.TopicManager;
import com.linkedin.venice.kafka.VeniceOperationAgainstKafkaTimedOut;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.kafka.protocol.state.PartitionState;
import com.linkedin.venice.kafka.protocol.state.ProducerPartitionState;
import com.linkedin.venice.kafka.validation.SegmentStatus;
import com.linkedin.venice.kafka.validation.checksum.CheckSumType;
import com.linkedin.venice.meta.DataReplicationPolicy;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.offsets.InMemoryOffsetManager;
import com.linkedin.venice.offsets.OffsetManager;
import com.linkedin.venice.offsets.OffsetRecord;
import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter;
import com.linkedin.venice.pubsub.api.PubSubProduceResult;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.pubsub.kafka.KafkaPubSubMessageDeserializer;
import com.linkedin.venice.serialization.DefaultSerializer;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.serialization.avro.OptimizedKafkaValueSerializer;
import com.linkedin.venice.unit.kafka.InMemoryKafkaBroker;
import com.linkedin.venice.unit.kafka.SimplePartitioner;
import com.linkedin.venice.unit.kafka.consumer.MockInMemoryConsumer;
import com.linkedin.venice.unit.kafka.consumer.poll.ArbitraryOrderingPollStrategy;
import com.linkedin.venice.unit.kafka.consumer.poll.CompositePollStrategy;
import com.linkedin.venice.unit.kafka.consumer.poll.FilteringPollStrategy;
import com.linkedin.venice.unit.kafka.consumer.poll.PollStrategy;
import com.linkedin.venice.unit.kafka.consumer.poll.PubSubTopicPartitionOffset;
import com.linkedin.venice.unit.kafka.consumer.poll.RandomPollStrategy;
import com.linkedin.venice.unit.kafka.producer.MockInMemoryProducerAdapter;
import com.linkedin.venice.utils.DataProviderUtils;
import com.linkedin.venice.utils.RegionUtils;
import com.linkedin.venice.utils.SystemTime;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import com.linkedin.venice.utils.pools.LandFillObjectPool;
import com.linkedin.venice.writer.VeniceWriter;
import com.linkedin.venice.writer.VeniceWriterOptions;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.mockito.AdditionalAnswers;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(priority = -5)
/* loaded from: input_file:com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTaskTest.class */
public class AdminConsumptionTaskTest {
    // Timeout budget in milliseconds; equals the 10000 literal used by the tests in this class.
    private static final int TIMEOUT = 10000;
    // Unique cluster name, regenerated for every test in methodSetup().
    private String clusterName;
    // Admin topic name derived from clusterName in methodSetup().
    private String topicName;
    private final AdminOperationSerializer adminOperationSerializer = new AdminOperationSerializer();
    // Repository used to resolve topic names into PubSubTopic instances.
    private final PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();
    // Arguments shared by the store-creation messages and the createStore(...) verifications.
    private static final String owner = "test_owner";
    private static final String keySchema = "\"string\"";
    private static final String valueSchema = "\"string\"";
    // Mockito mock handed to MockInMemoryConsumer; tests verify subscribe/unSubscribe on it.
    private PubSubConsumerAdapter mockKafkaConsumer;
    // Mockito mock of the admin that receives the operations applied by the task under test.
    private VeniceHelixAdmin admin;
    // In-memory implementations, recreated for every test in methodSetup().
    private OffsetManager offsetManager;
    private AdminTopicMetadataAccessor adminTopicMetadataAccessor;
    // Executor on which tests submit the task under test; shut down in cleanUp().
    private ExecutorService executor;
    private InMemoryKafkaBroker inMemoryKafkaBroker;
    // Writer used to produce admin messages into the in-memory broker; closed in cleanUp().
    private VeniceWriter veniceWriter;
    private ExecutionIdAccessor executionIdAccessor;
    // NOTE: despite the name this key is not empty; it holds the single byte 97 (ASCII 'a').
    private static final byte[] emptyKeyBytes = {97};
    // Store name generated once per class load, and a topic name built from it with the "_v1" suffix.
    private static final String storeName = Utils.getUniqueString("test_store");
    private static final String storeTopicName = storeName + "_v1";

    /**
     * Rebuilds the whole fixture before each test: a uniquely named cluster and its admin topic,
     * an in-memory broker holding that topic with one partition, a writer targeting it, and
     * Mockito mocks for the consumer, the admin and the topic manager.
     *
     * <p>By default the mocked admin reports that it is the leader controller and that admin
     * topic consumption is enabled for the cluster, and the mocked topic manager reports the
     * admin topic as existing with all partitions online.
     */
    @BeforeMethod
    public void methodSetup() {
        clusterName = Utils.getUniqueString("test-cluster");
        topicName = AdminTopicUtils.getTopicNameFromClusterName(clusterName);
        PubSubTopic adminTopic = pubSubTopicRepository.getTopic(topicName);

        // Infrastructure backing the task under test.
        executor = Executors.newCachedThreadPool();
        inMemoryKafkaBroker = new InMemoryKafkaBroker("local");
        inMemoryKafkaBroker.createTopic(topicName, 1);
        veniceWriter = getVeniceWriter(inMemoryKafkaBroker);
        executionIdAccessor = new InMemoryExecutionIdAccessor();

        // Mocks with their default stubbing; individual tests override what they need.
        mockKafkaConsumer = Mockito.mock(PubSubConsumerAdapter.class);
        admin = Mockito.mock(VeniceHelixAdmin.class);
        Mockito.doReturn(true).when(admin).isLeaderControllerFor(clusterName);
        Mockito.doReturn(true).when(admin).isAdminTopicConsumptionEnabled(clusterName);

        offsetManager = new InMemoryOffsetManager();
        adminTopicMetadataAccessor = new InMemoryAdminTopicMetadataAccessor();

        TopicManager mockedTopicManager = Mockito.mock(TopicManager.class);
        Mockito.doReturn(new HashSet<>(Arrays.asList(adminTopic))).when(mockedTopicManager).listTopics();
        Mockito.doReturn(mockedTopicManager).when(admin).getTopicManager();
        Mockito.doReturn(true).when(mockedTopicManager).containsTopicAndAllPartitionsAreOnline(adminTopic);
    }

    /**
     * Releases the per-test resources created by {@link #methodSetup()}.
     *
     * <p>The writer is closed in a {@code finally} block so that it is not leaked when shutting
     * down the executor throws (for example when the thread is interrupted).
     *
     * @throws InterruptedException if interrupted while the executor is being shut down
     */
    @AfterMethod
    public void cleanUp() throws InterruptedException {
        try {
            TestUtils.shutdownExecutor(executor);
        } finally {
            veniceWriter.close();
        }
    }

    /**
     * Creates a writer for the current admin topic that produces into the given in-memory broker.
     *
     * <p>The writer is configured with checksum type {@code NONE}, default serializers for key,
     * value and write-compute payloads, a {@link SimplePartitioner} and the system clock.
     *
     * @param inMemoryKafkaBroker broker that receives the produced messages
     * @return a new writer; the caller is responsible for closing it
     */
    private VeniceWriter getVeniceWriter(InMemoryKafkaBroker inMemoryKafkaBroker) {
        Properties writerProperties = new Properties();
        writerProperties.put("venice.writer.checksum.type", CheckSumType.NONE.name());

        return new VeniceWriter(
                new VeniceWriterOptions.Builder(topicName)
                        .setKeySerializer(new DefaultSerializer())
                        .setValueSerializer(new DefaultSerializer())
                        .setWriteComputeSerializer(new DefaultSerializer())
                        .setPartitioner(new SimplePartitioner())
                        .setTime(SystemTime.INSTANCE)
                        .build(),
                new VeniceProperties(writerProperties),
                new MockInMemoryProducerAdapter(inMemoryKafkaBroker));
    }

    /**
     * Convenience overload that builds a task with freshly mocked stats and a timeout argument
     * of 10000, leaving the remaining settings at the defaults of the four-argument overload.
     *
     * @param pollStrategy strategy driving which messages the in-memory consumer returns
     * @param z flag forwarded to the task constructor; callers named "...InParent" pass
     *          {@code true}, so this presumably marks a parent controller
     * @return the task under test
     */
    private AdminConsumptionTask getAdminConsumptionTask(PollStrategy pollStrategy, boolean z) {
        AdminConsumptionStats mockedStats = Mockito.mock(AdminConsumptionStats.class);
        return getAdminConsumptionTask(pollStrategy, z, mockedStats, TIMEOUT);
    }

    /**
     * Overload that lets the caller supply the stats object and the timeout argument while
     * fixing the last three settings to {@code false}, {@code null} and {@code 3}.
     *
     * @param pollStrategy strategy driving which messages the in-memory consumer returns
     * @param z flag forwarded to the task constructor (presumably the parent-controller flag)
     * @param adminConsumptionStats stats sink handed to the task, typically a Mockito mock
     * @param j long value forwarded to the task constructor; callers in this class pass 10000
     * @return the task under test
     */
    private AdminConsumptionTask getAdminConsumptionTask(PollStrategy pollStrategy, boolean z, AdminConsumptionStats adminConsumptionStats, long j) {
        return getAdminConsumptionTask(
                pollStrategy,
                z,
                adminConsumptionStats,
                j,
                false,
                null,
                3);
    }

    /**
     * Builds the task under test, wiring it to the in-memory broker through a
     * {@link MockInMemoryConsumer} that wraps the mocked consumer and the given poll strategy.
     *
     * <p>The meaning of several constructor positions is not visible from this file; the
     * parameters below are forwarded unchanged in the order the constructor expects.
     *
     * @param pollStrategy strategy driving which messages the in-memory consumer returns
     * @param z flag forwarded to the task constructor (presumably the parent-controller flag)
     * @param adminConsumptionStats stats sink handed to the task
     * @param j long value forwarded to the task constructor
     * @param z2 boolean forwarded as the third constructor argument
     * @param str nullable string, wrapped with {@link Optional#ofNullable(Object)}
     * @param i int value forwarded to the task constructor
     * @return a new, not yet started task
     */
    private AdminConsumptionTask getAdminConsumptionTask(PollStrategy pollStrategy, boolean z, AdminConsumptionStats adminConsumptionStats, long j, boolean z2, String str, int i) {
        MockInMemoryConsumer inMemoryConsumer =
                new MockInMemoryConsumer(inMemoryKafkaBroker, pollStrategy, mockKafkaConsumer);
        Optional<String> optionalStr = Optional.ofNullable(str);
        PubSubTopicRepository topicRepository = new PubSubTopicRepository();
        KafkaPubSubMessageDeserializer messageDeserializer = new KafkaPubSubMessageDeserializer(
                new OptimizedKafkaValueSerializer(),
                new LandFillObjectPool(KafkaMessageEnvelope::new),
                new LandFillObjectPool(KafkaMessageEnvelope::new));

        return new AdminConsumptionTask(
                clusterName,
                inMemoryConsumer,
                z2,
                optionalStr,
                admin,
                adminTopicMetadataAccessor,
                executionIdAccessor,
                z,
                adminConsumptionStats,
                1,
                Optional.empty(),
                j,
                i,
                topicRepository,
                messageDeserializer,
                "dc-0");
    }

    /**
     * Converts a produce result into the topic-partition/offset pair used by the poll strategies.
     *
     * @param pubSubProduceResult result returned by the writer for a produced message
     * @return the pair identifying that message by topic, partition and offset
     */
    private PubSubTopicPartitionOffset getTopicPartitionOffsetPair(PubSubProduceResult pubSubProduceResult) {
        PubSubTopic producedTopic = pubSubTopicRepository.getTopic(pubSubProduceResult.getTopic());
        PubSubTopicPartitionImpl producedPartition =
                new PubSubTopicPartitionImpl(producedTopic, pubSubProduceResult.getPartition());
        return new PubSubTopicPartitionOffset(producedPartition, Long.valueOf(pubSubProduceResult.getOffset()));
    }

    /**
     * Reads the first element of the offsets stored in the admin topic metadata for a cluster.
     *
     * @param str name of the cluster whose metadata is looked up
     * @return the first offset of the pair returned by {@code AdminTopicMetadataAccessor.getOffsets}
     */
    private long getLastOffset(String str) {
        Long firstOffset =
                (Long) AdminTopicMetadataAccessor.getOffsets(adminTopicMetadataAccessor.getMetadata(str)).getFirst();
        return firstOffset.longValue();
    }

    /**
     * Reads the execution id stored in the admin topic metadata for a cluster.
     *
     * @param str name of the cluster whose metadata is looked up
     * @return the execution id extracted by {@code AdminTopicMetadataAccessor.getExecutionId}
     */
    private long getLastExecutionId(String str) {
        long lastExecutionId =
                AdminTopicMetadataAccessor.getExecutionId(adminTopicMetadataAccessor.getMetadata(str));
        return lastExecutionId;
    }

    /**
     * When the admin reports that it is not the leader controller for the cluster, the task must
     * keep checking leadership but never subscribe the consumer.
     */
    @Test(timeOut = TIMEOUT)
    public void testRunWhenNotLeaderController() throws IOException, InterruptedException {
        Mockito.doReturn(false).when(admin).isLeaderControllerFor(clusterName);

        // Held by its concrete type: close() is not declared on Runnable.
        AdminConsumptionTask task = getAdminConsumptionTask(new RandomPollStrategy(), false);
        executor.submit(task);

        Mockito.verify(admin, Mockito.timeout(TIMEOUT).atLeastOnce()).isLeaderControllerFor(clusterName);
        Mockito.verify(mockKafkaConsumer, Mockito.never())
                .subscribe((PubSubTopicPartition) Mockito.any(), Mockito.anyLong());

        task.close();
        executor.shutdown();
        executor.awaitTermination(TIMEOUT, TimeUnit.MILLISECONDS);
    }

    /** Runs the missing-admin-topic scenario with the flag set to {@code true} (parent case). */
    @Test(timeOut = TIMEOUT)
    public void testRunWhenTopicDoesNotExistInParent() throws InterruptedException, IOException {
        final boolean parent = true;
        testRunWhenTopicDoesNotExist(parent);
    }

    /** Runs the missing-admin-topic scenario with the flag set to {@code false} (non-parent case). */
    @Test(timeOut = TIMEOUT)
    public void testRunWhenTopicDoesNotExistInNonParent() throws InterruptedException, IOException {
        final boolean parent = false;
        testRunWhenTopicDoesNotExist(parent);
    }

    /**
     * Shared scenario for a cluster whose admin topic is not listed by the topic manager.
     *
     * <p>With {@code z == true} the task is expected to create the admin topic and subscribe to
     * it; with {@code z == false} it must neither create any topic nor subscribe.
     *
     * @param z flag passed to the task factory; the "...InParent" test passes {@code true}
     */
    private void testRunWhenTopicDoesNotExist(boolean z) throws InterruptedException, IOException {
        // Replace the default topic manager with one that lists no topics at all.
        TopicManager emptyTopicManager = Mockito.mock(TopicManager.class);
        Mockito.doReturn(new HashSet<>()).when(emptyTopicManager).listTopics();
        Mockito.doReturn(emptyTopicManager).when(admin).getTopicManager();

        // Held by its concrete type: close() is not declared on Runnable.
        AdminConsumptionTask task = getAdminConsumptionTask(new RandomPollStrategy(), z);
        executor.submit(task);

        Mockito.verify(admin, Mockito.timeout(TIMEOUT).atLeastOnce()).isLeaderControllerFor(clusterName);
        PubSubTopic adminTopic =
                pubSubTopicRepository.getTopic(AdminTopicUtils.getTopicNameFromClusterName(clusterName));
        if (z) {
            Mockito.verify(emptyTopicManager, Mockito.timeout(TIMEOUT))
                    .createTopic(adminTopic, 1, 1, true, false, Optional.empty());
            Mockito.verify(mockKafkaConsumer, Mockito.timeout(TIMEOUT))
                    .subscribe((PubSubTopicPartition) Mockito.any(), Mockito.anyLong());
        } else {
            Mockito.verify(emptyTopicManager, Mockito.never()).createTopic(
                    (PubSubTopic) Mockito.any(),
                    Mockito.anyInt(),
                    Mockito.anyInt(),
                    Mockito.anyBoolean(),
                    Mockito.anyBoolean(),
                    (Optional) Mockito.any());
            Mockito.verify(mockKafkaConsumer, Mockito.never())
                    .subscribe((PubSubTopicPartition) Mockito.any(), Mockito.anyLong());
        }

        task.close();
        executor.shutdown();
        executor.awaitTermination(TIMEOUT, TimeUnit.MILLISECONDS);
    }

    /**
     * Happy path: a store-creation message followed by a kill-offline-push message are both
     * consumed, the persisted offset reaches 2, and each operation is applied on the admin.
     */
    @Test(timeOut = TIMEOUT)
    public void testRun() throws InterruptedException, IOException {
        Mockito.doReturn(false).when(admin).hasStore(clusterName, storeName);
        veniceWriter.put(
                emptyKeyBytes,
                getStoreCreationMessage(clusterName, storeName, owner, keySchema, valueSchema, 1L),
                AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);
        veniceWriter.put(
                emptyKeyBytes,
                getKillOfflinePushJobMessage(clusterName, storeTopicName, 2L),
                AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);

        // Held by its concrete type: close() is not declared on Runnable.
        AdminConsumptionTask task = getAdminConsumptionTask(new RandomPollStrategy(), false);
        executor.submit(task);

        TestUtils.waitForNonDeterministicAssertion(
                TIMEOUT,
                TimeUnit.MILLISECONDS,
                () -> Assert.assertEquals(getLastOffset(clusterName), 2L));

        task.close();
        executor.shutdown();
        executor.awaitTermination(TIMEOUT, TimeUnit.MILLISECONDS);

        Mockito.verify(admin, Mockito.timeout(TIMEOUT).atLeastOnce()).isLeaderControllerFor(clusterName);
        Mockito.verify(mockKafkaConsumer, Mockito.timeout(TIMEOUT))
                .subscribe((PubSubTopicPartition) Mockito.any(), Mockito.anyLong());
        Mockito.verify(mockKafkaConsumer, Mockito.timeout(TIMEOUT))
                .unSubscribe((PubSubTopicPartition) Mockito.any());
        Mockito.verify(admin, Mockito.timeout(TIMEOUT))
                .createStore(clusterName, storeName, owner, keySchema, valueSchema, false);
        Mockito.verify(admin, Mockito.timeout(TIMEOUT)).killOfflinePush(clusterName, storeTopicName, false);
    }

    /**
     * The first {@code createStore} call throws and the second succeeds; the task is expected to
     * retry, so {@code createStore} is invoked exactly twice and the persisted offset reaches 1.
     */
    @Test(timeOut = TIMEOUT)
    public void testRunWhenStoreCreationGotExceptionForTheFirstTime() throws InterruptedException, IOException {
        veniceWriter.put(
                emptyKeyBytes,
                getStoreCreationMessage(clusterName, storeName, owner, keySchema, valueSchema, 1L),
                AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);
        Mockito.doReturn(false).when(admin).hasStore(clusterName, storeName);
        // Fail once, then behave normally on every later call.
        Mockito.doThrow(new VeniceException("Mock store creation exception"))
                .doNothing()
                .when(admin)
                .createStore(clusterName, storeName, owner, keySchema, valueSchema, false);

        // Held by its concrete type: close() is not declared on Runnable.
        AdminConsumptionTask task = getAdminConsumptionTask(new RandomPollStrategy(), false);
        executor.submit(task);

        TestUtils.waitForNonDeterministicAssertion(
                TIMEOUT,
                TimeUnit.MILLISECONDS,
                () -> Assert.assertEquals(getLastOffset(clusterName), 1L));

        task.close();
        executor.shutdown();
        executor.awaitTermination(TIMEOUT, TimeUnit.MILLISECONDS);

        Mockito.verify(admin, Mockito.timeout(TIMEOUT).atLeastOnce()).isLeaderControllerFor(clusterName);
        Mockito.verify(mockKafkaConsumer, Mockito.timeout(TIMEOUT))
                .subscribe((PubSubTopicPartition) Mockito.any(), Mockito.anyLong());
        Mockito.verify(mockKafkaConsumer, Mockito.timeout(TIMEOUT))
                .unSubscribe((PubSubTopicPartition) Mockito.any());
        Mockito.verify(admin, Mockito.timeout(TIMEOUT).times(2))
                .createStore(clusterName, storeName, owner, keySchema, valueSchema, false);
    }

    /**
     * Makes the stats mock throw from {@code recordAdminMessageDelegateLatency} and checks that
     * the task then reports the offset of the produced message as its failing offset.
     */
    @Test(timeOut = TIMEOUT)
    public void testDelegateExceptionSetsFailingOffset() throws ExecutionException, InterruptedException, IOException {
        PubSubProduceResult produceResult = (PubSubProduceResult) veniceWriter.put(
                emptyKeyBytes,
                getStoreCreationMessage(clusterName, storeName, owner, keySchema, valueSchema, 1L),
                AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION).get();
        long offset = produceResult.getOffset();

        AdminConsumptionStats throwingStats = Mockito.mock(AdminConsumptionStats.class);
        Mockito.doThrow(StringIndexOutOfBoundsException.class)
                .when(throwingStats)
                .recordAdminMessageDelegateLatency(Mockito.anyDouble());

        // Held by its concrete type: getFailingOffset() and close() are not declared on Runnable.
        AdminConsumptionTask task =
                getAdminConsumptionTask(new RandomPollStrategy(), false, throwingStats, TIMEOUT);
        executor.submit(task);

        TestUtils.waitForNonDeterministicAssertion(
                TIMEOUT,
                TimeUnit.MILLISECONDS,
                () -> Assert.assertEquals(task.getFailingOffset(), offset));

        task.close();
        executor.shutdown();
        executor.awaitTermination(TIMEOUT, TimeUnit.MILLISECONDS);
    }

    /**
     * With {@code createStore} failing on its first invocation, checks that the task reports
     * failing offset 1, that the failure-related stats are recorded, and that after closing
     * the task the persisted offset is still -1.
     */
    @Test(timeOut = TIMEOUT)
    public void testConsumeFailedStats() throws IOException, InterruptedException {
        Mockito.doReturn(false).when(admin).hasStore(clusterName, storeName);
        veniceWriter.put(
                emptyKeyBytes,
                getStoreCreationMessage(clusterName, storeName, owner, keySchema, valueSchema, 1L),
                AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);
        veniceWriter.put(
                emptyKeyBytes,
                getKillOfflinePushJobMessage(clusterName, storeTopicName, 2L),
                AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);
        Mockito.doThrow(new VeniceException("Mock store creation exception"))
                .doNothing()
                .when(admin)
                .createStore(clusterName, storeName, owner, keySchema, valueSchema, false);

        AdminConsumptionStats stats = Mockito.mock(AdminConsumptionStats.class);
        // Held by its concrete type: getFailingOffset() and close() are not declared on Runnable.
        AdminConsumptionTask task = getAdminConsumptionTask(new RandomPollStrategy(), false, stats, TIMEOUT);
        executor.submit(task);

        TestUtils.waitForNonDeterministicAssertion(
                TIMEOUT,
                TimeUnit.MILLISECONDS,
                () -> Assert.assertEquals(task.getFailingOffset(), 1L));

        // Short verification windows: the failure has already been observed above.
        Mockito.verify(stats, Mockito.timeout(100L).atLeastOnce()).setAdminConsumptionFailedOffset(1L);
        Mockito.verify(stats, Mockito.timeout(100L).atLeastOnce()).recordPendingAdminMessagesCount(2.0d);
        Mockito.verify(stats, Mockito.timeout(100L).atLeastOnce()).recordStoresWithPendingAdminMessagesCount(1.0d);
        Mockito.verify(stats, Mockito.timeout(100L).atLeastOnce())
                .recordAdminConsumptionCycleDurationMs(Mockito.anyDouble());

        task.close();
        executor.shutdown();
        executor.awaitTermination(TIMEOUT, TimeUnit.MILLISECONDS);

        Assert.assertEquals(getLastOffset(clusterName), -1L);
    }

    /**
     * With {@code createStore} always failing, repeatedly asks the task to skip the message at
     * offset 1 until the request is accepted, then expects the persisted offset to reach 1.
     */
    @Test(timeOut = TIMEOUT)
    public void testSkipMessageCommand() throws IOException, InterruptedException {
        veniceWriter.put(
                emptyKeyBytes,
                getStoreCreationMessage(clusterName, storeName, owner, keySchema, valueSchema, 1L),
                AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);
        Mockito.doReturn(false).when(admin).hasStore(clusterName, storeName);
        Mockito.doThrow(new VeniceException("Mock store creation exception"))
                .when(admin)
                .createStore(clusterName, storeName, owner, keySchema, valueSchema, false);

        // Held by its concrete type: skipMessageWithOffset() and close() are not declared on Runnable.
        AdminConsumptionTask task = getAdminConsumptionTask(new RandomPollStrategy(), false);
        executor.submit(task);

        // The skip request throws a VeniceException until the task accepts it, so poll until it succeeds.
        TestUtils.waitForNonDeterministicCompletion(5L, TimeUnit.SECONDS, () -> {
            try {
                task.skipMessageWithOffset(1L);
                return true;
            } catch (VeniceException e) {
                return false;
            }
        });
        TestUtils.waitForNonDeterministicAssertion(
                TIMEOUT,
                TimeUnit.MILLISECONDS,
                () -> Assert.assertEquals(getLastOffset(clusterName), 1L));

        task.close();
        executor.shutdown();
        executor.awaitTermination(TIMEOUT, TimeUnit.MILLISECONDS);
    }

    /**
     * Feeds the task through a composite poll strategy: a random strategy first, then an
     * arbitrary-ordering strategy that returns the first produced message again. Expects the
     * persisted offset to reach 2 and {@code createStore} to be verified with Mockito's default
     * single-invocation mode.
     */
    @Test(timeOut = TIMEOUT)
    public void testRunWithDuplicateMessagesWithSameOffset() throws Exception {
        PubSubProduceResult killJobResult = (PubSubProduceResult) veniceWriter.put(
                emptyKeyBytes,
                getKillOfflinePushJobMessage(clusterName, storeTopicName, 1L),
                AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION).get();
        veniceWriter.put(
                emptyKeyBytes,
                getStoreCreationMessage(clusterName, storeName, owner, keySchema, valueSchema, 2L),
                AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION).get();

        // Raw collections are kept: the element types expected by the strategy constructors
        // are not visible from this file.
        LinkedList pollStrategies = new LinkedList();
        pollStrategies.add(new RandomPollStrategy());
        LinkedList duplicatedOffsets = new LinkedList();
        duplicatedOffsets.add(getTopicPartitionOffsetPair(killJobResult));
        pollStrategies.add(new ArbitraryOrderingPollStrategy(duplicatedOffsets));
        CompositePollStrategy compositePollStrategy = new CompositePollStrategy(pollStrategies);

        Mockito.doReturn(false).when(admin).hasStore(clusterName, storeName);

        // Held by its concrete type: close() is not declared on Runnable.
        AdminConsumptionTask task = getAdminConsumptionTask(compositePollStrategy, false);
        executor.submit(task);

        TestUtils.waitForNonDeterministicAssertion(
                TIMEOUT,
                TimeUnit.MILLISECONDS,
                () -> Assert.assertEquals(getLastOffset(clusterName), 2L));
        // Extra wait before closing, presumably so the duplicated message is polled as well.
        Utils.sleep(1000L);

        task.close();
        executor.shutdown();
        executor.awaitTermination(TIMEOUT, TimeUnit.MILLISECONDS);

        Mockito.verify(admin, Mockito.timeout(TIMEOUT).atLeastOnce()).isLeaderControllerFor(clusterName);
        Mockito.verify(mockKafkaConsumer, Mockito.timeout(TIMEOUT))
                .subscribe((PubSubTopicPartition) Mockito.any(), Mockito.anyLong());
        Mockito.verify(mockKafkaConsumer, Mockito.timeout(TIMEOUT))
                .unSubscribe((PubSubTopicPartition) Mockito.any());
        Mockito.verify(admin, Mockito.timeout(TIMEOUT))
                .createStore(clusterName, storeName, owner, keySchema, valueSchema, false);
    }

    /**
     * Builds an offset record whose partition state sits at the given offset and contains one
     * producer state, keyed by this test's writer GUID, at the given sequence number.
     *
     * @param j offset stored in the partition state
     * @param i message sequence number stored in the producer state
     * @return an offset record wrapping the assembled partition state
     */
    private OffsetRecord getOffsetRecordByOffsetAndSeqNum(long j, int i) {
        // Partition-level state: not at end of push, positioned at the requested offset.
        PartitionState state = new PartitionState();
        state.endOfPush = false;
        state.offset = j;
        state.lastUpdate = System.currentTimeMillis();
        state.producerStates = new HashMap<>();
        state.databaseInfo = new HashMap<>();
        state.upstreamOffsetMap = new VeniceConcurrentHashMap<>();

        // Producer-level state: segment 0 in progress, no checksum.
        ProducerPartitionState producerState = new ProducerPartitionState();
        producerState.segmentNumber = 0;
        producerState.segmentStatus = SegmentStatus.IN_PROGRESS.getValue();
        producerState.messageSequenceNumber = i;
        producerState.messageTimestamp = System.currentTimeMillis();
        producerState.checksumType = CheckSumType.NONE.getValue();
        producerState.checksumState = ByteBuffer.allocate(0);
        producerState.aggregates = new HashMap<>();
        producerState.debugInfo = new HashMap<>();

        state.producerStates.put(GuidUtils.getCharSequenceFromGuid(veniceWriter.getProducerGUID()), producerState);
        return new OffsetRecord(state, AvroProtocolDefinition.PARTITION_STATE.getSerializer());
    }

    /**
     * Seeds the offset manager with a record at offset 1 / sequence number 1, produces a
     * kill-offline-push message and a store-creation message, and lets the poll strategy return
     * the first message again. Expects the store to be created, the persisted offset to reach 2,
     * and {@code killOfflinePush} never to be invoked.
     */
    @Test(timeOut = TIMEOUT)
    public void testRunWhenRestart() throws Exception {
        offsetManager.put(topicName, 0, getOffsetRecordByOffsetAndSeqNum(1, 1));

        PubSubProduceResult killJobResult = (PubSubProduceResult) veniceWriter.put(
                emptyKeyBytes,
                getKillOfflinePushJobMessage(clusterName, storeTopicName, 1L),
                AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION).get();
        veniceWriter.put(
                emptyKeyBytes,
                getStoreCreationMessage(clusterName, storeName, owner, keySchema, valueSchema, 2L),
                AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION).get();

        // Raw collection is kept: the element type expected by the strategy constructor is not
        // visible from this file.
        LinkedList replayedOffsets = new LinkedList();
        replayedOffsets.add(getTopicPartitionOffsetPair(killJobResult));
        ArbitraryOrderingPollStrategy pollStrategy = new ArbitraryOrderingPollStrategy(replayedOffsets);

        Mockito.doReturn(false).when(admin).hasStore(clusterName, storeName);

        // Held by its concrete type: close() is not declared on Runnable.
        AdminConsumptionTask task = getAdminConsumptionTask(pollStrategy, false);
        executor.submit(task);

        TestUtils.waitForNonDeterministicAssertion(
                TIMEOUT,
                TimeUnit.MILLISECONDS,
                () -> Assert.assertEquals(getLastOffset(clusterName), 2L));

        task.close();
        executor.shutdown();
        executor.awaitTermination(TIMEOUT, TimeUnit.MILLISECONDS);

        Mockito.verify(admin, Mockito.timeout(TIMEOUT).atLeastOnce()).isLeaderControllerFor(clusterName);
        Mockito.verify(mockKafkaConsumer, Mockito.timeout(TIMEOUT))
                .subscribe((PubSubTopicPartition) Mockito.any(), Mockito.anyLong());
        Mockito.verify(mockKafkaConsumer, Mockito.timeout(TIMEOUT))
                .unSubscribe((PubSubTopicPartition) Mockito.any());
        Mockito.verify(admin, Mockito.timeout(TIMEOUT))
                .createStore(clusterName, storeName, owner, keySchema, valueSchema, false);
        Mockito.verify(admin, Mockito.never()).killOfflinePush(clusterName, storeTopicName, false);
    }

    /**
     * Produces three store-creation messages and filters one offset out of what the consumer
     * returns. Expects the task to persist offset 1, report failing offset 3, and move on to
     * offset 3 only after that message is skipped explicitly. Only {@code test_store1} may be
     * created and the last persisted execution id must stay at 1.
     */
    @Test(timeOut = TIMEOUT)
    public void testRunWithMissingMessages() throws Exception {
        veniceWriter.put(
                emptyKeyBytes,
                getStoreCreationMessage(clusterName, "test_store1", owner, keySchema, valueSchema, 1L),
                AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);
        PubSubProduceResult secondResult = (PubSubProduceResult) veniceWriter.put(
                emptyKeyBytes,
                getStoreCreationMessage(clusterName, "test_store2", owner, keySchema, valueSchema, 2L),
                AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION).get();
        long offset = secondResult.getOffset();
        veniceWriter.put(
                emptyKeyBytes,
                getStoreCreationMessage(clusterName, "test_store3", owner, keySchema, valueSchema, 3L),
                AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);

        // Raw set is kept: the element type expected by FilteringPollStrategy is not visible here.
        HashSet filteredOffsets = new HashSet();
        filteredOffsets.add(new PubSubTopicPartitionOffset(
                new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic(topicName), 0),
                Long.valueOf(offset - 1)));
        FilteringPollStrategy pollStrategy =
                new FilteringPollStrategy(new RandomPollStrategy(false), filteredOffsets);

        Mockito.doReturn(false).when(admin).hasStore(clusterName, "test_store1");
        Mockito.doReturn(false).when(admin).hasStore(clusterName, "test_store2");
        Mockito.doReturn(false).when(admin).hasStore(clusterName, "test_store3");

        AdminConsumptionStats stats = Mockito.mock(AdminConsumptionStats.class);
        // Held by its concrete type: getFailingOffset(), skipMessageWithOffset() and close()
        // are not declared on Runnable.
        AdminConsumptionTask task = getAdminConsumptionTask(pollStrategy, false, stats, TIMEOUT);
        executor.submit(task);

        TestUtils.waitForNonDeterministicAssertion(
                TIMEOUT,
                TimeUnit.MILLISECONDS,
                () -> Assert.assertEquals(getLastOffset(clusterName), 1L));
        TestUtils.waitForNonDeterministicAssertion(
                TIMEOUT,
                TimeUnit.MILLISECONDS,
                () -> Assert.assertEquals(task.getFailingOffset(), 3L));

        task.skipMessageWithOffset(3L);
        TestUtils.waitForNonDeterministicAssertion(
                TIMEOUT,
                TimeUnit.MILLISECONDS,
                () -> Assert.assertEquals(getLastOffset(clusterName), 3L));

        task.close();
        executor.shutdownNow();
        executor.awaitTermination(TIMEOUT, TimeUnit.MILLISECONDS);

        Mockito.verify(stats, Mockito.atLeastOnce()).recordAdminTopicDIVErrorReportCount();
        Mockito.verify(admin, Mockito.atLeastOnce()).isLeaderControllerFor(clusterName);
        Mockito.verify(mockKafkaConsumer, Mockito.times(1))
                .subscribe((PubSubTopicPartition) Mockito.any(), Mockito.anyLong());
        Mockito.verify(mockKafkaConsumer, Mockito.times(1)).unSubscribe((PubSubTopicPartition) Mockito.any());
        Mockito.verify(admin, Mockito.times(1))
                .createStore(clusterName, "test_store1", owner, keySchema, valueSchema, false);
        Mockito.verify(admin, Mockito.never())
                .createStore(clusterName, "test_store2", owner, keySchema, valueSchema, false);
        Mockito.verify(admin, Mockito.never())
                .createStore(clusterName, "test_store3", owner, keySchema, valueSchema, false);
        Assert.assertEquals(getLastExecutionId(clusterName), 1L);
    }

    /**
     * Produces three store-creation messages whose execution ids are 1, 3 and 4 (2 is never
     * produced). Expects all three stores to be created, no failing offset, no recorded
     * {@code recordAdminTopicDIVErrorReportCount} call, and a last execution id of 4.
     */
    @Test(timeOut = TIMEOUT)
    public void testRunWithFalsePositiveMissingMessages() throws Exception {
        veniceWriter.put(
                emptyKeyBytes,
                getStoreCreationMessage(clusterName, "test_store1", owner, keySchema, valueSchema, 1L),
                AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);
        veniceWriter.put(
                emptyKeyBytes,
                getStoreCreationMessage(clusterName, "test_store2", owner, keySchema, valueSchema, 3L),
                AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);
        veniceWriter.put(
                emptyKeyBytes,
                getStoreCreationMessage(clusterName, "test_store3", owner, keySchema, valueSchema, 4L),
                AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);

        Mockito.doReturn(false).when(admin).hasStore(clusterName, "test_store1");
        Mockito.doReturn(false).when(admin).hasStore(clusterName, "test_store2");
        Mockito.doReturn(false).when(admin).hasStore(clusterName, "test_store3");

        AdminConsumptionStats stats = Mockito.mock(AdminConsumptionStats.class);
        // Held by its concrete type: getFailingOffset() and close() are not declared on Runnable.
        AdminConsumptionTask task = getAdminConsumptionTask(new RandomPollStrategy(), false, stats, TIMEOUT);
        executor.submit(task);

        TestUtils.waitForNonDeterministicAssertion(
                TIMEOUT,
                TimeUnit.MILLISECONDS,
                () -> Assert.assertEquals(getLastOffset(clusterName), 3L));
        Assert.assertEquals(task.getFailingOffset(), -1L);

        task.close();

        Mockito.verify(stats, Mockito.never()).recordAdminTopicDIVErrorReportCount();
        Mockito.verify(admin, Mockito.atLeastOnce())
                .createStore(clusterName, "test_store1", owner, keySchema, valueSchema, false);
        Mockito.verify(admin, Mockito.atLeastOnce())
                .createStore(clusterName, "test_store2", owner, keySchema, valueSchema, false);
        Mockito.verify(admin, Mockito.atLeastOnce())
                .createStore(clusterName, "test_store3", owner, keySchema, valueSchema, false);
        Assert.assertEquals(getLastExecutionId(clusterName), 4L);
    }

    /**
     * Writes a test_store0 creation message (execution id 1) through a separate writer and records its
     * offset in the admin topic metadata before the task starts. Messages with execution ids 3, 4 and 5
     * follow (id 2 is never written). The task is expected to reach offset 5 and execution id 5 without
     * a failing offset or DIV error, creating stores 1-3 but never test_store0.
     */
    @Test(timeOut = 10000)
    public void testRunWithFalsePositiveMissingMessagesWhenFirstBecomeLeaderController() throws Exception {
        final String schema = "\"string\"";

        byte[] seedMessage = getStoreCreationMessage(clusterName, "test_store0", owner, schema, schema, 1L);
        PubSubProduceResult seedResult = (PubSubProduceResult) getVeniceWriter(inMemoryKafkaBroker)
            .put(emptyKeyBytes, seedMessage, AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION)
            .get();
        adminTopicMetadataAccessor.updateMetadata(
            clusterName,
            AdminTopicMetadataAccessor.generateMetadataMap(seedResult.getOffset(), -1L, 1L));

        final String[] laterStores = {"test_store1", "test_store2", "test_store3"};
        for (int idx = 0; idx < laterStores.length; idx++) {
            // Execution ids 3, 4, 5 for stores 1, 2, 3 respectively.
            byte[] message = getStoreCreationMessage(clusterName, laterStores[idx], owner, schema, schema, 3L + idx);
            veniceWriter.put(emptyKeyBytes, message, AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);
        }

        Mockito.doReturn(false).when(admin).hasStore(clusterName, "test_store0");
        for (String store : laterStores) {
            Mockito.doReturn(false).when(admin).hasStore(clusterName, store);
        }

        AdminConsumptionStats stats = Mockito.mock(AdminConsumptionStats.class);
        // NOTE(review): Runnable exposes none of the task methods used below; the declared type looks like
        // a decompiler artifact - confirm against the helper's return type.
        Runnable task = getAdminConsumptionTask(new RandomPollStrategy(), false, stats, 10000L);
        executor.submit(task);

        TestUtils.waitForNonDeterministicAssertion(
            10000L,
            TimeUnit.MILLISECONDS,
            () -> Assert.assertEquals(getLastOffset(clusterName), 5L));
        Assert.assertEquals(task.getFailingOffset(), -1L);
        task.close();

        Mockito.verify(stats, Mockito.never()).recordAdminTopicDIVErrorReportCount();
        Mockito.verify(admin, Mockito.atLeastOnce()).createStore(clusterName, "test_store1", owner, schema, schema, false);
        Mockito.verify(admin, Mockito.never()).createStore(clusterName, "test_store0", owner, schema, schema, false);
        Mockito.verify(admin, Mockito.atLeastOnce()).createStore(clusterName, "test_store2", owner, schema, schema, false);
        Mockito.verify(admin, Mockito.atLeastOnce()).createStore(clusterName, "test_store3", owner, schema, schema, false);
        Assert.assertEquals(getLastExecutionId(clusterName), 5L);
    }

    /**
     * Writes the same store-creation message (execution id 1) twice, so the two copies sit at different
     * offsets. {@code hasStore} answers false on the first call and true afterwards. The test waits for
     * the persisted offset to reach 2 and then verifies subscribe, unsubscribe and {@code createStore}
     * with Mockito's default single-invocation expectation.
     */
    @Test(timeOut = 10000)
    public void testRunWithDuplicateMessagesWithDifferentOffset() throws InterruptedException, IOException {
        final String schema = "\"string\"";
        for (int copy = 0; copy < 2; copy++) {
            veniceWriter.put(
                emptyKeyBytes,
                getStoreCreationMessage(clusterName, storeName, owner, schema, schema, 1L),
                AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);
        }
        Mockito.when(admin.hasStore(clusterName, storeName)).thenReturn(false, true);

        // NOTE(review): Runnable has no close(); the declared type looks like a decompiler artifact -
        // confirm against the helper's return type.
        Runnable task = getAdminConsumptionTask(new RandomPollStrategy(), false);
        executor.submit(task);
        TestUtils.waitForNonDeterministicAssertion(
            10000L,
            TimeUnit.MILLISECONDS,
            () -> Assert.assertEquals(getLastOffset(clusterName), 2L));

        task.close();
        executor.shutdown();
        executor.awaitTermination(10000L, TimeUnit.MILLISECONDS);

        Mockito.verify(admin, Mockito.timeout(10000L).atLeastOnce()).isLeaderControllerFor(clusterName);
        Mockito.verify(mockKafkaConsumer, Mockito.timeout(10000L))
            .subscribe(Mockito.<PubSubTopicPartition>any(), Mockito.anyLong());
        Mockito.verify(mockKafkaConsumer, Mockito.timeout(10000L))
            .unSubscribe(Mockito.<PubSubTopicPartition>any());
        Mockito.verify(admin, Mockito.timeout(10000L)).createStore(clusterName, storeName, owner, schema, schema, false);
    }

    /**
     * Writes test_store1 (execution id 1), swaps in a fresh writer, writes test_store2 (execution id 2),
     * and seeds the metadata accessor with {@code generateMetadataMap(1L, -1L, 1L)} before the task runs.
     * Once the persisted offset reaches 3, test_store1 must never have been created while test_store2
     * must have been.
     */
    @Test(timeOut = 20000)
    public void testRunWithBiggerStartingOffset() throws InterruptedException, IOException {
        final String schema = "\"string\"";
        final String alreadyConsumedStore = "test_store1";
        final String pendingStore = "test_store2";

        veniceWriter.put(
            emptyKeyBytes,
            getStoreCreationMessage(clusterName, alreadyConsumedStore, owner, schema, schema, 1L),
            AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);
        // The second message goes through a newly obtained writer, which replaces the field.
        veniceWriter = getVeniceWriter(inMemoryKafkaBroker);
        veniceWriter.put(
            emptyKeyBytes,
            getStoreCreationMessage(clusterName, pendingStore, owner, schema, schema, 2L),
            AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);

        Mockito.doReturn(false).when(admin).hasStore(clusterName, alreadyConsumedStore);
        Mockito.doReturn(false).when(admin).hasStore(clusterName, pendingStore);
        adminTopicMetadataAccessor.updateMetadata(clusterName, AdminTopicMetadataAccessor.generateMetadataMap(1L, -1L, 1L));

        // NOTE(review): Runnable has no close(); the declared type looks like a decompiler artifact -
        // confirm against the helper's return type.
        Runnable task = getAdminConsumptionTask(new RandomPollStrategy(), false);
        executor.submit(task);
        TestUtils.waitForNonDeterministicAssertion(
            10000L,
            TimeUnit.MILLISECONDS,
            () -> Assert.assertEquals(getLastOffset(clusterName), 3L));

        task.close();
        executor.shutdown();
        executor.awaitTermination(10000L, TimeUnit.MILLISECONDS);

        Mockito.verify(admin, Mockito.timeout(10000L).atLeastOnce()).isLeaderControllerFor(clusterName);
        Mockito.verify(mockKafkaConsumer, Mockito.timeout(10000L))
            .subscribe(Mockito.<PubSubTopicPartition>any(), Mockito.anyLong());
        Mockito.verify(mockKafkaConsumer, Mockito.timeout(10000L))
            .unSubscribe(Mockito.<PubSubTopicPartition>any());
        Mockito.verify(admin, Mockito.never()).createStore(clusterName, alreadyConsumedStore, owner, schema, schema, false);
        Mockito.verify(admin, Mockito.atLeastOnce()).createStore(clusterName, pendingStore, owner, schema, schema, false);
    }

    /**
     * Runs the task built with the boolean flag set to {@code true} (the parent-controller mode, going by
     * the test name) against a single kill-offline-push message. The offset must advance to 1 while
     * {@code killOfflinePush} is never invoked on the admin.
     */
    @Test(timeOut = 10000)
    public void testParentControllerSkipKillOfflinePushJobMessage() throws InterruptedException, IOException {
        byte[] killMessage = getKillOfflinePushJobMessage(clusterName, storeTopicName, 1L);
        veniceWriter.put(emptyKeyBytes, killMessage, AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);

        // NOTE(review): Runnable has no close(); the declared type looks like a decompiler artifact -
        // confirm against the helper's return type.
        Runnable task = getAdminConsumptionTask(new RandomPollStrategy(), true);
        executor.submit(task);
        TestUtils.waitForNonDeterministicAssertion(
            10000L,
            TimeUnit.MILLISECONDS,
            () -> Assert.assertEquals(getLastOffset(clusterName), 1L));

        task.close();
        executor.shutdown();
        executor.awaitTermination(10000L, TimeUnit.MILLISECONDS);

        Mockito.verify(admin, Mockito.timeout(10000L).atLeastOnce()).isLeaderControllerFor(clusterName);
        Mockito.verify(mockKafkaConsumer, Mockito.timeout(10000L))
            .subscribe(Mockito.<PubSubTopicPartition>any(), Mockito.anyLong());
        Mockito.verify(mockKafkaConsumer, Mockito.timeout(10000L))
            .unSubscribe(Mockito.<PubSubTopicPartition>any());
        Mockito.verify(admin, Mockito.never()).killOfflinePush(clusterName, storeTopicName, false);
    }

    /**
     * Sends kill-offline-push messages with execution ids 1, 2 and 3 one at a time. After each one it
     * waits until the persisted metadata shows both the first offset and the execution id equal to that
     * id, then checks that the task reports the same last succeeded execution id.
     */
    @Test
    public void testGetLastSucceedExecutionId() throws Exception {
        // NOTE(review): Runnable exposes none of the task methods used below; the declared type looks like
        // a decompiler artifact - confirm against the helper's return type.
        Runnable task = getAdminConsumptionTask(new RandomPollStrategy(), true);
        executor.submit(task);

        for (long id = 1; id <= 3; id++) {
            // Lambdas need an effectively final copy of the loop counter.
            final long executionId = id;
            veniceWriter.put(
                emptyKeyBytes,
                getKillOfflinePushJobMessage(clusterName, storeTopicName, executionId),
                AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);
            TestUtils.waitForNonDeterministicCompletion(10000L, TimeUnit.MILLISECONDS, () -> {
                Map metadata = adminTopicMetadataAccessor.getMetadata(clusterName);
                long firstOffset = ((Long) AdminTopicMetadataAccessor.getOffsets(metadata).getFirst()).longValue();
                return firstOffset == executionId && AdminTopicMetadataAccessor.getExecutionId(metadata) == executionId;
            });
            Assert.assertEquals(
                task.getLastSucceededExecutionId().longValue(),
                executionId,
                "After consumption succeed, the last succeed execution id should be updated.");
        }

        task.close();
        executor.shutdown();
        executor.awaitTermination(10000L, TimeUnit.MILLISECONDS);
    }

    /**
     * Sends a store-creation message followed by an UPDATE_STORE message (execution id 2) and verifies
     * that the admin's {@code updateStore} is invoked with query params reflecting the hybrid, access
     * control, migration, compute, bootstrap-timeout and incremental-push values carried by the message.
     *
     * @param z when true the message sets {@code replicateAllConfigs} with an empty updated-configs list;
     *          when false it lists every updated config key explicitly
     */
    @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testSetStore(boolean z) throws Exception {
        final String schema = "\"string\"";
        final int bootstrapTimeoutHours = 48;

        // NOTE(review): Runnable has no close(); the declared type looks like a decompiler artifact -
        // confirm against the helper's return type.
        Runnable task = getAdminConsumptionTask(new RandomPollStrategy(), true);
        executor.submit(task);
        veniceWriter.put(
            emptyKeyBytes,
            getStoreCreationMessage(clusterName, storeName, owner, schema, schema, 1L),
            AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);

        UpdateStore updateStore = (UpdateStore) AdminMessageType.UPDATE_STORE.getNewInstance();
        updateStore.clusterName = clusterName;
        updateStore.storeName = storeName;
        updateStore.owner = "test_ownerupdated";
        updateStore.partitionNum = 100;
        updateStore.currentVersion = 100;
        updateStore.enableReads = false;
        updateStore.enableWrites = true;
        updateStore.accessControlled = true;
        updateStore.incrementalPushEnabled = true;
        updateStore.isMigrating = true;
        updateStore.writeComputationEnabled = true;
        updateStore.readComputationEnabled = true;
        updateStore.bootstrapToOnlineTimeoutInHours = bootstrapTimeoutHours;

        HybridStoreConfigRecord hybridConfig = new HybridStoreConfigRecord();
        hybridConfig.rewindTimeInSeconds = 123L;
        hybridConfig.offsetLagThresholdToGoOnline = 1000L;
        hybridConfig.producerTimestampLagThresholdToGoOnlineInSeconds = 300L;
        hybridConfig.dataReplicationPolicy = DataReplicationPolicy.AGGREGATE.getValue();
        updateStore.hybridStoreConfig = hybridConfig;

        ETLStoreConfigRecord etlConfig = new ETLStoreConfigRecord();
        etlConfig.etledUserProxyAccount = "";
        updateStore.ETLStoreConfig = etlConfig;

        PartitionerConfigRecord partitionerConfig = new PartitionerConfigRecord();
        partitionerConfig.amplificationFactor = 10;
        partitionerConfig.partitionerParams = new HashMap<>();
        partitionerConfig.partitionerClass = "dummyClassName";
        updateStore.partitionerConfig = partitionerConfig;

        updateStore.replicateAllConfigs = z;
        if (z) {
            updateStore.updatedConfigsList = Collections.emptyList();
        } else {
            final String[] updatedConfigKeys = {
                "owner",
                "partition_count",
                "version",
                "enable_reads",
                "enable_writes",
                "access_controlled",
                "incremental_push_enabled",
                "store_migration",
                "write_computation_enabled",
                "read_computation_enabled",
                "bootstrap_to_online_timeout_in_hours",
                "rewind_time_seconds",
                "offset_lag_to_go_online",
                "time_lag_to_go_online",
                "etled_proxy_user_account",
                "amplification_factor",
                "partitioner_class",
                "partitioner_params"
            };
            updateStore.updatedConfigsList = new LinkedList<>();
            for (String configKey : updatedConfigKeys) {
                updateStore.updatedConfigsList.add(configKey);
            }
        }

        AdminOperation operation = new AdminOperation();
        operation.operationType = AdminMessageType.UPDATE_STORE.getValue();
        operation.payloadUnion = updateStore;
        operation.executionId = 2L;
        veniceWriter.put(
            emptyKeyBytes,
            adminOperationSerializer.serialize(operation),
            AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);

        TestUtils.waitForNonDeterministicAssertion(
            10000L,
            TimeUnit.MILLISECONDS,
            () -> Assert.assertEquals(getLastOffset(clusterName), 2L));
        task.close();
        executor.shutdown();
        executor.awaitTermination(10000L, TimeUnit.MILLISECONDS);

        // The conjunction short-circuits, so later getters are only read once earlier ones matched.
        Mockito.verify(admin, Mockito.timeout(10000L).atLeastOnce()).updateStore(
            Mockito.eq(clusterName),
            Mockito.eq(storeName),
            Mockito.<UpdateStoreQueryParams>argThat(params ->
                ((Long) params.getHybridRewindSeconds().get()).longValue() == 123
                    && ((Long) params.getHybridOffsetLagThreshold().get()).longValue() == 1000
                    && ((Boolean) params.getAccessControlled().get()).booleanValue()
                    && ((Boolean) params.getStoreMigration().get()).booleanValue()
                    && ((Boolean) params.getWriteComputationEnabled().get()).booleanValue()
                    && ((Boolean) params.getReadComputationEnabled().get()).booleanValue()
                    && ((Integer) params.getBootstrapToOnlineTimeoutInHours().get()).intValue() == bootstrapTimeoutHours
                    && ((Boolean) params.getIncrementalPushEnabled().get()).booleanValue()));
    }

    /**
     * Checks that a failure on one store does not block another. Creation of test_store1 is mocked to
     * throw, so the task reports failing offset 1 while test_store2 still advances to execution id 4.
     * Neither the persisted offset nor the persisted execution id moves (both stay at -1) until the
     * failing message is skipped; after the skip they reach 4, test_store1 reaches execution id 3 and
     * no failing offset remains.
     */
    @Test(timeOut = 10000)
    public void testStoreIsolation() throws Exception {
        final String schema = "\"string\"";
        final String failingStore = "test_store1";
        final String healthyStore = "test_store2";

        veniceWriter.put(emptyKeyBytes, getStoreCreationMessage(clusterName, failingStore, owner, schema, schema, 1L), AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);
        veniceWriter.put(emptyKeyBytes, getStoreCreationMessage(clusterName, healthyStore, owner, schema, schema, 2L), AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);
        veniceWriter.put(emptyKeyBytes, getKillOfflinePushJobMessage(clusterName, "test_store1_v1", 3L), AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);
        veniceWriter.put(emptyKeyBytes, getKillOfflinePushJobMessage(clusterName, "test_store2_v1", 4L), AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);

        Mockito.when(admin.hasStore(clusterName, failingStore)).thenReturn(false);
        Mockito.when(admin.hasStore(clusterName, healthyStore)).thenReturn(false);
        Mockito.doThrow(new VeniceException("Mock store creation exception"))
            .when(admin)
            .createStore(clusterName, failingStore, owner, schema, schema, false);

        // NOTE(review): Runnable exposes none of the task methods used below; the declared type looks like
        // a decompiler artifact - confirm against the helper's return type.
        Runnable task = getAdminConsumptionTask(new RandomPollStrategy(), false);
        executor.submit(task);

        // Phase 1: the failing store is stuck at offset 1, the healthy store keeps making progress.
        TestUtils.waitForNonDeterministicAssertion(
            10000L,
            TimeUnit.MILLISECONDS,
            () -> Assert.assertEquals(task.getFailingOffset(), 1L));
        TestUtils.waitForNonDeterministicAssertion(
            10000L,
            TimeUnit.MILLISECONDS,
            () -> Assert.assertEquals(
                ((Long) executionIdAccessor.getLastSucceededExecutionIdMap(clusterName).getOrDefault(healthyStore, -1L)).longValue(),
                4L));
        Assert.assertEquals(((Long) executionIdAccessor.getLastSucceededExecutionIdMap(clusterName).getOrDefault(healthyStore, -1L)).longValue(), 4L);
        Assert.assertEquals(((Long) executionIdAccessor.getLastSucceededExecutionIdMap(clusterName).getOrDefault(failingStore, -1L)).longValue(), -1L);
        Assert.assertEquals(getLastOffset(clusterName), -1L);
        Assert.assertEquals(getLastExecutionId(clusterName), -1L);

        // Phase 2: skipping the failing message lets the persisted state catch up.
        task.skipMessageWithOffset(1L);
        TestUtils.waitForNonDeterministicAssertion(
            10000L,
            TimeUnit.MILLISECONDS,
            () -> Assert.assertEquals(getLastOffset(clusterName), 4L));
        Assert.assertEquals(getLastExecutionId(clusterName), 4L);
        Assert.assertEquals(((Long) executionIdAccessor.getLastSucceededExecutionIdMap(clusterName).getOrDefault(failingStore, -1L)).longValue(), 3L);
        Assert.assertEquals(task.getFailingOffset(), -1L);

        task.close();
        executor.shutdown();
        executor.awaitTermination(10000L, TimeUnit.MILLISECONDS);

        Mockito.verify(admin, Mockito.timeout(10000L).atLeastOnce()).isLeaderControllerFor(clusterName);
        Mockito.verify(mockKafkaConsumer, Mockito.timeout(10000L))
            .subscribe(Mockito.<PubSubTopicPartition>any(), Mockito.anyLong());
        Mockito.verify(mockKafkaConsumer, Mockito.timeout(10000L))
            .unSubscribe(Mockito.<PubSubTopicPartition>any());
        Mockito.verify(admin, Mockito.atLeastOnce()).createStore(clusterName, failingStore, owner, schema, schema, false);
        Mockito.verify(admin, Mockito.times(1)).createStore(clusterName, healthyStore, owner, schema, schema, false);
    }

    /**
     * Exercises a leadership hand-off. After execution id 2 is persisted, {@code isLeaderControllerFor}
     * is switched to false and the task's last succeeded execution id is expected to fall back to -1.
     * Messages 3 and 4 are written and the metadata and per-store execution id are set to id 4 by hand
     * while the task is not leader. Leadership is then restored, message 5 is written, and the persisted
     * execution id must reach 5.
     */
    @Test
    public void testResubscribe() throws IOException, InterruptedException, TimeoutException, ExecutionException {
        final String schema = "\"string\"";
        // NOTE(review): Runnable exposes none of the task methods used below; the declared type looks like
        // a decompiler artifact - confirm against the helper's return type.
        Runnable task = getAdminConsumptionTask(new RandomPollStrategy(), false);
        executor.submit(task);

        veniceWriter.put(emptyKeyBytes, getStoreCreationMessage(clusterName, storeName, owner, schema, schema, 1L), AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);
        veniceWriter.put(emptyKeyBytes, getKillOfflinePushJobMessage(clusterName, storeTopicName, 2L), AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);
        TestUtils.waitForNonDeterministicAssertion(
            10000L,
            TimeUnit.MILLISECONDS,
            () -> Assert.assertEquals(getLastExecutionId(clusterName), 2L));

        // Lose leadership.
        Mockito.doReturn(false).when(admin).isLeaderControllerFor(clusterName);
        TestUtils.waitForNonDeterministicAssertion(
            10000L,
            TimeUnit.MILLISECONDS,
            () -> Assert.assertEquals(task.getLastSucceededExecutionId().longValue(), -1L));

        // Write ids 3 and 4, then persist id 4's offset and execution id directly through the accessors.
        veniceWriter.put(emptyKeyBytes, getKillOfflinePushJobMessage(clusterName, storeTopicName, 3L), AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);
        PubSubProduceResult fourthResult = (PubSubProduceResult) veniceWriter
            .put(emptyKeyBytes, getKillOfflinePushJobMessage(clusterName, storeTopicName, 4L), AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION)
            .get(10000L, TimeUnit.MILLISECONDS);
        adminTopicMetadataAccessor.updateMetadata(
            clusterName,
            AdminTopicMetadataAccessor.generateMetadataMap(fourthResult.getOffset(), -1L, 4L));
        executionIdAccessor.updateLastSucceededExecutionIdMap(clusterName, storeName, 4L);

        // Regain leadership and pause before producing the next message.
        Mockito.doReturn(true).when(admin).isLeaderControllerFor(clusterName);
        Utils.sleep(2000L);
        veniceWriter.put(emptyKeyBytes, getKillOfflinePushJobMessage(clusterName, storeTopicName, 5L), AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);
        TestUtils.waitForNonDeterministicAssertion(
            10000L,
            TimeUnit.MILLISECONDS,
            () -> Assert.assertEquals(getLastExecutionId(clusterName), 5L));

        task.close();
        executor.shutdown();
        executor.awaitTermination(10000L, TimeUnit.MILLISECONDS);
    }

    /**
     * Mocks {@code addVersionAndStartIngestion} to throw and scripts {@code isLeaderControllerFor} to
     * answer true five times, false three times, then true. The failing offset is expected to point at
     * the add-version message, clear to -1, and then point at the add-version message again.
     */
    @Test(timeOut = 20000)
    public void testResubscribeWithFailedAdminMessages() throws ExecutionException, InterruptedException, IOException {
        final String schema = "\"string\"";
        // NOTE(review): Runnable exposes none of the task methods used below; the declared type looks like
        // a decompiler artifact - confirm against the helper's return type.
        Runnable task = getAdminConsumptionTask(new RandomPollStrategy(), false);

        veniceWriter.put(emptyKeyBytes, getStoreCreationMessage(clusterName, storeName, owner, schema, schema, 1L), AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);
        PubSubProduceResult addVersionResult = (PubSubProduceResult) veniceWriter
            .put(emptyKeyBytes, getAddVersionMessage(clusterName, storeName, "test", 1, 1, 2L), AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION)
            .get();
        long failingOffset = addVersionResult.getOffset();
        veniceWriter.put(emptyKeyBytes, getKillOfflinePushJobMessage(clusterName, storeTopicName, 3L), AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);

        Mockito.doThrow(new VeniceException("Mocked add version exception"))
            .when(admin)
            .addVersionAndStartIngestion(clusterName, storeName, "test", 1, 1, Version.PushType.BATCH, (String) null, -1L, 1, false);
        Mockito.when(admin.isLeaderControllerFor(clusterName))
            .thenReturn(true, true, true, true, true, false, false, false, true);

        executor.submit(task);
        TestUtils.waitForNonDeterministicAssertion(
            10000L,
            TimeUnit.MILLISECONDS,
            () -> Assert.assertEquals(task.getFailingOffset(), failingOffset));
        TestUtils.waitForNonDeterministicAssertion(
            10000L,
            TimeUnit.MILLISECONDS,
            () -> Assert.assertEquals(task.getFailingOffset(), -1L));
        TestUtils.waitForNonDeterministicAssertion(
            10000L,
            TimeUnit.MILLISECONDS,
            () -> Assert.assertEquals(task.getFailingOffset(), failingOffset));

        task.close();
        executor.shutdown();
        executor.awaitTermination(10000L, TimeUnit.MILLISECONDS);
    }

    /**
     * Stores the offset of the third message in the offset manager and sets the execution id accessors
     * to 3 before starting the task. The metadata accessor is expected to end up with that same offset
     * and execution id. Two further messages must then advance the global and per-store execution id to
     * 5 and the offset to that of the fifth message.
     */
    @Test
    public void testMigrateFromOffsetManagerToMetadataAccessor() throws InterruptedException, ExecutionException, TimeoutException, IOException {
        final String schema = "\"string\"";
        // NOTE(review): Runnable has no close(); the declared type looks like a decompiler artifact -
        // confirm against the helper's return type.
        Runnable task = getAdminConsumptionTask(new RandomPollStrategy(), false);

        veniceWriter.put(emptyKeyBytes, getStoreCreationMessage(clusterName, storeName, owner, schema, schema, 1L), AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);
        veniceWriter.put(emptyKeyBytes, getKillOfflinePushJobMessage(clusterName, storeTopicName, 2L), AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);
        PubSubProduceResult thirdResult = (PubSubProduceResult) veniceWriter
            .put(emptyKeyBytes, getKillOfflinePushJobMessage(clusterName, storeTopicName, 3L), AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION)
            .get(10000L, TimeUnit.MILLISECONDS);
        long thirdOffset = thirdResult.getOffset();

        // Seed the offset manager and the execution id accessors before the task starts.
        OffsetRecord offsetRecord = offsetManager.getLastOffset(topicName, 0);
        offsetRecord.setCheckpointLocalVersionTopicOffset(thirdOffset);
        offsetManager.put(topicName, 0, offsetRecord);
        executionIdAccessor.updateLastSucceededExecutionIdMap(clusterName, storeName, 3L);
        executionIdAccessor.updateLastSucceededExecutionId(clusterName, 3L);

        executor.submit(task);
        TestUtils.waitForNonDeterministicAssertion(
            10000L,
            TimeUnit.MILLISECONDS,
            () -> Assert.assertEquals(getLastOffset(clusterName), thirdOffset));
        TestUtils.waitForNonDeterministicAssertion(
            10000L,
            TimeUnit.MILLISECONDS,
            () -> Assert.assertEquals(getLastExecutionId(clusterName), 3L));

        veniceWriter.put(emptyKeyBytes, getKillOfflinePushJobMessage(clusterName, storeTopicName, 4L), AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);
        PubSubProduceResult fifthResult = (PubSubProduceResult) veniceWriter
            .put(emptyKeyBytes, getKillOfflinePushJobMessage(clusterName, storeTopicName, 5L), AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION)
            .get(10000L, TimeUnit.MILLISECONDS);
        long fifthOffset = fifthResult.getOffset();
        TestUtils.waitForNonDeterministicAssertion(
            10000L,
            TimeUnit.MILLISECONDS,
            () -> Assert.assertEquals(getLastExecutionId(clusterName), 5L));
        Assert.assertEquals(((Long) executionIdAccessor.getLastSucceededExecutionIdMap(clusterName).get(storeName)).longValue(), 5L);
        Assert.assertEquals(getLastOffset(clusterName), fifthOffset);

        task.close();
        executor.shutdown();
        executor.awaitTermination(10000L, TimeUnit.MILLISECONDS);
    }

    /**
     * Writes five messages and polls through a {@code FilteringPollStrategy} configured with the
     * topic-partition offset 2. The task is expected to persist execution id 2 and then report the
     * offset of the fourth message as failing. After {@code skipMessageDIVWithOffset} is called for that
     * offset, the execution id must advance to 5.
     */
    @Test(timeOut = 10000)
    public void testSkipDIV() throws InterruptedException, ExecutionException, TimeoutException, IOException {
        final String schema = "\"string\"";
        veniceWriter.put(emptyKeyBytes, getStoreCreationMessage(clusterName, storeName, owner, schema, schema, 1L), AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);
        veniceWriter.put(emptyKeyBytes, getKillOfflinePushJobMessage(clusterName, storeTopicName, 2L), AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);
        veniceWriter.put(emptyKeyBytes, getKillOfflinePushJobMessage(clusterName, storeTopicName, 3L), AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);
        PubSubProduceResult fourthResult = (PubSubProduceResult) veniceWriter
            .put(emptyKeyBytes, getKillOfflinePushJobMessage(clusterName, storeTopicName, 4L), AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION)
            .get(10000L, TimeUnit.MILLISECONDS);
        long fourthOffset = fourthResult.getOffset();
        veniceWriter.put(emptyKeyBytes, getKillOfflinePushJobMessage(clusterName, storeTopicName, 5L), AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);

        HashSet<PubSubTopicPartitionOffset> filteredOffsets = new HashSet<>();
        PubSubTopicPartitionImpl adminPartition = new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic(topicName), 0);
        filteredOffsets.add(new PubSubTopicPartitionOffset(adminPartition, 2L));

        // NOTE(review): Runnable exposes none of the task methods used below; the declared type looks like
        // a decompiler artifact - confirm against the helper's return type.
        Runnable task = getAdminConsumptionTask(
            new FilteringPollStrategy(new RandomPollStrategy(false), filteredOffsets),
            false);
        executor.submit(task);

        TestUtils.waitForNonDeterministicAssertion(
            10000L,
            TimeUnit.MILLISECONDS,
            () -> Assert.assertEquals(getLastExecutionId(clusterName), 2L));
        TestUtils.waitForNonDeterministicAssertion(
            10000L,
            TimeUnit.MILLISECONDS,
            () -> Assert.assertEquals(task.getFailingOffset(), fourthOffset));

        task.skipMessageDIVWithOffset(fourthOffset);
        TestUtils.waitForNonDeterministicAssertion(
            10000L,
            TimeUnit.MILLISECONDS,
            () -> Assert.assertEquals(getLastExecutionId(clusterName), 5L));

        task.close();
        executor.shutdown();
        executor.awaitTermination(10000L, TimeUnit.MILLISECONDS);
    }

    /**
     * Mocks {@code addVersionAndStartIngestion} to throw {@code VeniceOperationAgainstKafkaTimedOut} and
     * checks that the add-version message's offset is reported as failing, that the retriable failure
     * stat is recorded at least once, and that the non-retriable failure stat is never recorded.
     */
    @Test
    public void testRetriableConsumptionException() throws InterruptedException, ExecutionException, TimeoutException, IOException {
        final String pushJobId = "mock push job id";
        AdminConsumptionStats stats = Mockito.mock(AdminConsumptionStats.class);
        // NOTE(review): Runnable exposes none of the task methods used below; the declared type looks like
        // a decompiler artifact - confirm against the helper's return type.
        Runnable task = getAdminConsumptionTask(new RandomPollStrategy(), false, stats, 10000L);
        executor.submit(task);

        Mockito.doThrow(new VeniceOperationAgainstKafkaTimedOut("Mocking kafka topic creation timeout"))
            .when(admin)
            .addVersionAndStartIngestion(clusterName, storeName, pushJobId, 1, 1, Version.PushType.BATCH, (String) null, -1L, 1, false);

        PubSubProduceResult addVersionResult = (PubSubProduceResult) veniceWriter
            .put(emptyKeyBytes, getAddVersionMessage(clusterName, storeName, pushJobId, 1, 1, 1L), AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION)
            .get(10000L, TimeUnit.MILLISECONDS);
        long failingOffset = addVersionResult.getOffset();
        TestUtils.waitForNonDeterministicAssertion(
            10000L,
            TimeUnit.MILLISECONDS,
            () -> Assert.assertEquals(task.getFailingOffset(), failingOffset));

        Mockito.verify(stats, Mockito.atLeastOnce()).recordFailedRetriableAdminConsumption();
        Mockito.verify(stats, Mockito.never()).recordFailedAdminConsumption();

        task.close();
        executor.shutdown();
        executor.awaitTermination(10000L, TimeUnit.MILLISECONDS);
    }

    /**
     * Produces ten kill-offline-push messages (execution ids 1 to 10) and then re-sends the already
     * serialized messages at indices 4 to 9 (execution ids 5 to 10) to simulate a rewind in the topic.
     * Once the persisted execution id reaches 10, {@code killOfflinePush} must have been applied exactly
     * ten times, with no failed-consumption or DIV-error stat recorded.
     */
    @Test
    public void testWithMessageRewind() throws IOException, InterruptedException {
        AdminConsumptionStats stats = Mockito.mock(AdminConsumptionStats.class);
        // NOTE(review): Runnable has no close(); the declared type looks like a decompiler artifact -
        // confirm against the helper's return type.
        Runnable adminConsumptionTask = getAdminConsumptionTask(new RandomPollStrategy(), false, stats, 10000L);
        this.executor.submit(adminConsumptionTask);
        String versionTopic = Version.composeKafkaTopic(storeName, 1);

        // Each slot holds one serialized message, so the buffer is an array of byte arrays.
        byte[][] messages = new byte[10][];
        for (int idx = 0; idx < messages.length; idx++) {
            messages[idx] = getKillOfflinePushJobMessage(this.clusterName, versionTopic, idx + 1);
            this.veniceWriter.put(emptyKeyBytes, messages[idx], AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);
        }
        // Rewind: replay the messages at indices 4..9 a second time.
        for (int idx = 4; idx < 10; idx++) {
            this.veniceWriter.put(emptyKeyBytes, messages[idx], AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);
        }

        TestUtils.waitForNonDeterministicAssertion(10000L, TimeUnit.MILLISECONDS, () -> {
            Assert.assertEquals(getLastExecutionId(this.clusterName), 10L);
        });
        ((VeniceHelixAdmin) Mockito.verify(this.admin, Mockito.times(10))).killOfflinePush(this.clusterName, versionTopic, false);
        ((AdminConsumptionStats) Mockito.verify(stats, Mockito.never())).recordFailedAdminConsumption();
        ((AdminConsumptionStats) Mockito.verify(stats, Mockito.never())).recordAdminTopicDIVErrorReportCount();
        adminConsumptionTask.close();
        this.executor.shutdown();
        this.executor.awaitTermination(10000L, TimeUnit.MILLISECONDS);
    }

    /**
     * Writes a store-creation message (execution id 1) followed by a derived-schema-creation
     * message (execution id 2), waits for the last offset to reach 2, and verifies that
     * {@code addDerivedSchema} was invoked on the admin mock with the ids and definition carried
     * by the message.
     */
    @Test
    public void testAddingDerivedSchema() throws Exception {
        // Declared with the concrete task type: close() is not a member of Runnable.
        AdminConsumptionTask adminConsumptionTask = getAdminConsumptionTask(new RandomPollStrategy(), true);
        this.executor.submit(adminConsumptionTask);
        this.veniceWriter.put(
                emptyKeyBytes,
                getStoreCreationMessage(this.clusterName, storeName, owner, "\"string\"", "\"string\"", 1L),
                AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);

        SchemaMeta schemaMeta = new SchemaMeta();
        schemaMeta.definition = "\"string\"";
        schemaMeta.schemaType = SchemaType.AVRO_1_4.getValue();

        DerivedSchemaCreation derivedSchemaCreation =
                (DerivedSchemaCreation) AdminMessageType.DERIVED_SCHEMA_CREATION.getNewInstance();
        derivedSchemaCreation.clusterName = this.clusterName;
        derivedSchemaCreation.storeName = storeName;
        derivedSchemaCreation.schema = schemaMeta;
        derivedSchemaCreation.valueSchemaId = 1;
        derivedSchemaCreation.derivedSchemaId = 2;

        AdminOperation adminOperation = new AdminOperation();
        adminOperation.operationType = AdminMessageType.DERIVED_SCHEMA_CREATION.getValue();
        adminOperation.payloadUnion = derivedSchemaCreation;
        adminOperation.executionId = 2L;
        this.veniceWriter.put(
                emptyKeyBytes,
                this.adminOperationSerializer.serialize(adminOperation),
                AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);

        TestUtils.waitForNonDeterministicAssertion(10000L, TimeUnit.MILLISECONDS, () -> {
            Assert.assertEquals(getLastOffset(this.clusterName), 2L);
        });

        adminConsumptionTask.close();
        this.executor.shutdown();
        this.executor.awaitTermination(10000L, TimeUnit.MILLISECONDS);

        ((VeniceHelixAdmin) Mockito.verify(this.admin, Mockito.timeout(10000L).atLeastOnce())).addDerivedSchema(
                (String) Mockito.eq(this.clusterName),
                (String) Mockito.eq(storeName),
                Mockito.eq(derivedSchemaCreation.valueSchemaId),
                Mockito.eq(derivedSchemaCreation.derivedSchemaId),
                (String) Mockito.eq(schemaMeta.definition.toString()));
    }

    /**
     * Writes three add-version messages for the same store version whose targeted regions are
     * "dc-1", "dc-2" and "dc-0", then expects {@code addVersionAndStartIngestion} to be invoked
     * exactly once.
     *
     * <p>NOTE(review): presumably only the message targeting the region this test fixture is
     * configured with is applied; confirm which region the fixture uses.
     */
    @Test
    public void testAddVersionMsgHandlingForTargetedRegionPush() throws Exception {
        AdminConsumptionStats adminConsumptionStats = Mockito.mock(AdminConsumptionStats.class);
        // Declared with the concrete task type: close() is not a member of Runnable.
        AdminConsumptionTask adminConsumptionTask =
                getAdminConsumptionTask(new RandomPollStrategy(), false, adminConsumptionStats, 10000L);
        this.executor.submit(adminConsumptionTask);

        String mockPushJobId = "mock push job id";
        int versionNumber = 1;
        int numberOfPartitions = 1;
        this.veniceWriter.put(
                emptyKeyBytes,
                getAddVersionMessage(this.clusterName, storeName, mockPushJobId, versionNumber, numberOfPartitions, 1L, "dc-1"),
                AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);
        this.veniceWriter.put(
                emptyKeyBytes,
                getAddVersionMessage(this.clusterName, storeName, mockPushJobId, versionNumber, numberOfPartitions, 2L, "dc-2"),
                AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);
        this.veniceWriter.put(
                emptyKeyBytes,
                getAddVersionMessage(this.clusterName, storeName, mockPushJobId, versionNumber, numberOfPartitions, 3L, "dc-0"),
                AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);

        TestUtils.waitForNonDeterministicAssertion(10000L, TimeUnit.MILLISECONDS, () -> {
            ((VeniceHelixAdmin) Mockito.verify(this.admin, Mockito.times(1))).addVersionAndStartIngestion(
                    this.clusterName, storeName, mockPushJobId, versionNumber, numberOfPartitions,
                    Version.PushType.BATCH, (String) null, -1L, 1, false);
        });

        adminConsumptionTask.close();
        this.executor.shutdown();
        this.executor.awaitTermination(10000L, TimeUnit.MILLISECONDS);
    }

    /**
     * Builds a serialized STORE_CREATION admin operation.
     *
     * @param str  cluster name stored on the message
     * @param str2 store name stored on the message
     * @param str3 owner stored on the message
     * @param str4 key schema definition
     * @param str5 value schema definition
     * @param j    execution id assigned to the wrapping admin operation
     * @return the admin operation serialized by {@code adminOperationSerializer}
     */
    private byte[] getStoreCreationMessage(String str, String str2, String str3, String str4, String str5, long j) {
        SchemaMeta keySchemaMeta = new SchemaMeta();
        keySchemaMeta.definition = str4;
        keySchemaMeta.schemaType = SchemaType.AVRO_1_4.getValue();

        SchemaMeta valueSchemaMeta = new SchemaMeta();
        valueSchemaMeta.definition = str5;
        valueSchemaMeta.schemaType = SchemaType.AVRO_1_4.getValue();

        StoreCreation payload = (StoreCreation) AdminMessageType.STORE_CREATION.getNewInstance();
        payload.clusterName = str;
        payload.storeName = str2;
        payload.owner = str3;
        payload.keySchema = keySchemaMeta;
        payload.valueSchema = valueSchemaMeta;

        // Wrap the payload in the admin-operation envelope before serializing.
        AdminOperation envelope = new AdminOperation();
        envelope.operationType = AdminMessageType.STORE_CREATION.getValue();
        envelope.payloadUnion = payload;
        envelope.executionId = j;
        return this.adminOperationSerializer.serialize(envelope);
    }

    /**
     * Builds a serialized KILL_OFFLINE_PUSH_JOB admin operation.
     *
     * @param str  cluster name stored on the message
     * @param str2 Kafka topic stored on the message
     * @param j    execution id assigned to the wrapping admin operation
     * @return the admin operation serialized by {@code adminOperationSerializer}
     */
    private byte[] getKillOfflinePushJobMessage(String str, String str2, long j) {
        KillOfflinePushJob payload = (KillOfflinePushJob) AdminMessageType.KILL_OFFLINE_PUSH_JOB.getNewInstance();
        payload.clusterName = str;
        payload.kafkaTopic = str2;

        // Wrap the payload in the admin-operation envelope before serializing.
        AdminOperation envelope = new AdminOperation();
        envelope.operationType = AdminMessageType.KILL_OFFLINE_PUSH_JOB.getValue();
        envelope.payloadUnion = payload;
        envelope.executionId = j;

        byte[] serialized = this.adminOperationSerializer.serialize(envelope);
        return serialized;
    }

    /**
     * Convenience overload that builds an ADD_VERSION message without targeted regions by
     * delegating to the seven-argument overload with a {@code null} region list.
     *
     * @param str  cluster name
     * @param str2 store name
     * @param str3 push job id
     * @param i    version number
     * @param i2   number of partitions
     * @param j    execution id
     * @return the serialized admin operation
     */
    private byte[] getAddVersionMessage(String str, String str2, String str3, int i, int i2, long j) {
        final String noTargetedRegions = null;
        return getAddVersionMessage(str, str2, str3, i, i2, j, noTargetedRegions);
    }

    /**
     * Builds a serialized ADD_VERSION admin operation. The rewind-time override is fixed at
     * {@code -1} and the timestamp metadata version id at {@code 1}.
     *
     * @param str  cluster name stored on the message
     * @param str2 store name stored on the message
     * @param str3 push job id stored on the message
     * @param i    version number
     * @param i2   number of partitions
     * @param j    execution id assigned to the wrapping admin operation
     * @param str4 regions filter string parsed by {@code RegionUtils.parseRegionsFilterList};
     *             when {@code null}, {@code targetedRegions} is left untouched
     * @return the admin operation serialized by {@code adminOperationSerializer}
     */
    private byte[] getAddVersionMessage(String str, String str2, String str3, int i, int i2, long j, String str4) {
        AddVersion addVersion = (AddVersion) AdminMessageType.ADD_VERSION.getNewInstance();
        addVersion.clusterName = str;
        addVersion.storeName = str2;
        addVersion.pushJobId = str3;
        addVersion.versionNum = i;
        addVersion.numberOfPartitions = i2;
        addVersion.rewindTimeInSecondsOverride = -1L;
        addVersion.timestampMetadataVersionId = 1;
        if (str4 != null) {
            // Diamond instead of a raw ArrayList so the element type is checked by the compiler.
            addVersion.targetedRegions = new ArrayList<>(RegionUtils.parseRegionsFilterList(str4));
        }

        // Wrap the payload in the admin-operation envelope before serializing.
        AdminOperation adminOperation = new AdminOperation();
        adminOperation.operationType = AdminMessageType.ADD_VERSION.getValue();
        adminOperation.payloadUnion = addVersion;
        adminOperation.executionId = j;
        return this.adminOperationSerializer.serialize(adminOperation);
    }

    /**
     * Expects building the task to fail with a {@link VeniceException} carrying the
     * "no config found for the source Kafka bootstrap server url" message when {@code null} is
     * passed in the sixth argument position.
     *
     * <p>NOTE(review): the fifth argument presumably enables remote consumption and the sixth is
     * the remote bootstrap server URL; confirm against {@code getAdminConsumptionTask}.
     */
    @Test(expectedExceptions = VeniceException.class, expectedExceptionsMessageRegExp = "Admin topic remote consumption is enabled but no config found for the source Kafka bootstrap server url")
    public void testRemoteConsumptionEnabledButRemoteBootstrapUrlsAreMissing() {
        RandomPollStrategy pollStrategy = new RandomPollStrategy();
        AdminConsumptionStats stats = Mockito.mock(AdminConsumptionStats.class);
        getAdminConsumptionTask(pollStrategy, true, stats, 0L, true, null, 3);
    }

    /**
     * Stubs the admin mock to return a mocked {@link TopicManager} for the "remote.pubsub" address
     * and checks that a task built with that address exposes the same instance through
     * {@code getSourceKafkaClusterTopicManager()}.
     */
    @Test
    public void testRemoteConsumptionEnabledAndRemoteBootstrapUrlsAreGiven() {
        AdminConsumptionStats stats = Mockito.mock(AdminConsumptionStats.class);
        TopicManager expectedTopicManager = Mockito.mock(TopicManager.class);
        String remoteAddress = "remote.pubsub";
        ((VeniceHelixAdmin) Mockito.doReturn(expectedTopicManager).when(this.admin)).getTopicManager(remoteAddress);

        Assert.assertEquals(
                getAdminConsumptionTask(null, true, stats, 0L, true, remoteAddress, 3).getSourceKafkaClusterTopicManager(),
                expectedTopicManager);
    }

    /**
     * Writes store-creation and kill-offline-push messages for two stores, with
     * {@code createStore} for the first store stubbed to answer only after a 2000 ms delay.
     *
     * <p>The test first expects the second store to reach execution id 4 while the cluster-level
     * offset and execution id stay at -1 and the failing offset is 1. After the message at offset 1
     * is skipped, it expects the cluster-level offset and execution id to reach 4, the first store
     * to reach execution id 3, and the failing offset to reset to -1.
     *
     * <p>NOTE(review): the {@code 1000L} argument passed to {@code getAdminConsumptionTask} is
     * presumably a processing timeout shorter than the stubbed delay; confirm against the helper.
     */
    @Test(timeOut = 10000)
    public void testLongRunningBadTask() throws Exception {
        String slowStoreName = "test_store1";
        String healthyStoreName = "test_store2";
        String schema = "\"string\"";

        this.veniceWriter.put(emptyKeyBytes, getStoreCreationMessage(this.clusterName, slowStoreName, owner, schema, schema, 1L), AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);
        this.veniceWriter.put(emptyKeyBytes, getStoreCreationMessage(this.clusterName, healthyStoreName, owner, schema, schema, 2L), AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);
        this.veniceWriter.put(emptyKeyBytes, getKillOfflinePushJobMessage(this.clusterName, "test_store1_v1", 3L), AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);
        this.veniceWriter.put(emptyKeyBytes, getKillOfflinePushJobMessage(this.clusterName, "test_store2_v1", 4L), AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);

        Mockito.when(this.admin.hasStore(this.clusterName, slowStoreName)).thenReturn(false);
        Mockito.when(this.admin.hasStore(this.clusterName, healthyStoreName)).thenReturn(false);
        // Make creation of the first store slow: the stubbed call answers after 2000 ms.
        ((VeniceHelixAdmin) Mockito.doAnswer(AdditionalAnswers.answersWithDelay(2000L, invocation -> null))
                .when(this.admin)).createStore(this.clusterName, slowStoreName, owner, schema, schema, false);

        // Declared with the concrete task type: the accessors used below are not members of Runnable.
        AdminConsumptionTask adminConsumptionTask = getAdminConsumptionTask(
                new RandomPollStrategy(), false, Mockito.mock(AdminConsumptionStats.class), 1000L, false, null, 3);
        this.executor.submit(adminConsumptionTask);

        TestUtils.waitForNonDeterministicAssertion(10000L, TimeUnit.MILLISECONDS, () -> {
            Assert.assertEquals(((Long) this.executionIdAccessor.getLastSucceededExecutionIdMap(this.clusterName).getOrDefault(healthyStoreName, -1L)).longValue(), 4L);
        });
        Assert.assertEquals(getLastOffset(this.clusterName), -1L);
        Assert.assertEquals(getLastExecutionId(this.clusterName), -1L);
        Assert.assertEquals(adminConsumptionTask.getFailingOffset(), 1L);
        Assert.assertEquals(adminConsumptionTask.getLastSucceededExecutionId().longValue(), -1L);
        Assert.assertNull(adminConsumptionTask.getLastSucceededExecutionId(slowStoreName));
        Assert.assertEquals(adminConsumptionTask.getLastSucceededExecutionId(healthyStoreName).longValue(), 4L);

        // Skip the message at the failing offset and check the state asserted below.
        adminConsumptionTask.skipMessageWithOffset(1L);
        TestUtils.waitForNonDeterministicAssertion(10000L, TimeUnit.MILLISECONDS, () -> {
            Assert.assertEquals(getLastOffset(this.clusterName), 4L);
        });
        Assert.assertEquals(getLastExecutionId(this.clusterName), 4L);
        Assert.assertEquals(((Long) this.executionIdAccessor.getLastSucceededExecutionIdMap(this.clusterName).getOrDefault(slowStoreName, -1L)).longValue(), 3L);
        Assert.assertEquals(adminConsumptionTask.getFailingOffset(), -1L);

        adminConsumptionTask.close();
        this.executor.shutdown();
        this.executor.awaitTermination(10000L, TimeUnit.MILLISECONDS);

        ((VeniceHelixAdmin) Mockito.verify(this.admin, Mockito.timeout(10000L).atLeastOnce())).isLeaderControllerFor(this.clusterName);
        ((PubSubConsumerAdapter) Mockito.verify(this.mockKafkaConsumer, Mockito.timeout(10000L))).subscribe((PubSubTopicPartition) Mockito.any(), Mockito.anyLong());
        ((PubSubConsumerAdapter) Mockito.verify(this.mockKafkaConsumer, Mockito.timeout(10000L))).unSubscribe((PubSubTopicPartition) Mockito.any());
        ((VeniceHelixAdmin) Mockito.verify(this.admin, Mockito.atLeastOnce())).createStore(this.clusterName, slowStoreName, owner, schema, schema, false);
        ((VeniceHelixAdmin) Mockito.verify(this.admin, Mockito.times(1))).createStore(this.clusterName, healthyStoreName, owner, schema, schema, false);
    }

    /**
     * Stubs {@code createStore} for the user store to always throw, then writes the
     * store-creation message followed by an add-version message for the store's Da Vinci push
     * status system store. The test waits until {@code createStore} has been attempted exactly
     * twice and then verifies that {@code addVersionAndStartIngestion} was never invoked for the
     * system store.
     */
    @Test(timeOut = 10000)
    public void testSystemStoreMessageOrder() throws InterruptedException, IOException {
        String schema = "\"string\"";
        String pushJobId = "empty_push";
        ((VeniceHelixAdmin) Mockito.doThrow(new VeniceException("Prevent store creation")).when(this.admin))
                .createStore(this.clusterName, storeName, owner, schema, schema, false);

        // Declared with the concrete task type: close() is not a member of Runnable.
        AdminConsumptionTask adminConsumptionTask = getAdminConsumptionTask(new RandomPollStrategy(), false);
        this.executor.submit(adminConsumptionTask);

        this.veniceWriter.put(
                emptyKeyBytes,
                getStoreCreationMessage(this.clusterName, storeName, owner, schema, schema, 1L),
                AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);
        String systemStoreName = VeniceSystemStoreType.DAVINCI_PUSH_STATUS_STORE.getSystemStoreName(storeName);
        this.veniceWriter.put(
                emptyKeyBytes,
                getAddVersionMessage(this.clusterName, systemStoreName, pushJobId, 1, 1, 2L),
                AdminOperationSerializer.LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION);

        TestUtils.waitForNonDeterministicAssertion(10000L, TimeUnit.MILLISECONDS, () -> {
            ((VeniceHelixAdmin) Mockito.verify(this.admin, Mockito.times(2))).createStore(this.clusterName, storeName, owner, schema, schema, false);
        });
        ((VeniceHelixAdmin) Mockito.verify(this.admin, Mockito.never())).addVersionAndStartIngestion(
                this.clusterName, systemStoreName, pushJobId, 1, 1, Version.PushType.BATCH, (String) null, -1L, 1, false);

        adminConsumptionTask.close();
        this.executor.shutdown();
        this.executor.awaitTermination(10000L, TimeUnit.MILLISECONDS);
    }
}
