package com.linkedin.venice.hadoop;

import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import com.linkedin.venice.exceptions.RecordTooLargeException;
import com.linkedin.venice.exceptions.TopicAuthorizationVeniceException;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.guid.GuidUtils;
import com.linkedin.venice.hadoop.utils.HadoopUtils;
import com.linkedin.venice.partitioner.VenicePartitioner;
import com.linkedin.venice.pubsub.api.PubSubProduceResult;
import com.linkedin.venice.pubsub.api.PubSubProducerCallback;
import com.linkedin.venice.serialization.DefaultSerializer;
import com.linkedin.venice.serialization.VeniceKafkaSerializer;
import com.linkedin.venice.utils.ByteUtils;
import com.linkedin.venice.utils.PartitionUtils;
import com.linkedin.venice.utils.SystemTime;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.writer.AbstractVeniceWriter;
import com.linkedin.venice.writer.DeleteMetadata;
import com.linkedin.venice.writer.PutMetadata;
import com.linkedin.venice.writer.VeniceWriter;
import com.linkedin.venice.writer.VeniceWriterFactory;
import com.linkedin.venice.writer.VeniceWriterOptions;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.JsonEncoder;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.util.Progressable;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/venice/hadoop/VeniceReducer.class */
public class VeniceReducer extends AbstractMapReduceTask implements Reducer<BytesWritable, BytesWritable, BytesWritable, BytesWritable> {
    /** Name of the job configuration entry holding the MapReduce job id. */
    public static final String MAP_REDUCE_JOB_ID_PROP = "mapred.job.id";
    private static final Logger LOGGER = LogManager.getLogger(VeniceReducer.class);

    // Configuration captured in configureTask().
    private VeniceProperties props;
    private JobID mapReduceJobId;
    private long telemetryMessageInterval;
    private DuplicateKeyPrinter duplicateKeyPrinter;
    private InputStorageQuotaTracker inputStorageQuotaTracker;

    // Book-keeping used by telemetry() to compute throughput between two log statements.
    private long lastTimeThroughputWasLoggedInNS = System.nanoTime();
    private long lastMessageCompletedCount = 0;

    // Created lazily by sendMessageToKafka(), or injected through setVeniceWriter().
    private AbstractVeniceWriter<byte[], byte[], byte[]> veniceWriter = null;
    private int valueSchemaId = -1;
    private int derivedValueSchemaId = -1;
    private boolean enableWriteCompute = false;

    // Partitions reported back by the producer callback.
    private final Set<Integer> partitionSet = ConcurrentHashMap.newKeySet();

    /*
     * Written by ReducerProduceCallback.onCompletion() and read by maybePropagateCallbackException().
     * The callback updates atomic counters and a concurrent set, i.e. it is expected to run on a thread
     * other than the one calling reduce(); volatile makes a recorded failure visible to the reading thread.
     */
    private volatile Exception sendException = null;
    protected ReducerProduceCallback callback = null;

    /*
     * Written by reduce() (through updatePreviousReporter) and read by the heartbeat task that
     * configureTask() schedules on reducerProgressHeartbeatScheduler, hence volatile.
     */
    private volatile Reporter previousReporter = null;

    // Send/ack accounting; the completed and errored counters are updated from the producer callback.
    private long messageSent = 0;
    private final AtomicLong messageCompleted = new AtomicLong();
    private final AtomicLong messageErrored = new AtomicLong();

    // Timing aggregates maintained by reduce() and updateExecutionTimeStatus().
    private long timeOfLastReduceFunctionEndInNS = 0;
    private long aggregateTimeOfReduceExecutionInNS = 0;
    private long aggregateTimeOfInBetweenReduceInvocationsInNS = 0;

    // Sticky failure flags: once one of them is observed as true it is never reset by this class.
    private boolean exceedQuota = false;
    private boolean hasWriteAclFailure = false;
    private boolean hasDuplicateKeyWithDistinctValue = false;
    private boolean hasRecordTooLargeFailure = false;

    private HadoopJobClientProvider hadoopJobClientProvider = new DefaultHadoopJobClientProvider();
    private boolean isDuplicateKeyAllowed = false;

    // Single-threaded scheduler running the periodic Reporter.progress() heartbeat; shut down in close().
    private final ScheduledExecutorService reducerProgressHeartbeatScheduler = Executors.newScheduledThreadPool(1);

    /* loaded from: input_file:com/linkedin/venice/hadoop/VeniceReducer$DuplicateKeyPrinter.class */
    /**
     * Counts duplicate values seen for a key and logs a JSON rendering of a bounded number of
     * the offending keys.
     */
    public static class DuplicateKeyPrinter implements AutoCloseable, Closeable {
        /** Upper bound on the number of duplicate keys that get logged by one instance. */
        private static final int MAX_NUM_OF_LOG = 10;
        private final boolean isDupKeyAllowed;
        private final String topic;
        private final Schema keySchema;
        private final AbstractVeniceRecordReader<?, ?> recordReader;
        private final VeniceKafkaSerializer<?> keySerializer;
        private final GenericDatumWriter<Object> avroDatumWriter;
        /** Number of keys rendered by {@link #printDuplicateKey(byte[])} so far. */
        private int numOfDupKey = 0;

        /**
         * Builds the printer from the job configuration: topic name, duplicate-key policy and the
         * record reader (VSON or Avro) that supplies the key schema and key serializer.
         *
         * @throws VeniceException if the record reader exposes no key serializer
         */
        DuplicateKeyPrinter(JobConf jobConf) {
            this.topic = jobConf.get(VenicePushJob.TOPIC_PROP);
            this.isDupKeyAllowed = jobConf.getBoolean(VenicePushJob.ALLOW_DUPLICATE_KEY, false);
            VeniceProperties veniceProps = HadoopUtils.getVeniceProps(jobConf);
            this.recordReader = jobConf.getBoolean(VenicePushJob.VSON_PUSH, false) ? new VeniceVsonRecordReader(veniceProps) : new VeniceAvroRecordReader(veniceProps);
            this.keySchema = Schema.parse(this.recordReader.getKeySchemaStr());
            if (this.recordReader.getKeySerializer() == null) {
                throw new VeniceException("key serializer can not be null.");
            }
            this.keySerializer = this.recordReader.getKeySerializer();
            this.avroDatumWriter = new GenericDatumWriter<>(this.keySchema);
        }

        /**
         * Drains the remaining values of a key, classifies each one as identical to or distinct
         * from the first value, and reports both totals through the MR counters.
         *
         * <p>The counters are always updated. Only the logging is capped at
         * {@link #MAX_NUM_OF_LOG} keys, so that reaching the log cap can never hide a later
         * duplicate key with a distinct value from the failure detection that reads the counters.
         *
         * @param bArr serialized key
         * @param bArr2 first value of the key, already consumed from {@code it}
         * @param it remaining values of the key
         * @param reporter reporter whose counters are incremented
         */
        protected void detectAndHandleDuplicateKeys(byte[] bArr, byte[] bArr2, Iterator<BytesWritable> it, Reporter reporter) {
            boolean alreadyLogged = false; // a key is logged at most once, however many duplicates it has
            int distinctValueCount = 0;
            int identicalValueCount = 0;
            while (it.hasNext()) {
                boolean sameValue = Arrays.equals(it.next().copyBytes(), bArr2);
                if (sameValue) {
                    identicalValueCount++;
                } else {
                    distinctValueCount++;
                }
                // Distinct-value duplicates are only logged here when the job tolerates duplicate keys.
                boolean worthLogging = sameValue || this.isDupKeyAllowed;
                if (worthLogging && !alreadyLogged && this.numOfDupKey < MAX_NUM_OF_LOG) {
                    alreadyLogged = true;
                    VeniceReducer.LOGGER.warn(printDuplicateKey(bArr));
                }
            }
            MRJobCounterHelper.incrDuplicateKeyWithIdenticalValue(reporter, identicalValueCount);
            MRJobCounterHelper.incrDuplicateKeyWithDistinctValue(reporter, distinctValueCount);
        }

        /**
         * Deserializes the key and renders it as JSON for the duplicate-key warning, bumping the
         * count of logged keys.
         *
         * @throws VeniceException wrapping any {@link IOException} raised while encoding
         */
        private String printDuplicateKey(byte[] bArr) {
            Object keyRecord = this.keySerializer.deserialize(this.topic, bArr);
            try (ByteArrayOutputStream output = new ByteArrayOutputStream()) {
                JsonEncoder jsonEncoder = AvroCompatibilityHelper.newJsonEncoder(this.keySchema, output, false);
                this.avroDatumWriter.write(keyRecord, jsonEncoder);
                jsonEncoder.flush();
                output.flush();
                this.numOfDupKey++;
                // Decode explicitly as UTF-8 instead of relying on the platform default charset.
                return String.format("There are multiple records for key:\n%s", new String(output.toByteArray(), StandardCharsets.UTF_8));
            } catch (IOException e) {
                throw new VeniceException(e);
            }
        }

        /** Closes the record reader and the key serializer, logging (not throwing) on failure. */
        @Override // java.lang.AutoCloseable, java.io.Closeable
        public void close() {
            Utils.closeQuietlyWithErrorLogged(new Closeable[]{this.recordReader});
            Utils.closeQuietlyWithErrorLogged(new Closeable[]{this.keySerializer});
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:com/linkedin/venice/hadoop/VeniceReducer$ReducerProduceCallback.class */
    public class ReducerProduceCallback implements PubSubProducerCallback {
        private final Reporter reporter;

        public ReducerProduceCallback(Reporter reporter) {
            this.reporter = reporter;
        }

        public void onCompletion(PubSubProduceResult pubSubProduceResult, Exception exc) {
            if (exc != null) {
                VeniceReducer.this.messageErrored.incrementAndGet();
                VeniceReducer.LOGGER.error("Exception thrown in send message callback. ", exc);
                VeniceReducer.this.sendException = exc;
            } else {
                VeniceReducer.this.messageCompleted.incrementAndGet();
                int partition = pubSubProduceResult.getPartition();
                VeniceReducer.this.partitionSet.add(Integer.valueOf(partition));
                if (partition != VeniceReducer.this.getTaskId()) {
                    VeniceReducer.this.messageErrored.incrementAndGet();
                    VeniceReducer.this.sendException = new VeniceException(String.format("The reducer is not writing to the Kafka partition that maps to its task (taskId = %d, partition = %d). This could mean that MR shuffling is buggy or that the configured %s (%s) is non-deterministic.", Integer.valueOf(VeniceReducer.this.getTaskId()), Integer.valueOf(partition), VenicePartitioner.class.getSimpleName(), VeniceReducer.this.props.getString("partitioner.class")));
                }
            }
            this.reporter.progress();
        }

        protected Progressable getProgressable() {
            return this.reporter;
        }
    }

    /* loaded from: input_file:com/linkedin/venice/hadoop/VeniceReducer$VeniceWriterMessage.class */
    /**
     * Immutable description of one record to write, together with the action that performs the
     * matching put/update/delete on a writer.
     */
    public static class VeniceWriterMessage {
        private final byte[] keyBytes;
        private final byte[] valueBytes;
        private final int valueSchemaId;
        private final int rmdVersionId;
        private final ByteBuffer rmdPayload;
        private final Consumer<AbstractVeniceWriter<byte[], byte[], byte[]>> consumer;

        /**
         * Message without replication metadata; delegates with an RMD version id of -1 and a null payload.
         *
         * @param bArr key bytes
         * @param bArr2 value bytes
         * @param i value schema id
         * @param pubSubProducerCallback callback handed to the writer
         * @param z whether write compute is enabled
         * @param i2 derived value schema id
         */
        public VeniceWriterMessage(byte[] bArr, byte[] bArr2, int i, PubSubProducerCallback pubSubProducerCallback, boolean z, int i2) {
            this(bArr, bArr2, i, -1, null, pubSubProducerCallback, z, i2);
        }

        /**
         * @param bArr key bytes
         * @param bArr2 value bytes; null selects a delete when a replication metadata payload is present
         * @param i value schema id
         * @param i2 replication metadata version id
         * @param byteBuffer replication metadata payload, or null when there is none
         * @param pubSubProducerCallback callback handed to the writer
         * @param z whether write compute is enabled
         * @param i3 derived value schema id
         */
        public VeniceWriterMessage(byte[] bArr, byte[] bArr2, int i, int i2, ByteBuffer byteBuffer, PubSubProducerCallback pubSubProducerCallback, boolean z, int i3) {
            this.keyBytes = bArr;
            this.valueBytes = bArr2;
            this.valueSchemaId = i;
            this.rmdVersionId = i2;
            this.rmdPayload = byteBuffer;
            this.consumer = writer -> {
                if (byteBuffer != null) {
                    // Replication metadata present: the payload must be non-empty.
                    if (byteBuffer.remaining() == 0) {
                        throw new VeniceException("Found empty replication metadata");
                    }
                    if (bArr2 != null) {
                        writer.put(bArr, bArr2, i, pubSubProducerCallback, new PutMetadata(i2, byteBuffer));
                    } else {
                        writer.delete(bArr, pubSubProducerCallback, new DeleteMetadata(i, i2, byteBuffer));
                    }
                    return;
                }
                // No replication metadata: update only with write compute on and a positive derived schema id.
                boolean sendAsUpdate = z && i3 > 0;
                if (sendAsUpdate) {
                    writer.update(bArr, bArr2, i, i3, pubSubProducerCallback);
                } else {
                    writer.put(bArr, bArr2, i, pubSubProducerCallback, (PutMetadata) null);
                }
            };
        }

        /** @return the action that writes this message to a given writer */
        public Consumer<AbstractVeniceWriter<byte[], byte[], byte[]>> getConsumer() {
            return consumer;
        }

        public ByteBuffer getRmdPayload() {
            return rmdPayload;
        }

        public int getRmdVersionId() {
            return rmdVersionId;
        }

        public byte[] getKeyBytes() {
            return keyBytes;
        }

        public byte[] getValueBytes() {
            return valueBytes;
        }

        public int getValueSchemaId() {
            return valueSchemaId;
        }
    }

    /**
     * Processes all values of one key: extracts the message to write and sends it, unless the
     * key is empty or a failure has already been reported.
     *
     * <p>Topic-authorization and record-too-large failures are turned into counter increments and
     * end the invocation early (without updating the execution-time aggregate); any other
     * {@link VeniceException} is rethrown.
     *
     * @param bytesWritable serialized key
     * @param it values grouped under the key
     * @param outputCollector not used by this method
     * @param reporter reporter of the current invocation
     */
    public void reduce(BytesWritable bytesWritable, Iterator<BytesWritable> it, OutputCollector<BytesWritable, BytesWritable> outputCollector, Reporter reporter) {
        if (updatePreviousReporter(reporter)) {
            // New reporter instance: the callback has to report progress against it.
            this.callback = new ReducerProduceCallback(reporter);
        }
        final long invocationStartNs = System.nanoTime();
        if (this.timeOfLastReduceFunctionEndInNS > 0) {
            this.aggregateTimeOfInBetweenReduceInvocationsInNS += invocationStartNs - this.timeOfLastReduceFunctionEndInNS;
        }

        VeniceWriterMessage message = null;
        if (bytesWritable.getLength() > 0 && !hasReportedFailure(reporter, this.isDuplicateKeyAllowed)) {
            message = extract(bytesWritable, it, reporter);
        }

        if (message != null) {
            try {
                sendMessageToKafka(reporter, message.getConsumer());
            } catch (VeniceException e) {
                if (e instanceof TopicAuthorizationVeniceException) {
                    MRJobCounterHelper.incrWriteAclAuthorizationFailureCount(reporter, 1L);
                    LOGGER.error(e);
                    return;
                }
                if (e instanceof RecordTooLargeException) {
                    MRJobCounterHelper.incrRecordTooLargeFailureCount(reporter, 1L);
                    LOGGER.error(e);
                    return;
                }
                throw e;
            }
        }
        updateExecutionTimeStatus(invocationStartNs);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /** @return the producer callback currently in use; null until {@code reduce} has created one */
    public PubSubProducerCallback getCallback() {
        return callback;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /** @return the derived value schema id; -1 unless {@code configureTask} found one configured */
    public int getDerivedValueSchemaId() {
        return derivedValueSchemaId;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /** @return whether write compute was enabled in the task configuration */
    public boolean isEnableWriteCompute() {
        return enableWriteCompute;
    }

    /**
     * Builds the message for a key from its first value and hands the remaining values to the
     * duplicate-key detection.
     *
     * @param bytesWritable serialized key
     * @param it values of the key; must contain at least one element
     * @param reporter reporter passed on to the duplicate-key detection
     * @return the message to write for this key
     * @throws VeniceException if the key has no value or the duplicate-key printer is not set
     */
    protected VeniceWriterMessage extract(BytesWritable bytesWritable, Iterator<BytesWritable> it, Reporter reporter) {
        final byte[] keyBytes = bytesWritable.copyBytes();
        if (!it.hasNext()) {
            throw new VeniceException("There is no value corresponding to key bytes: " + ByteUtils.toHexString(keyBytes));
        }
        final byte[] firstValueBytes = it.next().copyBytes();

        final DuplicateKeyPrinter printer = this.duplicateKeyPrinter;
        if (printer == null) {
            throw new VeniceException("'DuplicateKeyPrinter' is not initialized properly");
        }
        printer.detectAndHandleDuplicateKeys(keyBytes, firstValueBytes, it, reporter);

        return new VeniceWriterMessage(
                keyBytes,
                firstValueBytes,
                this.valueSchemaId,
                getCallback(),
                isEnableWriteCompute(),
                getDerivedValueSchemaId());
    }

    /**
     * Checks, in order, the quota, write-ACL, duplicate-key and record-too-large failure
     * conditions and stops at the first one that holds.
     *
     * @param reporter reporter whose counters are consulted
     * @param z whether duplicate keys are allowed (disables the duplicate-key check)
     * @return true if any failure condition holds
     */
    protected boolean hasReportedFailure(Reporter reporter, boolean z) {
        if (exceedQuota(reporter)) {
            return true;
        }
        if (hasWriteAclFailure(reporter)) {
            return true;
        }
        if (hasDuplicatedKeyWithDistinctValueFailure(reporter, z)) {
            return true;
        }
        return hasRecordTooLargeFailure(reporter);
    }

    /**
     * @return true once the record-too-large counter has been seen above zero; the result is
     *     cached so the counter is not consulted again afterwards
     */
    private boolean hasRecordTooLargeFailure(Reporter reporter) {
        if (!this.hasRecordTooLargeFailure && MRJobCounterHelper.getRecordTooLargeFailureCount(reporter) > 0) {
            this.hasRecordTooLargeFailure = true;
        }
        return this.hasRecordTooLargeFailure;
    }

    /**
     * @param z whether duplicate keys are allowed; when true this is never a failure
     * @return true if duplicates are disallowed and a duplicate key with a distinct value was counted
     */
    private boolean hasDuplicatedKeyWithDistinctValueFailure(Reporter reporter, boolean z) {
        return !z && hasDuplicateKeyWithDistinctValue(reporter);
    }

    /**
     * @return true once the write-ACL authorization failure counter has been seen above zero;
     *     the result is cached so the counter is not consulted again afterwards
     */
    private boolean hasWriteAclFailure(Reporter reporter) {
        if (!this.hasWriteAclFailure && MRJobCounterHelper.getWriteAclAuthorizationFailureCount(reporter) > 0) {
            this.hasWriteAclFailure = true;
        }
        return this.hasWriteAclFailure;
    }

    /**
     * @return true once the duplicate-key-with-distinct-value counter has been seen above zero;
     *     the result is cached so the counter is not consulted again afterwards
     */
    private boolean hasDuplicateKeyWithDistinctValue(Reporter reporter) {
        if (!this.hasDuplicateKeyWithDistinctValue && MRJobCounterHelper.getDuplicateKeyWithDistinctCount(reporter) > 0) {
            this.hasDuplicateKeyWithDistinctValue = true;
        }
        return this.hasDuplicateKeyWithDistinctValue;
    }

    /** @return the cached quota flag, without consulting any counter */
    boolean getExceedQuotaFlag() {
        return exceedQuota;
    }

    /**
     * Evaluates the storage quota against the total key and value sizes read from the reporter.
     * A positive result is cached; without a quota tracker the answer is always false.
     *
     * @return true if the quota is (or was previously found to be) exceeded
     */
    private boolean exceedQuota(Reporter reporter) {
        if (!this.exceedQuota && this.inputStorageQuotaTracker != null) {
            this.exceedQuota = this.inputStorageQuotaTracker.exceedQuota(
                    MRJobCounterHelper.getTotalKeySize(reporter) + MRJobCounterHelper.getTotalValueSize(reporter));
        }
        return this.exceedQuota;
    }

    /**
     * Stamps the end of the current reduce invocation and adds its duration to the aggregate.
     *
     * @param j {@link System#nanoTime()} value captured when the invocation started
     */
    private void updateExecutionTimeStatus(long j) {
        final long invocationEndNs = System.nanoTime();
        this.timeOfLastReduceFunctionEndInNS = invocationEndNs;
        this.aggregateTimeOfReduceExecutionInNS += invocationEndNs - j;
    }

    /**
     * Applies the write action to the writer (created on first use), after rethrowing any failure
     * already recorded by the producer callback, then updates the sent count, telemetry and the
     * output record counter.
     *
     * @param reporter reporter whose output record counter is incremented
     * @param consumer write action to run against the writer
     */
    protected void sendMessageToKafka(Reporter reporter, Consumer<AbstractVeniceWriter<byte[], byte[], byte[]>> consumer) {
        // Fail fast if an earlier asynchronous send already reported an error.
        maybePropagateCallbackException();

        AbstractVeniceWriter<byte[], byte[], byte[]> writer = this.veniceWriter;
        if (writer == null) {
            writer = createBasicVeniceWriter();
            this.veniceWriter = writer;
        }
        consumer.accept(writer);

        this.messageSent += 1;
        telemetry();
        MRJobCounterHelper.incrOutputRecordCount(reporter, 1L);
    }

    /**
     * Remembers the given reporter when it differs from the one seen last.
     *
     * @return true if the stored reporter was replaced, false if it was already equal
     */
    private boolean updatePreviousReporter(Reporter reporter) {
        final boolean unchanged = this.previousReporter != null && this.previousReporter.equals(reporter);
        if (!unchanged) {
            this.previousReporter = reporter;
        }
        return !unchanged;
    }

    /**
     * Creates the writer for the push topic from the task properties, after adding the GUID
     * generator settings and the MapReduce job identifiers to a copy of those properties.
     */
    private VeniceWriter<byte[], byte[], byte[]> createBasicVeniceWriter() {
        final Properties writerProperties = this.props.toProperties();
        writerProperties.put("guid.generator.implementation", GuidUtils.DETERMINISTIC_GUID_GENERATOR_IMPLEMENTATION);
        writerProperties.put("venice.writer.max.elapsed.time.for.segment.in.ms", -1);
        try {
            // Keep only the digits of the job tracker identifier.
            String jobTrackerDigits = this.mapReduceJobId.getJtIdentifier().replaceAll("\\D+", "");
            writerProperties.put("push.job.map.reduce.jt.id", Long.parseLong(jobTrackerDigits));
        } catch (NumberFormatException e) {
            LOGGER.warn("Unable to parse job tracker id, using default value for guid generation", e);
        }
        writerProperties.put("push.job.map.reduce.job.id", this.mapReduceJobId.getId());

        final VeniceWriterFactory writerFactory = new VeniceWriterFactory(writerProperties);
        final boolean chunkingEnabled = this.props.getBoolean("venice.writer.chunking.enabled", false);
        final boolean rmdChunkingEnabled = this.props.getBoolean("venice.writer.replication.metadata.chunking.enabled", false);
        final VeniceWriterOptions writerOptions = new VeniceWriterOptions.Builder(this.props.getString(VenicePushJob.TOPIC_PROP))
                .setKeySerializer(new DefaultSerializer())
                .setValueSerializer(new DefaultSerializer())
                .setWriteComputeSerializer(new DefaultSerializer())
                .setChunkingEnabled(chunkingEnabled)
                .setRmdChunkingEnabled(rmdChunkingEnabled)
                .setTime(SystemTime.INSTANCE)
                .setPartitioner(PartitionUtils.getVenicePartitioner(this.props))
                .build();
        return writerFactory.createVeniceWriter(writerOptions);
    }

    /**
     * Logs processing and produce throughput every {@code telemetryMessageInterval} sent messages.
     *
     * <p>A non-positive interval disables telemetry. Without this guard an interval configured as
     * 0 would make the modulo below throw {@link ArithmeticException} on the first send.
     */
    private void telemetry() {
        if (this.telemetryMessageInterval <= 0 || this.messageSent % this.telemetryMessageInterval != 0) {
            return;
        }
        final double secondsSinceLastLog = (System.nanoTime() - this.lastTimeThroughputWasLoggedInNS) / 1.0E9d;
        LOGGER.info(
                "MR Framework records processed: {}, total time spent: {}, current throughput: {} rec/s",
                Long.valueOf(this.messageSent),
                Utils.makeTimePretty(this.aggregateTimeOfInBetweenReduceInvocationsInNS),
                Utils.makeLargeNumberPretty((long) (this.telemetryMessageInterval / secondsSinceLastLog)));

        // Snapshot the completed counter once so the log line and the stored baseline agree.
        final long completedSoFar = this.messageCompleted.get();
        LOGGER.info(
                "Kafka records produced: {}, total time spent: {}, current throughput: {} rec/s",
                Long.valueOf(completedSoFar),
                Utils.makeTimePretty(this.aggregateTimeOfReduceExecutionInNS),
                Utils.makeLargeNumberPretty((long) ((completedSoFar - this.lastMessageCompletedCount) / secondsSinceLastLog)));

        this.lastTimeThroughputWasLoggedInNS = System.nanoTime();
        this.lastMessageCompletedCount = completedSoFar;
    }

    /**
     * Flushes and closes the writer, verifies that every sent message was acknowledged, releases
     * the duplicate-key printer and the heartbeat scheduler, and finally bumps the
     * reducer-closed counter.
     *
     * <p>The writer is closed exactly once. The flag passed to {@code close(boolean)} is true
     * only when the flush succeeded, no message errored, all sent messages completed and the last
     * reporter shows a progress of 1.0; a missing reporter yields false instead of a
     * {@link NullPointerException}.
     *
     * @throws IOException if flushing or closing the writer fails with one
     * @throws VeniceException if a producer callback recorded a failure, or if the sent and
     *     completed message counts differ
     */
    public void close() throws IOException {
        try {
            LOGGER.info("Kafka message progress before flushing and closing producer:");
            logMessageProgress();
            if (this.veniceWriter != null) {
                boolean finishedCleanly = false;
                try {
                    this.veniceWriter.flush();
                    final Reporter lastReporter = this.previousReporter;
                    finishedCleanly = this.messageErrored.get() == 0
                            && this.messageSent == this.messageCompleted.get()
                            && lastReporter != null
                            && lastReporter.getProgress() == 1.0d;
                } finally {
                    // Single close on every path; the flag stays false if flush() threw.
                    this.veniceWriter.close(finishedCleanly);
                }
            }
            maybePropagateCallbackException();
            LOGGER.info("Kafka message progress after flushing and closing producer:");
            logMessageProgress();
            if (this.messageSent != this.messageCompleted.get()) {
                throw new VeniceException("Message sent: " + this.messageSent + " doesn't match message completed: " + this.messageCompleted.get());
            }
        } finally {
            Utils.closeQuietlyWithErrorLogged(new Closeable[]{this.duplicateKeyPrinter});
            this.reducerProgressHeartbeatScheduler.shutdownNow();
        }

        final Reporter lastReporter = this.previousReporter;
        if (lastReporter == null) {
            LOGGER.warn("No MapReduce reporter set");
        } else {
            MRJobCounterHelper.incrReducerClosedCount(lastReporter, 1L);
        }
    }

    /**
     * Factory hook for the duplicate-key printer used by this reducer.
     *
     * @param jobConf job configuration the printer is built from
     * @return a new {@link DuplicateKeyPrinter}
     */
    protected DuplicateKeyPrinter initDuplicateKeyPrinter(JobConf jobConf) {
        final DuplicateKeyPrinter printer = new DuplicateKeyPrinter(jobConf);
        return printer;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Reads the reducer settings from the properties and job configuration, initialises the
     * duplicate-key printer and the quota fields, and starts a heartbeat that calls
     * {@code progress()} on the latest reporter every five minutes.
     *
     * @param veniceProperties task properties
     * @param jobConf MapReduce job configuration
     */
    @Override // com.linkedin.venice.hadoop.AbstractMapReduceTask
    public void configureTask(VeniceProperties veniceProperties, JobConf jobConf) {
        this.props = veniceProperties;
        this.isDuplicateKeyAllowed = veniceProperties.getBoolean(VenicePushJob.ALLOW_DUPLICATE_KEY, false);
        this.mapReduceJobId = JobID.forName(jobConf.get(MAP_REDUCE_JOB_ID_PROP));
        this.valueSchemaId = veniceProperties.getInt(VenicePushJob.VALUE_SCHEMA_ID_PROP);

        // Optional settings: fall back to -1 / false when the key is absent.
        if (veniceProperties.containsKey(VenicePushJob.DERIVED_SCHEMA_ID_PROP)) {
            this.derivedValueSchemaId = veniceProperties.getInt(VenicePushJob.DERIVED_SCHEMA_ID_PROP);
        } else {
            this.derivedValueSchemaId = -1;
        }
        if (veniceProperties.containsKey(VenicePushJob.ENABLE_WRITE_COMPUTE)) {
            this.enableWriteCompute = veniceProperties.getBoolean(VenicePushJob.ENABLE_WRITE_COMPUTE);
        } else {
            this.enableWriteCompute = false;
        }

        this.duplicateKeyPrinter = initDuplicateKeyPrinter(jobConf);
        this.telemetryMessageInterval = veniceProperties.getInt(VenicePushJob.TELEMETRY_MESSAGE_INTERVAL, 10000);
        initStorageQuotaFields(veniceProperties, jobConf);

        final Runnable heartbeat = () -> {
            if (this.previousReporter != null) {
                this.previousReporter.progress();
            }
        };
        this.reducerProgressHeartbeatScheduler.scheduleAtFixedRate(heartbeat, 0L, 5L, TimeUnit.MINUTES);
    }

    /**
     * Creates the quota tracker from the optional storage quota property and, when a quota is
     * configured, initialises the quota flag: false for a quota of -1, otherwise the tracker's
     * verdict on the total incoming data size.
     */
    private void initStorageQuotaFields(VeniceProperties veniceProperties, JobConf jobConf) {
        Long storageQuota = null;
        if (veniceProperties.containsKey(VenicePushJob.STORAGE_QUOTA_PROP)) {
            storageQuota = Long.valueOf(veniceProperties.getLong(VenicePushJob.STORAGE_QUOTA_PROP));
        }
        this.inputStorageQuotaTracker = new InputStorageQuotaTracker(storageQuota);
        if (storageQuota == null) {
            return;
        }
        // The incoming data size is only looked up when the quota is not -1.
        this.exceedQuota = storageQuota.longValue() != -1
                && this.inputStorageQuotaTracker.exceedQuota(getTotalIncomingDataSizeInBytes(jobConf));
    }

    /**
     * Reads the total key size plus total value size from the counters of the running job.
     *
     * <p>The job client is closed exactly once, in the {@code finally} block, so a failing close
     * is reported as a close failure rather than being re-wrapped as a counter-read failure and
     * followed by a second close attempt.
     *
     * @param jobConf job configuration used to obtain the job client and the job id
     * @return sum of the total key size and total value size counters
     * @throws VeniceException if the counters cannot be read or the job client cannot be closed
     */
    private long getTotalIncomingDataSizeInBytes(JobConf jobConf) {
        // Declared outside the try block so the failure message can show how far the lookup got.
        JobClient jobClient = null;
        String jobIdProp = null;
        JobID jobId = null;
        RunningJob runningJob = null;
        Counters counters = null;
        try {
            jobClient = this.hadoopJobClientProvider.getJobClientFromConfig(jobConf);
            jobIdProp = jobConf.get(MAP_REDUCE_JOB_ID_PROP);
            jobId = JobID.forName(jobIdProp);
            runningJob = jobClient.getJob(jobId);
            counters = runningJob.getCounters();
            return MRJobCounterHelper.getTotalKeySize(counters) + MRJobCounterHelper.getTotalValueSize(counters);
        } catch (Exception e) {
            throw new VeniceException(String.format("Can't read input file size from counters; hadoopJobClient: %s; jobIdProp: %s; jobID: %s; runningJob: %s; quotaCounters: %s", jobClient, jobIdProp, jobId, runningJob, counters), e);
        } finally {
            if (jobClient != null) {
                try {
                    jobClient.close();
                } catch (IOException e) {
                    throw new VeniceException("Cannot close hadoopJobClient", e);
                }
            }
        }
    }

    /**
     * Rethrows, wrapped in a {@link VeniceException}, the failure recorded by the producer
     * callback; does nothing when none was recorded.
     */
    private void maybePropagateCallbackException() {
        final Exception recordedFailure = this.sendException;
        if (recordedFailure == null) {
            return;
        }
        throw new VeniceException(recordedFailure);
    }

    /** Logs the current sent, completed and errored message counts. */
    private void logMessageProgress() {
        final Long sent = Long.valueOf(this.messageSent);
        final Long completed = Long.valueOf(this.messageCompleted.get());
        final Long errored = Long.valueOf(this.messageErrored.get());
        LOGGER.info("Message sent: {}, message completed: {}, message errored: {}", sent, completed, errored);
    }

    /**
     * Replaces the writer used by {@code sendMessageToKafka}; when one is set, no writer is
     * created lazily.
     *
     * @param abstractVeniceWriter writer to use from now on
     */
    protected void setVeniceWriter(AbstractVeniceWriter abstractVeniceWriter) {
        veniceWriter = abstractVeniceWriter;
    }

    /**
     * Replaces the provider used to obtain the Hadoop job client when reading job counters.
     *
     * @param hadoopJobClientProvider provider to use from now on
     */
    protected void setHadoopJobClientProvider(HadoopJobClientProvider hadoopJobClientProvider) {
        VeniceReducer.this.hadoopJobClientProvider = hadoopJobClientProvider;
    }

    /**
     * Overrides the cached quota flag.
     *
     * @param z new value of the flag
     */
    protected void setExceedQuota(boolean z) {
        exceedQuota = z;
    }

    /*
     * Bridge overload marked synthetic by the decompiler: it takes the key as a plain Object,
     * casts it (and the raw iterator/collector) to the BytesWritable-typed forms and delegates to
     * the typed reduce(BytesWritable, Iterator, OutputCollector, Reporter) above.
     * NOTE(review): this looks like the compiler-generated bridge for the generic Reducer
     * interface surfaced by decompilation rather than hand-written code; confirm before keeping
     * it in source form.
     */
    public /* bridge */ /* synthetic */ void reduce(Object obj, Iterator it, OutputCollector outputCollector, Reporter reporter) throws IOException {
        reduce((BytesWritable) obj, (Iterator<BytesWritable>) it, (OutputCollector<BytesWritable, BytesWritable>) outputCollector, reporter);
    }
}
