package com.linkedin.venice.hadoop.input.kafka;

import com.github.luben.zstd.ZstdDictTrainer;
import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.compression.NoopCompressor;
import com.linkedin.venice.compression.VeniceCompressor;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.hadoop.input.kafka.KafkaInputDictTrainer;
import com.linkedin.venice.hadoop.input.kafka.avro.KafkaInputMapperKey;
import com.linkedin.venice.hadoop.input.kafka.avro.KafkaInputMapperValue;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.testng.annotations.Test;

/**
 * Unit tests for {@link KafkaInputDictTrainer}.
 *
 * <p>Each test wires a mocked {@link KafkaInputFormat} to in-memory record readers and then checks
 * which record values the trainer forwards to a mocked {@link ZstdDictTrainer} (and, where a source
 * compression strategy is configured, which values it asks the {@link VeniceCompressor} to
 * decompress first).
 */
public class TestKafkaInputDictTrainer {
    private static final String TEST_TOPIC = "test_topic";

    /** Dictionary size handed to the trainer params: 900 * 1024 = 921600. */
    private static final int COMPRESSION_DICT_SIZE = 900 * 1024;

    /** Schema id given to every record built by {@link #mockReader(List)}. */
    private static final int DEFAULT_SCHEMA_ID = 1;

    /**
     * A {@link RecordReader} that can be rewound, so the same reader instance can be consumed by
     * more than one trainer run within a single test.
     */
    public interface ResettableRecordReader<K, V> extends RecordReader<K, V> {
        /** Moves the reader back to its first record. */
        void reset();
    }

    /** A record value together with the schema id it is reported under. */
    public static class ValueSchemaPair {
        final int schemaId;
        final byte[] value;

        public ValueSchemaPair(int schemaId, byte[] value) {
            this.schemaId = schemaId;
            this.value = value;
        }
    }

    /**
     * Returns a compressor builder that ignores all of its arguments and always hands back
     * {@code veniceCompressor}.
     */
    private KafkaInputDictTrainer.CompressorBuilder getCompressorBuilder(VeniceCompressor veniceCompressor) {
        return (compressorFactory, compressionStrategy, kafkaUrl, topic, props) -> veniceCompressor;
    }

    /** Builds trainer params with the given sample size and {@link CompressionStrategy#NO_OP}. */
    private KafkaInputDictTrainer.Param getParam(int sampleSize) {
        return getParam(sampleSize, CompressionStrategy.NO_OP);
    }

    /** Builds trainer params with the given sample size and source-version compression strategy. */
    private KafkaInputDictTrainer.Param getParam(int sampleSize, CompressionStrategy sourceVersionCompressionStrategy) {
        return new KafkaInputDictTrainer.ParamBuilder()
                .setKafkaInputBroker("test_url")
                .setTopicName(TEST_TOPIC)
                .setKeySchema("\"string\"")
                .setCompressionDictSize(COMPRESSION_DICT_SIZE)
                .setDictSampleSize(sampleSize)
                .setSslProperties(new Properties())
                .setSourceVersionCompressionStrategy(sourceVersionCompressionStrategy)
                .build();
    }

    /** Converts each string to bytes with {@link String#getBytes()}, preserving order. */
    private static List<byte[]> bytesOf(String... values) {
        List<byte[]> result = new ArrayList<>(values.length);
        for (String value : values) {
            result.add(value.getBytes());
        }
        return result;
    }

    /**
     * Creates the two splits shared by the multi-partition tests.
     *
     * <p>NOTE(review): both splits are constructed with identical arguments (partition 0), yet the
     * readers are stubbed per split via {@code eq(split)}. That only distinguishes the two readers
     * if the splits do not compare equal to each other — confirm {@code KafkaInputSplit} does not
     * override {@code equals}.
     */
    private static KafkaInputSplit[] twoSplits() {
        return new KafkaInputSplit[] {
                new KafkaInputSplit(TEST_TOPIC, 0, 0L, 2L),
                new KafkaInputSplit(TEST_TOPIC, 0, 0L, 2L)};
    }

    /** Makes {@code inputFormat} return {@code splits} for any job conf and records-per-split. */
    private static void stubSplits(KafkaInputFormat inputFormat, KafkaInputSplit[] splits) throws IOException {
        Mockito.doReturn(splits)
                .when(inputFormat)
                .getSplitsByRecordsPerSplit(ArgumentMatchers.<JobConf>any(), ArgumentMatchers.anyLong());
    }

    /** Makes {@code inputFormat} return {@code reader} when asked for a reader over {@code split}. */
    private static void stubReader(
            KafkaInputFormat inputFormat,
            KafkaInputSplit split,
            RecordReader<KafkaInputMapperKey, KafkaInputMapperValue> reader) throws IOException {
        Mockito.doReturn(reader)
                .when(inputFormat)
                .getRecordReader(
                        ArgumentMatchers.eq(split),
                        ArgumentMatchers.<JobConf>any(),
                        ArgumentMatchers.<Reporter>any());
    }

    /**
     * Creates a mocked compressor that reports {@link CompressionStrategy#GZIP} and whose
     * {@code decompress(ByteBuffer)} returns its argument unchanged.
     */
    private static VeniceCompressor mockPassThroughGzipCompressor() throws IOException {
        VeniceCompressor compressor = Mockito.mock(VeniceCompressor.class);
        Mockito.doReturn(CompressionStrategy.GZIP).when(compressor).getCompressionStrategy();
        Mockito.doAnswer(invocation -> invocation.getArgument(0))
                .when(compressor)
                .decompress(ArgumentMatchers.any(ByteBuffer.class));
        return compressor;
    }

    /** Verifies that every value in {@code samples} was added to {@code trainer} exactly once. */
    private static void verifySampled(ZstdDictTrainer trainer, List<byte[]> samples) {
        for (byte[] sample : samples) {
            Mockito.verify(trainer).addSample(ArgumentMatchers.eq(sample));
        }
    }

    /** Verifies that none of the values in {@code samples} was ever added to {@code trainer}. */
    private static void verifyNeverSampled(ZstdDictTrainer trainer, List<byte[]> samples) {
        for (byte[] sample : samples) {
            Mockito.verify(trainer, Mockito.never()).addSample(ArgumentMatchers.eq(sample));
        }
    }

    /**
     * Verifies that every value in {@code samples} was passed to {@code compressor.decompress}
     * exactly once and then added to {@code trainer} exactly once.
     */
    private static void verifyDecompressedAndSampled(
            VeniceCompressor compressor,
            ZstdDictTrainer trainer,
            List<byte[]> samples) throws IOException {
        for (byte[] sample : samples) {
            Mockito.verify(compressor).decompress(ArgumentMatchers.eq(ByteBuffer.wrap(sample)));
            Mockito.verify(trainer).addSample(ArgumentMatchers.eq(sample));
        }
    }

    /** A topic whose only reader yields no records must make training fail with "No record...". */
    @Test(expectedExceptions = {VeniceException.class}, expectedExceptionsMessageRegExp = "No record.*")
    public void testEmptyTopic() throws IOException {
        KafkaInputFormat inputFormat = Mockito.mock(KafkaInputFormat.class);
        stubSplits(inputFormat, new KafkaInputSplit[] {new KafkaInputSplit(TEST_TOPIC, 0, 0L, 0L)});

        @SuppressWarnings("unchecked") // Mockito can only mock the raw RecordReader class.
        RecordReader<KafkaInputMapperKey, KafkaInputMapperValue> emptyReader = Mockito.mock(RecordReader.class);
        Mockito.doReturn(false)
                .when(emptyReader)
                .next(ArgumentMatchers.<KafkaInputMapperKey>any(), ArgumentMatchers.<KafkaInputMapperValue>any());
        Mockito.doReturn(emptyReader)
                .when(inputFormat)
                .getRecordReader(
                        ArgumentMatchers.<InputSplit>any(),
                        ArgumentMatchers.<JobConf>any(),
                        ArgumentMatchers.<Reporter>any());

        new KafkaInputDictTrainer(inputFormat, Optional.empty(), getParam(100), getCompressorBuilder(new NoopCompressor()))
                .trainDict();
    }

    /**
     * Builds an in-memory reader that emits one record per entry of {@code list}, in order.
     *
     * <p>Record {@code i} gets key bytes {@code "test_key" + i}, offset {@code i} on both key and
     * value, and the value bytes and schema id of {@code list.get(i)}.
     */
    private ResettableRecordReader<KafkaInputMapperKey, KafkaInputMapperValue> mockReaderForValueSchemaPairs(
            final List<ValueSchemaPair> list) {
        return new ResettableRecordReader<KafkaInputMapperKey, KafkaInputMapperValue>() {
            /** Index of the next record to emit. */
            private int cur = 0;

            @Override
            public void reset() {
                cur = 0;
            }

            @Override
            public boolean next(KafkaInputMapperKey key, KafkaInputMapperValue value) throws IOException {
                if (cur >= list.size()) {
                    return false;
                }
                ValueSchemaPair pair = list.get(cur);
                key.offset = cur;
                key.key = ByteBuffer.wrap(("test_key" + cur).getBytes());
                value.offset = cur;
                value.value = ByteBuffer.wrap(pair.value);
                value.schemaId = pair.schemaId;
                cur++;
                return true;
            }

            @Override
            public KafkaInputMapperKey createKey() {
                return new KafkaInputMapperKey();
            }

            @Override
            public KafkaInputMapperValue createValue() {
                return new KafkaInputMapperValue();
            }

            @Override
            public long getPos() throws IOException {
                return 0L;
            }

            @Override
            public void close() throws IOException {
                // Nothing to release: the reader is backed by an in-memory list.
            }

            @Override
            public float getProgress() throws IOException {
                return 0.0f;
            }
        };
    }

    /** Builds an in-memory reader over {@code list}, tagging every value with schema id 1. */
    private ResettableRecordReader<KafkaInputMapperKey, KafkaInputMapperValue> mockReader(List<byte[]> list) {
        List<ValueSchemaPair> pairs = new ArrayList<>(list.size());
        for (byte[] value : list) {
            pairs.add(new ValueSchemaPair(DEFAULT_SCHEMA_ID, value));
        }
        return mockReaderForValueSchemaPairs(pairs);
    }

    /**
     * With a sample size of 1000 all six values from both splits are expected to be sampled; with
     * a sample size of 20 only the first value of each split is expected to be sampled.
     */
    @Test
    public void testSamplingFromMultiplePartitions() throws IOException {
        KafkaInputFormat inputFormat = Mockito.mock(KafkaInputFormat.class);
        KafkaInputSplit[] splits = twoSplits();
        stubSplits(inputFormat, splits);

        ResettableRecordReader<KafkaInputMapperKey, KafkaInputMapperValue> p0Reader =
                mockReader(bytesOf("p0_value0", "p0_value1", "p0_value2"));
        ResettableRecordReader<KafkaInputMapperKey, KafkaInputMapperValue> p1Reader =
                mockReader(bytesOf("p1_value0", "p1_value1", "p1_value2"));
        stubReader(inputFormat, splits[0], p0Reader);
        stubReader(inputFormat, splits[1], p1Reader);

        // Large sample budget: every record is sampled.
        ZstdDictTrainer largeBudgetTrainer = Mockito.mock(ZstdDictTrainer.class);
        new KafkaInputDictTrainer(
                inputFormat,
                Optional.of(largeBudgetTrainer),
                getParam(1000),
                getCompressorBuilder(new NoopCompressor())).trainDict();
        verifySampled(
                largeBudgetTrainer,
                bytesOf("p0_value0", "p0_value1", "p0_value2", "p1_value0", "p1_value1", "p1_value2"));

        // The readers were fully consumed by the first run; rewind them before training again.
        p0Reader.reset();
        p1Reader.reset();

        // Small sample budget: only the first record of each split is sampled.
        ZstdDictTrainer smallBudgetTrainer = Mockito.mock(ZstdDictTrainer.class);
        new KafkaInputDictTrainer(
                inputFormat,
                Optional.of(smallBudgetTrainer),
                getParam(20),
                getCompressorBuilder(new NoopCompressor())).trainDict();
        verifySampled(smallBudgetTrainer, bytesOf("p0_value0", "p1_value0"));
        verifyNeverSampled(smallBudgetTrainer, bytesOf("p0_value1", "p0_value2", "p1_value1", "p1_value2"));
    }

    /**
     * When the source version is GZIP-compressed, every value must be run through the compressor's
     * {@code decompress} before being added as a sample.
     */
    @Test
    public void testSamplingFromMultiplePartitionsWithSourceVersionCompressionEnabled() throws IOException {
        KafkaInputFormat inputFormat = Mockito.mock(KafkaInputFormat.class);
        KafkaInputSplit[] splits = twoSplits();
        stubSplits(inputFormat, splits);
        stubReader(inputFormat, splits[0], mockReader(bytesOf("p0_value0", "p0_value1", "p0_value2")));
        stubReader(inputFormat, splits[1], mockReader(bytesOf("p1_value0", "p1_value1", "p1_value2")));

        VeniceCompressor compressor = mockPassThroughGzipCompressor();
        ZstdDictTrainer trainer = Mockito.mock(ZstdDictTrainer.class);
        new KafkaInputDictTrainer(
                inputFormat,
                Optional.of(trainer),
                getParam(1000, CompressionStrategy.GZIP),
                getCompressorBuilder(compressor)).trainDict();

        verifyDecompressedAndSampled(
                compressor,
                trainer,
                bytesOf("p0_value0", "p0_value1", "p0_value2", "p1_value0", "p1_value1", "p1_value2"));
    }

    /**
     * Same as the compression-enabled test, except the first record of the first split carries
     * schema id -1. That record must be neither decompressed nor sampled, while all other records
     * still are. (Presumably a negative schema id marks a chunk-related record, per the test name —
     * confirm against {@link KafkaInputDictTrainer}.)
     */
    @Test
    public void testSamplingFromMultiplePartitionsWithSourceVersionCompressionEnabledWithChunking() throws IOException {
        KafkaInputFormat inputFormat = Mockito.mock(KafkaInputFormat.class);
        KafkaInputSplit[] splits = twoSplits();
        stubSplits(inputFormat, splits);

        ResettableRecordReader<KafkaInputMapperKey, KafkaInputMapperValue> p0Reader = mockReaderForValueSchemaPairs(
                Arrays.asList(
                        new ValueSchemaPair(-1, "p0_value0".getBytes()),
                        new ValueSchemaPair(1, "p0_value1".getBytes()),
                        new ValueSchemaPair(1, "p0_value2".getBytes())));
        stubReader(inputFormat, splits[0], p0Reader);
        stubReader(inputFormat, splits[1], mockReader(bytesOf("p1_value0", "p1_value1", "p1_value2")));

        VeniceCompressor compressor = mockPassThroughGzipCompressor();
        ZstdDictTrainer trainer = Mockito.mock(ZstdDictTrainer.class);
        new KafkaInputDictTrainer(
                inputFormat,
                Optional.of(trainer),
                getParam(1000, CompressionStrategy.GZIP),
                getCompressorBuilder(compressor)).trainDict();

        verifyDecompressedAndSampled(
                compressor,
                trainer,
                bytesOf("p0_value1", "p0_value2", "p1_value0", "p1_value1", "p1_value2"));

        // The schema-id -1 record must have been skipped entirely.
        byte[] skipped = "p0_value0".getBytes();
        Mockito.verify(compressor, Mockito.never()).decompress(ArgumentMatchers.eq(ByteBuffer.wrap(skipped)));
        Mockito.verify(trainer, Mockito.never()).addSample(ArgumentMatchers.eq(skipped));
    }
}
