package org.apache.spark.network.client;

import com.facebook.presto.spark.$internal.io.netty.bootstrap.Bootstrap;
import com.facebook.presto.spark.$internal.io.netty.buffer.PooledByteBufAllocator;
import com.facebook.presto.spark.$internal.io.netty.channel.Channel;
import com.facebook.presto.spark.$internal.io.netty.channel.ChannelFuture;
import com.facebook.presto.spark.$internal.io.netty.channel.ChannelInitializer;
import com.facebook.presto.spark.$internal.io.netty.channel.ChannelOption;
import com.facebook.presto.spark.$internal.io.netty.channel.EventLoopGroup;
import com.facebook.presto.spark.$internal.io.netty.channel.socket.SocketChannel;
import com.facebook.presto.spark.$internal.org.slf4j.Logger;
import com.facebook.presto.spark.$internal.org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.spark.network.TransportContext;
import org.apache.spark.network.server.TransportChannelHandler;
import org.apache.spark.network.util.IOMode;
import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.network.util.NettyUtils;
import org.apache.spark.network.util.TransportConf;
import org.apache.spark.unsafe.types.CalendarInterval;
import org.spark_project.guava.base.Preconditions;
import org.spark_project.guava.base.Throwables;
import org.spark_project.guava.collect.Lists;

/* loaded from: input_file:org/apache/spark/network/client/TransportClientFactory.class */
/**
 * Factory for {@link TransportClient} instances.
 *
 * <p>Clients created through {@link #createClient(String, int)} are cached in a per-address
 * {@link ClientPool} holding {@code numConnectionsPerPeer} slots; a slot is picked at random on
 * every call and its client is reused while it reports itself active. Clients created through
 * {@link #createUnmanagedClient(String, int)} are never stored in a pool.
 *
 * <p>All clients created by one factory share the same event loop group and pooled buffer
 * allocator, both of which are built in the constructor. {@link #close()} closes every pooled
 * client and shuts the event loop group down.
 */
public class TransportClientFactory implements Closeable {
    private static final Logger logger = LoggerFactory.getLogger(TransportClientFactory.class);

    /** Divisor that converts a {@link System#nanoTime()} delta into milliseconds. */
    private static final long NANOS_PER_MILLI = 1000000L;

    /** DNS resolutions that take longer than this many milliseconds are logged at WARN level. */
    private static final long DNS_WARN_THRESHOLD_MS = 2000L;

    private final TransportContext context;
    private final TransportConf conf;
    private final List<TransportClientBootstrap> clientBootstraps;
    private final int numConnectionsPerPeer;
    private final Class<? extends Channel> socketChannelClass;
    /** Set to {@code null} by {@link #close()} once shutdown has been requested. */
    private EventLoopGroup workerGroup;
    private final PooledByteBufAllocator pooledAllocator;
    /** Pools keyed by the <em>unresolved</em> remote address (see {@link #createClient(String, int)}). */
    private final ConcurrentHashMap<SocketAddress, ClientPool> connectionPool = new ConcurrentHashMap<>();
    /** Used to pick which slot of a {@link ClientPool} serves a request. */
    private final Random rand = new Random();

    /**
     * Fixed-size set of client slots for one remote address. {@code locks[k]} is the monitor that
     * guards (re)creation of {@code clients[k]}.
     *
     * <p>NOTE(review): the decompiler reported that this class was originally {@code private};
     * the visibility is deliberately left as found.
     */
    public static class ClientPool {
        TransportClient[] clients;
        Object[] locks;

        /**
         * @param i number of slots; every slot starts out empty with its own lock object
         */
        ClientPool(int i) {
            this.clients = new TransportClient[i];
            this.locks = new Object[i];
            for (int slot = 0; slot < i; slot++) {
                this.locks[slot] = new Object();
            }
        }
    }

    /**
     * Builds a factory from the given context. The configuration is read from the context; the
     * bootstrap list is copied, so later changes to {@code list} do not affect this factory.
     *
     * @param transportContext context used to initialize the pipeline of every new channel
     * @param list bootstraps run, in order, on every newly connected client
     * @throws NullPointerException if either argument is {@code null}
     */
    public TransportClientFactory(TransportContext transportContext, List<TransportClientBootstrap> list) {
        this.context = Preconditions.checkNotNull(transportContext);
        this.conf = transportContext.getConf();
        this.clientBootstraps = Lists.newArrayList(Preconditions.checkNotNull(list));
        this.numConnectionsPerPeer = this.conf.numConnectionsPerPeer();

        IOMode ioMode = IOMode.valueOf(this.conf.ioMode());
        this.socketChannelClass = NettyUtils.getClientChannelClass(ioMode);
        this.workerGroup = NettyUtils.createEventLoop(ioMode, this.conf.clientThreads(), "shuffle-client");
        this.pooledAllocator = NettyUtils.createPooledByteBufAllocator(
                this.conf.preferDirectBufs(), false, this.conf.clientThreads());
    }

    /**
     * Returns a pooled client for the given host and port, creating a connection if the randomly
     * chosen pool slot is empty or holds an inactive client.
     *
     * <p>The call blocks while a new connection is being established and while the configured
     * bootstraps run. Concurrent callers that land on the same slot are serialized on that
     * slot's lock, so at most one of them creates the connection.
     *
     * @param str remote host name or address
     * @param i remote port
     * @return an active cached client, or a newly created one
     * @throws IOException if connecting fails or times out
     */
    public TransportClient createClient(String str, int i) throws IOException {
        // The pool key is an unresolved address, so looking up the cache never performs DNS.
        final InetSocketAddress unresolvedAddress = InetSocketAddress.createUnresolved(str, i);

        ClientPool clientPool = this.connectionPool.get(unresolvedAddress);
        if (clientPool == null) {
            // Use putIfAbsent's return value instead of a second get(): a second lookup could
            // return null if close() clears the map in between, which would NPE below.
            ClientPool freshPool = new ClientPool(this.numConnectionsPerPeer);
            ClientPool existingPool = this.connectionPool.putIfAbsent(unresolvedAddress, freshPool);
            clientPool = existingPool != null ? existingPool : freshPool;
        }

        final int clientIndex = this.rand.nextInt(this.numConnectionsPerPeer);
        TransportClient cachedClient = clientPool.clients[clientIndex];

        // Fast path, taken without the slot lock.
        if (cachedClient != null && cachedClient.isActive()) {
            TransportChannelHandler handler =
                    cachedClient.getChannel().pipeline().get(TransportChannelHandler.class);
            // Presumably this keeps an idle-timeout check from closing the channel right after
            // it is handed out; the client is re-checked below in case it went inactive first.
            synchronized (handler) {
                handler.getResponseHandler().updateTimeOfLastRequest();
            }
            if (cachedClient.isActive()) {
                logger.trace("Returning cached connection to {}: {}", cachedClient.getSocketAddress(), cachedClient);
                return cachedClient;
            }
        }

        // Slow path: resolve the host name (this constructor may perform a DNS lookup) and time it.
        final long preResolveNanos = System.nanoTime();
        final InetSocketAddress resolvedAddress = new InetSocketAddress(str, i);
        final long resolveMillis = (System.nanoTime() - preResolveNanos) / NANOS_PER_MILLI;
        if (resolveMillis > DNS_WARN_THRESHOLD_MS) {
            logger.warn("DNS resolution for {} took {} ms", resolvedAddress, resolveMillis);
        } else {
            logger.trace("DNS resolution for {} took {} ms", resolvedAddress, resolveMillis);
        }

        synchronized (clientPool.locks[clientIndex]) {
            // Re-read the slot: another thread may have filled it while we were resolving.
            cachedClient = clientPool.clients[clientIndex];
            if (cachedClient != null) {
                if (cachedClient.isActive()) {
                    logger.trace("Returning cached connection to {}: {}", resolvedAddress, cachedClient);
                    return cachedClient;
                }
                logger.info("Found inactive connection to {}, creating a new one.", resolvedAddress);
            }
            clientPool.clients[clientIndex] = createClient(resolvedAddress);
            return clientPool.clients[clientIndex];
        }
    }

    /**
     * Creates a new client for the given host and port without storing it in any pool. Because
     * the client is not tracked by this factory, {@link #close()} does not close it.
     *
     * @param str remote host name or address
     * @param i remote port
     * @return a newly connected and bootstrapped client
     * @throws IOException if connecting fails or times out
     */
    public TransportClient createUnmanagedClient(String str, int i) throws IOException {
        return createClient(new InetSocketAddress(str, i));
    }

    /**
     * Connects to {@code inetSocketAddress}, waits for the connection to complete, and runs every
     * configured bootstrap on the resulting client.
     *
     * @param inetSocketAddress remote address to connect to
     * @return the connected and bootstrapped client
     * @throws IOException if the connection attempt does not finish within
     *     {@code conf.connectionTimeoutMs()} or finishes with a failure
     * @throws RuntimeException if a bootstrap throws; the client is closed before the exception
     *     is propagated
     */
    private TransportClient createClient(InetSocketAddress inetSocketAddress) throws IOException {
        logger.debug("Creating new connection to {}", inetSocketAddress);

        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(this.workerGroup)
                .channel(this.socketChannelClass)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, this.conf.connectionTimeoutMs())
                .option(ChannelOption.ALLOCATOR, this.pooledAllocator);

        // The pipeline is initialized inside the channel initializer callback; these references
        // carry the resulting client and channel back to this method.
        final AtomicReference<TransportClient> clientRef = new AtomicReference<>();
        final AtomicReference<Channel> channelRef = new AtomicReference<>();
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel socketChannel) {
                clientRef.set(TransportClientFactory.this.context.initializePipeline(socketChannel).getClient());
                channelRef.set(socketChannel);
            }
        });

        final long preConnectNanos = System.nanoTime();
        ChannelFuture connectFuture = bootstrap.connect(inetSocketAddress);
        if (!connectFuture.awaitUninterruptibly(this.conf.connectionTimeoutMs())) {
            throw new IOException(String.format(
                    "Connecting to %s timed out (%s ms)", inetSocketAddress, this.conf.connectionTimeoutMs()));
        }
        if (connectFuture.cause() != null) {
            throw new IOException(String.format("Failed to connect to %s", inetSocketAddress), connectFuture.cause());
        }

        TransportClient client = clientRef.get();
        Channel channel = channelRef.get();
        assert client != null : "Channel future completed successfully with null client";

        final long preBootstrapNanos = System.nanoTime();
        logger.debug("Connection to {} successful, running bootstraps...", inetSocketAddress);
        try {
            for (TransportClientBootstrap clientBootstrap : this.clientBootstraps) {
                clientBootstrap.doBootstrap(client, channel);
            }
        } catch (Exception e) {
            long bootstrapMillis = (System.nanoTime() - preBootstrapNanos) / NANOS_PER_MILLI;
            logger.error("Exception while bootstrapping client after " + bootstrapMillis + " ms", (Throwable) e);
            // Do not leave a half-bootstrapped connection open.
            client.close();
            throw Throwables.propagate(e);
        }

        final long postBootstrapNanos = System.nanoTime();
        logger.info("Successfully created connection to {} after {} ms ({} ms spent in bootstraps)",
                inetSocketAddress,
                (postBootstrapNanos - preConnectNanos) / NANOS_PER_MILLI,
                (postBootstrapNanos - preBootstrapNanos) / NANOS_PER_MILLI);
        return client;
    }

    /**
     * Closes every pooled client, empties the pool map, and requests a graceful shutdown of the
     * event loop group. Calling this again is harmless as far as the event loop group is
     * concerned, because the reference is nulled after the first shutdown request.
     */
    @Override
    public void close() {
        for (ClientPool clientPool : this.connectionPool.values()) {
            for (int slot = 0; slot < clientPool.clients.length; slot++) {
                TransportClient client = clientPool.clients[slot];
                if (client != null) {
                    // Empty the slot before closing so the closed client is no longer reachable
                    // through the pool.
                    clientPool.clients[slot] = null;
                    JavaUtils.closeQuietly(client);
                }
            }
        }
        this.connectionPool.clear();

        if (this.workerGroup != null) {
            this.workerGroup.shutdownGracefully();
            this.workerGroup = null;
        }
    }
}
