package com.linkedin.davinci.consumer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.linkedin.d2.balancer.D2Client;
import com.linkedin.data.ByteString;
import com.linkedin.davinci.consumer.VeniceChangelogConsumerClientFactory;
import com.linkedin.r2.message.rest.RestRequest;
import com.linkedin.r2.message.rest.RestResponse;
import com.linkedin.venice.client.store.schemas.TestKeyRecord;
import com.linkedin.venice.controllerapi.D2ControllerClient;
import com.linkedin.venice.controllerapi.D2ServiceDiscoveryResponse;
import com.linkedin.venice.controllerapi.StoreResponse;
import com.linkedin.venice.meta.StoreInfo;
import com.linkedin.venice.meta.ViewConfigImpl;
import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter;
import com.linkedin.venice.schema.SchemaReader;
import com.linkedin.venice.serialization.KafkaKeySerializer;
import com.linkedin.venice.serialization.avro.KafkaValueSerializer;
import com.linkedin.venice.utils.ObjectMapperFactory;
import com.linkedin.venice.views.ChangeCaptureView;
import io.tehuti.metrics.MetricsRepository;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/davinci/consumer/VeniceChangelogConsumerClientFactoryTest.class */
/**
 * Tests for {@code VeniceChangelogConsumerClientFactory.getChangelogConsumer(String)}.
 *
 * <p>All collaborators (schema reader, pub-sub consumer, controller client, D2 client) are Mockito
 * mocks; the tests only check which consumer implementation the factory hands back, or that it
 * throws, for a given client configuration and store metadata.
 */
public class VeniceChangelogConsumerClientFactoryTest {
    private static final String STORE_NAME = "dybbuk_store";
    private static final String VIEW_NAME = "mazzikim_view";
    private static final String TEST_CLUSTER = "golem_cluster";

    /**
     * Covers three scenarios against a store whose only view config is a {@link ChangeCaptureView}
     * registered under {@code VIEW_NAME}:
     * <ol>
     *   <li>no view name set on the client config: a {@code VeniceAfterImageConsumerImpl} is returned;</li>
     *   <li>view name set to {@code VIEW_NAME}: a {@code VeniceChangelogConsumerImpl} is returned;</li>
     *   <li>a second factory built from a config that carries a mocked {@link D2Client} (answering with
     *       a serialized {@link D2ServiceDiscoveryResponse}) and a custom view-class getter: a
     *       {@code VeniceChangelogConsumerImpl} is returned.</li>
     * </ol>
     */
    @Test
    public void testGetChangelogConsumer() throws ExecutionException, InterruptedException, JsonProcessingException {
        PubSubConsumerAdapter consumerAdapter = Mockito.mock(PubSubConsumerAdapter.class);
        ChangelogClientConfig clientConfig = newClientConfig();
        D2ControllerClient controllerClient = Mockito.mock(D2ControllerClient.class);

        VeniceChangelogConsumerClientFactory factory =
                new VeniceChangelogConsumerClientFactory(clientConfig, new MetricsRepository());
        factory.setD2ControllerClient(controllerClient);
        factory.setConsumer(consumerAdapter);

        // Store metadata exposing a single change-capture view under VIEW_NAME.
        StoreInfo storeInfo = newStoreInfo();
        ViewConfigImpl changeCaptureViewConfig =
                new ViewConfigImpl(ChangeCaptureView.class.getCanonicalName(), new HashMap<>());
        storeInfo.setViewConfigs(singleEntryMap(VIEW_NAME, changeCaptureViewConfig));
        stubStoreLookup(controllerClient, storeInfo);

        // Scenario 1: no view name configured yet.
        Assert.assertTrue(factory.getChangelogConsumer(STORE_NAME) instanceof VeniceAfterImageConsumerImpl);

        // Scenario 2: same factory, but the shared config now names the change-capture view.
        clientConfig.setViewName(VIEW_NAME);
        Assert.assertTrue(factory.getChangelogConsumer(STORE_NAME) instanceof VeniceChangelogConsumerImpl);

        // Scenario 3: the config carries a D2 client whose REST call yields a discovery response.
        D2ServiceDiscoveryResponse discoveryResponse = new D2ServiceDiscoveryResponse();
        discoveryResponse.setCluster(TEST_CLUSTER);
        discoveryResponse.setD2Service("TEST_ROUTER_D2_SERVICE");
        discoveryResponse.setServerD2Service("TEST_SERVER_D2_SERVICE");
        discoveryResponse.setName(STORE_NAME);
        String discoveryJson = ObjectMapperFactory.getInstance().writeValueAsString(discoveryResponse);

        RestResponse restResponse = Mockito.mock(RestResponse.class);
        Mockito.doReturn(ByteString.unsafeWrap(discoveryJson.getBytes(StandardCharsets.UTF_8)))
                .when(restResponse)
                .getEntity();

        // Mocking a generic interface through its Class token is inherently unchecked.
        @SuppressWarnings("unchecked")
        Future<RestResponse> restFuture = Mockito.mock(Future.class);
        Mockito.when(restFuture.get()).thenReturn(restResponse);

        D2Client d2Client = Mockito.mock(D2Client.class);
        Mockito.when(d2Client.restRequest(Mockito.<RestRequest>any())).thenReturn(restFuture);

        clientConfig.setD2Client(d2Client)
                .setD2ControllerClient((D2ControllerClient) null)
                .setControllerRequestRetryCount(1);

        // The getter ignores all of its arguments and always reports the change-capture view class.
        VeniceChangelogConsumerClientFactory.ViewClassGetter fixedViewClassGetter =
                (unusedA, unusedB, unusedClient, unusedInt) -> ChangeCaptureView.class.getCanonicalName();

        VeniceChangelogConsumerClientFactory d2BackedFactory =
                new VeniceChangelogConsumerClientFactory(clientConfig, new MetricsRepository());
        d2BackedFactory.setViewClassGetter(fixedViewClassGetter);
        d2BackedFactory.setD2ControllerClient(controllerClient);
        d2BackedFactory.setConsumer(consumerAdapter);
        Assert.assertTrue(d2BackedFactory.getChangelogConsumer(STORE_NAME) instanceof VeniceChangelogConsumerImpl);
    }

    /**
     * The client config asks for {@code VIEW_NAME} while the store reports an empty view-config map;
     * building the consumer is expected to throw.
     */
    @Test
    public void testGetChangelogConsumerThrowsException() {
        PubSubConsumerAdapter consumerAdapter = Mockito.mock(PubSubConsumerAdapter.class);
        ChangelogClientConfig clientConfig = newClientConfig();
        D2ControllerClient controllerClient = Mockito.mock(D2ControllerClient.class);

        VeniceChangelogConsumerClientFactory factory =
                new VeniceChangelogConsumerClientFactory(clientConfig, new MetricsRepository());
        // Hand the stubbed controller client to the factory, as testGetChangelogConsumer does.
        // Previously the mock was created and stubbed but never wired in, so the store lookup
        // stubbed below could not be what the factory consulted.
        // NOTE(review): this assumes the factory throws when the requested view is absent from
        // the store's view configs - confirm against the factory implementation.
        factory.setD2ControllerClient(controllerClient);
        factory.setConsumer(consumerAdapter);

        // Store metadata with no views at all.
        StoreInfo storeInfo = newStoreInfo();
        storeInfo.setViewConfigs(new HashMap<>());
        stubStoreLookup(controllerClient, storeInfo);

        clientConfig.setViewName(VIEW_NAME);
        Assert.assertThrows(() -> factory.getChangelogConsumer(STORE_NAME));
    }

    /** Builds the consumer properties shared by every test in this class. */
    private static Properties newConsumerProperties() {
        Properties consumerProperties = new Properties();
        consumerProperties.put("bootstrap.servers", "http://www.fooAddress.linkedin.com:16337");
        consumerProperties.put("key.deserializer", KafkaKeySerializer.class);
        consumerProperties.put("value.deserializer", KafkaValueSerializer.class);
        consumerProperties.put("receive.buffer.bytes", 1048576);
        return consumerProperties;
    }

    /** Creates a client config backed by the shared properties and a schema reader mock that serves the test key schema. */
    private static ChangelogClientConfig newClientConfig() {
        SchemaReader schemaReader = Mockito.mock(SchemaReader.class);
        Mockito.when(schemaReader.getKeySchema()).thenReturn(TestKeyRecord.SCHEMA$);
        return new ChangelogClientConfig().setConsumerProperties(newConsumerProperties()).setSchemaReader(schemaReader);
    }

    /** Creates store metadata with one partition and current version 1; view configs are left to the caller. */
    private static StoreInfo newStoreInfo() {
        StoreInfo storeInfo = new StoreInfo();
        storeInfo.setPartitionCount(1);
        storeInfo.setCurrentVersion(1);
        return storeInfo;
    }

    /** Stubs {@code controllerClient.getStore(STORE_NAME)} to return a non-error response wrapping {@code storeInfo}. */
    private static void stubStoreLookup(D2ControllerClient controllerClient, StoreInfo storeInfo) {
        StoreResponse storeResponse = Mockito.mock(StoreResponse.class);
        Mockito.when(storeResponse.isError()).thenReturn(false);
        Mockito.when(storeResponse.getStore()).thenReturn(storeInfo);
        Mockito.when(controllerClient.getStore(STORE_NAME)).thenReturn(storeResponse);
    }

    /**
     * Returns a new mutable {@link HashMap} holding exactly one entry. Being generic, it lets the
     * call site's target type pick the map's type arguments instead of resorting to a raw map.
     */
    private static <K, V> HashMap<K, V> singleEntryMap(K key, V value) {
        HashMap<K, V> map = new HashMap<>();
        map.put(key, value);
        return map;
    }
}
