package com.linkedin.davinci;

import com.linkedin.davinci.compression.StorageEngineBackedCompressorFactory;
import com.linkedin.davinci.config.VeniceConfigLoader;
import com.linkedin.davinci.ingestion.DaVinciIngestionBackend;
import com.linkedin.davinci.kafka.consumer.StoreIngestionService;
import com.linkedin.davinci.storage.StorageService;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.meta.OfflinePushStrategy;
import com.linkedin.venice.meta.PersistenceType;
import com.linkedin.venice.meta.ReadStrategy;
import com.linkedin.venice.meta.RoutingStrategy;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.SubscriptionBasedReadOnlyStoreRepository;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.meta.VersionImpl;
import com.linkedin.venice.meta.ZKStore;
import com.linkedin.venice.utils.ComplementSet;
import com.linkedin.venice.utils.PropertyBuilder;
import com.linkedin.venice.utils.ReferenceCounted;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import io.tehuti.Metric;
import io.tehuti.metrics.MetricsRepository;
import java.io.File;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.mockito.AdditionalAnswers;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
 * Unit tests for {@link StoreBackend} running against a mocked {@link DaVinciBackend}.
 *
 * <p>Each test starts from the fixture built in {@link #setUp()}: a store named {@code test-store}
 * holding two versions, with {@code version1} marked as the store's current version. Ingestion is
 * simulated by completing partitions directly on the {@link VersionBackend} instances that the code
 * under test registers in {@link #versionMap}.
 */
public class StoreBackendTest {
    Store store;
    Version version1;
    Version version2;
    File baseDataPath;
    DaVinciBackend backend;
    StoreBackend storeBackend;
    Map<String, VersionBackend> versionMap;
    MetricsRepository metricsRepository;
    StorageService storageService;
    DaVinciIngestionBackend ingestionBackend;
    StorageEngineBackedCompressorFactory compressorFactory;

    /**
     * Builds a fresh fixture before every test: a temp data directory, a mocked backend whose
     * collaborators are mocks (or real, in-memory objects), and a store with two versions.
     */
    @BeforeMethod
    void setUp() {
        baseDataPath = Utils.getTempDataDirectory();
        VeniceProperties backendConfig = new PropertyBuilder()
                .put("cluster.name", "test-cluster")
                .put("zookeeper.address", "test-zookeeper")
                .put("kafka.bootstrap.servers", "test-kafka")
                .put("data.base.path", baseDataPath.getAbsolutePath())
                .build();

        // The mocked executor runs every task inline inside execute(), so work handed to it by the
        // code under test has finished by the time execute() returns.
        ScheduledExecutorService executor = Mockito.mock(ScheduledExecutorService.class);
        Mockito.doAnswer(AdditionalAnswers.answerVoid(Runnable::run)).when(executor).execute(Mockito.any());

        versionMap = new HashMap<>();
        metricsRepository = new MetricsRepository();
        storageService = Mockito.mock(StorageService.class);
        ingestionBackend = Mockito.mock(DaVinciIngestionBackend.class);
        compressorFactory = Mockito.mock(StorageEngineBackedCompressorFactory.class);
        Mockito.when(ingestionBackend.getStorageService()).thenReturn(storageService);

        backend = Mockito.mock(DaVinciBackend.class);
        Mockito.when(backend.getExecutor()).thenReturn(executor);
        Mockito.when(backend.getConfigLoader()).thenReturn(new VeniceConfigLoader(backendConfig));
        Mockito.when(backend.getMetricsRepository()).thenReturn(metricsRepository);
        Mockito.when(backend.getStoreRepository())
                .thenReturn(Mockito.mock(SubscriptionBasedReadOnlyStoreRepository.class));
        Mockito.when(backend.getStorageService()).thenReturn(storageService);
        Mockito.when(backend.getIngestionService()).thenReturn(Mockito.mock(StoreIngestionService.class));
        Mockito.when(backend.getVersionByTopicMap()).thenReturn(versionMap);
        // These three methods keep their real implementation; everything else on the backend is stubbed.
        Mockito.when(backend.getVeniceLatestNonFaultyVersion(Mockito.anyString(), Mockito.anySet()))
                .thenCallRealMethod();
        Mockito.when(backend.getVeniceCurrentVersion(Mockito.anyString())).thenCallRealMethod();
        Mockito.when(backend.getIngestionBackend()).thenReturn(ingestionBackend);
        Mockito.when(backend.getCompressorFactory()).thenReturn(compressorFactory);
        Mockito.doCallRealMethod().when(backend).handleStoreChanged(Mockito.any());

        store = new ZKStore(
                "test-store",
                null,
                0L,
                PersistenceType.ROCKS_DB,
                RoutingStrategy.CONSISTENT_HASH,
                ReadStrategy.ANY_OF_ONLINE,
                OfflinePushStrategy.WAIT_ALL_REPLICAS,
                1);
        version1 = addNextVersion(5);
        version2 = addNextVersion(3);
        store.setCurrentVersion(version1.getNumber());
        Mockito.when(backend.getStoreRepository().getStoreOrThrow(store.getName())).thenReturn(store);

        storeBackend = new StoreBackend(backend, store.getName());
        Mockito.when(backend.getStoreOrThrow(store.getName())).thenReturn(storeBackend);
    }

    /**
     * Creates a version numbered {@code store.peekNextVersion()} and adds it to the store.
     *
     * @param partitionCount last {@link VersionImpl} constructor argument
     *        (NOTE(review): presumably the partition count - confirm against VersionImpl)
     * @return the version that was added
     */
    private Version addNextVersion(int partitionCount) {
        Version version =
                new VersionImpl(store.getName(), store.peekNextVersion().getNumber(), null, partitionCount);
        store.addVersion(version);
        return version;
    }

    /** Looks up the {@link VersionBackend} registered in {@link #versionMap} under the version's topic name. */
    private VersionBackend backendFor(Version version) {
        return versionMap.get(version.kafkaTopicName());
    }

    /**
     * Reads a metric registered for the test store.
     *
     * @param metricName metric suffix, e.g. {@code "current_version.Gauge"}
     * @return the metric's current value
     */
    private double getMetric(String metricName) {
        Metric metric = metricsRepository.getMetric("." + store.getName() + "--" + metricName);
        Assert.assertNotNull(metric, "Expected metric " + metricName + " not found.");
        return metric.value();
    }

    /** Asserts that a metric lies strictly within {@code tolerance} of {@code expected}. */
    private void assertMetricWithin(String metricName, double expected, double tolerance) {
        double actual = getMetric(metricName);
        Assert.assertTrue(
                Math.abs(actual - expected) < tolerance,
                "Metric " + metricName + " was " + actual + ", expected within " + tolerance + " of " + expected);
    }

    /**
     * Asserts that the version currently exposed by the store backend is {@code expected}.
     *
     * <p>The reference is held in try-with-resources so it is closed exactly once, whether or not
     * the assertion passes.
     */
    private void assertDaVinciCurrentVersion(Version expected) {
        try (ReferenceCounted<VersionBackend> versionRef = storeBackend.getDaVinciCurrentVersion()) {
            Assert.assertEquals(versionRef.get().getVersion().getNumber(), expected.getNumber());
        }
    }

    /** Subscribing to a store with no versions must fail with {@link VeniceException}. */
    @Test
    void testSubscribeEmptyStore() {
        store.setVersions(Collections.emptyList());
        store.setCurrentVersion(0);
        Assert.assertThrows(VeniceException.class, () -> storeBackend.subscribe(ComplementSet.universalSet()));
    }

    /**
     * Subscribing selects the store's current version, the other version only takes over once the
     * store's current version is changed, and version/data-age/subscribe-duration metrics follow.
     */
    @Test
    void testSubscribeCurrentVersion() throws Exception {
        int partition = 0;
        // Simulated ingestion times; the duration metrics below are checked against them.
        long v1SubscribeDurationMs = 100L;
        long v2SubscribeDurationMs = 200L;
        version1.setAge(Duration.ofHours(1L));
        version2.setAge(Duration.ofMinutes(5L));

        // No data age is reported before anything has been subscribed.
        Assert.assertEquals(getMetric("data_age_ms.Gauge"), Double.NaN);
        CompletableFuture<?> subscribeResult = storeBackend.subscribe(ComplementSet.of(partition));
        TimeUnit.MILLISECONDS.sleep(v1SubscribeDurationMs);
        backendFor(version1).completePartition(partition);
        subscribeResult.get(0L, TimeUnit.SECONDS);

        assertDaVinciCurrentVersion(version1);
        Assert.assertEquals(getMetric("current_version.Gauge"), (double) version1.getNumber());
        Assert.assertEquals(getMetric("future_version.Gauge"), (double) version2.getNumber());
        assertMetricWithin("data_age_ms.Gauge", version1.getAge().toMillis(), 1000.0d);
        assertMetricWithin("subscribe_duration_ms.Avg", v1SubscribeDurationMs, 50.0d);

        // Finishing version2 must not swap it in while the store still points at version1.
        TimeUnit.MILLISECONDS.sleep(v2SubscribeDurationMs - v1SubscribeDurationMs);
        backendFor(version2).completePartition(partition);
        assertDaVinciCurrentVersion(version1);

        store.setCurrentVersion(version2.getNumber());
        backend.handleStoreChanged(storeBackend);
        assertDaVinciCurrentVersion(version2);
        Assert.assertEquals(getMetric("current_version.Gauge"), (double) version2.getNumber());
        assertMetricWithin("data_age_ms.Gauge", version2.getAge().toMillis(), 1000.0d);
        assertMetricWithin(
                "subscribe_duration_ms.Avg", (v1SubscribeDurationMs + v2SubscribeDurationMs) / 2.0d, 50.0d);
        assertMetricWithin("subscribe_duration_ms.Max", v2SubscribeDurationMs, 50.0d);
    }

    /**
     * While the current version's partition has not completed, the subscribe future stays pending
     * and version1 remains exposed until the store's current version is switched to version2.
     */
    @Test
    void testSubscribeSlowCurrentVersion() throws Exception {
        Assert.assertFalse(storeBackend.subscribe(ComplementSet.of(0)).isDone());
        assertDaVinciCurrentVersion(version1);

        backendFor(version2).completePartition(0);
        assertDaVinciCurrentVersion(version1);

        store.setCurrentVersion(version2.getNumber());
        backend.handleStoreChanged(storeBackend);
        assertDaVinciCurrentVersion(version2);
    }

    /** With the store's current version set to 0, subscribing ends up serving version2. */
    @Test
    void testSubscribeWithoutCurrentVersion() throws Exception {
        store.setCurrentVersion(0);
        backend.handleStoreChanged(storeBackend);

        CompletableFuture<?> subscribeResult = storeBackend.subscribe(ComplementSet.of(1));
        backendFor(version2).completePartition(1);
        subscribeResult.get(0L, TimeUnit.SECONDS);
        assertDaVinciCurrentVersion(version2);
    }

    /**
     * Subscribing with an explicit bootstrap version serves that version first, then moves to the
     * store's current version once the latter's partition completes.
     */
    @Test
    void testSubscribeBootstrapVersion() throws Exception {
        addNextVersion(15);
        store.setCurrentVersion(version2.getNumber());
        backend.handleStoreChanged(storeBackend);

        CompletableFuture<?> subscribeResult =
                storeBackend.subscribe(ComplementSet.of(2), Optional.of(version1));
        backendFor(version1).completePartition(2);
        subscribeResult.get(0L, TimeUnit.SECONDS);
        assertDaVinciCurrentVersion(version1);

        backendFor(version2).completePartition(2);
        assertDaVinciCurrentVersion(version2);
    }

    /**
     * Covers deleted and failed non-current versions: their backends are dropped from the version
     * map and their storage engines removed, the served version is unaffected, and a failed version
     * is not re-subscribed.
     */
    @Test
    void testFutureVersionFailure() throws Exception {
        int partition = 1;
        CompletableFuture<?> subscribeResult = storeBackend.subscribe(ComplementSet.of(partition));
        backendFor(version1).completePartition(partition);
        subscribeResult.get(0L, TimeUnit.SECONDS);
        Assert.assertTrue(versionMap.containsKey(version2.kafkaTopicName()));

        // Deleting version2 from the store removes its backend and its storage engine.
        store.deleteVersion(version2.getNumber());
        backend.handleStoreChanged(storeBackend);
        Assert.assertFalse(versionMap.containsKey(version2.kafkaTopicName()));
        Mockito.verify(ingestionBackend, Mockito.times(1))
                .removeStorageEngine(Mockito.eq(version2.kafkaTopicName()));

        Version version3 = addNextVersion(15);
        backend.handleStoreChanged(storeBackend);
        Version version4 = addNextVersion(20);
        backend.handleStoreChanged(storeBackend);

        // Neither a failed version3 nor a completed version4 displaces version1 by itself.
        backendFor(version3).completePartitionExceptionally(partition, new Exception());
        assertDaVinciCurrentVersion(version1);
        backendFor(version4).completePartition(partition);
        assertDaVinciCurrentVersion(version1);

        store.setCurrentVersion(version4.getNumber());
        backend.handleStoreChanged(storeBackend);
        assertDaVinciCurrentVersion(version4);

        // A newly added version that fails is cleaned up and stays out after a retry.
        Version version5 = addNextVersion(30);
        backend.handleStoreChanged(storeBackend);
        backendFor(version5).completePartitionExceptionally(partition, new Exception());
        Assert.assertFalse(versionMap.containsKey(version5.kafkaTopicName()));
        Mockito.verify(ingestionBackend, Mockito.times(1))
                .removeStorageEngine(Mockito.eq(version5.kafkaTopicName()));
        storeBackend.trySubscribeDaVinciFutureVersion();
        Assert.assertFalse(versionMap.containsKey(version5.kafkaTopicName()));
    }

    /**
     * Unsubscribing a pending partition lets the subscribe future complete, and unsubscribing
     * everything clears the version map, the data directory, storage engines and compressors.
     */
    @Test
    void testSubscribeUnsubscribe() throws Exception {
        CompletableFuture<?> subscribeResult = storeBackend.subscribe(ComplementSet.of(0, 1));
        backendFor(version1).completePartition(0);
        // Partition 1 is still outstanding, so the combined subscription is not finished yet.
        Assert.assertFalse(subscribeResult.isDone());
        storeBackend.unsubscribe(ComplementSet.of(1));
        subscribeResult.get(0L, TimeUnit.SECONDS);

        try (ReferenceCounted<VersionBackend> versionRef = storeBackend.getDaVinciCurrentVersion()) {
            Assert.assertTrue(versionRef.get().isPartitionReadyToServe(0));
            Assert.assertFalse(versionRef.get().isPartitionReadyToServe(1));
        }

        // Subscribe to everything and immediately unsubscribe from everything.
        CompletableFuture<?> resubscribeResult = storeBackend.subscribe(ComplementSet.universalSet());
        storeBackend.unsubscribe(ComplementSet.universalSet());
        resubscribeResult.get(0L, TimeUnit.SECONDS);
        Assert.assertTrue(versionMap.isEmpty());
        Assert.assertEquals(FileUtils.sizeOfDirectory(baseDataPath), 0L);

        int versionCount = store.getVersions().size();
        Mockito.verify(ingestionBackend, Mockito.times(versionCount)).removeStorageEngine(Mockito.any());
        Mockito.verify(compressorFactory, Mockito.times(versionCount))
                .removeVersionSpecificCompressor(Mockito.any());
    }

    /** Closing the store backend fails a pending subscription without removing any storage engine. */
    @Test
    void testSubscribeClose() {
        CompletableFuture<?> subscribeResult = storeBackend.subscribe(ComplementSet.universalSet());
        storeBackend.close();
        Assert.assertThrows(CompletionException.class, () -> subscribeResult.getNow(null));
        Mockito.verify(backend.getStorageService(), Mockito.never()).removeStorageEngine(Mockito.any());
    }

    /** Deleting the store backend fails a pending subscription and removes every version's storage engine. */
    @Test
    void testSubscribeDelete() {
        CompletableFuture<?> subscribeResult = storeBackend.subscribe(ComplementSet.universalSet());
        storeBackend.delete();
        Assert.assertThrows(CompletionException.class, () -> subscribeResult.getNow(null));
        Assert.assertTrue(versionMap.isEmpty());
        Assert.assertEquals(FileUtils.sizeOfDirectory(baseDataPath), 0L);
        Mockito.verify(ingestionBackend, Mockito.times(store.getVersions().size()))
                .removeStorageEngine(Mockito.any());
    }

    /**
     * Moves the store's current version forward to a third version, back to version1 and forward
     * again, checking that the store backend serves the matching version after each change.
     */
    @Test
    void testRollbackAndRollForward() {
        int partition = 1;
        storeBackend.subscribe(ComplementSet.of(partition));
        backendFor(version1).completePartition(partition);

        store.setCurrentVersion(version2.getNumber());
        backend.handleStoreChanged(storeBackend);
        backendFor(version2).completePartition(partition);

        Version version3 = addNextVersion(3);
        backend.handleStoreChanged(storeBackend);
        backendFor(version3).completePartition(partition);
        store.setCurrentVersion(version3.getNumber());
        backend.handleStoreChanged(storeBackend);
        assertDaVinciCurrentVersion(version3);

        // Roll back to version1.
        store.setCurrentVersion(version1.getNumber());
        backend.handleStoreChanged(storeBackend);
        backendFor(version1).completePartition(partition);
        assertDaVinciCurrentVersion(version1);

        // Roll forward to version3 again.
        backendFor(version2).completePartition(partition);
        store.setCurrentVersion(version3.getNumber());
        backend.handleStoreChanged(storeBackend);
        backendFor(version3).completePartition(partition);
        assertDaVinciCurrentVersion(version3);
    }
}
