package com.sproutsocial.nsq;

import com.fasterxml.jackson.databind.deser.DeserializerCache;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.management.ManagementFactory;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.Inflater;
import java.util.zip.InflaterInputStream;
import javax.net.ssl.SSLSocketFactory;
import net.jcip.annotations.GuardedBy;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:META-INF/bundled-dependencies/nsq-j-1.0.jar:com/sproutsocial/nsq/Connection.class */
public abstract class Connection extends BasePubSub implements Closeable {
    protected final HostAndPort host;
    protected DataOutputStream out;
    protected DataInputStream in;
    private volatile boolean isReading;
    protected int msgTimeout;
    protected int heartbeatInterval;
    protected int maxRdyCount;
    protected long lastActionFlush;
    protected int unflushedCount;
    protected long lastHeartbeat;
    protected final BlockingQueue<String> respQueue;
    protected final ExecutorService handlerExecutor;
    private static final ThreadFactory readThreadFactory = Util.threadFactory("nsq-read");
    private static final Set<String> nonFatalErrors = Collections.unmodifiableSet(new HashSet(Arrays.asList("E_FIN_FAILED", "E_REQ_FAILED", "E_TOUCH_FAILED")));
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) Connection.class);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:META-INF/bundled-dependencies/nsq-j-1.0.jar:com/sproutsocial/nsq/Connection$StreamPair.class */
    public static class StreamPair {
        private InputStream baseIn;
        private OutputStream baseOut;
        boolean isBuffered;

        private StreamPair() {
            this.isBuffered = false;
        }
    }

    public Connection(Client client, HostAndPort hostAndPort) {
        super(client);
        this.isReading = true;
        this.msgTimeout = 60000;
        this.heartbeatInterval = 30000;
        this.maxRdyCount = 2500;
        this.respQueue = new ArrayBlockingQueue(1);
        this.host = hostAndPort;
        this.handlerExecutor = client.getExecutor();
    }

    public synchronized void connect(Config config) throws IOException {
        addClientConfig(config);
        Socket socket = new Socket();
        socket.setSoTimeout(30000);
        socket.connect(new InetSocketAddress(this.host.getHost(), this.host.getPort()), 30000);
        StreamPair streams = setStreams(socket.getInputStream(), socket.getOutputStream(), new StreamPair());
        this.out.write("  V2".getBytes(Util.US_ASCII));
        String connectCommand = connectCommand("IDENTIFY", this.client.getGson().toJson(config).getBytes(Util.UTF_8));
        ServerConfig serverConfig = (ServerConfig) this.client.getGson().fromJson(connectCommand, ServerConfig.class);
        logger.debug("serverConfig:{}", connectCommand);
        setConfig(serverConfig);
        this.msgTimeout = ((Integer) Util.firstNonNull(serverConfig.getMsgTimeout(), 60000)).intValue();
        this.heartbeatInterval = ((Integer) Util.firstNonNull(serverConfig.getHeartbeatInterval(), 30000)).intValue();
        this.maxRdyCount = ((Integer) Util.firstNonNull(serverConfig.getMaxRdyCount(), 2500)).intValue();
        logger.info("connected {} msgTimeout:{} heartbeatInterval:{} maxRdyCount:{}", this.host, Integer.valueOf(this.msgTimeout), Integer.valueOf(this.heartbeatInterval), Integer.valueOf(this.maxRdyCount));
        socket.setSoTimeout(this.heartbeatInterval + 5000);
        wrapEncryption(serverConfig, socket, streams);
        wrapCompression(serverConfig, streams);
        if (!streams.isBuffered) {
            this.in = new DataInputStream(new BufferedInputStream(streams.baseIn));
            this.out = new DataOutputStream(new BufferedOutputStream(streams.baseOut));
        }
        sendAuthorization(serverConfig);
        scheduleAtFixedRate(new Runnable() { // from class: com.sproutsocial.nsq.Connection.1
            @Override // java.lang.Runnable
            public void run() {
                Connection.this.checkHeartbeat();
            }
        }, this.heartbeatInterval + DeserializerCache.DEFAULT_MAX_CACHE_SIZE, this.heartbeatInterval, false);
        this.lastHeartbeat = Util.clock();
        readThreadFactory.newThread(new Runnable() { // from class: com.sproutsocial.nsq.Connection.2
            @Override // java.lang.Runnable
            public void run() {
                Connection.this.read();
            }
        }).start();
    }

    private String connectCommand(String str, byte[] bArr) throws IOException {
        this.out.write((str + StringUtils.LF).getBytes(Util.US_ASCII));
        write(bArr);
        this.out.flush();
        return readResponse();
    }

    private void addClientConfig(Config config) {
        String name;
        int indexOf;
        if (config.getHostname() == null && (indexOf = (name = ManagementFactory.getRuntimeMXBean().getName()).indexOf(64)) > 0) {
            config.setHostname(name.substring(indexOf + 1));
        }
        config.setFeatureNegotiation(true);
    }

    private void wrapEncryption(ServerConfig serverConfig, Socket socket, StreamPair streamPair) throws IOException {
        if (serverConfig.getTlsV1().booleanValue()) {
            logger.debug("adding tls");
            SSLSocketFactory sSLSocketFactory = this.client.getSSLSocketFactory();
            if (sSLSocketFactory == null) {
                sSLSocketFactory = (SSLSocketFactory) SSLSocketFactory.getDefault();
            }
            Socket createSocket = sSLSocketFactory.createSocket(socket, socket.getInetAddress().getHostAddress(), socket.getPort(), true);
            setStreams(createSocket.getInputStream(), createSocket.getOutputStream(), streamPair);
            readResponse();
        }
    }

    private void wrapCompression(ServerConfig serverConfig, StreamPair streamPair) throws IOException {
        if (serverConfig.getDeflate().booleanValue()) {
            logger.debug("adding deflate compression");
            try {
                setStreams(new InflaterInputStream(streamPair.baseIn, new Inflater(true), 32768), (DeflaterOutputStream) DeflaterOutputStream.class.getConstructor(OutputStream.class, Deflater.class, Integer.TYPE, Boolean.TYPE).newInstance(streamPair.baseOut, new Deflater(-1, true), 32768, true), streamPair);
                streamPair.isBuffered = true;
                readResponse();
                return;
            } catch (Exception e) {
                throw new NSQException("deflate compression only supported on java7 and up");
            }
        }
        if (serverConfig.getSnappy().booleanValue()) {
            logger.debug("adding snappy compression");
            if (serverConfig.getVersion().startsWith("0.")) {
                throw new NSQException("snappy compression only supported on nsqd 1.0 and up");
            }
            try {
                setStreams((InputStream) Class.forName("org.xerial.snappy.SnappyFramedInputStream").getConstructor(InputStream.class).newInstance(streamPair.baseIn), (OutputStream) Class.forName("org.xerial.snappy.SnappyFramedOutputStream").getConstructor(OutputStream.class).newInstance(streamPair.baseOut), streamPair);
                readResponse();
            } catch (Exception e2) {
                throw new NSQException("snappy compression failed, is org.xerial.snappy:snappy-java available?", e2);
            }
        }
    }

    private void sendAuthorization(ServerConfig serverConfig) throws IOException {
        if (serverConfig.getAuthRequired() == null || !serverConfig.getAuthRequired().booleanValue()) {
            return;
        }
        if (this.client.getAuthSecret() == null) {
            throw new NSQException("nsqd requires authorization, call client.setAuthSecret before connecting");
        }
        if (!serverConfig.getTlsV1().booleanValue()) {
            logger.warn("authorization used without encryption. authSecret sent in plain text");
        }
        logger.info("authorization response:{}", connectCommand("AUTH", this.client.getAuthSecret()));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void checkHeartbeat() {
        boolean z;
        try {
            long clock = Util.clock();
            synchronized (this) {
                z = clock - this.lastHeartbeat > ((long) (2 * this.heartbeatInterval));
            }
            if (z) {
                logger.info("heartbeat failed, closing connection:{}", toString());
                close();
            }
        } catch (Exception e) {
            logger.error("problem checking heartbeat, will probably time out soon. {}", toString(), e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @GuardedBy("this")
    public void writeCommand(String str, Object obj, Object obj2) throws IOException {
        this.out.write((str + " " + obj + " " + obj2 + StringUtils.LF).getBytes(Util.US_ASCII));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @GuardedBy("this")
    public void writeCommand(String str, Object obj) throws IOException {
        this.out.write((str + " " + obj + StringUtils.LF).getBytes(Util.US_ASCII));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @GuardedBy("this")
    public void write(byte[] bArr) throws IOException {
        this.out.writeInt(bArr.length);
        this.out.write(bArr);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @GuardedBy("this")
    public void flush() throws IOException {
        this.out.flush();
        this.lastActionFlush = Util.clock();
        this.unflushedCount = 0;
    }

    private String readResponse() throws IOException {
        int readInt = this.in.readInt();
        int readInt2 = this.in.readInt();
        String str = null;
        if (readInt2 == 0) {
            str = readAscii(readInt - 4);
        } else if (readInt2 == 1) {
            String readAscii = readAscii(readInt - 4);
            int indexOf = readAscii.indexOf(" ");
            if (!nonFatalErrors.contains(indexOf == -1 ? readAscii : readAscii.substring(0, indexOf))) {
                throw new NSQException("error from nsqd:" + readAscii);
            }
            logger.warn("non fatal nsqd error:{} probably due to message timeout", readAscii);
        } else {
            if (readInt2 != 2) {
                throw new NSQException("bad frame type:" + readInt2);
            }
            onMessage(this.in.readLong(), this.in.readUnsignedShort(), readAscii(16), readBytes(readInt - 30));
        }
        return str;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void read() {
        while (this.isReading) {
            try {
                String readResponse = readResponse();
                if ("_heartbeat_".equals(readResponse)) {
                    this.client.getSchedExecutor().execute(new Runnable() { // from class: com.sproutsocial.nsq.Connection.3
                        @Override // java.lang.Runnable
                        public void run() {
                            Connection.this.receivedHeartbeat();
                        }
                    });
                } else if (readResponse != null) {
                    this.respQueue.offer(readResponse);
                }
            } catch (Exception e) {
                if (this.isReading) {
                    this.respQueue.offer(e.toString());
                    close();
                    logger.error("read thread exception. con:{}", toString(), e);
                }
            }
        }
        logger.debug("read loop done {}", toString());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized void receivedHeartbeat() {
        try {
            this.out.write("NOP\n".getBytes(Util.US_ASCII));
            this.out.flush();
            this.lastHeartbeat = Util.clock();
        } catch (Throwable th) {
            logger.error("receivedHeartbeat error", th);
        }
    }

    protected void onMessage(long j, int i, String str, byte[] bArr) {
        throw new NSQException("unexpected frame type 2 - message");
    }

    private byte[] readBytes(int i) throws IOException {
        byte[] bArr = new byte[i];
        this.in.readFully(bArr);
        return bArr;
    }

    private String readAscii(int i) throws IOException {
        return new String(readBytes(i), Util.US_ASCII);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void flushAndReadOK() throws IOException {
        flush();
        try {
            String poll = this.respQueue.poll(this.heartbeatInterval, TimeUnit.MILLISECONDS);
            if ("OK".equals(poll)) {
            } else {
                throw new NSQException("bad response:" + (poll != null ? poll : "timeout"));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new NSQException("read interrupted");
        }
    }

    public synchronized void flushAndClose() {
        try {
            flush();
        } catch (IOException e) {
            logger.error("flushAndClose error", (Throwable) e);
        }
        close();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.isReading = false;
        Util.closeQuietly(this.out);
        Util.closeQuietly(this.in);
        cancelTasks();
        logger.debug("connection closed:{}", toString());
    }

    public HostAndPort getHost() {
        return this.host;
    }

    public synchronized String stateDesc() {
        long clock = Util.clock();
        return String.format("%s lastFlush:%.1f lastHeartbeat:%.1f unflushedCount:%d", toString(), Float.valueOf(((float) (clock - this.lastActionFlush)) / 1000.0f), Float.valueOf(((float) (clock - this.lastHeartbeat)) / 1000.0f), Integer.valueOf(this.unflushedCount));
    }

    public synchronized int getMsgTimeout() {
        return this.msgTimeout;
    }

    public synchronized long getLastActionFlush() {
        return this.lastActionFlush;
    }

    public synchronized int getMaxRdyCount() {
        return this.maxRdyCount;
    }

    private StreamPair setStreams(InputStream inputStream, OutputStream outputStream, StreamPair streamPair) {
        streamPair.baseIn = inputStream;
        streamPair.baseOut = outputStream;
        this.in = new DataInputStream(inputStream);
        this.out = new DataOutputStream(outputStream);
        return streamPair;
    }
}
