package org.apache.kafka.streams.state.internals;

import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.test.MockProcessorContext;
import org.apache.kafka.test.NoOpRecordCollector;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests for {@link RocksDBSegmentedBytesStore} configured with a {@link SessionKeySchema}.
 *
 * <p>Every test writes session windows for the single record key {@code "a"} with {@code Long}
 * values, then checks what {@code fetch} returns or which segment directories exist under the
 * temporary state directory.
 */
public class RocksDBSegmentedBytesStoreTest {
    /** The only record key used by these tests. */
    private static final String KEY = "a";

    private final long retention = 60000;
    private final int numSegments = 3;
    private final String storeName = "bytes-store";
    private RocksDBSegmentedBytesStore bytesStore;
    private File stateDir;
    private MockProcessorContext context;

    /** Creates a fresh store backed by a new temporary state directory and initializes it. */
    @Before
    public void before() {
        this.bytesStore = new RocksDBSegmentedBytesStore(this.storeName, this.retention, this.numSegments, new SessionKeySchema());
        this.stateDir = TestUtils.tempDirectory();
        this.context = new MockProcessorContext(
            this.stateDir,
            (Serde<?>) Serdes.String(),
            (Serde<?>) Serdes.Long(),
            new NoOpRecordCollector(),
            new ThreadCache("testCache", 0L, new MockStreamsMetrics(new Metrics())));
        this.bytesStore.init(this.context, this.bytesStore);
    }

    /** Closes the store after each test. */
    @After
    public void close() {
        this.bytesStore.close();
    }

    /** Of four stored sessions, a fetch over [0, 1000] must return exactly the first two. */
    @Test
    public void shouldPutAndFetch() throws Exception {
        putSession(10L, 10L, 10L);
        putSession(500L, 1000L, 50L);
        putSession(1500L, 2000L, 100L);
        putSession(2500L, 3000L, 200L);

        final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
            KeyValue.pair(session(10L, 10L), 10L),
            KeyValue.pair(session(500L, 1000L), 50L));

        Assert.assertEquals(expected, toList(fetchSessions(0L, 1000L)));
    }

    /** A fetch over [1, 1999] must skip the session at 0 and return only the one at 1000. */
    @Test
    public void shouldFindValuesWithinRange() throws Exception {
        putSession(0L, 0L, 50L);
        putSession(1000L, 1000L, 10L);

        final List<KeyValue<Windowed<String>, Long>> expected =
            Collections.singletonList(KeyValue.pair(session(1000L, 1000L), 10L));

        Assert.assertEquals(expected, toList(fetchSessions(1L, 1999L)));
    }

    /** After removing the [0, 1000] session, a fetch over [0, 1000] must be empty. */
    @Test
    public void shouldRemove() throws Exception {
        putSession(0L, 1000L, 30L);
        putSession(1500L, 2500L, 50L);

        this.bytesStore.remove(serializeKey(session(0L, 1000L)));

        // Close the iterator even when the assertion fails so the store can be closed cleanly.
        try (final KeyValueIterator<Bytes, byte[]> remaining = fetchSessions(0L, 1000L)) {
            Assert.assertFalse(remaining.hasNext());
        }
    }

    /**
     * Writes sessions with increasing timestamps and, after each write, asserts which segment
     * directories exist on disk. Finally asserts that a fetch over the whole range returns only
     * the last three sessions.
     */
    @Test
    public void shouldRollSegments() throws Exception {
        // Used only to compute the expected segment directory names.
        final Segments segments = new Segments(this.storeName, this.retention, this.numSegments);

        putSession(0L, 0L, 50L);
        Assert.assertEquals(Collections.singleton(segments.segmentName(0L)), segmentDirs());

        putSession(30000L, 60000L, 100L);
        Assert.assertEquals(Utils.mkSet(segments.segmentName(0L), segments.segmentName(1L)), segmentDirs());

        putSession(61000L, 120000L, 200L);
        Assert.assertEquals(
            Utils.mkSet(segments.segmentName(0L), segments.segmentName(1L), segments.segmentName(2L)),
            segmentDirs());

        putSession(121000L, 180000L, 300L);
        Assert.assertEquals(
            Utils.mkSet(segments.segmentName(1L), segments.segmentName(2L), segments.segmentName(3L)),
            segmentDirs());

        putSession(181000L, 240000L, 400L);
        Assert.assertEquals(
            Utils.mkSet(segments.segmentName(2L), segments.segmentName(3L), segments.segmentName(4L)),
            segmentDirs());

        final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
            KeyValue.pair(session(61000L, 120000L), 200L),
            KeyValue.pair(session(121000L, 180000L), 300L),
            KeyValue.pair(session(181000L, 240000L), 400L));

        Assert.assertEquals(expected, toList(fetchSessions(0L, 240000L)));
    }

    /**
     * Closes and re-initializes the store, then writes to it again. There is no explicit
     * assertion: the test passes when the second write does not throw.
     */
    @Test
    public void shouldBeAbleToWriteToReInitializedStore() {
        putSession(0L, 0L, 50L);
        this.bytesStore.close();
        this.bytesStore.init(this.context, this.bytesStore);
        putSession(0L, 0L, 50L);
    }

    /** Builds the windowed key for {@link #KEY} over the session {@code [start, end]}. */
    private static Windowed<String> session(final long start, final long end) {
        return new Windowed<>(KEY, new SessionWindow(start, end));
    }

    /** Stores {@code value} under {@link #KEY} for the session {@code [start, end]}. */
    private void putSession(final long start, final long end, final long value) {
        this.bytesStore.put(serializeKey(session(start, end)), serializeValue(value));
    }

    /**
     * Fetches the entries for {@link #KEY} using the bounds {@code from} and {@code to}.
     * The caller is responsible for closing the returned iterator.
     */
    private KeyValueIterator<Bytes, byte[]> fetchSessions(final long from, final long to) {
        return this.bytesStore.fetch(Bytes.wrap(KEY.getBytes(StandardCharsets.UTF_8)), from, to);
    }

    /**
     * Lists the entries of the store's directory under the state directory.
     *
     * @return the entry names, or an empty set when the directory cannot be listed
     */
    private Set<String> segmentDirs() {
        final String[] names = new File(this.stateDir, this.storeName).list();
        if (names == null) {
            // File.list() returns null for a missing or unreadable directory; report "no
            // segments" so the calling assertion fails with a readable diff instead of an NPE.
            return Collections.emptySet();
        }
        return new HashSet<>(Arrays.asList(names));
    }

    /** Serializes {@code j} with the {@code Long} serde. */
    private byte[] serializeValue(long j) {
        return Serdes.Long().serializer().serialize("", Long.valueOf(j));
    }

    /** Converts a windowed string key to the binary session-key form written to the store. */
    private Bytes serializeKey(Windowed<String> windowed) {
        return SessionKeySerde.toBinary(windowed, Serdes.String().serializer(), "topic");
    }

    /**
     * Drains {@code keyValueIterator} into a list of deserialized key/value pairs, preserving
     * iteration order, and closes the iterator afterwards.
     *
     * @param keyValueIterator iterator over raw store entries; closed by this method
     * @return the deserialized entries
     */
    private List<KeyValue<Windowed<String>, Long>> toList(KeyValueIterator<Bytes, byte[]> keyValueIterator) {
        final List<KeyValue<Windowed<String>, Long>> results = new ArrayList<>();
        try (final KeyValueIterator<Bytes, byte[]> entries = keyValueIterator) {
            while (entries.hasNext()) {
                final KeyValue<Bytes, byte[]> entry = entries.next();
                final Windowed<String> windowedKey =
                    SessionKeySerde.from(entry.key.get(), Serdes.String().deserializer(), "topic");
                final Long value = Serdes.Long().deserializer().deserialize("", entry.value);
                results.add(KeyValue.pair(windowedKey, value));
            }
        }
        return results;
    }
}
