package org.apache.zeppelin.py4j;

import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:org/apache/zeppelin/py4j/CallbackClient.class */
public class CallbackClient {
    public static final String DEFAULT_ADDRESS = "127.0.0.1";
    private final int port;
    private final InetAddress address;
    private final Deque<CallbackConnection> connections;
    private final Lock lock;
    private final Logger logger;
    private boolean isShutdown;
    public static final long DEFAULT_MIN_CONNECTION_TIME = 30;
    private final ScheduledExecutorService executor;
    private final long minConnectionTime;
    private final TimeUnit minConnectionTimeUnit;

    public CallbackClient(int i) {
        this.connections = new ArrayDeque();
        this.lock = new ReentrantLock(true);
        this.logger = Logger.getLogger(CallbackClient.class.getName());
        this.isShutdown = false;
        this.executor = Executors.newScheduledThreadPool(1);
        this.port = i;
        try {
            this.address = InetAddress.getByName("127.0.0.1");
            this.minConnectionTime = 30L;
            this.minConnectionTimeUnit = TimeUnit.SECONDS;
            setupCleaner();
        } catch (Exception e) {
            throw new Py4JNetworkException("Default address could not be determined when creating communication channel.");
        }
    }

    public CallbackClient(int i, InetAddress inetAddress) {
        this.connections = new ArrayDeque();
        this.lock = new ReentrantLock(true);
        this.logger = Logger.getLogger(CallbackClient.class.getName());
        this.isShutdown = false;
        this.executor = Executors.newScheduledThreadPool(1);
        this.port = i;
        this.address = inetAddress;
        this.minConnectionTime = 30L;
        this.minConnectionTimeUnit = TimeUnit.SECONDS;
        setupCleaner();
    }

    public CallbackClient(int i, InetAddress inetAddress, long j, TimeUnit timeUnit) {
        this.connections = new ArrayDeque();
        this.lock = new ReentrantLock(true);
        this.logger = Logger.getLogger(CallbackClient.class.getName());
        this.isShutdown = false;
        this.executor = Executors.newScheduledThreadPool(1);
        this.port = i;
        this.address = inetAddress;
        this.minConnectionTime = j;
        this.minConnectionTimeUnit = timeUnit;
        setupCleaner();
    }

    public InetAddress getAddress() {
        return this.address;
    }

    private CallbackConnection getConnection() throws IOException {
        CallbackConnection pollLast = this.connections.pollLast();
        if (pollLast == null) {
            pollLast = new CallbackConnection(this.port, this.address);
            pollLast.start();
        }
        return pollLast;
    }

    private CallbackConnection getConnectionLock() {
        CallbackConnection callbackConnection = null;
        try {
            try {
                this.logger.log(Level.INFO, "Getting CB Connection");
                this.lock.lock();
                if (this.isShutdown) {
                    this.logger.log(Level.INFO, "Shuting down, no connection can be created.");
                } else {
                    callbackConnection = getConnection();
                    this.logger.log(Level.INFO, "Acquired CB Connection");
                }
                return callbackConnection;
            } catch (Exception e) {
                this.logger.log(Level.SEVERE, "Critical error while sending a command", (Throwable) e);
                throw new Py4JException("Error while obtaining a new communication channel", e);
            }
        } finally {
            this.lock.unlock();
        }
    }

    public int getPort() {
        return this.port;
    }

    private void giveBackConnection(CallbackConnection callbackConnection) {
        try {
            this.lock.lock();
            if (callbackConnection != null) {
                if (this.isShutdown) {
                    callbackConnection.shutdown();
                } else {
                    this.connections.addLast(callbackConnection);
                }
            }
        } finally {
            this.lock.unlock();
        }
    }

    public void periodicCleanup() {
        try {
            this.lock.lock();
            if (!this.isShutdown) {
                int size = this.connections.size();
                for (int i = 0; i < size; i++) {
                    CallbackConnection pollLast = this.connections.pollLast();
                    if (pollLast.wasUsed()) {
                        pollLast.setUsed(false);
                        this.connections.addFirst(pollLast);
                    } else {
                        pollLast.shutdown();
                    }
                }
            }
        } finally {
            this.lock.unlock();
        }
    }

    public String sendCommand(String str) {
        return sendCommand(str, true);
    }

    public String sendCommand(String str, boolean z) {
        String sendCommand;
        CallbackConnection connectionLock = getConnectionLock();
        if (connectionLock == null) {
            throw new Py4JException("Cannot obtain a new communication channel");
        }
        try {
            sendCommand = connectionLock.sendCommand(str, z);
        } catch (Py4JNetworkException e) {
            this.logger.log(Level.WARNING, "Error while sending a command", (Throwable) e);
            sendCommand = sendCommand(str, z);
        } catch (Exception e2) {
            this.logger.log(Level.SEVERE, "Critical error while sending a command", (Throwable) e2);
            throw new Py4JException("Error while sending a command.");
        }
        giveBackConnection(connectionLock);
        return sendCommand;
    }

    private void setupCleaner() {
        this.executor.scheduleAtFixedRate(new Runnable() { // from class: org.apache.zeppelin.py4j.CallbackClient.1
            @Override // java.lang.Runnable
            public void run() {
                CallbackClient.this.periodicCleanup();
            }
        }, this.minConnectionTime, this.minConnectionTime, this.minConnectionTimeUnit);
    }

    public void shutdown() {
        this.logger.info("Shutting down Callback Client");
        try {
            this.lock.lock();
            this.isShutdown = true;
            Iterator<CallbackConnection> it = this.connections.iterator();
            while (it.hasNext()) {
                it.next().shutdown();
            }
            this.executor.shutdownNow();
            this.connections.clear();
        } finally {
            this.lock.unlock();
        }
    }
}
