package org.apache.bookkeeper.stream.storage.impl.kv;

import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.stream.proto.kv.KeyValue;
import org.apache.bookkeeper.stream.proto.kv.rpc.Compare;
import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeRequest;
import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeResponse;
import org.apache.bookkeeper.stream.proto.kv.rpc.PutRequest;
import org.apache.bookkeeper.stream.proto.kv.rpc.PutResponse;
import org.apache.bookkeeper.stream.proto.kv.rpc.RangeRequest;
import org.apache.bookkeeper.stream.proto.kv.rpc.RangeResponse;
import org.apache.bookkeeper.stream.proto.kv.rpc.RequestOp;
import org.apache.bookkeeper.stream.proto.kv.rpc.ResponseOp;
import org.apache.bookkeeper.stream.proto.kv.rpc.RoutingHeader;
import org.apache.bookkeeper.stream.proto.kv.rpc.TxnRequest;
import org.apache.bookkeeper.stream.proto.kv.rpc.TxnResponse;
import org.apache.bookkeeper.stream.proto.storage.StatusCode;
import org.apache.bookkeeper.stream.storage.impl.store.MVCCAsyncStoreTestBase;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/bookkeeper/stream/storage/impl/kv/TableStoreImplTest.class */
public class TableStoreImplTest extends MVCCAsyncStoreTestBase {
    private static final long SC_ID = 123;
    private TableStoreImpl tableStore;
    private static final Logger log = LoggerFactory.getLogger(TableStoreImplTest.class);
    private static final ByteString RKEY = ByteString.copyFromUtf8("routing-key");
    private static final RoutingHeader HEADER = RoutingHeader.newBuilder().setRangeId(1234).setRKey(RKEY).setStreamId(1256).build();

    @Override // org.apache.bookkeeper.stream.storage.impl.store.MVCCAsyncStoreTestBase
    protected void doSetup() throws Exception {
        this.tableStore = new TableStoreImpl(this.store);
    }

    @Override // org.apache.bookkeeper.stream.storage.impl.store.MVCCAsyncStoreTestBase
    protected void doTeardown() throws Exception {
    }

    private static ByteString getKey(int i) {
        return ByteString.copyFromUtf8(String.format("key-%05d", Integer.valueOf(i)));
    }

    private ByteString getValue(int i) {
        return ByteString.copyFromUtf8(String.format("value-%05d", Integer.valueOf(i)));
    }

    private List<KeyValue> writeKVs(int i, boolean z) throws Exception {
        ArrayList newArrayListWithExpectedSize = Lists.newArrayListWithExpectedSize(i);
        for (int i2 = 0; i2 < i; i2++) {
            newArrayListWithExpectedSize.add(writeKV(i2, z));
        }
        return Lists.transform((List) FutureUtils.result(FutureUtils.collect(newArrayListWithExpectedSize)), putResponse -> {
            Assert.assertEquals(StatusCode.SUCCESS, putResponse.getHeader().getCode());
            Assert.assertEquals(HEADER, putResponse.getHeader().getRoutingHeader());
            if (putResponse.hasPrevKv()) {
                return putResponse.getPrevKv();
            }
            return null;
        });
    }

    private CompletableFuture<PutResponse> writeKV(int i, boolean z) {
        return this.tableStore.put(PutRequest.newBuilder().setKey(getKey(i)).setValue(getValue(i)).setHeader(HEADER).setPrevKv(z).build());
    }

    RangeResponse getKeyFromTableStore(int i) throws Exception {
        return (RangeResponse) FutureUtils.result(this.tableStore.range(RangeRequest.newBuilder().setHeader(HEADER).setKey(getKey(i)).build()));
    }

    KeyValue getKeyValue(int i) throws Exception {
        RangeResponse keyFromTableStore = getKeyFromTableStore(i);
        Assert.assertEquals(StatusCode.SUCCESS, keyFromTableStore.getHeader().getCode());
        Assert.assertEquals(HEADER, keyFromTableStore.getHeader().getRoutingHeader());
        Assert.assertFalse(keyFromTableStore.getMore());
        if (0 == keyFromTableStore.getCount()) {
            return null;
        }
        return keyFromTableStore.getKvs(0);
    }

    void putKeyToTableStore(int i, int i2) throws Exception {
        PutResponse putResponse = (PutResponse) FutureUtils.result(this.tableStore.put(PutRequest.newBuilder().setHeader(HEADER).setKey(getKey(i)).setValue(getValue(i2)).build()));
        Assert.assertEquals(StatusCode.SUCCESS, putResponse.getHeader().getCode());
        Assert.assertEquals(HEADER, putResponse.getHeader().getRoutingHeader());
        Assert.assertFalse(putResponse.hasPrevKv());
    }

    KeyValue putIfAbsentToTableStore(int i, int i2, boolean z) throws Exception {
        TxnResponse txnResponse = (TxnResponse) FutureUtils.result(this.tableStore.txn(TxnRequest.newBuilder().setHeader(HEADER).addCompare(Compare.newBuilder().setResult(Compare.CompareResult.EQUAL).setTarget(Compare.CompareTarget.VALUE).setKey(getKey(i)).setValue(ByteString.copyFrom(new byte[0]))).addSuccess(RequestOp.newBuilder().setRequestPut(PutRequest.newBuilder().setHeader(HEADER).setKey(getKey(i)).setValue(getValue(i2)).setPrevKv(true).build())).addFailure(RequestOp.newBuilder().setRequestRange(RangeRequest.newBuilder().setHeader(HEADER).setKey(getKey(i)).build())).build()));
        Assert.assertEquals(HEADER, txnResponse.getHeader().getRoutingHeader());
        Assert.assertEquals(StatusCode.SUCCESS, txnResponse.getHeader().getCode());
        ResponseOp responses = txnResponse.getResponses(0);
        if (z) {
            Assert.assertTrue(txnResponse.getSucceeded());
            PutResponse responsePut = responses.getResponsePut();
            Assert.assertEquals(HEADER, responsePut.getHeader().getRoutingHeader());
            if (responsePut.hasPrevKv()) {
                return responsePut.getPrevKv();
            }
            return null;
        }
        Assert.assertFalse(txnResponse.getSucceeded());
        RangeResponse responseRange = responses.getResponseRange();
        if (responseRange.getCount() == 0) {
            return null;
        }
        Assert.assertEquals(1L, responseRange.getCount());
        return responseRange.getKvs(0);
    }

    TxnResponse vPutToTableStore(int i, int i2, long j) throws Exception {
        return (TxnResponse) FutureUtils.result(this.tableStore.txn(TxnRequest.newBuilder().setHeader(HEADER).addCompare(Compare.newBuilder().setResult(Compare.CompareResult.EQUAL).setTarget(Compare.CompareTarget.VERSION).setKey(getKey(i)).setVersion(j)).addSuccess(RequestOp.newBuilder().setRequestPut(PutRequest.newBuilder().setHeader(HEADER).setKey(getKey(i)).setValue(getValue(i2)).setPrevKv(true).build())).addFailure(RequestOp.newBuilder().setRequestRange(RangeRequest.newBuilder().setHeader(HEADER).setKey(getKey(i)).build())).build()));
    }

    KeyValue verifyVPutResponse(TxnResponse txnResponse, boolean z) throws Exception {
        Assert.assertEquals(HEADER, txnResponse.getHeader().getRoutingHeader());
        Assert.assertEquals(StatusCode.SUCCESS, txnResponse.getHeader().getCode());
        ResponseOp responses = txnResponse.getResponses(0);
        if (z) {
            Assert.assertTrue(txnResponse.getSucceeded());
            PutResponse responsePut = responses.getResponsePut();
            Assert.assertEquals(HEADER, responsePut.getHeader().getRoutingHeader());
            if (responsePut.hasPrevKv()) {
                return responsePut.getPrevKv();
            }
            return null;
        }
        Assert.assertFalse(txnResponse.getSucceeded());
        RangeResponse responseRange = responses.getResponseRange();
        if (responseRange.getCount() == 0) {
            return null;
        }
        Assert.assertEquals(1L, responseRange.getCount());
        return responseRange.getKvs(0);
    }

    TxnResponse rPutToTableStore(int i, int i2, long j) throws Exception {
        return (TxnResponse) FutureUtils.result(this.tableStore.txn(TxnRequest.newBuilder().setHeader(HEADER).addCompare(Compare.newBuilder().setResult(Compare.CompareResult.EQUAL).setTarget(Compare.CompareTarget.MOD).setKey(getKey(i)).setModRevision(j)).addSuccess(RequestOp.newBuilder().setRequestPut(PutRequest.newBuilder().setHeader(HEADER).setKey(getKey(i)).setValue(getValue(i2)).setPrevKv(true).build())).addFailure(RequestOp.newBuilder().setRequestRange(RangeRequest.newBuilder().setHeader(HEADER).setKey(getKey(i)).build())).build()));
    }

    KeyValue deleteKeyFromTableStore(int i) throws Exception {
        DeleteRangeResponse deleteRangeResponse = (DeleteRangeResponse) FutureUtils.result(this.tableStore.delete(DeleteRangeRequest.newBuilder().setHeader(HEADER).setKey(getKey(i)).setPrevKv(true).build()));
        Assert.assertEquals(StatusCode.SUCCESS, deleteRangeResponse.getHeader().getCode());
        Assert.assertEquals(HEADER, deleteRangeResponse.getHeader().getRoutingHeader());
        if (0 == deleteRangeResponse.getPrevKvsCount()) {
            return null;
        }
        return deleteRangeResponse.getPrevKvs(0);
    }

    List<KeyValue> deleteRange(int i, int i2) throws Exception {
        DeleteRangeResponse deleteRangeResponse = (DeleteRangeResponse) FutureUtils.result(this.tableStore.delete(DeleteRangeRequest.newBuilder().setHeader(HEADER).setKey(getKey(i)).setRangeEnd(getKey(i2)).setPrevKv(true).build()));
        Assert.assertEquals(StatusCode.SUCCESS, deleteRangeResponse.getHeader().getCode());
        Assert.assertEquals(HEADER, deleteRangeResponse.getHeader().getRoutingHeader());
        return deleteRangeResponse.getPrevKvsList();
    }

    List<KeyValue> range(int i, int i2) throws Exception {
        RangeResponse rangeResponse = (RangeResponse) FutureUtils.result(this.tableStore.range(RangeRequest.newBuilder().setHeader(HEADER).setKey(getKey(i)).setRangeEnd(getKey(i2)).build()));
        Assert.assertEquals(StatusCode.SUCCESS, rangeResponse.getHeader().getCode());
        Assert.assertEquals(HEADER, rangeResponse.getHeader().getRoutingHeader());
        return rangeResponse.getKvsList();
    }

    @Test
    public void testBasicOps() throws Exception {
        Assert.assertNull(getKeyValue(0));
        putKeyToTableStore(0, 0);
        KeyValue keyValue = getKeyValue(0);
        Assert.assertEquals(getKey(0), keyValue.getKey());
        Assert.assertEquals(getValue(0), keyValue.getValue());
        KeyValue putIfAbsentToTableStore = putIfAbsentToTableStore(0, 99, false);
        Assert.assertNotNull(putIfAbsentToTableStore);
        Assert.assertEquals(getKey(0), putIfAbsentToTableStore.getKey());
        Assert.assertEquals(getValue(0), putIfAbsentToTableStore.getValue());
        KeyValue keyValue2 = getKeyValue(0);
        Assert.assertEquals(getKey(0), keyValue2.getKey());
        Assert.assertEquals(getValue(0), keyValue2.getValue());
        Assert.assertNull(putIfAbsentToTableStore(1, 1, true));
        KeyValue keyValue3 = getKeyValue(1);
        Assert.assertEquals(getKey(1), keyValue3.getKey());
        Assert.assertEquals(getValue(1), keyValue3.getValue());
        Assert.assertEquals(StatusCode.KEY_NOT_FOUND, vPutToTableStore(2, 2, 100L).getHeader().getCode());
        Assert.assertEquals(StatusCode.KEY_NOT_FOUND, vPutToTableStore(2, 2, -1L).getHeader().getCode());
        Assert.assertNull(putIfAbsentToTableStore(2, 2, true));
        TxnResponse vPutToTableStore = vPutToTableStore(2, 99, 0L);
        Assert.assertEquals(StatusCode.SUCCESS, vPutToTableStore.getHeader().getCode());
        KeyValue verifyVPutResponse = verifyVPutResponse(vPutToTableStore, true);
        Assert.assertNotNull(verifyVPutResponse);
        Assert.assertEquals(getKey(2), verifyVPutResponse.getKey());
        Assert.assertEquals(getValue(2), verifyVPutResponse.getValue());
        Assert.assertEquals(0L, verifyVPutResponse.getVersion());
        KeyValue keyValue4 = getKeyValue(2);
        Assert.assertEquals(getKey(2), keyValue4.getKey());
        Assert.assertEquals(getValue(99), keyValue4.getValue());
        Assert.assertEquals(StatusCode.KEY_NOT_FOUND, rPutToTableStore(3, 3, 100L).getHeader().getCode());
        Assert.assertEquals(StatusCode.KEY_NOT_FOUND, rPutToTableStore(3, 3, -1L).getHeader().getCode());
        Assert.assertNull(putIfAbsentToTableStore(3, 3, true));
        KeyValue keyValue5 = getKeyValue(3);
        long modRevision = keyValue5.getModRevision();
        Assert.assertEquals(getValue(3), keyValue5.getValue());
        Assert.assertEquals(StatusCode.SUCCESS, rPutToTableStore(3, 99, modRevision).getHeader().getCode());
        KeyValue keyValue6 = getKeyValue(3);
        Assert.assertEquals(modRevision + 1, keyValue6.getModRevision());
        Assert.assertEquals(getValue(99), keyValue6.getValue());
        Assert.assertNull(deleteKeyFromTableStore(99));
        KeyValue keyValue7 = getKeyValue(0);
        Assert.assertEquals(getKey(0), keyValue7.getKey());
        Assert.assertEquals(getValue(0), keyValue7.getValue());
        KeyValue deleteKeyFromTableStore = deleteKeyFromTableStore(0);
        Assert.assertNotNull(deleteKeyFromTableStore);
        Assert.assertEquals(getKey(0), deleteKeyFromTableStore.getKey());
        Assert.assertEquals(getValue(0), deleteKeyFromTableStore.getValue());
        Assert.assertNull(getKeyValue(0));
    }

    @Test
    public void testPutGetDeleteRanges() throws Exception {
        List<KeyValue> writeKVs = writeKVs(100, true);
        Assert.assertEquals(100, writeKVs.size());
        Iterator<KeyValue> it = writeKVs.iterator();
        while (it.hasNext()) {
            Assert.assertNull(it.next());
        }
        verifyRange(20, 70, 2, 2, 0);
        List<KeyValue> deleteRange = deleteRange(20, 70);
        Assert.assertNotNull(deleteRange);
        verifyRecords(deleteRange, 20, 70, 2, 2, 0);
        Assert.assertTrue(range(20, 70).isEmpty());
    }

    private void verifyRange(int i, int i2, int i3, int i4, int i5) throws Exception {
        List<KeyValue> range = range(i, i2);
        Assert.assertEquals((i2 - i) + 1, range.size());
        verifyRecords(range, i, i2, i3, i4, i5);
    }

    private void verifyRecords(List<KeyValue> list, int i, int i2, int i3, int i4, int i5) {
        int i6 = i;
        for (KeyValue keyValue : list) {
            Assert.assertEquals(getKey(i6), keyValue.getKey());
            Assert.assertEquals(getValue(i6), keyValue.getValue());
            Assert.assertEquals(i6 + i3, keyValue.getCreateRevision());
            Assert.assertEquals(i6 + i4, keyValue.getModRevision());
            Assert.assertEquals(i5, keyValue.getVersion());
            i6++;
        }
        Assert.assertEquals(i2 + 1, i6);
    }
}
