package org.apache.pulsar.broker.service.persistent;

import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import io.netty.util.Recycler;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.service.AbstractReplicator;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Replicator;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.SendCallback;
import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.stats.Rate;
import org.apache.pulsar.common.util.Codec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/service/persistent/PersistentReplicator.class */
public class PersistentReplicator extends AbstractReplicator implements Replicator, AsyncCallbacks.ReadEntriesCallback, AsyncCallbacks.DeleteCallback {
    // Topic this replicator belongs to; used to reach the broker service, its configuration and the replicator dispatch rate.
    private final PersistentTopic topic;
    // Managed cursor from which entries are read and on which replicated/skipped entries are deleted.
    protected final ManagedCursor cursor;
    // Empty until initializeDispatchRateLimiterIfNeeded() finds the topic's replicator dispatch rate enabled.
    private Optional<DispatchRateLimiter> dispatchRateLimiter;
    // Monitor guarding the creation of dispatchRateLimiter.
    private final Object dispatchRateLimiterLock;
    // Upper bound of entries requested per read; doubled after a successful read (capped by the configured
    // dispatcher max batch size) and reset to the configured minimum after a failed read.
    private int readBatchSize;
    // Byte limit passed to every cursor read; taken from the broker configuration in the constructor.
    private final int readMaxSizeBytes;
    // 90% of producerQueueSize; the send callback only resumes reading once pendingMessages drops below it.
    private final int producerQueueThreshold;
    // Count of messages in flight towards the remote cluster; incremented in readEntriesComplete and
    // decremented in ProducerSendCallback.sendComplete. Accessed through PENDING_MESSAGES_UPDATER.
    private volatile int pendingMessages;
    // Values stored in havePendingRead.
    private static final int FALSE = 0;
    private static final int TRUE = 1;
    // FALSE/TRUE flag telling whether a cursor read is outstanding. Accessed through HAVE_PENDING_READ_UPDATER.
    private volatile int havePendingRead;
    // Rate of messages handed to the remote producer (event value = readable bytes of the entry buffer).
    private final Rate msgOut;
    // Rate of messages discarded in readEntriesComplete because they were expired.
    private final Rate msgExpired;
    // TTL used for the per-message expiry check in readEntriesComplete; set through updateMessageTTL(int).
    private int messageTTLInSeconds;
    // Backoff for retrying failed reads: next() on failure, reduceToHalf() on success.
    private final Backoff readFailureBackoff;
    // Delegate used by the expireMessages(...) methods and for the expiry-rate statistic.
    private final PersistentMessageExpiryMonitor expiryMonitor;
    // Backlog size from which expireMessages(int) no longer checks whether the oldest message is expired.
    private static final int MINIMUM_BACKLOG_FOR_EXPIRY_CHECK = 1000;
    // Statistics object refreshed by updateRates() and mo138getStats().
    private final ReplicatorStatsImpl stats;
    // True while readEntriesComplete waits for a schema lookup; readMoreEntries() does nothing while it is set.
    private volatile boolean fetchSchemaInProgress;
    private static final AtomicIntegerFieldUpdater<PersistentReplicator> PENDING_MESSAGES_UPDATER = AtomicIntegerFieldUpdater.newUpdater(PersistentReplicator.class, "pendingMessages");
    private static final AtomicIntegerFieldUpdater<PersistentReplicator> HAVE_PENDING_READ_UPDATER = AtomicIntegerFieldUpdater.newUpdater(PersistentReplicator.class, "havePendingRead");
    private static final Logger log = LoggerFactory.getLogger(PersistentReplicator.class);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pulsar/broker/service/persistent/PersistentReplicator$ProducerSendCallback.class */
    public static final class ProducerSendCallback implements SendCallback {
        private PersistentReplicator replicator;
        private Entry entry;
        private MessageImpl msg;
        private final Recycler.Handle<ProducerSendCallback> recyclerHandle;
        private static final Recycler<ProducerSendCallback> RECYCLER = new Recycler<ProducerSendCallback>() { // from class: org.apache.pulsar.broker.service.persistent.PersistentReplicator.ProducerSendCallback.1
            protected ProducerSendCallback newObject(Recycler.Handle<ProducerSendCallback> handle) {
                return new ProducerSendCallback(handle);
            }

            /* renamed from: newObject, reason: collision with other method in class */
            protected /* bridge */ /* synthetic */ Object m165newObject(Recycler.Handle handle) {
                return newObject((Recycler.Handle<ProducerSendCallback>) handle);
            }
        };

        public void sendComplete(Exception exc) {
            if (exc == null || (exc instanceof PulsarClientException.InvalidMessageException)) {
                if (PersistentReplicator.log.isDebugEnabled()) {
                    PersistentReplicator.log.debug("[{}][{} -> {}] Message persisted on remote broker", new Object[]{this.replicator.topicName, this.replicator.localCluster, this.replicator.remoteCluster});
                }
                this.replicator.cursor.asyncDelete(this.entry.getPosition(), this.replicator, this.entry.getPosition());
            } else {
                PersistentReplicator.log.error("[{}][{} -> {}] Error producing on remote broker", new Object[]{this.replicator.topicName, this.replicator.localCluster, this.replicator.remoteCluster, exc});
                this.replicator.cursor.rewind();
            }
            this.entry.release();
            int decrementAndGet = PersistentReplicator.PENDING_MESSAGES_UPDATER.decrementAndGet(this.replicator);
            if (decrementAndGet < this.replicator.producerQueueThreshold && PersistentReplicator.HAVE_PENDING_READ_UPDATER.get(this.replicator) == 0) {
                if (decrementAndGet == 0 || this.replicator.producer.isWritable()) {
                    this.replicator.readMoreEntries();
                } else if (PersistentReplicator.log.isDebugEnabled()) {
                    PersistentReplicator.log.debug("[{}][{} -> {}] Not resuming reads. pending: {} is-writable: {}", new Object[]{this.replicator.topicName, this.replicator.localCluster, this.replicator.remoteCluster, Integer.valueOf(decrementAndGet), Boolean.valueOf(this.replicator.producer.isWritable())});
                }
            }
            recycle();
        }

        private ProducerSendCallback(Recycler.Handle<ProducerSendCallback> handle) {
            this.recyclerHandle = handle;
        }

        static ProducerSendCallback create(PersistentReplicator persistentReplicator, Entry entry, MessageImpl messageImpl) {
            ProducerSendCallback producerSendCallback = (ProducerSendCallback) RECYCLER.get();
            producerSendCallback.replicator = persistentReplicator;
            producerSendCallback.entry = entry;
            producerSendCallback.msg = messageImpl;
            return producerSendCallback;
        }

        private void recycle() {
            this.replicator = null;
            this.entry = null;
            if (this.msg != null) {
                this.msg.recycle();
                this.msg = null;
            }
            this.recyclerHandle.recycle(this);
        }

        public void addCallback(MessageImpl<?> messageImpl, SendCallback sendCallback) {
        }

        public SendCallback getNextSendCallback() {
            return null;
        }

        public MessageImpl<?> getNextMessage() {
            return null;
        }

        public CompletableFuture<MessageId> getFuture() {
            return CompletableFuture.completedFuture(null);
        }
    }

    /**
     * Creates the replicator for one topic/remote-cluster pair, initialises its counters and read limits from
     * the broker configuration, sets up the dispatch rate limiter if needed and starts the remote producer.
     *
     * @param persistentTopic  topic whose messages are replicated
     * @param managedCursor    cursor used to read and acknowledge the replicated entries
     * @param str              forwarded as the third argument of the {@link AbstractReplicator} constructor
     * @param str2             forwarded as the fourth argument of the {@link AbstractReplicator} constructor
     * @param brokerService    owning broker service
     * @param pulsarClientImpl client forwarded to the {@link AbstractReplicator} constructor
     * @throws PulsarServerException declared by the constructor; propagated from the superclass constructor
     */
    public PersistentReplicator(PersistentTopic persistentTopic, ManagedCursor managedCursor, String str, String str2, BrokerService brokerService, PulsarClientImpl pulsarClientImpl) throws PulsarServerException {
        super(persistentTopic.getName(), persistentTopic.getReplicatorPrefix(), str, str2, brokerService, pulsarClientImpl);

        // Plain field defaults.
        this.dispatchRateLimiter = Optional.empty();
        this.dispatchRateLimiterLock = new Object();
        this.pendingMessages = 0;
        this.havePendingRead = FALSE;
        this.msgOut = new Rate();
        this.msgExpired = new Rate();
        this.messageTTLInSeconds = 0;
        this.readFailureBackoff = new Backoff(1L, TimeUnit.SECONDS, 1L, TimeUnit.MINUTES, 0L, TimeUnit.MILLISECONDS);
        this.stats = new ReplicatorStatsImpl();
        this.fetchSchemaInProgress = false;

        // Collaborators.
        this.topic = persistentTopic;
        this.cursor = managedCursor;
        this.expiryMonitor = new PersistentMessageExpiryMonitor(this.topicName, Codec.decode(managedCursor.getName()), managedCursor, null);

        HAVE_PENDING_READ_UPDATER.set(this, FALSE);
        PENDING_MESSAGES_UPDATER.set(this, 0);

        // Read limits derived from the producer queue size and the broker configuration.
        this.readBatchSize = Math.min(this.producerQueueSize, persistentTopic.getBrokerService().pulsar().getConfiguration().getDispatcherMaxReadBatchSize());
        this.readMaxSizeBytes = persistentTopic.getBrokerService().pulsar().getConfiguration().getDispatcherMaxReadSizeBytes();
        this.producerQueueThreshold = (int) (this.producerQueueSize * 0.9);

        initializeDispatchRateLimiterIfNeeded();
        startProducer();
    }

    /**
     * Called once the remote producer exists: resets the cursor and the pending-read flag, stores the
     * producer and, if the replicator is still in the Starting state, begins reading. If the state changed
     * in the meantime the producer is closed again.
     *
     * @param producer the freshly created remote producer
     */
    @Override // org.apache.pulsar.broker.service.AbstractReplicator
    protected void readEntries(Producer<byte[]> producer) {
        // Start from a clean read state.
        this.cursor.rewind();
        this.cursor.cancelPendingReadRequest();
        HAVE_PENDING_READ_UPDATER.set(this, FALSE);
        this.producer = (ProducerImpl) producer;

        final boolean started = STATE_UPDATER.compareAndSet(this, AbstractReplicator.State.Starting, AbstractReplicator.State.Started);
        if (started) {
            log.info("[{}][{} -> {}] Created replicator producer", this.topicName, this.localCluster, this.remoteCluster);
            this.backOff.reset();
            this.cursor.setActive();
            readMoreEntries();
            return;
        }

        log.info("[{}][{} -> {}] Replicator was stopped while creating the producer. Closing it. Replicator state: {}", this.topicName, this.localCluster, this.remoteCluster, STATE_UPDATER.get(this));
        STATE_UPDATER.set(this, AbstractReplicator.State.Stopping);
        closeProducerAsync();
    }

    /**
     * @return the cursor's current mark-deleted position
     */
    @Override // org.apache.pulsar.broker.service.AbstractReplicator
    protected Position getReplicatorReadPosition() {
        final Position markDeleted = this.cursor.getMarkDeletedPosition();
        return markDeleted;
    }

    /**
     * @return the cursor's backlog entry count, queried with the {@code false} flag
     */
    @Override // org.apache.pulsar.broker.service.AbstractReplicator
    protected long getNumberOfEntriesInBacklog() {
        final long backlog = this.cursor.getNumberOfEntriesInBacklog(false);
        return backlog;
    }

    /**
     * Marks the cursor inactive; does nothing when there is no cursor.
     */
    @Override // org.apache.pulsar.broker.service.AbstractReplicator
    protected void disableReplicatorRead() {
        if (this.cursor == null) {
            return;
        }
        this.cursor.setInactive();
    }

    /**
     * Computes how many messages may be read right now.
     *
     * @return {@code 0} when the producer queue is full, {@code -1} when the dispatch rate limiter currently
     *         has no message permit (the caller retries later), otherwise the number of free producer-queue
     *         slots, capped by the rate limiter's available message permits when that value is positive
     */
    private int getAvailablePermits() {
        int availablePermits = this.producerQueueSize - PENDING_MESSAGES_UPDATER.get(this);
        if (availablePermits <= 0) {
            if (log.isDebugEnabled()) {
                log.debug("[{}][{} -> {}] Producer queue is full, availablePermits: {}, pause reading", this.topicName, this.localCluster, this.remoteCluster, availablePermits);
            }
            return 0;
        }

        // Read the field once so that all checks below look at the same limiter instance.
        final Optional<DispatchRateLimiter> limiterHolder = this.dispatchRateLimiter;
        if (limiterHolder.isPresent() && limiterHolder.get().isDispatchRateLimitingEnabled()) {
            final DispatchRateLimiter limiter = limiterHolder.get();
            if (!limiter.hasMessageDispatchPermit()) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}][{} -> {}] message-read exceeded topic replicator message-rate {}/{}, schedule after a {}", this.topicName, this.localCluster, this.remoteCluster, limiter.getDispatchRateOnMsg(), limiter.getDispatchRateOnByte(), 1000);
                }
                return -1;
            }
            final long permitsOnMsg = limiter.getAvailableDispatchRateLimitOnMsg();
            if (permitsOnMsg > 0) {
                // Compare as longs and narrow afterwards: casting the long first would wrap around for
                // values above Integer.MAX_VALUE and could produce a bogus non-positive permit count.
                availablePermits = (int) Math.min((long) availablePermits, permitsOnMsg);
            }
        }
        return availablePermits;
    }

    /**
     * Schedules the next cursor read if reading is currently allowed.
     *
     * <p>Nothing is read while a schema fetch is in progress or when no permits are available; when the
     * permit value is {@code -1} the method re-schedules itself after 1000 ms. Only one read may be
     * outstanding at a time, enforced through the have-pending-read flag.
     */
    protected void readMoreEntries() {
        if (this.fetchSchemaInProgress) {
            log.info("[{}][{} -> {}] Skip the reading due to new detected schema", this.topicName, this.localCluster, this.remoteCluster);
            return;
        }

        final int permits = getAvailablePermits();
        if (permits == -1) {
            // Rate limited: try again later.
            this.topic.getBrokerService().executor().schedule(() -> {
                readMoreEntries();
            }, 1000L, TimeUnit.MILLISECONDS);
            return;
        }
        if (permits <= 0) {
            if (log.isDebugEnabled()) {
                log.debug("[{}][{} -> {}] No Permits for reading. availablePermits: {}", this.topicName, this.localCluster, this.remoteCluster, permits);
            }
            return;
        }

        int messagesToRead = Math.min(permits, this.readBatchSize);
        if (!isWritable()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}][{} -> {}] Throttling replication traffic because producer is not writable", this.topicName, this.localCluster, this.remoteCluster);
            }
            // Keep trickling a single message while the producer cannot accept more.
            messagesToRead = 1;
        }
        // Never ask the cursor for fewer than one entry.
        messagesToRead = Math.max(messagesToRead, 1);

        if (!HAVE_PENDING_READ_UPDATER.compareAndSet(this, FALSE, TRUE)) {
            if (log.isDebugEnabled()) {
                log.debug("[{}][{} -> {}] Not scheduling read due to pending read. Messages To Read {}", this.topicName, this.localCluster, this.remoteCluster, messagesToRead);
            }
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}][{} -> {}] Schedule read of {} messages", this.topicName, this.localCluster, this.remoteCluster, messagesToRead);
        }
        this.cursor.asyncReadEntriesOrWait(messagesToRead, this.readMaxSizeBytes, this, (Object) null, PositionImpl.LATEST);
    }

    /**
     * Cursor read callback: processes each entry that was read and forwards eligible messages to the
     * remote producer.
     *
     * <p>For every entry the message is deserialized and then, in this order:
     * <ul>
     *   <li>already-replicated messages, messages not targeted at the remote cluster and expired messages
     *       are deleted from the cursor without being sent;</li>
     *   <li>if the replicator is not in the Started state, this and all following local messages of the
     *       batch are dropped (they stay in the cursor);</li>
     *   <li>otherwise the schema is looked up; if it is not immediately available, replication is paused,
     *       the rest of the batch is released, and the cursor is rewound once the lookup finishes;</li>
     *   <li>otherwise the message is sent asynchronously with a {@link ProducerSendCallback}.</li>
     * </ul>
     * Entries that fail to process are logged and deleted from the cursor. Afterwards the pending-read flag
     * is cleared and, unless a message was sent while the producer is not writable, the next read is
     * triggered.
     *
     * <p>The pending-message counter and the outbound rate are updated only for messages that are actually
     * handed to the producer, because only those get a send callback that decrements the counter again.
     *
     * @param list entries returned by the cursor
     * @param obj  read context (unused)
     */
    public void readEntriesComplete(List<Entry> list, Object obj) {
        if (log.isDebugEnabled()) {
            log.debug("[{}][{} -> {}] Read entries complete of {} messages", this.topicName, this.localCluster, this.remoteCluster, list.size());
        }

        // A successful read lets the batch size grow again, up to the configured maximum.
        int maxReadBatchSize = this.topic.getBrokerService().pulsar().getConfiguration().getDispatcherMaxReadBatchSize();
        if (this.readBatchSize < maxReadBatchSize) {
            int newReadBatchSize = Math.min(this.readBatchSize * 2, maxReadBatchSize);
            if (log.isDebugEnabled()) {
                log.debug("[{}][{} -> {}] Increasing read batch size from {} to {}", this.topicName, this.localCluster, this.remoteCluster, this.readBatchSize, newReadBatchSize);
            }
            this.readBatchSize = newReadBatchSize;
        }
        this.readFailureBackoff.reduceToHalf();

        boolean atLeastOneMessageSent = false;
        boolean isEnableReplicatedSubscriptions = this.brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions();
        boolean localMessageSkippedOnce = false;
        boolean skipRemainingMessages = false;
        for (int i = 0; i < list.size(); i++) {
            try {
                Entry entry = list.get(i);
                if (skipRemainingMessages) {
                    // A schema fetch paused replication; the cursor will be rewound, so just free the entry.
                    entry.release();
                } else {
                    int length = entry.getLength();
                    ByteBuf dataBuffer = entry.getDataBuffer();
                    try {
                        MessageImpl<?> msg = MessageImpl.deserializeSkipBrokerEntryMetaData(dataBuffer);
                        if (isEnableReplicatedSubscriptions) {
                            checkReplicatedSubscriptionMarker(entry.getPosition(), msg, dataBuffer);
                        }
                        if (msg.isReplicated()) {
                            // Message originated in another cluster: never forward it again.
                            this.cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
                            entry.release();
                            msg.recycle();
                        } else if (msg.hasReplicateTo() && !msg.getReplicateTo().contains(this.remoteCluster)) {
                            if (log.isDebugEnabled()) {
                                log.debug("[{}][{} -> {}] Skipping message at position {}, replicateTo {}", this.topicName, this.localCluster, this.remoteCluster, entry.getPosition(), msg.getReplicateTo());
                            }
                            this.cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
                            entry.release();
                            msg.recycle();
                        } else if (msg.isExpired(this.messageTTLInSeconds)) {
                            this.msgExpired.recordEvent(0L);
                            if (log.isDebugEnabled()) {
                                log.debug("[{}][{} -> {}] Discarding expired message at position {}, replicateTo {}", this.topicName, this.localCluster, this.remoteCluster, entry.getPosition(), msg.getReplicateTo());
                            }
                            this.cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
                            entry.release();
                            msg.recycle();
                        } else if (STATE_UPDATER.get(this) != AbstractReplicator.State.Started || localMessageSkippedOnce) {
                            if (log.isDebugEnabled()) {
                                log.debug("[{}][{} -> {}] Dropping read message at {} because producer is not ready", this.topicName, this.localCluster, this.remoteCluster, entry.getPosition());
                            }
                            // Once one local message was dropped, drop all later ones too so ordering is kept.
                            localMessageSkippedOnce = true;
                            entry.release();
                            msg.recycle();
                        } else {
                            this.dispatchRateLimiter.ifPresent(limiter -> {
                                limiter.tryDispatchPermit(1L, entry.getLength());
                            });
                            msg.setReplicatedFrom(this.localCluster);
                            dataBuffer.retain();
                            CompletableFuture<SchemaInfo> schemaFuture = getSchemaInfo(msg);
                            if (!schemaFuture.isDone() || schemaFuture.isCompletedExceptionally()) {
                                // Schema not available yet: undo the retain, pause replication and resume
                                // from the cursor's rewind position once the lookup has finished.
                                entry.release();
                                dataBuffer.release();
                                msg.recycle();
                                this.fetchSchemaInProgress = true;
                                skipRemainingMessages = true;
                                this.cursor.cancelPendingReadRequest();
                                log.info("[{}][{} -> {}] Pause the data replication due to new detected schema", this.topicName, this.localCluster, this.remoteCluster);
                                schemaFuture.whenComplete((ignoredSchema, th) -> {
                                    if (th != null) {
                                        log.warn("[{}][{} -> {}] Failed to get schema from local cluster, will try in the next loop", this.topicName, this.localCluster, this.remoteCluster, th);
                                    }
                                    log.info("[{}][{} -> {}] Resume the data replication after the schema fetching done", this.topicName, this.localCluster, this.remoteCluster);
                                    this.cursor.rewind();
                                    this.fetchSchemaInProgress = false;
                                    readMoreEntries();
                                });
                            } else {
                                msg.setSchemaInfoForReplicator(schemaFuture.get());
                                // Account for the message only now that it is really being sent; the send
                                // callback is what decrements the pending counter again.
                                this.msgOut.recordEvent(dataBuffer.readableBytes());
                                PENDING_MESSAGES_UPDATER.incrementAndGet(this);
                                this.producer.sendAsync(msg, ProducerSendCallback.create(this, entry, msg));
                                atLeastOneMessageSent = true;
                            }
                        }
                    } catch (Throwable th2) {
                        log.error("[{}][{} -> {}] Failed to deserialize message at {} (buffer size: {}): {}", this.topicName, this.localCluster, this.remoteCluster, entry.getPosition(), length, th2.getMessage(), th2);
                        this.cursor.asyncDelete(entry.getPosition(), this, entry.getPosition());
                        entry.release();
                    }
                }
            } catch (Exception e) {
                log.error("[{}][{} -> {}] Unexpected exception: {}", this.topicName, this.localCluster, this.remoteCluster, e.getMessage(), e);
            }
        }

        HAVE_PENDING_READ_UPDATER.set(this, FALSE);
        if (!atLeastOneMessageSent || isWritable()) {
            readMoreEntries();
        } else if (log.isDebugEnabled()) {
            log.debug("[{}][{} -> {}] Pausing replication traffic. at-least-one: {} is-writable: {}", this.topicName, this.localCluster, this.remoteCluster, atLeastOneMessageSent, isWritable());
        }
    }

    /**
     * Looks up the schema that matches the message's schema version.
     *
     * @param messageImpl message whose schema version is inspected
     * @return a future already completed with {@code null} when the message carries no schema version,
     *         otherwise the future returned by the topic's schema provider
     * @throws ExecutionException if obtaining the schema provider from the client's loading cache fails
     */
    private CompletableFuture<SchemaInfo> getSchemaInfo(MessageImpl messageImpl) throws ExecutionException {
        final byte[] schemaVersion = messageImpl.getSchemaVersion();
        if (schemaVersion == null || schemaVersion.length == 0) {
            return CompletableFuture.completedFuture(null);
        }
        final SchemaInfoProvider provider = (SchemaInfoProvider) this.client.getSchemaProviderLoadingCache().get(this.topicName);
        return provider.getSchemaByVersion(messageImpl.getSchemaVersion());
    }

    /**
     * Marks the cursor active when the remote producer is connected and inactive otherwise.
     * Does nothing when there is no cursor.
     */
    public void updateCursorState() {
        if (this.cursor == null) {
            return;
        }
        // Read the producer field once: it may be replaced or cleared between a null check and the call.
        final ProducerImpl currentProducer = this.producer;
        if (currentProducer != null && currentProducer.isConnected()) {
            this.cursor.setActive();
        } else {
            this.cursor.setInactive();
        }
    }

    /**
     * Cursor read-failure callback.
     *
     * <p>Ignored (apart from a log line) when the replicator is not in the Started state. Otherwise the
     * read batch size is reset to the configured minimum and the read is retried after the next backoff
     * delay, unless the cursor is already closed, in which case the producer is closed instead.
     *
     * @param managedLedgerException the read failure
     * @param obj                    read context, only used for logging
     */
    public void readEntriesFailed(ManagedLedgerException managedLedgerException, Object obj) {
        if (STATE_UPDATER.get(this) != AbstractReplicator.State.Started) {
            log.info("[{}][{} -> {}] Replicator was stopped while reading entries. Stop reading. Replicator state: {}", this.topic, this.localCluster, this.remoteCluster, STATE_UPDATER.get(this));
            return;
        }

        // Fall back to the smallest batch so retries stay cheap.
        this.readBatchSize = this.topic.getBrokerService().pulsar().getConfiguration().getDispatcherMinReadBatchSize();
        final long waitTimeMillis = this.readFailureBackoff.next();

        if (managedLedgerException instanceof ManagedLedgerException.CursorAlreadyClosedException) {
            log.error("[{}][{} -> {}] Error reading entries because replicator is already deleted and cursor is already closed {}, ({})", this.topic, this.localCluster, this.remoteCluster, obj, managedLedgerException.getMessage(), managedLedgerException);
            closeProducerAsync();
            return;
        }

        if (managedLedgerException instanceof ManagedLedgerException.TooManyRequestsException) {
            if (log.isDebugEnabled()) {
                log.debug("[{}][{} -> {}] Throttled by bookies while reading at {}. Retrying to read in {}s. ({})", this.topicName, this.localCluster, this.remoteCluster, obj, waitTimeMillis / 1000.0, managedLedgerException.getMessage(), managedLedgerException);
            }
        } else {
            log.error("[{}][{} -> {}] Error reading entries at {}. Retrying to read in {}s. ({})", this.topic, this.localCluster, this.remoteCluster, obj, waitTimeMillis / 1000.0, managedLedgerException.getMessage(), managedLedgerException);
        }

        HAVE_PENDING_READ_UPDATER.set(this, FALSE);
        this.brokerService.executor().schedule(this::readMoreEntries, waitTimeMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Asynchronously clears the cursor's backlog.
     *
     * @return a future completed with {@code null} on success or exceptionally with the cursor's error
     */
    public CompletableFuture<Void> clearBacklog() {
        final CompletableFuture<Void> result = new CompletableFuture<>();
        if (log.isDebugEnabled()) {
            log.debug("[{}][{} -> {}] Backlog size before clearing: {}", this.topicName, this.localCluster, this.remoteCluster, this.cursor.getNumberOfEntriesInBacklog(false));
        }

        final AsyncCallbacks.ClearBacklogCallback callback = new AsyncCallbacks.ClearBacklogCallback() {
            public void clearBacklogComplete(Object ctx) {
                if (PersistentReplicator.log.isDebugEnabled()) {
                    PersistentReplicator.log.debug("[{}][{} -> {}] Backlog size after clearing: {}", PersistentReplicator.this.topicName, PersistentReplicator.this.localCluster, PersistentReplicator.this.remoteCluster, PersistentReplicator.this.cursor.getNumberOfEntriesInBacklog(false));
                }
                result.complete(null);
            }

            public void clearBacklogFailed(ManagedLedgerException failure, Object ctx) {
                PersistentReplicator.log.error("[{}][{} -> {}] Failed to clear backlog", PersistentReplicator.this.topicName, PersistentReplicator.this.localCluster, PersistentReplicator.this.remoteCluster, failure);
                result.completeExceptionally(failure);
            }
        };
        this.cursor.asyncClearBacklog(callback, (Object) null);
        return result;
    }

    /**
     * Asynchronously skips entries on the cursor, excluding individually deleted entries from the count.
     *
     * @param i number of entries to skip
     * @return a future completed with {@code null} on success or exceptionally with the cursor's error
     */
    public CompletableFuture<Void> skipMessages(final int i) {
        final CompletableFuture<Void> result = new CompletableFuture<>();
        if (log.isDebugEnabled()) {
            log.debug("[{}][{} -> {}] Skipping {} messages, current backlog {}", this.topicName, this.localCluster, this.remoteCluster, i, this.cursor.getNumberOfEntriesInBacklog(false));
        }

        final AsyncCallbacks.SkipEntriesCallback callback = new AsyncCallbacks.SkipEntriesCallback() {
            public void skipEntriesComplete(Object ctx) {
                if (PersistentReplicator.log.isDebugEnabled()) {
                    PersistentReplicator.log.debug("[{}][{} -> {}] Skipped {} messages, new backlog {}", PersistentReplicator.this.topicName, PersistentReplicator.this.localCluster, PersistentReplicator.this.remoteCluster, i, PersistentReplicator.this.cursor.getNumberOfEntriesInBacklog(false));
                }
                result.complete(null);
            }

            public void skipEntriesFailed(ManagedLedgerException failure, Object ctx) {
                PersistentReplicator.log.error("[{}][{} -> {}] Failed to skip {} messages", PersistentReplicator.this.topicName, PersistentReplicator.this.localCluster, PersistentReplicator.this.remoteCluster, i, failure);
                result.completeExceptionally(failure);
            }
        };
        this.cursor.asyncSkipEntries(i, ManagedCursor.IndividualDeletedEntries.Exclude, callback, (Object) null);
        return result;
    }

    /**
     * Asynchronously fetches the N-th entry of the cursor, excluding individually deleted entries.
     *
     * @param i index passed to the cursor's N-th entry lookup
     * @return a future completed with the entry or exceptionally with the cursor's error
     */
    public CompletableFuture<Entry> peekNthMessage(int i) {
        final CompletableFuture<Entry> result = new CompletableFuture<>();
        if (log.isDebugEnabled()) {
            log.debug("[{}][{} -> {}] Getting message at position {}", this.topicName, this.localCluster, this.remoteCluster, i);
        }

        final AsyncCallbacks.ReadEntryCallback callback = new AsyncCallbacks.ReadEntryCallback() {
            public void readEntryFailed(ManagedLedgerException failure, Object ctx) {
                result.completeExceptionally(failure);
            }

            public void readEntryComplete(Entry found, Object ctx) {
                result.complete(found);
            }
        };
        this.cursor.asyncGetNthEntry(i, ManagedCursor.IndividualDeletedEntries.Exclude, callback, (Object) null);
        return result;
    }

    /**
     * Cursor delete-success callback; only emits a debug log line.
     *
     * @param obj delete context, logged as the deleted position
     */
    public void deleteComplete(Object obj) {
        if (!log.isDebugEnabled()) {
            return;
        }
        log.debug("[{}][{} -> {}] Deleted message at {}", this.topicName, this.localCluster, this.remoteCluster, obj);
    }

    /**
     * Cursor delete-failure callback: logs the failure and, when the context is a position that is still
     * ahead of the cursor's mark-deleted position, retries the delete after 10 seconds.
     *
     * @param managedLedgerException the delete failure
     * @param obj                    delete context; retried only when it is a {@link PositionImpl}
     */
    public void deleteFailed(ManagedLedgerException managedLedgerException, Object obj) {
        log.error("[{}][{} -> {}] Failed to delete message at {}: {}", this.topicName, this.localCluster, this.remoteCluster, obj, managedLedgerException.getMessage(), managedLedgerException);
        if (!(obj instanceof PositionImpl)) {
            return;
        }
        final PositionImpl failedPosition = (PositionImpl) obj;
        final boolean stillPending = failedPosition.compareTo(this.cursor.getMarkDeletedPosition()) > 0;
        if (stillPending) {
            this.brokerService.getPulsar().getExecutor().schedule(() -> {
                this.cursor.asyncDelete(failedPosition, this, failedPosition);
            }, 10L, TimeUnit.SECONDS);
        }
    }

    /**
     * Recomputes the outbound and expiry rates and copies them into the statistics object. The expired
     * rate is the sum of this replicator's own expiry rate and the expiry monitor's rate.
     */
    @Override // org.apache.pulsar.broker.service.Replicator
    public void updateRates() {
        // Refresh both rate windows first, then publish the values.
        this.msgOut.calculateRate();
        this.msgExpired.calculateRate();

        this.stats.msgRateOut = this.msgOut.getRate();
        this.stats.msgThroughputOut = this.msgOut.getValueRate();
        this.stats.msgRateExpired = this.msgExpired.getRate() + this.expiryMonitor.getMessageExpiryRate();
    }

    /**
     * Refreshes the backlog, connection and delay figures of the statistics object and returns it.
     *
     * <p>All producer-related values are taken from a single snapshot of the producer field so that the
     * reported {@code connected} flag and connection details describe the same producer instance.
     *
     * @return the (shared, mutable) statistics object of this replicator
     */
    @Override // org.apache.pulsar.broker.service.Replicator
    /* renamed from: getStats */
    public ReplicatorStatsImpl mo138getStats() {
        this.stats.replicationBacklog = this.cursor != null ? this.cursor.getNumberOfEntriesInBacklog(false) : 0L;

        // Snapshot once: the field may be replaced or cleared while the stats are being filled in.
        final ProducerImpl currentProducer = this.producer;
        this.stats.connected = currentProducer != null && currentProducer.isConnected();
        this.stats.replicationDelayInSeconds = getReplicationDelayInSeconds();
        if (currentProducer != null) {
            this.stats.outboundConnection = currentProducer.getConnectionId();
            this.stats.outboundConnectedSince = currentProducer.getConnectedSince();
        } else {
            this.stats.outboundConnection = null;
            this.stats.outboundConnectedSince = null;
        }
        return this.stats;
    }

    /**
     * Sets the TTL used by the per-message expiry check in {@code readEntriesComplete}.
     *
     * @param i new message TTL in seconds
     */
    public void updateMessageTTL(int i) {
        final int ttlSeconds = i;
        this.messageTTLInSeconds = ttlSeconds;
    }

    /**
     * @return the producer's reported delay converted from milliseconds to whole seconds, or {@code 0}
     *         when there is no producer
     */
    private long getReplicationDelayInSeconds() {
        // Read the field once so the null check and the call refer to the same producer instance.
        final ProducerImpl currentProducer = this.producer;
        if (currentProducer == null) {
            return 0L;
        }
        return TimeUnit.MILLISECONDS.toSeconds(currentProducer.getDelayInMillis());
    }

    /**
     * Expires messages older than the given TTL, but only when it is worth checking: the backlog must be
     * non-empty and either at least {@code MINIMUM_BACKLOG_FOR_EXPIRY_CHECK} entries long or its oldest
     * message must already be expired.
     *
     * @param i message TTL in seconds
     * @return {@code false} when the check was skipped, otherwise the expiry monitor's result
     */
    public boolean expireMessages(int i) {
        // Query the backlog once so both conditions are evaluated against the same value.
        final long backlog = this.cursor.getNumberOfEntriesInBacklog(false);
        if (backlog == 0) {
            return false;
        }
        if (backlog < MINIMUM_BACKLOG_FOR_EXPIRY_CHECK && !this.topic.isOldestMessageExpired(this.cursor, i)) {
            // Small backlog whose oldest message is still valid: nothing to expire.
            return false;
        }
        return this.expiryMonitor.expireMessages(i);
    }

    /**
     * Delegates position-based expiry to the expiry monitor.
     *
     * @param position position handed to the expiry monitor
     * @return the expiry monitor's result
     */
    public boolean expireMessages(Position position) {
        final boolean expired = this.expiryMonitor.expireMessages(position);
        return expired;
    }

    /**
     * @return the replicator's dispatch rate limiter, empty if none has been created
     */
    @Override // org.apache.pulsar.broker.service.Replicator
    public Optional<DispatchRateLimiter> getRateLimiter() {
        final Optional<DispatchRateLimiter> limiter = this.dispatchRateLimiter;
        return limiter;
    }

    /**
     * Creates the replicator dispatch rate limiter if none exists yet and the topic's replicator dispatch
     * rate is enabled. Creation is guarded by {@code dispatchRateLimiterLock}.
     */
    @Override // org.apache.pulsar.broker.service.Replicator
    public void initializeDispatchRateLimiterIfNeeded() {
        synchronized (this.dispatchRateLimiterLock) {
            if (this.dispatchRateLimiter.isPresent()) {
                return;
            }
            if (DispatchRateLimiter.isDispatchRateEnabled(this.topic.getReplicatorDispatchRate())) {
                this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(this.topic, DispatchRateLimiter.Type.REPLICATOR));
            }
        }
    }

    /**
     * Makes sure the rate limiter exists when it should, then asks it (if present) to update its
     * dispatch rate.
     */
    @Override // org.apache.pulsar.broker.service.Replicator
    public void updateRateLimiter() {
        initializeDispatchRateLimiterIfNeeded();
        this.dispatchRateLimiter.ifPresent(limiter -> limiter.updateDispatchRate());
    }

    /**
     * Hands selected marker messages to the topic.
     *
     * <p>A message is forwarded via {@code topic.receivedReplicatedSubscriptionMarker(...)} only when it
     * carries a marker type, was replicated from this replicator's remote cluster, and its marker type is
     * one of the values listed in the first case group below. Everything else is ignored.
     *
     * @param position   position of the entry that contained the message
     * @param messageImpl deserialized message whose metadata is inspected
     * @param byteBuf    buffer passed on to the topic together with the marker
     */
    private void checkReplicatedSubscriptionMarker(Position position, MessageImpl<?> messageImpl, ByteBuf byteBuf) {
        // Messages without a marker type are ordinary messages: nothing to do.
        if (messageImpl.getMessageBuilder().hasMarkerType()) {
            int markerType = messageImpl.getMessageBuilder().getMarkerType();
            // Only react to markers whose replicated-from cluster equals this replicator's remote cluster.
            if (messageImpl.getMessageBuilder().hasReplicatedFrom() && this.remoteCluster.equals(messageImpl.getMessageBuilder().getReplicatedFrom())) {
                // NOTE(review): FLOAT_VALUE, DATE_VALUE and DOUBLE_VALUE are neither declared nor imported in
                // this file; they look like decompiler substitutions for numeric marker-type constants.
                // Confirm the intended values against the marker-type definitions before relying on them.
                switch (markerType) {
                    case 10:
                    case FLOAT_VALUE:
                    case DATE_VALUE:
                        this.topic.receivedReplicatedSubscriptionMarker(position, markerType, byteBuf);
                        return;
                    case DOUBLE_VALUE:
                    default:
                        // Any other marker type is ignored here.
                        return;
                }
            }
        }
    }

    /**
     * Disconnects the replicator; shorthand for {@code disconnect(false)}.
     *
     * @return the future returned by {@link #disconnect(boolean)}
     */
    @Override // org.apache.pulsar.broker.service.AbstractReplicator, org.apache.pulsar.broker.service.Replicator
    public CompletableFuture<Void> disconnect() {
        final CompletableFuture<Void> result = disconnect(false);
        return result;
    }

    /**
     * Disconnects through the superclass and, once that succeeds, closes the dispatch rate limiter.
     *
     * @param z flag forwarded unchanged to the superclass {@code disconnect(boolean)}
     * @return a future completed with {@code null} on success, or exceptionally with the failure cause
     *         (unwrapped from a {@link CompletionException} when applicable)
     */
    @Override // org.apache.pulsar.broker.service.AbstractReplicator, org.apache.pulsar.broker.service.Replicator
    public synchronized CompletableFuture<Void> disconnect(boolean z) {
        final CompletableFuture<Void> result = new CompletableFuture<>();
        super.disconnect(z).thenRun(() -> {
            this.dispatchRateLimiter.ifPresent(limiter -> limiter.close());
            result.complete(null);
        }).exceptionally(error -> {
            Throwable cause = error;
            if (error instanceof CompletionException) {
                cause = error.getCause();
            }
            // A busy topic is an expected outcome and is not logged as an error.
            final boolean topicBusy = cause instanceof BrokerServiceException.TopicBusyException;
            if (!topicBusy) {
                log.error("[{}][{} -> {}] Failed to close dispatch rate limiter: {}", this.topicName, this.localCluster, this.remoteCluster, error.getMessage());
            }
            result.completeExceptionally(cause);
            return null;
        });
        return result;
    }

    /**
     * @return {@code true} when a producer exists and reports itself as connected
     */
    @Override // org.apache.pulsar.broker.service.Replicator
    public boolean isConnected() {
        // Work on a snapshot so the null check and the call see the same producer.
        final ProducerImpl currentProducer = this.producer;
        if (currentProducer == null) {
            return false;
        }
        return currentProducer.isConnected();
    }

    /**
     * @return the managed cursor used by this replicator (exposed for tests)
     */
    @VisibleForTesting
    public ManagedCursor getCursor() {
        final ManagedCursor current = this.cursor;
        return current;
    }
}
