package org.apache.kafka.streams.kstream.internals;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.class */
public class KStreamWindowAggregateTest {
    private final Serde<String> strSerde = Serdes.String();
    private KStreamTestDriver driver = null;
    private File stateDir = null;

    @After
    public void tearDown() {
        if (this.driver != null) {
            this.driver.close();
        }
        this.driver = null;
    }

    @Before
    public void setUp() throws IOException {
        this.stateDir = TestUtils.tempDirectory("kafka-test");
    }

    @Test
    public void testAggBasic() throws Exception {
        File file = Files.createTempDirectory("test", new FileAttribute[0]).toFile();
        try {
            KStreamBuilder kStreamBuilder = new KStreamBuilder();
            KTable aggregate = kStreamBuilder.stream(this.strSerde, this.strSerde, new String[]{"topic1"}).groupByKey(this.strSerde, this.strSerde).aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, TimeWindows.of(10L).advanceBy(5L), this.strSerde, "topic1-Canonized");
            MockProcessorSupplier mockProcessorSupplier = new MockProcessorSupplier();
            aggregate.toStream().process(mockProcessorSupplier, new String[0]);
            this.driver = new KStreamTestDriver(kStreamBuilder, file);
            setRecordContext(0L, "topic1");
            this.driver.process("topic1", "A", "1");
            this.driver.flushState();
            setRecordContext(1L, "topic1");
            this.driver.process("topic1", "B", "2");
            this.driver.flushState();
            setRecordContext(2L, "topic1");
            this.driver.process("topic1", "C", "3");
            this.driver.flushState();
            setRecordContext(3L, "topic1");
            this.driver.process("topic1", "D", "4");
            this.driver.flushState();
            setRecordContext(4L, "topic1");
            this.driver.process("topic1", "A", "1");
            this.driver.flushState();
            setRecordContext(5L, "topic1");
            this.driver.process("topic1", "A", "1");
            this.driver.flushState();
            setRecordContext(6L, "topic1");
            this.driver.process("topic1", "B", "2");
            this.driver.flushState();
            setRecordContext(7L, "topic1");
            this.driver.process("topic1", "D", "4");
            this.driver.flushState();
            setRecordContext(8L, "topic1");
            this.driver.process("topic1", "B", "2");
            this.driver.flushState();
            setRecordContext(9L, "topic1");
            this.driver.process("topic1", "C", "3");
            this.driver.flushState();
            setRecordContext(10L, "topic1");
            this.driver.process("topic1", "A", "1");
            this.driver.flushState();
            setRecordContext(11L, "topic1");
            this.driver.process("topic1", "B", "2");
            this.driver.flushState();
            setRecordContext(12L, "topic1");
            this.driver.flushState();
            this.driver.process("topic1", "D", "4");
            this.driver.flushState();
            setRecordContext(13L, "topic1");
            this.driver.process("topic1", "B", "2");
            this.driver.flushState();
            setRecordContext(14L, "topic1");
            this.driver.process("topic1", "C", "3");
            this.driver.flushState();
            Assert.assertEquals(Utils.mkList(new String[]{"[A@0]:0+1", "[B@0]:0+2", "[C@0]:0+3", "[D@0]:0+4", "[A@0]:0+1+1", "[A@0]:0+1+1+1", "[A@5]:0+1", "[B@0]:0+2+2", "[B@5]:0+2", "[D@0]:0+4+4", "[D@5]:0+4", "[B@0]:0+2+2+2", "[B@5]:0+2+2", "[C@0]:0+3+3", "[C@5]:0+3", "[A@5]:0+1+1", "[A@10]:0+1", "[B@5]:0+2+2+2", "[B@10]:0+2", "[D@5]:0+4+4", "[D@10]:0+4", "[B@5]:0+2+2+2+2", "[B@10]:0+2+2", "[C@5]:0+3+3", "[C@10]:0+3"}), mockProcessorSupplier.processed);
            Utils.delete(file);
        } catch (Throwable th) {
            Utils.delete(file);
            throw th;
        }
    }

    private void setRecordContext(long j, String str) {
        this.driver.context().setRecordContext(new ProcessorRecordContext(j, 0L, 0, str));
    }

    @Test
    public void testJoin() throws Exception {
        File file = Files.createTempDirectory("test", new FileAttribute[0]).toFile();
        try {
            KStreamBuilder kStreamBuilder = new KStreamBuilder();
            KTable aggregate = kStreamBuilder.stream(this.strSerde, this.strSerde, new String[]{"topic1"}).groupByKey(this.strSerde, this.strSerde).aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, TimeWindows.of(10L).advanceBy(5L), this.strSerde, "topic1-Canonized");
            MockProcessorSupplier mockProcessorSupplier = new MockProcessorSupplier();
            aggregate.toStream().process(mockProcessorSupplier, new String[0]);
            KTable aggregate2 = kStreamBuilder.stream(this.strSerde, this.strSerde, new String[]{"topic2"}).groupByKey(this.strSerde, this.strSerde).aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, TimeWindows.of(10L).advanceBy(5L), this.strSerde, "topic2-Canonized");
            MockProcessorSupplier mockProcessorSupplier2 = new MockProcessorSupplier();
            aggregate2.toStream().process(mockProcessorSupplier2, new String[0]);
            MockProcessorSupplier mockProcessorSupplier3 = new MockProcessorSupplier();
            aggregate.join(aggregate2, new ValueJoiner<String, String, String>() { // from class: org.apache.kafka.streams.kstream.internals.KStreamWindowAggregateTest.1
                public String apply(String str, String str2) {
                    return str + "%" + str2;
                }
            }).toStream().process(mockProcessorSupplier3, new String[0]);
            this.driver = new KStreamTestDriver(kStreamBuilder, file);
            setRecordContext(0L, "topic1");
            this.driver.process("topic1", "A", "1");
            this.driver.flushState();
            setRecordContext(1L, "topic1");
            this.driver.process("topic1", "B", "2");
            this.driver.flushState();
            setRecordContext(2L, "topic1");
            this.driver.process("topic1", "C", "3");
            this.driver.flushState();
            setRecordContext(3L, "topic1");
            this.driver.process("topic1", "D", "4");
            this.driver.flushState();
            setRecordContext(4L, "topic1");
            this.driver.process("topic1", "A", "1");
            this.driver.flushState();
            mockProcessorSupplier.checkAndClearProcessResult("[A@0]:0+1", "[B@0]:0+2", "[C@0]:0+3", "[D@0]:0+4", "[A@0]:0+1+1");
            mockProcessorSupplier2.checkAndClearProcessResult(new String[0]);
            mockProcessorSupplier3.checkAndClearProcessResult(new String[0]);
            setRecordContext(5L, "topic1");
            this.driver.process("topic1", "A", "1");
            this.driver.flushState();
            setRecordContext(6L, "topic1");
            this.driver.process("topic1", "B", "2");
            this.driver.flushState();
            setRecordContext(7L, "topic1");
            this.driver.process("topic1", "D", "4");
            this.driver.flushState();
            setRecordContext(8L, "topic1");
            this.driver.process("topic1", "B", "2");
            this.driver.flushState();
            setRecordContext(9L, "topic1");
            this.driver.process("topic1", "C", "3");
            this.driver.flushState();
            mockProcessorSupplier.checkAndClearProcessResult("[A@0]:0+1+1+1", "[A@5]:0+1", "[B@0]:0+2+2", "[B@5]:0+2", "[D@0]:0+4+4", "[D@5]:0+4", "[B@0]:0+2+2+2", "[B@5]:0+2+2", "[C@0]:0+3+3", "[C@5]:0+3");
            mockProcessorSupplier2.checkAndClearProcessResult(new String[0]);
            mockProcessorSupplier3.checkAndClearProcessResult(new String[0]);
            setRecordContext(0L, "topic1");
            this.driver.process("topic2", "A", "a");
            this.driver.flushState();
            setRecordContext(1L, "topic1");
            this.driver.process("topic2", "B", "b");
            this.driver.flushState();
            setRecordContext(2L, "topic1");
            this.driver.process("topic2", "C", "c");
            this.driver.flushState();
            setRecordContext(3L, "topic1");
            this.driver.process("topic2", "D", "d");
            this.driver.flushState();
            setRecordContext(4L, "topic1");
            this.driver.process("topic2", "A", "a");
            this.driver.flushState();
            mockProcessorSupplier.checkAndClearProcessResult(new String[0]);
            mockProcessorSupplier2.checkAndClearProcessResult("[A@0]:0+a", "[B@0]:0+b", "[C@0]:0+c", "[D@0]:0+d", "[A@0]:0+a+a");
            mockProcessorSupplier3.checkAndClearProcessResult("[A@0]:0+1+1+1%0+a", "[B@0]:0+2+2+2%0+b", "[C@0]:0+3+3%0+c", "[D@0]:0+4+4%0+d", "[A@0]:0+1+1+1%0+a+a");
            setRecordContext(5L, "topic1");
            this.driver.process("topic2", "A", "a");
            this.driver.flushState();
            setRecordContext(6L, "topic1");
            this.driver.process("topic2", "B", "b");
            this.driver.flushState();
            setRecordContext(7L, "topic1");
            this.driver.process("topic2", "D", "d");
            this.driver.flushState();
            setRecordContext(8L, "topic1");
            this.driver.process("topic2", "B", "b");
            this.driver.flushState();
            setRecordContext(9L, "topic1");
            this.driver.process("topic2", "C", "c");
            this.driver.flushState();
            mockProcessorSupplier.checkAndClearProcessResult(new String[0]);
            mockProcessorSupplier2.checkAndClearProcessResult("[A@0]:0+a+a+a", "[A@5]:0+a", "[B@0]:0+b+b", "[B@5]:0+b", "[D@0]:0+d+d", "[D@5]:0+d", "[B@0]:0+b+b+b", "[B@5]:0+b+b", "[C@0]:0+c+c", "[C@5]:0+c");
            mockProcessorSupplier3.checkAndClearProcessResult("[A@0]:0+1+1+1%0+a+a+a", "[A@5]:0+1%0+a", "[B@0]:0+2+2+2%0+b+b", "[B@5]:0+2+2%0+b", "[D@0]:0+4+4%0+d+d", "[D@5]:0+4%0+d", "[B@0]:0+2+2+2%0+b+b+b", "[B@5]:0+2+2%0+b+b", "[C@0]:0+3+3%0+c+c", "[C@5]:0+3%0+c");
            Utils.delete(file);
        } catch (Throwable th) {
            Utils.delete(file);
            throw th;
        }
    }
}
