package com.bazaarvoice.ostrich.pool;

import com.bazaarvoice.ostrich.PartitionContext;
import com.bazaarvoice.ostrich.RetryPolicy;
import com.bazaarvoice.ostrich.ServiceCallback;
import com.bazaarvoice.ostrich.ServiceEndPoint;
import com.bazaarvoice.ostrich.ServiceEndPointPredicate;
import com.bazaarvoice.ostrich.exceptions.MaxRetriesException;
import com.bazaarvoice.ostrich.exceptions.NoAvailableHostsException;
import com.bazaarvoice.ostrich.metrics.Metrics;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.base.Ticker;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/bazaarvoice/ostrich/pool/AsyncServicePool.class */
/**
 * Asynchronous facade over a {@link ServicePool}: every operation is submitted to an
 * {@link ExecutorService} and its outcome is exposed to the caller as a {@link Future}.
 *
 * <p>Whether the wrapped pool and the executor are shut down when this instance is closed is
 * decided by the two boolean flags passed to the constructor.
 *
 * @param <S> the service type handed to callbacks by the underlying pool
 */
class AsyncServicePool<S> implements com.bazaarvoice.ostrich.AsyncServicePool<S> {
    private static final Logger LOG = LoggerFactory.getLogger(AsyncServicePool.class);

    /** Predicate that accepts every end point; used by {@link #executeOnAll}. */
    private static final ServiceEndPointPredicate ALL_END_POINTS = new ServiceEndPointPredicate() {
        @Override
        public boolean apply(ServiceEndPoint serviceEndPoint) {
            return true;
        }
    };

    private final Ticker _ticker;
    private final ServicePool<S> _pool;
    private final boolean _shutdownPoolOnClose;
    private final ExecutorService _executor;
    private final boolean _shutdownExecutorOnClose;
    private final Metrics.InstanceMetrics _metrics;
    private final Timer _executionTime;
    private final Meter _numExecuteSuccesses;
    private final Meter _numExecuteFailures;
    private final Histogram _executeBatchSize;

    /**
     * Creates an asynchronous wrapper around {@code servicePool}.
     *
     * <p>NOTE: the decompiler output this file came from indicates the constructor was originally
     * package-private; it is kept {@code public} here so the visible signature does not change.
     *
     * @param ticker          time source used to measure elapsed time for the retry policy
     * @param servicePool     pool that actually executes the callbacks
     * @param z               if {@code true}, {@link #close()} also closes {@code servicePool}
     * @param executorService executor on which all callbacks are run
     * @param z2              if {@code true}, {@link #close()} also shuts down {@code executorService}
     * @param metricRegistry  registry in which this instance's metrics are created
     * @throws NullPointerException if {@code ticker}, {@code servicePool} or
     *                              {@code executorService} is {@code null}
     */
    public AsyncServicePool(Ticker ticker, ServicePool<S> servicePool, boolean z, ExecutorService executorService, boolean z2, MetricRegistry metricRegistry) {
        _ticker = Preconditions.checkNotNull(ticker);
        _pool = Preconditions.checkNotNull(servicePool);
        _shutdownPoolOnClose = z;
        _executor = Preconditions.checkNotNull(executorService);
        _shutdownExecutorOnClose = z2;
        _metrics = Metrics.forInstance(metricRegistry, this, _pool.getServiceName());
        _executionTime = _metrics.timer("execution-time");
        _numExecuteSuccesses = _metrics.meter("num-execute-successes");
        _numExecuteFailures = _metrics.meter("num-execute-failures");
        _executeBatchSize = _metrics.histogram("execute-batch-size");
    }

    /**
     * Shuts down the executor and/or closes the wrapped pool, depending on the flags given at
     * construction time, and then closes this instance's metrics.
     *
     * @throws IOException if closing the wrapped pool fails
     */
    @Override
    public void close() throws IOException {
        try {
            if (_shutdownExecutorOnClose) {
                _executor.shutdown();
            }
            if (_shutdownPoolOnClose) {
                _pool.close();
            }
        } finally {
            // Close the metrics even when shutting down the executor or the pool throws,
            // so a failed close does not skip the metrics cleanup.
            _metrics.close();
        }
    }

    /**
     * Submits {@code serviceCallback} for execution through the wrapped pool.
     *
     * @param retryPolicy     retry policy passed through to the wrapped pool
     * @param serviceCallback work to perform
     * @return a future for the value produced by the wrapped pool's {@code execute}
     */
    @Override
    public <R> Future<R> execute(final RetryPolicy retryPolicy, final ServiceCallback<S, R> serviceCallback) {
        return _executor.submit(new Callable<R>() {
            @Override
            public R call() throws Exception {
                return _pool.execute(retryPolicy, serviceCallback);
            }
        });
    }

    /**
     * Submits {@code serviceCallback} for execution through the wrapped pool, forwarding
     * {@code partitionContext} to it.
     *
     * @param partitionContext partition context passed through to the wrapped pool
     * @param retryPolicy      retry policy passed through to the wrapped pool
     * @param serviceCallback  work to perform
     * @return a future for the value produced by the wrapped pool's {@code execute}
     */
    @Override
    public <R> Future<R> execute(final PartitionContext partitionContext, final RetryPolicy retryPolicy, final ServiceCallback<S, R> serviceCallback) {
        return _executor.submit(new Callable<R>() {
            @Override
            public R call() throws Exception {
                return _pool.execute(partitionContext, retryPolicy, serviceCallback);
            }
        });
    }

    /**
     * Runs {@code serviceCallback} against every end point known to the wrapped pool.
     * Equivalent to {@link #executeOn} with a predicate that accepts everything.
     *
     * @return one future per end point
     * @throws NoAvailableHostsException if the wrapped pool reports no end points
     */
    @Override
    public <R> Collection<Future<R>> executeOnAll(RetryPolicy retryPolicy, ServiceCallback<S, R> serviceCallback) {
        return executeOn(ALL_END_POINTS, retryPolicy, serviceCallback);
    }

    /**
     * Runs {@code serviceCallback} against every end point of the wrapped pool that is accepted
     * by {@code serviceEndPointPredicate}. One task is submitted to the executor per accepted
     * end point; each task retries on that same end point for as long as the failure is
     * retriable (per the wrapped pool) and {@code retryPolicy} allows another attempt.
     *
     * <p>A task's future fails with the original exception when it is not retriable, or with a
     * {@link MaxRetriesException} wrapping the last failure once the retry policy gives up.
     *
     * @param serviceEndPointPredicate selects which end points to run on
     * @param retryPolicy              decides whether another attempt is allowed, given the number
     *                                 of failed attempts and the elapsed milliseconds
     * @param serviceCallback          work to perform on each selected end point
     * @return the futures of the submitted tasks; empty if the predicate accepted no end point
     * @throws NoAvailableHostsException if the wrapped pool reports no end points at all
     */
    @Override
    public <R> Collection<Future<R>> executeOn(ServiceEndPointPredicate serviceEndPointPredicate, final RetryPolicy retryPolicy, final ServiceCallback<S, R> serviceCallback) {
        Collection<Future<R>> futures = Lists.newArrayList();

        Iterable<ServiceEndPoint> allEndPoints = _pool.getAllEndPoints();
        if (Iterables.isEmpty(allEndPoints)) {
            throw new NoAvailableHostsException();
        }

        for (final ServiceEndPoint endPoint : allEndPoints) {
            if (!serviceEndPointPredicate.apply(endPoint)) {
                continue;
            }

            futures.add(_executor.submit(new Callable<R>() {
                @Override
                public R call() throws Exception {
                    Timer.Context timer = _executionTime.time();
                    Stopwatch stopwatch = Stopwatch.createStarted(_ticker);
                    int numAttempts = 0;
                    try {
                        // Remembered outside the catch block so it can be reported as the
                        // cause once the retry policy refuses another attempt.
                        Exception lastException;
                        do {
                            try {
                                R result = _pool.executeOnEndPoint(endPoint, serviceCallback);
                                _numExecuteSuccesses.mark();
                                return result;
                            } catch (Exception e) {
                                _numExecuteFailures.mark();

                                // Non-retriable failures are propagated to the caller unchanged.
                                if (!_pool.isRetriableException(e)) {
                                    throw e;
                                }

                                lastException = e;
                                LOG.info("Retriable exception from end point id: " + endPoint.getId(), e);
                                numAttempts++;
                            }
                        } while (retryPolicy.allowRetry(numAttempts, stopwatch.elapsed(TimeUnit.MILLISECONDS)));

                        throw new MaxRetriesException(lastException);
                    } finally {
                        // Stop the timer exactly once per task so the recorded duration
                        // covers all attempts, not one sample per attempt.
                        timer.stop();
                    }
                }
            }));
        }

        _executeBatchSize.update(futures.size());
        return futures;
    }

    /** @return the number of valid end points as reported by the wrapped pool */
    @Override
    public int getNumValidEndPoints() {
        return _pool.getNumValidEndPoints();
    }

    /** @return the number of bad end points as reported by the wrapped pool */
    @Override
    public int getNumBadEndPoints() {
        return _pool.getNumBadEndPoints();
    }
}
