package com.bazaarvoice.emodb.databus.core;

import com.bazaarvoice.curator.recipes.leader.LeaderService;
import com.bazaarvoice.emodb.common.dropwizard.guice.SelfHostAndPort;
import com.bazaarvoice.emodb.common.dropwizard.leader.LeaderServiceTask;
import com.bazaarvoice.emodb.common.dropwizard.lifecycle.ServiceFailureListener;
import com.bazaarvoice.emodb.databus.ChannelNames;
import com.bazaarvoice.emodb.databus.DatabusZooKeeper;
import com.bazaarvoice.emodb.databus.db.SubscriptionDAO;
import com.bazaarvoice.emodb.databus.repl.ReplicationEventSource;
import com.bazaarvoice.emodb.databus.repl.ReplicationSource;
import com.bazaarvoice.emodb.datacenter.api.DataCenter;
import com.bazaarvoice.emodb.datacenter.api.DataCenters;
import com.bazaarvoice.emodb.event.api.EventStore;
import com.codahale.metrics.MetricRegistry;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.Multimap;
import com.google.common.net.HostAndPort;
import com.google.common.util.concurrent.Service;
import com.google.inject.Inject;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.cassandra.repair.messages.RepairOption;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.utils.ZKPaths;
import org.joda.time.Duration;

/* loaded from: input_file:com/bazaarvoice/emodb/databus/core/DefaultFanoutManager.class */
/**
 * {@link FanoutManager} that wraps each fanout in a ZooKeeper-backed {@link LeaderService}.
 *
 * <p>Every service produced here is created through {@link #create}, which builds a
 * {@link LeaderService} under {@code /leader/fanout/<name>}, attaches a
 * {@link ServiceFailureListener}, and registers the service with the Dropwizard
 * {@link LeaderServiceTask}. The underlying {@link DefaultFanout} is only instantiated
 * when the leader service asks its supplier for a delegate.
 */
public class DefaultFanoutManager implements FanoutManager {
    /** Idle sleep handed to the fanout that reads the local master channel. */
    private static final Duration SAME_DC_SLEEP_WHEN_IDLE = Duration.millis(100);
    /** Idle sleep handed to fanouts that read from a replication source. */
    private static final Duration REMOTE_DC_SLEEP_WHEN_IDLE = Duration.standardSeconds(1);

    private final EventStore _eventStore;
    private final SubscriptionDAO _subscriptionDao;
    private final DataCenters _dataCenters;
    private final CuratorFramework _curator;
    private final String _selfId;
    private final LeaderServiceTask _dropwizardTask;
    private final RateLimitedLogFactory _logFactory;
    private final SubscriptionEvaluator _subscriptionEvaluator;
    private final MetricRegistry _metricRegistry;

    /**
     * Creates the manager. Every argument is required.
     *
     * @param eventStore            store that fanned-out events are written to, and source of the master channel
     * @param subscriptionDao       provides the current set of subscriptions
     * @param subscriptionEvaluator passed through to each {@link DefaultFanout}
     * @param dataCenters           used to resolve the local data center
     * @param curator               ZooKeeper client used for leader election
     * @param self                  this server's host and port; its string form is the leader-election instance id
     * @param dropwizardTask        task with which each leader service is registered
     * @param logFactory            passed through to each {@link DefaultFanout}
     * @param metricRegistry        registry used for failure metrics and passed to each {@link DefaultFanout}
     * @throws NullPointerException if any argument is {@code null}
     */
    @Inject
    public DefaultFanoutManager(EventStore eventStore, SubscriptionDAO subscriptionDao, SubscriptionEvaluator subscriptionEvaluator, DataCenters dataCenters, @DatabusZooKeeper CuratorFramework curator, @SelfHostAndPort HostAndPort self, LeaderServiceTask dropwizardTask, RateLimitedLogFactory logFactory, MetricRegistry metricRegistry) {
        _eventStore = Preconditions.checkNotNull(eventStore, "eventStore");
        _subscriptionDao = Preconditions.checkNotNull(subscriptionDao, "subscriptionDao");
        _subscriptionEvaluator = Preconditions.checkNotNull(subscriptionEvaluator, "subscriptionEvaluator");
        // Plain literal label; this class has no reason to depend on a Cassandra repair constant for a message.
        _dataCenters = Preconditions.checkNotNull(dataCenters, "dataCenters");
        _curator = Preconditions.checkNotNull(curator, "curator");
        _selfId = Preconditions.checkNotNull(self, "self").toString();
        _dropwizardTask = Preconditions.checkNotNull(dropwizardTask, "dropwizardTask");
        _logFactory = Preconditions.checkNotNull(logFactory, "logFactory");
        // Fail fast here like every other dependency instead of when the first fanout is created.
        _metricRegistry = Preconditions.checkNotNull(metricRegistry, "metricRegistry");
    }

    /**
     * Creates the leader-elected fanout named {@code "master"} that reads from the
     * event store's master fanout channel.
     *
     * @return the leader service wrapping the fanout; the caller is responsible for starting it
     */
    @Override
    public Service newMasterFanout() {
        EventSource masterSource = new EventStoreEventSource(_eventStore, ChannelNames.getMasterFanoutChannel());
        return create("master", masterSource, true, SAME_DC_SLEEP_WHEN_IDLE);
    }

    /**
     * Creates the leader-elected fanout named {@code "in-<data center name>"} that reads
     * from the given replication source, using the replication channel derived from the
     * local data center.
     *
     * @param dataCenter        data center whose name identifies this fanout
     * @param replicationSource source the fanout reads events from
     * @return the leader service wrapping the fanout; the caller is responsible for starting it
     */
    @Override
    public Service newInboundReplicationFanout(DataCenter dataCenter, ReplicationSource replicationSource) {
        String channel = ChannelNames.getReplicationFanoutChannel(_dataCenters.getSelf());
        EventSource inboundSource = new ReplicationEventSource(replicationSource, channel);
        return create("in-" + dataCenter.getName(), inboundSource, false, REMOTE_DC_SLEEP_WHEN_IDLE);
    }

    /**
     * Builds, instruments and registers a leader-elected fanout service.
     *
     * @param name           fanout name; used in the ZooKeeper leader path and the selector thread name
     * @param eventSource    source the fanout reads from
     * @param isMasterFanout {@code true} for the master fanout, {@code false} for inbound replication;
     *                       forwarded unchanged to {@link DefaultFanout}
     *                       (NOTE(review): its exact effect there is not visible from this class)
     * @param sleepWhenIdle  idle sleep forwarded to {@link DefaultFanout}
     * @return the registered leader service
     */
    private Service create(final String name, final EventSource eventSource, final boolean isMasterFanout, final Duration sleepWhenIdle) {
        // Sink handed to the fanout: writes a batch of events into the event store.
        final Function<Multimap<String, ByteBuffer>, Void> eventSink = eventsByChannel -> {
            _eventStore.addAll(eventsByChannel);
            return null;
        };

        // NOTE(review): left as a raw Supplier because the element type returned by
        // getAllSubscriptions() is not visible here; parameterize it once confirmed.
        final Supplier subscriptionsSupplier = () -> _subscriptionDao.getAllSubscriptions();

        // The fanout itself is built lazily, each time the leader service requests a delegate.
        final Supplier<Service> fanoutFactory = () -> new DefaultFanout(
                name, eventSource, eventSink, isMasterFanout, sleepWhenIdle, subscriptionsSupplier,
                _dataCenters.getSelf(), _logFactory, _subscriptionEvaluator, _metricRegistry);

        LeaderService leaderService = new LeaderService(
                _curator, ZKPaths.makePath("/leader/fanout", name), _selfId,
                "LeaderSelector-" + name, 1L, TimeUnit.MINUTES, fanoutFactory);

        ServiceFailureListener.listenTo(leaderService, _metricRegistry);
        // NOTE(review): every fanout registers under the same "databus-fanout" name; confirm that
        // LeaderServiceTask.register tolerates duplicate names rather than replacing earlier entries.
        _dropwizardTask.register("databus-fanout", leaderService);
        return leaderService;
    }
}
