package com.linkedin.venice.kafka;

import com.google.common.collect.ImmutableMap;
import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import com.linkedin.venice.client.exceptions.VeniceClientException;
import com.linkedin.venice.client.store.AvroGenericStoreClient;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.client.store.ClientFactory;
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
import com.linkedin.venice.integration.utils.PubSubBrokerConfigs;
import com.linkedin.venice.integration.utils.PubSubBrokerWrapper;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceClusterCreateOptions;
import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
import com.linkedin.venice.integration.utils.ZkServerWrapper;
import com.linkedin.venice.utils.IntegrationTestPushUtils;
import com.linkedin.venice.utils.KeyAndValueSchemas;
import com.linkedin.venice.utils.TestWriteUtils;
import com.linkedin.venice.utils.Utils;
import io.tehuti.metrics.MetricsRepository;
import java.io.Closeable;
import java.io.File;
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.IndexedRecord;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/venice/kafka/EndToEndKafkaWithSASLTest.class */
public class EndToEndKafkaWithSASLTest {
    ZkServerWrapper zkServer;
    PubSubBrokerWrapper pubSubBrokerWrapper;
    private VeniceClusterWrapper veniceCluster;

    @BeforeClass(alwaysRun = true)
    public void setUp() throws VeniceClientException {
        Utils.thisIsLocalhost();
        this.zkServer = ServiceFactory.getZkServer();
        this.pubSubBrokerWrapper = ServiceFactory.getPubSubBroker(new PubSubBrokerConfigs.Builder().setZkWrapper(this.zkServer).setAdditionalBrokerConfiguration(ImmutableMap.of("listeners", "SASL_PLAINTEXT://$HOSTNAME:$PORT", "inter.broker.listener.name", "SASL_PLAINTEXT", "sasl.mechanism.inter.broker.protocol", "PLAIN", "sasl.enabled.mechanisms", "PLAIN", "listener.name.sasl_plaintext.plain.sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required\n    user_admin=\"admin-secret\" username=\"admin\" password=\"admin-secret\"\n;")).build());
        Properties properties = new Properties();
        properties.put("kafka.security.protocol", "SASL_PLAINTEXT");
        properties.put("kafka.sasl.mechanism", "PLAIN");
        properties.put("kafka.sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\";");
        this.veniceCluster = ServiceFactory.getVeniceCluster(new VeniceClusterCreateOptions.Builder().numberOfControllers(1).numberOfServers(1).numberOfRouters(1).replicationFactor(1).partitionSize(1).minActiveReplica(0).sslToStorageNodes(false).sslToKafka(false).zkServerWrapper(this.zkServer).kafkaBrokerWrapper(this.pubSubBrokerWrapper).extraProperties(properties).build());
    }

    @AfterClass(alwaysRun = true)
    public void cleanUp() {
        Utils.closeQuietlyWithErrorLogged(new Closeable[]{this.veniceCluster});
    }

    @Test
    public void testPushJob() throws Exception {
        File tempDataDirectory = TestWriteUtils.getTempDataDirectory();
        KeyAndValueSchemas keyAndValueSchemas = new KeyAndValueSchemas(TestWriteUtils.writeSchemaWithUnknownFieldIntoAvroFile(tempDataDirectory));
        String uniqueString = Utils.getUniqueString("store");
        Properties defaultVPJProps = IntegrationTestPushUtils.defaultVPJProps(this.veniceCluster, "file://" + tempDataDirectory.getAbsolutePath(), uniqueString);
        defaultVPJProps.put("kafka.security.protocol", "SASL_PLAINTEXT");
        defaultVPJProps.put("kafka.sasl.mechanism", "PLAIN");
        defaultVPJProps.put("kafka.sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required     username=\"admin\"     password=\"admin-secret\";");
        IntegrationTestPushUtils.createStoreForJob(this.veniceCluster.getClusterName(), keyAndValueSchemas.getKey().toString(), keyAndValueSchemas.getValue().toString(), defaultVPJProps, new UpdateStoreQueryParams()).close();
        TestWriteUtils.runPushJob("Test Batch push job", defaultVPJProps);
        this.veniceCluster.refreshAllRouterMetaData();
        AvroGenericStoreClient andStartGenericAvroClient = ClientFactory.getAndStartGenericAvroClient(ClientConfig.defaultGenericClientConfig(uniqueString).setVeniceURL(this.veniceCluster.getRandomRouterURL()).setMetricsRepository(new MetricsRepository()));
        try {
            GenericData.Record record = new GenericData.Record(AvroCompatibilityHelper.parse(new String[]{TestWriteUtils.loadFileAsString("SchemaWithoutSymbolDoc.avsc")}).getField("key").schema());
            Schema schema = record.getSchema().getField("source").schema();
            record.put("memberId", 1L);
            record.put("source", AvroCompatibilityHelper.newEnumSymbol(schema, TestWriteUtils.TestRecordType.OFFLINE.toString()));
            IndexedRecord indexedRecord = (IndexedRecord) andStartGenericAvroClient.get(record).get();
            Assert.assertEquals(indexedRecord.get(0).toString(), "LOGO");
            Assert.assertEquals(indexedRecord.get(1), 1);
            if (andStartGenericAvroClient != null) {
                andStartGenericAvroClient.close();
            }
        } catch (Throwable th) {
            if (andStartGenericAvroClient != null) {
                try {
                    andStartGenericAvroClient.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }
}
