package kafka.server;

import java.util.Optional;
import kafka.cluster.Partition;
import kafka.log.LogOffsetSnapshot;
import org.apache.commons.configuration.tree.DefaultExpressionEngine;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.FencedLeaderEpochException;
import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.replica.ClientMetadata;
import org.apache.kafka.common.requests.EpochEndOffset;
import org.apache.kafka.common.requests.FetchRequest;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.Seq;
import scala.math.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.NonLocalReturnControl;
import scala.runtime.NonLocalReturnControl$mcZ$sp;

/* JADX WARN: Classes with same name are omitted:
  input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.2.1.1.32.jar:META-INF/bundled-dependencies/kafka_2.13-2.7.0.jar:kafka/server/DelayedFetch.class
 */
/* compiled from: DelayedFetch.scala */
@ScalaSignature(bytes = "\u0006\u0005\r4Aa\u0003\u0007\u0001#!Ia\u0003\u0001B\u0001B\u0003%q#\b\u0005\t=\u0001\u0011\t\u0011)A\u0005?!A!\u0005\u0001B\u0001B\u0003%1\u0005\u0003\u0005'\u0001\t\u0005\t\u0015!\u0003(\u0011!Q\u0003A!A!\u0002\u0013Y\u0003\u0002C\u001e\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u001f\t\u000bI\u0003A\u0011A*\t\u000bm\u0003A\u0011\t/\t\u000b\u0001\u0004A\u0011I1\t\u000b\t\u0004A\u0011I1\u0003\u0019\u0011+G.Y=fI\u001a+Go\u00195\u000b\u00055q\u0011AB:feZ,'OC\u0001\u0010\u0003\u0015Y\u0017MZ6b\u0007\u0001\u0019\"\u0001\u0001\n\u0011\u0005M!R\"\u0001\u0007\n\u0005Ua!\u0001\u0005#fY\u0006LX\rZ(qKJ\fG/[8o\u0003\u001d!W\r\\1z\u001bN\u0004\"\u0001G\u000e\u000e\u0003eQ\u0011AG\u0001\u0006g\u000e\fG.Y\u0005\u00039e\u0011A\u0001T8oO&\u0011a\u0003F\u0001\u000eM\u0016$8\r['fi\u0006$\u0017\r^1\u0011\u0005M\u0001\u0013BA\u0011\r\u000551U\r^2i\u001b\u0016$\u0018\rZ1uC\u0006q!/\u001a9mS\u000e\fW*\u00198bO\u0016\u0014\bCA\n%\u0013\t)CB\u0001\bSKBd\u0017nY1NC:\fw-\u001a:\u0002\u000bE,x\u000e^1\u0011\u0005MA\u0013BA\u0015\r\u00051\u0011V\r\u001d7jG\u0006\fVo\u001c;b\u00039\u0019G.[3oi6+G/\u00193bi\u0006\u00042\u0001\u0007\u0017/\u0013\ti\u0013D\u0001\u0004PaRLwN\u001c\t\u0003_ej\u0011\u0001\r\u0006\u0003cI\nqA]3qY&\u001c\u0017M\u0003\u00024i\u000511m\\7n_:T!aD\u001b\u000b\u0005Y:\u0014AB1qC\u000eDWMC\u00019\u0003\ry'oZ\u0005\u0003uA\u0012ab\u00117jK:$X*\u001a;bI\u0006$\u0018-\u0001\tsKN\u0004xN\\:f\u0007\u0006dGNY1dWB!\u0001$P P\u0013\tq\u0014DA\u0005Gk:\u001cG/[8ocA\u0019\u0001iQ#\u000e\u0003\u0005S!AQ\r\u0002\u0015\r|G\u000e\\3di&|g.\u0003\u0002E\u0003\n\u00191+Z9\u0011\ta1\u0005\nT\u0005\u0003\u000ff\u0011a\u0001V;qY\u0016\u0014\u0004CA%K\u001b\u0005\u0011\u0014BA&3\u00059!v\u000e]5d!\u0006\u0014H/\u001b;j_:\u0004\"aE'\n\u00059c!A\u0005$fi\u000eD\u0007+\u0019:uSRLwN\u001c#bi\u0006\u0004\"\u0001\u0007)\n\u0005EK\"\u0001B+oSR\fa\u0001P5oSRtDc\u0002+V-^C\u0016L\u0017\t\u0003'\u0001AQAF\u0004A\u0002]AQAH\u0004A\u0002}AQAI\u0004A\u0002\rBQAJ\u0004A\u0002\u001dBQAK\u0004A\u0002-BQaO\u0004A\u0002q\n1\u0002\u001e:z\u0007>l\u0007\u000f\\3uKR\tQ\f\u0005\u0002\u0019=&\u0011q,\u0007\u0002\b\u0005>|G.Z1o\u00031yg.\u0012=qSJ\fG/[8o)\u0005y\u0015AC8o\u0007>l\u0007\u000f\\3uK\u0002")
/* loaded from: input_file:META-INF/bundled-dependencies/kafka_2.13-2.7.0.jar:kafka/server/DelayedFetch.class */
public class DelayedFetch extends DelayedOperation {
    private final FetchMetadata fetchMetadata;
    private final ReplicaManager replicaManager;
    private final ReplicaQuota quota;
    private final Option<ClientMetadata> clientMetadata;
    private final Function1<Seq<Tuple2<TopicPartition, FetchPartitionData>>, BoxedUnit> responseCallback;

    @Override // kafka.server.DelayedOperation
    public boolean tryComplete() {
        Object obj = new Object();
        try {
            IntRef intRef = new IntRef(0);
            this.fetchMetadata.fetchPartitionStatus().foreach(tuple2 -> {
                $anonfun$tryComplete$1(this, obj, intRef, tuple2);
                return BoxedUnit.UNIT;
            });
            if (intRef.elem >= this.fetchMetadata.fetchMinBytes()) {
                return forceComplete();
            }
            return false;
        } catch (NonLocalReturnControl e) {
            if (e.key() == obj) {
                return e.value$mcZ$sp();
            }
            throw e;
        }
    }

    @Override // kafka.server.DelayedOperation
    public void onExpiration() {
        if (this.fetchMetadata.isFromFollower()) {
            DelayedFetchMetrics$.MODULE$.followerExpiredRequestMeter().mark();
        } else {
            DelayedFetchMetrics$.MODULE$.consumerExpiredRequestMeter().mark();
        }
    }

    @Override // kafka.server.DelayedOperation
    public void onComplete() {
        int replicaId = this.fetchMetadata.replicaId();
        boolean fetchOnlyLeader = this.fetchMetadata.fetchOnlyLeader();
        FetchIsolation fetchIsolation = this.fetchMetadata.fetchIsolation();
        int fetchMaxBytes = this.fetchMetadata.fetchMaxBytes();
        boolean hardMaxBytesLimit = this.fetchMetadata.hardMaxBytesLimit();
        Seq<Tuple2<TopicPartition, FetchRequest.PartitionData>> map = this.fetchMetadata.fetchPartitionStatus().map(tuple2 -> {
            if (tuple2 == null) {
                throw new MatchError(null);
            }
            TopicPartition topicPartition = (TopicPartition) tuple2.mo6777_1();
            FetchPartitionStatus fetchPartitionStatus = (FetchPartitionStatus) tuple2.mo6776_2();
            Predef$ArrowAssoc$ predef$ArrowAssoc$ = Predef$ArrowAssoc$.MODULE$;
            return new Tuple2(topicPartition, fetchPartitionStatus.fetchInfo());
        });
        Option<ClientMetadata> option = this.clientMetadata;
        this.responseCallback.mo6796apply(this.replicaManager.readFromLocalLog(replicaId, fetchOnlyLeader, fetchIsolation, fetchMaxBytes, hardMaxBytesLimit, map, this.quota, option).map(tuple22 -> {
            if (tuple22 == null) {
                throw new MatchError(null);
            }
            TopicPartition topicPartition = (TopicPartition) tuple22.mo6777_1();
            LogReadResult logReadResult = (LogReadResult) tuple22.mo6776_2();
            boolean z = this.fetchMetadata.isFromFollower() && this.replicaManager.isAddingReplica(topicPartition, this.fetchMetadata.replicaId());
            Predef$ArrowAssoc$ predef$ArrowAssoc$ = Predef$ArrowAssoc$.MODULE$;
            return new Tuple2(topicPartition, new FetchPartitionData(logReadResult.error(), logReadResult.highWatermark(), logReadResult.leaderLogStartOffset(), logReadResult.info().records(), logReadResult.divergingEpoch(), logReadResult.lastStableOffset(), logReadResult.info().abortedTransactions(), logReadResult.preferredReadReplica(), z));
        }));
    }

    public static final /* synthetic */ void $anonfun$tryComplete$1(DelayedFetch delayedFetch, Object obj, IntRef intRef, Tuple2 tuple2) {
        LogOffsetMetadata lastStableOffset;
        if (tuple2 == null) {
            throw new MatchError(null);
        }
        TopicPartition topicPartition = (TopicPartition) tuple2.mo6777_1();
        FetchPartitionStatus fetchPartitionStatus = (FetchPartitionStatus) tuple2.mo6776_2();
        LogOffsetMetadata startOffsetMetadata = fetchPartitionStatus.startOffsetMetadata();
        Optional<Integer> optional = fetchPartitionStatus.fetchInfo().currentLeaderEpoch;
        try {
            LogOffsetMetadata UnknownOffsetMetadata = LogOffsetMetadata$.MODULE$.UnknownOffsetMetadata();
            if (startOffsetMetadata == null) {
                if (UnknownOffsetMetadata == null) {
                    return;
                }
            } else if (startOffsetMetadata.equals(UnknownOffsetMetadata)) {
                return;
            }
            Partition partitionOrException = delayedFetch.replicaManager.getPartitionOrException(topicPartition);
            LogOffsetSnapshot fetchOffsetSnapshot = partitionOrException.fetchOffsetSnapshot(optional, delayedFetch.fetchMetadata.fetchOnlyLeader());
            FetchIsolation fetchIsolation = delayedFetch.fetchMetadata.fetchIsolation();
            if (FetchLogEnd$.MODULE$.equals(fetchIsolation)) {
                lastStableOffset = fetchOffsetSnapshot.logEndOffset();
            } else if (FetchHighWatermark$.MODULE$.equals(fetchIsolation)) {
                lastStableOffset = fetchOffsetSnapshot.highWatermark();
            } else {
                if (!FetchTxnCommitted$.MODULE$.equals(fetchIsolation)) {
                    throw new MatchError(fetchIsolation);
                }
                lastStableOffset = fetchOffsetSnapshot.lastStableOffset();
            }
            if (lastStableOffset.messageOffset() != startOffsetMetadata.messageOffset()) {
                if (lastStableOffset.onOlderSegment(startOffsetMetadata)) {
                    delayedFetch.debug(() -> {
                        return new StringBuilder(68).append("Satisfying fetch ").append(delayedFetch.fetchMetadata).append(" since it is fetching later segments of partition ").append(topicPartition).append(DefaultExpressionEngine.DEFAULT_PROPERTY_DELIMITER).toString();
                    });
                    throw new NonLocalReturnControl$mcZ$sp(obj, delayedFetch.forceComplete());
                }
                if (startOffsetMetadata.onOlderSegment(lastStableOffset)) {
                    delayedFetch.debug(() -> {
                        return new StringBuilder(66).append("Satisfying fetch ").append(delayedFetch.fetchMetadata).append(" immediately since it is fetching older segments.").toString();
                    });
                    if (!delayedFetch.replicaManager.shouldLeaderThrottle(delayedFetch.quota, partitionOrException, delayedFetch.fetchMetadata.replicaId())) {
                        throw new NonLocalReturnControl$mcZ$sp(obj, delayedFetch.forceComplete());
                    }
                } else if (startOffsetMetadata.messageOffset() < lastStableOffset.messageOffset()) {
                    package$ package_ = package$.MODULE$;
                    int min = Math.min(lastStableOffset.positionDiff(startOffsetMetadata), fetchPartitionStatus.fetchInfo().maxBytes);
                    if (!delayedFetch.replicaManager.shouldLeaderThrottle(delayedFetch.quota, partitionOrException, delayedFetch.fetchMetadata.replicaId())) {
                        intRef.elem += min;
                    }
                }
            }
            fetchPartitionStatus.fetchInfo().lastFetchedEpoch.ifPresent(num -> {
                EpochEndOffset lastOffsetForLeaderEpoch = partitionOrException.lastOffsetForLeaderEpoch(optional, BoxesRunTime.unboxToInt(num), false);
                Errors error = lastOffsetForLeaderEpoch.error();
                Errors errors = Errors.NONE;
                if (error != null ? error.equals(errors) : errors == null) {
                    if (!lastOffsetForLeaderEpoch.hasUndefinedEpochOrOffset()) {
                        if (lastOffsetForLeaderEpoch.leaderEpoch() < BoxesRunTime.unboxToInt(num) || lastOffsetForLeaderEpoch.endOffset() < fetchPartitionStatus.fetchInfo().fetchOffset) {
                            delayedFetch.debug(() -> {
                                return new StringBuilder(0).append(new StringBuilder(82).append("Satisfying fetch ").append(delayedFetch.fetchMetadata).append(" since it has diverging epoch requiring truncation for partition ").toString()).append(new StringBuilder(42).append(topicPartition).append(" epochEndOffset=").append(lastOffsetForLeaderEpoch).append(" fetchEpoch=").append(num).append(" fetchOffset=").append(fetchPartitionStatus.fetchInfo().fetchOffset).append(DefaultExpressionEngine.DEFAULT_PROPERTY_DELIMITER).toString()).toString();
                            });
                            throw new NonLocalReturnControl$mcZ$sp(obj, delayedFetch.forceComplete());
                        }
                        return;
                    }
                }
                delayedFetch.debug(() -> {
                    return new StringBuilder(78).append("Could not obtain last offset for leader epoch for partition ").append(topicPartition).append(", epochEndOffset=").append(lastOffsetForLeaderEpoch).append(DefaultExpressionEngine.DEFAULT_PROPERTY_DELIMITER).toString();
                });
                throw new NonLocalReturnControl$mcZ$sp(obj, delayedFetch.forceComplete());
            });
        } catch (FencedLeaderEpochException unused) {
            delayedFetch.debug(() -> {
                return new StringBuilder(0).append(new StringBuilder(60).append("Broker is the leader of partition ").append(topicPartition).append(", but the requested epoch ").toString()).append(new StringBuilder(59).append(optional).append(" is fenced by the latest leader epoch, satisfy ").append(delayedFetch.fetchMetadata).append(" immediately").toString()).toString();
            });
            throw new NonLocalReturnControl$mcZ$sp(obj, delayedFetch.forceComplete());
        } catch (KafkaStorageException unused2) {
            delayedFetch.debug(() -> {
                return new StringBuilder(63).append("Partition ").append(topicPartition).append(" is in an offline log directory, satisfy ").append(delayedFetch.fetchMetadata).append(" immediately").toString();
            });
            throw new NonLocalReturnControl$mcZ$sp(obj, delayedFetch.forceComplete());
        } catch (NotLeaderOrFollowerException unused3) {
            delayedFetch.debug(() -> {
                return new StringBuilder(68).append("Broker is no longer the leader or follower of ").append(topicPartition).append(", satisfy ").append(delayedFetch.fetchMetadata).append(" immediately").toString();
            });
            throw new NonLocalReturnControl$mcZ$sp(obj, delayedFetch.forceComplete());
        } catch (UnknownTopicOrPartitionException unused4) {
            delayedFetch.debug(() -> {
                return new StringBuilder(58).append("Broker no longer knows of partition ").append(topicPartition).append(", satisfy ").append(delayedFetch.fetchMetadata).append(" immediately").toString();
            });
            throw new NonLocalReturnControl$mcZ$sp(obj, delayedFetch.forceComplete());
        }
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public DelayedFetch(long j, FetchMetadata fetchMetadata, ReplicaManager replicaManager, ReplicaQuota replicaQuota, Option<ClientMetadata> option, Function1<Seq<Tuple2<TopicPartition, FetchPartitionData>>, BoxedUnit> function1) {
        super(j, None$.MODULE$);
        this.fetchMetadata = fetchMetadata;
        this.replicaManager = replicaManager;
        this.quota = replicaQuota;
        this.clientMetadata = option;
        this.responseCallback = function1;
        DelayedOperation$ delayedOperation$ = DelayedOperation$.MODULE$;
    }
}
