package org.apache.kafka.streams.processor.internals;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/RecordCollectorTest.class */
/**
 * Unit tests for {@link RecordCollectorImpl}, driven through a {@link MockProducer} that is
 * backed by a three-partition cluster for topic {@code "topic1"}.
 *
 * <p>Every record is sent with value {@code "0"}; only the key, the explicit partition and the
 * optional {@link StreamPartitioner} vary between tests.
 */
public class RecordCollectorTest {
    private static final String TOPIC = "topic1";

    private final List<PartitionInfo> infos = Arrays.asList(
            new PartitionInfo(TOPIC, 0, Node.noNode(), new Node[0], new Node[0]),
            new PartitionInfo(TOPIC, 1, Node.noNode(), new Node[0], new Node[0]),
            new PartitionInfo(TOPIC, 2, Node.noNode(), new Node[0], new Node[0]));
    private final Cluster cluster = new Cluster(
            "cluster",
            Collections.singletonList(Node.noNode()),
            this.infos,
            Collections.emptySet(),
            Collections.emptySet());
    private final ByteArraySerializer byteArraySerializer = new ByteArraySerializer();
    private final StringSerializer stringSerializer = new StringSerializer();

    /** Partitions by interpreting the key as a decimal integer, modulo the partition count. */
    private final StreamPartitioner<String, Object> streamPartitioner = new StreamPartitioner<String, Object>() {
        @Override
        public Integer partition(String key, Object value, int numPartitions) {
            return Integer.valueOf(Integer.parseInt(key) % numPartitions);
        }
    };

    /**
     * Sends records with an explicit partition, first without and then with a
     * {@link StreamPartitioner}, and checks the offsets recorded per partition.
     */
    @Test
    public void testSpecificPartition() {
        final RecordCollectorImpl collector = new RecordCollectorImpl(
                new MockProducer<byte[], byte[]>(this.cluster, true, new DefaultPartitioner(), this.byteArraySerializer, this.byteArraySerializer),
                "RecordCollectorTest-TestSpecificPartition");

        // Three records to partition 0, two to partition 1, one to partition 2.
        for (int partition : new int[] {0, 0, 0, 1, 1, 2}) {
            collector.send(TOPIC, "999", "0", partition, (Long) null, this.stringSerializer, this.stringSerializer);
        }

        // Fetched once on purpose: the second round of assertions reads this same map
        // and expects it to reflect the sends issued after it was obtained.
        final Map<TopicPartition, Long> offsets = collector.offsets();
        assertOffset(offsets, 0, 2L);
        assertOffset(offsets, 1, 1L);
        assertOffset(offsets, 2, 0L);

        // Key "999" would map to partition 0 under streamPartitioner (999 % 3 == 0); the
        // expectations below require the explicitly supplied partition to be used instead.
        for (int partition : new int[] {0, 1, 2}) {
            collector.send(TOPIC, "999", "0", partition, (Long) null, this.stringSerializer, this.stringSerializer, this.streamPartitioner);
        }

        assertOffset(offsets, 0, 3L);
        assertOffset(offsets, 1, 2L);
        assertOffset(offsets, 2, 1L);
    }

    /**
     * Sends records without an explicit partition and checks that the
     * {@link StreamPartitioner} decides where each record lands.
     */
    @Test
    public void testStreamPartitioner() {
        final RecordCollectorImpl collector = new RecordCollectorImpl(
                new MockProducer<byte[], byte[]>(this.cluster, true, new DefaultPartitioner(), this.byteArraySerializer, this.byteArraySerializer),
                "RecordCollectorTest-TestStreamPartitioner");

        // key % 3: {3, 9, 27, 81, 243} -> 0, {28, 82, 244} -> 1, {245} -> 2
        for (String key : new String[] {"3", "9", "27", "81", "243", "28", "82", "244", "245"}) {
            sendViaStreamPartitioner(collector, key);
        }

        final Map<TopicPartition, Long> offsets = collector.offsets();
        assertOffset(offsets, 0, 4L);
        assertOffset(offsets, 1, 2L);
        assertOffset(offsets, 2, 0L);
    }

    /**
     * Uses a producer whose first {@code send} throws {@link TimeoutException} and whose later
     * calls delegate to {@link MockProducer}; the record must still end up at offset 0.
     */
    @Test
    public void shouldRetryWhenTimeoutExceptionOccursOnSend() throws Exception {
        final AtomicInteger attempts = new AtomicInteger(0);
        final RecordCollectorImpl collector = new RecordCollectorImpl(
                new MockProducer<byte[], byte[]>(this.cluster, true, new DefaultPartitioner(), this.byteArraySerializer, this.byteArraySerializer) {
                    @Override
                    public synchronized Future<RecordMetadata> send(ProducerRecord<byte[], byte[]> record, Callback callback) {
                        if (attempts.getAndIncrement() == 0) {
                            throw new TimeoutException();
                        }
                        return super.send(record, callback);
                    }
                },
                "test");

        sendViaStreamPartitioner(collector, "3");

        // Compare boxed values: assertEquals(long, Long) is ambiguous between the
        // (long, long) and (Object, Object) overloads and does not compile.
        Assert.assertEquals(Long.valueOf(0L), collector.offsets().get(new TopicPartition(TOPIC, 0)));
    }

    /** Uses a producer whose {@code send} always throws {@link TimeoutException}. */
    @Test(expected = StreamsException.class)
    public void shouldThrowStreamsExceptionAfterMaxAttempts() throws Exception {
        final RecordCollectorImpl collector = new RecordCollectorImpl(
                new MockProducer<byte[], byte[]>(this.cluster, true, new DefaultPartitioner(), this.byteArraySerializer, this.byteArraySerializer) {
                    @Override
                    public synchronized Future<RecordMetadata> send(ProducerRecord<byte[], byte[]> record, Callback callback) {
                        throw new TimeoutException();
                    }
                },
                "test");

        sendViaStreamPartitioner(collector, "3");
    }

    /** A send whose callback reported an exception is followed by a second send. */
    @Test(expected = StreamsException.class)
    public void shouldThrowStreamsExceptionOnSubsequentCallIfASendFails() throws Exception {
        final RecordCollectorImpl collector = newCollectorWithFailingSendCallback();
        sendViaStreamPartitioner(collector, "3");
        sendViaStreamPartitioner(collector, "3");
    }

    /** A send whose callback reported an exception is followed by {@code flush()}. */
    @Test(expected = StreamsException.class)
    public void shouldThrowStreamsExceptionOnFlushIfASendFailed() throws Exception {
        final RecordCollectorImpl collector = newCollectorWithFailingSendCallback();
        sendViaStreamPartitioner(collector, "3");
        collector.flush();
    }

    /** A send whose callback reported an exception is followed by {@code close()}. */
    @Test(expected = StreamsException.class)
    public void shouldThrowStreamsExceptionOnCloseIfASendFailed() throws Exception {
        final RecordCollectorImpl collector = newCollectorWithFailingSendCallback();
        sendViaStreamPartitioner(collector, "3");
        collector.close();
    }

    /**
     * Builds a collector (task id {@code "test"}) over a producer whose {@code send} immediately
     * completes the callback with an {@link Exception} and returns {@code null}.
     */
    private RecordCollectorImpl newCollectorWithFailingSendCallback() {
        return new RecordCollectorImpl(
                new MockProducer<byte[], byte[]>(this.cluster, true, new DefaultPartitioner(), this.byteArraySerializer, this.byteArraySerializer) {
                    @Override
                    public synchronized Future<RecordMetadata> send(ProducerRecord<byte[], byte[]> record, Callback callback) {
                        callback.onCompletion(null, new Exception());
                        return null;
                    }
                },
                "test");
    }

    /** Sends {@code key -> "0"} with no explicit partition or timestamp, using {@link #streamPartitioner}. */
    private void sendViaStreamPartitioner(RecordCollectorImpl collector, String key) {
        collector.send(TOPIC, key, "0", (Integer) null, (Long) null, this.stringSerializer, this.stringSerializer, this.streamPartitioner);
    }

    /** Asserts the offset stored in {@code offsets} for the given partition of the test topic. */
    private static void assertOffset(Map<TopicPartition, Long> offsets, int partition, long expected) {
        Assert.assertEquals(Long.valueOf(expected), offsets.get(new TopicPartition(TOPIC, partition)));
    }
}
