package org.apache.pulsar.broker.transaction.buffer;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import org.apache.pulsar.broker.transaction.buffer.exceptions.EndOfTransactionException;
import org.apache.pulsar.broker.transaction.buffer.impl.InMemTransactionBufferReader;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.testng.Assert;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/transaction/buffer/InMemTransactionBufferReaderTest.class */
public class InMemTransactionBufferReaderTest {
    private final TxnID txnID = new TxnID(1234, 5678);

    @Test
    public void testInvalidNumEntriesArgument() {
        InMemTransactionBufferReader inMemTransactionBufferReader = new InMemTransactionBufferReader(this.txnID, Collections.emptySortedMap().entrySet().iterator(), 22L, 33L);
        try {
            try {
                inMemTransactionBufferReader.readNext(-1).join();
                Assert.fail("Should fail to readNext if `numEntries` is invalid");
            } catch (CompletionException e) {
                Assert.assertTrue(e.getCause() instanceof IllegalArgumentException);
            }
            inMemTransactionBufferReader.close();
        } catch (Throwable th) {
            try {
                inMemTransactionBufferReader.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testCloseReleaseAllEntries() throws Exception {
        TreeMap treeMap = new TreeMap();
        for (int i = 0; i < 100; i++) {
            treeMap.put(Long.valueOf(i), Unpooled.copiedBuffer("message-" + i, StandardCharsets.UTF_8));
        }
        InMemTransactionBufferReader inMemTransactionBufferReader = new InMemTransactionBufferReader(this.txnID, treeMap.entrySet().iterator(), 22L, 33L);
        try {
            verifyAndReleaseEntries((List) inMemTransactionBufferReader.readNext(10).get(), this.txnID, 0L, 10);
            verifyEntriesReleased(treeMap, 0L, 10);
            inMemTransactionBufferReader.close();
            verifyEntriesReleased(treeMap, 10L, 90);
        } catch (Throwable th) {
            try {
                inMemTransactionBufferReader.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testEndOfTransactionException() throws Exception {
        TreeMap treeMap = new TreeMap();
        for (int i = 0; i < 100; i++) {
            treeMap.put(Long.valueOf(i), Unpooled.copiedBuffer("message-" + i, StandardCharsets.UTF_8));
        }
        InMemTransactionBufferReader inMemTransactionBufferReader = new InMemTransactionBufferReader(this.txnID, treeMap.entrySet().iterator(), 22L, 33L);
        try {
            verifyAndReleaseEntries((List) inMemTransactionBufferReader.readNext(110).get(), this.txnID, 0L, 100);
            verifyEntriesReleased(treeMap, 0L, 100);
            try {
                inMemTransactionBufferReader.readNext(1).get();
                Assert.fail("should fail to read entries if there is no more in the transaction buffer");
            } catch (ExecutionException e) {
                Assert.assertTrue(e.getCause() instanceof EndOfTransactionException);
            }
            inMemTransactionBufferReader.close();
        } catch (Throwable th) {
            try {
                inMemTransactionBufferReader.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    private void verifyAndReleaseEntries(List<TransactionEntry> list, TxnID txnID, long j, int i) {
        Assert.assertEquals(list.size(), i);
        for (int i2 = 0; i2 < i; i2++) {
            TransactionEntry transactionEntry = list.get(i2);
            try {
                Assert.assertEquals(transactionEntry.committedAtLedgerId(), 22L);
                Assert.assertEquals(transactionEntry.committedAtEntryId(), 33L);
                Assert.assertEquals(transactionEntry.txnId(), txnID);
                Assert.assertEquals(transactionEntry.sequenceId(), j + i2);
                Assert.assertEquals(new String(ByteBufUtil.getBytes(transactionEntry.getEntry().getDataBuffer()), StandardCharsets.UTF_8), "message-" + i2);
                if (transactionEntry != null) {
                    transactionEntry.close();
                }
            } catch (Throwable th) {
                if (transactionEntry != null) {
                    try {
                        transactionEntry.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
    }

    private void verifyEntriesReleased(SortedMap<Long, ByteBuf> sortedMap, long j, int i) {
        for (int i2 = 0; i2 < i; i2++) {
            ByteBuf byteBuf = sortedMap.get(Long.valueOf(j + i2));
            Assert.assertNotNull(byteBuf);
            Assert.assertEquals(byteBuf.refCnt(), 0);
        }
    }
}
