package com.bazaarvoice.emodb.databus.core;

import com.bazaarvoice.emodb.common.dropwizard.lifecycle.ServiceFailureListener;
import com.bazaarvoice.emodb.common.dropwizard.metrics.MetricsGroup;
import com.bazaarvoice.emodb.databus.ChannelNames;
import com.bazaarvoice.emodb.databus.api.Databus;
import com.bazaarvoice.emodb.databus.api.Event;
import com.bazaarvoice.emodb.databus.api.PollResult;
import com.bazaarvoice.emodb.sor.condition.Condition;
import com.bazaarvoice.emodb.table.db.ClusterInfo;
import com.bazaarvoice.emodb.web.auth.Permissions;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.AbstractScheduledService;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/bazaarvoice/emodb/databus/core/Canary.class */
public class Canary extends AbstractScheduledService {
    private static final Logger _log = LoggerFactory.getLogger(Canary.class);
    private static final Duration POLL_INTERVAL = Duration.standardSeconds(1);
    private static final Duration CLAIM_TTL = Duration.standardSeconds(30);
    private static final int EVENTS_LIMIT = 50;
    private final RateLimitedLog _rateLimitedLog;
    private final Databus _databus;
    private final MetricsGroup _timers;
    private final String _timerName;
    private final String _subscriptionName;
    private final Condition _subscriptionCondition;
    private final ScheduledExecutorService _executor;

    public Canary(ClusterInfo clusterInfo, Condition condition, Databus databus, RateLimitedLogFactory rateLimitedLogFactory, MetricRegistry metricRegistry) {
        this(clusterInfo, condition, databus, rateLimitedLogFactory, metricRegistry, null);
    }

    @VisibleForTesting
    Canary(ClusterInfo clusterInfo, Condition condition, Databus databus, RateLimitedLogFactory rateLimitedLogFactory, MetricRegistry metricRegistry, @Nullable ScheduledExecutorService scheduledExecutorService) {
        this._databus = (Databus) Preconditions.checkNotNull(databus, Permissions.DATABUS);
        this._timers = new MetricsGroup(metricRegistry);
        Preconditions.checkNotNull(clusterInfo, "cluster");
        this._subscriptionName = ChannelNames.getMasterCanarySubscription(clusterInfo.getCluster());
        this._subscriptionCondition = (Condition) Preconditions.checkNotNull(condition, "subscriptionCondition");
        this._timerName = newTimerName("readEventsByCanaryPoll-" + clusterInfo.getClusterMetric());
        this._rateLimitedLog = rateLimitedLogFactory.from(_log);
        this._executor = scheduledExecutorService;
        createCanarySubscription();
        ServiceFailureListener.listenTo(this, metricRegistry);
    }

    private String newTimerName(String str) {
        return MetricRegistry.name("bv.emodb.databus", "DatabusCanary", str, "readEvents");
    }

    private void createCanarySubscription() {
        this._databus.subscribe(this._subscriptionName, this._subscriptionCondition, Duration.standardDays(3650L), DatabusChannelConfiguration.CANARY_TTL, false);
    }

    @Override // com.google.common.util.concurrent.AbstractScheduledService
    protected AbstractScheduledService.Scheduler scheduler() {
        return AbstractScheduledService.Scheduler.newFixedDelaySchedule(0L, POLL_INTERVAL.getMillis(), TimeUnit.MILLISECONDS);
    }

    @Override // com.google.common.util.concurrent.AbstractScheduledService
    protected void shutDown() throws Exception {
        this._timers.close();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.google.common.util.concurrent.AbstractScheduledService
    public ScheduledExecutorService executor() {
        return this._executor != null ? this._executor : super.executor();
    }

    @Override // com.google.common.util.concurrent.AbstractScheduledService
    protected void runOneIteration() {
        while (isRunning() && pollAndAckEvents()) {
            try {
            } catch (Throwable th) {
                this._rateLimitedLog.error(th, "Unexpected canary exception: {}", th);
                stop();
                return;
            }
        }
    }

    private boolean pollAndAckEvents() {
        long nanoTime = System.nanoTime();
        ArrayList newArrayList = Lists.newArrayList();
        PollResult poll = this._databus.poll(this._subscriptionName, CLAIM_TTL, 50);
        Iterator<Event> eventIterator = poll.getEventIterator();
        while (eventIterator.hasNext()) {
            newArrayList.add(eventIterator.next().getEventKey());
        }
        trackAverageEventDuration(System.nanoTime() - nanoTime, newArrayList.size());
        if (!isRunning()) {
            return false;
        }
        if (!newArrayList.isEmpty()) {
            this._databus.acknowledge(this._subscriptionName, newArrayList);
        }
        return poll.hasMoreEvents();
    }

    private void trackAverageEventDuration(long j, int i) {
        if (i == 0) {
            return;
        }
        long j2 = ((j + i) - 1) / i;
        this._timers.beginUpdates();
        Timer timer = this._timers.timer(this._timerName, TimeUnit.MILLISECONDS, TimeUnit.MILLISECONDS);
        for (int i2 = 0; i2 < i; i2++) {
            timer.update(j2, TimeUnit.NANOSECONDS);
        }
        this._timers.endUpdates();
    }
}
