package com.bazaarvoice.emodb.event.dedup;

import com.bazaarvoice.emodb.common.dropwizard.lifecycle.ServiceFailureListener;
import com.bazaarvoice.emodb.event.api.EventData;
import com.bazaarvoice.emodb.event.api.EventSink;
import com.bazaarvoice.emodb.event.api.EventStore;
import com.bazaarvoice.emodb.event.api.SimpleEventSink;
import com.bazaarvoice.emodb.sortedq.api.Consumer;
import com.bazaarvoice.emodb.sortedq.api.SortedQueue;
import com.bazaarvoice.emodb.sortedq.api.SortedQueueFactory;
import com.bazaarvoice.emodb.sortedq.core.ByteBufferOrdering;
import com.bazaarvoice.emodb.sortedq.core.ReadOnlyQueueException;
import com.bazaarvoice.emodb.sortedq.db.QueueDAO;
import com.codahale.metrics.MetricRegistry;
import com.google.common.base.Function;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.Service;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/bazaarvoice/emodb/event/dedup/DedupQueue.class */
public class DedupQueue extends AbstractIdleService {
    private static final Logger _log = LoggerFactory.getLogger(DedupQueue.class);
    // Delay the background filler waits before its next pass once a pass drained the write channel
    // completely or found it empty.
    private static final Duration LAZY_FILL_DELAY = Duration.standardSeconds(1);
    // Total time budget a single peek/poll may spend draining the sorted queue.
    private static final Duration SORTED_QUEUE_TIMEOUT = Duration.millis(100);
    // Ordering applied to events that are copied straight from the write channel to the read channel.
    private static final ByteBufferOrdering ORDERING = ByteBufferOrdering.INSTANCE;
    // Queue name; passed to the sorted queue factory and included in log messages and toString().
    private final String _name;
    // DAO handed to the sorted queue factory when the sorted queue is created in startUp().
    private final QueueDAO _queueDAO;
    // Executor on which the background filler schedules itself.
    private final ScheduledExecutorService _executor;
    // Event store that holds both the read channel and the write channel.
    private final EventStore _eventStore;
    // Gate consulted before every fill and every peek/poll; when it yields false those operations do nothing.
    private final Supplier<Boolean> _dedupEnabled;
    // Event store channel that peek/poll/delete operate on.
    private final String _readChannel;
    // Event store channel that is drained into the sorted queue (or directly into the read channel).
    private final String _writeChannel;
    // Background task that moves events from the write channel into the sorted queue.
    private final AsyncFiller _asyncFiller = new AsyncFiller();
    // Sorted queue created in startUp() and reset to null in shutDown(); volatile because it is read
    // through getQueue() after being assigned by the service lifecycle methods.
    private volatile SortedQueue _queue;
    // Factory used by startUp() to create the sorted queue.
    private final SortedQueueFactory _sortedQueueFactory;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/bazaarvoice/emodb/event/dedup/DedupQueue$AsyncFiller.class */
    /**
     * Background task that repeatedly drains the write channel into the sorted queue.
     *
     * <p>The task reschedules itself on the enclosing queue's executor after every pass and stops
     * rescheduling once {@link #fill()} returns {@code null}. {@link #pause()} and {@link #resume()}
     * let a caller temporarily stop the task from being scheduled.
     */
    public class AsyncFiller implements Runnable {
        /** Maximum number of events moved from the write channel in a single pass. */
        private static final int FILL_BATCH_SIZE = 1000;
        /** Number of consecutive empty passes after which the filler stops rescheduling itself. */
        private static final int MAX_CONSECUTIVE_NOOPS = 3;

        /** Handle of the scheduled or running pass; {@code null} when none is outstanding. */
        private volatile ScheduledFuture<?> _fillFuture;
        /** While {@code true}, {@link #schedule(Duration, boolean)} does nothing. */
        private volatile boolean _paused;
        /** Count of consecutive passes that found the write channel empty. */
        private volatile int _consecutiveNoops;

        private AsyncFiller() {
        }

        /** Returns {@code true} while a pass is scheduled or running. */
        boolean isFilling() {
            return _fillFuture != null;
        }

        /** Returns the number of consecutive passes that found nothing to move. */
        int getConsecutiveNoops() {
            return _consecutiveNoops;
        }

        /** Schedules an immediate pass unless one is already outstanding or the filler is paused. */
        void start() {
            schedule(Duration.ZERO, false);
        }

        /**
         * Schedules the next pass.
         *
         * @param delay how long to wait before the pass runs, or {@code null} to schedule nothing
         * @param reschedule {@code true} when called from the pass that just finished, in which case
         *                   the handle of that pass is discarded first
         */
        private synchronized void schedule(@Nullable Duration delay, boolean reschedule) {
            if (_paused) {
                // Leave _fillFuture untouched: pause() clears it once the pass it waits for has finished.
                return;
            }
            if (reschedule) {
                _fillFuture = null;
            }
            if (_fillFuture != null || delay == null) {
                return;
            }
            _fillFuture = DedupQueue.this._executor.schedule(this, delay.getMillis(), TimeUnit.MILLISECONDS);
        }

        /** Runs one pass and then schedules the next one, whatever the outcome of the pass. */
        @Override
        public void run() {
            Duration delay = null;
            try {
                delay = fill();
            } catch (ReadOnlyQueueException e) {
                // The sorted queue is no longer writable; nothing to report, delay stays null.
            } catch (Throwable th) {
                DedupQueue._log.error("Unexpected exception filling queue: {}", DedupQueue.this._name, th);
            } finally {
                schedule(delay, true);
            }
        }

        /**
         * Prevents further passes from being scheduled and waits for the outstanding pass, if any,
         * to finish.
         *
         * <p>The wait happens <em>outside</em> the monitor: the pass being waited for must enter the
         * synchronized {@link #schedule(Duration, boolean)} before it can complete, so holding the
         * monitor across {@code Future.get()} would deadlock.
         */
        public void pause() {
            ScheduledFuture<?> pending;
            synchronized (this) {
                if (_paused && _fillFuture == null) {
                    return;
                }
                _paused = true;
                pending = _fillFuture;
            }
            if (pending == null) {
                return;
            }
            try {
                pending.get();
            } catch (Throwable th) {
                DedupQueue._log.error("Unexpected exception waiting for queue pause to complete: {}", DedupQueue.this._name, th);
            }
            synchronized (this) {
                // Only clear the handle we waited on; a newer one must not be lost.
                if (pending == _fillFuture) {
                    _fillFuture = null;
                }
            }
        }

        /** Allows passes to be scheduled again; does not itself schedule one. */
        public synchronized void resume() {
            _paused = false;
        }

        /**
         * Moves up to {@link #FILL_BATCH_SIZE} events from the write channel into the sorted queue.
         *
         * @return the delay before the next pass, or {@code null} when no further pass should be
         *         scheduled (service not starting/running, dedup disabled, or
         *         {@link #MAX_CONSECUTIVE_NOOPS} consecutive empty passes)
         */
        private Duration fill() {
            Service.State state = DedupQueue.this.state();
            boolean active = state == Service.State.STARTING || state == Service.State.RUNNING;
            if (!active || !DedupQueue.this._dedupEnabled.get()) {
                return null;
            }

            Drained drained = DedupQueue.this.drainWriteChannelTo(new Consumer() {
                @Override
                public void consume(List<ByteBuffer> list) {
                    DedupQueue.this.getQueue().addAll(list);
                }
            }, FILL_BATCH_SIZE);

            if (drained != Drained.NONE) {
                _consecutiveNoops = 0;
                // More events are waiting: go again immediately. Otherwise back off.
                return drained == Drained.SOME ? Duration.ZERO : DedupQueue.LAZY_FILL_DELAY;
            }

            int noops = _consecutiveNoops + 1;
            _consecutiveNoops = noops;
            if (noops < MAX_CONSECUTIVE_NOOPS) {
                return DedupQueue.LAZY_FILL_DELAY;
            }
            // Idle for long enough: stop until start() is called again.
            _consecutiveNoops = 0;
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/bazaarvoice/emodb/event/dedup/DedupQueue$Drained.class */
    /** Outcome of one attempt to drain the write channel. */
    public enum Drained {
        /** The poll returned no events. */
        NONE,
        /** Events were drained and the event store reported that more remain. */
        SOME,
        /** Events were drained and the event store reported that none remain. */
        ALL
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/bazaarvoice/emodb/event/dedup/DedupQueue$TrackingEventSink.class */
    /**
     * {@link EventSink} decorator that forwards to a delegate, caps the number of events the
     * delegate accepted with {@code ACCEPTED_CONTINUE}, and remembers whether the sink has asked
     * to stop.
     */
    public static class TrackingEventSink implements EventSink {
        private final EventSink _delegate;
        /** Cap on accepted events; may be changed through {@link #setHardLimit(int)}. */
        private int _hardLimit = 5000;
        /** Number of events the delegate accepted with {@code ACCEPTED_CONTINUE}. */
        private int _count;
        /** Set once a stop status was returned or the hard limit was reached. */
        private boolean _done;

        TrackingEventSink(EventSink eventSink) {
            _delegate = eventSink;
        }

        /**
         * Returns the smaller of the delegate's remaining capacity and the room left under the
         * hard limit.
         *
         * @throws IllegalStateException if the sink is done or the computed value is not positive
         */
        @Override
        public int remaining() {
            int delegateRemaining = _delegate.remaining();
            int underLimit = _hardLimit - _count;
            int remaining = Math.min(delegateRemaining, underLimit);
            Preconditions.checkState(!_done && remaining > 0);
            return remaining;
        }

        /**
         * Offers the event to the delegate and converts {@code ACCEPTED_CONTINUE} into
         * {@code ACCEPTED_STOP} once the hard limit has been reached.
         */
        @Override
        public EventSink.Status accept(EventData eventData) {
            EventSink.Status status = _delegate.accept(eventData);
            if (status == EventSink.Status.ACCEPTED_CONTINUE) {
                _count++;
                if (_count >= _hardLimit) {
                    status = EventSink.Status.ACCEPTED_STOP;
                }
            }
            boolean stop = status == EventSink.Status.ACCEPTED_STOP || status == EventSink.Status.REJECTED_STOP;
            if (stop) {
                _done = true;
            }
            return status;
        }

        /** Replaces the hard limit; the sink becomes done if the count already meets the new limit. */
        void setHardLimit(int i) {
            _hardLimit = i;
            if (_count >= i) {
                _done = true;
            }
        }

        /** Returns {@code true} once no further events should be offered to this sink. */
        boolean isDone() {
            return _done;
        }
    }

    /**
     * Creates a dedup queue and registers a {@link ServiceFailureListener} for it.
     *
     * @param str name of the queue
     * @param str2 name of the read channel in the event store
     * @param str3 name of the write channel in the event store
     * @param queueDAO DAO passed to the sorted queue factory on start-up
     * @param eventStore event store holding the read and write channels
     * @param supplier supplies whether dedup processing is currently enabled
     * @param scheduledExecutorService executor used by the background filler
     * @param sortedQueueFactory factory that creates the sorted queue on start-up
     * @param metricRegistry registry handed to {@link ServiceFailureListener#listenTo}
     * @throws NullPointerException if any argument other than {@code metricRegistry} is null
     */
    public DedupQueue(String str, String str2, String str3, QueueDAO queueDAO, EventStore eventStore, Supplier<Boolean> supplier, ScheduledExecutorService scheduledExecutorService, SortedQueueFactory sortedQueueFactory, MetricRegistry metricRegistry) {
        _name = Preconditions.checkNotNull(str, "name");
        _readChannel = Preconditions.checkNotNull(str2, "readChannel");
        _writeChannel = Preconditions.checkNotNull(str3, "writeChannel");
        _queueDAO = Preconditions.checkNotNull(queueDAO, "queueDAO");
        _eventStore = Preconditions.checkNotNull(eventStore, "eventStore");
        _dedupEnabled = Preconditions.checkNotNull(supplier, "dedupEnabled");
        _executor = Preconditions.checkNotNull(scheduledExecutorService, "executor");
        // Fail fast here, like every other collaborator, rather than with an NPE inside startUp().
        _sortedQueueFactory = Preconditions.checkNotNull(sortedQueueFactory, "sortedQueueFactory");
        ServiceFailureListener.listenTo(this, metricRegistry);
    }

    /** Creates the sorted queue for this queue's name and kicks off the background filler. */
    @Override
    protected void startUp() throws Exception {
        SortedQueue created = _sortedQueueFactory.create(_name, _queueDAO);
        _queue = created;
        _asyncFiller.start();
    }

    /** Marks the sorted queue read-only and drops the reference to it, if one was created. */
    @Override
    protected void shutDown() throws Exception {
        SortedQueue current = _queue;
        if (current == null) {
            return;
        }
        current.setReadOnly();
        _queue = null;
    }

    /**
     * Returns the sorted queue.
     *
     * @throws ReadOnlyQueueException if no sorted queue is currently set (before start-up or
     *         after shut-down)
     */
    public SortedQueue getQueue() throws ReadOnlyQueueException {
        SortedQueue current = _queue;
        if (current != null) {
            return current;
        }
        throw new ReadOnlyQueueException();
    }

    /**
     * Adds the given records directly to the sorted queue.
     *
     * @throws ReadOnlyQueueException if the sorted queue is not available
     */
    public void addAll(Collection<ByteBuffer> collection) {
        SortedQueue target = getQueue();
        target.addAll(collection);
    }

    /**
     * Offers events to the sink without claiming them (no claim duration is passed on).
     *
     * @return the result of the underlying peek/poll pass
     */
    public boolean peek(EventSink eventSink) {
        Duration noClaim = null;
        return peekOrPoll(noClaim, eventSink);
    }

    /**
     * Offers events to the sink, passing the given claim duration to the event store.
     *
     * @return the result of the underlying peek/poll pass
     */
    public boolean poll(Duration duration, EventSink eventSink) {
        boolean more = peekOrPoll(duration, eventSink);
        return more;
    }

    /** Deletes the given event ids from the read channel, forwarding the flag unchanged to the event store. */
    public void delete(Collection<String> collection, boolean z) {
        _eventStore.delete(_readChannel, collection, z);
    }

    /**
     * Moves everything this queue holds into the given event store channel: first the write
     * channel, then the contents of the sorted queue, then the read channel. The background
     * filler is paused for the duration and resumed afterwards, even on failure.
     *
     * @throws ReadOnlyQueueException if the sorted queue is not available
     */
    public void moveToRawChannel(final String str) {
        _asyncFiller.pause();
        try {
            SortedQueue sortedQueue = getQueue();
            Consumer copyToTarget = new Consumer() {
                @Override
                public void consume(List<ByteBuffer> list) {
                    DedupQueue.this._eventStore.addAll(str, list);
                }
            };
            _eventStore.move(_writeChannel, str);
            sortedQueue.drainTo(copyToTarget, Long.MAX_VALUE);
            _eventStore.move(_readChannel, str);
        } finally {
            _asyncFiller.resume();
        }
    }

    /**
     * Discards all events: purges the write channel, clears the sorted queue, then purges the
     * read channel.
     *
     * @throws ReadOnlyQueueException if the sorted queue is not available (checked before
     *         anything is purged)
     */
    public void purge() {
        SortedQueue sortedQueue = getQueue();
        _eventStore.purge(_writeChannel);
        sortedQueue.clear();
        _eventStore.purge(_readChannel);
    }

    /**
     * Shared implementation of {@link #peek} and {@link #poll}.
     *
     * <p>Events are sourced, in order, from the read channel, the write channel and the sorted
     * queue. The later sources are consulted only while the sink is not done.
     *
     * @param duration claim duration when polling, or {@code null} when peeking
     * @param eventSink sink that receives the events
     * @return {@code false} if dedup is disabled or, when polling, the read channel already has
     *         20000 or more outstanding claims; the read channel's result if the sink was
     *         satisfied from the read channel alone; otherwise whether the write channel or the
     *         sorted queue reported more events
     */
    private boolean peekOrPoll(@Nullable Duration duration, EventSink eventSink) {
        if (!_dedupEnabled.get()) {
            return false;
        }
        // Only polls create claims, so only polls are throttled by the outstanding claim count.
        if (duration != null && _eventStore.getClaimCount(_readChannel) >= 20000) {
            return false;
        }

        TrackingEventSink sink = new TrackingEventSink(eventSink);
        boolean moreRead = peekOrPollReadChannel(duration, sink);
        if (sink.isDone()) {
            return moreRead;
        }

        // Records seen during this call, shared by the two remaining sources.
        Set<ByteBuffer> unique = Sets.newHashSet();

        boolean moreWrite = peekOrPollWriteChannel(duration, sink, unique);
        if (moreWrite) {
            // The write channel still has events; let the background filler move them.
            _asyncFiller.start();
        }

        // Always consult the sorted queue. Folding this call into "moreWrite || ..." would skip it
        // whenever the write channel has more events, leaving the sink unserved even though events
        // may just have been moved into the sorted queue.
        boolean moreSorted = peekOrPollSortedQueue(duration, sink, unique);
        return moreWrite || moreSorted;
    }

    /** Peeks the read channel when {@code duration} is null, otherwise polls it with that claim duration. */
    private boolean peekOrPollReadChannel(@Nullable Duration duration, TrackingEventSink trackingEventSink) {
        if (duration == null) {
            return _eventStore.peek(_readChannel, trackingEventSink);
        }
        return _eventStore.poll(_readChannel, duration, trackingEventSink);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Adds the records to the read channel and peeks (null {@code duration}) or polls it in the
     * same event store call.
     *
     * @return {@code false} without touching the event store when {@code collection} is empty;
     *         otherwise the event store's result
     */
    public boolean addAndPeekOrPollReadChannel(Collection<ByteBuffer> collection, @Nullable Duration duration, TrackingEventSink trackingEventSink) {
        if (collection.isEmpty()) {
            return false;
        }
        if (duration == null) {
            return _eventStore.addAllAndPeek(_readChannel, collection, trackingEventSink);
        }
        return _eventStore.addAllAndPoll(_readChannel, collection, duration, trackingEventSink);
    }

    /**
     * Drains one batch (up to 1000 events) from the write channel. While the sorted queue is
     * empty and the sink is not done, de-duplicated events are sorted and fed through the read
     * channel to the sink; whatever is left over goes into the sorted queue.
     *
     * @param duration claim duration when polling, or {@code null} when peeking
     * @param trackingEventSink sink that receives the events
     * @param set records already seen during the current peek/poll; updated by this method
     * @return {@code true} if the write channel reported more events after this batch
     */
    private boolean peekOrPollWriteChannel(@Nullable final Duration duration, final TrackingEventSink trackingEventSink, final Set<ByteBuffer> set) {
        Consumer batchHandler = new Consumer() {
            @Override
            public void consume(List<ByteBuffer> list) {
                // A full batch lowers the sink's hard limit to 500.
                if (list.size() == 1000) {
                    trackingEventSink.setHardLimit(500);
                }

                List<ByteBuffer> pending = DedupQueue.this.filterDuplicates(list, set);

                boolean copyDirectly = !pending.isEmpty()
                        && !trackingEventSink.isDone()
                        && DedupQueue.this.getQueue().isEmpty();
                if (copyDirectly) {
                    pending = DedupQueue.this.sorted(pending);
                    int padding = 0;
                    do {
                        // Hand over as much as the sink can take, plus a growing margin.
                        int chunk = Math.min(trackingEventSink.remaining() + padding, pending.size());
                        DedupQueue.this.addAndPeekOrPollReadChannel(pending.subList(0, chunk), duration, trackingEventSink);
                        pending = pending.subList(chunk, pending.size());
                        padding += 10;
                    } while (!pending.isEmpty() && !trackingEventSink.isDone());
                }

                if (!pending.isEmpty()) {
                    DedupQueue.this.getQueue().addAll(pending);
                    // Leftovers went to the sorted queue, so stop tracking them as seen.
                    set.removeAll(Sets.newHashSet(pending));
                }
            }
        };
        Drained outcome = drainWriteChannelTo(batchHandler, 1000);
        return outcome == Drained.SOME;
    }

    /**
     * Drains the sorted queue into the read channel and on to the sink until the sink is done,
     * the sorted queue is empty, or {@code SORTED_QUEUE_TIMEOUT} has elapsed.
     *
     * @param duration claim duration when polling, or {@code null} when peeking
     * @param trackingEventSink sink that receives the events
     * @param set records already seen during the current peek/poll; updated by this method
     * @return {@code true} if the sorted queue is not empty afterwards or the last read channel
     *         call reported more events
     * @throws ReadOnlyQueueException if the sorted queue is not available
     */
    private boolean peekOrPollSortedQueue(@Nullable final Duration duration, final TrackingEventSink trackingEventSink, final Set<ByteBuffer> set) {
        SortedQueue sortedQueue = getQueue();
        long deadline = System.currentTimeMillis() + SORTED_QUEUE_TIMEOUT.getMillis();
        final AtomicBoolean moreInReadChannel = new AtomicBoolean(false);
        Consumer deliver = new Consumer() {
            @Override
            public void consume(List<ByteBuffer> list) {
                List<ByteBuffer> fresh = DedupQueue.this.filterDuplicates(list, set);
                moreInReadChannel.set(DedupQueue.this.addAndPeekOrPollReadChannel(fresh, duration, trackingEventSink));
            }
        };

        // The margin added on top of the sink's remaining capacity grows by 10 per round, capped at 1000.
        for (int padding = 0;
             !trackingEventSink.isDone() && !sortedQueue.isEmpty();
             padding = Math.min(padding + 10, 1000)) {
            long timeLeft = deadline - System.currentTimeMillis();
            if (timeLeft <= 0) {
                break;
            }
            sortedQueue.drainTo(deliver, trackingEventSink.remaining() + padding, Duration.millis(timeLeft));
        }
        return !sortedQueue.isEmpty() || moreInReadChannel.get();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Polls up to {@code i} events from the write channel with a 30 second claim, passes their
     * payloads to the consumer, and then deletes them from the write channel.
     *
     * @param consumer receives the payloads of the polled events
     * @param i capacity of the sink used for the poll
     * @return {@code NONE} if nothing was polled (the consumer is not called); otherwise
     *         {@code SOME} if the poll reported more events and {@code ALL} if it did not
     */
    public Drained drainWriteChannelTo(Consumer consumer, int i) {
        SimpleEventSink batchSink = new SimpleEventSink(i);
        boolean more = _eventStore.poll(_writeChannel, Duration.standardSeconds(30L), batchSink);
        List<EventData> polled = batchSink.getEvents();
        if (polled.isEmpty()) {
            return Drained.NONE;
        }

        // Delete only after the consumer has taken the payloads.
        consumer.consume(getEventData(polled));
        _eventStore.delete(_writeChannel, getEventIds(polled), true);

        if (more) {
            return Drained.SOME;
        }
        return Drained.ALL;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Returns, in iteration order, the records that were not yet in {@code set}, and adds each of
     * them to {@code set} as a side effect.
     *
     * @param collection candidate records
     * @param set records seen so far; mutated by this call
     * @return a new mutable list holding the first occurrence of every unseen record
     */
    public List<ByteBuffer> filterDuplicates(Collection<ByteBuffer> collection, Set<ByteBuffer> set) {
        List<ByteBuffer> firstSeen = new ArrayList<ByteBuffer>(collection.size());
        for (ByteBuffer record : collection) {
            boolean unseen = set.add(record);
            if (unseen) {
                firstSeen.add(record);
            }
        }
        return firstSeen;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /** Returns an immutable copy of the records sorted by {@code ORDERING}. */
    public List<ByteBuffer> sorted(Iterable<ByteBuffer> iterable) {
        List<ByteBuffer> sortedCopy = ORDERING.immutableSortedCopy(iterable);
        return sortedCopy;
    }

    /** Returns a {@code Lists.transform} view of the events that exposes each event's id. */
    private List<String> getEventIds(List<EventData> list) {
        Function<EventData, String> toId = new Function<EventData, String>() {
            @Override
            public String apply(EventData eventData) {
                return eventData.getId();
            }
        };
        return Lists.transform(list, toId);
    }

    /** Returns a {@code Lists.transform} view of the events that exposes each event's payload. */
    private List<ByteBuffer> getEventData(List<EventData> list) {
        Function<EventData, ByteBuffer> toPayload = new Function<EventData, ByteBuffer>() {
            @Override
            public ByteBuffer apply(EventData eventData) {
                return eventData.getData();
            }
        };
        return Lists.transform(list, toPayload);
    }

    /**
     * Describes the queue: its name, size estimates and claim counts of the write and read
     * channels, the sorted queue's size estimate ("n/a" when no sorted queue is set) and, while
     * the background filler is active, a FILLING marker with its consecutive no-op count.
     */
    @Override
    public String toString() {
        SortedQueue sortedQueue = _queue;
        Objects.ToStringHelper description = Objects.toStringHelper(this)
                .add("name", _name)
                .add("#write", _eventStore.getSizeEstimate(_writeChannel, 1000L))
                .add("#write-claims", _eventStore.getClaimCount(_writeChannel))
                .add("#dedup", sortedQueue != null ? Long.valueOf(sortedQueue.sizeEstimate()) : "n/a")
                .add("#read", _eventStore.getSizeEstimate(_readChannel, 1000L))
                .add("#read-claims", _eventStore.getClaimCount(_readChannel));
        if (_asyncFiller.isFilling()) {
            description.addValue("FILLING").add("idle", _asyncFiller.getConsecutiveNoops());
        }
        return description.toString();
    }
}
