package org.apache.bookkeeper.mledger.impl.cache;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.class */
public class PendingReadsManagerTest {
    // Logger shared by the test methods and the mock answers defined in this class.
    private static final Logger log = LoggerFactory.getLogger(PendingReadsManagerTest.class);
    // Opaque callback contexts; tests assert by identity that each callback gets back the context it was registered with.
    static final Object CTX = "foo";
    static final Object CTX2 = "far";
    // Ledger id stamped on every entry produced by buildList.
    static final long ledgerId = 123414;
    // Single-thread executor created in before(), shut down in after(), and returned by the mocked ml.getExecutor().
    ExecutorService orderedExecutor;
    // Mockito mock of the entry cache; re-created for every test method in setupMocks().
    RangeEntryCacheImpl rangeEntryCache;
    // Manager instance built on top of rangeEntryCache in setupMocks().
    PendingReadsManager pendingReadsManager;
    // Limiter constructed with 0L in setupMocks() and returned by rangeEntryCache.getPendingReadsLimiter().
    InflightReadsLimiter inflighReadsLimiter;
    // Mocked read handle passed to every readEntries / readFromStorage call in the tests.
    ReadHandle lh;
    // Mocked managed ledger returned by rangeEntryCache.getManagedLedger().
    ManagedLedgerImpl ml;

    /* loaded from: input_file:org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest$CapturingReadEntriesCallback.class */
    /**
     * Read callback that records what it was invoked with and is itself a future.
     *
     * <p>On success the positions of the delivered entries and the callback context are stored and
     * the future completes normally; on failure the exception and context are stored and the future
     * completes exceptionally. Tests block on {@link #get()} and then inspect the captured state.
     */
    private static class CapturingReadEntriesCallback extends CompletableFuture<Void>
            implements AsyncCallbacks.ReadEntriesCallback {
        /** Positions of the entries delivered to the callback, or {@code null} after a failure. */
        List<Position> entries;
        /** Context object handed back by the most recent callback invocation. */
        Object ctx;
        /** Failure reported by the most recent callback invocation, or {@code null} on success. */
        Throwable error;

        /**
         * Captures the positions of the delivered entries plus the context, then completes the future.
         *
         * @param readEntries entries delivered by the read
         * @param callbackCtx context object supplied when the read was issued
         */
        public synchronized void readEntriesComplete(List<Entry> readEntries, Object callbackCtx) {
            List<Position> positions = readEntries.stream()
                    .map(entry -> entry.getPosition())
                    .collect(Collectors.toList());
            this.entries = positions;
            this.ctx = callbackCtx;
            this.error = null;
            complete(null);
        }

        /**
         * Captures the failure plus the context, then completes the future exceptionally.
         *
         * @param failure exception reported by the read
         * @param callbackCtx context object supplied when the read was issued
         */
        public synchronized void readEntriesFailed(ManagedLedgerException failure, Object callbackCtx) {
            this.entries = null;
            this.ctx = callbackCtx;
            this.error = failure;
            completeExceptionally(failure);
        }

        public List<Position> getEntries() {
            return entries;
        }

        public Object getCtx() {
            return ctx;
        }

        public Throwable getError() {
            return error;
        }

        public void setEntries(List<Position> newEntries) {
            entries = newEntries;
        }

        public void setCtx(Object newCtx) {
            ctx = newCtx;
        }

        public void setError(Throwable newError) {
            error = newError;
        }

        /** Two callbacks are equal when their captured entries, context and error are all equal. */
        @Override
        public boolean equals(Object other) {
            if (other == this) {
                return true;
            }
            if (!(other instanceof CapturingReadEntriesCallback)) {
                return false;
            }
            CapturingReadEntriesCallback that = (CapturingReadEntriesCallback) other;
            if (!that.canEqual(this)) {
                return false;
            }
            // Compared in the same order as the fields are declared; short-circuits on first mismatch.
            return sameValue(getEntries(), that.getEntries())
                    && sameValue(getCtx(), that.getCtx())
                    && sameValue(getError(), that.getError());
        }

        protected boolean canEqual(Object other) {
            return other instanceof CapturingReadEntriesCallback;
        }

        /** Combines the three captured fields with multiplier 59, using 43 for {@code null}. */
        @Override
        public int hashCode() {
            int result = 1;
            result = result * 59 + hashOf(getEntries());
            result = result * 59 + hashOf(getCtx());
            result = result * 59 + hashOf(getError());
            return result;
        }

        @Override // java.util.concurrent.CompletableFuture
        public String toString() {
            StringBuilder text = new StringBuilder("PendingReadsManagerTest.CapturingReadEntriesCallback(entries=");
            text.append(getEntries());
            text.append(", ctx=").append(getCtx());
            text.append(", error=").append(getError());
            text.append(")");
            return text.toString();
        }

        /** Null-tolerant equality: two nulls are equal, otherwise delegates to {@code left.equals(right)}. */
        private static boolean sameValue(Object left, Object right) {
            if (left == null) {
                return right == null;
            }
            return left.equals(right);
        }

        /** Hash contribution of a single field; {@code null} contributes the constant 43. */
        private static int hashOf(Object value) {
            if (value == null) {
                return 43;
            }
            return value.hashCode();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest$PreparedReadFromStorage.class */
    /**
     * Handle for a storage read that has been stubbed on the mocked cache but not yet completed.
     *
     * <p>The instance is the very future returned by the stubbed {@code readFromStorage} call, so a
     * test decides when the "storage" answers by invoking {@link #storageReadCompleted()}.
     */
    public static class PreparedReadFromStorage extends CompletableFuture<List<EntryImpl>> {
        /** First entry id (inclusive) of the stubbed read. */
        final long firstEntry;
        /** Last entry id (inclusive) of the stubbed read. */
        final long endEntry;
        /** Value of the {@code shouldCacheEntry} flag the stub was registered with. */
        final boolean shouldCacheEntry;

        /**
         * @param firstEntry first entry id (inclusive)
         * @param endEntry last entry id (inclusive)
         * @param shouldCacheEntry cache flag the stubbed read expects
         */
        public PreparedReadFromStorage(long firstEntry, long endEntry, boolean shouldCacheEntry) {
            this.firstEntry = firstEntry;
            this.endEntry = endEntry;
            this.shouldCacheEntry = shouldCacheEntry;
        }

        /**
         * Renders the three fields in declaration order.
         *
         * <p>The previous rendering printed {@code firstEntry} twice and {@code endEntry} in the last
         * slot, so {@code shouldCacheEntry} never appeared in the output.
         */
        @Override // java.util.concurrent.CompletableFuture
        public String toString() {
            return "PreparedReadFromStorage(" + firstEntry + "," + endEntry + "," + shouldCacheEntry + ")";
        }

        /** Completes this future with freshly built entries covering {@code firstEntry..endEntry}. */
        public void storageReadCompleted() {
            complete(PendingReadsManagerTest.buildList(this.firstEntry, this.endEntry));
        }
    }

    // Package-private no-arg constructor with nothing to do: fields are populated by before() and setupMocks().
    PendingReadsManagerTest() {
    }

    /** Creates the single-thread executor that the mocked managed ledger hands out during the tests. */
    @BeforeClass(alwaysRun = true)
    void before() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        orderedExecutor = executor;
    }

    /** Shuts the shared executor down (if one was created) and clears the field. */
    @AfterClass(alwaysRun = true)
    void after() {
        if (orderedExecutor == null) {
            // before() never ran or cleanup already happened; nothing to release.
            return;
        }
        orderedExecutor.shutdown();
        orderedExecutor = null;
    }

    /**
     * Rebuilds all mocks before each test method.
     *
     * <p>The mocked cache reports a fixed name, a config with a 10000 second read-entry timeout and an
     * {@code InflightReadsLimiter} built with {@code 0L}. Calls to the six-argument
     * {@code asyncReadEntry0} on the mock are forwarded to {@code pendingReadsManager.readEntries}
     * using the test's own read handle.
     */
    @BeforeMethod(alwaysRun = true)
    void setupMocks() {
        rangeEntryCache = Mockito.mock(RangeEntryCacheImpl.class);

        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setReadEntryTimeoutSeconds(10000L);
        Mockito.when(rangeEntryCache.getName()).thenReturn("my-topic");
        Mockito.when(rangeEntryCache.getManagedLedgerConfig()).thenReturn(config);

        inflighReadsLimiter = new InflightReadsLimiter(0L);
        Mockito.when(rangeEntryCache.getPendingReadsLimiter()).thenReturn(inflighReadsLimiter);

        // Constructed only after the stubs above are in place, as in the original ordering.
        pendingReadsManager = new PendingReadsManager(rangeEntryCache);

        // Fields (lh, pendingReadsManager) are read when the answer runs, not when it is defined.
        Answer<Object> forwardToPendingReads = invocation -> {
            log.info("rangeEntryCache asyncReadEntry0 {}", invocation);
            long firstEntry = invocation.<Long>getArgument(1);
            long lastEntry = invocation.<Long>getArgument(2);
            boolean shouldCacheEntry = invocation.<Boolean>getArgument(3);
            AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(4);
            Object callbackCtx = invocation.getArgument(5);
            pendingReadsManager.readEntries(lh, firstEntry, lastEntry, shouldCacheEntry, callback, callbackCtx);
            return null;
        };
        Mockito.doAnswer(forwardToPendingReads)
                .when(rangeEntryCache)
                .asyncReadEntry0(
                        ArgumentMatchers.<ReadHandle>any(),
                        ArgumentMatchers.anyLong(),
                        ArgumentMatchers.anyLong(),
                        ArgumentMatchers.anyBoolean(),
                        ArgumentMatchers.<AsyncCallbacks.ReadEntriesCallback>any(),
                        ArgumentMatchers.any());

        lh = Mockito.mock(ReadHandle.class);
        ml = Mockito.mock(ManagedLedgerImpl.class);
        Mockito.when(ml.getExecutor()).thenReturn(orderedExecutor);
        Mockito.when(rangeEntryCache.getManagedLedger()).thenReturn(ml);
    }

    /**
     * Builds one entry per id in the inclusive range, each on {@code ledgerId} with payload "data".
     *
     * @param firstEntry first entry id (inclusive)
     * @param endEntry last entry id (inclusive); an empty list results when it is below {@code firstEntry}
     * @return newly created entries in ascending id order
     */
    private static List<EntryImpl> buildList(long firstEntry, long endEntry) {
        List<EntryImpl> entries = new ArrayList<>();
        for (long entryId = firstEntry; entryId <= endEntry; entryId++) {
            // A fresh byte array per entry, exactly as before; the payload is never shared.
            byte[] payload = "data".getBytes(StandardCharsets.UTF_8);
            entries.add(EntryImpl.create(ledgerId, entryId, payload));
        }
        return entries;
    }

    /**
     * Asserts that {@code list} holds exactly the positions for the inclusive id range, in order.
     *
     * <p>Besides checking each entry id, the list size is compared against the size of the range so
     * that surplus trailing entries are reported instead of being silently ignored.
     *
     * @param list positions captured by a callback
     * @param firstEntry first expected entry id (inclusive)
     * @param endEntry last expected entry id (inclusive)
     */
    private void verifyRange(List<Position> list, long firstEntry, long endEntry) {
        log.info("verifyRange numEntries {}", list.size());
        // An inverted range expects no entries at all.
        long expectedCount = Math.max(0L, endEntry - firstEntry + 1);
        Assert.assertEquals((long) list.size(), expectedCount,
                "unexpected number of entries for range [" + firstEntry + "," + endEntry + "]");
        int index = 0;
        for (long entryId = firstEntry; entryId <= endEntry; entryId++) {
            Assert.assertEquals(list.get(index).getEntryId(), entryId);
            index++;
        }
    }

    /**
     * Stubs {@code readFromStorage} on the given cache mock for one exact argument combination.
     *
     * @param readHandle handle the stub must be called with
     * @param rangeEntryCacheImpl mock to stub
     * @param firstEntry first entry id the stub matches
     * @param endEntry last entry id the stub matches
     * @param shouldCacheEntry cache flag the stub matches
     * @return the future the stub will return; the caller completes it to simulate storage answering
     */
    private PreparedReadFromStorage prepareReadFromStorage(ReadHandle readHandle,
            RangeEntryCacheImpl rangeEntryCacheImpl, long firstEntry, long endEntry, boolean shouldCacheEntry) {
        PreparedReadFromStorage pendingStorageRead =
                new PreparedReadFromStorage(firstEntry, endEntry, shouldCacheEntry);
        log.info("prepareReadFromStorage from {} to {} shouldCacheEntry {}", firstEntry, endEntry, shouldCacheEntry);
        Mockito.when(rangeEntryCacheImpl.readFromStorage(
                        ArgumentMatchers.eq(readHandle),
                        ArgumentMatchers.eq(firstEntry),
                        ArgumentMatchers.eq(endEntry),
                        ArgumentMatchers.eq(shouldCacheEntry)))
                .thenAnswer(invocation -> {
                    log.info("readFromStorage from {} to {} shouldCacheEntry {}",
                            firstEntry, endEntry, shouldCacheEntry);
                    return pendingStorageRead;
                });
        return pendingStorageRead;
    }

    /** A single read of 100..199 completes with the registered context and the full range. */
    @Test
    public void simpleRead() throws Exception {
        final long firstEntry = 100L;
        final long endEntry = 199L;
        PreparedReadFromStorage storageRead =
                prepareReadFromStorage(lh, rangeEntryCache, firstEntry, endEntry, false);

        CapturingReadEntriesCallback callback = new CapturingReadEntriesCallback();
        pendingReadsManager.readEntries(lh, firstEntry, endEntry, false, callback, CTX);

        storageRead.storageReadCompleted();
        callback.get();

        AssertJUnit.assertSame(callback.getCtx(), CTX);
        verifyRange(callback.entries, firstEntry, endEntry);
    }

    /**
     * Two reads of the same range 100..199 with only one stubbed storage read: both callbacks must
     * complete with their own context, the full range, and distinct position instances.
     */
    @Test
    public void simpleConcurrentReadPerfectMatch() throws Exception {
        final long firstEntry = 100L;
        final long endEntry = 199L;
        PreparedReadFromStorage storageRead =
                prepareReadFromStorage(lh, rangeEntryCache, firstEntry, endEntry, false);
        PendingReadsManager manager = new PendingReadsManager(rangeEntryCache);

        CapturingReadEntriesCallback firstCallback = new CapturingReadEntriesCallback();
        manager.readEntries(lh, firstEntry, endEntry, false, firstCallback, CTX);
        CapturingReadEntriesCallback secondCallback = new CapturingReadEntriesCallback();
        manager.readEntries(lh, firstEntry, endEntry, false, secondCallback, CTX2);

        storageRead.storageReadCompleted();
        firstCallback.get();
        secondCallback.get();

        AssertJUnit.assertSame(firstCallback.getCtx(), CTX);
        AssertJUnit.assertSame(secondCallback.getCtx(), CTX2);
        verifyRange(firstCallback.entries, firstEntry, endEntry);
        verifyRange(secondCallback.entries, firstEntry, endEntry);

        int index = 0;
        for (long entryId = firstEntry; entryId <= endEntry; entryId++, index++) {
            Position fromFirst = firstCallback.entries.get(index);
            Position fromSecond = secondCallback.entries.get(index);
            AssertJUnit.assertNotSame(fromFirst, fromSecond);
            Assert.assertEquals(
                    firstCallback.entries.get(index).getEntryId(),
                    secondCallback.entries.get(index).getEntryId());
        }
    }

    /**
     * A read of 110..189 issued while a read of 100..199 is pending, with only the outer range
     * stubbed in storage: both callbacks must complete, and overlapping positions must be distinct
     * instances carrying equal entry ids.
     */
    @Test
    public void simpleConcurrentReadIncluding() throws Exception {
        final long outerFirst = 100L;
        final long outerEnd = 199L;
        final long innerFirst = outerFirst + 10;
        final long innerEnd = outerEnd - 10;
        PreparedReadFromStorage storageRead =
                prepareReadFromStorage(lh, rangeEntryCache, outerFirst, outerEnd, false);
        PendingReadsManager manager = new PendingReadsManager(rangeEntryCache);

        CapturingReadEntriesCallback outerCallback = new CapturingReadEntriesCallback();
        manager.readEntries(lh, outerFirst, outerEnd, false, outerCallback, CTX);
        CapturingReadEntriesCallback innerCallback = new CapturingReadEntriesCallback();
        manager.readEntries(lh, innerFirst, innerEnd, false, innerCallback, CTX2);

        storageRead.storageReadCompleted();
        outerCallback.get();
        innerCallback.get();

        AssertJUnit.assertSame(outerCallback.getCtx(), CTX);
        AssertJUnit.assertSame(innerCallback.getCtx(), CTX2);
        verifyRange(outerCallback.entries, outerFirst, outerEnd);
        verifyRange(innerCallback.entries, innerFirst, innerEnd);

        int outerIndex = 0;
        for (long entryId = outerFirst; entryId <= outerEnd; entryId++, outerIndex++) {
            boolean insideInnerRange = entryId >= innerFirst && entryId <= innerEnd;
            if (!insideInnerRange) {
                continue;
            }
            // The inner list starts innerFirst - outerFirst positions later than the outer one.
            int innerIndex = (int) (outerIndex - (innerFirst - outerFirst));
            AssertJUnit.assertNotSame(
                    outerCallback.entries.get(outerIndex), innerCallback.entries.get(innerIndex));
            Assert.assertEquals(
                    outerCallback.entries.get(outerIndex).getEntryId(),
                    innerCallback.entries.get(innerIndex).getEntryId());
        }
    }

    /**
     * A read of 90..199 issued while a read of 100..199 is pending; storage is stubbed for 100..199
     * and for the missing left part 90..99. Both callbacks must receive their full range.
     */
    @Test
    public void simpleConcurrentReadMissingLeft() throws Exception {
        final long pendingFirst = 100L;
        final long pendingEnd = 199L;
        final long widerFirst = pendingFirst - 10;
        PreparedReadFromStorage pendingStorageRead =
                prepareReadFromStorage(lh, rangeEntryCache, pendingFirst, pendingEnd, false);
        PreparedReadFromStorage leftStorageRead =
                prepareReadFromStorage(lh, rangeEntryCache, widerFirst, pendingFirst - 1, false);
        PendingReadsManager manager = new PendingReadsManager(rangeEntryCache);

        CapturingReadEntriesCallback pendingCallback = new CapturingReadEntriesCallback();
        manager.readEntries(lh, pendingFirst, pendingEnd, false, pendingCallback, CTX);
        CapturingReadEntriesCallback widerCallback = new CapturingReadEntriesCallback();
        manager.readEntries(lh, widerFirst, pendingEnd, false, widerCallback, CTX2);

        pendingStorageRead.storageReadCompleted();
        pendingCallback.get();
        leftStorageRead.storageReadCompleted();
        widerCallback.get();

        AssertJUnit.assertSame(pendingCallback.getCtx(), CTX);
        AssertJUnit.assertSame(widerCallback.getCtx(), CTX2);
        verifyRange(pendingCallback.entries, pendingFirst, pendingEnd);
        verifyRange(widerCallback.entries, widerFirst, pendingEnd);
    }

    /**
     * A read of 100..209 issued while a read of 100..199 is pending; storage is stubbed for 100..199
     * and for the missing right part 200..209. Both callbacks must receive their full range.
     */
    @Test
    public void simpleConcurrentReadMissingRight() throws Exception {
        final long pendingFirst = 100L;
        final long pendingEnd = 199L;
        final long widerEnd = pendingEnd + 10;
        PreparedReadFromStorage pendingStorageRead =
                prepareReadFromStorage(lh, rangeEntryCache, pendingFirst, pendingEnd, false);
        PreparedReadFromStorage rightStorageRead =
                prepareReadFromStorage(lh, rangeEntryCache, pendingEnd + 1, widerEnd, false);
        PendingReadsManager manager = new PendingReadsManager(rangeEntryCache);

        CapturingReadEntriesCallback pendingCallback = new CapturingReadEntriesCallback();
        manager.readEntries(lh, pendingFirst, pendingEnd, false, pendingCallback, CTX);
        CapturingReadEntriesCallback widerCallback = new CapturingReadEntriesCallback();
        manager.readEntries(lh, pendingFirst, widerEnd, false, widerCallback, CTX2);

        pendingStorageRead.storageReadCompleted();
        pendingCallback.get();
        rightStorageRead.storageReadCompleted();
        widerCallback.get();

        AssertJUnit.assertSame(pendingCallback.getCtx(), CTX);
        AssertJUnit.assertSame(widerCallback.getCtx(), CTX2);
        verifyRange(pendingCallback.entries, pendingFirst, pendingEnd);
        verifyRange(widerCallback.entries, pendingFirst, widerEnd);
    }

    /**
     * A read of 90..209 issued while a read of 100..199 is pending; storage is stubbed for 100..199
     * and for both missing parts (90..99 and 200..209). Both callbacks must receive their full range.
     */
    @Test
    public void simpleConcurrentReadMissingBoth() throws Exception {
        final long pendingFirst = 100L;
        final long pendingEnd = 199L;
        final long widerFirst = pendingFirst - 10;
        final long widerEnd = pendingEnd + 10;
        PreparedReadFromStorage pendingStorageRead =
                prepareReadFromStorage(lh, rangeEntryCache, pendingFirst, pendingEnd, false);
        PreparedReadFromStorage leftStorageRead =
                prepareReadFromStorage(lh, rangeEntryCache, widerFirst, pendingFirst - 1, false);
        PreparedReadFromStorage rightStorageRead =
                prepareReadFromStorage(lh, rangeEntryCache, pendingEnd + 1, widerEnd, false);
        PendingReadsManager manager = new PendingReadsManager(rangeEntryCache);

        CapturingReadEntriesCallback pendingCallback = new CapturingReadEntriesCallback();
        manager.readEntries(lh, pendingFirst, pendingEnd, false, pendingCallback, CTX);
        CapturingReadEntriesCallback widerCallback = new CapturingReadEntriesCallback();
        manager.readEntries(lh, widerFirst, widerEnd, false, widerCallback, CTX2);

        pendingStorageRead.storageReadCompleted();
        pendingCallback.get();
        leftStorageRead.storageReadCompleted();
        rightStorageRead.storageReadCompleted();
        widerCallback.get();

        AssertJUnit.assertSame(pendingCallback.getCtx(), CTX);
        AssertJUnit.assertSame(widerCallback.getCtx(), CTX2);
        verifyRange(pendingCallback.entries, pendingFirst, pendingEnd);
        verifyRange(widerCallback.entries, widerFirst, widerEnd);
    }

    /**
     * Two reads over disjoint ranges (100..199 and 1000..1099), each with its own stubbed storage
     * read: every callback must complete with its own context and range.
     */
    @Test
    public void simpleConcurrentReadNoMatch() throws Exception {
        final long lowFirst = 100L;
        final long lowEnd = 199L;
        final long highFirst = 1000L;
        final long highEnd = 1099L;
        PreparedReadFromStorage lowStorageRead =
                prepareReadFromStorage(lh, rangeEntryCache, lowFirst, lowEnd, false);
        PreparedReadFromStorage highStorageRead =
                prepareReadFromStorage(lh, rangeEntryCache, highFirst, highEnd, false);
        PendingReadsManager manager = new PendingReadsManager(rangeEntryCache);

        CapturingReadEntriesCallback lowCallback = new CapturingReadEntriesCallback();
        manager.readEntries(lh, lowFirst, lowEnd, false, lowCallback, CTX);
        CapturingReadEntriesCallback highCallback = new CapturingReadEntriesCallback();
        manager.readEntries(lh, highFirst, highEnd, false, highCallback, CTX2);

        lowStorageRead.storageReadCompleted();
        lowCallback.get();
        highStorageRead.storageReadCompleted();
        highCallback.get();

        AssertJUnit.assertSame(lowCallback.getCtx(), CTX);
        AssertJUnit.assertSame(highCallback.getCtx(), CTX2);
        verifyRange(lowCallback.entries, lowFirst, lowEnd);
        verifyRange(highCallback.entries, highFirst, highEnd);
    }
}
