package com.linkedin.venice.kafka.ssl;

import com.linkedin.venice.client.store.AvroGenericStoreClient;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.client.store.ClientFactory;
import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.ControllerResponse;
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
import com.linkedin.venice.controllerapi.VersionCreationResponse;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceClusterCreateOptions;
import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
import com.linkedin.venice.utils.DataProviderUtils;
import com.linkedin.venice.utils.IntegrationTestPushUtils;
import com.linkedin.venice.utils.KafkaSSLUtils;
import com.linkedin.venice.utils.SslUtils;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.TestWriteUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.writer.VeniceWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/venice/kafka/ssl/TestProduceWithSSL.class */
/**
 * Integration tests that produce data into a Venice cluster whose Kafka connection uses SSL,
 * both through a {@link VeniceWriter} directly and through the push job.
 */
public class TestProduceWithSSL {
    /** Names under which the SSL material is registered as Hadoop credentials for the push job. */
    private static final String SSL_IDENTITY_ALIAS = "ssl.identity";
    private static final String SSL_TRUSTSTORE_ALIAS = "ssl.truststore";
    private static final String SSL_KEYSTORE_PASSWORD_ALIAS = "ssl.identity.keystore.password";
    private static final String SSL_KEY_PASSWORD_ALIAS = "ssl.identity.key.password";

    /** Push job property that switches the input between files ("false") and Kafka ("true"). */
    private static final String SOURCE_KAFKA_PROPERTY = "source.kafka";

    /** System property that is pointed at the credentials file written by the push job test. */
    private static final String HADOOP_TOKEN_FILE_PROPERTY = "HADOOP_TOKEN_FILE_LOCATION";

    /** Upper bound, in seconds, for the store's current version to reach the expected value. */
    private static final long VERSION_SWAP_TIMEOUT_SECONDS = 30L;

    /** Shared cluster (SSL to Kafka, OpenSSL disabled) created once for the whole class. */
    private VeniceClusterWrapper cluster;

    /** Creates the shared cluster used by the tests in this class. */
    @BeforeClass
    public void setUp() {
        Utils.thisIsLocalhost();
        this.cluster = ServiceFactory.getVeniceCluster(
                new VeniceClusterCreateOptions.Builder().sslToKafka(true).isKafkaOpenSSLEnabled(false).build());
    }

    /** Shuts down the shared cluster. */
    @AfterClass
    public void cleanUp() {
        this.cluster.close();
    }

    /**
     * Writes a single record through the cluster's SSL {@link VeniceWriter}, waits for the new
     * version to become current and reads the record back through a router using SSL.
     *
     * <p>The writer, the controller client and the store client are all released even when an
     * assertion or a wait fails.
     */
    @Test(timeOut = 60000)
    public void testVeniceWriterSupportSSL() throws ExecutionException, InterruptedException {
        String storeName = Utils.getUniqueString("testVeniceWriterSupportSSL");
        this.cluster.getNewStore(storeName);
        VersionCreationResponse newVersion = this.cluster.getNewVersion(storeName);
        Assert.assertFalse(newVersion.isError());
        int version = newVersion.getVersion();

        try (VeniceWriter<String, String, byte[]> writer = this.cluster.getSslVeniceWriter(newVersion.getKafkaTopic());
                ControllerClient controllerClient =
                        new ControllerClient(this.cluster.getClusterName(), this.cluster.getAllControllersURLs())) {
            writer.broadcastStartOfPush(new HashMap<>());
            writer.put("key", "value", 1);
            writer.broadcastEndOfPush(new HashMap<>());

            waitForCurrentVersion(controllerClient, storeName, version);

            try (AvroGenericStoreClient<String, CharSequence> storeClient = ClientFactory.getAndStartGenericAvroClient(
                    ClientConfig.defaultGenericClientConfig(storeName)
                            .setVeniceURL(this.cluster.getRandomRouterURL())
                            .setSslFactory(SslUtils.getVeniceLocalSslFactory()))) {
                Assert.assertEquals(storeClient.get("key").get().toString(), "value");
            }
        }
    }

    /**
     * Reads the whole file at the given path into memory.
     *
     * @param str path of the file to read
     * @return the file content
     * @throws IOException if the file cannot be opened or read
     */
    private byte[] readFile(String str) throws IOException {
        File file = new File(str);
        byte[] content = new byte[(int) file.length()];
        try (FileInputStream in = new FileInputStream(file)) {
            // A single read() may legally return fewer bytes than requested, so keep reading
            // until the buffer is full or the stream ends.
            int offset = 0;
            while (offset < content.length) {
                int read = in.read(content, offset, content.length - offset);
                if (read < 0) {
                    break;
                }
                offset += read;
            }
            Assert.assertEquals(offset, content.length, "Could not read the whole file: " + str);
        }
        return content;
    }

    /**
     * Runs a sequence of push jobs over SSL: a file push, a Kafka re-push, then the same two
     * again after switching the store to {@link CompressionStrategy#ZSTD_WITH_DICT}. After each
     * job the store's current version is expected to advance by one.
     *
     * @param z when {@code true} the test runs against a dedicated cluster created with Kafka
     *          OpenSSL enabled (and closes it afterwards); when {@code false} it uses the shared
     *          cluster of this class
     */
    @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class, timeOut = 90000)
    public void testVenicePushJobSupportSSL(boolean z) throws Exception {
        // Created before the try block: if creation itself fails there is nothing to close, and
        // in particular the shared cluster must not be touched.
        VeniceClusterWrapper dedicatedCluster = z
                ? ServiceFactory.getVeniceCluster(
                        new VeniceClusterCreateOptions.Builder().sslToKafka(true).isKafkaOpenSSLEnabled(true).build())
                : null;
        VeniceClusterWrapper targetCluster = dedicatedCluster != null ? dedicatedCluster : this.cluster;
        try {
            File inputDir = TestWriteUtils.getTempDataDirectory();
            String storeName = Utils.getUniqueString("store");
            Schema recordSchema = TestWriteUtils.writeSimpleAvroFileWithUserSchema(inputDir);

            Properties vpjProps =
                    IntegrationTestPushUtils.sslVPJProps(targetCluster, "file://" + inputDir.getAbsolutePath(), storeName);
            vpjProps.setProperty("venice.ssl.enable", Boolean.TRUE.toString());
            vpjProps.setProperty("ssl.key.store.property.name", SSL_IDENTITY_ALIAS);
            vpjProps.setProperty("ssl.trust.store.property.name", SSL_TRUSTSTORE_ALIAS);
            vpjProps.setProperty("ssl.key.store.password.property.name", SSL_KEYSTORE_PASSWORD_ALIAS);
            vpjProps.setProperty("ssl.key.password.property.name", SSL_KEY_PASSWORD_ALIAS);
            vpjProps.setProperty("kif.record.reader.kafka.send.buffer.bytes", Integer.toString(4 * 1024 * 1024));

            registerSslCredentials();

            IntegrationTestPushUtils.createStoreForJob(targetCluster.getClusterName(), recordSchema, vpjProps).close();

            try (ControllerClient controllerClient =
                    new ControllerClient(targetCluster.getClusterName(), targetCluster.getAllControllersURLs())) {
                Assert.assertEquals(
                        controllerClient.getStore(storeName).getStore().getCurrentVersion(),
                        0,
                        "Push has not been start, current should be 0");

                TestWriteUtils.runPushJob("Test push job", vpjProps);
                waitForCurrentVersion(controllerClient, storeName, 1);

                vpjProps.setProperty(SOURCE_KAFKA_PROPERTY, "true");
                vpjProps.setProperty("kafka.input.broker.url", targetCluster.getKafka().getSSLAddress());
                TestWriteUtils.runPushJob("Test Kafka re-push job", vpjProps);
                waitForCurrentVersion(controllerClient, storeName, 2);

                ControllerResponse updateStore = controllerClient.updateStore(
                        storeName,
                        new UpdateStoreQueryParams().setCompressionStrategy(CompressionStrategy.ZSTD_WITH_DICT));
                if (updateStore.isError()) {
                    throw new VeniceException(updateStore.getError());
                }

                vpjProps.setProperty(SOURCE_KAFKA_PROPERTY, "false");
                TestWriteUtils.runPushJob("Test push job with dictionary compression", vpjProps);
                waitForCurrentVersion(controllerClient, storeName, 3);

                vpjProps.setProperty(SOURCE_KAFKA_PROPERTY, "true");
                TestWriteUtils.runPushJob("Test Kafka re-push job with dictionary compression", vpjProps);
                waitForCurrentVersion(controllerClient, storeName, 4);
            }
        } finally {
            // Only the cluster created by this invocation is closed; the shared one belongs to
            // cleanUp().
            if (dedicatedCluster != null) {
                dedicatedCluster.close();
            }
        }
    }

    /**
     * Loads the local Kafka SSL key store, trust store and passwords, adds them to the current
     * user's Hadoop credentials, writes them to a token file in a fresh temp directory and points
     * the {@code HADOOP_TOKEN_FILE_LOCATION} system property at that file.
     */
    private void registerSslCredentials() throws IOException {
        Properties kafkaSslConfig = KafkaSSLUtils.getLocalCommonKafkaSSLConfig();
        byte[] keyStore = readFile(kafkaSslConfig.getProperty("ssl.keystore.location"));
        byte[] trustStore = readFile(kafkaSslConfig.getProperty("ssl.truststore.location"));

        Credentials credentials = new Credentials();
        credentials.addSecretKey(new Text(SSL_IDENTITY_ALIAS), keyStore);
        credentials.addSecretKey(new Text(SSL_TRUSTSTORE_ALIAS), trustStore);
        credentials.addSecretKey(
                new Text(SSL_KEYSTORE_PASSWORD_ALIAS),
                kafkaSslConfig.getProperty("ssl.keystore.password").getBytes(StandardCharsets.UTF_8));
        credentials.addSecretKey(
                new Text(SSL_KEY_PASSWORD_ALIAS),
                kafkaSslConfig.getProperty("ssl.key.password").getBytes(StandardCharsets.UTF_8));
        UserGroupInformation.getCurrentUser().addCredentials(credentials);

        String tokenFile = TestWriteUtils.getTempDataDirectory().getAbsolutePath() + "/testHadoopToken";
        credentials.writeTokenStorageFile(new Path(tokenFile), new Configuration());
        System.setProperty(HADOOP_TOKEN_FILE_PROPERTY, tokenFile);
        Assert.assertEquals(System.getProperty(HADOOP_TOKEN_FILE_PROPERTY), tokenFile);
    }

    /** Waits until the controller reports {@code expectedVersion} as the store's current version. */
    private static void waitForCurrentVersion(ControllerClient controllerClient, String storeName, int expectedVersion) {
        TestUtils.waitForNonDeterministicCompletion(
                VERSION_SWAP_TIMEOUT_SECONDS,
                TimeUnit.SECONDS,
                () -> controllerClient.getStore(storeName).getStore().getCurrentVersion() == expectedVersion);
    }
}
