package org.apache.kafka.raft;

import java.nio.ByteBuffer;
import java.util.OptionalInt;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.protocol.types.Type;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;

/* JADX WARN: Classes with same name are omitted:
  input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.2.1.1.17.jar:META-INF/bundled-dependencies/kafka-raft-2.7.0.jar:org/apache/kafka/raft/ReplicatedCounter.class
 */
/* loaded from: input_file:META-INF/bundled-dependencies/kafka-raft-2.7.0.jar:org/apache/kafka/raft/ReplicatedCounter.class */
public class ReplicatedCounter {
    private final int localBrokerId;
    private final Logger log;
    private final RaftClient client;
    private final AtomicInteger committed = new AtomicInteger(0);
    private final AtomicInteger uncommitted = new AtomicInteger(0);
    private OffsetAndEpoch position = new OffsetAndEpoch(0, 0);
    private LeaderAndEpoch currentLeaderAndEpoch = new LeaderAndEpoch(OptionalInt.empty(), 0);

    public ReplicatedCounter(int i, RaftClient raftClient, LogContext logContext) {
        this.localBrokerId = i;
        this.client = raftClient;
        this.log = logContext.logger(ReplicatedCounter.class);
    }

    private Records tryRead(long j) {
        try {
            return this.client.read(this.position, Isolation.COMMITTED, j).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    private void apply(Record record) {
        int deserialize = deserialize(record);
        if (deserialize != this.committed.get() + 1) {
            this.log.debug("Ignoring non-sequential append at offset {}: {} -> {}", Long.valueOf(record.offset()), Integer.valueOf(this.committed.get()), Integer.valueOf(deserialize));
            return;
        }
        this.log.debug("Applied increment at offset {}: {} -> {}", Long.valueOf(record.offset()), Integer.valueOf(this.committed.get()), Integer.valueOf(deserialize));
        this.committed.set(deserialize);
        if (deserialize > this.uncommitted.get()) {
            this.uncommitted.set(deserialize);
        }
    }

    public synchronized void poll(long j) {
        LeaderAndEpoch currentLeaderAndEpoch = this.client.currentLeaderAndEpoch();
        if (!this.currentLeaderAndEpoch.equals(currentLeaderAndEpoch)) {
            if (this.localBrokerId == currentLeaderAndEpoch.leaderId.orElse(-1)) {
                this.uncommitted.set(this.committed.get());
            }
            this.currentLeaderAndEpoch = currentLeaderAndEpoch;
        }
        for (RecordBatch recordBatch : tryRead(j).batches()) {
            if (!recordBatch.isControlBatch()) {
                recordBatch.forEach(this::apply);
            }
            this.position = new OffsetAndEpoch(recordBatch.lastOffset() + 1, recordBatch.partitionLeaderEpoch());
        }
    }

    public synchronized boolean isWritable() {
        return this.localBrokerId == this.currentLeaderAndEpoch.leaderId.orElse(-1);
    }

    public synchronized void increment() {
        if (!isWritable()) {
            throw new KafkaException("Counter is not currently writable");
        }
        int i = this.uncommitted.get();
        int incrementAndGet = this.uncommitted.incrementAndGet();
        this.client.append(MemoryRecords.withRecords(CompressionType.NONE, serialize(incrementAndGet)), AckMode.LEADER, 2147483647L).whenComplete((offsetAndEpoch, th) -> {
            if (offsetAndEpoch != null) {
                this.log.debug("Appended increment at offset {}: {} -> {}", Long.valueOf(offsetAndEpoch.offset), Integer.valueOf(i), Integer.valueOf(incrementAndGet));
            } else {
                this.uncommitted.set(i);
                this.log.debug("Failed append of increment: {} -> {}", Integer.valueOf(i), Integer.valueOf(incrementAndGet), th);
            }
        });
    }

    private SimpleRecord serialize(int i) {
        ByteBuffer allocate = ByteBuffer.allocate(4);
        Type.INT32.write(allocate, Integer.valueOf(i));
        allocate.flip();
        return new SimpleRecord(allocate);
    }

    private int deserialize(Record record) {
        return ((Integer) Type.INT32.read(record.value())).intValue();
    }
}
