package kafka.server;

import kafka.cluster.Partition;
import kafka.log.LeaderOffsetIncremented$;
import kafka.log.LogAppendInfo;
import kafka.log.UnifiedLog;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.server.common.MetadataVersion;
import scala.Function0;
import scala.Option;
import scala.Predef$;
import scala.collection.StringOps$;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Seq;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Buffer$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;
import scala.runtime.ScalaRunTime$;

/* compiled from: ReplicaFetcherThread.scala */
@ScalaSignature(bytes = "\u0006\u0005\u0005ue\u0001B\u000e\u001d\u0001\u0005B\u0001B\n\u0001\u0003\u0002\u0003\u0006Ia\n\u0005\ni\u0001\u0011\t\u0011)A\u0005kaB\u0001\"\u000f\u0001\u0003\u0002\u0003\u0006IA\u000f\u0005\t{\u0001\u0011\t\u0011)A\u0005}!A\u0011\t\u0001B\u0001B\u0003%!\t\u0003\u0005F\u0001\t\u0005\t\u0015!\u0003G\u0011!I\u0005A!A!\u0002\u00139\u0003\u0002\u0003&\u0001\u0005\u0003\u0005\u000b\u0011B&\t\u000bm\u0003A\u0011\u0001/\t\u0011\u0019\u0004!\u0019!C\u00019\u001dDa!\u001e\u0001!\u0002\u0013A\u0007b\u0002<\u0001\u0005\u0004%\tf\u001e\u0005\u0007w\u0002\u0001\u000b\u0011\u0002=\t\u000bq\u0004A\u0011K?\t\u000f\u00055\u0001\u0001\"\u0015\u0002\u0010!9\u0011\u0011\u0004\u0001\u0005R\u0005m\u0001bBA\u0010\u0001\u0011E\u0013\u0011\u0005\u0005\b\u0003c\u0001A\u0011IA\u001a\u0011\u001d\t)\u0004\u0001C!\u0003oAq!a\u0010\u0001\t\u0003\n9\u0004C\u0004\u0002B\u0001!\t%a\u0011\t\u000f\u0005\u0015\u0004\u0001\"\u0003\u00028!9\u0011q\r\u0001\u0005\u0002\u0005%\u0004bBA?\u0001\u0011\u0005\u0013q\u0010\u0005\b\u0003\u001f\u0003A\u0011KAI\u00115\tI\n\u0001I\u0001\u0004\u0003\u0005I\u0011BANq\t!\"+\u001a9mS\u000e\fg)\u001a;dQ\u0016\u0014H\u000b\u001b:fC\u0012T!!\b\u0010\u0002\rM,'O^3s\u0015\u0005y\u0012!B6bM.\f7\u0001A\n\u0003\u0001\t\u0002\"a\t\u0013\u000e\u0003qI!!\n\u000f\u0003+\u0005\u00137\u000f\u001e:bGR4U\r^2iKJ$\u0006N]3bI\u0006!a.Y7f!\tA\u0013G\u0004\u0002*_A\u0011!&L\u0007\u0002W)\u0011A\u0006I\u0001\u0007yI|w\u000e\u001e 
\u000b\u00039\nQa]2bY\u0006L!\u0001M\u0017\u0002\rA\u0013X\rZ3g\u0013\t\u00114G\u0001\u0004TiJLgn\u001a\u0006\u0003a5\na\u0001\\3bI\u0016\u0014\bCA\u00127\u0013\t9DD\u0001\bMK\u0006$WM]#oIB{\u0017N\u001c;\n\u0005Q\"\u0013\u0001\u00042s_.,'oQ8oM&<\u0007CA\u0012<\u0013\taDDA\u0006LC\u001a\\\u0017mQ8oM&<\u0017\u0001\u00054bS2,G\rU1si&$\u0018n\u001c8t!\t\u0019s(\u0003\u0002A9\t\u0001b)Y5mK\u0012\u0004\u0016M\u001d;ji&|gn]\u0001\u000be\u0016\u0004H.[2b\u001b\u001e\u0014\bCA\u0012D\u0013\t!ED\u0001\bSKBd\u0017nY1NC:\fw-\u001a:\u0002\u000bE,x\u000e^1\u0011\u0005\r:\u0015B\u0001%\u001d\u00051\u0011V\r\u001d7jG\u0006\fVo\u001c;b\u0003%awn\u001a)sK\u001aL\u00070A\fnKR\fG-\u0019;b-\u0016\u00148/[8o'V\u0004\b\u000f\\5feB\u0019A*T(\u000e\u00035J!AT\u0017\u0003\u0013\u0019+hn\u0019;j_:\u0004\u0004C\u0001)Z\u001b\u0005\t&B\u0001*T\u0003\u0019\u0019w.\\7p]*\u0011Q\u0004\u0016\u0006\u0003?US!AV,\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005A\u0016aA8sO&\u0011!,\u0015\u0002\u0010\u001b\u0016$\u0018\rZ1uCZ+'o]5p]\u00061A(\u001b8jiz\"\u0012\"\u00180`A\u0006\u00147\rZ3\u0011\u0005\r\u0002\u0001\"\u0002\u0014\n\u0001\u00049\u0003\"\u0002\u001b\n\u0001\u0004)\u0004\"B\u001d\n\u0001\u0004Q\u0004\"B\u001f\n\u0001\u0004q\u0004\"B!\n\u0001\u0004\u0011\u0005\"B#\n\u0001\u00041\u0005\"B%\n\u0001\u00049\u0003\"\u0002&\n\u0001\u0004Y\u0015A\b9beRLG/[8og^KG\u000f\u001b(fo\"Kw\r[,bi\u0016\u0014X.\u0019:l+\u0005A\u0007cA5oa6\t!N\u0003\u0002lY\u00069Q.\u001e;bE2,'BA7.\u0003)\u0019w\u000e\u001c7fGRLwN\\\u0005\u0003_*\u0014aAQ;gM\u0016\u0014\bCA9t\u001b\u0005\u0011(B\u0001*U\u0013\t!(O\u0001\bU_BL7\rU1si&$\u0018n\u001c8\u0002?A\f'\u000f^5uS>t7oV5uQ:+w\u000fS5hQ^\u000bG/\u001a:nCJ\\\u0007%A\u0010jg>3gm]3u\r>\u0014H*Z1eKJ,\u0005o\\2i'V\u0004\bo\u001c:uK\u0012,\u0012\u0001\u001f\t\u0003\u0019fL!A_\u0017\u0003\u000f\t{w\u000e\\3b]\u0006\u0001\u0013n](gMN,GOR8s\u0019\u0016\fG-\u001a:Fa>\u001c\u0007nU;qa>\u0014H/\u001a3!\u0003-a\u0017\r^3ti\u0016\u0003xn\u00195\u0015\u0007y\fI\u0001\u0005\u0003M\u007f\u0006
\r\u0011bAA\u0001[\t1q\n\u001d;j_:\u00042\u0001TA\u0003\u0013\r\t9!\f\u0002\u0004\u0013:$\bBBA\u0006\u001d\u0001\u0007\u0001/\u0001\bu_BL7\rU1si&$\u0018n\u001c8\u0002\u001d1|wm\u0015;beR|eMZ:fiR!\u0011\u0011CA\f!\ra\u00151C\u0005\u0004\u0003+i#\u0001\u0002'p]\u001eDa!a\u0003\u0010\u0001\u0004\u0001\u0018\u0001\u00047pO\u0016sGm\u00144gg\u0016$H\u0003BA\t\u0003;Aa!a\u0003\u0011\u0001\u0004\u0001\u0018!E3oI>3gm]3u\r>\u0014X\t]8dQR1\u00111EA\u0016\u0003[\u0001B\u0001T@\u0002&A\u00191%a\n\n\u0007\u0005%BD\u0001\bPM\u001a\u001cX\r^!oI\u0016\u0003xn\u00195\t\r\u0005-\u0011\u00031\u0001q\u0011\u001d\ty#\u0005a\u0001\u0003\u0007\tQ!\u001a9pG\"\f\u0001#\u001b8ji&\fG/Z*ikR$wn\u001e8\u0015\u0003a\fQ\"Y<bSR\u001c\u0006.\u001e;e_^tGCAA\u001d!\ra\u00151H\u0005\u0004\u0003{i#\u0001B+oSR\fa\u0001Z8X_J\\\u0017\u0001\u00069s_\u000e,7o\u001d)beRLG/[8o\t\u0006$\u0018\r\u0006\u0005\u0002F\u0005M\u0013QKA-!\u0011au0a\u0012\u0011\t\u0005%\u0013qJ\u0007\u0003\u0003\u0017R1!!\u0014\u001f\u0003\rawnZ\u0005\u0005\u0003#\nYEA\u0007M_\u001e\f\u0005\u000f]3oI&sgm\u001c\u0005\u0007\u0003\u0017)\u0002\u0019\u00019\t\u000f\u0005]S\u00031\u0001\u0002\u0012\u0005Ya-\u001a;dQ>3gm]3u\u0011\u001d\tY&\u0006a\u0001\u0003;\nQ\u0002]1si&$\u0018n\u001c8ECR\f\u0007\u0003BA0\u0003Cj\u0011\u0001A\u0005\u0004\u0003G\"#!\u0003$fi\u000eDG)\u0019;b\u0003q\u0019w.\u001c9mKR,G)\u001a7bs\u0016$g)\u001a;dQJ+\u0017/^3tiN\f1$\\1zE\u0016<\u0016M\u001d8JM>3XM]:ju\u0016$'+Z2pe\u0012\u001cHCBA\u001d\u0003W\nY\bC\u0004\u0002n]\u0001\r!a\u001c\u0002\u000fI,7m\u001c:egB!\u0011\u0011OA<\u001b\t\t\u0019HC\u0002\u0002vI\faA]3d_J$\u0017\u0002BA=\u0003g\u0012Q\"T3n_JL(+Z2pe\u0012\u001c\bBBA\u0006/\u0001\u0007\u0001/\u0001\u0005ueVt7-\u0019;f)\u0019\tI$!!\u0002\u0006\"1\u00111\u0011\rA\u0002A\f!\u0001\u001e9\t\u000f\u0005\u001d\u0005\u00041\u0001\u0002\n\u0006)rN\u001a4tKR$&/\u001e8dCRLwN\\*uCR,\u0007cA\u0012\u0002\f&\u0019\u0011Q\u0012\u000f\u0003+=3gm]3u)J,hnY1uS>t7\u000b^1uK\u00069BO];oG\u0006$XMR;mYf\fe\u000eZ*uCJ$\u0018\t\u001e\
u000b\u0007\u0003s\t\u0019*!&\t\r\u0005-\u0011\u00041\u0001q\u0011\u001d\t9*\u0007a\u0001\u0003#\taa\u001c4gg\u0016$\u0018\u0001D:va\u0016\u0014H\u0005\\3bI\u0016\u0014X#A\u001b")
/* loaded from: input_file:META-INF/bundled-dependencies/kafka_2.13-3.4.0.jar:kafka/server/ReplicaFetcherThread.class */
public class ReplicaFetcherThread extends AbstractFetcherThread {
    // Broker configuration; read in truncate() to obtain the broker id.
    private final KafkaConfig brokerConfig;
    // Used to look up partitions and local logs, and to complete delayed fetch requests.
    private final ReplicaManager replicaMgr;
    // Consulted in processPartitionData(): bytes are recorded when the partition is throttled.
    private final ReplicaQuota quota;
    // Supplies the MetadataVersion; invoked in the constructor and in maybeWarnIfOversizedRecords().
    private final Function0<MetadataVersion> metadataVersionSupplier;
    // Partitions whose high watermark was updated in processPartitionData();
    // emptied by completeDelayedFetchRequests().
    private final Buffer<TopicPartition> partitionsWithNewHighWatermark;
    // Computed once in the constructor from metadataVersionSupplier.
    private final boolean isOffsetForLeaderEpochSupported;

    /**
     * Synthetic accessor for the superclass {@code leader()} value; the error-message
     * lambdas in {@code initiateShutdown()} and {@code awaitShutdown()} read the
     * endpoint through it.
     *
     * @return whatever {@code super.leader()} returns
     */
    private /* synthetic */ LeaderEndPoint super$leader() {
        final LeaderEndPoint endPoint = super.leader();
        return endPoint;
    }

    /**
     * Exposes the buffer of partitions whose high watermark was updated while
     * processing fetched data.
     *
     * @return the live (not copied) buffer held by this thread
     */
    public Buffer<TopicPartition> partitionsWithNewHighWatermark() {
        return partitionsWithNewHighWatermark;
    }

    /**
     * Reports the flag that the constructor derived from the metadata version supplier.
     *
     * @return the cached {@code isOffsetForLeaderEpochSupported} value
     */
    @Override // kafka.server.AbstractFetcherThread
    public boolean isOffsetForLeaderEpochSupported() {
        return isOffsetForLeaderEpochSupported;
    }

    /**
     * Looks up the local log for the partition and returns its latest epoch.
     *
     * @param topicPartition partition whose local log is queried
     * @return the value of {@code latestEpoch()} on that log
     */
    @Override // kafka.server.AbstractFetcherThread
    public Option<Object> latestEpoch(TopicPartition topicPartition) {
        final UnifiedLog localLog = this.replicaMgr.localLogOrException(topicPartition);
        return localLog.latestEpoch();
    }

    /**
     * Returns an offset read from the partition's local log.
     *
     * @param topicPartition partition whose local log is queried
     * @return the value produced by the log accessor below
     */
    @Override // kafka.server.AbstractFetcherThread
    public long logStartOffset(TopicPartition topicPartition) {
        final UnifiedLog localLog = this.replicaMgr.localLogOrException(topicPartition);
        // NOTE(review): the mangled accessor name is a decompiler artifact; presumably
        // it resolves to the log start offset -- confirm against the Scala source.
        return localLog.kafka$log$UnifiedLog$$$anonfun$new$2();
    }

    /**
     * Returns an offset read from the partition's local log.
     *
     * @param topicPartition partition whose local log is queried
     * @return the value produced by the log accessor below
     */
    @Override // kafka.server.AbstractFetcherThread
    public long logEndOffset(TopicPartition topicPartition) {
        final UnifiedLog localLog = this.replicaMgr.localLogOrException(topicPartition);
        // The same accessor is reported as the "log end offset" in the messages built
        // by processPartitionData(); its mangled name is a decompiler artifact.
        return localLog.kafka$log$UnifiedLog$$$anonfun$new$3();
    }

    /**
     * Asks the partition's local log for the end offset associated with an epoch.
     *
     * @param topicPartition partition whose local log is queried
     * @param i              epoch passed through to the log
     * @return the value of {@code endOffsetForEpoch(i)} on that log
     */
    @Override // kafka.server.AbstractFetcherThread
    public Option<OffsetAndEpoch> endOffsetForEpoch(TopicPartition topicPartition, int i) {
        final UnifiedLog localLog = this.replicaMgr.localLogOrException(topicPartition);
        return localLog.endOffsetForEpoch(i);
    }

    /**
     * Initiates shutdown of this thread and, when the superclass reports that the
     * shutdown was initiated by this call, also initiates close of the leader endpoint.
     * Any failure while initiating the close is logged instead of propagated.
     *
     * @return the result of {@code super.initiateShutdown()}
     */
    @Override // kafka.utils.ShutdownableThread
    public boolean initiateShutdown() {
        final boolean justShutdown = super.initiateShutdown();
        if (!justShutdown) {
            return false;
        }
        try {
            super.leader().initiateClose();
        } catch (Throwable failure) {
            // Log and carry on so the caller's shutdown sequence is not interrupted.
            error(
                () -> "Failed to initiate shutdown of " + this.super$leader() + " after initiating replica fetcher thread shutdown",
                () -> failure);
        }
        return true;
    }

    /**
     * Waits for the thread to shut down via the superclass, then closes the leader
     * endpoint. A failure while closing is logged instead of propagated.
     */
    @Override // kafka.utils.ShutdownableThread
    public void awaitShutdown() {
        super.awaitShutdown();
        try {
            super.leader().close();
        } catch (Throwable failure) {
            // Log and carry on so the caller's shutdown sequence is not interrupted.
            error(
                () -> "Failed to close " + this.super$leader() + " after shutting down replica fetcher thread",
                () -> failure);
        }
    }

    /**
     * Runs one iteration of the superclass work loop and then completes delayed fetch
     * requests for any partitions whose high watermark was updated.
     */
    @Override // kafka.server.AbstractFetcherThread, kafka.utils.ShutdownableThread
    public void doWork() {
        super.doWork();
        // Must follow super.doWork(): the buffer it drains is filled by processPartitionData().
        this.completeDelayedFetchRequests();
    }

    /**
     * Appends one partition's fetched records to the local follower log and updates
     * the local high watermark, log start offset and replication metrics.
     *
     * @param topicPartition partition the data belongs to
     * @param j              fetch offset; must equal the local log end offset
     * @param partitionData  fetch response data for the partition
     * @return the result of {@code appendRecordsToFollowerOrFutureReplica}
     * @throws IllegalStateException if {@code j} differs from the local log end offset
     */
    @Override // kafka.server.AbstractFetcherThread
    public Option<LogAppendInfo> processPartitionData(TopicPartition topicPartition, long j, FetchResponseData.PartitionData partitionData) {
        final boolean traceOn = isTraceEnabled();
        final Partition partition = this.replicaMgr.getPartitionOrException(topicPartition);
        final UnifiedLog localLog = partition.localLogOrException();
        final MemoryRecords records = toMemoryRecords(FetchResponse.recordsOrFail(partitionData));
        maybeWarnIfOversizedRecords(records, topicPartition);

        // The fetched data must start exactly where the local log currently ends.
        if (j != localLog.kafka$log$UnifiedLog$$$anonfun$new$3()) {
            final String mismatchTemplate = Predef$.MODULE$.augmentString("Offset mismatch for partition %s: fetched offset = %d, log end offset = %d.");
            final Object[] mismatchArgs = {
                topicPartition,
                BoxesRunTime.boxToLong(j),
                BoxesRunTime.boxToLong(localLog.kafka$log$UnifiedLog$$$anonfun$new$3())
            };
            throw new IllegalStateException(StringOps$.MODULE$.format$extension(mismatchTemplate, ScalaRunTime$.MODULE$.genericWrapArray(mismatchArgs)));
        }

        if (traceOn) {
            trace(() -> StringOps$.MODULE$.format$extension(
                Predef$.MODULE$.augmentString("Follower has replica log end offset %d for partition %s. Received %d messages and leader hw %d"),
                ScalaRunTime$.MODULE$.genericWrapArray(new Object[]{
                    BoxesRunTime.boxToLong(localLog.kafka$log$UnifiedLog$$$anonfun$new$3()),
                    topicPartition,
                    BoxesRunTime.boxToInteger(records.sizeInBytes()),
                    BoxesRunTime.boxToLong(partitionData.highWatermark())
                })));
        }

        // Append the fetched records to the local (non-future) replica.
        final Option<LogAppendInfo> appendInfo = partition.appendRecordsToFollowerOrFutureReplica(records, false);

        if (traceOn) {
            trace(() -> StringOps$.MODULE$.format$extension(
                Predef$.MODULE$.augmentString("Follower has replica log end offset %d after appending %d bytes of messages for partition %s"),
                ScalaRunTime$.MODULE$.genericWrapArray(new Object[]{
                    BoxesRunTime.boxToLong(localLog.kafka$log$UnifiedLog$$$anonfun$new$3()),
                    BoxesRunTime.boxToInteger(records.sizeInBytes()),
                    topicPartition
                })));
        }

        final long leaderLogStartOffset = partitionData.logStartOffset();

        // Holds the trace fragment describing the high watermark outcome; the callback
        // below overwrites it and queues the partition when the watermark was updated.
        final ObjectRef<String> highWatermarkNote = ObjectRef.create("but did not update replica high watermark");
        localLog.maybeUpdateHighWatermark(partitionData.highWatermark()).foreach(
            newHighWatermark -> $anonfun$processPartitionData$3(this, highWatermarkNote, topicPartition, BoxesRunTime.unboxToLong(newHighWatermark)));

        localLog.maybeIncrementLogStartOffset(leaderLogStartOffset, LeaderOffsetIncremented$.MODULE$);

        if (traceOn) {
            trace(() -> "Follower received high watermark " + partitionData.highWatermark() + " from the leader " + highWatermarkNote.elem + " for partition " + topicPartition);
        }

        // Fetched bytes count against the replication quota only for throttled partitions.
        if (this.quota.isThrottled(topicPartition)) {
            this.quota.record(records.sizeInBytes());
        }

        final boolean addingLocalReplicaInReassignment = partition.isReassigning() && partition.isAddingLocalReplica();
        if (addingLocalReplicaInReassignment) {
            brokerTopicStats().updateReassignmentBytesIn(records.sizeInBytes());
        }
        brokerTopicStats().updateReplicationBytesIn(records.sizeInBytes());

        return appendInfo;
    }

    /**
     * Hands the partitions with a newly updated high watermark to the replica manager
     * so it can complete delayed fetch requests, then empties the buffer. Does nothing
     * when the buffer is empty.
     */
    private void completeDelayedFetchRequests() {
        final Buffer<TopicPartition> pending = partitionsWithNewHighWatermark();
        if (!pending.nonEmpty()) {
            return;
        }
        // toSeq() is passed before clear() is called on the same buffer.
        this.replicaMgr.completeDelayedFetchRequests(pending.toSeq());
        pending.clear();
    }

    /**
     * Logs an error when the fetch request version is at most 2 and the fetched records
     * are non-empty but contain no valid bytes, which the message attributes to a
     * record larger than {@code replica.fetch.max.bytes}.
     *
     * @param memoryRecords  records fetched for the partition
     * @param topicPartition partition the records belong to
     */
    public void maybeWarnIfOversizedRecords(MemoryRecords memoryRecords, TopicPartition topicPartition) {
        final boolean oldFetchVersion = this.metadataVersionSupplier.mo5830apply().fetchRequestVersion() <= 2;
        if (oldFetchVersion && memoryRecords.sizeInBytes() > 0 && memoryRecords.validBytes() <= 0) {
            error(() -> "Replication is failing due to a message that is greater than replica.fetch.max.bytes for partition "
                + topicPartition
                + ". "
                + "This generally occurs when the max.message.bytes has been overridden to exceed this value and a suitably large "
                + "message has also been sent. To fix this problem increase replica.fetch.max.bytes in your broker config to be "
                + "equal or larger than your settings for max.message.bytes, both at a broker and topic level.");
        }
    }

    /**
     * Truncates the partition's local replica to the given offset, warns when that
     * offset is below the log's high watermark, and marks the partition for truncation
     * with the alter-log-dirs manager once truncation is reported complete.
     *
     * @param topicPartition        partition to truncate
     * @param offsetTruncationState target offset plus a flag saying whether truncation is complete
     */
    @Override // kafka.server.AbstractFetcherThread
    public void truncate(TopicPartition topicPartition, OffsetTruncationState offsetTruncationState) {
        final Partition partition = this.replicaMgr.getPartitionOrException(topicPartition);
        final UnifiedLog localLog = partition.localLogOrException();
        partition.truncateTo(offsetTruncationState.offset(), false);

        // The high watermark is read after truncateTo() has already run.
        final boolean belowHighWatermark = offsetTruncationState.offset() < localLog.highWatermark();
        if (belowHighWatermark) {
            warn(() -> "Truncating " + topicPartition + " to offset " + offsetTruncationState.offset() + " below high watermark " + localLog.highWatermark());
        }

        if (!offsetTruncationState.truncationCompleted()) {
            return;
        }
        this.replicaMgr.replicaAlterLogDirsManager().markPartitionsForTruncation(this.brokerConfig.brokerId(), topicPartition, offsetTruncationState.offset());
    }

    /**
     * Delegates to the partition's {@code truncateFullyAndStartAt}, passing
     * {@code false} as the second argument.
     *
     * @param topicPartition partition to truncate
     * @param j              offset passed through to the partition
     */
    @Override // kafka.server.AbstractFetcherThread
    public void truncateFullyAndStartAt(TopicPartition topicPartition, long j) {
        final Partition partition = this.replicaMgr.getPartitionOrException(topicPartition);
        partition.truncateFullyAndStartAt(j, false);
    }

    /**
     * Callback body for a successful high watermark update in
     * {@code processPartitionData}: rewrites the trace fragment held in
     * {@code objectRef} and queues the partition on the thread's buffer.
     *
     * @param replicaFetcherThread thread whose buffer receives the partition
     * @param objectRef            holder of the trace fragment; its {@code elem} is overwritten
     * @param topicPartition       partition whose high watermark changed
     * @param j                    the new high watermark
     * @return the buffer returned by {@code $plus$eq}
     */
    /* JADX WARN: Type inference failed for: r1v3, types: [T, java.lang.String] */
    public static final /* synthetic */ Buffer $anonfun$processPartitionData$3(ReplicaFetcherThread replicaFetcherThread, ObjectRef objectRef, TopicPartition topicPartition, long j) {
        final String updatedNote = "and updated replica high watermark to " + j;
        objectRef.elem = updatedNote;
        final Buffer<TopicPartition> pending = replicaFetcherThread.partitionsWithNewHighWatermark();
        return (Buffer) pending.$plus$eq(topicPartition);
    }

    /**
     * Creates the fetcher thread and captures its collaborators.
     *
     * @param str              passed to the superclass constructor as both its first and second argument
     * @param leaderEndPoint   leader endpoint handed to the superclass
     * @param kafkaConfig      broker configuration; supplies {@code replicaFetchBackoffMs} for the superclass
     * @param failedPartitions handed to the superclass
     * @param replicaManager   replica manager; also supplies {@code brokerTopicStats} for the superclass
     * @param replicaQuota     quota consulted when processing fetched data
     * @param str2             value assigned to the log ident via {@code logIdent_$eq}
     * @param function0        metadata version supplier; invoked once here to compute
     *                         {@code isOffsetForLeaderEpochSupported}
     */
    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public ReplicaFetcherThread(String str, LeaderEndPoint leaderEndPoint, KafkaConfig kafkaConfig, FailedPartitions failedPartitions, ReplicaManager replicaManager, ReplicaQuota replicaQuota, String str2, Function0<MetadataVersion> function0) {
        // NOTE(review): per the decompiler warning above, the compiled class may assign
        // some fields before the super call; this ordering is the decompiler's rendering.
        super(str, str, leaderEndPoint, failedPartitions, Predef$.MODULE$.Integer2int(kafkaConfig.replicaFetchBackoffMs()), false, replicaManager.brokerTopicStats());
        this.brokerConfig = kafkaConfig;
        this.replicaMgr = replicaManager;
        this.quota = replicaQuota;
        this.metadataVersionSupplier = function0;
        logIdent_$eq(str2);
        // Start with an empty buffer (built from the empty Nil sequence).
        this.partitionsWithNewHighWatermark = Buffer$.MODULE$.apply2((Seq) Nil$.MODULE$);
        // Evaluated once at construction; later metadata version changes are not reflected in this field.
        this.isOffsetForLeaderEpochSupported = function0.mo5830apply().isOffsetForLeaderEpochSupported();
    }
}
