package com.bazaarvoice.emodb.event.dedup;

import com.bazaarvoice.emodb.common.dropwizard.lifecycle.LifeCycleRegistry;
import com.bazaarvoice.emodb.event.DedupEnabled;
import com.bazaarvoice.emodb.event.api.DedupEventStore;
import com.bazaarvoice.emodb.event.api.DedupEventStoreChannels;
import com.bazaarvoice.emodb.event.api.EventData;
import com.bazaarvoice.emodb.event.api.EventSink;
import com.bazaarvoice.emodb.event.api.EventStore;
import com.bazaarvoice.emodb.event.api.ScanSink;
import com.bazaarvoice.emodb.event.api.SimpleEventSink;
import com.bazaarvoice.emodb.event.core.MetricsGroupName;
import com.bazaarvoice.emodb.event.owner.OstrichOwnerFactory;
import com.bazaarvoice.emodb.event.owner.OstrichOwnerGroupFactory;
import com.bazaarvoice.emodb.event.owner.OwnerGroup;
import com.bazaarvoice.emodb.sortedq.api.SortedQueue;
import com.bazaarvoice.emodb.sortedq.api.SortedQueueFactory;
import com.bazaarvoice.emodb.sortedq.core.ReadOnlyQueueException;
import com.bazaarvoice.emodb.sortedq.db.QueueDAO;
import com.bazaarvoice.emodb.web.auth.Permissions;
import com.bazaarvoice.ostrich.PartitionContext;
import com.bazaarvoice.ostrich.PartitionContextBuilder;
import com.codahale.metrics.MetricRegistry;
import com.datastax.driver.core.QueryLogger;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Supplier;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.common.collect.UnmodifiableIterator;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import io.dropwizard.lifecycle.ExecutorServiceManager;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.Nullable;
import org.apache.cassandra.tracing.TraceKeyspace;
import org.joda.time.Duration;

/* loaded from: input_file:com/bazaarvoice/emodb/event/dedup/DefaultDedupEventStore.class */
/**
 * {@link DedupEventStore} implementation layered on top of a delegate {@link EventStore}.
 *
 * <p>As far as this class shows, every queue name is mapped by {@link DedupEventStoreChannels} to a
 * write channel and a read channel on the delegate, plus a {@link SortedQueue} that is reached through
 * a {@link DedupQueue}. Adds are sent to the write channel. Peek, poll and delete go through the
 * {@link DedupQueue} when {@code OwnerGroup.startIfOwner} returns one and otherwise fall back to the
 * delegate's read channel. Copy-to, move and purge require the {@link DedupQueue} and throw
 * {@link ReadOnlyQueueException} when it is not available.
 */
public class DefaultDedupEventStore implements DedupEventStore, DedupQueueAdmin {
    /** Wait handed to {@code startIfOwner} by the size-estimate, peek, poll and delete paths. */
    private static final Duration SERVICE_FAST_WAIT_DURATION = Duration.millis(100);
    /** Wait handed to {@code startIfOwner} by the activate, copy, move and purge paths. */
    private static final Duration SERVICE_SLOW_WAIT_DURATION = Duration.standardSeconds(3);
    /** Batch size used for every scan and partition performed while copying. */
    private static final int COPY_BATCH_SIZE = 2000;

    private final EventStore _delegate;
    private final DedupEventStoreChannels _channels;
    private final QueueDAO _queueDAO;
    private final OwnerGroup<DedupQueue> _ownerGroup;
    private final SortedQueueFactory _sortedQueueFactory;

    /**
     * Wires the store together and registers its background resources with the life cycle registry.
     *
     * @param lifeCycleRegistry        registry that manages the fill executor and the owner group
     * @param eventStore               delegate event store; must not be null
     * @param dedupEventStoreChannels  maps queue names to read/write channel names; must not be null
     * @param queueDAO                 DAO backing the sorted queues; must not be null
     * @param ostrichOwnerGroupFactory factory for the {@link OwnerGroup} of {@link DedupQueue}s
     * @param supplier                 "dedup enabled" flag handed to every {@link DedupQueue}
     * @param str                      metrics group name; the text after its last {@code '.'} is used to
     *                                 name the executor threads and the owner group
     * @param sortedQueueFactory       factory used for sorted queues (not null-checked here)
     * @param metricRegistry           metric registry handed to every {@link DedupQueue}
     * @throws NullPointerException if {@code eventStore}, {@code dedupEventStoreChannels},
     *                              {@code queueDAO} or {@code str} is null
     */
    @Inject
    public DefaultDedupEventStore(LifeCycleRegistry lifeCycleRegistry, final EventStore eventStore, DedupEventStoreChannels dedupEventStoreChannels, final QueueDAO queueDAO, OstrichOwnerGroupFactory ostrichOwnerGroupFactory, @DedupEnabled final Supplier<Boolean> supplier, @MetricsGroupName String str, final SortedQueueFactory sortedQueueFactory, final MetricRegistry metricRegistry) {
        _delegate = Preconditions.checkNotNull(eventStore, "delegate");
        _channels = Preconditions.checkNotNull(dedupEventStoreChannels, "channels");
        _queueDAO = Preconditions.checkNotNull(queueDAO, "queueDAO");
        _sortedQueueFactory = sortedQueueFactory;

        // Only the last dot-separated segment of the metrics group is used for naming.
        String nameSuffix = str.substring(str.lastIndexOf('.') + 1);
        String threadNameFormat = "DedupFill-" + nameSuffix + "-%d";

        // A single scheduled thread is shared by every DedupQueue created below.
        final ScheduledExecutorService fillExecutor = Executors.newSingleThreadScheduledExecutor(
                new ThreadFactoryBuilder().setNameFormat(threadNameFormat).build());
        // The managed objects are passed as-is: neither is a LifeCycleRegistry, so casting them to
        // that type (as the decompiled source did) cannot succeed.
        lifeCycleRegistry.manage(
                new ExecutorServiceManager(fillExecutor, io.dropwizard.util.Duration.seconds(5L), threadNameFormat));

        _ownerGroup = lifeCycleRegistry.manage(ostrichOwnerGroupFactory.create(
                nameSuffix + "-dedup",
                new OstrichOwnerFactory<DedupQueue>() {
                    @Override
                    public PartitionContext getContext(String queue) {
                        return PartitionContextBuilder.of(queue);
                    }

                    @Override
                    public DedupQueue create(String queue) {
                        return new DedupQueue(
                                queue,
                                DefaultDedupEventStore.this._channels.readChannel(queue),
                                DefaultDedupEventStore.this._channels.writeChannel(queue),
                                queueDAO, eventStore, supplier, fillExecutor, sortedQueueFactory, metricRegistry);
                    }
                },
                // NOTE(review): presumably the inactivity expiry of an owned queue - confirm against the factory.
                Duration.standardHours(1L)));
    }

    /**
     * Asks the owner group for the {@link DedupQueue} of {@code queue}.
     *
     * @return the queue service, or {@code null} when {@code startIfOwner} yields none (the callers
     *         below treat that as "this server does not own the queue")
     */
    @Nullable
    private DedupQueue getQueueReadWrite(String queue, Duration waitDuration) {
        return _ownerGroup.startIfOwner(queue, waitDuration);
    }

    /**
     * Returns a {@link SortedQueue} for {@code queue}: the one held by the local {@link DedupQueue} when
     * it is available, otherwise one created by the factory with its boolean flag set to {@code true}.
     */
    private SortedQueue getQueueReadOnly(String queue, Duration waitDuration) {
        DedupQueue dedupQueue = getQueueReadWrite(queue, waitDuration);
        if (dedupQueue != null) {
            try {
                return dedupQueue.getQueue();
            } catch (ReadOnlyQueueException ignored) {
                // Deliberate fall-through: use a factory-created queue instead.
            }
        }
        // NOTE(review): the 'true' argument presumably requests a read-only queue - confirm in SortedQueueFactory.
        return _sortedQueueFactory.create(queue, true, _queueDAO);
    }

    /** Returns the services currently held by the owner group. */
    @Override
    public Map<String, DedupQueue> getActiveQueues() {
        return _ownerGroup.getServices();
    }

    /**
     * Attempts to start the {@link DedupQueue} for {@code str} on this server.
     *
     * @return {@code true} when {@code startIfOwner} returned a queue, {@code false} otherwise
     */
    @Override
    public boolean activateQueue(String str) {
        return _ownerGroup.startIfOwner(str, SERVICE_SLOW_WAIT_DURATION) != null;
    }

    /**
     * Lists the distinct queue names known to the queue DAO together with those derived from the
     * delegate's read and write channel names. Delegate channels that map to no queue are skipped.
     */
    @Override
    public Iterator<String> listChannels() {
        HashSet<String> queues = Sets.newHashSet();
        Iterators.addAll(queues, _queueDAO.listQueues());

        Iterator<String> channels = _delegate.listChannels();
        while (channels.hasNext()) {
            String channel = channels.next();
            // Try the read-channel mapping first; only consult the write-channel mapping if it fails.
            String queue = _channels.queueFromReadChannel(channel);
            if (queue == null) {
                queue = _channels.queueFromWriteChannel(channel);
            }
            if (queue != null) {
                queues.add(queue);
            }
        }
        return queues.iterator();
    }

    /** Adds a single event to the write channel of queue {@code str}. */
    @Override
    public void add(String str, ByteBuffer byteBuffer) {
        // NOTE(review): Permissions.QUEUE is only used as the null-check message here; it looks like a
        // decompiler substitution for a plain string literal - confirm and replace with a local constant.
        Preconditions.checkNotNull(str, Permissions.QUEUE);
        Preconditions.checkNotNull(byteBuffer, "event");
        _delegate.add(_channels.writeChannel(str), byteBuffer);
    }

    /** Adds a collection of events to the write channel of queue {@code str}. */
    @Override
    public void addAll(String str, Collection<ByteBuffer> collection) {
        Preconditions.checkNotNull(str, Permissions.QUEUE);
        // NOTE(review): TraceKeyspace.EVENTS is likewise only a null-check message - confirm and replace.
        Preconditions.checkNotNull(collection, TraceKeyspace.EVENTS);
        _delegate.addAll(_channels.writeChannel(str), collection);
    }

    /** Adds events for several queues at once, re-keying each queue name to its write channel. */
    @Override
    public void addAll(Multimap<String, ByteBuffer> multimap) {
        Preconditions.checkNotNull(multimap, "eventsByQueue");
        ArrayListMultimap<String, ByteBuffer> eventsByChannel = ArrayListMultimap.create();
        for (Map.Entry<String, Collection<ByteBuffer>> entry : multimap.asMap().entrySet()) {
            eventsByChannel.putAll(_channels.writeChannel(entry.getKey()), entry.getValue());
        }
        _delegate.addAll(eventsByChannel);
    }

    /**
     * Sums the delegate's estimate for the write channel, the sorted queue's own estimate, and the
     * delegate's estimate for the read channel. The limit {@code j} is passed to each delegate call
     * individually; it is not applied to the sum.
     *
     * @throws IllegalArgumentException if {@code j} is not positive
     */
    @Override
    public long getSizeEstimate(String str, long j) {
        Preconditions.checkNotNull(str, Permissions.QUEUE);
        checkLimit(j, Long.MAX_VALUE);

        long writeSize = _delegate.getSizeEstimate(_channels.writeChannel(str), j);
        long sortedSize = getQueueReadOnly(str, SERVICE_FAST_WAIT_DURATION).sizeEstimate();
        long readSize = _delegate.getSizeEstimate(_channels.readChannel(str), j);
        return writeSize + sortedSize + readSize;
    }

    /** Returns the delegate's claim count for the read channel of queue {@code str}. */
    @Override
    public long getClaimCount(String str) {
        Preconditions.checkNotNull(str, Permissions.QUEUE);
        return _delegate.getClaimCount(_channels.readChannel(str));
    }

    /**
     * Re-keys the delegate's claim-count snapshot from read-channel names to queue names, dropping
     * entries whose channel does not map to a queue. Iteration order of the delegate map is kept.
     */
    @Override
    public Map<String, Long> snapshotClaimCounts() {
        LinkedHashMap<String, Long> countsByQueue = Maps.newLinkedHashMap();
        for (Map.Entry<String, Long> entry : _delegate.snapshotClaimCounts().entrySet()) {
            String queue = _channels.queueFromReadChannel(entry.getKey());
            if (queue != null) {
                countsByQueue.put(queue, entry.getValue());
            }
        }
        return countsByQueue;
    }

    /** Peeks into queue {@code str}, collecting events in a {@link SimpleEventSink} built from {@code i}. */
    @Override
    public List<EventData> peek(String str, int i) {
        SimpleEventSink sink = new SimpleEventSink(i);
        peek(str, sink);
        return sink.getEvents();
    }

    /**
     * Peeks into queue {@code str}, writing events to {@code eventSink}. Uses the local
     * {@link DedupQueue} when available, otherwise the delegate's read channel.
     *
     * @return whatever the underlying peek call returns
     * @throws IllegalArgumentException if {@code eventSink.remaining()} is outside the allowed range
     */
    @Override
    public boolean peek(String str, EventSink eventSink) {
        Preconditions.checkNotNull(str, Permissions.QUEUE);
        // NOTE(review): the upper bound is borrowed from an unrelated Cassandra-driver constant; this
        // looks like a decompiler substitution for the project's own peek limit - confirm the value.
        checkLimit(eventSink.remaining(), QueryLogger.DEFAULT_SLOW_QUERY_THRESHOLD_MS);

        DedupQueue dedupQueue = getQueueReadWrite(str, SERVICE_FAST_WAIT_DURATION);
        if (dedupQueue != null) {
            try {
                return dedupQueue.peek(eventSink);
            } catch (ReadOnlyQueueException ignored) {
                // Deliberate fall-through: serve the peek from the read channel instead.
            }
        }
        return _delegate.peek(_channels.readChannel(str), eventSink);
    }

    /** Polls queue {@code str}, collecting events in a {@link SimpleEventSink} built from {@code i}. */
    @Override
    public List<EventData> poll(String str, Duration duration, int i) {
        SimpleEventSink sink = new SimpleEventSink(i);
        poll(str, duration, sink);
        return sink.getEvents();
    }

    /**
     * Polls queue {@code str}, writing events to {@code eventSink}. Uses the local {@link DedupQueue}
     * when available, otherwise the delegate's read channel.
     *
     * @param duration passed through unchanged to the underlying poll call
     * @return whatever the underlying poll call returns
     * @throws IllegalArgumentException if {@code eventSink.remaining()} is not in {@code 1..1000}
     */
    @Override
    public boolean poll(String str, Duration duration, EventSink eventSink) {
        Preconditions.checkNotNull(str, Permissions.QUEUE);
        checkLimit(eventSink.remaining(), 1000L);

        DedupQueue dedupQueue = getQueueReadWrite(str, SERVICE_FAST_WAIT_DURATION);
        if (dedupQueue != null) {
            try {
                return dedupQueue.poll(duration, eventSink);
            } catch (ReadOnlyQueueException ignored) {
                // Deliberate fall-through: serve the poll from the read channel instead.
            }
        }
        return _delegate.poll(_channels.readChannel(str), duration, eventSink);
    }

    /** Forwards a renew request to the delegate's read channel for queue {@code str}. */
    @Override
    public void renew(String str, Collection<String> collection, Duration duration, boolean z) {
        Preconditions.checkNotNull(str, Permissions.QUEUE);
        _delegate.renew(_channels.readChannel(str), collection, duration, z);
    }

    /**
     * Deletes the given events through the local {@link DedupQueue} when available, otherwise directly
     * from the delegate's read channel.
     */
    @Override
    public void delete(String str, Collection<String> collection, boolean z) {
        Preconditions.checkNotNull(str, Permissions.QUEUE);
        DedupQueue dedupQueue = getQueueReadWrite(str, SERVICE_FAST_WAIT_DURATION);
        if (dedupQueue == null) {
            _delegate.delete(_channels.readChannel(str), collection, z);
            return;
        }
        dedupQueue.delete(collection, z);
    }

    /** Forwards an unclaim-all request to the delegate's read channel for queue {@code str}. */
    @Override
    public void unclaimAll(String str) {
        Preconditions.checkNotNull(str, Permissions.QUEUE);
        _delegate.unclaimAll(_channels.readChannel(str));
    }

    /**
     * Copies events matching {@code predicate} from queue {@code str} to queue {@code str2}. The three
     * sources are visited in order: the source write channel, the source sorted queue, and the source
     * read channel.
     *
     * @param date passed through unchanged to the delegate's scan calls
     * @throws ReadOnlyQueueException if the destination {@link DedupQueue} is not available locally
     */
    @Override
    public void copy(String str, String str2, Predicate<ByteBuffer> predicate, Date date) {
        Preconditions.checkNotNull(str, "from");
        Preconditions.checkNotNull(str2, "to");

        // Created first so that an unowned destination fails before anything is scanned.
        ScanSink sink = newCopySink(str2);

        _delegate.scan(_channels.writeChannel(str), predicate, sink, COPY_BATCH_SIZE, date);

        SortedQueue sortedQueue = getQueueReadOnly(str, SERVICE_SLOW_WAIT_DURATION);
        UnmodifiableIterator<List<ByteBuffer>> batches =
                Iterators.partition(sortedQueue.scan(null, Long.MAX_VALUE), COPY_BATCH_SIZE);
        while (batches.hasNext()) {
            // The sorted queue scan takes no predicate, so filter each batch here.
            sink.accept(ImmutableList.copyOf(Iterables.filter(batches.next(), predicate)));
        }

        _delegate.scan(_channels.readChannel(str), predicate, sink, COPY_BATCH_SIZE, date);
    }

    /**
     * Copies events matching {@code predicate} from the raw delegate channel {@code str} (used as-is,
     * without channel-name mapping) to queue {@code str2}.
     *
     * @throws ReadOnlyQueueException if the destination {@link DedupQueue} is not available locally
     */
    @Override
    public void copyFromRawChannel(String str, String str2, Predicate<ByteBuffer> predicate, Date date) {
        Preconditions.checkNotNull(str, "from");
        Preconditions.checkNotNull(str2, "to");
        _delegate.scan(str, predicate, newCopySink(str2), COPY_BATCH_SIZE, date);
    }

    /**
     * Builds a sink that appends batches to queue {@code queue}. Batches go to the {@link DedupQueue}
     * until it throws {@link ReadOnlyQueueException}; from then on (including the batch that failed)
     * they are added to the delegate's write channel instead.
     *
     * @throws ReadOnlyQueueException if the {@link DedupQueue} is not available locally
     */
    private ScanSink newCopySink(final String queue) {
        final DedupQueue dedupQueue = getQueueReadWrite(queue, SERVICE_SLOW_WAIT_DURATION);
        if (dedupQueue == null) {
            throw new ReadOnlyQueueException("Cannot copy to server that does not own the queue: " + queue);
        }
        return new ScanSink() {
            /** Set to null once the dedup queue has rejected a write. */
            private DedupQueue _toQueue = dedupQueue;
            private final String _toChannel = DefaultDedupEventStore.this._channels.writeChannel(queue);

            @Override
            public void accept(List<ByteBuffer> list) {
                if (_toQueue != null) {
                    try {
                        _toQueue.addAll(list);
                    } catch (ReadOnlyQueueException ignored) {
                        // Stop using the dedup queue; this batch is retried on the write channel below.
                        _toQueue = null;
                    }
                }
                if (_toQueue == null) {
                    DefaultDedupEventStore.this._delegate.addAll(_toChannel, list);
                }
            }
        };
    }

    /**
     * Moves queue {@code str} into the write channel of queue {@code str2}.
     *
     * @throws ReadOnlyQueueException if the source {@link DedupQueue} is not available locally
     */
    @Override
    public void move(String str, String str2) {
        Preconditions.checkNotNull(str, "from");
        Preconditions.checkNotNull(str2, "to");
        moveToRawChannel(str, _channels.writeChannel(str2));
    }

    /**
     * Asks the source {@link DedupQueue} to move its content to the raw channel {@code str2}.
     *
     * @throws ReadOnlyQueueException if the source {@link DedupQueue} is not available locally
     */
    @Override
    public void moveToRawChannel(String str, String str2) {
        Preconditions.checkNotNull(str, "from");
        Preconditions.checkNotNull(str2, "to");
        DedupQueue source = getQueueReadWrite(str, SERVICE_SLOW_WAIT_DURATION);
        if (source == null) {
            throw new ReadOnlyQueueException("Cannot move from server that does not own the source queue: " + str);
        }
        source.moveToRawChannel(str2);
    }

    /**
     * Purges queue {@code str} through its {@link DedupQueue}.
     *
     * @throws ReadOnlyQueueException if the {@link DedupQueue} is not available locally
     */
    @Override
    public void purge(String str) {
        Preconditions.checkNotNull(str, Permissions.QUEUE);
        DedupQueue dedupQueue = getQueueReadWrite(str, SERVICE_SLOW_WAIT_DURATION);
        if (dedupQueue == null) {
            throw new ReadOnlyQueueException("Cannot purge from server that does not own the queue: " + str);
        }
        dedupQueue.purge();
    }

    /**
     * Validates that {@code limit} lies in {@code 1..max}.
     *
     * @throws IllegalArgumentException if it does not
     */
    private void checkLimit(long limit, long max) {
        Preconditions.checkArgument(limit > 0, "Limit must be >0");
        Preconditions.checkArgument(limit <= max, "Limit must be <=%s", Long.valueOf(max));
    }
}
