package org.apache.kafka.streams.integration;

import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import kafka.admin.AdminUtils;
import kafka.log.LogConfig;
import kafka.utils.MockTime;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.test.MockKeyValueMapper;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import scala.Tuple2;
import scala.collection.Iterator;

/* loaded from: input_file:org/apache/kafka/streams/integration/InternalTopicIntegrationTest.class */
public class InternalTopicIntegrationTest {
    private static final int NUM_BROKERS = 1;

    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
    private static final String DEFAULT_INPUT_TOPIC = "inputTopic";
    private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic";
    private static final int DEFAULT_ZK_SESSION_TIMEOUT_MS = 10000;
    private static final int DEFAULT_ZK_CONNECTION_TIMEOUT_MS = 8000;
    private Properties streamsConfiguration;
    private final MockTime mockTime = CLUSTER.time;
    private String applicationId = "compact-topics-integration-test";

    @BeforeClass
    public static void startKafkaCluster() throws Exception {
        CLUSTER.createTopic(DEFAULT_INPUT_TOPIC);
        CLUSTER.createTopic(DEFAULT_OUTPUT_TOPIC);
    }

    @Before
    public void before() {
        this.streamsConfiguration = new Properties();
        this.streamsConfiguration.put("application.id", this.applicationId);
        this.streamsConfiguration.put("bootstrap.servers", CLUSTER.bootstrapServers());
        this.streamsConfiguration.put("key.serde", Serdes.String().getClass().getName());
        this.streamsConfiguration.put("value.serde", Serdes.String().getClass().getName());
        this.streamsConfiguration.put("state.dir", TestUtils.tempDirectory().getPath());
        this.streamsConfiguration.put("auto.offset.reset", "earliest");
        this.streamsConfiguration.put("commit.interval.ms", 100);
        this.streamsConfiguration.put("cache.max.bytes.buffering", 0);
        this.streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
    }

    private Properties getTopicConfigProperties(String str) {
        ZkClient zkClient = new ZkClient(CLUSTER.zKConnectString(), DEFAULT_ZK_SESSION_TIMEOUT_MS, DEFAULT_ZK_CONNECTION_TIMEOUT_MS, ZKStringSerializer$.MODULE$);
        try {
            Iterator it = AdminUtils.fetchAllTopicConfigs(new ZkUtils(zkClient, new ZkConnection(CLUSTER.zKConnectString()), false)).iterator();
            while (it.hasNext()) {
                Tuple2 tuple2 = (Tuple2) it.next();
                String str2 = (String) tuple2._1;
                Properties properties = (Properties) tuple2._2;
                if (str2.equals(str)) {
                    return properties;
                }
            }
            Properties properties2 = new Properties();
            zkClient.close();
            return properties2;
        } finally {
            zkClient.close();
        }
    }

    @Test
    public void shouldCompactTopicsForStateChangelogs() throws Exception {
        Serde String = Serdes.String();
        Serde Long = Serdes.Long();
        Properties properties = new Properties();
        properties.put("application.id", "compact-topics-integration-test");
        properties.put("bootstrap.servers", CLUSTER.bootstrapServers());
        properties.put("key.serde", Serdes.String().getClass().getName());
        properties.put("value.serde", Serdes.String().getClass().getName());
        properties.put("state.dir", TestUtils.tempDirectory().getPath());
        properties.put("auto.offset.reset", "earliest");
        KStreamBuilder kStreamBuilder = new KStreamBuilder();
        kStreamBuilder.stream(new String[]{DEFAULT_INPUT_TOPIC}).flatMapValues(new ValueMapper<String, Iterable<String>>() { // from class: org.apache.kafka.streams.integration.InternalTopicIntegrationTest.1
            public Iterable<String> apply(String str) {
                return Arrays.asList(str.toLowerCase(Locale.getDefault()).split("\\W+"));
            }
        }).groupBy(MockKeyValueMapper.SelectValueMapper()).count("Counts").toStream().to(String, Long, DEFAULT_OUTPUT_TOPIC);
        IntegrationTestUtils.purgeLocalStreamsState(properties);
        KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, properties);
        kafkaStreams.start();
        produceData(Arrays.asList("hello", "world", "world", "hello world"));
        kafkaStreams.close();
        Assert.assertEquals(LogConfig.Compact(), getTopicConfigProperties(ProcessorStateManager.storeChangelogTopic(this.applicationId, "Counts")).getProperty(LogConfig.CleanupPolicyProp()));
    }

    private void produceData(List<String> list) throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", CLUSTER.bootstrapServers());
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("key.serializer", StringSerializer.class);
        properties.put("value.serializer", StringSerializer.class);
        IntegrationTestUtils.produceValuesSynchronously(DEFAULT_INPUT_TOPIC, list, properties, this.mockTime);
    }

    @Test
    public void shouldUseCompactAndDeleteForWindowStoreChangelogs() throws Exception {
        KStreamBuilder kStreamBuilder = new KStreamBuilder();
        kStreamBuilder.stream(new String[]{DEFAULT_INPUT_TOPIC}).flatMapValues(new ValueMapper<String, Iterable<String>>() { // from class: org.apache.kafka.streams.integration.InternalTopicIntegrationTest.2
            public Iterable<String> apply(String str) {
                return Arrays.asList(str.toLowerCase(Locale.getDefault()).split("\\W+"));
            }
        }).groupBy(MockKeyValueMapper.SelectValueMapper()).count(TimeWindows.of(1000L).until(2000L), "CountWindows").toStream();
        IntegrationTestUtils.purgeLocalStreamsState(this.streamsConfiguration);
        KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, this.streamsConfiguration);
        kafkaStreams.start();
        produceData(Arrays.asList("hello", "world", "world", "hello world"));
        kafkaStreams.close();
        Properties topicConfigProperties = getTopicConfigProperties(ProcessorStateManager.storeChangelogTopic(this.applicationId, "CountWindows"));
        List asList = Arrays.asList(topicConfigProperties.getProperty(LogConfig.CleanupPolicyProp()).split(","));
        Assert.assertEquals(2L, asList.size());
        Assert.assertTrue(asList.contains(LogConfig.Compact()));
        Assert.assertTrue(asList.contains(LogConfig.Delete()));
        Assert.assertEquals(TimeUnit.MILLISECONDS.convert(1L, TimeUnit.DAYS) + 2000, Long.parseLong(topicConfigProperties.getProperty(LogConfig.RetentionMsProp())));
    }
}
