package com.linkedin.venice.hadoop.input.kafka;

import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.compression.GzipCompressor;
import com.linkedin.venice.compression.NoopCompressor;
import com.linkedin.venice.hadoop.AbstractVeniceFilter;
import com.linkedin.venice.hadoop.FilterChain;
import com.linkedin.venice.hadoop.VeniceReducer;
import com.linkedin.venice.hadoop.input.kafka.avro.KafkaInputMapperKey;
import com.linkedin.venice.hadoop.input.kafka.avro.KafkaInputMapperValue;
import com.linkedin.venice.hadoop.input.kafka.avro.MapperValueType;
import com.linkedin.venice.serializer.FastSerializerDeserializerFactory;
import com.linkedin.venice.serializer.RecordSerializer;
import com.linkedin.venice.utils.DataProviderUtils;
import com.linkedin.venice.utils.VeniceProperties;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/venice/hadoop/input/kafka/TestVeniceKafkaInputReducer.class */
public class TestVeniceKafkaInputReducer {
    private static final RecordSerializer KAFKA_INPUT_MAPPER_VALUE_SERIALIZER = FastSerializerDeserializerFactory.getFastAvroGenericSerializer(KafkaInputMapperValue.SCHEMA$);
    private static final String VALUE_PREFIX = "value_";
    private static final String RMD_VALUE_PREFIX = "rmd_value_";

    @Test
    public void testTTLFilter() {
        VeniceKafkaInputReducer veniceKafkaInputReducer = new VeniceKafkaInputReducer();
        Assert.assertNull(veniceKafkaInputReducer.initFilterChain(getTestProps()));
        veniceKafkaInputReducer.setChunkingEnabled(true);
        Assert.assertNotNull(veniceKafkaInputReducer.initFilterChain(getTestProps()));
    }

    @Test(dataProvider = "Two-True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testExtract(boolean z, boolean z2) {
        byte[] bytes = "test_key".getBytes();
        KafkaInputMapperKey kafkaInputMapperKey = new KafkaInputMapperKey();
        kafkaInputMapperKey.key = ByteBuffer.wrap(bytes);
        kafkaInputMapperKey.offset = 1L;
        byte[] serialize = FastSerializerDeserializerFactory.getFastAvroGenericSerializer(KafkaInputMapperKey.SCHEMA$).serialize(kafkaInputMapperKey);
        BytesWritable bytesWritable = new BytesWritable();
        bytesWritable.set(serialize, 0, serialize.length);
        VeniceKafkaInputReducer veniceKafkaInputReducer = new VeniceKafkaInputReducer();
        veniceKafkaInputReducer.setChunkingEnabled(z);
        veniceKafkaInputReducer.setSourceVersionCompressor(new NoopCompressor());
        veniceKafkaInputReducer.setDestVersionCompressor(new NoopCompressor());
        VeniceReducer.VeniceWriterMessage extract = veniceKafkaInputReducer.extract(bytesWritable, getValues(Arrays.asList(MapperValueType.PUT, MapperValueType.PUT, MapperValueType.PUT), z2).iterator(), (Reporter) Mockito.mock(Reporter.class));
        Assert.assertNotNull(extract);
        Assert.assertEquals(extract.getKeyBytes(), bytes);
        Assert.assertEquals(extract.getValueBytes(), "value_2".getBytes());
        Assert.assertEquals(extract.getValueSchemaId(), 1);
        Assert.assertEquals(extract.getRmdVersionId(), z2 ? 1 : -1);
        VeniceReducer.VeniceWriterMessage extract2 = veniceKafkaInputReducer.extract(bytesWritable, getValues(Arrays.asList(MapperValueType.PUT, MapperValueType.PUT, MapperValueType.DELETE), z2).iterator(), (Reporter) Mockito.mock(Reporter.class));
        if (z2) {
            Assert.assertNotNull(extract2);
        } else {
            Assert.assertNull(extract2);
        }
        VeniceReducer.VeniceWriterMessage extract3 = veniceKafkaInputReducer.extract(bytesWritable, getValues(Arrays.asList(MapperValueType.PUT, MapperValueType.DELETE, MapperValueType.PUT), z2).iterator(), (Reporter) Mockito.mock(Reporter.class));
        Assert.assertNotNull(extract3);
        Assert.assertEquals(extract3.getKeyBytes(), bytes);
        Assert.assertEquals(extract3.getValueBytes(), "value_2".getBytes());
        Assert.assertEquals(extract3.getValueSchemaId(), 1);
        Assert.assertEquals(extract3.getRmdVersionId(), z2 ? 1 : -1);
    }

    @Test(dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
    public void testExtractWithTTL(boolean z) {
        AbstractVeniceFilter abstractVeniceFilter = (AbstractVeniceFilter) Mockito.mock(AbstractVeniceFilter.class);
        ((AbstractVeniceFilter) Mockito.doReturn(true).when(abstractVeniceFilter)).apply((KafkaInputMapperValue) ArgumentMatchers.any());
        FilterChain filterChain = new FilterChain(new AbstractVeniceFilter[]{abstractVeniceFilter});
        byte[] bytes = "test_key".getBytes();
        KafkaInputMapperKey kafkaInputMapperKey = new KafkaInputMapperKey();
        kafkaInputMapperKey.key = ByteBuffer.wrap(bytes);
        kafkaInputMapperKey.offset = 1L;
        byte[] serialize = FastSerializerDeserializerFactory.getFastAvroGenericSerializer(KafkaInputMapperKey.SCHEMA$).serialize(kafkaInputMapperKey);
        BytesWritable bytesWritable = new BytesWritable();
        bytesWritable.set(serialize, 0, serialize.length);
        VeniceKafkaInputReducer veniceKafkaInputReducer = (VeniceKafkaInputReducer) Mockito.spy(new VeniceKafkaInputReducer());
        ((VeniceKafkaInputReducer) Mockito.doReturn(filterChain).when(veniceKafkaInputReducer)).initFilterChain((VeniceProperties) ArgumentMatchers.any());
        veniceKafkaInputReducer.configureTask(getTestProps(), getTestJobConf());
        veniceKafkaInputReducer.setChunkingEnabled(z);
        VeniceReducer.VeniceWriterMessage extract = veniceKafkaInputReducer.extract(bytesWritable, getValues(Arrays.asList(MapperValueType.PUT, MapperValueType.PUT, MapperValueType.PUT), true).iterator(), (Reporter) Mockito.mock(Reporter.class));
        if (z) {
            Assert.assertNull(extract);
        } else {
            Assert.assertNotNull(extract);
        }
    }

    public List<BytesWritable> getValues(List<MapperValueType> list, boolean z) {
        ArrayList arrayList = new ArrayList();
        long j = 0;
        for (MapperValueType mapperValueType : list) {
            KafkaInputMapperValue kafkaInputMapperValue = new KafkaInputMapperValue();
            long j2 = j;
            j = j2 + 1;
            kafkaInputMapperValue.offset = j2;
            kafkaInputMapperValue.schemaId = 1;
            kafkaInputMapperValue.valueType = mapperValueType;
            kafkaInputMapperValue.replicationMetadataVersionId = z ? 1 : -1;
            kafkaInputMapperValue.replicationMetadataPayload = z ? ByteBuffer.wrap(RMD_VALUE_PREFIX.getBytes()) : ByteBuffer.allocate(0);
            if (mapperValueType.equals(MapperValueType.DELETE)) {
                kafkaInputMapperValue.value = ByteBuffer.wrap(new byte[0]);
            } else {
                kafkaInputMapperValue.value = ByteBuffer.wrap((VALUE_PREFIX + kafkaInputMapperValue.offset).getBytes());
            }
            BytesWritable bytesWritable = new BytesWritable();
            byte[] serialize = KAFKA_INPUT_MAPPER_VALUE_SERIALIZER.serialize(kafkaInputMapperValue);
            bytesWritable.set(serialize, 0, serialize.length);
            arrayList.add(bytesWritable);
        }
        Collections.reverse(arrayList);
        return arrayList;
    }

    private VeniceProperties getTestProps() {
        Properties properties = new Properties();
        properties.put("value.schema.id", 1);
        properties.put("repush.ttl.seconds", 10L);
        properties.put("repush.ttl.policy", 0);
        properties.put("rmd.schema.dir", "tmp");
        return new VeniceProperties(properties);
    }

    private JobConf getTestJobConf() {
        JobConf jobConf = new JobConf();
        jobConf.set("mapred.job.id", "job_200707121733_0003");
        jobConf.set("kafka.input.source.compression.strategy", CompressionStrategy.NO_OP.name());
        jobConf.set("compression.strategy", CompressionStrategy.NO_OP.name());
        return jobConf;
    }

    @Test
    public void testCompress() {
        byte[] bytes = "abc".getBytes();
        VeniceKafkaInputReducer veniceKafkaInputReducer = new VeniceKafkaInputReducer();
        veniceKafkaInputReducer.setSourceVersionCompressor(new NoopCompressor());
        veniceKafkaInputReducer.setDestVersionCompressor(new NoopCompressor());
        Assert.assertNull(veniceKafkaInputReducer.compress((byte[]) null));
        Assert.assertEquals(veniceKafkaInputReducer.compress(bytes), bytes);
        veniceKafkaInputReducer.setDestVersionCompressor(new GzipCompressor());
        Assert.assertNotEquals(veniceKafkaInputReducer.compress(bytes), bytes);
    }
}
