package org.apache.flink.runtime.blob;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.net.ssl.SSLContext;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.net.SSLUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/blob/BlobServer.class */
public class BlobServer extends Thread implements BlobService {
    private static final Logger LOG = LoggerFactory.getLogger(BlobServer.class);
    private final ServerSocket serverSocket;
    private SSLContext serverSSLContext;
    private final Configuration blobServiceConfiguration;
    private final File storageDir;
    private final BlobStore blobStore;
    private final int maxConnections;
    private final Thread shutdownHook;
    private final AtomicInteger tempFileCounter = new AtomicInteger(0);
    private final AtomicBoolean shutdownRequested = new AtomicBoolean();
    private final Set<BlobServerConnection> activeConnections = new HashSet();
    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

    public BlobServer(Configuration configuration, BlobStore blobStore) throws IOException {
        this.serverSSLContext = null;
        this.blobServiceConfiguration = (Configuration) Preconditions.checkNotNull(configuration);
        this.blobStore = (BlobStore) Preconditions.checkNotNull(blobStore);
        this.storageDir = BlobUtils.initStorageDirectory(configuration.getString("blob.storage.directory", (String) null));
        LOG.info("Created BLOB server storage directory {}", this.storageDir);
        int integer = configuration.getInteger("blob.fetch.num-concurrent", 50);
        if (integer >= 1) {
            this.maxConnections = integer;
        } else {
            LOG.warn("Invalid value for maximum connections in BLOB server: {}. Using default value of {}", Integer.valueOf(integer), 50);
            this.maxConnections = 50;
        }
        int integer2 = configuration.getInteger("blob.fetch.backlog", 1000);
        if (integer2 < 1) {
            LOG.warn("Invalid value for BLOB connection backlog: {}. Using default value of {}", Integer.valueOf(integer2), 1000);
            integer2 = 1000;
        }
        this.shutdownHook = BlobUtils.addShutdownHook(this, LOG);
        if (configuration.getBoolean("blob.service.ssl.enabled", true)) {
            try {
                this.serverSSLContext = SSLUtils.createSSLServerContext(configuration);
            } catch (Exception e) {
                throw new IOException("Failed to initialize SSLContext for the blob server", e);
            }
        }
        String string = configuration.getString("blob.server.port", "0");
        Iterator portRangeFromString = NetUtils.getPortRangeFromString(string);
        final int i = integer2;
        ServerSocket createSocketFromPorts = NetUtils.createSocketFromPorts(portRangeFromString, new NetUtils.SocketFactory() { // from class: org.apache.flink.runtime.blob.BlobServer.1
            public ServerSocket createSocket(int i2) throws IOException {
                if (BlobServer.this.serverSSLContext == null) {
                    return new ServerSocket(i2, i);
                }
                BlobServer.LOG.info("Enabling ssl for the blob server");
                return BlobServer.this.serverSSLContext.getServerSocketFactory().createServerSocket(i2, i);
            }
        });
        if (createSocketFromPorts == null) {
            throw new IOException("Unable to allocate socket for blob server in specified port range: " + string);
        }
        SSLUtils.setSSLVerAndCipherSuites(createSocketFromPorts, configuration);
        this.serverSocket = createSocketFromPorts;
        setName("BLOB Server listener at " + getPort());
        setDaemon(true);
        start();
        if (LOG.isInfoEnabled()) {
            LOG.info("Started BLOB server at {}:{} - max concurrent requests: {} - max backlog: {}", new Object[]{this.serverSocket.getInetAddress().getHostAddress(), Integer.valueOf(getPort()), Integer.valueOf(integer), Integer.valueOf(integer2)});
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public File getStorageLocation(BlobKey blobKey) {
        return BlobUtils.getStorageLocation(this.storageDir, blobKey);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public File getStorageLocation(JobID jobID, String str) {
        return BlobUtils.getStorageLocation(this.storageDir, jobID, str);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void deleteJobDirectory(JobID jobID) throws IOException {
        BlobUtils.deleteJobDirectory(this.storageDir, jobID);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public File createTemporaryFilename() {
        return new File(BlobUtils.getIncomingDirectory(this.storageDir), String.format("temp-%08d", Integer.valueOf(this.tempFileCounter.getAndIncrement())));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public BlobStore getBlobStore() {
        return this.blobStore;
    }

    public ReadWriteLock getReadWriteLock() {
        return this.readWriteLock;
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        while (!this.shutdownRequested.get()) {
            try {
                BlobServerConnection blobServerConnection = new BlobServerConnection(this.serverSocket.accept(), this);
                try {
                    synchronized (this.activeConnections) {
                        while (this.activeConnections.size() >= this.maxConnections) {
                            this.activeConnections.wait(2000L);
                        }
                        this.activeConnections.add(blobServerConnection);
                    }
                    blobServerConnection.start();
                    BlobServerConnection blobServerConnection2 = null;
                    if (0 != 0) {
                        blobServerConnection2.close();
                        synchronized (this.activeConnections) {
                            this.activeConnections.remove(null);
                        }
                    }
                } catch (Throwable th) {
                    if (blobServerConnection != null) {
                        blobServerConnection.close();
                        synchronized (this.activeConnections) {
                            this.activeConnections.remove(blobServerConnection);
                        }
                    }
                    throw th;
                }
            } catch (Throwable th2) {
                if (this.shutdownRequested.get()) {
                    return;
                }
                LOG.error("BLOB server stopped working. Shutting down", th2);
                try {
                    close();
                    return;
                } catch (Throwable th3) {
                    LOG.error("Could not properly close the BlobServer.", th3);
                    return;
                }
            }
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.shutdownRequested.compareAndSet(false, true)) {
            Throwable th = null;
            try {
                this.serverSocket.close();
            } catch (IOException e) {
                th = e;
            }
            interrupt();
            try {
                join();
            } catch (InterruptedException e2) {
                Thread.currentThread().interrupt();
                LOG.debug("Error while waiting for this thread to die.", e2);
            }
            synchronized (this.activeConnections) {
                if (!this.activeConnections.isEmpty()) {
                    for (BlobServerConnection blobServerConnection : this.activeConnections) {
                        LOG.debug("Shutting down connection {}.", blobServerConnection.getName());
                        blobServerConnection.close();
                    }
                    this.activeConnections.clear();
                }
            }
            try {
                FileUtils.deleteDirectory(this.storageDir);
            } catch (IOException e3) {
                th = (Exception) ExceptionUtils.firstOrSuppressed(e3, th);
            }
            if (this.shutdownHook != null && this.shutdownHook != Thread.currentThread()) {
                try {
                    Runtime.getRuntime().removeShutdownHook(this.shutdownHook);
                } catch (IllegalStateException e4) {
                } catch (Throwable th2) {
                    LOG.warn("Exception while unregistering BLOB server's cleanup shutdown hook.", th2);
                }
            }
            if (LOG.isInfoEnabled()) {
                LOG.info("Stopped BLOB server at {}:{}", this.serverSocket.getInetAddress().getHostAddress(), Integer.valueOf(getPort()));
            }
            ExceptionUtils.tryRethrowIOException(th);
        }
    }

    @Override // org.apache.flink.runtime.blob.BlobService
    public BlobClient createClient() throws IOException {
        return new BlobClient(new InetSocketAddress(this.serverSocket.getInetAddress(), getPort()), this.blobServiceConfiguration);
    }

    @Override // org.apache.flink.runtime.blob.BlobService
    public URL getURL(BlobKey blobKey) throws IOException {
        Preconditions.checkArgument(blobKey != null, "BLOB key cannot be null.");
        File storageLocation = BlobUtils.getStorageLocation(this.storageDir, blobKey);
        if (storageLocation.exists()) {
            return storageLocation.toURI().toURL();
        }
        try {
            this.blobStore.get(blobKey, storageLocation);
            if (storageLocation.exists()) {
                return storageLocation.toURI().toURL();
            }
            throw new FileNotFoundException("Local file " + storageLocation + " does not exist and failed to copy from blob store.");
        } catch (Exception e) {
            throw new IOException("Failed to copy from blob store.", e);
        }
    }

    @Override // org.apache.flink.runtime.blob.BlobService
    public void delete(BlobKey blobKey) throws IOException {
        File storageLocation = BlobUtils.getStorageLocation(this.storageDir, blobKey);
        this.readWriteLock.writeLock().lock();
        try {
            if (storageLocation.exists() && !storageLocation.delete()) {
                LOG.warn("Failed to delete locally BLOB " + blobKey + " at " + storageLocation.getAbsolutePath());
            }
            this.blobStore.delete(blobKey);
            this.readWriteLock.writeLock().unlock();
        } catch (Throwable th) {
            this.readWriteLock.writeLock().unlock();
            throw th;
        }
    }

    @Override // org.apache.flink.runtime.blob.BlobService
    public int getPort() {
        return this.serverSocket.getLocalPort();
    }

    public boolean isShutdown() {
        return this.shutdownRequested.get();
    }

    ServerSocket getServerSocket() {
        return this.serverSocket;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void unregisterConnection(BlobServerConnection blobServerConnection) {
        synchronized (this.activeConnections) {
            this.activeConnections.remove(blobServerConnection);
            this.activeConnections.notifyAll();
        }
    }

    List<BlobServerConnection> getCurrentActiveConnections() {
        ArrayList arrayList;
        synchronized (this.activeConnections) {
            arrayList = new ArrayList(this.activeConnections);
        }
        return arrayList;
    }
}
