package com.bazaarvoice.emodb.databus;

import com.bazaarvoice.emodb.cachemgr.api.CacheRegistry;
import com.bazaarvoice.emodb.common.cassandra.CassandraFactory;
import com.bazaarvoice.emodb.common.cassandra.CassandraKeyspace;
import com.bazaarvoice.emodb.common.dropwizard.lifecycle.LifeCycleRegistry;
import com.bazaarvoice.emodb.common.dropwizard.service.EmoServiceMode;
import com.bazaarvoice.emodb.common.zookeeper.store.ValueStore;
import com.bazaarvoice.emodb.common.zookeeper.store.ZkBooleanSerializer;
import com.bazaarvoice.emodb.common.zookeeper.store.ZkValueStore;
import com.bazaarvoice.emodb.databus.core.CanaryManager;
import com.bazaarvoice.emodb.databus.core.DatabusChannelConfiguration;
import com.bazaarvoice.emodb.databus.core.DatabusEventStore;
import com.bazaarvoice.emodb.databus.core.DatabusFactory;
import com.bazaarvoice.emodb.databus.core.DedupMigrationTask;
import com.bazaarvoice.emodb.databus.core.DefaultDatabus;
import com.bazaarvoice.emodb.databus.core.DefaultFanoutManager;
import com.bazaarvoice.emodb.databus.core.DefaultRateLimitedLogFactory;
import com.bazaarvoice.emodb.databus.core.FanoutManager;
import com.bazaarvoice.emodb.databus.core.MasterFanout;
import com.bazaarvoice.emodb.databus.core.OwnerAwareDatabus;
import com.bazaarvoice.emodb.databus.core.RateLimitedLogFactory;
import com.bazaarvoice.emodb.databus.core.SubscriptionEvaluator;
import com.bazaarvoice.emodb.databus.core.SystemQueueMonitorManager;
import com.bazaarvoice.emodb.databus.db.SubscriptionDAO;
import com.bazaarvoice.emodb.databus.db.cql.CqlSubscriptionDAO;
import com.bazaarvoice.emodb.databus.db.generic.CachingSubscriptionDAO;
import com.bazaarvoice.emodb.databus.db.generic.CachingSubscriptionDAODelegate;
import com.bazaarvoice.emodb.databus.db.generic.CachingSubscriptionDAOExecutorService;
import com.bazaarvoice.emodb.databus.db.generic.CachingSubscriptionDAORegistry;
import com.bazaarvoice.emodb.databus.repl.DefaultReplicationManager;
import com.bazaarvoice.emodb.databus.repl.DefaultReplicationSource;
import com.bazaarvoice.emodb.databus.repl.ReplicationEnabledTask;
import com.bazaarvoice.emodb.databus.repl.ReplicationSource;
import com.bazaarvoice.emodb.event.DedupEnabled;
import com.bazaarvoice.emodb.event.EventStoreHostDiscovery;
import com.bazaarvoice.emodb.event.EventStoreModule;
import com.bazaarvoice.emodb.event.EventStoreZooKeeper;
import com.bazaarvoice.emodb.event.api.ChannelConfiguration;
import com.bazaarvoice.emodb.event.api.DedupEventStoreChannels;
import com.bazaarvoice.ostrich.HostDiscovery;
import com.codahale.metrics.MetricRegistry;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Key;
import com.google.inject.PrivateModule;
import com.google.inject.Provides;
import com.google.inject.Singleton;
import com.google.inject.TypeLiteral;
import io.dropwizard.lifecycle.ExecutorServiceManager;
import io.dropwizard.util.Duration;
import java.lang.annotation.Annotation;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.curator.framework.CuratorFramework;

/**
 * Guice private module that wires the databus.
 *
 * <p>Only {@link DatabusEventStore}, {@link DatabusFactory} and {@link ReplicationSource} are
 * exposed to the enclosing injector; every other binding made here stays private to this module.
 * The fan-out and replication components are bound only when the configured
 * {@link EmoServiceMode} specifies {@code dataBus_fan_out_and_replication}.
 */
public class DatabusModule extends PrivateModule {
    /** Size of the fixed thread pool returned by {@link #provideQueueDrainService}. */
    private static final int MAX_THREADS_FOR_QUEUE_DRAINING = 10;

    private final EmoServiceMode _serviceMode;
    private final MetricRegistry _metricRegistry;

    /**
     * @param emoServiceMode service mode consulted in {@link #configure()} to decide whether the
     *                       fan-out and replication components are bound
     * @param metricRegistry metric registry handed to the installed {@link EventStoreModule}
     */
    public DatabusModule(EmoServiceMode emoServiceMode, MetricRegistry metricRegistry) {
        _serviceMode = emoServiceMode;
        _metricRegistry = metricRegistry;
    }

    /**
     * Declares the databus bindings, installs the event store module and exposes the public
     * databus entry points.
     */
    @Override
    protected void configure() {
        // Subscription storage: a caching DAO in front of the CQL-backed delegate.
        bind(SubscriptionDAO.class).to(CachingSubscriptionDAO.class).asEagerSingleton();
        bind(SubscriptionDAO.class).annotatedWith(CachingSubscriptionDAODelegate.class)
                .to(CqlSubscriptionDAO.class).asEagerSingleton();
        bind(CqlSubscriptionDAO.class).asEagerSingleton();
        bind(CassandraFactory.class).asEagerSingleton();

        // Bindings required by the EventStoreModule installed below; the event store's
        // ZooKeeper and host discovery keys are aliased to the databus-specific ones.
        bind(ChannelConfiguration.class).to(DatabusChannelConfiguration.class).asEagerSingleton();
        bind(CuratorFramework.class).annotatedWith(EventStoreZooKeeper.class)
                .to(Key.get(CuratorFramework.class, DatabusZooKeeper.class));
        bind(HostDiscovery.class).annotatedWith(EventStoreHostDiscovery.class)
                .to(Key.get(HostDiscovery.class, DatabusHostDiscovery.class));
        bind(DedupEventStoreChannels.class).toInstance(ChannelNames.dedupChannels());
        // The @DedupEnabled Supplier<Boolean> is served by the @DedupEnabled ValueStore<Boolean>
        // produced by provideDedupEnabled().
        bind(new TypeLiteral<Supplier<Boolean>>() { }).annotatedWith(DedupEnabled.class)
                .to(Key.get(new TypeLiteral<ValueStore<Boolean>>() { }, DedupEnabled.class))
                .asEagerSingleton();
        install(new EventStoreModule("bv.emodb.databus", _metricRegistry));

        // Fan-out and replication run only when the service mode asks for them.
        if (_serviceMode.specifies(EmoServiceMode.Aspect.dataBus_fan_out_and_replication)) {
            bind(FanoutManager.class).to(DefaultFanoutManager.class).asEagerSingleton();
            bind(CanaryManager.class).asEagerSingleton();
            bind(MasterFanout.class).asEagerSingleton();
            bind(DefaultReplicationManager.class).asEagerSingleton();
            bind(ReplicationEnabledTask.class).asEagerSingleton();
            bind(SystemQueueMonitorManager.class).asEagerSingleton();
        }

        bind(RateLimitedLogFactory.class).to(DefaultRateLimitedLogFactory.class).asEagerSingleton();
        bind(SubscriptionEvaluator.class).asEagerSingleton();
        bind(DedupMigrationTask.class).asEagerSingleton();

        // Publicly visible entry points.
        bind(DatabusEventStore.class).asEagerSingleton();
        expose(DatabusEventStore.class);
        bind(OwnerAwareDatabus.class).to(DefaultDatabus.class).asEagerSingleton();
        bind(DatabusFactory.class).asEagerSingleton();
        expose(DatabusFactory.class);
        bind(ReplicationSource.class).to(DefaultReplicationSource.class).asEagerSingleton();
        expose(ReplicationSource.class);
    }

    /**
     * Builds the keyspaces described by the databus Cassandra configuration and returns the
     * single one expected.
     *
     * @throws IllegalArgumentException if the factory does not produce exactly one keyspace
     */
    @Singleton
    @Provides
    CassandraKeyspace provideKeyspace(DatabusConfiguration databusConfiguration, CassandraFactory cassandraFactory) {
        Map<String, CassandraKeyspace> keyspaces = cassandraFactory.build(databusConfiguration.getCassandraConfiguration());
        Preconditions.checkArgument(keyspaces.size() == 1, "Only one keyspace expected for databus, found %s", keyspaces.keySet());
        return keyspaces.values().iterator().next();
    }

    /** Returns the injected cache registry narrowed to the {@code "bus"} namespace. */
    @Singleton
    @Provides
    @CachingSubscriptionDAORegistry
    CacheRegistry provideCacheRegistry(CacheRegistry cacheRegistry) {
        return cacheRegistry.withNamespace("bus");
    }

    /** Provides the lifecycle-managed ZooKeeper flag stored at {@code /settings/dedup-enabled}. */
    @Singleton
    @Provides
    @DedupEnabled
    ValueStore<Boolean> provideDedupEnabled(@DatabusZooKeeper CuratorFramework curatorFramework, LifeCycleRegistry lifeCycleRegistry) {
        return managedBooleanStore(curatorFramework, lifeCycleRegistry, "/settings/dedup-enabled");
    }

    /** Provides the lifecycle-managed ZooKeeper flag stored at {@code /settings/replication-enabled}. */
    @Singleton
    @ReplicationEnabled
    @Provides
    ValueStore<Boolean> provideReplicationEnabled(@DatabusZooKeeper CuratorFramework curatorFramework, LifeCycleRegistry lifeCycleRegistry) {
        return managedBooleanStore(curatorFramework, lifeCycleRegistry, "/settings/replication-enabled");
    }

    /**
     * Provides the single-threaded executor (threads named {@code subscription-cache-%d}) used by
     * the caching subscription DAO, registered with the lifecycle registry through an
     * {@link ExecutorServiceManager} configured with a one second duration.
     */
    @CachingSubscriptionDAOExecutorService
    @Singleton
    @Provides
    ListeningExecutorService provideCachingSubscriptionDAOExecutorService(LifeCycleRegistry lifeCycleRegistry) {
        ListeningExecutorService service = MoreExecutors.listeningDecorator(
                Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setNameFormat("subscription-cache-%d").build()));
        lifeCycleRegistry.manage(new ExecutorServiceManager(service, Duration.seconds(1L), "subscription-cache"));
        return service;
    }

    /** Provides the subscription cache invalidation mode read from the databus configuration. */
    @Singleton
    @Provides
    CachingSubscriptionDAO.CachingMode provideCachingSubscriptionDAOCachingMode(DatabusConfiguration databusConfiguration) {
        return databusConfiguration.getSubscriptionCacheInvalidation();
    }

    /**
     * Provides the fixed pool of {@link #MAX_THREADS_FOR_QUEUE_DRAINING} threads (named
     * {@code drainQueue-%d}) used for queue draining, registered with the lifecycle registry
     * through an {@link ExecutorServiceManager} configured with a one second duration.
     */
    @Singleton
    @Provides
    @QueueDrainExecutorService
    ExecutorService provideQueueDrainService(LifeCycleRegistry lifeCycleRegistry) {
        ExecutorService service = Executors.newFixedThreadPool(
                MAX_THREADS_FOR_QUEUE_DRAINING, new ThreadFactoryBuilder().setNameFormat("drainQueue-%d").build());
        lifeCycleRegistry.manage(new ExecutorServiceManager(service, Duration.seconds(1L), "drainQueue-cache"));
        return service;
    }

    /**
     * Creates a boolean {@link ZkValueStore} at the given ZooKeeper path and registers it with
     * the lifecycle registry.
     */
    private static ValueStore<Boolean> managedBooleanStore(CuratorFramework curatorFramework,
                                                           LifeCycleRegistry lifeCycleRegistry,
                                                           String zkPath) {
        // NOTE(review): the trailing 'true' is presumably the value used when the node is absent
        // -- confirm against ZkValueStore.
        ZkValueStore<Boolean> store = new ZkValueStore<>(curatorFramework, zkPath, new ZkBooleanSerializer(), true);
        return lifeCycleRegistry.manage(store);
    }
}
