package com.bazaarvoice.emodb.databus.repl;

import com.bazaarvoice.emodb.common.dropwizard.discovery.PayloadBuilder;
import com.bazaarvoice.emodb.common.dropwizard.lifecycle.LifeCycleRegistry;
import com.bazaarvoice.emodb.common.dropwizard.lifecycle.ManagedGuavaService;
import com.bazaarvoice.emodb.common.zookeeper.store.GuavaServiceController;
import com.bazaarvoice.emodb.common.zookeeper.store.ValueStore;
import com.bazaarvoice.emodb.databus.ReplicationEnabled;
import com.bazaarvoice.emodb.databus.ReplicationKey;
import com.bazaarvoice.emodb.databus.core.FanoutManager;
import com.bazaarvoice.emodb.datacenter.api.DataCenter;
import com.bazaarvoice.emodb.datacenter.api.DataCenters;
import com.bazaarvoice.ostrich.ServiceEndPointBuilder;
import com.bazaarvoice.ostrich.discovery.FixedHostDiscovery;
import com.bazaarvoice.ostrich.pool.ServiceCachingPolicyBuilder;
import com.bazaarvoice.ostrich.pool.ServicePoolBuilder;
import com.bazaarvoice.ostrich.pool.ServicePoolProxies;
import com.bazaarvoice.ostrich.retry.ExponentialBackoffRetry;
import com.codahale.metrics.MetricRegistry;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.AbstractScheduledService;
import com.google.common.util.concurrent.Service;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import com.sun.jersey.api.client.Client;
import io.dropwizard.lifecycle.ExecutorServiceManager;
import io.dropwizard.lifecycle.Managed;
import io.dropwizard.util.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.cassandra.repair.messages.RepairOption;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/bazaarvoice/emodb/databus/repl/DefaultReplicationManager.class */
public class DefaultReplicationManager extends AbstractScheduledService {
    private static final Logger _log = LoggerFactory.getLogger(DefaultReplicationManager.class);
    private final ScheduledExecutorService _healthCheckExecutor;
    private final FanoutManager _fanoutManager;
    private final DataCenters _dataCenters;
    private final Client _jerseyClient;
    private final ValueStore<Boolean> _replicationEnabled;
    private final String _replicationApiKey;
    private final MetricRegistry _metrics;
    private final Map<String, Managed> _dataCenterFanout = Maps.newHashMap();

    @Inject
    public DefaultReplicationManager(LifeCycleRegistry lifeCycleRegistry, FanoutManager fanoutManager, DataCenters dataCenters, Client client, @ReplicationEnabled ValueStore<Boolean> valueStore, @ReplicationKey String str, MetricRegistry metricRegistry) {
        this._fanoutManager = (FanoutManager) Preconditions.checkNotNull(fanoutManager, "fanoutManager");
        this._dataCenters = (DataCenters) Preconditions.checkNotNull(dataCenters, RepairOption.DATACENTERS_KEY);
        this._jerseyClient = (Client) Preconditions.checkNotNull(client, "jerseyClient");
        this._replicationEnabled = (ValueStore) Preconditions.checkNotNull(valueStore, "replicationEnabled");
        this._replicationApiKey = (String) Preconditions.checkNotNull(str, "replicationApiKey");
        this._metrics = (MetricRegistry) Preconditions.checkNotNull(metricRegistry, "metrics");
        this._healthCheckExecutor = defaultScheduledExecutor(lifeCycleRegistry, "Databus Replication HealthCheck");
        lifeCycleRegistry.manage((LifeCycleRegistry) new ManagedGuavaService(this));
    }

    private static ScheduledExecutorService defaultScheduledExecutor(LifeCycleRegistry lifeCycleRegistry, String str) {
        ScheduledExecutorService newScheduledThreadPool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setNameFormat(str).setDaemon(true).build());
        lifeCycleRegistry.manage((LifeCycleRegistry) new ExecutorServiceManager(newScheduledThreadPool, Duration.seconds(5L), str));
        return newScheduledThreadPool;
    }

    @Override // com.google.common.util.concurrent.AbstractScheduledService
    protected AbstractScheduledService.Scheduler scheduler() {
        return AbstractScheduledService.Scheduler.newFixedDelaySchedule(5L, 60L, TimeUnit.SECONDS);
    }

    @Override // com.google.common.util.concurrent.AbstractScheduledService
    protected void shutDown() throws Exception {
        stopAll(this._dataCenterFanout);
    }

    @Override // com.google.common.util.concurrent.AbstractScheduledService
    protected void runOneIteration() throws Exception {
        try {
            HashMap newHashMap = Maps.newHashMap(this._dataCenterFanout);
            DataCenter self = this._dataCenters.getSelf();
            for (DataCenter dataCenter : this._dataCenters.getAll()) {
                if (!dataCenter.equals(self)) {
                    if (newHashMap.remove(dataCenter.getName()) == null) {
                        Managed newInboundReplication = newInboundReplication(dataCenter);
                        try {
                            newInboundReplication.start();
                            this._dataCenterFanout.put(dataCenter.getName(), newInboundReplication);
                        } catch (Exception e) {
                            _log.error("Unexpected exception starting replication service: {}", dataCenter.getName());
                        }
                    }
                }
            }
            stopAll(newHashMap);
        } catch (Throwable th) {
            _log.error("Unexpected exception polling data center changes.", th);
        }
    }

    private void stopAll(Map<String, Managed> map) {
        Iterator it2 = Lists.newArrayList(map.entrySet()).iterator();
        while (it2.hasNext()) {
            Map.Entry entry = (Map.Entry) it2.next();
            try {
                ((Managed) entry.getValue()).stop();
            } catch (Exception e) {
                _log.error("Unexpected exception stopping replication service: {}", entry.getKey());
            }
            this._dataCenterFanout.remove(entry.getKey());
        }
    }

    private Managed newInboundReplication(final DataCenter dataCenter) {
        final ReplicationSource newRemoteReplicationSource = newRemoteReplicationSource(dataCenter);
        final GuavaServiceController guavaServiceController = new GuavaServiceController(this._replicationEnabled, new Supplier<Service>() { // from class: com.bazaarvoice.emodb.databus.repl.DefaultReplicationManager.1
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // com.google.common.base.Supplier
            public Service get() {
                return DefaultReplicationManager.this._fanoutManager.newInboundReplicationFanout(dataCenter, newRemoteReplicationSource);
            }
        });
        return new Managed() { // from class: com.bazaarvoice.emodb.databus.repl.DefaultReplicationManager.2
            @Override // io.dropwizard.lifecycle.Managed
            public void start() throws Exception {
                guavaServiceController.start();
            }

            @Override // io.dropwizard.lifecycle.Managed
            public void stop() throws Exception {
                guavaServiceController.stop();
                ServicePoolProxies.close(newRemoteReplicationSource);
            }
        };
    }

    private ReplicationSource newRemoteReplicationSource(DataCenter dataCenter) {
        ReplicationClientFactory usingApiKey = new ReplicationClientFactory(this._jerseyClient).usingApiKey(this._replicationApiKey);
        return (ReplicationSource) ServicePoolBuilder.create(ReplicationSource.class).withHostDiscovery(new FixedHostDiscovery(new ServiceEndPointBuilder().withServiceName(usingApiKey.getServiceName()).withId(dataCenter.getName()).withPayload(new PayloadBuilder().withUrl(dataCenter.getServiceUri().resolve(ReplicationClient.SERVICE_PATH)).withAdminUrl(dataCenter.getAdminUri()).toString()).build())).withServiceFactory(usingApiKey).withCachingPolicy(ServiceCachingPolicyBuilder.getMultiThreadedClientPolicy()).withHealthCheckExecutor(this._healthCheckExecutor).withMetricRegistry(this._metrics).buildProxy(new ExponentialBackoffRetry(30, 1L, 10L, TimeUnit.SECONDS));
    }
}
