package org.apache.kafka.connect.storage;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX WARN: Classes with same name are omitted:
  input_file:META-INF/bundled-dependencies/connect-runtime-2.7.0.jar:org/apache/kafka/connect/storage/OffsetStorageReaderImpl.class
 */
/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.2.1.1.18.jar:META-INF/bundled-dependencies/connect-runtime-2.7.0.jar:org/apache/kafka/connect/storage/OffsetStorageReaderImpl.class */
/**
 * {@link CloseableOffsetStorageReader} implementation that looks offsets up in an
 * {@link OffsetBackingStore}.
 *
 * <p>Partition keys are serialized with the key converter as the two-element list
 * {@code [namespace, partition]}; the values returned by the backing store are deserialized
 * with the value converter. Reads that are still in flight are tracked so that {@link #close()}
 * can cancel them. All access to the set of in-flight reads is guarded by synchronizing on
 * that set.
 */
public class OffsetStorageReaderImpl implements CloseableOffsetStorageReader {
    private static final Logger log = LoggerFactory.getLogger(OffsetStorageReaderImpl.class);
    private final OffsetBackingStore backingStore;
    private final String namespace;
    private final Converter keyConverter;
    private final Converter valueConverter;
    /** Set once by {@link #close()}; checked before every new read is issued. */
    private final AtomicBoolean closed = new AtomicBoolean(false);
    /** Reads currently awaiting a result; also used as the monitor guarding itself. */
    private final Set<Future<Map<ByteBuffer, ByteBuffer>>> offsetReadFutures = new HashSet<>();

    /**
     * Creates a reader.
     *
     * @param offsetBackingStore store the serialized offsets are fetched from
     * @param str namespace passed to both converters and embedded in every serialized key
     * @param converter converter used to serialize partition keys
     * @param converter2 converter used to deserialize offset values
     */
    public OffsetStorageReaderImpl(OffsetBackingStore offsetBackingStore, String str, Converter converter, Converter converter2) {
        this.backingStore = offsetBackingStore;
        this.namespace = str;
        this.keyConverter = converter;
        this.valueConverter = converter2;
    }

    /**
     * Looks up the offset for a single partition by delegating to {@link #offsets(Collection)}.
     *
     * @param map the partition to look up
     * @return the offset found for the partition, or {@code null} if the lookup produced no
     *         entry for it
     * @throws ConnectException if the underlying fetch fails
     */
    @Override
    public <T> Map<String, Object> offset(Map<String, T> map) {
        return offsets(Collections.singletonList(map)).get(map);
    }

    /**
     * Looks up the offsets for a collection of partitions.
     *
     * <p>A partition whose key cannot be serialized, or whose stored value cannot be
     * deserialized or validated, is logged and left out of the result rather than failing the
     * whole call.
     *
     * @param collection the partitions to look up
     * @return a map from each partition that was resolved to its offset
     * @throws ConnectException if the reader is closed, the read is cancelled, or fetching from
     *         the backing store fails for any other reason
     */
    @Override
    public <T> Map<Map<String, T>, Map<String, Object>> offsets(Collection<Map<String, T>> collection) {
        // Serialize every partition key, remembering which original partition it came from.
        Map<ByteBuffer, Map<String, T>> serializedToOriginal = new HashMap<>(collection.size());
        for (Map<String, T> partition : collection) {
            try {
                OffsetUtils.validateFormat(partition);
                byte[] keySerialized = this.keyConverter.fromConnectData(this.namespace, null, Arrays.asList(this.namespace, partition));
                ByteBuffer keyBuffer = keySerialized != null ? ByteBuffer.wrap(keySerialized) : null;
                serializedToOriginal.put(keyBuffer, partition);
            } catch (Throwable t) {
                log.error("CRITICAL: Failed to serialize partition key when getting offsets for task with namespace {}. No value for this data will be returned, which may break the task or cause it to skip some data.", this.namespace, t);
            }
        }

        try {
            Future<Map<ByteBuffer, ByteBuffer>> offsetReadFuture;
            // The closed check and the registration happen under one lock so that close()
            // either prevents this read or sees the future and cancels it.
            synchronized (this.offsetReadFutures) {
                if (this.closed.get()) {
                    throw new ConnectException("Offset reader is closed. This is likely because the task has already been scheduled to stop but has taken longer than the graceful shutdown period to do so.");
                }
                offsetReadFuture = this.backingStore.get(serializedToOriginal.keySet());
                this.offsetReadFutures.add(offsetReadFuture);
            }

            Map<ByteBuffer, ByteBuffer> raw;
            try {
                raw = offsetReadFuture.get();
            } catch (CancellationException e) {
                throw new ConnectException("Offset reader closed while attempting to read offsets. This is likely because the task has been scheduled to stop but has taken longer than the graceful shutdown period to do so.", e);
            } finally {
                // Deregister on every exit path, successful or not.
                synchronized (this.offsetReadFutures) {
                    this.offsetReadFutures.remove(offsetReadFuture);
                }
            }

            // Kept inside the outer try so anything escaping the helper is still reported as
            // a ConnectException.
            return deserializeOffsets(raw, serializedToOriginal, collection.size());
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Restore the interrupt flag so code further up the stack can observe it.
                Thread.currentThread().interrupt();
            }
            log.error("Failed to fetch offsets from namespace {}: ", this.namespace, e);
            throw new ConnectException("Failed to fetch offsets.", e);
        }
    }

    /**
     * Deserializes the raw values returned by the backing store and maps them back to the
     * partitions that were originally requested. Entries that fail are logged and skipped.
     */
    private <T> Map<Map<String, T>, Map<String, Object>> deserializeOffsets(
            Map<ByteBuffer, ByteBuffer> raw,
            Map<ByteBuffer, Map<String, T>> serializedToOriginal,
            int expectedSize) {
        Map<Map<String, T>, Map<String, Object>> result = new HashMap<>(expectedSize);
        for (Map.Entry<ByteBuffer, ByteBuffer> rawEntry : raw.entrySet()) {
            try {
                ByteBuffer serializedKey = rawEntry.getKey();
                if (!serializedToOriginal.containsKey(serializedKey)) {
                    log.error("Should be able to map {} back to a requested partition-offset key, backing store may have returned invalid data", serializedKey);
                    continue;
                }
                Map<String, T> partition = serializedToOriginal.get(serializedKey);
                ByteBuffer serializedValue = rawEntry.getValue();
                // NOTE(review): array() hands over the buffer's whole backing array, ignoring
                // position/limit/offset - presumably the backing store always returns buffers
                // that wrap exactly the value bytes; confirm before changing.
                byte[] valueBytes = serializedValue != null ? serializedValue.array() : null;
                Object deserialized = this.valueConverter.toConnectData(this.namespace, valueBytes).value();
                OffsetUtils.validateFormat(deserialized);
                // The cast is unchecked: only the Map-ness is verified at runtime here.
                @SuppressWarnings("unchecked")
                Map<String, Object> offsetValue = (Map<String, Object>) deserialized;
                result.put(partition, offsetValue);
            } catch (Throwable t) {
                log.error("CRITICAL: Failed to deserialize offset data when getting offsets for task with namespace {}. No value for this data will be returned, which may break the task or cause it to skip some data. This could either be due to an error in the connector implementation or incompatible schema.", this.namespace, t);
            }
        }
        return result;
    }

    /**
     * Marks the reader as closed and cancels every read that is still in flight. Only the first
     * call has any effect; later calls return immediately.
     */
    @Override
    public void close() {
        if (this.closed.getAndSet(true)) {
            return;
        }
        synchronized (this.offsetReadFutures) {
            for (Future<Map<ByteBuffer, ByteBuffer>> pendingRead : this.offsetReadFutures) {
                try {
                    pendingRead.cancel(true);
                } catch (Throwable t) {
                    // Best effort: one failed cancellation must not stop the others.
                    log.error("Failed to cancel offset read future", t);
                }
            }
            this.offsetReadFutures.clear();
        }
    }
}
