package com.bazaarvoice.emodb.databus.core;

import com.bazaarvoice.emodb.common.dropwizard.lifecycle.LifeCycleRegistry;
import com.bazaarvoice.emodb.common.dropwizard.time.ClockTicker;
import com.bazaarvoice.emodb.common.uuid.TimeUUIDs;
import com.bazaarvoice.emodb.databus.ChannelNames;
import com.bazaarvoice.emodb.databus.DefaultJoinFilter;
import com.bazaarvoice.emodb.databus.QueueDrainExecutorService;
import com.bazaarvoice.emodb.databus.SystemInternalId;
import com.bazaarvoice.emodb.databus.api.Event;
import com.bazaarvoice.emodb.databus.api.MoveSubscriptionStatus;
import com.bazaarvoice.emodb.databus.api.Names;
import com.bazaarvoice.emodb.databus.api.PollResult;
import com.bazaarvoice.emodb.databus.api.ReplaySubscriptionStatus;
import com.bazaarvoice.emodb.databus.api.Subscription;
import com.bazaarvoice.emodb.databus.api.UnauthorizedSubscriptionException;
import com.bazaarvoice.emodb.databus.api.UnknownMoveException;
import com.bazaarvoice.emodb.databus.api.UnknownReplayException;
import com.bazaarvoice.emodb.databus.api.UnknownSubscriptionException;
import com.bazaarvoice.emodb.databus.auth.DatabusAuthorizer;
import com.bazaarvoice.emodb.databus.db.SubscriptionDAO;
import com.bazaarvoice.emodb.databus.model.OwnedSubscription;
import com.bazaarvoice.emodb.event.api.EventData;
import com.bazaarvoice.emodb.event.api.EventSink;
import com.bazaarvoice.emodb.event.core.SizeCacheKey;
import com.bazaarvoice.emodb.job.api.JobHandler;
import com.bazaarvoice.emodb.job.api.JobHandlerRegistry;
import com.bazaarvoice.emodb.job.api.JobIdentifier;
import com.bazaarvoice.emodb.job.api.JobRequest;
import com.bazaarvoice.emodb.job.api.JobService;
import com.bazaarvoice.emodb.job.api.JobStatus;
import com.bazaarvoice.emodb.sor.api.Coordinate;
import com.bazaarvoice.emodb.sor.api.Intrinsic;
import com.bazaarvoice.emodb.sor.api.ReadConsistency;
import com.bazaarvoice.emodb.sor.api.UnknownTableException;
import com.bazaarvoice.emodb.sor.condition.Condition;
import com.bazaarvoice.emodb.sor.condition.Conditions;
import com.bazaarvoice.emodb.sor.core.DataProvider;
import com.bazaarvoice.emodb.sor.core.UpdateIntentEvent;
import com.bazaarvoice.emodb.sor.core.UpdateRef;
import com.bazaarvoice.emodb.sortedq.core.ReadOnlyQueueException;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.base.Ticker;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import com.google.common.primitives.Ints;
import com.google.inject.Inject;
import io.dropwizard.lifecycle.Managed;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/bazaarvoice/emodb/databus/core/DefaultDatabus.class */
public class DefaultDatabus implements OwnerAwareDatabus, Managed {
    // Cap on how many raw events are merged into one consolidated item; ConsolidatingEventSink.accept and
    // Item.consolidateWith enforce this same value.
    private static final int MAX_EVENTS_TO_CONSOLIDATE = 1000;
    // NOTE(review): not referenced in this part of the file; presumably the batch size used while draining a queue.
    private static final int MAX_ITEMS_TO_FETCH_FOR_QUEUE_DRAINING = 100;
    // Collaborators, all assigned once in the constructor.
    private final EventBus _eventBus;
    private final SubscriptionDAO _subscriptionDao;
    private final DatabusEventStore _eventStore;
    private final DataProvider _dataProvider;
    private final SubscriptionEvaluator _subscriptionEvaluator;
    private final JobService _jobService;
    private final DatabusAuthorizer _databusAuthorizer;
    // Owner id used for subscriptions this class creates itself (see createDatabusReplaySubscription).
    private final String _systemOwnerId;
    // Meters registered in the constructor under "bv.emodb.databus.DefaultDatabus.<name>".
    private final Meter _peekedMeter;
    private final Meter _polledMeter;
    private final Meter _renewedMeter;
    private final Meter _ackedMeter;
    private final Meter _recentUnknownMeter;
    private final Meter _staleUnknownMeter;
    private final Meter _redundantMeter;
    private final Meter _discardedMeter;
    private final Meter _consolidatedMeter;
    private final Meter _unownedSubscriptionMeter;
    private final Meter _drainQueueAsyncMeter;
    private final Meter _drainQueueTaskMeter;
    private final Meter _drainQueueRedundantMeter;
    // Short-lived cache of size estimates. Each entry's key is the estimated event count and its value is the
    // limit that was requested when the estimate was computed.
    private final LoadingCache<SizeCacheKey, Map.Entry<Long, Long>> _eventSizeCache;
    // Supplies the condition that subscribe() combines with the caller's table filter when requested.
    private final Supplier<Condition> _defaultJoinFilterCondition;
    // Ticker derived from _clock; drives the size cache expiry and the poll stopwatch.
    private final Ticker _ticker;
    private final Clock _clock;
    private ExecutorService _drainService;
    // NOTE(review): only exposed through getDrainedSubscriptionsMap() in this part of the file; presumably tracks
    // subscriptions with a drain in progress - confirm against drainQueueAsync.
    private ConcurrentMap<String, Long> _drainedSubscriptionsMap = Maps.newConcurrentMap();
    private static final Logger _log = LoggerFactory.getLogger(DefaultDatabus.class);
    // Budget for the synchronous part of a peek/poll before the remaining work is deferred or abandoned.
    private static final Duration MAX_POLL_TIME = Duration.millis(100);
    // Claim renewal applied to events whose change is still pending but recent (see resolvePeekOrPollEvents).
    private static final Duration RECENT_UNKNOWN_RETRY = Duration.millis(400);
    // NOTE(review): not referenced in this part of the file; presumably the age threshold used by isRecent().
    private static final Duration STALE_UNKNOWN_AGE = Duration.standardSeconds(2);
    // NOTE(review): not referenced in this part of the file; presumably bounds a single queue-drain task.
    private static final Duration MAX_QUEUE_DRAIN_TIME_FOR_A_SUBSCRIPTION = Duration.standardMinutes(1);
    // A tag collection holding a single empty tag list.
    private static final List<List<String>> EMPTY_TAGS = ImmutableList.of(ImmutableList.of());
    /**
     * Orders tag lists lexicographically: the first differing element decides, and when one list is a prefix
     * of the other the shorter list sorts first.
     */
    private static final Comparator<List<String>> TAG_LIST_COMPARATOR = (list, list2) -> {
        Iterator<String> left = list.iterator();
        Iterator<String> right = list2.iterator();
        // Walk both lists in lock step until one of them runs out.
        while (left.hasNext() && right.hasNext()) {
            int order = left.next().compareTo(right.next());
            if (order != 0) {
                return order;
            }
        }
        // All shared positions are equal, so the length breaks the tie.
        return Integer.compare(list.size(), list2.size());
    };

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/bazaarvoice/emodb/databus/core/DefaultDatabus$ConsolidatingEventSink.class */
    /**
     * Event sink that groups raw events by the coordinate (table and key) they refer to, preserving the order
     * in which coordinates were first seen.
     */
    public class ConsolidatingEventSink implements EventSink {
        private final Map<Coordinate, EventList> _eventMap = Maps.newLinkedHashMap();
        private final int _limit;

        /**
         * @param i maximum number of distinct coordinates this sink will collect
         */
        ConsolidatingEventSink(int i) {
            _limit = i;
        }

        /**
         * Reports one more than the number of free coordinate slots.
         */
        @Override
        public int remaining() {
            int freeSlots = _limit - _eventMap.size();
            return freeSlots + 1;
        }

        /**
         * Files the event under its coordinate. A new coordinate is rejected once the limit is reached, and
         * accepting stops once a single coordinate has accumulated the maximum number of events.
         */
        @Override
        public EventSink.Status accept(EventData eventData) {
            UpdateRef ref = UpdateRefSerializer.fromByteBuffer(eventData.getData());
            Coordinate coordinate = Coordinate.of(ref.getTable(), ref.getKey());

            EventList events = _eventMap.get(coordinate);
            if (events == null) {
                // First event for this coordinate: only admit it while there is still room.
                if (_eventMap.size() == _limit) {
                    return EventSink.Status.REJECTED_STOP;
                }
                events = new EventList();
                _eventMap.put(coordinate, events);
            }

            events.add(eventData.getId(), ref.getChangeId(), ref.getTags());
            if (events.size() == MAX_EVENTS_TO_CONSOLIDATE) {
                return EventSink.Status.ACCEPTED_STOP;
            }
            return EventSink.Status.ACCEPTED_CONTINUE;
        }

        /**
         * Returns the live map of collected events, keyed by coordinate in first-seen order.
         */
        Map<Coordinate, EventList> getEvents() {
            return _eventMap;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/bazaarvoice/emodb/databus/core/DefaultDatabus$EventList.class */
    /**
     * The raw events collected for a single coordinate: each event id paired with its change id, plus the
     * running union of the tags attached to those events.
     */
    public static class EventList {
        private final List<Pair<String, UUID>> _eventAndChangeIds = Lists.newArrayList();
        private List<List<String>> _tags;

        private EventList() {
            // Instances are created only by the enclosing class.
        }

        /**
         * Records one event and folds its tags into the accumulated tag union.
         *
         * @param str  the event id
         * @param uuid the change id the event refers to
         * @param set  the tags carried by the event
         */
        void add(String str, UUID uuid, Set<String> set) {
            Pair<String, UUID> idPair = Pair.of(str, uuid);
            _eventAndChangeIds.add(idPair);
            _tags = sortedTagUnion(_tags, set);
        }

        /** Returns the live list of (event id, change id) pairs in insertion order. */
        List<Pair<String, UUID>> getEventAndChangeIds() {
            return _eventAndChangeIds;
        }

        /** Returns the accumulated tag union; {@code null} until the first event is added. */
        List<List<String>> getTags() {
            return _tags;
        }

        /** Returns the number of events recorded so far. */
        int size() {
            return _eventAndChangeIds.size();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/bazaarvoice/emodb/databus/core/DefaultDatabus$Item.class */
    /**
     * A resolved databus item: the content for one coordinate together with the ids of every raw event that
     * has been consolidated into it. Items order themselves by the sort index given at construction.
     */
    public static class Item implements Comparable<Item> {
        private final List<String> _consolidatedEventIds;
        private final int _sortIndex;
        private Map<String, Object> _content;
        private List<List<String>> _tags;

        /**
         * @param str  id of the first event represented by this item
         * @param i    sort index used by {@link #compareTo(Item)}
         * @param map  resolved content
         * @param list tags associated with the item
         */
        Item(String str, int i, Map<String, Object> map, List<List<String>> list) {
            _consolidatedEventIds = Lists.newArrayList(str);
            _sortIndex = i;
            _content = map;
            _tags = list;
        }

        /**
         * Merges another item into this one unless this item already carries the maximum number of events.
         * The content with the higher version wins and the tags of both items are unioned.
         *
         * @return {@code true} if the other item was absorbed, {@code false} if this item is full
         */
        boolean consolidateWith(Item item) {
            if (_consolidatedEventIds.size() >= MAX_EVENTS_TO_CONSOLIDATE) {
                return false;
            }
            _consolidatedEventIds.addAll(item._consolidatedEventIds);

            // Keep whichever content has the newer version.
            long ownVersion = Intrinsic.getVersion(_content).longValue();
            long otherVersion = Intrinsic.getVersion(item._content).longValue();
            if (ownVersion < otherVersion) {
                _content = item._content;
            }

            for (List<String> tagList : item._tags) {
                _tags = sortedTagUnion(_tags, tagList);
            }
            return true;
        }

        /**
         * Builds the API event; the consolidated event ids are sorted in place before being encoded into the
         * event key.
         */
        Event toEvent() {
            Collections.sort(_consolidatedEventIds);
            String eventKey = EventKeyFormat.encode(_consolidatedEventIds);
            return new Event(eventKey, _content, _tags);
        }

        @Override
        public int compareTo(Item item) {
            return Integer.compare(_sortIndex, item._sortIndex);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/bazaarvoice/emodb/databus/core/DefaultDatabus$ResolvedItemSink.class */
    /**
     * Callback that receives each item produced while resolving raw events. Callers in this class supply it
     * as a lambda, hence the functional-interface marker.
     */
    @FunctionalInterface
    public interface ResolvedItemSink {
        /**
         * @param coordinate the coordinate the item was resolved for
         * @param item       the resolved item
         */
        void accept(Coordinate coordinate, Item item);
    }

    /**
     * Wires up the databus: stores its collaborators, registers the event meters, builds the size-estimate
     * cache, registers itself for lifecycle management and installs the move/replay job handlers.
     *
     * @param lifeCycleRegistry  registry that will start and stop this instance
     * @param eventBus           bus this instance registers on in {@link #start()}
     * @param dataProvider       source of annotated content used to resolve events
     * @param subscriptionDAO    subscription persistence
     * @param databusEventStore  event store backing every subscription channel
     * @param subscriptionEvaluator evaluator kept for use elsewhere in the class
     * @param jobService         job service kept for use elsewhere in the class
     * @param jobHandlerRegistry registry receiving the move and replay job handlers; must not be null
     * @param databusAuthorizer  authorizer consulted for subscription access
     * @param str                system-internal owner id
     * @param supplier           supplier of the default join filter condition
     * @param executorService    executor for queue draining; must not be null
     * @param metricRegistry     registry the event meters are created in
     * @param clock              clock backing the ticker used for caching and poll timing
     */
    @Inject
    public DefaultDatabus(LifeCycleRegistry lifeCycleRegistry, EventBus eventBus, DataProvider dataProvider, SubscriptionDAO subscriptionDAO, DatabusEventStore databusEventStore, SubscriptionEvaluator subscriptionEvaluator, JobService jobService, JobHandlerRegistry jobHandlerRegistry, DatabusAuthorizer databusAuthorizer, @SystemInternalId String str, @DefaultJoinFilter Supplier<Condition> supplier, @QueueDrainExecutorService ExecutorService executorService, MetricRegistry metricRegistry, Clock clock) {
        _eventBus = eventBus;
        _subscriptionDao = subscriptionDAO;
        _eventStore = databusEventStore;
        _dataProvider = dataProvider;
        _subscriptionEvaluator = subscriptionEvaluator;
        _jobService = jobService;
        _databusAuthorizer = databusAuthorizer;
        _systemOwnerId = str;
        _defaultJoinFilterCondition = supplier;
        _drainService = Preconditions.checkNotNull(executorService, "drainService");
        _ticker = ClockTicker.getTicker(clock);
        _clock = clock;

        _peekedMeter = newEventMeter("peeked", metricRegistry);
        _polledMeter = newEventMeter("polled", metricRegistry);
        _renewedMeter = newEventMeter("renewed", metricRegistry);
        _ackedMeter = newEventMeter("acked", metricRegistry);
        _recentUnknownMeter = newEventMeter("recent-unknown", metricRegistry);
        _staleUnknownMeter = newEventMeter("stale-unknown", metricRegistry);
        _redundantMeter = newEventMeter("redundant", metricRegistry);
        _discardedMeter = newEventMeter("discarded", metricRegistry);
        _consolidatedMeter = newEventMeter("consolidated", metricRegistry);
        _unownedSubscriptionMeter = newEventMeter("unowned", metricRegistry);
        _drainQueueAsyncMeter = newEventMeter("drainQueueAsync", metricRegistry);
        _drainQueueTaskMeter = newEventMeter("drainQueueTask", metricRegistry);
        _drainQueueRedundantMeter = newEventMeter("drainQueueRedundant", metricRegistry);

        // Size estimates are cached briefly; each entry remembers the limit it was computed with so that
        // getEventCountUpTo can tell whether a cached estimate is good enough for a later request.
        _eventSizeCache = CacheBuilder.newBuilder()
                .expireAfterWrite(15L, TimeUnit.SECONDS)
                .maximumSize(2000L)
                .ticker(_ticker)
                .build(new CacheLoader<SizeCacheKey, Map.Entry<Long, Long>>() {
                    @Override
                    public Map.Entry<Long, Long> load(SizeCacheKey sizeCacheKey) throws Exception {
                        return Maps.immutableEntry(
                                Long.valueOf(internalEventCountUpTo(sizeCacheKey.channelName, sizeCacheKey.limitAsked)),
                                Long.valueOf(sizeCacheKey.limitAsked));
                    }
                });

        // Register this instance itself. The previous "(LifeCycleRegistry) this" cast targeted a type this
        // class does not implement and would fail with ClassCastException.
        // NOTE(review): relies on LifeCycleRegistry.manage accepting a Managed - confirm against its declaration.
        lifeCycleRegistry.manage(this);

        Preconditions.checkNotNull(jobHandlerRegistry, "jobHandlerRegistry");
        registerMoveSubscriptionJobHandler(jobHandlerRegistry);
        registerReplaySubscriptionJobHandler(jobHandlerRegistry);
    }

    /**
     * Installs the handler for move-subscription jobs. The handler verifies that the requesting owner may use
     * both the source and the destination subscription, moves the events between them, and reports the
     * completion time. If the queue turns out to be read-only the handler answers with {@code notOwner()}.
     */
    private void registerMoveSubscriptionJobHandler(JobHandlerRegistry jobHandlerRegistry) {
        Supplier<JobHandler<MoveSubscriptionRequest, MoveSubscriptionResult>> handlerSupplier = () ->
                new JobHandler<MoveSubscriptionRequest, MoveSubscriptionResult>() {
                    @Override
                    public MoveSubscriptionResult run(MoveSubscriptionRequest moveSubscriptionRequest) throws Exception {
                        try {
                            // Both ends of the move must be accessible to the requesting owner.
                            DefaultDatabus.this.checkSubscriptionOwner(
                                    moveSubscriptionRequest.getOwnerId(), moveSubscriptionRequest.getFrom());
                            DefaultDatabus.this.checkSubscriptionOwner(
                                    moveSubscriptionRequest.getOwnerId(), moveSubscriptionRequest.getTo());
                            _eventStore.move(moveSubscriptionRequest.getFrom(), moveSubscriptionRequest.getTo());
                            return new MoveSubscriptionResult(new Date());
                        } catch (ReadOnlyQueueException e) {
                            return notOwner();
                        }
                    }
                };
        jobHandlerRegistry.addHandler(MoveSubscriptionJob.INSTANCE, handlerSupplier);
    }

    /**
     * Installs the handler for replay-subscription jobs. The handler verifies ownership, runs the replay, and
     * then fails with {@code ReplayTooLateException} if the requested "since" time plus the replay TTL has
     * already passed. If the queue turns out to be read-only the handler answers with {@code notOwner()}.
     */
    private void registerReplaySubscriptionJobHandler(JobHandlerRegistry jobHandlerRegistry) {
        Supplier<JobHandler<ReplaySubscriptionRequest, ReplaySubscriptionResult>> handlerSupplier = () ->
                new JobHandler<ReplaySubscriptionRequest, ReplaySubscriptionResult>() {
                    @Override
                    public ReplaySubscriptionResult run(ReplaySubscriptionRequest replaySubscriptionRequest) throws Exception {
                        try {
                            DefaultDatabus.this.checkSubscriptionOwner(
                                    replaySubscriptionRequest.getOwnerId(), replaySubscriptionRequest.getSubscription());
                            DefaultDatabus.this.replay(
                                    replaySubscriptionRequest.getSubscription(), replaySubscriptionRequest.getSince());

                            // The check runs after the replay: if the window expired while replaying, the
                            // result cannot be trusted to be complete.
                            if (replaySubscriptionRequest.getSince() != null
                                    && new DateTime(replaySubscriptionRequest.getSince())
                                            .plus(DatabusChannelConfiguration.REPLAY_TTL).isBeforeNow()) {
                                throw new ReplayTooLateException();
                            }
                            return new ReplaySubscriptionResult(new Date());
                        } catch (ReadOnlyQueueException e) {
                            return notOwner();
                        }
                    }
                };
        jobHandlerRegistry.addHandler(ReplaySubscriptionJob.INSTANCE, handlerSupplier);
    }

    /**
     * Subscribes the system owner to the master replay channel with an always-true filter, a 3650-day
     * subscription TTL and the replay TTL for events. The default join filter is not applied.
     */
    private void createDatabusReplaySubscription() {
        String replayChannel = ChannelNames.getMasterReplayChannel();
        Duration subscriptionTtl = Duration.standardDays(3650L);
        subscribe(_systemOwnerId, replayChannel, Conditions.alwaysTrue(), subscriptionTtl,
                DatabusChannelConfiguration.REPLAY_TTL, false);
    }

    /**
     * Looks up (or creates) the meter registered under this class's metric prefix and the given suffix.
     *
     * @param str            metric name suffix
     * @param metricRegistry registry holding the meter
     */
    private Meter newEventMeter(String str, MetricRegistry metricRegistry) {
        String metricName = getMetricName(str);
        return metricRegistry.meter(metricName);
    }

    /**
     * Builds the full metric name for the given suffix under the "bv.emodb.databus" / "DefaultDatabus" prefix.
     */
    private String getMetricName(String str) {
        String fullName = MetricRegistry.name("bv.emodb.databus", "DefaultDatabus", str);
        return fullName;
    }

    /**
     * Exposes the live drained-subscriptions map so tests can inspect it.
     */
    @VisibleForTesting
    protected ConcurrentMap<String, Long> getDrainedSubscriptionsMap() {
        ConcurrentMap<String, Long> drained = _drainedSubscriptionsMap;
        return drained;
    }

    /**
     * Lifecycle start: ensures the system replay subscription exists, then begins listening on the event bus.
     */
    @Override
    public void start() throws Exception {
        createDatabusReplaySubscription();
        EventBus bus = _eventBus;
        bus.register(this);
    }

    /**
     * Lifecycle stop: detaches this instance from the event bus.
     */
    @Override
    public void stop() throws Exception {
        EventBus bus = _eventBus;
        bus.unregister(this);
    }

    /**
     * Lists the subscriptions the given owner may access, ordered by name.
     *
     * @param str  owner id whose access rights filter the result
     * @param str2 exclusive lower bound on the subscription name, or {@code null} to start from the beginning
     * @param j    maximum number of subscriptions to return; must be positive
     * @return iterator over at most {@code j} accessible subscriptions whose names sort after {@code str2}
     * @throws IllegalArgumentException if {@code j} is not positive
     */
    @Override
    public Iterator<Subscription> listSubscriptions(String str, @Nullable String str2, long j) {
        Preconditions.checkArgument(j > 0, "Limit must be >0");
        return StreamSupport.stream(_subscriptionDao.getAllSubscriptions().spliterator(), false)
                .filter(subscription -> _databusAuthorizer.owner(str).canAccessSubscription(subscription))
                .sorted((left, right) -> left.getName().compareTo(right.getName()))
                .filter(subscription -> str2 == null || subscription.getName().compareTo(str2) > 0)
                .limit(j)
                // Explicit upcast so the stream, and therefore the iterator, is typed as Subscription.
                .map(subscription -> (Subscription) subscription)
                .iterator();
    }

    /**
     * Creates or updates a subscription, combining the table filter with the default join filter.
     *
     * @param str       owner id
     * @param str2      subscription name
     * @param condition table filter
     * @param duration  subscription TTL
     * @param duration2 event TTL
     */
    @Override
    public void subscribe(String str, String str2, Condition condition, Duration duration, Duration duration2) {
        final boolean includeDefaultJoinFilter = true;
        subscribe(str, str2, condition, duration, duration2, includeDefaultJoinFilter);
    }

    /**
     * Creates or updates a subscription after validating the name, ownership, filter and TTLs.
     *
     * @param str       owner id
     * @param str2      subscription name
     * @param condition table filter; must not be null and must pass {@code TableFilterValidator}
     * @param duration  subscription TTL; must be longer than zero
     * @param duration2 event TTL; must be longer than zero
     * @param z         whether to combine the table filter with the default join filter
     */
    @Override
    public void subscribe(String str, String str2, Condition condition, Duration duration, Duration duration2, boolean z) {
        checkLegalSubscriptionName(str2);
        checkSubscriptionOwner(str, str2);
        Preconditions.checkNotNull(condition, "tableFilter");
        Preconditions.checkArgument(duration.isLongerThan(Duration.ZERO), "SubscriptionTtl must be >0");
        Preconditions.checkArgument(duration2.isLongerThan(Duration.ZERO), "EventTtl must be >0");
        TableFilterValidator.checkAllowed(condition);

        Condition effectiveFilter = condition;
        if (z) {
            Condition defaultJoinFilter = _defaultJoinFilterCondition.get();
            // An always-true default adds nothing, so the caller's filter is stored unchanged in that case.
            if (!Conditions.alwaysTrue().equals(defaultJoinFilter)) {
                if (condition.equals(Conditions.alwaysTrue())) {
                    effectiveFilter = defaultJoinFilter;
                } else {
                    effectiveFilter = Conditions.and(condition, defaultJoinFilter);
                }
            }
        }
        _subscriptionDao.insertSubscription(str, str2, effectiveFilter, duration, duration2);
    }

    /**
     * Removes a subscription and purges the events queued for it.
     *
     * @param str  owner id
     * @param str2 subscription name
     */
    @Override
    public void unsubscribe(String str, String str2) {
        checkLegalSubscriptionName(str2);
        checkSubscriptionOwner(str, str2);

        // Drop the subscription record first, then discard whatever is still queued for it.
        _subscriptionDao.deleteSubscription(str2);
        _eventStore.purge(str2);
    }

    /**
     * Looks up a subscription by name and verifies the given owner may access it.
     *
     * @param str  owner id
     * @param str2 subscription name
     * @return the subscription
     * @throws UnknownSubscriptionException if no subscription with that name exists
     */
    @Override
    public Subscription getSubscription(String str, String str2) throws UnknownSubscriptionException {
        checkLegalSubscriptionName(str2);
        OwnedSubscription found = getSubscriptionByName(str2);
        checkSubscriptionOwner(str, found);
        return found;
    }

    /**
     * Fetches the named subscription from the DAO.
     *
     * @throws UnknownSubscriptionException if the DAO has no subscription with that name
     */
    private OwnedSubscription getSubscriptionByName(String str) {
        OwnedSubscription found = _subscriptionDao.getSubscription(str);
        if (found != null) {
            return found;
        }
        throw new UnknownSubscriptionException(str);
    }

    /**
     * Event-bus callback: serializes every update reference in the event and appends them to the master
     * fanout channel.
     *
     * @param updateIntentEvent the event carrying the update references
     */
    @Subscribe
    public void onUpdateIntent(UpdateIntentEvent updateIntentEvent) {
        // Fully qualified so that this block needs no new file-level import.
        List<java.nio.ByteBuffer> serializedRefs =
                Lists.newArrayListWithCapacity(updateIntentEvent.getUpdateRefs().size());
        for (UpdateRef updateRef : updateIntentEvent.getUpdateRefs()) {
            serializedRefs.add(UpdateRefSerializer.toByteBuffer(updateRef));
        }
        _eventStore.addAll(ChannelNames.getMasterFanoutChannel(), serializedRefs);
    }

    /**
     * Returns the event count estimate for a subscription with no upper limit (the limit is
     * {@code Long.MAX_VALUE}).
     *
     * @param str  owner id
     * @param str2 subscription name
     */
    @Override
    public long getEventCount(String str, String str2) {
        final long unlimited = Long.MAX_VALUE;
        return getEventCountUpTo(str, str2, unlimited);
    }

    /**
     * Returns the event count estimate for a subscription, served from the size cache when the cached
     * estimate was computed with a limit at least as large as the one requested.
     *
     * @param str  owner id
     * @param str2 subscription name
     * @param j    limit up to which events should be counted
     */
    @Override
    public long getEventCountUpTo(String str, String str2, long j) {
        checkSubscriptionOwner(str, str2);

        SizeCacheKey cacheKey = new SizeCacheKey(str2, j);
        // Entry layout: key = estimated count, value = limit the estimate was computed with.
        Map.Entry<Long, Long> estimate = _eventSizeCache.getUnchecked(cacheKey);
        if (estimate.getValue().longValue() < j) {
            // The cached estimate used a smaller limit than requested; drop it and reload.
            _eventSizeCache.invalidate(cacheKey);
            estimate = _eventSizeCache.getUnchecked(cacheKey);
        }
        return estimate.getKey().longValue();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Asks the event store for a size estimate of the given channel, bypassing the cache and the ownership
     * check.
     *
     * @param str channel (subscription) name; must be a legal subscription name
     * @param j   limit up to which events should be counted; must be positive
     */
    public long internalEventCountUpTo(String str, long j) {
        checkLegalSubscriptionName(str);
        Preconditions.checkArgument(j > 0, "Limit must be >0");
        long estimate = _eventStore.getSizeEstimate(str, j);
        return estimate;
    }

    /**
     * Returns the event store's claim count for a subscription.
     *
     * @param str  owner id
     * @param str2 subscription name
     */
    @Override
    public long getClaimCount(String str, String str2) {
        checkLegalSubscriptionName(str2);
        checkSubscriptionOwner(str, str2);
        long claimCount = _eventStore.getClaimCount(str2);
        return claimCount;
    }

    /**
     * Returns up to {@code i} events from a subscription without claiming them.
     *
     * @param str  owner id
     * @param str2 subscription name
     * @param i    maximum number of events; must be positive
     */
    @Override
    public Iterator<Event> peek(String str, String str2, int i) {
        checkLegalSubscriptionName(str2);
        Preconditions.checkArgument(i > 0, "Limit must be >0");
        checkSubscriptionOwner(str, str2);

        // A null claim TTL tells peekOrPoll to peek rather than poll.
        PollResult peeked = peekOrPoll(str2, null, i);
        return peeked.getEventIterator();
    }

    /**
     * Polls up to {@code i} events from a subscription, claiming them for the given TTL.
     *
     * @param str      owner id
     * @param str2     subscription name
     * @param duration claim TTL; must not be negative
     * @param i        maximum number of events; must be positive
     */
    @Override
    public PollResult poll(String str, String str2, Duration duration, int i) {
        checkLegalSubscriptionName(str2);
        Preconditions.checkArgument(duration.getMillis() >= 0, "ClaimTtl must be >=0");
        Preconditions.checkArgument(i > 0, "Limit must be >0");
        checkSubscriptionOwner(str, str2);

        PollResult polled = peekOrPoll(str2, duration, i);
        return polled;
    }

    /**
     * Shared implementation of peek and poll.
     *
     * <p>Raw events are read from the event store and grouped by coordinate. A first portion is resolved
     * synchronously in batches of at most 10 coordinates, bounded by {@code MAX_POLL_TIME}; anything still
     * unresolved is resolved lazily, in batches of at most 25, as the caller consumes the returned iterator.
     * Raw events left over when the iterator finishes are unclaimed.
     *
     * @param str      subscription name
     * @param duration claim TTL, or {@code null} to peek without claiming
     * @param i        maximum number of events to return
     * @return the events, an approximate count, and whether more events appear to be available
     */
    private PollResult peekOrPoll(final String str, @Nullable Duration duration, final int i) {
        int remaining = i;
        Map<Coordinate, EventList> rawEvents = ImmutableMap.of();
        final Map<Coordinate, Item> uniqueItems = Maps.newHashMap();
        final boolean isPeek = duration == null;
        // Only a poll with a positive claim TTL retries and unclaims leftovers.
        final boolean repeatable = !isPeek && duration.getMillis() > 0;
        boolean moreAvailable = false;
        boolean withinPollTime = true;
        boolean anyDiscarded = false;
        final Meter eventMeter = isPeek ? _peekedMeter : _polledMeter;
        Stopwatch stopwatch = Stopwatch.createStarted(_ticker);
        int padding = 0;

        while (remaining != 0) {
            ConsolidatingEventSink sink = new ConsolidatingEventSink(remaining + padding);
            boolean more = isPeek ? _eventStore.peek(str, sink) : _eventStore.poll(str, duration, sink);
            rawEvents = sink.getEvents();
            if (rawEvents.isEmpty()) {
                moreAvailable = more;
                break;
            }

            // Resolve in small batches until enough items are found, the raw events run out, or the time
            // budget is spent.
            do {
                boolean discarded = resolvePeekOrPollEvents(str, rawEvents, Math.min(10, remaining), (coordinate, item) -> {
                    // Merge into an item already found for the same coordinate when it has room for more.
                    Item previousItem = uniqueItems.get(coordinate);
                    if (previousItem != null && previousItem.consolidateWith(item)) {
                        _consolidatedMeter.mark();
                    } else {
                        uniqueItems.put(coordinate, item);
                    }
                });
                remaining = i - uniqueItems.size();
                anyDiscarded = anyDiscarded || discarded;
            } while (!rawEvents.isEmpty() && remaining > 0
                    && stopwatch.elapsed(TimeUnit.MILLISECONDS) < MAX_POLL_TIME.getMillis());

            // More is available if the store said so or if more was fetched than the caller asked for.
            moreAvailable = more || rawEvents.size() + uniqueItems.size() > i;
            if (!more) {
                break;
            }
            // Later rounds over-fetch by 10.
            padding = 10;
            if (!repeatable) {
                break;
            }
            withinPollTime = stopwatch.elapsed(TimeUnit.MILLISECONDS) < MAX_POLL_TIME.getMillis();
            if (!withinPollTime) {
                break;
            }
        }

        Iterator<Event> events;
        int approximateSize;
        if (uniqueItems.isEmpty()) {
            // Nothing resolved: return empty, release any claims still held, and consider a background drain.
            events = Iterators.emptyIterator();
            approximateSize = 0;
            if (repeatable && !rawEvents.isEmpty()) {
                unclaim(str, rawEvents.values());
            }
            if (repeatable && anyDiscarded && !withinPollTime) {
                drainQueueAsync(str);
            }
        } else if (rawEvents.isEmpty()) {
            // Everything was resolved synchronously.
            events = toEvents(uniqueItems.values()).iterator();
            approximateSize = uniqueItems.size();
            eventMeter.mark(approximateSize);
        } else {
            // Some raw events are still unresolved; resolve them lazily as the caller iterates.
            final Map<Coordinate, EventList> pendingEvents = Maps.newLinkedHashMap(rawEvents);
            final int lazyLimit = remaining;
            events = Iterators.concat(toEvents(uniqueItems.values()).iterator(), new AbstractIterator<Event>() {
                private Iterator<Event> currentBatch = Iterators.emptyIterator();
                private int lazyRemaining = lazyLimit;

                @Override
                protected Event computeNext() {
                    Event event = null;
                    if (currentBatch.hasNext()) {
                        event = currentBatch.next();
                    } else if (!pendingEvents.isEmpty() && lazyRemaining > 0) {
                        try {
                            List<Item> items = Lists.newArrayList();
                            do {
                                resolvePeekOrPollEvents(str, pendingEvents, Math.min(lazyRemaining, 25),
                                        (coordinate, item) -> items.add(item));
                                if (!items.isEmpty()) {
                                    lazyRemaining -= items.size();
                                    currentBatch = toEvents(items).iterator();
                                    event = currentBatch.next();
                                }
                            } while (event == null && !pendingEvents.isEmpty() && lazyRemaining > 0);
                        } catch (Exception e) {
                            // Best effort: events already handed out stay valid, so end the iteration quietly.
                            _log.warn("Failed to load additional events during peek/poll for subscription {}", str, e);
                        }
                    }
                    if (event != null) {
                        return event;
                    }

                    // End of iteration: release whatever was fetched but never resolved.
                    if (!pendingEvents.isEmpty()) {
                        try {
                            unclaim(str, pendingEvents.values());
                        } catch (Exception e) {
                            _log.warn("Failed to unclaim {} events from subscription {}", pendingEvents.size(), str, e);
                        }
                    }
                    eventMeter.mark(i - lazyRemaining);
                    return endOfData();
                }
            });
            approximateSize = uniqueItems.size() + pendingEvents.size();
        }
        return new PollResult(events, approximateSize, moreAvailable);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Resolves up to {@code i} coordinates from the head of {@code map} into items and hands them to the sink.
     *
     * <p>For each event of a resolved coordinate: a still-pending change is retried shortly if recent and
     * otherwise only counted as stale; a redundant change is deleted; anything else becomes (part of) an item.
     * Events for unknown tables are deleted. Every coordinate visited is removed from {@code map}.
     *
     * @param str              subscription name
     * @param map              raw events by coordinate; processed entries are removed from this map
     * @param i                maximum number of coordinates to look up in this call
     * @param resolvedItemSink receiver of the resolved items
     * @return {@code true} if any events were deleted from the subscription during this call
     */
    public boolean resolvePeekOrPollEvents(String str, Map<Coordinate, EventList> map, int i, ResolvedItemSink resolvedItemSink) {
        // Remembers the order coordinates were visited in; it becomes each item's sort index.
        Map<Coordinate, Integer> eventOrder = Maps.newHashMap();
        List<String> eventIdsToDiscard = Lists.newArrayList();
        List<String> recentUnknownEventIds = Lists.newArrayList();
        int remaining = i;
        DataProvider.AnnotatedGet annotatedGet = _dataProvider.prepareGetAnnotated(ReadConsistency.STRONG);

        Iterator<Map.Entry<Coordinate, EventList>> rawEntries = map.entrySet().iterator();
        while (rawEntries.hasNext() && remaining != 0) {
            Map.Entry<Coordinate, EventList> entry = rawEntries.next();
            Coordinate coordinate = entry.getKey();
            try {
                annotatedGet.add(coordinate.getTable(), coordinate.getId());
                remaining--;
            } catch (UnknownTableException e) {
                // The table cannot be resolved, so none of its events can be delivered; discard them all.
                EventList unresolvable = entry.getValue();
                for (Pair<String, UUID> eventAndChangeId : unresolvable.getEventAndChangeIds()) {
                    eventIdsToDiscard.add(eventAndChangeId.first());
                }
                _discardedMeter.mark(unresolvable.size());
            }
            eventOrder.put(coordinate, eventOrder.size());
        }

        Iterator<DataProvider.AnnotatedContent> results = annotatedGet.execute();
        while (results.hasNext()) {
            DataProvider.AnnotatedContent result = results.next();
            Map<String, Object> content = result.getContent();
            Coordinate coordinate = Coordinate.fromJson(content);
            EventList eventList = map.get(coordinate);
            List<List<String>> tags = eventList.getTags();
            int sortIndex = eventOrder.get(coordinate);

            Item pendingItem = null;
            for (Pair<String, UUID> eventAndChangeId : eventList.getEventAndChangeIds()) {
                String eventId = eventAndChangeId.first();
                UUID changeId = eventAndChangeId.second();
                if (result.isChangeDeltaPending(changeId)) {
                    if (isRecent(changeId)) {
                        recentUnknownEventIds.add(eventId);
                        _recentUnknownMeter.mark();
                    } else {
                        _staleUnknownMeter.mark();
                    }
                } else if (result.isChangeDeltaRedundant(changeId)) {
                    eventIdsToDiscard.add(eventId);
                    _redundantMeter.mark();
                } else {
                    Item item = new Item(eventId, sortIndex, content, tags);
                    if (pendingItem == null) {
                        pendingItem = item;
                    } else if (pendingItem.consolidateWith(item)) {
                        _consolidatedMeter.mark();
                    } else {
                        // The pending item is full; emit it and start a fresh one.
                        resolvedItemSink.accept(coordinate, pendingItem);
                        pendingItem = item;
                    }
                }
            }
            if (pendingItem != null) {
                resolvedItemSink.accept(coordinate, pendingItem);
            }
        }

        if (!recentUnknownEventIds.isEmpty()) {
            _eventStore.renew(str, recentUnknownEventIds, RECENT_UNKNOWN_RETRY, false);
        }
        boolean anyDiscarded = false;
        if (!eventIdsToDiscard.isEmpty()) {
            _eventStore.delete(str, eventIdsToDiscard, true);
            anyDiscarded = true;
        }

        // Everything visited in this call is finished with, whatever its outcome.
        for (Coordinate processed : eventOrder.keySet()) {
            map.remove(processed);
        }
        return anyDiscarded;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Converts resolved items into API events.
     *
     * <p>An empty input yields an empty immutable list; otherwise the items are sorted by their natural
     * ordering and each is converted through {@code toEvent()}.
     *
     * @param collection items to convert
     * @return the events in sorted item order
     */
    public List<Event> toEvents(Collection<Item> collection) {
        if (collection.isEmpty()) {
            return ImmutableList.of();
        }
        return collection.stream()
                .sorted()
                .map(Item::toEvent)
                .collect(Collectors.toList());
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Renews every event contained in the given event lists with a zero-length claim TTL.
     *
     * @param str        channel/subscription name passed to the event store
     * @param collection event lists whose event ids should be renewed with {@code Duration.ZERO}
     */
    public void unclaim(String str, Collection<EventList> collection) {
        List<String> eventIds = Lists.newArrayList();
        for (EventList eventList : collection) {
            for (Pair<String, UUID> idPair : eventList.getEventAndChangeIds()) {
                eventIds.add(idPair.first());
            }
        }
        _eventStore.renew(str, eventIds, Duration.ZERO, false);
    }

    /**
     * Tells whether the time encoded in a time UUID is less than {@code STALE_UNKNOWN_AGE} before the
     * current clock reading.
     *
     * @param uuid time UUID to inspect
     * @return {@code true} if the UUID's age in milliseconds is strictly below the stale threshold
     */
    private boolean isRecent(UUID uuid) {
        long ageMillis = _clock.millis() - TimeUUIDs.getTimeMillis(uuid);
        return ageMillis < STALE_UNKNOWN_AGE.getMillis();
    }

    /**
     * Renews the claims on the given event keys for a subscription.
     *
     * <p>Validates the subscription name, the key collection and the TTL, checks that the owner may
     * access the subscription, renews the decoded keys in the event store and finally marks the renewed
     * meter with the number of keys supplied.
     *
     * @param str        owner id
     * @param str2       subscription name
     * @param collection encoded event keys; must not be null
     * @param duration   new claim TTL; must not be negative
     */
    @Override // com.bazaarvoice.emodb.databus.core.OwnerAwareDatabus
    public void renew(String str, String str2, Collection<String> collection, Duration duration) {
        checkLegalSubscriptionName(str2);
        Preconditions.checkNotNull(collection, "eventKeys");
        final boolean ttlIsNonNegative = duration.getMillis() >= 0;
        Preconditions.checkArgument(ttlIsNonNegative, "ClaimTtl must be >=0");
        checkSubscriptionOwner(str, str2);

        _eventStore.renew(str2, EventKeyFormat.decodeAll(collection), duration, true);
        _renewedMeter.mark(collection.size());
    }

    /**
     * Acknowledges the given event keys for a subscription.
     *
     * <p>After validating the subscription name, the key collection and the owner's access, the decoded
     * keys are deleted from the event store and the acked meter is marked with the number of keys supplied.
     *
     * @param str        owner id
     * @param str2       subscription name
     * @param collection encoded event keys; must not be null
     */
    @Override // com.bazaarvoice.emodb.databus.core.OwnerAwareDatabus
    public void acknowledge(String str, String str2, Collection<String> collection) {
        checkLegalSubscriptionName(str2);
        Preconditions.checkNotNull(collection, "eventKeys");
        checkSubscriptionOwner(str, str2);

        _eventStore.delete(str2, EventKeyFormat.decodeAll(collection), true);
        _ackedMeter.mark(collection.size());
    }

    /**
     * Starts an asynchronous replay for a subscription without a "since" date.
     *
     * @param str  owner id
     * @param str2 subscription name
     * @return the reference returned by {@link #replayAsyncSince(String, String, Date)}
     */
    @Override // com.bazaarvoice.emodb.databus.core.OwnerAwareDatabus
    public String replayAsync(String str, String str2) {
        final Date noSinceDate = null;
        return replayAsyncSince(str, str2, noSinceDate);
    }

    /**
     * Submits a replay job for a subscription and returns the job's reference.
     *
     * <p>The subscription name and the owner's access are validated before the job is submitted.
     *
     * @param str  owner id
     * @param str2 subscription name
     * @param date "since" date forwarded to the replay request; may be null
     * @return string form of the identifier returned by the job service
     */
    @Override // com.bazaarvoice.emodb.databus.core.OwnerAwareDatabus
    public String replayAsyncSince(String str, String str2, Date date) {
        checkLegalSubscriptionName(str2);
        checkSubscriptionOwner(str, str2);

        JobRequest jobRequest =
                new JobRequest(ReplaySubscriptionJob.INSTANCE, new ReplaySubscriptionRequest(str, str2, date));
        return _jobService.submitJob(jobRequest).toString();
    }

    /**
     * Copies events from the master replay channel into a subscription, keeping only events that the
     * subscription evaluator reports as matching.
     *
     * @param str  subscription name (also the destination channel of the copy)
     * @param date optional "since" date forwarded to the event store copy; when non-null it must still lie
     *             within {@code DatabusChannelConfiguration.REPLAY_TTL} of now
     * @throws IllegalStateException if {@code date} is non-null and outside the replay TTL
     */
    public void replay(String str, Date date) {
        boolean sinceWithinTtl = date == null
                || new DateTime(date).plus(DatabusChannelConfiguration.REPLAY_TTL).isAfterNow();
        Preconditions.checkState(sinceWithinTtl, "Since timestamp is outside the replay TTL.");

        String replayChannel = ChannelNames.getMasterReplayChannel();
        OwnedSubscription subscription = getSubscriptionByName(str);
        _eventStore.copy(
                replayChannel,
                str,
                eventData -> _subscriptionEvaluator.matches(subscription, eventData),
                date);
    }

    /**
     * Looks up the status of a replay job previously started for a subscription.
     *
     * <p>Job status {@code FINISHED} maps to {@code COMPLETE}, {@code FAILED} maps to {@code ERROR} and
     * every other status maps to {@code IN_PROGRESS}.
     *
     * @param str  owner id; must be allowed to access the subscription named in the replay request
     * @param str2 replay reference; must not be null
     * @return the replay status for the subscription recorded in the job's request
     * @throws UnknownReplayException if the reference cannot be parsed or no job status exists for it
     * @throws IllegalStateException  if the job status carries no request details
     */
    @Override // com.bazaarvoice.emodb.databus.core.OwnerAwareDatabus
    public ReplaySubscriptionStatus getReplayStatus(String str, String str2) {
        Preconditions.checkNotNull(str2, "reference");

        // Only a failure to parse the reference means "unknown replay". The try is kept this narrow so an
        // IllegalArgumentException raised by the job lookup or the ownership check is not misreported.
        JobIdentifier jobId;
        try {
            jobId = JobIdentifier.fromString(str2, ReplaySubscriptionJob.INSTANCE);
        } catch (IllegalArgumentException e) {
            throw new UnknownReplayException(str2);
        }

        JobStatus jobStatus = this._jobService.getJobStatus(jobId);
        if (jobStatus == null) {
            throw new UnknownReplayException(str2);
        }
        ReplaySubscriptionRequest request = (ReplaySubscriptionRequest) jobStatus.getRequest();
        if (request == null) {
            throw new IllegalStateException("Replay request details not found: " + jobId);
        }
        checkSubscriptionOwner(str, request.getSubscription());

        ReplaySubscriptionStatus.Status status;
        switch (jobStatus.getStatus()) {
            case FINISHED:
                status = ReplaySubscriptionStatus.Status.COMPLETE;
                break;
            case FAILED:
                status = ReplaySubscriptionStatus.Status.ERROR;
                break;
            default:
                status = ReplaySubscriptionStatus.Status.IN_PROGRESS;
                break;
        }
        return new ReplaySubscriptionStatus(request.getSubscription(), status);
    }

    /**
     * Submits a job that moves events from one subscription to another and returns the job's reference.
     *
     * <p>Both subscription names are validated and the owner's access to both is checked first.
     *
     * @param str  owner id
     * @param str2 source subscription name
     * @param str3 destination subscription name
     * @return string form of the identifier returned by the job service
     */
    @Override // com.bazaarvoice.emodb.databus.core.OwnerAwareDatabus
    public String moveAsync(String str, String str2, String str3) {
        checkLegalSubscriptionName(str2);
        checkLegalSubscriptionName(str3);
        checkSubscriptionOwner(str, str2);
        checkSubscriptionOwner(str, str3);

        JobRequest jobRequest =
                new JobRequest(MoveSubscriptionJob.INSTANCE, new MoveSubscriptionRequest(str, str2, str3));
        return _jobService.submitJob(jobRequest).toString();
    }

    /**
     * Looks up the status of a move job previously started between two subscriptions.
     *
     * <p>Job status {@code FINISHED} maps to {@code COMPLETE}, {@code FAILED} maps to {@code ERROR} and
     * every other status maps to {@code IN_PROGRESS}.
     *
     * @param str  owner id; must be allowed to access the move request's source subscription
     * @param str2 move reference; must not be null
     * @return the move status for the source/destination recorded in the job's request
     * @throws UnknownMoveException  if the reference cannot be parsed or no job status exists for it
     * @throws IllegalStateException if the job status carries no request details
     */
    @Override // com.bazaarvoice.emodb.databus.core.OwnerAwareDatabus
    public MoveSubscriptionStatus getMoveStatus(String str, String str2) {
        Preconditions.checkNotNull(str2, "reference");

        // Only a failure to parse the reference means "unknown move". The try is kept this narrow so an
        // IllegalArgumentException raised by the job lookup or the ownership check is not misreported.
        JobIdentifier jobId;
        try {
            jobId = JobIdentifier.fromString(str2, MoveSubscriptionJob.INSTANCE);
        } catch (IllegalArgumentException e) {
            throw new UnknownMoveException(str2);
        }

        JobStatus jobStatus = this._jobService.getJobStatus(jobId);
        if (jobStatus == null) {
            throw new UnknownMoveException(str2);
        }
        MoveSubscriptionRequest request = (MoveSubscriptionRequest) jobStatus.getRequest();
        if (request == null) {
            throw new IllegalStateException("Move request details not found: " + jobId);
        }
        checkSubscriptionOwner(str, request.getFrom());

        MoveSubscriptionStatus.Status status;
        switch (jobStatus.getStatus()) {
            case FINISHED:
                status = MoveSubscriptionStatus.Status.COMPLETE;
                break;
            case FAILED:
                status = MoveSubscriptionStatus.Status.ERROR;
                break;
            default:
                status = MoveSubscriptionStatus.Status.IN_PROGRESS;
                break;
        }
        return new MoveSubscriptionStatus(request.getFrom(), request.getTo(), status);
    }

    /**
     * Adds a synthetic event for a table/key pair to a subscription's channel.
     *
     * <p>The event is an {@code UpdateRef} whose change id is {@code TimeUUIDs.minimumUuid()} and whose tag
     * set is empty. The owner's access to the subscription is checked first.
     *
     * @param str  owner id
     * @param str2 subscription name
     * @param str3 first {@code UpdateRef} argument (presumably the table name - confirm against UpdateRef)
     * @param str4 second {@code UpdateRef} argument (presumably the record key - confirm against UpdateRef)
     */
    @Override // com.bazaarvoice.emodb.databus.core.OwnerAwareDatabus
    public void injectEvent(String str, String str2, String str3, String str4) {
        checkSubscriptionOwner(str, str2);
        UpdateRef updateRef = new UpdateRef(str3, str4, TimeUUIDs.minimumUuid(), ImmutableSet.of());
        _eventStore.add(str2, UpdateRefSerializer.toByteBuffer(updateRef));
    }

    /**
     * Asks the event store to unclaim all events of a subscription.
     *
     * <p>The subscription name is validated and the owner's access is checked before delegating.
     *
     * @param str  owner id
     * @param str2 subscription name
     */
    @Override // com.bazaarvoice.emodb.databus.core.OwnerAwareDatabus
    public void unclaimAll(String str, String str2) {
        checkLegalSubscriptionName(str2);
        checkSubscriptionOwner(str, str2);

        _eventStore.unclaimAll(str2);
    }

    /**
     * Asks the event store to purge a subscription's channel.
     *
     * <p>The subscription name is validated and the owner's access is checked before delegating.
     *
     * @param str  owner id
     * @param str2 subscription name
     */
    @Override // com.bazaarvoice.emodb.databus.core.OwnerAwareDatabus
    public void purge(String str, String str2) {
        checkLegalSubscriptionName(str2);
        checkSubscriptionOwner(str, str2);

        _eventStore.purge(str2);
    }

    /**
     * Rejects subscription names that {@code Names.isLegalSubscriptionName} does not accept.
     *
     * @param str subscription name to validate
     * @throws IllegalArgumentException if the name is not legal
     */
    private void checkLegalSubscriptionName(String str) {
        boolean legalName = Names.isLegalSubscriptionName(str);
        Preconditions.checkArgument(
                legalName,
                "Subscription name must be a lowercase ASCII string between 1 and 255 characters in length. Allowed punctuation characters are -.:@_ and the subscription name may not start with a single underscore character. An example of a valid subscription name would be 'polloi:review'.");
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Loads the subscription with the given name from the subscription DAO and verifies that the owner
     * may access it.
     *
     * @param str  owner id
     * @param str2 subscription name to look up
     */
    public void checkSubscriptionOwner(String str, String str2) {
        OwnedSubscription subscription = _subscriptionDao.getSubscription(str2);
        checkSubscriptionOwner(str, subscription);
    }

    /**
     * Verifies that an owner may access a subscription.
     *
     * <p>A null subscription passes without further checks. A subscription without an owner id also
     * passes, but marks the unowned-subscription meter. Otherwise the databus authorizer decides.
     *
     * @param str               owner id; must not be null
     * @param ownedSubscription subscription to check; may be null
     * @throws UnauthorizedSubscriptionException if the authorizer denies access
     */
    private void checkSubscriptionOwner(String str, OwnedSubscription ownedSubscription) {
        Preconditions.checkNotNull(str, "ownerId");
        if (ownedSubscription == null) {
            return;
        }
        if (ownedSubscription.getOwnerId() == null) {
            _unownedSubscriptionMeter.mark();
            return;
        }
        boolean accessAllowed = _databusAuthorizer.owner(str).canAccessSubscription(ownedSubscription);
        if (!accessAllowed) {
            throw new UnauthorizedSubscriptionException("Not subscriber", ownedSubscription.getName());
        }
    }

    /**
     * Starts background draining for a subscription unless one is already registered.
     *
     * <p>Registration is done by inserting the subscription into the drained-subscriptions map with an
     * initial value of zero; if an entry already exists nothing is submitted. Any exception is logged and
     * not propagated.
     *
     * @param str subscription name
     */
    @VisibleForTesting
    protected void drainQueueAsync(String str) {
        try {
            boolean alreadyDraining = _drainedSubscriptionsMap.putIfAbsent(str, 0L) != null;
            if (alreadyDraining) {
                _log.info("Draining for subscription: {} was already started from a previous poll.", str);
                return;
            }
            _log.info("Starting the draining process for subscription: {}.", str);
            _drainQueueAsyncMeter.mark();
            submitDrainServiceTask(str, 100);
        } catch (Exception e) {
            _log.error("Encountered exception while draining the queue for subscription: {}.", str, e);
        }
    }

    /**
     * Marks the drain-task meter and submits one drain pass for a subscription to the drain service.
     *
     * @param str subscription name
     * @param i   batch size handed to {@code doDrainQueue}
     */
    private void submitDrainServiceTask(final String str, final int i) {
        _drainQueueTaskMeter.mark();
        Runnable drainTask = () -> doDrainQueue(str, i);
        _drainService.submit(drainTask);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Runs one drain pass over a subscription: peeks a batch of events, deletes the ones that are
     * redundant or whose table is unknown, and decides whether to schedule another pass.
     *
     * <p>Another pass is submitted only when every event in this batch was deleted, the peek reported
     * {@code true}, and the accumulated drain time stored for the subscription is still below
     * {@code MAX_QUEUE_DRAIN_TIME_FOR_A_SUBSCRIPTION}. In every other case, including an empty batch, the
     * subscription's entry is removed from the drained-subscriptions map.
     *
     * @param str subscription name
     * @param i   batch size handed to the consolidating event sink
     */
    public void doDrainQueue(String str, int i) {
        Stopwatch stopwatch = Stopwatch.createStarted(_ticker);
        ConsolidatingEventSink sink = new ConsolidatingEventSink(i);
        boolean peekResult = _eventStore.peek(str, sink);
        Map<Coordinate, EventList> rawEvents = sink.getEvents();
        if (rawEvents.isEmpty()) {
            _drainedSubscriptionsMap.remove(str);
            return;
        }

        List<String> eventIdsToDelete = Lists.newArrayList();
        DataProvider.AnnotatedGet annotatedGet = _dataProvider.prepareGetAnnotated(ReadConsistency.STRONG);
        for (Map.Entry<Coordinate, EventList> rawEvent : rawEvents.entrySet()) {
            Coordinate coord = rawEvent.getKey();
            try {
                annotatedGet.add(coord.getTable(), coord.getId());
            } catch (UnknownTableException e) {
                // Table cannot be resolved: queue all of this coordinate's events for deletion.
                for (Pair<String, UUID> idPair : rawEvent.getValue().getEventAndChangeIds()) {
                    eventIdsToDelete.add(idPair.first());
                }
            }
        }

        // Set as soon as one event is found that is not redundant; such a batch ends the draining.
        boolean anyNonRedundant = false;
        Iterator<DataProvider.AnnotatedContent> results = annotatedGet.execute();
        while (results.hasNext()) {
            DataProvider.AnnotatedContent annotated = results.next();
            EventList eventList = rawEvents.get(Coordinate.fromJson(annotated.getContent()));
            for (Pair<String, UUID> idPair : eventList.getEventAndChangeIds()) {
                if (annotated.isChangeDeltaRedundant(idPair.second())) {
                    eventIdsToDelete.add(idPair.first());
                } else {
                    anyNonRedundant = true;
                }
            }
        }

        if (!eventIdsToDelete.isEmpty()) {
            _drainQueueRedundantMeter.mark(eventIdsToDelete.size());
            _eventStore.delete(str, eventIdsToDelete, true);
        }

        long totalDrainMillis =
                _drainedSubscriptionsMap.get(str).longValue() + stopwatch.elapsed(TimeUnit.MILLISECONDS);
        boolean continueDraining = !anyNonRedundant
                && peekResult
                && totalDrainMillis < MAX_QUEUE_DRAIN_TIME_FOR_A_SUBSCRIPTION.getMillis();
        if (continueDraining) {
            _drainedSubscriptionsMap.replace(str, Long.valueOf(totalDrainMillis));
            submitDrainServiceTask(str, i);
        } else {
            _drainedSubscriptionsMap.remove(str);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Convenience overload that first turns the tag set into a sorted list and then merges it into the
     * existing tag lists.
     *
     * @param list existing tag lists, or null
     * @param set  tags to add as one entry
     * @return the result of the list-based {@code sortedTagUnion}
     */
    public static List<List<String>> sortedTagUnion(@Nullable List<List<String>> list, Set<String> set) {
        List<String> sortedTags = asSortedList(set);
        return sortedTagUnion(list, sortedTags);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Inserts a tag list into a list of tag lists kept sorted by {@code TAG_LIST_COMPARATOR}, unless an
     * equal entry is already present.
     *
     * <p>With a null {@code list}: an empty {@code list2} yields the shared {@code EMPTY_TAGS} constant,
     * anything else yields a new list containing just {@code list2}. With a non-null {@code list}: if a
     * binary search finds {@code list2} the list is returned untouched; otherwise {@code list2} is
     * inserted at its sorted position. The insertion mutates {@code list} in place, except when
     * {@code list} is the {@code EMPTY_TAGS} instance, which is first copied into a fresh list.
     *
     * @param list  existing sorted tag lists, or null
     * @param list2 tag list to merge in
     * @return the list that now contains {@code list2}
     */
    public static List<List<String>> sortedTagUnion(@Nullable List<List<String>> list, List<String> list2) {
        if (list == null) {
            if (list2.isEmpty()) {
                return EMPTY_TAGS;
            }
            List<List<String>> singleEntry = Lists.newArrayListWithCapacity(3);
            singleEntry.add(list2);
            return singleEntry;
        }

        int searchResult = Collections.binarySearch(list, list2, TAG_LIST_COMPARATOR);
        if (searchResult >= 0) {
            return list;
        }

        List<List<String>> target = list;
        if (list == EMPTY_TAGS) {
            // Never modify the shared constant; work on a private copy instead.
            target = Lists.newArrayListWithCapacity(3);
            target.addAll(EMPTY_TAGS);
        }
        // A negative binarySearch result encodes the insertion point as -(insertionPoint) - 1.
        int insertionPoint = -searchResult - 1;
        target.add(insertionPoint, list2);
        return target;
    }

    /**
     * Returns the set's elements as an immutable list in natural order.
     *
     * <p>Sets of size zero and one are handled without sorting.
     *
     * @param set tags to convert
     * @return immutable list of the set's elements, naturally ordered
     */
    private static List<String> asSortedList(Set<String> set) {
        int tagCount = set.size();
        if (tagCount == 0) {
            return ImmutableList.of();
        }
        if (tagCount == 1) {
            return ImmutableList.of(set.iterator().next());
        }
        return Ordering.natural().immutableSortedCopy(set);
    }
}
