package com.linkedin.venice.hadoop;

import com.linkedin.venice.exceptions.RecordTooLargeException;
import com.linkedin.venice.exceptions.TopicAuthorizationVeniceException;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.hadoop.VeniceReducer;
import com.linkedin.venice.partitioner.DefaultVenicePartitioner;
import com.linkedin.venice.pubsub.adapter.SimplePubSubProduceResultImpl;
import com.linkedin.venice.pubsub.api.PubSubProduceResult;
import com.linkedin.venice.pubsub.api.PubSubProducerCallback;
import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer;
import com.linkedin.venice.writer.AbstractVeniceWriter;
import com.linkedin.venice.writer.DeleteMetadata;
import com.linkedin.venice.writer.PutMetadata;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskType;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/venice/hadoop/TestVeniceReducer.class */
public class TestVeniceReducer extends AbstractTestVeniceMR {
    private static final int TASK_ID = 2;

    /**
     * Extends the parent's default job configuration with the two settings these reducer tests rely on:
     * a reduce-task attempt id built from {@link #TASK_ID}, and the partitioner class name.
     *
     * @return the parent configuration with {@code mapred.task.id} and {@code partitioner.class} set
     */
    @Override
    public Configuration getDefaultJobConfiguration() {
        final Configuration conf = super.getDefaultJobConfiguration();
        final TaskAttemptID reduceAttempt = new TaskAttemptID("200707121733", 3, TaskType.REDUCE, TASK_ID, 0);
        conf.set("mapred.task.id", reduceAttempt.toString());
        conf.set("partitioner.class", DefaultVenicePartitioner.class.getName());
        return conf;
    }

    /**
     * A writer whose five-argument {@code put} throws {@link RecordTooLargeException} must lead the
     * reducer to bump the record-too-large failure counter (asserted by the shared helper).
     */
    @Test
    public void testReducerPutWithTooLargeValueAndChunkingDisabled() {
        final AbstractVeniceWriter rejectingWriter = Mockito.mock(AbstractVeniceWriter.class);
        final RecordTooLargeException tooLarge = new RecordTooLargeException("expected exception");
        Mockito.when(
                rejectingWriter.put(
                    Mockito.any(),
                    Mockito.any(),
                    Mockito.anyInt(),
                    (PubSubProducerCallback) Mockito.any(),
                    (PutMetadata) Mockito.any()))
            .thenThrow(tooLarge);

        testReduceWithTooLargeValueAndChunkingDisabled(rejectingWriter, setupJobConf());
    }

    /**
     * Same scenario as the PUT variant, but with write-compute enabled in the job configuration and the
     * writer's {@code update} stubbed to throw {@link RecordTooLargeException}. The shared helper asserts
     * that the record-too-large failure counter is incremented.
     */
    @Test
    public void testReducerUpdateWithTooLargeValueAndChunkingDisabled() {
        // Arbitrary derived schema id. Kept as its own value instead of reusing TASK_ID, which is the
        // reduce task id and only coincidentally shares the same number.
        final int derivedSchemaId = 2;

        AbstractVeniceWriter abstractVeniceWriter = Mockito.mock(AbstractVeniceWriter.class);
        Mockito.when(
                abstractVeniceWriter.update(
                    Mockito.any(),
                    Mockito.any(),
                    Mockito.anyInt(),
                    Mockito.anyInt(),
                    (PubSubProducerCallback) Mockito.any()))
            .thenThrow(new RecordTooLargeException("expected exception"));

        JobConf jobConf = setupJobConf();
        jobConf.setInt("derived.schema.id", derivedSchemaId);
        jobConf.setBoolean("venice.write.compute.enable", true);
        testReduceWithTooLargeValueAndChunkingDisabled(abstractVeniceWriter, jobConf);
    }

    /**
     * Shared body of the "record too large" tests: runs a single reduce call through a reducer wired to
     * the given writer and configuration, then verifies that the record-too-large failure counter was
     * incremented by one on the reporter.
     *
     * @param abstractVeniceWriter writer (normally a mock stubbed to throw) injected into the reducer
     * @param jobConf job configuration used to configure the reducer
     */
    private void testReduceWithTooLargeValueAndChunkingDisabled(AbstractVeniceWriter abstractVeniceWriter, JobConf jobConf) {
        final VeniceReducer reducer = new VeniceReducer();
        reducer.setVeniceWriter(abstractVeniceWriter);
        reducer.configure(jobConf);

        final BytesWritable key = new BytesWritable("test_key".getBytes());
        final List<BytesWritable> values = Collections.singletonList(new BytesWritable("test_value".getBytes()));
        final OutputCollector collector = Mockito.mock(OutputCollector.class);
        final Reporter reporter = createZeroCountReporterMock();

        reducer.reduce(key, values.iterator(), collector, reporter);

        Mockito.verify(reporter)
            .incrCounter(
                MRJobCounterHelper.RECORD_TOO_LARGE_FAILURE_GROUP_COUNTER_NAME.getGroupName(),
                MRJobCounterHelper.RECORD_TOO_LARGE_FAILURE_GROUP_COUNTER_NAME.getCounterName(),
                1L);
    }

    /**
     * Happy path: one key with one value. Captures the arguments of the single {@code put} call and
     * checks the key bytes, value bytes, schema id (expected to be 1), and that the callback carries the
     * reporter passed to {@code reduce}. Also checks the output-record counter was incremented by one.
     */
    @Test
    public void testReduce() {
        final AbstractVeniceWriter writer = Mockito.mock(AbstractVeniceWriter.class);
        final VeniceReducer reducer = new VeniceReducer();
        reducer.setVeniceWriter(writer);
        reducer.configure(setupJobConf());

        final BytesWritable key = new BytesWritable("test_key".getBytes());
        final List<BytesWritable> values = Collections.singletonList(new BytesWritable("test_value".getBytes()));
        final OutputCollector collector = Mockito.mock(OutputCollector.class);
        final Reporter reporter = createZeroCountReporterMock();

        reducer.reduce(key, values.iterator(), collector, reporter);

        final ArgumentCaptor<byte[]> keyCaptor = ArgumentCaptor.forClass(byte[].class);
        final ArgumentCaptor<byte[]> valueCaptor = ArgumentCaptor.forClass(byte[].class);
        final ArgumentCaptor<Integer> schemaIdCaptor = ArgumentCaptor.forClass(Integer.class);
        final ArgumentCaptor<PutMetadata> metadataCaptor = ArgumentCaptor.forClass(PutMetadata.class);
        final ArgumentCaptor<VeniceReducer.ReducerProduceCallback> callbackCaptor =
            ArgumentCaptor.forClass(VeniceReducer.ReducerProduceCallback.class);

        // The capture() calls stay inline so the matchers are registered in parameter order.
        Mockito.verify(writer)
            .put(
                keyCaptor.capture(),
                valueCaptor.capture(),
                schemaIdCaptor.capture().intValue(),
                (PubSubProducerCallback) callbackCaptor.capture(),
                metadataCaptor.capture());

        Assert.assertEquals(keyCaptor.getValue(), "test_key".getBytes());
        Assert.assertEquals(valueCaptor.getValue(), "test_value".getBytes());
        Assert.assertEquals(schemaIdCaptor.getValue().intValue(), 1);
        Assert.assertEquals(callbackCaptor.getValue().getProgressable(), reporter);

        Mockito.verify(reporter)
            .incrCounter(
                MRJobCounterHelper.OUTPUT_RECORD_COUNT_GROUP_COUNTER_NAME.getGroupName(),
                MRJobCounterHelper.OUTPUT_RECORD_COUNT_GROUP_COUNTER_NAME.getCounterName(),
                1L);
    }

    /**
     * Reducing a key with an empty value iterator is expected to fail with {@link VeniceException}.
     */
    @Test(expectedExceptions = {VeniceException.class})
    public void testReduceWithNoValue() {
        final VeniceReducer reducer = new VeniceReducer();
        reducer.setVeniceWriter(Mockito.mock(AbstractVeniceWriter.class));
        reducer.configure(setupJobConf());

        final BytesWritable key = new BytesWritable("test_key".getBytes());
        final List<BytesWritable> noValues = new ArrayList<>();
        final OutputCollector collector = Mockito.mock(OutputCollector.class);

        reducer.reduce(key, noValues.iterator(), collector, createZeroCountReporterMock());
    }

    /**
     * Two identical values for one key: the "identical value" duplicate counter is incremented once by
     * one, and the "distinct value" duplicate counter is never touched.
     */
    @Test
    public void testReduceWithMultipleSameValues() {
        final Reporter reporter = createZeroCountReporterMock();
        testDuplicateKey(true, reporter);

        final String distinctGroup = MRJobCounterHelper.DUP_KEY_WITH_DISTINCT_VALUE_GROUP_COUNTER_NAME.getGroupName();
        final String distinctCounter = MRJobCounterHelper.DUP_KEY_WITH_DISTINCT_VALUE_GROUP_COUNTER_NAME.getCounterName();
        Mockito.verify(reporter, Mockito.never())
            .incrCounter(Mockito.eq(distinctGroup), Mockito.eq(distinctCounter), Mockito.anyLong());

        final String identicalGroup = MRJobCounterHelper.DUP_KEY_WITH_IDENTICAL_VALUE_GROUP_COUNTER_NAME.getGroupName();
        final String identicalCounter = MRJobCounterHelper.DUP_KEY_WITH_IDENTICAL_VALUE_GROUP_COUNTER_NAME.getCounterName();
        Mockito.verify(reporter, Mockito.times(1))
            .incrCounter(Mockito.eq(identicalGroup), Mockito.eq(identicalCounter), Mockito.eq(1L));
    }

    /**
     * Two different values for one key: the "distinct value" duplicate counter is incremented once by
     * one, and the "identical value" duplicate counter is never touched.
     */
    @Test
    public void testReduceWithMultipleDistinctValues() {
        final Reporter reporter = createZeroCountReporterMock();
        testDuplicateKey(false, reporter);

        final String distinctGroup = MRJobCounterHelper.DUP_KEY_WITH_DISTINCT_VALUE_GROUP_COUNTER_NAME.getGroupName();
        final String distinctCounter = MRJobCounterHelper.DUP_KEY_WITH_DISTINCT_VALUE_GROUP_COUNTER_NAME.getCounterName();
        Mockito.verify(reporter, Mockito.times(1))
            .incrCounter(Mockito.eq(distinctGroup), Mockito.eq(distinctCounter), Mockito.eq(1L));

        final String identicalGroup = MRJobCounterHelper.DUP_KEY_WITH_IDENTICAL_VALUE_GROUP_COUNTER_NAME.getGroupName();
        final String identicalCounter = MRJobCounterHelper.DUP_KEY_WITH_IDENTICAL_VALUE_GROUP_COUNTER_NAME.getCounterName();
        Mockito.verify(reporter, Mockito.never())
            .incrCounter(Mockito.eq(identicalGroup), Mockito.eq(identicalCounter), Mockito.anyLong());
    }

    /**
     * Feeds the reducer one Avro-serialized string key with two values and leaves all verification to
     * the caller, which inspects the supplied reporter.
     *
     * @param z {@code true} to send the same value twice ("test_value", "test_value");
     *          {@code false} to send two different values ("test_value", "test_value1")
     * @param reporter reporter handed to {@code reduce}; callers verify counter increments on it
     */
    private void testDuplicateKey(boolean z, Reporter reporter) {
        final VeniceReducer reducer = new VeniceReducer();
        reducer.setVeniceWriter(Mockito.mock(AbstractVeniceWriter.class));
        reducer.configure(setupJobConf());

        final byte[] serializedKey = new VeniceAvroKafkaSerializer("\"string\"").serialize("test_topic", "test_key");
        final BytesWritable key = new BytesWritable(serializedKey);

        final String secondValue = z ? "test_value" : "test_value1";
        final List<BytesWritable> values = Arrays.asList(
            new BytesWritable("test_value".getBytes()),
            new BytesWritable(secondValue.getBytes()));

        final OutputCollector collector = Mockito.mock(OutputCollector.class);
        reducer.reduce(key, values.iterator(), collector, reporter);
    }

    /**
     * With {@code storage.quota} set to -1 the reducer must not raise its exceed-quota flag during
     * configuration.
     */
    @Test
    public void testReducerConfigWithUnlimitedStorageQuota() {
        final Configuration conf = getDefaultJobConfiguration();
        conf.setLong("storage.quota", -1L);

        final VeniceReducer reducer = new VeniceReducer();
        reducer.configure(new JobConf(conf));

        Assert.assertFalse(reducer.getExceedQuotaFlag());
    }

    /**
     * Runs the quota-detection scenario for three {key size, value size, quota} combinations: total
     * equal to the quota, total below the quota, and total above the quota.
     */
    @Test
    public void testReducerDetectExceededQuotaInConfig() throws IOException {
        final long[][] scenarios = {
            {1024L, 1024L, 2048L}, // total == quota
            {1024L, 512L, 2048L},  // total < quota
            {1024L, 1024L, 1024L}, // total > quota
        };
        for (long[] scenario: scenarios) {
            reducerDetectExceededQuotaInConfig(scenario[0], scenario[1], scenario[2]);
        }
    }

    /**
     * Configures a reducer against a mocked Hadoop job whose counters report the given key and value
     * sizes, then asserts that the exceed-quota flag is set exactly when
     * {@code keySize + valueSize > quota}.
     *
     * @param j total key size reported by the mocked job counters
     * @param j2 total value size reported by the mocked job counters
     * @param j3 value written to {@code storage.quota}
     * @throws IOException declared because the stubbed Hadoop client methods declare it
     */
    private void reducerDetectExceededQuotaInConfig(long j, long j2, long j3) throws IOException {
        final long totalKeySize = j;
        final long totalValueSize = j2;
        final long storageQuota = j3;

        // Build the mock chain bottom-up: counter group -> counters -> running job -> job client -> provider.
        // NOTE(review): only the key-size group name is stubbed on `counters`; presumably the value-size
        // counter lives in the same group - confirm against MRJobCounterHelper.
        final Counters.Group sizeGroup = Mockito.mock(Counters.Group.class);
        Mockito.when(sizeGroup.getCounter(MRJobCounterHelper.TOTAL_KEY_SIZE_GROUP_COUNTER_NAME.getCounterName()))
            .thenReturn(totalKeySize);
        Mockito.when(sizeGroup.getCounter(MRJobCounterHelper.TOTAL_VALUE_SIZE_GROUP_COUNTER_NAME.getCounterName()))
            .thenReturn(totalValueSize);

        final Counters counters = Mockito.mock(Counters.class);
        Mockito.when(counters.getGroup(MRJobCounterHelper.TOTAL_KEY_SIZE_GROUP_COUNTER_NAME.getGroupName()))
            .thenReturn(sizeGroup);

        final RunningJob runningJob = Mockito.mock(RunningJob.class);
        Mockito.when(runningJob.getCounters()).thenReturn(counters);

        final JobClient jobClient = Mockito.mock(JobClient.class);
        Mockito.when(jobClient.getJob(Mockito.any(JobID.class))).thenReturn(runningJob);

        final HadoopJobClientProvider clientProvider = Mockito.mock(HadoopJobClientProvider.class);
        Mockito.when(clientProvider.getJobClientFromConfig((JobConf) Mockito.any())).thenReturn(jobClient);

        final Configuration conf = getDefaultJobConfiguration();
        conf.setLong("storage.quota", storageQuota);
        conf.setBoolean("venice.writer.chunking.enabled", false);

        final VeniceReducer reducer = new VeniceReducer();
        reducer.setHadoopJobClientProvider(clientProvider);
        reducer.configure(new JobConf(conf));

        final boolean expectExceeded = totalKeySize + totalValueSize > storageQuota;
        Assert.assertEquals(reducer.getExceedQuotaFlag(), expectExceeded);
    }

    /**
     * With {@code allow.duplicate.key} enabled, a key carrying two distinct values still results in
     * exactly one {@code put}, and {@code hasReportedFailure(reporter, true)} stays false even though
     * the distinct-value duplicate counter reads 1.
     */
    @Test
    public void testReducerAllowDuplicatedKeys() {
        final Counters.Counter zeroCounter = Mockito.mock(Counters.Counter.class);
        Mockito.when(zeroCounter.getCounter()).thenReturn(0L);
        final Counters.Counter oneCounter = Mockito.mock(Counters.Counter.class);
        Mockito.when(oneCounter.getCounter()).thenReturn(1L);

        // Every counter reads zero except the distinct-value duplicate counter, which reads one.
        final Reporter reporter = Mockito.mock(Reporter.class);
        Mockito.when(
                reporter.getCounter(
                    MRJobCounterHelper.WRITE_ACL_FAILURE_GROUP_COUNTER_NAME.getGroupName(),
                    MRJobCounterHelper.WRITE_ACL_FAILURE_GROUP_COUNTER_NAME.getCounterName()))
            .thenReturn(zeroCounter);
        Mockito.when(
                reporter.getCounter(
                    MRJobCounterHelper.TOTAL_KEY_SIZE_GROUP_COUNTER_NAME.getGroupName(),
                    MRJobCounterHelper.TOTAL_KEY_SIZE_GROUP_COUNTER_NAME.getCounterName()))
            .thenReturn(zeroCounter);
        Mockito.when(
                reporter.getCounter(
                    MRJobCounterHelper.TOTAL_VALUE_SIZE_GROUP_COUNTER_NAME.getGroupName(),
                    MRJobCounterHelper.TOTAL_VALUE_SIZE_GROUP_COUNTER_NAME.getCounterName()))
            .thenReturn(zeroCounter);
        Mockito.when(
                reporter.getCounter(
                    MRJobCounterHelper.RECORD_TOO_LARGE_FAILURE_GROUP_COUNTER_NAME.getGroupName(),
                    MRJobCounterHelper.RECORD_TOO_LARGE_FAILURE_GROUP_COUNTER_NAME.getCounterName()))
            .thenReturn(zeroCounter);
        Mockito.when(
                reporter.getCounter(
                    MRJobCounterHelper.DUP_KEY_WITH_DISTINCT_VALUE_GROUP_COUNTER_NAME.getGroupName(),
                    MRJobCounterHelper.DUP_KEY_WITH_DISTINCT_VALUE_GROUP_COUNTER_NAME.getCounterName()))
            .thenReturn(oneCounter);

        final AbstractVeniceWriter writer = Mockito.mock(AbstractVeniceWriter.class);
        final VeniceReducer reducer = new VeniceReducer();
        reducer.setVeniceWriter(writer);
        final Configuration conf = getDefaultJobConfiguration();
        conf.set("allow.duplicate.key", Boolean.toString(true));
        reducer.configure(new JobConf(conf));

        final BytesWritable key =
            new BytesWritable(new VeniceAvroKafkaSerializer("\"string\"").serialize("test_topic", "test_key"));
        final List<BytesWritable> values = Arrays.asList(
            new BytesWritable("test_value_0".getBytes()),
            new BytesWritable("test_value_1".getBytes()));
        final OutputCollector collector = Mockito.mock(OutputCollector.class);

        reducer.reduce(key, values.iterator(), collector, reporter);

        Mockito.verify(writer)
            .put(
                Mockito.any(),
                Mockito.any(),
                Mockito.anyInt(),
                (PubSubProducerCallback) Mockito.any(),
                (PutMetadata) Mockito.any());
        Assert.assertFalse(reducer.hasReportedFailure(reporter, true));
    }

    /**
     * When the writer's {@code put} throws {@link TopicAuthorizationVeniceException}, the reducer must
     * increment the write-ACL failure counter once by one and emit nothing to the output collector.
     */
    @Test
    public void testReduceWithTopicAuthorizationException() throws IOException {
        final AbstractVeniceWriter unauthorizedWriter = Mockito.mock(AbstractVeniceWriter.class);
        Mockito.when(
                unauthorizedWriter.put(
                    Mockito.any(),
                    Mockito.any(),
                    Mockito.anyInt(),
                    (PubSubProducerCallback) Mockito.any(),
                    (PutMetadata) Mockito.any()))
            .thenThrow(new TopicAuthorizationVeniceException("No ACL permission"));

        final VeniceReducer reducer = new VeniceReducer();
        reducer.setVeniceWriter(unauthorizedWriter);
        reducer.configure(setupJobConf());

        final BytesWritable key =
            new BytesWritable(new VeniceAvroKafkaSerializer("\"string\"").serialize("test_topic", "test_key"));
        final List<BytesWritable> values = new ArrayList<>();
        values.add(new BytesWritable("test_value".getBytes()));
        final Reporter reporter = createZeroCountReporterMock();
        final OutputCollector collector = Mockito.mock(OutputCollector.class);

        reducer.reduce(key, values.iterator(), collector, reporter);

        final String aclGroup = MRJobCounterHelper.WRITE_ACL_FAILURE_GROUP_COUNTER_NAME.getGroupName();
        final String aclCounter = MRJobCounterHelper.WRITE_ACL_FAILURE_GROUP_COUNTER_NAME.getCounterName();
        Mockito.verify(reporter, Mockito.times(1))
            .incrCounter(Mockito.eq(aclGroup), Mockito.eq(aclCounter), Mockito.eq(1L));
        Mockito.verify(collector, Mockito.never()).collect(Mockito.any(), Mockito.any());
    }

    /**
     * After one reduce call and a successful produce callback, closing the reducer must increment the
     * reducer-closed counter exactly once by one.
     */
    @Test
    public void testCloseReducerAfterReduce() throws IOException {
        final VeniceReducer reducer = new VeniceReducer();
        reducer.setVeniceWriter(Mockito.mock(AbstractVeniceWriter.class));
        reducer.configure(setupJobConf());

        final BytesWritable key =
            new BytesWritable(new VeniceAvroKafkaSerializer("\"string\"").serialize("test_topic", "test_key"));
        final List<BytesWritable> values = new ArrayList<>();
        values.add(new BytesWritable("test_value".getBytes()));
        final Reporter reporter = createZeroCountReporterMock();
        final OutputCollector collector = Mockito.mock(OutputCollector.class);

        reducer.reduce(key, values.iterator(), collector, reporter);

        // Simulate a successful produce acknowledgement, since the writer is a mock and never calls back.
        // NOTE(review): the reported partition equals TASK_ID - presumably the callback expects the
        // partition to match the reducer's task id; confirm against VeniceReducer.
        final PubSubProduceResult produceResult = new SimplePubSubProduceResultImpl("topic-name", TASK_ID, 1L, 1);
        reducer.callback.onCompletion(produceResult, (Exception) null);
        reducer.close();

        final String closedGroup = MRJobCounterHelper.REDUCER_CLOSED_COUNT_GROUP_COUNTER_NAME.getGroupName();
        final String closedCounter = MRJobCounterHelper.REDUCER_CLOSED_COUNT_GROUP_COUNTER_NAME.getCounterName();
        Mockito.verify(reporter, Mockito.times(1))
            .incrCounter(Mockito.eq(closedGroup), Mockito.eq(closedCounter), Mockito.eq(1L));
    }

    /**
     * Closing a reducer that was configured but never asked to reduce must complete without throwing;
     * the test passes as long as {@code close()} returns normally.
     */
    @Test
    public void testCloseReducerWithNoReduce() throws IOException {
        final VeniceReducer idleReducer = new VeniceReducer();
        idleReducer.configure(setupJobConf());

        idleReducer.close();
    }

    /**
     * A reducer flagged as over quota (and never configured) must still increment the reducer-closed
     * counter exactly once, with any amount, when it is closed after a reduce call.
     */
    @Test
    public void testReduceWithExceedQuotaStillIncreaseCloseCounter() throws IOException {
        final Reporter reporter = createZeroCountReporterMock();
        final OutputCollector collector = Mockito.mock(OutputCollector.class);

        final VeniceReducer overQuotaReducer = new VeniceReducer();
        overQuotaReducer.setExceedQuota(true);

        final BytesWritable key = new BytesWritable("test_key".getBytes());
        final BytesWritable value = new BytesWritable("test_value".getBytes());
        overQuotaReducer.reduce(key, Collections.singleton(value).iterator(), collector, reporter);
        overQuotaReducer.close();

        final String closedGroup = MRJobCounterHelper.REDUCER_CLOSED_COUNT_GROUP_COUNTER_NAME.getGroupName();
        final String closedCounter = MRJobCounterHelper.REDUCER_CLOSED_COUNT_GROUP_COUNTER_NAME.getCounterName();
        Mockito.verify(reporter, Mockito.times(1))
            .incrCounter(Mockito.eq(closedGroup), Mockito.eq(closedCounter), Mockito.anyLong());
    }

    /**
     * When every counter on the reporter already reads 1, the reducer must neither write to the Venice
     * writer nor emit to the output collector.
     *
     * <p>The "no write" check covers both {@code put} overloads. The other tests in this class show the
     * reducer writing through the five-argument overload (with {@link PutMetadata}), so verifying only
     * the four-argument overload could never catch a write.
     */
    @Test
    public void testReducerDetectErrorInReportCounter() throws IOException {
        BytesWritable key =
            new BytesWritable(new VeniceAvroKafkaSerializer("\"string\"").serialize("test_topic", "test_key"));
        List<BytesWritable> values = new ArrayList<>();
        values.add(new BytesWritable("test_value".getBytes()));

        // Reporter whose every counter reads 1.
        Counters.Counter nonZeroCounter = Mockito.mock(Counters.Counter.class);
        Mockito.when(nonZeroCounter.getCounter()).thenReturn(1L);
        Reporter reporter = Mockito.mock(Reporter.class);
        Mockito.when(reporter.getCounter(Mockito.anyString(), Mockito.anyString())).thenReturn(nonZeroCounter);

        OutputCollector outputCollector = Mockito.mock(OutputCollector.class);
        AbstractVeniceWriter abstractVeniceWriter = Mockito.mock(AbstractVeniceWriter.class);
        VeniceReducer veniceReducer = new VeniceReducer();
        veniceReducer.setVeniceWriter(abstractVeniceWriter);
        veniceReducer.configure(setupJobConf());

        veniceReducer.reduce(key, values.iterator(), outputCollector, reporter);

        Mockito.verify(abstractVeniceWriter, Mockito.never())
            .put(Mockito.any(), Mockito.any(), Mockito.anyInt(), (PubSubProducerCallback) Mockito.any());
        // This is the overload the reducer uses on its normal write path.
        Mockito.verify(abstractVeniceWriter, Mockito.never())
            .put(
                Mockito.any(),
                Mockito.any(),
                Mockito.anyInt(),
                (PubSubProducerCallback) Mockito.any(),
                (PutMetadata) Mockito.any());
        Mockito.verify(outputCollector, Mockito.never()).collect(Mockito.any(), Mockito.any());
    }

    /**
     * Calls {@code reduce} twice with two different reporters and checks that the callback handed to
     * the writer on each call exposes the reporter of that call through {@code getProgressable()}.
     */
    @Test
    public void testReduceWithDifferentReporters() {
        AbstractVeniceWriter abstractVeniceWriter = Mockito.mock(AbstractVeniceWriter.class);
        VeniceReducer veniceReducer = new VeniceReducer();
        veniceReducer.setVeniceWriter(abstractVeniceWriter);
        veniceReducer.configure(setupJobConf());

        BytesWritable key = new BytesWritable("test_key".getBytes());
        List<BytesWritable> values = new ArrayList<>();
        values.add(new BytesWritable("test_value".getBytes()));
        OutputCollector outputCollector = Mockito.mock(OutputCollector.class);

        ArgumentCaptor<VeniceReducer.ReducerProduceCallback> callbackCaptor =
            ArgumentCaptor.forClass(VeniceReducer.ReducerProduceCallback.class);

        // First reduce call: the callback must reference the first reporter.
        Reporter firstReporter = createZeroCountReporterMock();
        veniceReducer.reduce(key, values.iterator(), outputCollector, firstReporter);
        Mockito.verify(abstractVeniceWriter)
            .put(
                Mockito.any(),
                Mockito.any(),
                Mockito.anyInt(),
                (PubSubProducerCallback) callbackCaptor.capture(),
                (PutMetadata) Mockito.any());
        Assert.assertEquals(callbackCaptor.getValue().getProgressable(), firstReporter);

        // Second reduce call: put has now been invoked twice in total. The count is an explicit 2 rather
        // than TASK_ID, which is the reduce task id and unrelated to the number of invocations.
        Reporter secondReporter = createZeroCountReporterMock();
        veniceReducer.reduce(key, values.iterator(), outputCollector, secondReporter);
        Mockito.verify(abstractVeniceWriter, Mockito.times(2))
            .put(
                Mockito.any(),
                Mockito.any(),
                Mockito.anyInt(),
                (PubSubProducerCallback) callbackCaptor.capture(),
                (PutMetadata) Mockito.any());
        // getValue() returns the most recently captured callback, i.e. the one from the second call.
        Assert.assertEquals(callbackCaptor.getValue().getProgressable(), secondReporter);
    }

    /**
     * Uses a hand-written writer whose {@code put} methods immediately complete the callback with a
     * {@link VeniceException}. The first reduce call returns normally; the second reduce call is
     * expected to throw {@link VeniceException}.
     */
    @Test
    public void testReduceWithWriterException() {
        final AbstractVeniceWriter failingWriter = new AbstractVeniceWriter("test_store_v1") {
            public Future<PubSubProduceResult> put(Object key, Object value, int valueSchemaId, PubSubProducerCallback callback) {
                callback.onCompletion((PubSubProduceResult) null, new VeniceException("Fake exception"));
                return null;
            }

            public Future<PubSubProduceResult> put(
                    Object key,
                    Object value,
                    int valueSchemaId,
                    PubSubProducerCallback callback,
                    PutMetadata putMetadata) {
                callback.onCompletion((PubSubProduceResult) null, new VeniceException("Fake exception"));
                return null;
            }

            public Future<PubSubProduceResult> update(
                    Object key,
                    Object update,
                    int valueSchemaId,
                    int derivedSchemaId,
                    PubSubProducerCallback callback) {
                return null;
            }

            public Future<PubSubProduceResult> delete(Object key, PubSubProducerCallback callback, DeleteMetadata deleteMetadata) {
                return null;
            }

            public void flush() {
                // nothing to flush
            }

            public void close(boolean gracefulClose) {
                // nothing to release
            }

            public void close() {
                // nothing to release
            }
        };

        final VeniceReducer reducer = new VeniceReducer();
        reducer.setVeniceWriter(failingWriter);
        reducer.configure(setupJobConf());

        final BytesWritable key = new BytesWritable("test_key".getBytes());
        final List<BytesWritable> values = new ArrayList<>();
        values.add(new BytesWritable("test_value".getBytes()));
        final OutputCollector collector = Mockito.mock(OutputCollector.class);
        final Reporter reporter = createZeroCountReporterMock();

        // First call: the writer reports the failure through the callback; reduce itself does not throw.
        reducer.reduce(key, values.iterator(), collector, reporter);

        // Second call: expected to throw.
        Assert.assertThrows(
            VeniceException.class,
            () -> reducer.reduce(key, values.iterator(), collector, reporter));
    }

    /**
     * Uses a hand-written writer whose {@code put} methods complete the callback with a
     * {@link VeniceException}. After one reduce call, {@code close()} on the reducer is expected to throw
     * {@link VeniceException}, and if the reducer calls {@code close(boolean)} on the writer, the flag
     * must be {@code false}.
     */
    @Test
    public void testClosingReducerWithWriterException() throws IOException {
        final AbstractVeniceWriter failingWriter = new AbstractVeniceWriter("test_store_v1") {
            public Future<PubSubProduceResult> put(Object key, Object value, int valueSchemaId, PubSubProducerCallback callback) {
                callback.onCompletion((PubSubProduceResult) null, new VeniceException("Some writer exception"));
                return null;
            }

            public Future<PubSubProduceResult> put(
                    Object key,
                    Object value,
                    int valueSchemaId,
                    PubSubProducerCallback callback,
                    PutMetadata putMetadata) {
                callback.onCompletion((PubSubProduceResult) null, new VeniceException("Some writer exception"));
                return null;
            }

            public Future<PubSubProduceResult> update(
                    Object key,
                    Object update,
                    int valueSchemaId,
                    int derivedSchemaId,
                    PubSubProducerCallback callback) {
                return null;
            }

            public Future<PubSubProduceResult> delete(Object key, PubSubProducerCallback callback, DeleteMetadata deleteMetadata) {
                return null;
            }

            public void flush() {
                // nothing to flush
            }

            public void close(boolean gracefulClose) {
                Assert.assertFalse(gracefulClose, "A writer exception is thrown, should not close all segments");
            }

            public void close() throws IOException {
                // nothing to release
            }
        };

        final VeniceReducer reducer = new VeniceReducer();
        reducer.setVeniceWriter(failingWriter);
        reducer.configure(setupJobConf());

        final BytesWritable key = new BytesWritable("test_key".getBytes());
        final List<BytesWritable> values = new ArrayList<>();
        values.add(new BytesWritable("test_value".getBytes()));
        final OutputCollector collector = Mockito.mock(OutputCollector.class);

        reducer.reduce(key, values.iterator(), collector, createZeroCountReporterMock());

        Assert.assertThrows(VeniceException.class, reducer::close);
    }

    /**
     * Builds a mock {@link Reporter} on which every {@code getCounter(String, String)} lookup returns
     * the same mock counter, and that counter always reads zero.
     *
     * @return a fresh reporter mock with all string-keyed counters at zero
     */
    private Reporter createZeroCountReporterMock() {
        final Counters.Counter zeroCounter = Mockito.mock(Counters.Counter.class);
        Mockito.when(zeroCounter.getCounter()).thenReturn(0L);

        final Reporter zeroReporter = Mockito.mock(Reporter.class);
        Mockito.when(zeroReporter.getCounter(Mockito.anyString(), Mockito.anyString())).thenReturn(zeroCounter);
        return zeroReporter;
    }
}
