package org.apache.kafka.streams.state.internals;

import java.io.File;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.SimpleTimeZone;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

/* loaded from: input_file:org/apache/kafka/streams/state/internals/RocksDBWindowStore.class */
public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
    /**
     * Lower bound applied to {@link #segmentInterval} in the constructor. Segment ids are turned into
     * {@link Date}s by multiplying with the interval, so the unit is milliseconds (60000 ms = one minute,
     * which matches the minute resolution of {@link #formatter}).
     */
    public static final long MIN_SEGMENT_INTERVAL = 60000;
    /** Store name; also the name of the directory (under the context's state dir) scanned for segments. */
    private final String name;
    /** Number of slots in {@link #segments}; a segment with id {@code n} lives in slot {@code n % numSegments}. */
    private final int numSegments;
    /** Time span covered by one segment; a timestamp maps to segment id {@code timestamp / segmentInterval}. */
    private final long segmentInterval;
    /** When true, every put bumps {@link #seqnum} before the binary key is built. */
    private final boolean retainDuplicates;
    /** Key serde supplied by the caller; when null, {@code init} falls back to the context's key serde. */
    private final Serde<K> keySerde;
    /** Value serde supplied by the caller; when null, {@code init} falls back to the context's value serde. */
    private final Serde<V> valueSerde;
    /** Processor context captured in {@code init}. */
    private ProcessorContext context;
    /** Serdes built in {@code init} from the configured or context-provided serdes. */
    private StateSerdes<K, V> serdes;
    /** Set to true at the end of {@code init} and to false at the start of {@code close}. */
    private volatile boolean open = false;
    /** Live segments keyed by slot ({@code segment.id % numSegments}). */
    private final ConcurrentHashMap<Long, Segment> segments = new ConcurrentHashMap<>();
    /** Sequence number passed to the binary key builder; only advanced when {@link #retainDuplicates} is set. */
    private int seqnum = 0;
    /** Highest segment id seen so far; -1 until a segment has been opened or written. */
    private long currentSegmentId = -1;
    /** Set by {@code enableLogging}; controls whether a change logger is created and used. */
    private boolean loggingEnabled = false;
    /** Created in {@code init} only when {@link #loggingEnabled} is true; null otherwise. */
    private StoreChangeLogger<Bytes, byte[]> changeLogger = null;
    /** Formats/parses the timestamp suffix of segment names; switched to GMT in the constructor. */
    private final SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmm");

    /* loaded from: input_file:org/apache/kafka/streams/state/internals/RocksDBWindowStore$RocksDBWindowStoreIterator.class */
    /**
     * Iterator over the entries of a sequence of segments, restricted to the binary key range
     * {@code [from, to]}. Segments are consumed lazily, one at a time, in the order supplied by the
     * segment iterator; each entry is returned as (timestamp decoded from the binary key, deserialized value).
     */
    private static class RocksDBWindowStoreIterator<V> implements WindowStoreIterator<V> {
        private final StateSerdes<?, V> serdes;
        private final Iterator<Segment> segments;
        private final Bytes from;
        private final Bytes to;
        /** Range iterator over {@link #currentSegment}; null before the first segment and after {@link #close()}. */
        private KeyValueIterator<Bytes, byte[]> currentIterator;
        private KeyValueStore<Bytes, byte[]> currentSegment;

        /**
         * Creates an iterator that yields nothing.
         *
         * @param stateSerdes serdes used to deserialize values (never exercised by an empty iterator)
         */
        RocksDBWindowStoreIterator(StateSerdes<?, V> stateSerdes) {
            // Explicit type witness: the argument position gives the compiler no target type to infer from.
            this(stateSerdes, null, null, Collections.<Segment>emptyIterator());
        }

        /**
         * @param stateSerdes serdes used to deserialize values
         * @param from        lower bound of the binary key range handed to each segment
         * @param to          upper bound of the binary key range handed to each segment
         * @param segments    segments to scan, in iteration order
         */
        RocksDBWindowStoreIterator(StateSerdes<?, V> stateSerdes, Bytes from, Bytes to, Iterator<Segment> segments) {
            this.serdes = stateSerdes;
            this.from = from;
            this.to = to;
            this.segments = segments;
        }

        /**
         * Advances to the next segment while the current one is missing, exhausted or no longer open,
         * and reports whether an entry is then available.
         */
        @Override
        public boolean hasNext() {
            while ((currentIterator == null || !currentIterator.hasNext() || !currentSegment.isOpen())
                    && segments.hasNext()) {
                // Release the iterator of the segment we are leaving; this also resets currentIterator to null.
                close();
                currentSegment = segments.next();
                try {
                    currentIterator = currentSegment.range(from, to);
                } catch (InvalidStateStoreException ignored) {
                    // Best effort: a segment that refuses the range query (presumably because it was
                    // closed in the meantime) is skipped and the loop moves on to the next one.
                }
            }
            return currentIterator != null && currentIterator.hasNext();
        }

        /**
         * @return the next entry as (timestamp, value)
         * @throws NoSuchElementException if no further entry exists
         */
        @Override
        public KeyValue<Long, V> next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            KeyValue<Bytes, byte[]> entry = currentIterator.next();
            long timestamp = WindowStoreUtils.timestampFromBinaryKey(entry.key.get());
            V value = serdes.valueFrom(entry.value);
            return new KeyValue<>(Long.valueOf(timestamp), value);
        }

        /** Removal is not implemented; the call is deliberately a no-op. */
        @Override
        public void remove() {
        }

        /** Closes the iterator over the current segment, if any. Safe to call repeatedly. */
        @Override
        public void close() {
            if (currentIterator != null) {
                currentIterator.close();
                currentIterator = null;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/state/internals/RocksDBWindowStore$Segment.class */
    /**
     * One time-bucket of the window store: a {@link RocksDBStore} over the inner binary key/value
     * serdes, tagged with the id of the segment it represents.
     */
    public static class Segment extends RocksDBStore<Bytes, byte[]> {
        /** Segment id (timestamp divided by the store's segment interval). */
        public final long id;

        /**
         * @param segmentName name of this segment's store
         * @param windowName  name of the owning window store
         * @param id          segment id
         */
        Segment(String segmentName, String windowName, long id) {
            super(segmentName, windowName, WindowStoreUtils.INNER_KEY_SERDE, WindowStoreUtils.INNER_VALUE_SERDE);
            this.id = id;
        }

        /** Deletes this segment's {@code dbDir} via {@link Utils#delete}. */
        public void destroy() {
            Utils.delete(dbDir);
        }

        /** Opens the underlying store and then marks this segment as open. */
        @Override
        public void openDB(ProcessorContext processorContext) {
            super.openDB(processorContext);
            open = true;
        }
    }

    /**
     * Creates a window store split into {@code i} segments.
     *
     * @param str    store name
     * @param j      total time span to divide across {@code i - 1} segments (presumably the retention
     *               period in milliseconds); the resulting segment interval is never smaller than
     *               {@link #MIN_SEGMENT_INTERVAL}
     * @param i      number of segments; must be at least 2
     * @param z      whether duplicate keys are retained (each put then gets a fresh sequence number)
     * @param serde  key serde, or null to use the processor context's key serde at {@code init}
     * @param serde2 value serde, or null to use the processor context's value serde at {@code init}
     * @throws IllegalArgumentException if {@code i} is smaller than 2
     */
    public RocksDBWindowStore(String str, long j, int i, boolean z, Serde<K> serde, Serde<V> serde2) {
        // i == 1 would divide by zero below, and i <= 0 breaks the "id % numSegments" slot arithmetic,
        // so reject those values up front with a descriptive error.
        if (i < 2) {
            throw new IllegalArgumentException("numSegments must be >= 2, was " + i);
        }
        this.name = str;
        this.numSegments = i;
        this.segmentInterval = Math.max(j / (i - 1), MIN_SEGMENT_INTERVAL);
        this.keySerde = serde;
        this.valueSerde = serde2;
        this.retainDuplicates = z;
        // Segment names embed a formatted timestamp; pin the zone so names are stable across hosts.
        this.formatter.setTimeZone(new SimpleTimeZone(0, "GMT"));
    }

    /**
     * Turns on change logging for this store. Takes effect in {@code init}, where the change logger
     * is created.
     *
     * @return this store, for call chaining
     */
    public RocksDBWindowStore<K, V> enableLogging() {
        loggingEnabled = true;
        return this;
    }

    /** @return the name this store was constructed with */
    @Override
    public String name() {
        return name;
    }

    /**
     * Initializes the store: captures the context, builds the serdes (falling back to the context's
     * serdes where none were configured), opens the segments already on disk, sets up change logging
     * when enabled, registers the restore callback, flushes, and finally marks the store open.
     *
     * @param processorContext context supplying the state directory and default serdes
     * @param stateStore       the store instance to register with the context
     */
    @Override
    public void init(ProcessorContext processorContext, StateStore stateStore) {
        context = processorContext;
        serdes = new StateSerdes<>(
                name,
                keySerde == null ? processorContext.keySerde() : keySerde,
                valueSerde == null ? processorContext.valueSerde() : valueSerde);

        openExistingSegments();

        if (loggingEnabled) {
            changeLogger = new StoreChangeLogger<>(name, processorContext, WindowStoreUtils.INNER_SERDES);
        } else {
            changeLogger = null;
        }

        processorContext.register(stateStore, loggingEnabled, new StateRestoreCallback() {
            @Override
            public void restore(byte[] key, byte[] value) {
                // Records restored with a null value are skipped.
                if (value == null) {
                    return;
                }
                putInternal(key, value);
            }
        });

        flush();
        open = true;
    }

    /**
     * Opens every segment found in the store's directory, oldest first, or creates the directory if
     * it does not exist yet. Entries whose name cannot be parsed into a segment id are skipped.
     * Any exception raised along the way is swallowed (best effort).
     */
    private void openExistingSegments() {
        try {
            File storeDir = new File(context.stateDir(), name);
            if (!storeDir.exists()) {
                storeDir.mkdir();
                return;
            }

            String[] entries = storeDir.list();
            if (entries == null) {
                return;
            }

            long[] ids = new long[entries.length];
            int next = 0;
            for (String entry : entries) {
                ids[next++] = segmentIdFromSegmentName(entry);
            }
            Arrays.sort(ids);

            for (long id : ids) {
                // Unparseable names were mapped to -1.
                if (id < 0) {
                    continue;
                }
                // The current id must be raised first, otherwise getOrCreateSegment rejects the id as too new.
                currentSegmentId = id;
                getOrCreateSegment(id);
            }
        } catch (Exception ignored) {
            // NOTE(review): failures are deliberately ignored here; consider logging them.
        }
    }

    /** @return always {@code true} */
    @Override // org.apache.kafka.streams.processor.StateStore
    public boolean persistent() {
        return true;
    }

    /** @return {@code true} between the end of {@code init} and the start of {@code close} */
    @Override
    public boolean isOpen() {
        return open;
    }

    /** Flushes every segment currently held by this store. */
    @Override
    public void flush() {
        for (Segment current : segments.values()) {
            if (current == null) {
                continue;
            }
            current.flush();
        }
    }

    /** Marks the store as closed, flushes all segments and then closes each of them. */
    @Override
    public void close() {
        open = false;
        flush();
        for (Segment current : segments.values()) {
            if (current == null) {
                continue;
            }
            current.close();
        }
    }

    /**
     * Stores the value under the key, using the timestamp reported by the processor context.
     *
     * @param k key
     * @param v value
     */
    @Override
    public void put(K k, V v) {
        long timestamp = context.timestamp();
        put(k, v, timestamp);
    }

    /**
     * Stores the value under the key at the given timestamp and, when logging is enabled and the
     * write was accepted, records the change with the change logger.
     *
     * @param k key
     * @param v value
     * @param j timestamp of the entry
     */
    @Override
    public void put(K k, V v, long j) {
        byte[] rawValue = serdes.rawValue(v);
        byte[] internalKey = putAndReturnInternalKey(k, rawValue, j);
        // A null key means no segment accepted the write, so there is nothing to log.
        if (internalKey != null && loggingEnabled) {
            changeLogger.logChange(Bytes.wrap(internalKey), rawValue);
        }
    }

    /**
     * Writes a serialized value into the segment that covers the timestamp.
     *
     * @param key       key to store
     * @param rawValue  serialized value
     * @param timestamp timestamp of the entry
     * @return the binary key that was written, or null when no segment is available for the timestamp
     */
    private byte[] putAndReturnInternalKey(K key, byte[] rawValue, long timestamp) {
        long id = segmentId(timestamp);
        if (id > currentSegmentId) {
            // Moving to a newer segment: advance the current id and drop segments that fell out of range.
            currentSegmentId = id;
            cleanup();
        }

        Segment target = getOrCreateSegment(id);
        if (target != null) {
            if (retainDuplicates) {
                // Masking keeps the sequence number non-negative when it wraps around.
                seqnum = (seqnum + 1) & Integer.MAX_VALUE;
            }
            byte[] binaryKey = WindowStoreUtils.toBinaryKey(key, timestamp, seqnum, serdes);
            target.put(Bytes.wrap(binaryKey), rawValue);
            return binaryKey;
        }
        return null;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Writes an already-encoded key/value pair into the segment derived from the timestamp embedded
     * in the binary key. Used by the restore callback registered in {@code init}. The pair is dropped
     * when no segment is available for that timestamp.
     *
     * @param bArr  binary key
     * @param bArr2 serialized value
     */
    public void putInternal(byte[] bArr, byte[] bArr2) {
        long timestamp = WindowStoreUtils.timestampFromBinaryKey(bArr);
        long id = segmentId(timestamp);
        if (id > currentSegmentId) {
            currentSegmentId = id;
            cleanup();
        }

        Segment target = getOrCreateSegment(id);
        if (target == null) {
            return;
        }
        target.writeToStore(Bytes.wrap(bArr), bArr2);
    }

    /**
     * Looks up the value stored under a binary key.
     *
     * @param binaryKey encoded key; its embedded timestamp selects the segment
     * @return the serialized value, or null when the segment is not present
     */
    private byte[] getInternal(byte[] binaryKey) {
        long timestamp = WindowStoreUtils.timestampFromBinaryKey(binaryKey);
        Segment holder = getSegment(segmentId(timestamp));
        if (holder == null) {
            return null;
        }
        return (byte[]) holder.get(Bytes.wrap(binaryKey));
    }

    /**
     * Returns an iterator over the values stored for the key with timestamps in {@code [j, j2]}.
     * Only segments that are present and open at call time are scanned, in ascending segment order.
     *
     * @param k  key to look up
     * @param j  lower time bound (inclusive)
     * @param j2 upper time bound (inclusive)
     * @return an iterator over (timestamp, value) pairs; empty when no segment qualifies
     * @throws InvalidStateStoreException if the store is not open
     */
    @Override
    public WindowStoreIterator<V> fetch(K k, long j, long j2) {
        if (!isOpen()) {
            throw new InvalidStateStoreException("Store " + this.name + " is currently not isOpen");
        }
        long requestedFirst = segmentId(j);
        long requestedLast = segmentId(Math.max(0L, j2));
        Bytes lowerKey = Bytes.wrap(WindowStoreUtils.toBinaryKey(k, j, 0, this.serdes));
        Bytes upperKey = Bytes.wrap(WindowStoreUtils.toBinaryKey(k, j2, Integer.MAX_VALUE, this.serdes));

        // Walking every id of the requested range is unbounded for wide ranges (e.g. an upper bound of
        // Long.MAX_VALUE). Only ids of segments actually held can ever match, so clamp the walk to them.
        long lowestHeld = Long.MAX_VALUE;
        long highestHeld = Long.MIN_VALUE;
        for (Segment held : this.segments.values()) {
            lowestHeld = Math.min(lowestHeld, held.id);
            highestHeld = Math.max(highestHeld, held.id);
        }
        long first = Math.max(requestedFirst, lowestHeld);
        long last = Math.min(requestedLast, highestHeld);

        ArrayList<Segment> matching = new ArrayList<>();
        // With no segments held, first > last and the loop body never runs.
        for (long id = first; id <= last; id++) {
            Segment segment = getSegment(id);
            if (segment != null && segment.isOpen()) {
                matching.add(segment);
            }
        }

        if (matching.isEmpty()) {
            return new RocksDBWindowStoreIterator<>(this.serdes);
        }
        return new RocksDBWindowStoreIterator<>(this.serdes, lowerKey, upperKey, matching.iterator());
    }

    /**
     * Returns the segment with the given id if its slot currently holds exactly that segment.
     *
     * @param id segment id
     * @return the segment, or null when the slot is empty or occupied by a segment with another id
     */
    private Segment getSegment(long id) {
        Segment occupant = segments.get(Long.valueOf(id % numSegments));
        return isSegment(occupant, id) ? occupant : null;
    }

    /**
     * @param candidate segment to check; may be null
     * @param id        expected segment id
     * @return true when the candidate is non-null and carries the expected id
     */
    private boolean isSegment(Segment candidate, long id) {
        if (candidate == null) {
            return false;
        }
        return candidate.id == id;
    }

    /**
     * Returns the segment for the id, creating and opening it when its slot is free.
     *
     * @param id segment id
     * @return the segment in the id's slot, or null when the id is newer than the current segment id
     *         or lies {@code numSegments} or more behind it
     */
    private Segment getOrCreateSegment(long id) {
        boolean tooNew = id > currentSegmentId;
        boolean tooOld = id <= currentSegmentId - numSegments;
        if (tooNew || tooOld) {
            return null;
        }

        Long slot = Long.valueOf(id % numSegments);
        Segment occupant = segments.get(slot);
        if (!isSegment(occupant, id)) {
            // The slot is empty or holds another segment; purge out-of-range segments before deciding.
            cleanup();
        }

        if (!segments.containsKey(slot)) {
            Segment created = new Segment(segmentName(id), name, id);
            created.openDB(context);
            segments.put(slot, created);
        }
        return segments.get(slot);
    }

    /**
     * Removes, closes and destroys every segment whose id lies {@code numSegments} or more behind
     * the current segment id.
     */
    private void cleanup() {
        long newestExpiredId = currentSegmentId - numSegments;
        for (Map.Entry<Long, Segment> slot : segments.entrySet()) {
            Segment candidate = slot.getValue();
            if (candidate == null || candidate.id > newestExpiredId) {
                continue;
            }
            segments.remove(slot.getKey());
            candidate.close();
            candidate.destroy();
        }
    }

    /**
     * @param timestamp timestamp to map
     * @return the id of the segment covering the timestamp (integer division by the segment interval)
     */
    private long segmentId(long timestamp) {
        return timestamp / segmentInterval;
    }

    /**
     * Builds the name of a segment: the store name, a dash, and the segment's start time formatted
     * as {@code yyyyMMddHHmm}.
     *
     * @param j segment id
     * @return the segment name
     */
    public String segmentName(long j) {
        Date segmentStart = new Date(j * segmentInterval);
        return name + "-" + formatter.format(segmentStart);
    }

    /**
     * Recovers a segment id from a segment name by parsing the timestamp that follows the store
     * name and its separator character.
     *
     * @param str segment name
     * @return the segment id, or -1 when the name cannot be parsed
     */
    public long segmentIdFromSegmentName(String str) {
        try {
            String stamp = str.substring(name.length() + 1);
            Date segmentStart = formatter.parse(stamp);
            return segmentStart.getTime() / segmentInterval;
        } catch (Exception ignored) {
            // Too-short names and unparseable timestamps both end up here.
            return -1L;
        }
    }

    /** @return a new set holding the ids of all segments currently held by this store */
    public Set<Long> segmentIds() {
        Set<Long> ids = new HashSet<>();
        for (Segment current : segments.values()) {
            if (current == null) {
                continue;
            }
            ids.add(Long.valueOf(current.id));
        }
        return ids;
    }
}
