package com.bazaarvoice.emodb.event.db.astyanax;

import ch.qos.logback.core.spi.AbstractComponentTracker;
import com.bazaarvoice.emodb.common.cassandra.CassandraKeyspace;
import com.bazaarvoice.emodb.common.dropwizard.lifecycle.LifeCycleRegistry;
import com.bazaarvoice.emodb.common.dropwizard.metrics.InstrumentedCache;
import com.bazaarvoice.emodb.common.dropwizard.metrics.ParameterizedTimed;
import com.bazaarvoice.emodb.common.uuid.TimeUUIDs;
import com.bazaarvoice.emodb.event.core.MetricsGroupName;
import com.bazaarvoice.emodb.event.db.EventId;
import com.bazaarvoice.emodb.event.db.EventReaderDAO;
import com.bazaarvoice.emodb.event.db.EventSink;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.PeekingIterator;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import com.netflix.astyanax.Execution;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import com.netflix.astyanax.model.Column;
import com.netflix.astyanax.model.ColumnList;
import com.netflix.astyanax.model.ConsistencyLevel;
import com.netflix.astyanax.model.Row;
import com.netflix.astyanax.model.Rows;
import com.netflix.astyanax.query.RowQuery;
import com.netflix.astyanax.serializers.TimeUUIDSerializer;
import com.netflix.astyanax.serializers.UUIDSerializer;
import com.netflix.astyanax.util.RangeBuilder;
import io.dropwizard.lifecycle.ExecutorServiceManager;
import io.dropwizard.util.Duration;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/bazaarvoice/emodb/event/db/astyanax/AstyanaxEventReaderDAO.class */
public class AstyanaxEventReaderDAO implements EventReaderDAO {
    /** Class-wide logger. */
    private static final Logger _log = LoggerFactory.getLogger(AstyanaxEventReaderDAO.class);
    /** ISO-8601 date-time formatter fixed to UTC; used when logging the "since" timestamp of a replay. */
    private static final DateTimeFormatter ISO_FORMATTER = ISODateTimeFormat.dateTime().withZoneUTC();
    /** Thread count of the default background cleanup pool. */
    private static final int NUM_CLEANUP_THREADS = 2;
    /** Capacity of the default cleanup pool's work queue. */
    private static final int MAX_CLEANUP_QUEUE_LENGTH = 100;
    /** Number of slab ids accumulated before a batch is handed to the manifest persister's move(). */
    private static final int SLAB_MOVE_BATCH = 100;
    /** Keyspace against which all manifest and slab queries are prepared. */
    private final CassandraKeyspace _keyspace;
    /** Collaborator used to move, close and delete slabs in the manifest. */
    private final ManifestPersister _manifestPersister;
    /** Executor that runs the asynchronous close/delete slab tasks. */
    private final ExecutorService _cleanupExecutor;
    /** Read cursors for slabs whose manifest flag is true (treated as "open"); entries expire 250 ms after write. */
    private final LoadingCache<ChannelSlab, SlabCursor> _openSlabCursors;
    /** Read cursors for slabs whose manifest flag is false (treated as "closed"); entries expire 10 s after write. */
    private final LoadingCache<ChannelSlab, SlabCursor> _closedSlabCursors;
    /** Marked each time readSlab classifies a slab as stale. */
    private final Meter _staleSlabMeter;

    /**
     * Immutable (channel, slab id) pair used as the key of the slab cursor caches.
     * Two keys are equal when both the slab id and the channel are equal.
     */
    private static class ChannelSlab {
        private final String _channel;
        private final ByteBuffer _slabId;

        private ChannelSlab(String channel, ByteBuffer slabId) {
            _channel = channel;
            _slabId = slabId;
        }

        @Override
        public boolean equals(Object other) {
            if (other == this) {
                return true;
            }
            if (other instanceof ChannelSlab) {
                ChannelSlab that = (ChannelSlab) other;
                // Slab ids are compared first, channels second.
                if (!_slabId.equals(that._slabId)) {
                    return false;
                }
                return _channel.equals(that._channel);
            }
            return false;
        }

        @Override
        public int hashCode() {
            return Objects.hashCode(_channel, _slabId);
        }

        @Override
        public String toString() {
            // A read-only view is passed to the hex conversion instead of the key's own buffer.
            String slabHex = ByteBufferUtil.bytesToHex(_slabId.asReadOnlyBuffer());
            return Objects.toStringHelper(this)
                    .add("channel", _channel)
                    .add("slabId", slabHex)
                    .toString();
        }
    }

    /**
     * Mutable holder for the event index at which the next read of a slab should start.
     * <p>
     * Instances are shared between threads through the cursor caches. Updates are made while
     * holding the cursor's monitor, but {@link #get()} is also called without the monitor as a
     * cheap pre-check before locking, so the value is {@code volatile} to guarantee that those
     * unlocked reads observe the most recent write.
     */
    public static class SlabCursor {
        /** Sentinel cursor value meaning "nothing more to read from this slab". */
        static final int END = Integer.MAX_VALUE;

        // Starts at 0, i.e. the beginning of the slab.
        private volatile int _value;

        private SlabCursor() {
        }

        /** @return the current cursor position, or {@link #END}. */
        public int get() {
            return this._value;
        }

        /** @param i the new cursor position, or {@link #END}. */
        public void set(int i) {
            this._value = i;
        }
    }

    /**
     * Injection constructor: builds the default cleanup executor (registered with the given
     * life cycle registry) and delegates to the package-private constructor.
     *
     * @param lifeCycleRegistry registry handed to {@code defaultCleanupExecutor} to manage the cleanup pool
     * @param cassandraKeyspace keyspace used for all queries
     * @param manifestPersister collaborator used to move, close and delete slabs
     * @param str               metrics group name (bound via {@code @MetricsGroupName}); also used to name the cleanup threads
     * @param metricRegistry    registry in which meters and cache metrics are registered
     */
    @Inject
    public AstyanaxEventReaderDAO(LifeCycleRegistry lifeCycleRegistry, CassandraKeyspace cassandraKeyspace, ManifestPersister manifestPersister, @MetricsGroupName String str, MetricRegistry metricRegistry) {
        this(cassandraKeyspace, manifestPersister, str, defaultCleanupExecutor(str, lifeCycleRegistry, metricRegistry), metricRegistry);
    }

    /**
     * Creates a reader with an explicitly supplied cleanup executor (lets tests control
     * how the asynchronous close/delete tasks run).
     *
     * @param cassandraKeyspace keyspace used for all queries
     * @param manifestPersister collaborator used to move, close and delete slabs
     * @param str               metrics group name under which meters and cache metrics are registered
     * @param executorService   executor that runs the asynchronous slab cleanup tasks
     * @param metricRegistry    metric registry
     */
    @VisibleForTesting
    AstyanaxEventReaderDAO(CassandraKeyspace cassandraKeyspace, ManifestPersister manifestPersister, String str, ExecutorService executorService, MetricRegistry metricRegistry) {
        this._keyspace = cassandraKeyspace;
        this._manifestPersister = manifestPersister;
        this._cleanupExecutor = executorService;

        // Both caches lazily create a fresh cursor (position 0) for any slab not seen yet.
        CacheLoader<ChannelSlab, SlabCursor> cursorFactory = new CacheLoader<ChannelSlab, SlabCursor>() {
            @Override
            public SlabCursor load(ChannelSlab channelSlab) {
                return new SlabCursor();
            }
        };

        // The size limit is spelled out as a literal: it previously borrowed the value of an
        // unrelated logback timeout constant (AbstractComponentTracker.LINGERING_TIMEOUT,
        // 10000 ms), which tied the cache size to a logging library's internals.
        this._openSlabCursors = CacheBuilder.newBuilder()
                .expireAfterWrite(250L, TimeUnit.MILLISECONDS)
                .maximumSize(10000L)
                .recordStats()
                .build(cursorFactory);
        this._closedSlabCursors = CacheBuilder.newBuilder()
                .expireAfterWrite(10L, TimeUnit.SECONDS)
                .maximumSize(100000L)
                .recordStats()
                .build(cursorFactory);

        InstrumentedCache.instrument(this._openSlabCursors, metricRegistry, str, "openSlabCursors", false);
        InstrumentedCache.instrument(this._closedSlabCursors, metricRegistry, str, "closedSlabCursors", false);
        this._staleSlabMeter = metricRegistry.meter(MetricRegistry.name(str, "AstyanaxEventReaderDAO", "stale_slabs"));
    }

    /**
     * Builds the default executor for background slab cleanup: a fixed pool of
     * {@link #NUM_CLEANUP_THREADS} threads fed by a queue bounded at
     * {@link #MAX_CLEANUP_QUEUE_LENGTH}. When the queue is full, new tasks are dropped and
     * counted on the "discarded_slab_cleanup" meter rather than blocking or failing the caller.
     * The executor is handed to the life cycle registry wrapped in an
     * {@link ExecutorServiceManager} with a 5 second shutdown period.
     *
     * @param metricsGroup   metrics group name; the part after its last '.' is used in the thread names
     * @param lifeCycle      registry that manages the executor's life cycle
     * @param metricRegistry registry in which the discard meter is registered
     * @return the newly created executor
     */
    private static ExecutorService defaultCleanupExecutor(String metricsGroup, LifeCycleRegistry lifeCycle, MetricRegistry metricRegistry) {
        final Meter discardedMeter = metricRegistry.meter(MetricRegistry.name(metricsGroup, "AstyanaxEventReaderDAO", "discarded_slab_cleanup"));

        // lastIndexOf returns -1 when there is no '.', in which case the whole group name is used.
        String nameFormat = "Events Slab Reader Cleanup-" + metricsGroup.substring(metricsGroup.lastIndexOf('.') + 1) + "-%d";

        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                NUM_CLEANUP_THREADS, NUM_CLEANUP_THREADS,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(MAX_CLEANUP_QUEUE_LENGTH),
                new ThreadFactoryBuilder().setNameFormat(nameFormat).build(),
                new ThreadPoolExecutor.DiscardPolicy() {
                    @Override
                    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor rejectingExecutor) {
                        // The task is dropped; only record that it happened.
                        discardedMeter.mark();
                    }
                });

        // The manager is passed as-is: casting it to LifeCycleRegistry (as the decompiled code
        // did) would fail, since an ExecutorServiceManager is not a registry.
        lifeCycle.manage(new ExecutorServiceManager(executor, Duration.seconds(5L), nameFormat));
        return executor;
    }

    /**
     * Lists channel names by scanning the rows of the manifest column family.
     * At most one column is requested per row; rows that come back with no columns are skipped.
     *
     * @return a lazy iterator over the keys of all manifest rows that have at least one column
     */
    @Override
    public Iterator<String> listChannels() {
        // Manifest rows are keyed by channel name (String) with ByteBuffer column names, as the
        // other manifest queries in this class also assume.
        @SuppressWarnings("unchecked")
        final Iterator<Row<String, ByteBuffer>> rows = ((Rows<String, ByteBuffer>) execute(
                this._keyspace.prepareQuery(ColumnFamilies.MANIFEST, ConsistencyLevel.CL_LOCAL_QUORUM)
                        .getAllRows()
                        .setRowLimit(1000)
                        .withColumnRange(new RangeBuilder().setLimit(1).build()))).iterator();

        return new AbstractIterator<String>() {
            @Override
            protected String computeNext() {
                while (rows.hasNext()) {
                    Row<String, ByteBuffer> row = rows.next();
                    if (!row.getColumns().isEmpty()) {
                        return row.getKey();
                    }
                }
                return endOfData();
            }
        };
    }

    /**
     * Counts the events in a channel, exactly up to a limit and approximately beyond it.
     * <p>
     * Slabs are visited in manifest order and their columns counted individually. As soon as the
     * running total exceeds {@code j}, the remaining slabs (from the current one onward) are no
     * longer read; each is assumed to contribute 1000 events and the loop stops.
     *
     * @param str channel name (manifest row key)
     * @param j   threshold above which the result becomes an estimate
     * @return the exact event count if it never exceeded {@code j}, otherwise an estimate
     */
    @Override
    @ParameterizedTimed(type = "AstyanaxEventReaderDAO")
    public long count(String str, long j) {
        long total = 0;
        for (Column manifestColumn : executePaginated(
                this._keyspace.prepareQuery(ColumnFamilies.MANIFEST, ConsistencyLevel.CL_ONE)
                        .getKey(str)
                        .withColumnRange(new RangeBuilder().setLimit(100).build())
                        .autoPaginate(true))) {
            ByteBuffer slabId = (ByteBuffer) manifestColumn.getName();

            if (total > j) {
                int remainingSlabs = ((Integer) execute(
                        this._keyspace.prepareQuery(ColumnFamilies.MANIFEST, ConsistencyLevel.CL_ONE)
                                .getKey(str)
                                .withColumnRange(new RangeBuilder().setStart(slabId).build())
                                .getCount())).intValue();
                // Widen before multiplying: an int product overflows past ~2.1 million slabs.
                total += (long) remainingSlabs * 1000L;
                break;
            }

            // Exact count of this slab's columns, excluding the column at Integer.MAX_VALUE
            // (which readSlab treats as a marker rather than an event).
            total += ((Integer) execute(
                    this._keyspace.prepareQuery(ColumnFamilies.SLAB, ConsistencyLevel.CL_ONE)
                            .getKey(slabId)
                            .withColumnRange(0, Integer.MAX_VALUE - 1, false, Integer.MAX_VALUE)
                            .getCount())).intValue();
        }
        return total;
    }

    /**
     * Moves every slab of channel {@code str} whose manifest flag is false to channel
     * {@code str2}, in batches, via the manifest persister. Slabs whose flag is true are left
     * in place.
     *
     * @param str  source channel
     * @param str2 destination channel
     * @return {@code true} if no slab had to be left behind, {@code false} otherwise
     */
    @Override
    public boolean moveIfFast(String str, String str2) {
        ArrayList<ByteBuffer> pendingSlabIds = Lists.newArrayList();
        boolean movedEverything = true;

        for (Column manifestColumn : executePaginated(
                this._keyspace.prepareQuery(ColumnFamilies.MANIFEST, ConsistencyLevel.CL_LOCAL_QUORUM)
                        .getKey(str)
                        .withColumnRange(new RangeBuilder().setLimit(50).build())
                        .autoPaginate(true))) {
            ByteBuffer slabId = (ByteBuffer) manifestColumn.getName();
            if (manifestColumn.getBooleanValue()) {
                // Flagged slabs are skipped, so the move is incomplete.
                movedEverything = false;
                continue;
            }

            pendingSlabIds.add(slabId);
            if (pendingSlabIds.size() >= SLAB_MOVE_BATCH) {
                this._manifestPersister.move(str, str2, pendingSlabIds, false);
                pendingSlabIds.clear();
            }
        }

        // Flush the final, partially filled batch.
        if (!pendingSlabIds.isEmpty()) {
            this._manifestPersister.move(str, str2, pendingSlabIds, false);
        }
        return movedEverything;
    }

    /**
     * Reads all events of a channel into the sink at local-quorum consistency, optionally
     * skipping slabs that precede a point in time.
     *
     * @param str       channel name
     * @param eventSink receiver of the events
     * @param date      if non-null, slabs are filtered with {@link #getSlabFilterSince}; if null, no filter is applied
     */
    @Override
    @ParameterizedTimed(type = "AstyanaxEventReaderDAO")
    public void readAll(String str, EventSink eventSink, Date date) {
        SlabFilter sinceFilter = null;
        if (date != null) {
            sinceFilter = getSlabFilterSince(date, str);
        }
        readAll(str, sinceFilter, eventSink, ConsistencyLevel.CL_LOCAL_QUORUM);
    }

    /**
     * Walks the channel's manifest in order and reads every accepted slab from its beginning
     * (a fresh cursor is used per slab, so no cached read position is consulted or updated).
     * Stops as soon as {@code readSlab} reports that the sink wants no more events.
     *
     * @param str              channel name
     * @param slabFilter       optional filter; when non-null it is shown each slab id, its manifest flag and the
     *                         following slab's id (null for the last slab) and decides whether the slab is read
     * @param eventSink        receiver of the events
     * @param consistencyLevel consistency level for the manifest query
     */
    public void readAll(String str, SlabFilter slabFilter, EventSink eventSink, ConsistencyLevel consistencyLevel) {
        PeekingIterator manifest = Iterators.peekingIterator(executePaginated(
                this._keyspace.prepareQuery(ColumnFamilies.MANIFEST, consistencyLevel)
                        .getKey(str)
                        .withColumnRange(new RangeBuilder().setLimit(50).build())
                        .autoPaginate(true)).iterator());

        while (manifest.hasNext()) {
            Column current = (Column) manifest.next();
            ByteBuffer slabId = (ByteBuffer) current.getName();

            // Look ahead (without consuming) so the filter can see where the next slab starts.
            ByteBuffer nextSlabId = null;
            if (manifest.hasNext()) {
                nextSlabId = (ByteBuffer) ((Column) manifest.peek()).getName();
            }
            boolean open = current.getBooleanValue();

            boolean accepted = slabFilter == null || slabFilter.accept(slabId, open, nextSlabId);
            if (!accepted) {
                continue;
            }

            boolean sinkWantsMore = readSlab(str, slabId, new SlabCursor(), open, eventSink);
            if (!sinkWantsMore) {
                return;
            }
        }
    }

    /**
     * Reads events of a channel that lie at or after the cached cursor of each slab.
     * The cursor for a slab comes from the open or closed cursor cache depending on the slab's
     * manifest flag; slabs whose cursor is already at {@link SlabCursor#END} are skipped.
     * Each slab is read while holding its cursor's monitor, and the scan stops as soon as
     * {@code readSlab} reports that the sink wants no more events.
     *
     * @param str       channel name
     * @param eventSink receiver of the events
     */
    @Override
    @ParameterizedTimed(type = "AstyanaxEventReaderDAO")
    public void readNewer(String str, EventSink eventSink) {
        for (Column manifestColumn : executePaginated(
                this._keyspace.prepareQuery(ColumnFamilies.MANIFEST, ConsistencyLevel.CL_ONE)
                        .getKey(str)
                        .withColumnRange(new RangeBuilder().setLimit(50).build())
                        .autoPaginate(true))) {
            ByteBuffer slabId = (ByteBuffer) manifestColumn.getName();
            boolean open = manifestColumn.getBooleanValue();

            LoadingCache<ChannelSlab, SlabCursor> cursors = open ? this._openSlabCursors : this._closedSlabCursors;
            SlabCursor cursor = cursors.getUnchecked(new ChannelSlab(str, slabId));

            // Cheap unlocked pre-check; readSlab re-checks the cursor once the monitor is held.
            if (cursor.get() == SlabCursor.END) {
                continue;
            }

            boolean sinkWantsMore;
            synchronized (cursor) {
                sinkWantsMore = readSlab(str, slabId, cursor, open, eventSink);
            }
            if (!sinkWantsMore) {
                return;
            }
        }
    }

    /**
     * Reads the events of one slab, starting at the cursor position, into the sink, then
     * advances the cursor and schedules slab cleanup where applicable.
     * <p>
     * Steps, in order:
     * <ol>
     *   <li>Return immediately if the cursor is already at {@link SlabCursor#END}.</li>
     *   <li>Query the slab's columns from the cursor position upward at local quorum.</li>
     *   <li>Feed columns to the sink until it declines more, or the column named
     *       {@code Integer.MAX_VALUE} (a marker, not an event) is reached.</li>
     *   <li>Store the index of the last event handed to the sink as the new cursor, so the
     *       next read starts at (and repeats) that event.</li>
     *   <li>Classify the slab as stale when it is flagged open, is not recent, and the query
     *       result does not end with the marker column; mark the stale meter.</li>
     *   <li>Schedule deletion of an empty slab that is closed or stale, otherwise schedule
     *       closing of a stale slab; in both cases treat the slab as closed from here on.</li>
     *   <li>If the slab is (now) closed and no unread event was seen, move the cursor to END.</li>
     * </ol>
     *
     * @param channel channel the slab belongs to
     * @param slabId  slab row key
     * @param cursor  read position for this slab; updated by this call
     * @param open    the slab's manifest flag
     * @param sink    receiver of the events
     * @return the sink's last answer, i.e. {@code false} if it asked to stop receiving events
     */
    private boolean readSlab(String channel, ByteBuffer slabId, SlabCursor cursor, boolean open, EventSink sink) {
        int start = cursor.get();
        if (start == SlabCursor.END) {
            return true;
        }

        // Evaluated before the query, as in the original statement order.
        boolean recent = isRecent(slabId);

        // Slab column names are event indexes (Integer), as the loop below relies on.
        @SuppressWarnings("unchecked")
        ColumnList<Integer> eventColumns = (ColumnList<Integer>) execute(
                this._keyspace.prepareQuery(ColumnFamilies.SLAB, ConsistencyLevel.CL_LOCAL_QUORUM)
                        .getKey(slabId)
                        .withColumnRange(Integer.valueOf(start), Integer.valueOf(Integer.MAX_VALUE), false, Integer.MAX_VALUE));

        boolean sinkWantsMore = true;
        // If the query skipped leading columns (start > 0) the slab cannot be declared empty.
        boolean empty = (start == 0);
        boolean unreadEventsRemain = false;
        int next = start;

        for (Column<Integer> eventColumn : eventColumns) {
            int eventIdx = eventColumn.getName().intValue();

            // The column at Integer.MAX_VALUE is a marker, not an event; nothing follows it.
            if (eventIdx == Integer.MAX_VALUE) {
                break;
            }

            empty = false;

            if (!sinkWantsMore) {
                // The sink already declined, but at least one more event exists for a later read.
                unreadEventsRemain = true;
                break;
            }

            sinkWantsMore = sink.accept(AstyanaxEventId.create(channel, slabId, eventIdx), eventColumn.getByteBufferValue());
            next = eventIdx;
        }

        cursor.set(next);

        boolean endsWithMarker = !eventColumns.isEmpty()
                && eventColumns.getColumnByIndex(eventColumns.size() - 1).getName().intValue() == Integer.MAX_VALUE;
        boolean stale = open && !recent && !endsWithMarker;
        if (stale) {
            this._staleSlabMeter.mark();
        }

        if (empty && (!open || stale)) {
            deleteEmptySlabAsync(channel, slabId);
            open = false;
        } else if (stale) {
            closeStaleSlabAsync(channel, slabId);
            open = false;
        }

        // Nothing further will be read from a closed slab that has been fully consumed.
        if (!unreadEventsRemain && !open) {
            cursor.set(SlabCursor.END);
        }
        return sinkWantsMore;
    }

    /**
     * Tells whether the time encoded in a slab id lies within
     * {@code Constants.OPEN_SLAB_MARKER_TTL} of the current system time.
     *
     * @param slabId slab id, decoded as a time UUID (a duplicate is read, leaving the argument's position untouched)
     * @return {@code true} if the slab id's timestamp is no older than the TTL
     */
    private boolean isRecent(ByteBuffer slabId) {
        long now = System.currentTimeMillis();
        UUID slabUuid = TimeUUIDSerializer.get().fromByteBuffer(slabId.duplicate());
        long ageMillis = now - TimeUUIDs.getTimeMillis(slabUuid);
        return ageMillis <= Constants.OPEN_SLAB_MARKER_TTL.getMillis();
    }

    /**
     * Submits a task to the cleanup executor that asks the manifest persister to close the slab.
     * The returned future is not inspected.
     *
     * @param channel channel the slab belongs to
     * @param slabId  slab to close
     */
    private void closeStaleSlabAsync(final String channel, final ByteBuffer slabId) {
        // Declared as Runnable so the submit(Runnable) overload is selected, as before.
        Runnable closeTask = () -> {
            this._manifestPersister.close(channel, slabId);
        };
        this._cleanupExecutor.submit(closeTask);
    }

    /**
     * Submits a task to the cleanup executor that asks the manifest persister to delete the slab.
     * The returned future is not inspected.
     *
     * @param channel channel the slab belongs to
     * @param slabId  slab to delete
     */
    private void deleteEmptySlabAsync(final String channel, final ByteBuffer slabId) {
        // Declared as Runnable so the submit(Runnable) overload is selected, as before.
        Runnable deleteTask = () -> {
            this._manifestPersister.delete(channel, slabId);
        };
        this._cleanupExecutor.submit(deleteTask);
    }

    /**
     * Presents the results of repeatedly executing a row query as a single one-time iterable
     * of columns. The query is re-executed lazily each time the previous batch is exhausted,
     * and iteration ends at the first execution that returns an empty column list.
     * Callers in this class enable {@code autoPaginate(true)} on the query — presumably so that
     * each execution yields the next page (confirm against the Astyanax documentation).
     *
     * @param query the row query to execute repeatedly
     * @return the concatenation of all returned column batches, iterable once
     */
    private <K, C> Iterable<Column<C>> executePaginated(final RowQuery<K, C> query) {
        Iterator<Iterator<Column<C>>> pages = new AbstractIterator<Iterator<Column<C>>>() {
            @Override
            public Iterator<Column<C>> computeNext() {
                ColumnList page = (ColumnList) execute(query);
                if (page.isEmpty()) {
                    return endOfData();
                }
                return page.iterator();
            }
        };
        return OneTimeIterable.wrap(Iterators.concat(pages));
    }

    /**
     * Runs an Astyanax execution and unwraps its result, rethrowing a
     * {@link ConnectionException} through {@code Throwables.propagate}.
     *
     * @param execution the operation to run
     * @param <R>       result type of the operation
     * @return the operation's result
     */
    public <R> R execute(Execution<R> execution) {
        final R result;
        try {
            result = execution.execute().getResult();
        } catch (ConnectionException connectionFailure) {
            throw Throwables.propagate(connectionFailure);
        }
        return result;
    }

    /**
     * Rewinds cached read cursors so that the given events are delivered again by a later
     * {@link #readNewer} call.
     * <p>
     * For each slab referenced by the events, the smallest event index is determined. If the
     * closed-slab cursor cache currently holds a cursor for that slab and the cursor is at
     * {@link SlabCursor#END} or beyond that index, the cursor is moved back to the index.
     * Only the closed-slab cursor cache is consulted; slabs without a cached cursor are ignored.
     *
     * @param str        channel all events must belong to
     * @param collection events to mark unread; each must be an {@code AstyanaxEventId}
     * @throws IllegalArgumentException if an event belongs to a different channel
     */
    @Override
    public void markUnread(String str, Collection<EventId> collection) {
        // Smallest unread event index per slab.
        ConcurrentMap<ChannelSlab, Integer> earliestUnread = Maps.newConcurrentMap();
        for (EventId eventId : collection) {
            AstyanaxEventId astyanaxEventId = (AstyanaxEventId) eventId;
            Preconditions.checkArgument(str.equals(astyanaxEventId.getChannel()));
            earliestUnread.merge(
                    new ChannelSlab(str, astyanaxEventId.getSlabId()),
                    Integer.valueOf(astyanaxEventId.getEventIdx()),
                    Integer::min);
        }

        for (Map.Entry<ChannelSlab, Integer> entry : earliestUnread.entrySet()) {
            int eventIdx = entry.getValue().intValue();
            SlabCursor cursor = this._closedSlabCursors.getIfPresent(entry.getKey());
            if (cursor == null) {
                continue;
            }
            // Check without the lock first, then again under it before rewinding, so that a
            // concurrent update made in between is not overwritten with a later position.
            if (cursor.get() == SlabCursor.END || cursor.get() > eventIdx) {
                synchronized (cursor) {
                    if (cursor.get() == SlabCursor.END || cursor.get() > eventIdx) {
                        cursor.set(eventIdx);
                    }
                }
            }
        }
    }

    /**
     * Creates a stateful slab filter that skips slabs lying entirely before a point in time.
     * <p>
     * A slab is rejected while the slab that follows it has a timestamp earlier than
     * {@code date}. The first slab that is not rejected (including the last slab of the
     * manifest, which has no successor) becomes the starting slab; it is logged, and it and
     * every slab after it are accepted.
     *
     * @param date point in time from which events should be replayed
     * @param str  channel name, used only in the log message
     * @return a new filter; it carries state and is intended for a single manifest scan
     */
    @VisibleForTesting
    protected SlabFilter getSlabFilterSince(final Date date, final String str) {
        return new SlabFilter() {
            private final UUID sinceUUID = TimeUUIDs.uuidForTimestamp(date);
            private boolean foundStartingSlab = false;

            @Override
            public boolean accept(ByteBuffer slabId, boolean open, ByteBuffer nextSlabId) {
                if (this.foundStartingSlab) {
                    return true;
                }

                if (nextSlabId != null) {
                    UUID nextSlabUUID = TimeUUIDSerializer.get().fromByteBuffer(nextSlabId.duplicate());
                    if (nextSlabUUID != null && TimeUUIDs.compareTimestamps(nextSlabUUID, this.sinceUUID) < 0) {
                        // The next slab starts before the requested time, so this one is skipped.
                        return false;
                    }
                }

                this.foundStartingSlab = true;
                // Deserialize from a duplicate, like every other slab id decode in this class:
                // the caller goes on to use this buffer as a row key, so its position must not
                // be disturbed (whether the serializer would advance it is not visible here).
                AstyanaxEventReaderDAO._log.info("Starting to replay {} from slabid {}, for since timestamp of {}", str, UUIDSerializer.get().fromByteBuffer(slabId.duplicate()), AstyanaxEventReaderDAO.ISO_FORMATTER.print(date.getTime()));
                return true;
            }
        };
    }
}
