package com.linkedin.venice.controller.server;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkedin.venice.acl.DynamicAccessController;
import com.linkedin.venice.controller.Admin;
import com.linkedin.venice.controllerapi.ControllerRoute;
import com.linkedin.venice.controllerapi.VersionCreationResponse;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.meta.BufferReplayPolicy;
import com.linkedin.venice.meta.DataReplicationPolicy;
import com.linkedin.venice.meta.HybridStoreConfigImpl;
import com.linkedin.venice.meta.OfflinePushStrategy;
import com.linkedin.venice.meta.PersistenceType;
import com.linkedin.venice.meta.ReadStrategy;
import com.linkedin.venice.meta.RoutingStrategy;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.meta.VersionImpl;
import com.linkedin.venice.meta.ZKStore;
import com.linkedin.venice.utils.DataProviderUtils;
import com.linkedin.venice.utils.ObjectMapperFactory;
import java.security.cert.X509Certificate;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import javax.security.auth.x500.X500Principal;
import javax.servlet.http.HttpServletRequest;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import spark.QueryParamsMap;
import spark.Request;
import spark.Response;
import spark.Route;

/* loaded from: input_file:com/linkedin/venice/controller/server/CreateVersionTest.class */
public class CreateVersionTest {
    private static final ObjectMapper OBJECT_MAPPER = ObjectMapperFactory.getInstance();
    private static final String CLUSTER_NAME = "test_cluster";
    private static final String STORE_NAME = "test_store";
    private static final String USER = "test_user";
    private static final String JOB_ID = "push_1";
    private Admin admin;
    private X509Certificate certificate;
    private Request request;
    private Response response;
    private DynamicAccessController accessClient;

    @BeforeMethod
    public void setUp() {
        this.admin = (Admin) Mockito.mock(Admin.class);
        this.request = (Request) Mockito.mock(Request.class);
        this.response = (Response) Mockito.mock(Response.class);
        this.accessClient = (DynamicAccessController) Mockito.mock(DynamicAccessController.class);
        this.certificate = (X509Certificate) Mockito.mock(X509Certificate.class);
        HttpServletRequest httpServletRequest = (HttpServletRequest) Mockito.mock(HttpServletRequest.class);
        X509Certificate[] x509CertificateArr = {this.certificate};
        X500Principal x500Principal = new X500Principal("CN=test_user");
        HashMap hashMap = new HashMap();
        hashMap.put("cluster_name", new String[]{CLUSTER_NAME});
        hashMap.put("store_name", new String[]{STORE_NAME});
        hashMap.put("store_size", new String[]{"0"});
        hashMap.put("push_type", new String[]{Version.PushType.INCREMENTAL.name()});
        hashMap.put("push_job_id", new String[]{JOB_ID});
        hashMap.put("hostname", new String[]{"localhost"});
        QueryParamsMap queryParamsMap = new QueryParamsMap(httpServletRequest);
        ((X509Certificate) Mockito.doReturn(x500Principal).when(this.certificate)).getSubjectX500Principal();
        ((Request) Mockito.doReturn(httpServletRequest).when(this.request)).raw();
        ((Request) Mockito.doReturn(queryParamsMap).when(this.request)).queryMap();
        ((Request) Mockito.doReturn(ControllerRoute.REQUEST_TOPIC.getPath()).when(this.request)).pathInfo();
        for (Map.Entry entry : hashMap.entrySet()) {
            ((Request) Mockito.doReturn(((String[]) entry.getValue())[0]).when(this.request)).queryParams((String) entry.getKey());
        }
        ((HttpServletRequest) Mockito.doReturn(hashMap).when(httpServletRequest)).getParameterMap();
        ((HttpServletRequest) Mockito.doReturn(x509CertificateArr).when(httpServletRequest)).getAttribute("javax.servlet.request.X509Certificate");
        ((Admin) Mockito.doReturn(true).when(this.admin)).isLeaderControllerFor(CLUSTER_NAME);
    }

    @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testCreateVersionWithACL(boolean z) throws Exception {
        Route requestTopicForPushing = new CreateVersion(true, Optional.of(this.accessClient), z, false, Optional.empty()).requestTopicForPushing(this.admin);
        ((DynamicAccessController) Mockito.doReturn(false).when(this.accessClient)).isAllowlistUsers(this.certificate, STORE_NAME, "GET");
        ((DynamicAccessController) Mockito.doReturn(false).when(this.accessClient)).hasAccessToTopic(this.certificate, STORE_NAME, "Write");
        requestTopicForPushing.handle(this.request, this.response);
        ((Response) Mockito.verify(this.response)).status(403);
        if (z) {
            this.response = (Response) Mockito.mock(Response.class);
            ((DynamicAccessController) Mockito.doReturn(true).when(this.accessClient)).hasAccessToTopic(this.certificate, STORE_NAME, "Write");
            ((DynamicAccessController) Mockito.doReturn(false).when(this.accessClient)).hasAccessToTopic(this.certificate, STORE_NAME, "Read");
            requestTopicForPushing.handle(this.request, this.response);
            ((Response) Mockito.verify(this.response)).status(403);
        }
    }

    @Test(description = "requestTopicForPushing should return an RT topic when store is hybrid and inc-push is enabled")
    public void testRequestTopicForIncPushReturnsRTTopicWhenStoreIsHybridAndIncPushIsEnabled() throws Exception {
        ((Admin) Mockito.doReturn(true).when(this.admin)).whetherEnableBatchPushFromAdmin(STORE_NAME);
        ((Request) Mockito.doCallRealMethod().when(this.request)).queryParamOrDefault((String) Mockito.any(), (String) Mockito.any());
        ((DynamicAccessController) Mockito.doReturn(true).when(this.accessClient)).isAllowlistUsers(this.certificate, STORE_NAME, "GET");
        Store hybridTestStore = getHybridTestStore();
        hybridTestStore.setIncrementalPushEnabled(true);
        ((Admin) Mockito.doReturn(hybridTestStore).when(this.admin)).getStore(CLUSTER_NAME, STORE_NAME);
        ((Admin) Mockito.doReturn(new VersionImpl(STORE_NAME, 1, JOB_ID)).when(this.admin)).incrementVersionIdempotent(CLUSTER_NAME, STORE_NAME, JOB_ID, 0, 0, Version.PushType.INCREMENTAL, false, false, (String) null, Optional.empty(), Optional.of(this.certificate), -1L, Optional.empty(), false, (String) null);
        Assert.assertTrue(hybridTestStore.isHybrid());
        Assert.assertTrue(hybridTestStore.isIncrementalPushEnabled());
        Object handle = new CreateVersion(true, Optional.of(this.accessClient), false, false, Optional.empty()).requestTopicForPushing(this.admin).handle(this.request, this.response);
        Assert.assertNotNull(handle);
        Assert.assertEquals(((VersionCreationResponse) OBJECT_MAPPER.readValue(handle.toString(), VersionCreationResponse.class)).getKafkaTopic(), "test_store_rt");
    }

    @Test(description = "requestTopicForPushing should an ERROR when store is not in hybrid but inc-push is enabled")
    public void testRequestTopicForIncPushReturnsErrorWhenStoreIsNotHybridAndIncPushIsEnabled() throws Exception {
        ((Admin) Mockito.doReturn(true).when(this.admin)).whetherEnableBatchPushFromAdmin(STORE_NAME);
        ((Request) Mockito.doCallRealMethod().when(this.request)).queryParamOrDefault((String) Mockito.any(), (String) Mockito.any());
        ((DynamicAccessController) Mockito.doReturn(true).when(this.accessClient)).isAllowlistUsers(this.certificate, STORE_NAME, "GET");
        ZKStore zKStore = new ZKStore(STORE_NAME, "abc@linkedin.com", 10L, PersistenceType.ROCKS_DB, RoutingStrategy.CONSISTENT_HASH, ReadStrategy.ANY_OF_ONLINE, OfflinePushStrategy.WAIT_ALL_REPLICAS, 1);
        zKStore.setIncrementalPushEnabled(true);
        ((Admin) Mockito.doReturn(zKStore).when(this.admin)).getStore(CLUSTER_NAME, STORE_NAME);
        ((Admin) Mockito.doReturn(new VersionImpl(STORE_NAME, 1, JOB_ID)).when(this.admin)).incrementVersionIdempotent(CLUSTER_NAME, STORE_NAME, JOB_ID, 0, 0, Version.PushType.INCREMENTAL, false, false, (String) null, Optional.empty(), Optional.of(this.certificate), -1L, Optional.empty(), false, (String) null);
        Assert.assertFalse(zKStore.isHybrid());
        Assert.assertTrue(zKStore.isIncrementalPushEnabled());
        Object handle = new CreateVersion(true, Optional.of(this.accessClient), false, false, Optional.empty()).requestTopicForPushing(this.admin).handle(this.request, this.response);
        Assert.assertNotNull(handle);
        ((Response) Mockito.verify(this.response)).status(400);
        VersionCreationResponse versionCreationResponse = (VersionCreationResponse) OBJECT_MAPPER.readValue(handle.toString(), VersionCreationResponse.class);
        Assert.assertTrue(versionCreationResponse.isError());
        Assert.assertTrue(versionCreationResponse.getError().contains("which does not have hybrid mode enabled"));
        Assert.assertNull(versionCreationResponse.getKafkaTopic());
    }

    @Test
    public void testRequestTopicForIncPushCanUseEmergencyRegionWhenItIsSet() throws Exception {
        Store hybridTestStore = getHybridTestStore();
        hybridTestStore.setIncrementalPushEnabled(true);
        hybridTestStore.setActiveActiveReplicationEnabled(true);
        VersionImpl versionImpl = new VersionImpl(STORE_NAME, 1, JOB_ID);
        Optional of = Optional.of("dc-1");
        ((Admin) Mockito.doReturn(true).when(this.admin)).whetherEnableBatchPushFromAdmin(STORE_NAME);
        ((Admin) Mockito.doReturn(true).when(this.admin)).isParent();
        ((Admin) Mockito.doReturn(true).when(this.admin)).isActiveActiveReplicationEnabledInAllRegion((String) Mockito.any(), (String) Mockito.any(), ArgumentMatchers.anyBoolean());
        ((Admin) Mockito.doReturn(hybridTestStore).when(this.admin)).getStore(CLUSTER_NAME, STORE_NAME);
        ((Admin) Mockito.doReturn("default-src.region.io").when(this.admin)).getKafkaBootstrapServers(ArgumentMatchers.anyBoolean());
        ((Admin) Mockito.doReturn(of).when(this.admin)).getEmergencySourceRegion();
        ((Request) Mockito.doCallRealMethod().when(this.request)).queryParamOrDefault((String) Mockito.any(), (String) Mockito.any());
        ((DynamicAccessController) Mockito.doReturn(true).when(this.accessClient)).isAllowlistUsers(this.certificate, STORE_NAME, "GET");
        ((Admin) Mockito.doReturn("dc-1.region.io").when(this.admin)).getNativeReplicationKafkaBootstrapServerAddress((String) of.get());
        ((Admin) Mockito.doReturn(versionImpl).when(this.admin)).incrementVersionIdempotent(CLUSTER_NAME, STORE_NAME, JOB_ID, 0, 0, Version.PushType.INCREMENTAL, false, false, (String) null, Optional.empty(), Optional.of(this.certificate), -1L, of, false, (String) null);
        Assert.assertTrue(hybridTestStore.isHybrid());
        Assert.assertTrue(hybridTestStore.isIncrementalPushEnabled());
        Assert.assertTrue(this.admin.isParent());
        Object handle = new CreateVersion(true, Optional.of(this.accessClient), false, false, Optional.empty()).requestTopicForPushing(this.admin).handle(this.request, this.response);
        Assert.assertNotNull(handle);
        Assert.assertEquals(((VersionCreationResponse) OBJECT_MAPPER.readValue(handle.toString(), VersionCreationResponse.class)).getKafkaBootstrapServers(), "dc-1.region.io");
    }

    private Store getHybridTestStore() {
        ZKStore zKStore = new ZKStore(STORE_NAME, "abc@linkedin.com", 10L, PersistenceType.ROCKS_DB, RoutingStrategy.CONSISTENT_HASH, ReadStrategy.ANY_OF_ONLINE, OfflinePushStrategy.WAIT_ALL_REPLICAS, 1);
        zKStore.setHybridStoreConfig(new HybridStoreConfigImpl(0L, 1L, -1L, DataReplicationPolicy.NON_AGGREGATE, BufferReplayPolicy.REWIND_FROM_EOP));
        return zKStore;
    }

    @Test
    public void testOverrideSourceRegionAddressForIncrementalPushJob() {
        VersionCreationResponse versionCreationResponse = new VersionCreationResponse();
        versionCreationResponse.setKafkaBootstrapServers("default.src.region.com");
        ((Admin) Mockito.doReturn(Optional.empty()).when(this.admin)).getAggregateRealTimeTopicSource(CLUSTER_NAME);
        CreateVersion.overrideSourceRegionAddressForIncrementalPushJob(this.admin, versionCreationResponse, CLUSTER_NAME, (String) null, (String) null, false, true);
        Assert.assertEquals(versionCreationResponse.getKafkaBootstrapServers(), "default.src.region.com");
        VersionCreationResponse versionCreationResponse2 = new VersionCreationResponse();
        versionCreationResponse2.setKafkaBootstrapServers("default.src.region.com");
        ((Admin) Mockito.doReturn(Optional.of("agg.rt.region.com")).when(this.admin)).getAggregateRealTimeTopicSource(CLUSTER_NAME);
        CreateVersion.overrideSourceRegionAddressForIncrementalPushJob(this.admin, versionCreationResponse2, CLUSTER_NAME, (String) null, (String) null, false, true);
        Assert.assertEquals(versionCreationResponse2.getKafkaBootstrapServers(), "agg.rt.region.com");
        VersionCreationResponse versionCreationResponse3 = new VersionCreationResponse();
        versionCreationResponse3.setKafkaBootstrapServers("default.src.region.com");
        CreateVersion.overrideSourceRegionAddressForIncrementalPushJob(this.admin, versionCreationResponse3, CLUSTER_NAME, (String) null, (String) null, false, false);
        Assert.assertEquals(versionCreationResponse3.getKafkaBootstrapServers(), "default.src.region.com");
        VersionCreationResponse versionCreationResponse4 = new VersionCreationResponse();
        versionCreationResponse4.setKafkaBootstrapServers("default.src.region.com");
        CreateVersion.overrideSourceRegionAddressForIncrementalPushJob(this.admin, versionCreationResponse4, CLUSTER_NAME, (String) null, (String) null, true, false);
        Assert.assertEquals(versionCreationResponse4.getKafkaBootstrapServers(), "default.src.region.com");
        VersionCreationResponse versionCreationResponse5 = new VersionCreationResponse();
        versionCreationResponse5.setKafkaBootstrapServers("default.src.region.com");
        CreateVersion.overrideSourceRegionAddressForIncrementalPushJob(this.admin, versionCreationResponse5, CLUSTER_NAME, (String) null, (String) null, true, true);
        Assert.assertEquals(versionCreationResponse5.getKafkaBootstrapServers(), "default.src.region.com");
        VersionCreationResponse versionCreationResponse6 = new VersionCreationResponse();
        versionCreationResponse6.setKafkaBootstrapServers("default.src.region.com");
        ((Admin) Mockito.doReturn("vpj.src.region.com").when(this.admin)).getNativeReplicationKafkaBootstrapServerAddress("dc-vpj");
        CreateVersion.overrideSourceRegionAddressForIncrementalPushJob(this.admin, versionCreationResponse6, CLUSTER_NAME, (String) null, "dc-vpj", true, true);
        Assert.assertEquals(versionCreationResponse6.getKafkaBootstrapServers(), "vpj.src.region.com");
        VersionCreationResponse versionCreationResponse7 = new VersionCreationResponse();
        versionCreationResponse7.setKafkaBootstrapServers("emergency.src.region.com");
        ((Admin) Mockito.doReturn("emergency.src.region.com").when(this.admin)).getNativeReplicationKafkaBootstrapServerAddress("dc-e");
        CreateVersion.overrideSourceRegionAddressForIncrementalPushJob(this.admin, versionCreationResponse7, CLUSTER_NAME, "dc-e", "dc-vpj", true, true);
        Assert.assertEquals(versionCreationResponse7.getKafkaBootstrapServers(), "emergency.src.region.com");
        VersionCreationResponse versionCreationResponse8 = new VersionCreationResponse();
        versionCreationResponse8.setKafkaBootstrapServers("emergency.src.region.com");
        ((Admin) Mockito.doReturn("emergency.src.region.com").when(this.admin)).getNativeReplicationKafkaBootstrapServerAddress("dc-e");
        CreateVersion.overrideSourceRegionAddressForIncrementalPushJob(this.admin, versionCreationResponse8, CLUSTER_NAME, "dc-e", (String) null, true, true);
        Assert.assertEquals(versionCreationResponse8.getKafkaBootstrapServers(), "emergency.src.region.com");
    }

    @Test(expectedExceptions = {VeniceException.class}, expectedExceptionsMessageRegExp = "Failed to get the broker server URL for the source region: dc1")
    public void testOverrideSourceRegionAddressForIncrementalPushJobWhenOverrideRegionAddressIsNotFound() {
        VersionCreationResponse versionCreationResponse = new VersionCreationResponse();
        versionCreationResponse.setKafkaBootstrapServers("default.src.region.com");
        ((Admin) Mockito.doReturn((Object) null).when(this.admin)).getNativeReplicationKafkaBootstrapServerAddress("dc1");
        CreateVersion.overrideSourceRegionAddressForIncrementalPushJob(this.admin, versionCreationResponse, CLUSTER_NAME, "dc1", (String) null, true, true);
    }
}
