package org.apache.kafka.connect.runtime.errors;

import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.runtime.InternalSinkRecord;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX WARN: Classes with same name are omitted:
  input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.2.1.1.10.jar:META-INF/bundled-dependencies/connect-runtime-2.7.0.jar:org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.class
 */
/* loaded from: input_file:META-INF/bundled-dependencies/connect-runtime-2.7.0.jar:org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter.class */
public class WorkerErrantRecordReporter implements ErrantRecordReporter {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) WorkerErrantRecordReporter.class);
    private final RetryWithToleranceOperator retryWithToleranceOperator;
    private final Converter keyConverter;
    private final Converter valueConverter;
    private final HeaderConverter headerConverter;
    protected final LinkedList<Future<Void>> futures = new LinkedList<>();

    /* JADX WARN: Classes with same name are omitted:
      input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.2.1.1.10.jar:META-INF/bundled-dependencies/connect-runtime-2.7.0.jar:org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter$ErrantRecordFuture.class
     */
    /* loaded from: input_file:META-INF/bundled-dependencies/connect-runtime-2.7.0.jar:org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporter$ErrantRecordFuture.class */
    public static class ErrantRecordFuture implements Future<Void> {
        private final List<Future<RecordMetadata>> futures;

        public ErrantRecordFuture(List<Future<RecordMetadata>> list) {
            this.futures = list;
        }

        @Override // java.util.concurrent.Future
        public boolean cancel(boolean z) {
            throw new UnsupportedOperationException("Reporting an errant record cannot be cancelled.");
        }

        @Override // java.util.concurrent.Future
        public boolean isCancelled() {
            return false;
        }

        @Override // java.util.concurrent.Future
        public boolean isDone() {
            return this.futures.stream().allMatch((v0) -> {
                return v0.isDone();
            });
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Future
        public Void get() throws InterruptedException, ExecutionException {
            Iterator<Future<RecordMetadata>> it = this.futures.iterator();
            while (it.hasNext()) {
                it.next().get();
            }
            return null;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Future
        public Void get(long j, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
            Iterator<Future<RecordMetadata>> it = this.futures.iterator();
            while (it.hasNext()) {
                it.next().get(j, timeUnit);
            }
            return null;
        }
    }

    public WorkerErrantRecordReporter(RetryWithToleranceOperator retryWithToleranceOperator, Converter converter, Converter converter2, HeaderConverter headerConverter) {
        this.retryWithToleranceOperator = retryWithToleranceOperator;
        this.keyConverter = converter;
        this.valueConverter = converter2;
        this.headerConverter = headerConverter;
    }

    @Override // org.apache.kafka.connect.sink.ErrantRecordReporter
    public Future<Void> report(SinkRecord sinkRecord, Throwable th) {
        ConsumerRecord<byte[], byte[]> consumerRecord;
        if (sinkRecord instanceof InternalSinkRecord) {
            consumerRecord = ((InternalSinkRecord) sinkRecord).originalRecord();
        } else {
            String str = sinkRecord.topic();
            byte[] fromConnectData = this.keyConverter.fromConnectData(str, sinkRecord.keySchema(), sinkRecord.key());
            byte[] fromConnectData2 = this.valueConverter.fromConnectData(str, sinkRecord.valueSchema(), sinkRecord.value());
            RecordHeaders recordHeaders = new RecordHeaders();
            if (sinkRecord.headers() != null) {
                for (Header header : sinkRecord.headers()) {
                    String key = header.key();
                    recordHeaders.add(key, this.headerConverter.fromConnectHeader(str, key, header.schema(), header.value()));
                }
            }
            consumerRecord = new ConsumerRecord<>(sinkRecord.topic(), sinkRecord.kafkaPartition().intValue(), sinkRecord.kafkaOffset(), sinkRecord.timestamp().longValue(), sinkRecord.timestampType(), -1L, fromConnectData != null ? fromConnectData.length : -1, fromConnectData2 != null ? fromConnectData2.length : -1, fromConnectData, fromConnectData2, recordHeaders);
        }
        Future<Void> executeFailed = this.retryWithToleranceOperator.executeFailed(Stage.TASK_PUT, SinkTask.class, consumerRecord, th);
        if (!executeFailed.isDone()) {
            this.futures.add(executeFailed);
        }
        return executeFailed;
    }

    public void awaitAllFutures() {
        while (true) {
            Future<Void> poll = this.futures.poll();
            if (poll == null) {
                return;
            }
            try {
                poll.get();
            } catch (InterruptedException | ExecutionException e) {
                log.error("Encountered an error while awaiting an errant record future's completion.");
                throw new ConnectException(e);
            }
        }
    }
}
