package com.bazaarvoice.emodb.databus.core;

import com.bazaarvoice.emodb.common.dropwizard.lifecycle.ServiceFailureListener;
import com.bazaarvoice.emodb.common.dropwizard.metrics.MetricsGroup;
import com.bazaarvoice.emodb.databus.ChannelNames;
import com.bazaarvoice.emodb.datacenter.api.DataCenter;
import com.bazaarvoice.emodb.datacenter.api.DataCenters;
import com.bazaarvoice.emodb.table.db.ClusterInfo;
import com.codahale.metrics.MetricRegistry;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AbstractScheduledService;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import org.apache.cassandra.repair.messages.RepairOption;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/bazaarvoice/emodb/databus/core/SystemQueueMonitor.class */
public class SystemQueueMonitor extends AbstractScheduledService {
    private static final Logger _log = LoggerFactory.getLogger(SystemQueueMonitor.class);
    private static final Duration POLL_INTERVAL = Duration.standardMinutes(1);
    private final DatabusEventStore _eventStore;
    private final DataCenters _dataCenters;
    private final Collection<ClusterInfo> _clusterInfo;
    private final MetricsGroup _gauges;

    public SystemQueueMonitor(DatabusEventStore databusEventStore, DataCenters dataCenters, Collection<ClusterInfo> collection, MetricRegistry metricRegistry) {
        this._eventStore = (DatabusEventStore) Preconditions.checkNotNull(databusEventStore, "eventStore");
        this._dataCenters = (DataCenters) Preconditions.checkNotNull(dataCenters, RepairOption.DATACENTERS_KEY);
        this._clusterInfo = (Collection) Preconditions.checkNotNull(collection, "clusterInfo");
        this._gauges = new MetricsGroup(metricRegistry);
        ServiceFailureListener.listenTo(this, metricRegistry);
    }

    @Override // com.google.common.util.concurrent.AbstractScheduledService
    protected AbstractScheduledService.Scheduler scheduler() {
        return AbstractScheduledService.Scheduler.newFixedDelaySchedule(0L, POLL_INTERVAL.getMillis(), TimeUnit.MILLISECONDS);
    }

    @Override // com.google.common.util.concurrent.AbstractScheduledService
    protected void shutDown() {
        this._gauges.close();
    }

    @Override // com.google.common.util.concurrent.AbstractScheduledService
    protected void runOneIteration() {
        try {
            pollQueueSizes();
        } catch (Throwable th) {
            _log.error("Unexpected exception.", th);
        }
    }

    private void pollQueueSizes() {
        this._gauges.beginUpdates();
        pollQueueSize("master", ChannelNames.getMasterFanoutChannel());
        for (ClusterInfo clusterInfo : this._clusterInfo) {
            pollQueueSize("canary-" + clusterInfo.getClusterMetric(), ChannelNames.getMasterCanarySubscription(clusterInfo.getCluster()));
        }
        DataCenter self = this._dataCenters.getSelf();
        for (DataCenter dataCenter : this._dataCenters.getAll()) {
            if (!dataCenter.equals(self)) {
                pollQueueSize("out-" + dataCenter.getName(), ChannelNames.getReplicationFanoutChannel(dataCenter));
            }
        }
        this._gauges.endUpdates();
    }

    private void pollQueueSize(String str, String str2) {
        try {
            long sizeEstimate = this._eventStore.getSizeEstimate(str2, 500L);
            _log.debug("System queue size {}: {} (channel={})", str, Long.valueOf(sizeEstimate), str2);
            this._gauges.gauge(newMetric(str)).set(Long.valueOf(sizeEstimate));
        } catch (Exception e) {
            _log.error("Unexpected exception polling channel size: {}", str2, e);
        }
    }

    private String newMetric(String str) {
        return MetricRegistry.name("bv.emodb.databus", "SystemQueue", str);
    }
}
