package org.apache.pulsar.client.impl;

import io.netty.util.Timeout;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TableView;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-client-original-2.10.2.10.jar:org/apache/pulsar/client/impl/TableViewImpl.class */
public class TableViewImpl<T> implements TableView<T> {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) TableViewImpl.class);
    private final PulsarClientImpl client;
    private final Schema<T> schema;
    private final TableViewConfigurationData conf;
    private final ConcurrentMap<String, T> data = new ConcurrentHashMap();
    private final Map<String, T> immutableData = Collections.unmodifiableMap(this.data);
    private final ConcurrentMap<String, Reader<T>> readers = new ConcurrentHashMap();
    private final List<BiConsumer<String, T>> listeners = new ArrayList();
    private final ReentrantLock listenersMutex = new ReentrantLock();

    /* JADX INFO: Access modifiers changed from: package-private */
    public TableViewImpl(PulsarClientImpl pulsarClientImpl, Schema<T> schema, TableViewConfigurationData tableViewConfigurationData) {
        this.client = pulsarClientImpl;
        this.schema = schema;
        this.conf = tableViewConfigurationData;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CompletableFuture<TableView<T>> start() {
        return this.client.getPartitionsForTopic(this.conf.getTopicName()).thenCompose(list -> {
            HashSet hashSet = new HashSet(list);
            ArrayList arrayList = new ArrayList();
            list.forEach(str -> {
                if (this.readers.containsKey(str)) {
                    return;
                }
                arrayList.add(newReader(str));
            });
            this.readers.forEach((str2, reader) -> {
                if (hashSet.contains(str2)) {
                    return;
                }
                arrayList.add(reader.closeAsync().thenRun(() -> {
                    this.readers.remove(str2, reader);
                }));
            });
            return FutureUtil.waitForAll((List<? extends CompletableFuture<?>>) arrayList).thenRun(() -> {
                schedulePartitionsCheck();
            });
        }).thenApply((Function<? super U, ? extends U>) r3 -> {
            return this;
        });
    }

    private void schedulePartitionsCheck() {
        this.client.timer().newTimeout(this::checkForPartitionsChanges, this.conf.getAutoUpdatePartitionsSeconds(), TimeUnit.SECONDS);
    }

    private void checkForPartitionsChanges(Timeout timeout) {
        if (timeout.isCancelled()) {
            return;
        }
        start().whenComplete((tableView, th) -> {
            if (th != null) {
                log.warn("Failed to check for changes in number of partitions: {}", th);
                schedulePartitionsCheck();
            }
        });
    }

    @Override // org.apache.pulsar.client.api.TableView
    public int size() {
        return this.data.size();
    }

    @Override // org.apache.pulsar.client.api.TableView
    public boolean isEmpty() {
        return this.data.isEmpty();
    }

    @Override // org.apache.pulsar.client.api.TableView
    public boolean containsKey(String str) {
        return this.data.containsKey(str);
    }

    @Override // org.apache.pulsar.client.api.TableView
    public T get(String str) {
        return this.data.get(str);
    }

    @Override // org.apache.pulsar.client.api.TableView
    public Set<Map.Entry<String, T>> entrySet() {
        return this.immutableData.entrySet();
    }

    @Override // org.apache.pulsar.client.api.TableView
    public Set<String> keySet() {
        return this.immutableData.keySet();
    }

    @Override // org.apache.pulsar.client.api.TableView
    public Collection<T> values() {
        return this.immutableData.values();
    }

    @Override // org.apache.pulsar.client.api.TableView
    public void forEach(BiConsumer<String, T> biConsumer) {
        this.data.forEach(biConsumer);
    }

    @Override // org.apache.pulsar.client.api.TableView
    public void forEachAndListen(BiConsumer<String, T> biConsumer) {
        try {
            this.listenersMutex.lock();
            forEach(biConsumer);
            this.listeners.add(biConsumer);
        } finally {
            this.listenersMutex.unlock();
        }
    }

    @Override // org.apache.pulsar.client.api.TableView
    public CompletableFuture<Void> closeAsync() {
        return FutureUtil.waitForAll((List<? extends CompletableFuture<?>>) this.readers.values().stream().map((v0) -> {
            return v0.closeAsync();
        }).collect(Collectors.toList()));
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws PulsarClientException {
        try {
            closeAsync().get();
        } catch (Exception e) {
            throw PulsarClientException.unwrap(e);
        }
    }

    /* JADX WARN: Finally extract failed */
    private void handleMessage(Message<T> message) {
        try {
            if (message.hasKey()) {
                if (log.isDebugEnabled()) {
                    log.debug("Applying message from topic {}. key={} value={}", this.conf.getTopicName(), message.getKey(), message.getValue());
                }
                try {
                    this.listenersMutex.lock();
                    if (null == message.getValue()) {
                        this.data.remove(message.getKey());
                    } else {
                        this.data.put(message.getKey(), message.getValue());
                    }
                    Iterator<BiConsumer<String, T>> it = this.listeners.iterator();
                    while (it.hasNext()) {
                        try {
                            it.next().accept(message.getKey(), message.getValue());
                        } catch (Throwable th) {
                            log.error("Table view listener raised an exception", th);
                        }
                    }
                    this.listenersMutex.unlock();
                } catch (Throwable th2) {
                    this.listenersMutex.unlock();
                    throw th2;
                }
            }
        } finally {
            message.release();
        }
    }

    private CompletableFuture<Reader<T>> newReader(String str) {
        return this.client.newReader(this.schema).topic(str).startMessageId(MessageId.earliest).readCompacted(true).poolMessages(true).createAsync().thenCompose(this::cacheNewReader).thenCompose((Function<? super U, ? extends CompletionStage<U>>) this::readAllExistingMessages);
    }

    private CompletableFuture<Reader<T>> cacheNewReader(Reader<T> reader) {
        CompletableFuture<Reader<T>> completableFuture = new CompletableFuture<>();
        if (this.readers.containsKey(reader.getTopic())) {
            completableFuture.completeExceptionally(new IllegalArgumentException("reader on partition " + reader.getTopic() + " already existed"));
        } else {
            this.readers.put(reader.getTopic(), reader);
            completableFuture.complete(reader);
        }
        return completableFuture;
    }

    private CompletableFuture<Reader<T>> readAllExistingMessages(Reader<T> reader) {
        long nanoTime = System.nanoTime();
        AtomicLong atomicLong = new AtomicLong();
        CompletableFuture<Reader<T>> completableFuture = new CompletableFuture<>();
        readAllExistingMessages(reader, completableFuture, nanoTime, atomicLong);
        return completableFuture;
    }

    private void readAllExistingMessages(Reader<T> reader, CompletableFuture<Reader<T>> completableFuture, long j, AtomicLong atomicLong) {
        reader.hasMessageAvailableAsync().thenAccept(bool -> {
            if (bool.booleanValue()) {
                reader.readNextAsync().thenAccept(message -> {
                    atomicLong.incrementAndGet();
                    handleMessage(message);
                    readAllExistingMessages(reader, completableFuture, j, atomicLong);
                }).exceptionally(th -> {
                    completableFuture.completeExceptionally(th);
                    return null;
                });
                return;
            }
            log.info("Started table view for topic {} - Replayed {} messages in {} seconds", reader.getTopic(), atomicLong, Double.valueOf(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - j) / 1000.0d));
            completableFuture.complete(reader);
            readTailMessages(reader);
        });
    }

    private void readTailMessages(Reader<T> reader) {
        reader.readNextAsync().thenAccept(message -> {
            handleMessage(message);
            readTailMessages(reader);
        }).exceptionally(th -> {
            log.info("Reader {} was interrupted", reader.getTopic());
            return null;
        });
    }
}
