package com.linkedin.venice.hadoop.input.kafka;

import com.linkedin.venice.hadoop.AbstractTestVeniceMapper;
import com.linkedin.venice.hadoop.AbstractVeniceFilter;
import com.linkedin.venice.hadoop.FilterChain;
import com.linkedin.venice.hadoop.input.kafka.avro.KafkaInputMapperKey;
import com.linkedin.venice.hadoop.input.kafka.avro.KafkaInputMapperValue;
import com.linkedin.venice.hadoop.input.kafka.avro.MapperValueType;
import com.linkedin.venice.utils.VeniceProperties;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Properties;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/venice/hadoop/input/kafka/TestVeniceKafkaInputMapper.class */
public class TestVeniceKafkaInputMapper extends AbstractTestVeniceMapper<VeniceKafkaInputMapper> {
    private static final BytesWritable BYTES_WRITABLE = new BytesWritable(new byte[0]);
    private static final KafkaInputMapperKey EMPTY_KEY = new KafkaInputMapperKey();
    private static final String RMD = "rmd";

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.linkedin.venice.hadoop.AbstractTestVeniceMapper
    public VeniceKafkaInputMapper newMapper() {
        return new VeniceKafkaInputMapper();
    }

    @Test(dataProvider = "mapperParams")
    public void testConfigure(int i, int i2) throws IOException {
        try {
            getMapper(i, i2).configure(setupJobConf(i, i2));
        } catch (Exception e) {
            Assert.fail("VeniceKafkaInputMapper#configure should not throw any exception when all the required props are there");
        }
    }

    @Test(expectedExceptions = {UnsupportedOperationException.class})
    public void testUnsupportedGetRecordReader() {
        newMapper().getRecordReader(VeniceProperties.empty());
    }

    @Test
    public void testEmptyFilterWhenTTLNotSpecified() {
        VeniceKafkaInputMapper veniceKafkaInputMapper = new VeniceKafkaInputMapper();
        try {
            Assert.assertNull(veniceKafkaInputMapper.getFilterChain(VeniceProperties.empty()));
            veniceKafkaInputMapper.close();
        } catch (Throwable th) {
            try {
                veniceKafkaInputMapper.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testValidFilterWhenTTLSpecified() {
        Properties properties = new Properties();
        properties.put("repush.ttl.seconds", 10L);
        properties.put("repush.ttl.policy", 0);
        properties.put("rmd.schema.dir", "tmp");
        Assert.assertFalse(newMapper().getFilterChain(new VeniceProperties(properties)).isEmpty());
        properties.put("venice.writer.chunking.enabled", true);
        Assert.assertFalse(newMapper().getFilterChain(new VeniceProperties(properties)).isEmpty());
    }

    @Test(dataProvider = "mapperParams")
    public void testProcessWithoutFilter(int i, int i2) throws IOException {
        VeniceKafkaInputMapper mapper = getMapper(i, i2);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(BytesWritable.class);
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(BytesWritable.class);
        OutputCollector outputCollector = (OutputCollector) Mockito.mock(OutputCollector.class);
        mapper.map(EMPTY_KEY, generateKIFRecord(), outputCollector, (Reporter) null);
        ((OutputCollector) Mockito.verify(outputCollector, Mockito.times(getNumberOfCollectorInvocationForFirstMapInvocation(i, i2)))).collect((BytesWritable) forClass.capture(), (BytesWritable) forClass2.capture());
    }

    @Test
    public void testProcessWithFilterFilteringPartialRecords() {
        AbstractVeniceFilter abstractVeniceFilter = (AbstractVeniceFilter) Mockito.mock(AbstractVeniceFilter.class);
        ((AbstractVeniceFilter) Mockito.doReturn(true, new Object[]{false, true, false, false}).when(abstractVeniceFilter)).apply((KafkaInputMapperValue) ArgumentMatchers.any());
        VeniceKafkaInputMapper veniceKafkaInputMapper = (VeniceKafkaInputMapper) Mockito.spy(newMapper());
        ((VeniceKafkaInputMapper) Mockito.doReturn(new FilterChain(new AbstractVeniceFilter[]{abstractVeniceFilter})).when(veniceKafkaInputMapper)).getFilterChain((VeniceProperties) ArgumentMatchers.any());
        veniceKafkaInputMapper.configureTask((VeniceProperties) ArgumentMatchers.any(), (JobConf) ArgumentMatchers.any());
        int i = 0;
        int i2 = 0;
        for (int i3 = 0; i3 < 5; i3++) {
            if (veniceKafkaInputMapper.process(EMPTY_KEY, generateKIFRecord(), BYTES_WRITABLE, BYTES_WRITABLE, (Reporter) null)) {
                i++;
            } else {
                i2++;
            }
        }
        Assert.assertEquals(i, 3);
        Assert.assertEquals(i2, 2);
    }

    @Test(dataProvider = "mapperParams")
    public void testProcessWithFilterFilteringAllRecords(int i, int i2) throws IOException {
        AbstractVeniceFilter abstractVeniceFilter = (AbstractVeniceFilter) Mockito.mock(AbstractVeniceFilter.class);
        ((AbstractVeniceFilter) Mockito.doReturn(true).when(abstractVeniceFilter)).apply((KafkaInputMapperValue) ArgumentMatchers.any());
        FilterChain filterChain = new FilterChain(new AbstractVeniceFilter[]{abstractVeniceFilter});
        VeniceKafkaInputMapper veniceKafkaInputMapper = (VeniceKafkaInputMapper) Mockito.spy(getMapper(i, i2));
        ((VeniceKafkaInputMapper) Mockito.doReturn(filterChain).when(veniceKafkaInputMapper)).getFilterChain((VeniceProperties) ArgumentMatchers.any());
        veniceKafkaInputMapper.configureTask((VeniceProperties) ArgumentMatchers.any(), (JobConf) ArgumentMatchers.any());
        Assert.assertFalse(veniceKafkaInputMapper.process((KafkaInputMapperKey) ArgumentMatchers.any(), (KafkaInputMapperValue) ArgumentMatchers.any(), (BytesWritable) ArgumentMatchers.any(), (BytesWritable) ArgumentMatchers.any(), (Reporter) ArgumentMatchers.any()));
    }

    private KafkaInputMapperValue generateKIFRecord() {
        KafkaInputMapperValue kafkaInputMapperValue = new KafkaInputMapperValue();
        kafkaInputMapperValue.offset = 0L;
        kafkaInputMapperValue.schemaId = -1;
        kafkaInputMapperValue.valueType = MapperValueType.PUT;
        kafkaInputMapperValue.replicationMetadataPayload = ByteBuffer.wrap(RMD.getBytes());
        kafkaInputMapperValue.value = ByteBuffer.wrap(new byte[0]);
        return kafkaInputMapperValue;
    }

    static {
        EMPTY_KEY.key = ByteBuffer.wrap("test_key".getBytes());
        EMPTY_KEY.offset = 1L;
    }
}
