package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.test.MockProcessorNode;
import org.apache.kafka.test.MockSourceNode;
import org.apache.kafka.test.MockTimestampExtractor;
import org.apache.kafka.test.NoOpRecordCollector;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.IsEqual;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamTaskTest.class */
public class StreamTaskTest {
    private File baseDir;
    private StateDirectory stateDirectory;
    private RecordCollectorImpl recordCollector;
    private final Serializer<Integer> intSerializer = new IntegerSerializer();
    private final Deserializer<Integer> intDeserializer = new IntegerDeserializer();
    private final Serializer<byte[]> bytesSerializer = new ByteArraySerializer();
    private final String[] topic1 = {"topic1"};
    private final String[] topic2 = {"topic2"};
    private final TopicPartition partition1 = new TopicPartition(this.topic1[0], 1);
    private final TopicPartition partition2 = new TopicPartition(this.topic2[0], 1);
    private final Set<TopicPartition> partitions = Utils.mkSet(new TopicPartition[]{this.partition1, this.partition2});
    private final MockSourceNode<Integer, Integer> source1 = new MockSourceNode<>(this.topic1, this.intDeserializer, this.intDeserializer);
    private final MockSourceNode<Integer, Integer> source2 = new MockSourceNode<>(this.topic2, this.intDeserializer, this.intDeserializer);
    private final MockProcessorNode<Integer, Integer> processor = new MockProcessorNode<>(10);
    private final ProcessorTopology topology = new ProcessorTopology(Arrays.asList(this.source1, this.source2, this.processor), new HashMap<String, SourceNode>() { // from class: org.apache.kafka.streams.processor.internals.StreamTaskTest.1
        {
            put(StreamTaskTest.this.topic1[0], StreamTaskTest.this.source1);
            put(StreamTaskTest.this.topic2[0], StreamTaskTest.this.source2);
        }
    }, Collections.emptyMap(), Collections.emptyList(), Collections.emptyMap(), Collections.emptyList());
    private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
    private final MockProducer<byte[], byte[]> producer = new MockProducer<>(false, this.bytesSerializer, this.bytesSerializer);
    private final MockConsumer<byte[], byte[]> restoreStateConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
    private final byte[] recordValue = this.intSerializer.serialize((String) null, 10);
    private final byte[] recordKey = this.intSerializer.serialize((String) null, 1);

    private StreamsConfig createConfig(final File file) throws Exception {
        return new StreamsConfig(new Properties() { // from class: org.apache.kafka.streams.processor.internals.StreamTaskTest.2
            {
                setProperty("application.id", "stream-task-test");
                setProperty("bootstrap.servers", "localhost:2171");
                setProperty("buffered.records.per.partition", "3");
                setProperty("state.dir", file.getCanonicalPath());
                setProperty("timestamp.extractor", MockTimestampExtractor.class.getName());
            }
        });
    }

    @Before
    public void setup() {
        this.consumer.assign(Arrays.asList(this.partition1, this.partition2));
        this.source1.addChild(this.processor);
        this.source2.addChild(this.processor);
        this.baseDir = TestUtils.tempDirectory();
        this.stateDirectory = new StateDirectory("applicationId", this.baseDir.getPath());
    }

    @After
    public void cleanup() {
        Utils.delete(this.baseDir);
    }

    @Test
    public void testProcessOrder() throws Exception {
        StreamsConfig createConfig = createConfig(this.baseDir);
        this.recordCollector = new RecordCollectorImpl(this.producer, "taskId");
        StreamTask streamTask = new StreamTask(new TaskId(0, 0), "applicationId", this.partitions, this.topology, this.consumer, this.restoreStateConsumer, createConfig, new MockStreamsMetrics(new Metrics()), this.stateDirectory, (ThreadCache) null, new MockTime(), this.recordCollector);
        streamTask.addRecords(this.partition1, records(new ConsumerRecord<>(this.partition1.topic(), this.partition1.partition(), 10L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, this.recordKey, this.recordValue), new ConsumerRecord<>(this.partition1.topic(), this.partition1.partition(), 20L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, this.recordKey, this.recordValue), new ConsumerRecord<>(this.partition1.topic(), this.partition1.partition(), 30L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, this.recordKey, this.recordValue)));
        streamTask.addRecords(this.partition2, records(new ConsumerRecord<>(this.partition2.topic(), this.partition2.partition(), 25L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, this.recordKey, this.recordValue), new ConsumerRecord<>(this.partition2.topic(), this.partition2.partition(), 35L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, this.recordKey, this.recordValue), new ConsumerRecord<>(this.partition2.topic(), this.partition2.partition(), 45L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, this.recordKey, this.recordValue)));
        Assert.assertEquals(5L, streamTask.process());
        Assert.assertEquals(1L, this.source1.numReceived);
        Assert.assertEquals(0L, this.source2.numReceived);
        Assert.assertEquals(4L, streamTask.process());
        Assert.assertEquals(2L, this.source1.numReceived);
        Assert.assertEquals(0L, this.source2.numReceived);
        Assert.assertEquals(3L, streamTask.process());
        Assert.assertEquals(2L, this.source1.numReceived);
        Assert.assertEquals(1L, this.source2.numReceived);
        Assert.assertEquals(2L, streamTask.process());
        Assert.assertEquals(3L, this.source1.numReceived);
        Assert.assertEquals(1L, this.source2.numReceived);
        Assert.assertEquals(1L, streamTask.process());
        Assert.assertEquals(3L, this.source1.numReceived);
        Assert.assertEquals(2L, this.source2.numReceived);
        Assert.assertEquals(0L, streamTask.process());
        Assert.assertEquals(3L, this.source1.numReceived);
        Assert.assertEquals(3L, this.source2.numReceived);
        streamTask.close();
    }

    @Test
    public void testMetrics() throws Exception {
        StreamsConfig createConfig = createConfig(this.baseDir);
        this.recordCollector = new RecordCollectorImpl(this.producer, "taskId");
        Metrics metrics = new Metrics();
        String taskId = new StreamTask(new TaskId(0, 0), "applicationId", this.partitions, this.topology, this.consumer, this.restoreStateConsumer, createConfig, new MockStreamsMetrics(metrics), this.stateDirectory, (ThreadCache) null, new MockTime(), this.recordCollector).id().toString();
        String[] strArr = {"all", taskId};
        Map singletonMap = Collections.singletonMap("streams-task-id", taskId);
        Assert.assertNotNull(metrics.getSensor("commit"));
        Assert.assertNotNull(metrics.getSensor(taskId + "-commit"));
        for (String str : strArr) {
            Assert.assertNotNull(metrics.metrics().get(metrics.metricName(str + "-commit-latency-avg", "stream-task-metrics", "The average latency in milliseconds of " + str + " commit operation.", singletonMap)));
            Assert.assertNotNull(metrics.metrics().get(metrics.metricName(str + "-commit-latency-max", "stream-task-metrics", "The max latency in milliseconds of " + str + " commit operation.", singletonMap)));
            Assert.assertNotNull(metrics.metrics().get(metrics.metricName(str + "-commit-rate", "stream-task-metrics", "The average number of occurrence of " + str + " commit operation per second.", singletonMap)));
        }
    }

    @Test
    public void testPauseResume() throws Exception {
        StreamTask streamTask = new StreamTask(new TaskId(1, 1), "applicationId", this.partitions, this.topology, this.consumer, this.restoreStateConsumer, createConfig(this.baseDir), new MockStreamsMetrics(new Metrics()), this.stateDirectory, (ThreadCache) null, new MockTime(), this.recordCollector);
        streamTask.addRecords(this.partition1, records(new ConsumerRecord<>(this.partition1.topic(), this.partition1.partition(), 10L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, this.recordKey, this.recordValue), new ConsumerRecord<>(this.partition1.topic(), this.partition1.partition(), 20L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, this.recordKey, this.recordValue)));
        streamTask.addRecords(this.partition2, records(new ConsumerRecord<>(this.partition2.topic(), this.partition2.partition(), 35L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, this.recordKey, this.recordValue), new ConsumerRecord<>(this.partition2.topic(), this.partition2.partition(), 45L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, this.recordKey, this.recordValue), new ConsumerRecord<>(this.partition2.topic(), this.partition2.partition(), 55L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, this.recordKey, this.recordValue), new ConsumerRecord<>(this.partition2.topic(), this.partition2.partition(), 65L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, this.recordKey, this.recordValue)));
        Assert.assertEquals(5L, streamTask.process());
        Assert.assertEquals(1L, this.source1.numReceived);
        Assert.assertEquals(0L, this.source2.numReceived);
        Assert.assertEquals(1L, this.consumer.paused().size());
        Assert.assertTrue(this.consumer.paused().contains(this.partition2));
        streamTask.addRecords(this.partition1, records(new ConsumerRecord<>(this.partition1.topic(), this.partition1.partition(), 30L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, this.recordKey, this.recordValue), new ConsumerRecord<>(this.partition1.topic(), this.partition1.partition(), 40L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, this.recordKey, this.recordValue), new ConsumerRecord<>(this.partition1.topic(), this.partition1.partition(), 50L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, this.recordKey, this.recordValue)));
        Assert.assertEquals(2L, this.consumer.paused().size());
        Assert.assertTrue(this.consumer.paused().contains(this.partition1));
        Assert.assertTrue(this.consumer.paused().contains(this.partition2));
        Assert.assertEquals(7L, streamTask.process());
        Assert.assertEquals(2L, this.source1.numReceived);
        Assert.assertEquals(0L, this.source2.numReceived);
        Assert.assertEquals(1L, this.consumer.paused().size());
        Assert.assertTrue(this.consumer.paused().contains(this.partition2));
        Assert.assertEquals(6L, streamTask.process());
        Assert.assertEquals(3L, this.source1.numReceived);
        Assert.assertEquals(0L, this.source2.numReceived);
        Assert.assertEquals(1L, this.consumer.paused().size());
        Assert.assertTrue(this.consumer.paused().contains(this.partition2));
        Assert.assertEquals(5L, streamTask.process());
        Assert.assertEquals(3L, this.source1.numReceived);
        Assert.assertEquals(1L, this.source2.numReceived);
        Assert.assertEquals(0L, this.consumer.paused().size());
        streamTask.close();
    }

    @Test
    public void testMaybePunctuate() throws Exception {
        StreamTask streamTask = new StreamTask(new TaskId(0, 0), "applicationId", this.partitions, this.topology, this.consumer, this.restoreStateConsumer, createConfig(this.baseDir), new MockStreamsMetrics(new Metrics()), this.stateDirectory, (ThreadCache) null, new MockTime(), this.recordCollector);
        streamTask.addRecords(this.partition1, records(new ConsumerRecord<>(this.partition1.topic(), this.partition1.partition(), 20L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, this.recordKey, this.recordValue), new ConsumerRecord<>(this.partition1.topic(), this.partition1.partition(), 30L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, this.recordKey, this.recordValue), new ConsumerRecord<>(this.partition1.topic(), this.partition1.partition(), 40L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, this.recordKey, this.recordValue)));
        streamTask.addRecords(this.partition2, records(new ConsumerRecord<>(this.partition2.topic(), this.partition2.partition(), 25L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, this.recordKey, this.recordValue), new ConsumerRecord<>(this.partition2.topic(), this.partition2.partition(), 35L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, this.recordKey, this.recordValue), new ConsumerRecord<>(this.partition2.topic(), this.partition2.partition(), 45L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, this.recordKey, this.recordValue)));
        Assert.assertTrue(streamTask.maybePunctuate());
        Assert.assertEquals(5L, streamTask.process());
        Assert.assertEquals(1L, this.source1.numReceived);
        Assert.assertEquals(0L, this.source2.numReceived);
        Assert.assertFalse(streamTask.maybePunctuate());
        Assert.assertEquals(4L, streamTask.process());
        Assert.assertEquals(1L, this.source1.numReceived);
        Assert.assertEquals(1L, this.source2.numReceived);
        Assert.assertTrue(streamTask.maybePunctuate());
        Assert.assertEquals(3L, streamTask.process());
        Assert.assertEquals(2L, this.source1.numReceived);
        Assert.assertEquals(1L, this.source2.numReceived);
        Assert.assertFalse(streamTask.maybePunctuate());
        Assert.assertEquals(2L, streamTask.process());
        Assert.assertEquals(2L, this.source1.numReceived);
        Assert.assertEquals(2L, this.source2.numReceived);
        Assert.assertTrue(streamTask.maybePunctuate());
        Assert.assertEquals(1L, streamTask.process());
        Assert.assertEquals(3L, this.source1.numReceived);
        Assert.assertEquals(2L, this.source2.numReceived);
        Assert.assertFalse(streamTask.maybePunctuate());
        Assert.assertEquals(0L, streamTask.process());
        Assert.assertEquals(3L, this.source1.numReceived);
        Assert.assertEquals(3L, this.source2.numReceived);
        Assert.assertFalse(streamTask.maybePunctuate());
        this.processor.supplier.checkAndClearPunctuateResult(20, 30, 40);
        streamTask.close();
    }

    @Test
    public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContext() throws Exception {
        StreamsConfig createConfig = createConfig(this.baseDir);
        MockSourceNode mockSourceNode = new MockSourceNode(this.topic1, this.intDeserializer, this.intDeserializer) { // from class: org.apache.kafka.streams.processor.internals.StreamTaskTest.3
            @Override // org.apache.kafka.test.MockSourceNode
            public void process(Object obj, Object obj2) {
                throw new KafkaException("KABOOM!");
            }
        };
        List singletonList = Collections.singletonList(mockSourceNode);
        Map singletonMap = Collections.singletonMap(this.topic1[0], mockSourceNode);
        MockStreamsMetrics mockStreamsMetrics = new MockStreamsMetrics(new Metrics());
        StreamTask streamTask = new StreamTask(new TaskId(0, 0), "applicationId", this.partitions, new ProcessorTopology(singletonList, singletonMap, Collections.emptyMap(), Collections.emptyList(), Collections.emptyMap(), Collections.emptyList()), this.consumer, this.restoreStateConsumer, createConfig, mockStreamsMetrics, this.stateDirectory, new ThreadCache("testCache", 0L, mockStreamsMetrics), new MockTime(), this.recordCollector);
        streamTask.addRecords(this.partition1, Collections.singletonList(new ConsumerRecord(this.partition1.topic(), this.partition1.partition(), 20L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, this.recordKey, this.recordValue)));
        try {
            streamTask.process();
            Assert.fail("Should've thrown StreamsException");
        } catch (StreamsException e) {
            String message = e.getMessage();
            Assert.assertTrue("message=" + message + " should contain topic", message.contains("topic=" + this.topic1[0]));
            Assert.assertTrue("message=" + message + " should contain partition", message.contains("partition=" + this.partition1.partition()));
            Assert.assertTrue("message=" + message + " should contain offset", message.contains("offset=20"));
            Assert.assertTrue("message=" + message + " should contain processor", message.contains("processor=" + mockSourceNode.name()));
        }
    }

    @Test
    public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuating() throws Exception {
        StreamsConfig createConfig = createConfig(this.baseDir);
        ProcessorNode processorNode = new ProcessorNode("test", new AbstractProcessor() { // from class: org.apache.kafka.streams.processor.internals.StreamTaskTest.4
            public void init(ProcessorContext processorContext) {
                processorContext.schedule(1L);
            }

            public void process(Object obj, Object obj2) {
            }

            public void punctuate(long j) {
                throw new KafkaException("KABOOM!");
            }
        }, Collections.emptySet());
        ProcessorTopology processorTopology = new ProcessorTopology(Collections.singletonList(processorNode), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyList(), Collections.emptyMap(), Collections.emptyList());
        MockStreamsMetrics mockStreamsMetrics = new MockStreamsMetrics(new Metrics());
        try {
            new StreamTask(new TaskId(0, 0), "applicationId", this.partitions, processorTopology, this.consumer, this.restoreStateConsumer, createConfig, mockStreamsMetrics, this.stateDirectory, new ThreadCache("testCache", 0L, mockStreamsMetrics), new MockTime(), this.recordCollector).punctuate(processorNode, 1L);
            Assert.fail("Should've thrown StreamsException");
        } catch (StreamsException e) {
            String message = e.getMessage();
            Assert.assertTrue("message=" + message + " should contain processor", message.contains("processor=test"));
        }
    }

    @Test
    public void shouldFlushRecordCollectorOnFlushState() throws Exception {
        ProcessorTopology processorTopology = new ProcessorTopology(Collections.emptyList(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyList(), Collections.emptyMap(), Collections.emptyList());
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        NoOpRecordCollector noOpRecordCollector = new NoOpRecordCollector() { // from class: org.apache.kafka.streams.processor.internals.StreamTaskTest.5
            @Override // org.apache.kafka.test.NoOpRecordCollector
            public void flush() {
                atomicBoolean.set(true);
            }
        };
        MockStreamsMetrics mockStreamsMetrics = new MockStreamsMetrics(new Metrics());
        new StreamTask(new TaskId(0, 0), "appId", this.partitions, processorTopology, this.consumer, this.restoreStateConsumer, createConfig(this.baseDir), mockStreamsMetrics, this.stateDirectory, new ThreadCache("testCache", 0L, mockStreamsMetrics), new MockTime(), noOpRecordCollector).flushState();
        Assert.assertTrue(atomicBoolean.get());
    }

    @Test
    public void shouldCheckpointOffsetsOnCommit() throws Exception {
        String storeChangelogTopic = ProcessorStateManager.storeChangelogTopic("appId", "test");
        ProcessorTopology processorTopology = new ProcessorTopology(Collections.emptyList(), Collections.emptyMap(), Collections.emptyMap(), Collections.singletonList(new InMemoryKeyValueStore("test", null, null) { // from class: org.apache.kafka.streams.processor.internals.StreamTaskTest.6
            public void init(ProcessorContext processorContext, StateStore stateStore) {
                processorContext.register(stateStore, true, (StateRestoreCallback) null);
            }

            public boolean persistent() {
                return true;
            }
        }), Collections.singletonMap("test", storeChangelogTopic), Collections.emptyList());
        final TopicPartition topicPartition = new TopicPartition(storeChangelogTopic, 0);
        NoOpRecordCollector noOpRecordCollector = new NoOpRecordCollector() { // from class: org.apache.kafka.streams.processor.internals.StreamTaskTest.7
            @Override // org.apache.kafka.test.NoOpRecordCollector
            public Map<TopicPartition, Long> offsets() {
                return Collections.singletonMap(topicPartition, 543L);
            }
        };
        this.restoreStateConsumer.updatePartitions(storeChangelogTopic, Collections.singletonList(new PartitionInfo(storeChangelogTopic, 0, (Node) null, new Node[0], new Node[0])));
        this.restoreStateConsumer.updateEndOffsets(Collections.singletonMap(topicPartition, 0L));
        this.restoreStateConsumer.updateBeginningOffsets(Collections.singletonMap(topicPartition, 0L));
        MockStreamsMetrics mockStreamsMetrics = new MockStreamsMetrics(new Metrics());
        TaskId taskId = new TaskId(0, 0);
        MockTime mockTime = new MockTime();
        StreamsConfig createConfig = createConfig(this.baseDir);
        StreamTask streamTask = new StreamTask(taskId, "appId", this.partitions, processorTopology, this.consumer, this.restoreStateConsumer, createConfig, mockStreamsMetrics, this.stateDirectory, new ThreadCache("testCache", 0L, mockStreamsMetrics), mockTime, noOpRecordCollector);
        mockTime.sleep(createConfig.getLong("commit.interval.ms").longValue());
        streamTask.commit();
        MatcherAssert.assertThat(new OffsetCheckpoint(new File(this.stateDirectory.directoryForTask(taskId), ".checkpoint")).read(), IsEqual.equalTo(Collections.singletonMap(topicPartition, 544L)));
    }

    private Iterable<ConsumerRecord<byte[], byte[]>> records(ConsumerRecord<byte[], byte[]>... consumerRecordArr) {
        return Arrays.asList(consumerRecordArr);
    }
}
