package org.apache.bookkeeper.proto;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.bookkeeper.util.SafeRunnable;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
import org.jboss.netty.handler.codec.frame.CorruptedFrameException;
import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
import org.jboss.netty.handler.codec.frame.TooLongFrameException;
import org.jboss.netty.handler.timeout.ReadTimeoutException;
import org.jboss.netty.handler.timeout.ReadTimeoutHandler;
import org.jboss.netty.util.HashedWheelTimer;
import org.jboss.netty.util.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ChannelPipelineCoverage("one")
/* loaded from: input_file:org/apache/bookkeeper/proto/PerChannelBookieClient.class */
public class PerChannelBookieClient extends SimpleChannelHandler implements ChannelPipelineFactory {
    /** Logger shared by this class and its anonymous listeners/runnables. */
    static final Logger LOG = LoggerFactory.getLogger(PerChannelBookieClient.class);
    /** One fifth of the JVM's maximum heap, in bytes. Not referenced elsewhere in the visible code. */
    static final long maxMemory = Runtime.getRuntime().maxMemory() / 5;
    /** Maximum frame length, in bytes, handed to the response frame decoder in {@code getPipeline()} (2 MiB). */
    public static final int MAX_FRAME_LENGTH = 2097152;
    /** Address of the bookie this client connects to. */
    InetSocketAddress addr;
    /** Created with 2000 permits in the constructor; not acquired or released in the visible code. */
    Semaphore opCounterSem;
    /** Counter supplied by the creator of this client; only stored here, never updated in the visible code. */
    AtomicLong totalBytesOutstanding;
    /** Netty channel factory used to bootstrap the connection. */
    ClientSocketChannelFactory channelFactory;
    /** Executor on which completion callbacks are run, ordered by ledger id. */
    OrderedSafeExecutor executor;
    /** Timer backing the read-timeout handler; created lazily in {@code getPipeline()} and stopped in {@code close()}. */
    private Timer readTimeoutTimer;
    /** Add requests that have been issued and not yet completed or failed. */
    ConcurrentHashMap<CompletionKey, AddCompletion> addCompletions;
    /** Read requests that have been issued and not yet completed or failed. */
    ConcurrentHashMap<CompletionKey, ReadCompletion> readCompletions;
    /** Operations queued while no connection is available; swapped out and drained when a connect attempt finishes. */
    Queue<BookkeeperInternalCallbacks.GenericCallback<Void>> pendingOps;
    /** Current channel to the bookie; set to null when a connect attempt fails. */
    volatile Channel channel;
    /** Connection state; written while holding this object's monitor, read without it on the fast path. */
    private volatile ConnectionState state;
    /** Client configuration (read timeout, TCP no-delay). */
    private final ClientConfiguration conf;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/bookkeeper/proto/PerChannelBookieClient$AddCompletion.class */
    /**
     * Bookkeeping record for one in-flight add request: the callback to notify
     * and the opaque context object to hand back to it.
     */
    public static class AddCompletion {
        /** Callback notified when the add succeeds or fails. */
        final BookkeeperInternalCallbacks.WriteCallback cb;
        /** Caller-supplied context passed back to {@link #cb} unchanged. */
        final Object ctx;

        /**
         * @param writeCallback callback to notify on completion
         * @param j             accepted for signature compatibility; not stored
         * @param obj           opaque context returned to the callback
         */
        public AddCompletion(BookkeeperInternalCallbacks.WriteCallback writeCallback, long j, Object obj) {
            ctx = obj;
            cb = writeCallback;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/bookkeeper/proto/PerChannelBookieClient$CompletionKey.class */
    /**
     * Key of an outstanding request in the completion maps.
     *
     * <p>Identity is the (ledger id, entry id) pair only; the timeout deadline is
     * carried along but takes no part in {@link #equals(Object)} or
     * {@link #hashCode()}. This is an inner (non-static) class because the
     * deadline is derived from the enclosing client's configuration.
     */
    public class CompletionKey {
        long ledgerId;
        long entryId;
        /** Wall-clock time, in milliseconds since the epoch, after which the request counts as timed out. */
        final long timeoutAt;

        /**
         * @param j  ledger id
         * @param j2 entry id
         */
        CompletionKey(long j, long j2) {
            this.ledgerId = j;
            this.entryId = j2;
            // 1000L forces the multiplication into long arithmetic so a large
            // configured timeout cannot overflow before being added to the clock.
            this.timeoutAt = System.currentTimeMillis() + (PerChannelBookieClient.this.conf.getReadTimeout() * 1000L);
        }

        /** Two keys are equal when both ledger id and entry id match. */
        @Override
        public boolean equals(Object obj) {
            // instanceof is already false for null, so no separate null test is needed.
            if (!(obj instanceof CompletionKey)) {
                return false;
            }
            CompletionKey completionKey = (CompletionKey) obj;
            return this.ledgerId == completionKey.ledgerId && this.entryId == completionKey.entryId;
        }

        @Override
        public int hashCode() {
            return (((int) this.ledgerId) << 16) ^ ((int) this.entryId);
        }

        @Override
        public String toString() {
            return String.format("LedgerEntry(%d, %d)", Long.valueOf(this.ledgerId), Long.valueOf(this.entryId));
        }

        /** @return true once the current time has reached or passed the deadline fixed at construction */
        public boolean shouldTimeout() {
            return this.timeoutAt <= System.currentTimeMillis();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/bookkeeper/proto/PerChannelBookieClient$ConnectionState.class */
    /** Lifecycle of the single connection this client maintains to its bookie. */
    public enum ConnectionState {
        /** No usable connection; this is the initial state and the state after a failed connect or a disconnect. */
        DISCONNECTED,
        /** A connect attempt has been started and its completion listener has not yet run. */
        CONNECTING,
        /** The last connect attempt succeeded and no disconnect has been observed since. */
        CONNECTED
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/bookkeeper/proto/PerChannelBookieClient$ReadCompletion.class */
    /**
     * Bookkeeping record for one in-flight read request: the callback to notify
     * and the opaque context object to hand back to it.
     */
    public static class ReadCompletion {
        /** Callback notified when the read succeeds or fails. */
        final BookkeeperInternalCallbacks.ReadEntryCallback cb;
        /** Caller-supplied context passed back to {@link #cb} unchanged. */
        final Object ctx;

        /**
         * @param readEntryCallback callback to notify on completion
         * @param obj               opaque context returned to the callback
         */
        public ReadCompletion(BookkeeperInternalCallbacks.ReadEntryCallback readEntryCallback, Object obj) {
            ctx = obj;
            cb = readEntryCallback;
        }
    }

    /**
     * Creates a client that uses a freshly constructed {@link ClientConfiguration};
     * all other arguments are forwarded unchanged to the five-argument constructor.
     *
     * @param orderedSafeExecutor        executor on which completion callbacks run
     * @param clientSocketChannelFactory Netty factory used to open the connection
     * @param inetSocketAddress          address of the bookie
     * @param atomicLong                 counter stored as {@code totalBytesOutstanding}
     */
    public PerChannelBookieClient(OrderedSafeExecutor orderedSafeExecutor, ClientSocketChannelFactory clientSocketChannelFactory, InetSocketAddress inetSocketAddress, AtomicLong atomicLong) {
        this(new ClientConfiguration(), orderedSafeExecutor, clientSocketChannelFactory, inetSocketAddress, atomicLong);
    }

    /**
     * Creates a client in the {@code DISCONNECTED} state. No connection is opened
     * here; that happens on the first call to {@code connectIfNeededAndDoOp}.
     *
     * @param clientConfiguration        configuration supplying read timeout and TCP options
     * @param orderedSafeExecutor        executor on which completion callbacks run
     * @param clientSocketChannelFactory Netty factory used to open the connection
     * @param inetSocketAddress          address of the bookie
     * @param atomicLong                 counter stored as {@code totalBytesOutstanding}
     */
    public PerChannelBookieClient(ClientConfiguration clientConfiguration, OrderedSafeExecutor orderedSafeExecutor, ClientSocketChannelFactory clientSocketChannelFactory, InetSocketAddress inetSocketAddress, AtomicLong atomicLong) {
        // Collaborators supplied by the caller.
        conf = clientConfiguration;
        addr = inetSocketAddress;
        executor = orderedSafeExecutor;
        channelFactory = clientSocketChannelFactory;
        totalBytesOutstanding = atomicLong;

        // Per-client bookkeeping, initially empty.
        opCounterSem = new Semaphore(2000);
        addCompletions = new ConcurrentHashMap<>();
        readCompletions = new ConcurrentHashMap<>();
        pendingOps = new ArrayDeque<>();

        // Nothing is connected yet; the timer is created lazily with the pipeline.
        channel = null;
        readTimeoutTimer = null;
        state = ConnectionState.DISCONNECTED;
    }

    /**
     * Starts an asynchronous connect to the bookie unless one is already in flight.
     *
     * <p>When the attempt finishes, the listener records the outcome (channel and
     * state) under this object's monitor, swaps out the queue of pending
     * operations, and then — outside the monitor — completes every queued
     * operation with 0 on success or -8 on failure.
     */
    private synchronized void connect() {
        if (this.state == ConnectionState.CONNECTING) {
            // An attempt is already running; its listener will drain pendingOps.
            return;
        }
        this.state = ConnectionState.CONNECTING;
        if (LOG.isDebugEnabled()) {
            LOG.debug("Connecting to bookie: " + this.addr);
        }
        ClientBootstrap clientBootstrap = new ClientBootstrap(this.channelFactory);
        clientBootstrap.setPipelineFactory(this);
        clientBootstrap.setOption("tcpNoDelay", Boolean.valueOf(this.conf.getClientTcpNoDelay()));
        clientBootstrap.setOption("keepAlive", true);
        clientBootstrap.connect(this.addr).addListener(new ChannelFutureListener() { // from class: org.apache.bookkeeper.proto.PerChannelBookieClient.1
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                final int rc;
                final Queue<BookkeeperInternalCallbacks.GenericCallback<Void>> drained;
                synchronized (PerChannelBookieClient.this) {
                    if (channelFuture.isSuccess()) {
                        PerChannelBookieClient.LOG.info("Successfully connected to bookie: " + PerChannelBookieClient.this.addr);
                        rc = 0;
                        PerChannelBookieClient.this.channel = channelFuture.getChannel();
                        PerChannelBookieClient.this.state = ConnectionState.CONNECTED;
                    } else {
                        PerChannelBookieClient.LOG.error("Could not connect to bookie: " + PerChannelBookieClient.this.addr);
                        rc = -8; // NOTE(review): presumably the "bookie handle not available" code — confirm
                        PerChannelBookieClient.this.channel = null;
                        PerChannelBookieClient.this.state = ConnectionState.DISCONNECTED;
                    }
                    // Take ownership of everything queued so far and leave a fresh
                    // queue behind for operations that arrive from now on.
                    drained = PerChannelBookieClient.this.pendingOps;
                    PerChannelBookieClient.this.pendingOps = new ArrayDeque<>();
                }
                // Callbacks run outside the monitor so they cannot block other callers.
                for (BookkeeperInternalCallbacks.GenericCallback<Void> op : drained) {
                    op.operationComplete(rc, null);
                }
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Runs the given operation once a connection is available.
     *
     * <p>If the client is already connected the operation is completed with 0 on
     * the calling thread. Otherwise it is queued and a connect attempt is
     * triggered; the connect listener will complete it later.
     *
     * @param genericCallback operation to run; its {@code operationComplete} is
     *                        invoked with the connection result code
     */
    public void connectIfNeededAndDoOp(BookkeeperInternalCallbacks.GenericCallback<Void> genericCallback) {
        // Lock-free fast path for the common, already-connected case.
        boolean connected = this.channel != null && this.state == ConnectionState.CONNECTED;
        if (!connected) {
            synchronized (this) {
                // Re-check under the monitor: a connect may have completed meanwhile.
                connected = this.channel != null && this.state == ConnectionState.CONNECTED;
                if (!connected) {
                    this.pendingOps.add(genericCallback);
                    connect();
                }
            }
        }
        if (connected) {
            genericCallback.operationComplete(0, null);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Sends an add-entry request to the bookie.
     *
     * <p>The completion is registered before the write is attempted. If the
     * request cannot be built or written, the registered callback is failed via
     * {@code errorOutAddKey}.
     *
     * @param j             ledger id
     * @param bArr          key material; its first 20 bytes are written into the request
     * @param j2            entry id
     * @param channelBuffer entry payload, appended after the request header
     * @param writeCallback callback notified of the outcome
     * @param obj           opaque context returned to the callback
     * @param i             flags placed in the packet header
     */
    public void addEntry(final long j, byte[] bArr, final long j2, ChannelBuffer channelBuffer, BookkeeperInternalCallbacks.WriteCallback writeCallback, Object obj, int i) {
        final int entrySize = channelBuffer.readableBytes();
        final CompletionKey key = new CompletionKey(j, j2);
        this.addCompletions.put(key, new AddCompletion(writeCallback, entrySize, obj));
        try {
            // 4-byte length prefix + 4-byte packet header + 20 bytes of key material.
            final int headerSize = 28;
            ChannelBuffer header = this.channel.getConfig().getBufferFactory().getBuffer(headerSize);
            // The length prefix counts everything after itself.
            header.writeInt(headerSize - 4 + entrySize);
            header.writeInt(new BookieProtocol.PacketHeader((byte) 2, (byte) 1, (short) i).toInt());
            header.writeBytes(bArr, 0, 20);
            ChannelFuture writeFuture = this.channel.write(ChannelBuffers.wrappedBuffer(header, channelBuffer));
            writeFuture.addListener(new ChannelFutureListener() { // from class: org.apache.bookkeeper.proto.PerChannelBookieClient.2
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isSuccess()) {
                        if (PerChannelBookieClient.LOG.isDebugEnabled()) {
                            PerChannelBookieClient.LOG.debug("Successfully wrote request for adding entry: " + j2 + " ledger-id: " + j + " bookie: " + PerChannelBookieClient.this.channel.getRemoteAddress() + " entry length: " + entrySize);
                        }
                        return;
                    }
                    PerChannelBookieClient.this.errorOutAddKey(key);
                }
            });
        } catch (Throwable th) {
            LOG.warn("Add entry operation failed", th);
            errorOutAddKey(key);
        }
    }

    /**
     * Sends a read-entry request whose packet header carries flag value 1
     * (NOTE(review): presumably the "fence ledger" flag — confirm against
     * BookieProtocol), together with 20 bytes of key material.
     *
     * <p>The completion is registered before the write is attempted. As in
     * {@code readEntry} and {@code addEntry}, any failure while building or
     * writing the request (for example a channel that has gone away) fails the
     * registered callback via {@code errorOutReadKey} instead of propagating to
     * the caller and leaving the completion stranded in the map.
     *
     * @param j                 ledger id
     * @param bArr              key material; its first 20 bytes are written into the request
     * @param j2                entry id
     * @param readEntryCallback callback notified of the outcome
     * @param obj               opaque context returned to the callback
     */
    public void readEntryAndFenceLedger(final long j, byte[] bArr, final long j2, BookkeeperInternalCallbacks.ReadEntryCallback readEntryCallback, Object obj) {
        final CompletionKey completionKey = new CompletionKey(j, j2);
        this.readCompletions.put(completionKey, new ReadCompletion(readEntryCallback, obj));
        try {
            // 4-byte length prefix + 4-byte packet header + two 8-byte ids + 20 bytes of key material.
            ChannelBuffer buffer = this.channel.getConfig().getBufferFactory().getBuffer(44);
            buffer.writeInt(44 - 4);
            buffer.writeInt(new BookieProtocol.PacketHeader((byte) 2, (byte) 2, (short) 1).toInt());
            buffer.writeLong(j);
            buffer.writeLong(j2);
            buffer.writeBytes(bArr, 0, 20);
            this.channel.write(buffer).addListener(new ChannelFutureListener() { // from class: org.apache.bookkeeper.proto.PerChannelBookieClient.3
                public void operationComplete(ChannelFuture channelFuture) throws Exception {
                    if (!channelFuture.isSuccess()) {
                        PerChannelBookieClient.this.errorOutReadKey(completionKey);
                    } else if (PerChannelBookieClient.LOG.isDebugEnabled()) {
                        PerChannelBookieClient.LOG.debug("Successfully wrote request for reading entry: " + j2 + " ledger-id: " + j + " bookie: " + PerChannelBookieClient.this.channel.getRemoteAddress());
                    }
                }
            });
        } catch (Throwable th) {
            LOG.warn("Read entry and fence ledger operation failed", th);
            errorOutReadKey(completionKey);
        }
    }

    /**
     * Sends a plain read-entry request to the bookie.
     *
     * <p>The completion is registered before the write is attempted. If the
     * request cannot be built or written, the registered callback is failed via
     * {@code errorOutReadKey}.
     *
     * @param j                 ledger id
     * @param j2                entry id
     * @param readEntryCallback callback notified of the outcome
     * @param obj               opaque context returned to the callback
     */
    public void readEntry(final long j, final long j2, BookkeeperInternalCallbacks.ReadEntryCallback readEntryCallback, Object obj) {
        final CompletionKey key = new CompletionKey(j, j2);
        this.readCompletions.put(key, new ReadCompletion(readEntryCallback, obj));
        try {
            // 4-byte length prefix + 4-byte packet header + two 8-byte ids.
            final int requestSize = 24;
            ChannelBuffer request = this.channel.getConfig().getBufferFactory().getBuffer(requestSize);
            request.writeInt(requestSize - 4);
            request.writeInt(new BookieProtocol.PacketHeader((byte) 2, (byte) 2, (short) 0).toInt());
            request.writeLong(j);
            request.writeLong(j2);
            ChannelFuture writeFuture = this.channel.write(request);
            writeFuture.addListener(new ChannelFutureListener() { // from class: org.apache.bookkeeper.proto.PerChannelBookieClient.4
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isSuccess()) {
                        if (PerChannelBookieClient.LOG.isDebugEnabled()) {
                            PerChannelBookieClient.LOG.debug("Successfully wrote request for reading entry: " + j2 + " ledger-id: " + j + " bookie: " + PerChannelBookieClient.this.channel.getRemoteAddress());
                        }
                        return;
                    }
                    PerChannelBookieClient.this.errorOutReadKey(key);
                }
            });
        } catch (Throwable th) {
            LOG.warn("Read entry operation failed", th);
            errorOutReadKey(key);
        }
    }

    /**
     * Closes the channel, waiting for the close to finish, and stops the
     * read-timeout timer if one was created.
     */
    public void close() {
        // Read the volatile field once: a concurrent failed connect can set it to
        // null between a null check and the dereference.
        final Channel current = this.channel;
        if (current != null) {
            current.close().awaitUninterruptibly();
        }
        final Timer timer = this.readTimeoutTimer;
        if (timer != null) {
            timer.stop();
            this.readTimeoutTimer = null;
        }
    }

    /**
     * Fails the outstanding read identified by the key, if it is still registered.
     *
     * <p>The work is submitted to the ordered executor under the key's ledger id.
     * The completion is removed from the map first, so each read is failed at
     * most once; its callback receives result code -8 and a null buffer.
     *
     * @param completionKey key of the read request to fail
     */
    void errorOutReadKey(final CompletionKey completionKey) {
        this.executor.submitOrdered(Long.valueOf(completionKey.ledgerId), new SafeRunnable() { // from class: org.apache.bookkeeper.proto.PerChannelBookieClient.5
            @Override // org.apache.bookkeeper.util.SafeRunnable
            public void safeRun() {
                ReadCompletion remove = PerChannelBookieClient.this.readCompletions.remove(completionKey);
                if (remove != null) {
                    // The channel may already be null (failed connect) or unconnected.
                    // Building the log line must never throw, otherwise the callback
                    // below would be skipped and the caller would wait forever.
                    final Channel current = PerChannelBookieClient.this.channel;
                    final Object bookie = current != null ? current.getRemoteAddress() : null;
                    PerChannelBookieClient.LOG.error("Could not write  request for reading entry: " + completionKey.entryId + " ledger-id: " + completionKey.ledgerId + " bookie: " + bookie);
                    remove.cb.readEntryComplete(-8, completionKey.ledgerId, completionKey.entryId, null, remove.ctx);
                }
            }
        });
    }

    /**
     * Fails the outstanding add identified by the key, if it is still registered.
     *
     * <p>The work is submitted to the ordered executor under the key's ledger id.
     * The completion is removed from the map first, so each add is failed at
     * most once; its callback receives result code -8 and this client's bookie
     * address.
     *
     * @param completionKey key of the add request to fail
     */
    void errorOutAddKey(final CompletionKey completionKey) {
        this.executor.submitOrdered(Long.valueOf(completionKey.ledgerId), new SafeRunnable() { // from class: org.apache.bookkeeper.proto.PerChannelBookieClient.6
            @Override // org.apache.bookkeeper.util.SafeRunnable
            public void safeRun() {
                AddCompletion remove = PerChannelBookieClient.this.addCompletions.remove(completionKey);
                if (remove != null) {
                    // Read the volatile channel once and tolerate a null remote address
                    // (unconnected channel): building the log line must never throw,
                    // otherwise the callback below would be skipped.
                    final Channel current = PerChannelBookieClient.this.channel;
                    final Object bookie = current != null ? current.getRemoteAddress() : null;
                    PerChannelBookieClient.LOG.error("Could not write request for adding entry: " + completionKey.entryId + " ledger-id: " + completionKey.ledgerId + " bookie: " + bookie);
                    remove.cb.writeComplete(-8, completionKey.ledgerId, completionKey.entryId, PerChannelBookieClient.this.addr, remove.ctx);
                    PerChannelBookieClient.LOG.error("Invoked callback method: " + completionKey.entryId);
                }
            }
        });
    }

    /**
     * Fails every add and every read that is currently registered, adds first.
     * Each key is handed to the matching {@code errorOut*Key} method, which
     * removes the completion and invokes its callback on the ordered executor.
     */
    void errorOutOutstandingEntries() {
        for (CompletionKey pendingAdd : this.addCompletions.keySet()) {
            errorOutAddKey(pendingAdd);
        }
        for (CompletionKey pendingRead : this.readCompletions.keySet()) {
            errorOutReadKey(pendingRead);
        }
    }

    /**
     * Builds the channel pipeline: a read-timeout handler, a length-prefixed
     * frame decoder, and this object as the final handler.
     *
     * <p>The timer backing the read-timeout handler is created on first use and
     * reused for later pipelines.
     *
     * @return the newly assembled pipeline
     */
    public ChannelPipeline getPipeline() throws Exception {
        final ChannelPipeline result = Channels.pipeline();
        if (this.readTimeoutTimer == null) {
            this.readTimeoutTimer = new HashedWheelTimer();
        }
        final ReadTimeoutHandler timeoutHandler = new ReadTimeoutHandler(this.readTimeoutTimer, this.conf.getReadTimeout());
        result.addLast("readTimeout", timeoutHandler);
        // Frames carry a 4-byte length prefix at offset 0, which is stripped before delivery.
        final LengthFieldBasedFrameDecoder frameDecoder = new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, 0, 4, 0, 4);
        result.addLast("lengthbasedframedecoder", frameDecoder);
        result.addLast("mainhandler", this);
        return result;
    }

    /**
     * Handles loss of the connection: fails every outstanding add and read,
     * closes the current channel and marks the client as disconnected. No
     * reconnect is started here; the next call to
     * {@code connectIfNeededAndDoOp} triggers one.
     *
     * @param channelHandlerContext context of the handler (unused)
     * @param channelStateEvent     the disconnect event (unused)
     */
    public void channelDisconnected(ChannelHandlerContext channelHandlerContext, ChannelStateEvent channelStateEvent) throws Exception {
        LOG.info("Disconnected from bookie: " + this.addr);
        errorOutOutstandingEntries();
        // NOTE(review): this closes whatever the channel field currently holds, not
        // the channel carried by the event, and assumes the field is non-null — confirm
        // this cannot run after a failed reconnect has nulled the field.
        this.channel.close();
        synchronized (this) {
            this.state = ConnectionState.DISCONNECTED;
        }
    }

    /**
     * Reacts to an exception raised on the channel.
     *
     * <ul>
     *   <li>Frame decoding errors are logged and otherwise ignored.</li>
     *   <li>A read timeout fails every outstanding add and read whose own
     *       deadline has passed.</li>
     *   <li>Any other {@link IOException} is ignored without logging.</li>
     *   <li>Everything else is logged as unexpected.</li>
     * </ul>
     *
     * @param channelHandlerContext context of the handler (unused)
     * @param exceptionEvent        event carrying the cause and the channel
     */
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, ExceptionEvent exceptionEvent) throws Exception {
        final Throwable cause = exceptionEvent.getCause();

        if (cause instanceof CorruptedFrameException || cause instanceof TooLongFrameException) {
            LOG.error("Corrupted fram received from bookie: " + exceptionEvent.getChannel().getRemoteAddress());
            return;
        }

        if (cause instanceof ReadTimeoutException) {
            // The handler-level timeout is only a trigger; each request is judged
            // against the deadline stored in its own key.
            for (CompletionKey pendingAdd : this.addCompletions.keySet()) {
                if (pendingAdd.shouldTimeout()) {
                    errorOutAddKey(pendingAdd);
                }
            }
            for (CompletionKey pendingRead : this.readCompletions.keySet()) {
                if (pendingRead.shouldTimeout()) {
                    errorOutReadKey(pendingRead);
                }
            }
            return;
        }

        if (cause instanceof IOException) {
            // Deliberately not logged here.
            return;
        }

        LOG.error("Unexpected exception caught by bookie client channel handler", cause);
    }

    /**
     * Decodes a response frame and dispatches it to the matching handler.
     *
     * <p>Messages that are not a {@link ChannelBuffer} are passed upstream
     * untouched. Otherwise the packet header, result code, ledger id and entry id
     * are read from the buffer, and handling continues on the ordered executor
     * under the ledger id. A frame too short to contain those fields is logged
     * and dropped.
     *
     * @param channelHandlerContext context used to forward unrelated messages
     * @param messageEvent          event carrying the received message
     */
    public void messageReceived(ChannelHandlerContext channelHandlerContext, MessageEvent messageEvent) throws Exception {
        final Object message = messageEvent.getMessage();
        if (message instanceof ChannelBuffer) {
            final ChannelBuffer response = (ChannelBuffer) message;
            try {
                final BookieProtocol.PacketHeader header = BookieProtocol.PacketHeader.fromInt(response.readInt());
                final int rc = response.readInt();
                final long ledgerId = response.readLong();
                final long entryId = response.readLong();
                this.executor.submitOrdered(Long.valueOf(ledgerId), new SafeRunnable() { // from class: org.apache.bookkeeper.proto.PerChannelBookieClient.7
                    @Override // org.apache.bookkeeper.util.SafeRunnable
                    public void safeRun() {
                        final int opCode = header.getOpCode();
                        if (opCode == 1) {
                            PerChannelBookieClient.this.handleAddResponse(ledgerId, entryId, rc);
                        } else if (opCode == 2) {
                            // The remainder of the buffer is the entry payload.
                            PerChannelBookieClient.this.handleReadResponse(ledgerId, entryId, rc, response);
                        } else {
                            PerChannelBookieClient.LOG.error("Unexpected response, type: " + opCode + " received from bookie: " + PerChannelBookieClient.this.addr + " , ignoring");
                        }
                    }
                });
            } catch (IndexOutOfBoundsException e) {
                LOG.error("Unparseable response from bookie: " + this.addr, e);
            }
            return;
        }
        channelHandlerContext.sendUpstream(messageEvent);
    }

    /**
     * Completes the outstanding add that matches a response from the bookie.
     *
     * <p>The wire result code is translated to the client-side code passed to
     * the callback: 0 stays 0, {@code EUA} becomes -102, {@code EBADVERSION}
     * becomes -16, {@code EFENCED} becomes -101, and anything else is logged and
     * becomes -12. A response with no registered completion is logged and ignored.
     *
     * @param j  ledger id from the response
     * @param j2 entry id from the response
     * @param i  result code from the response
     */
    void handleAddResponse(long j, long j2, int i) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Got response for add request from bookie: " + this.addr + " for ledger: " + j + " entry: " + j2 + " rc: " + i);
        }

        final int clientRc;
        if (i == 0) {
            clientRc = 0;
        } else if (i == BookieProtocol.EUA) {
            clientRc = -102;
        } else if (i == BookieProtocol.EBADVERSION) {
            clientRc = -16;
        } else if (i == BookieProtocol.EFENCED) {
            clientRc = -101;
        } else {
            LOG.error("Add for ledger: " + j + ", entry: " + j2 + " failed on bookie: " + this.addr + " with code: " + i);
            clientRc = -12;
        }

        final AddCompletion completion = this.addCompletions.remove(new CompletionKey(j, j2));
        if (completion != null) {
            completion.cb.writeComplete(clientRc, j, j2, this.addr, completion.ctx);
            return;
        }
        LOG.error("Unexpected add response received from bookie: " + this.addr + " for ledger: " + j + ", entry: " + j2 + " , ignoring");
    }

    /**
     * Completes the outstanding read that matches a response from the bookie.
     *
     * <p>The wire result code is translated to the client-side code passed to
     * the callback: 0 stays 0, 1 and 2 become -13, {@code EBADVERSION} becomes
     * -16, {@code EUA} becomes -102, and anything else is logged and becomes -1.
     * The completion is looked up by (ledger, entry) and, failing that, by
     * (ledger, -1). A response with no registered completion is logged and ignored.
     *
     * @param j             ledger id from the response
     * @param j2            entry id from the response
     * @param i             result code from the response
     * @param channelBuffer buffer positioned at the entry payload; a slice of its
     *                      readable bytes is handed to the callback
     */
    void handleReadResponse(long j, long j2, int i, ChannelBuffer channelBuffer) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Got response for read request from bookie: " + this.addr + " for ledger: " + j + " entry: " + j2 + " rc: " + i + "entry length: " + channelBuffer.readableBytes());
        }

        final int clientRc;
        switch (i) {
            case 0:
                clientRc = 0;
                break;
            case 1: // NOTE(review): presumably "no such ledger" — confirm against BookieProtocol
            case 2: // NOTE(review): presumably "no such entry" — confirm against BookieProtocol
                clientRc = -13;
                break;
            case BookieProtocol.EBADVERSION:
                clientRc = -16;
                break;
            case BookieProtocol.EUA:
                clientRc = -102;
                break;
            default:
                LOG.error("Read for ledger: " + j + ", entry: " + j2 + " failed on bookie: " + this.addr + " with code: " + i);
                clientRc = -1;
                break;
        }

        ReadCompletion completion = this.readCompletions.remove(new CompletionKey(j, j2));
        if (completion == null) {
            // Fall back to a request registered under entry id -1 for this ledger.
            completion = this.readCompletions.remove(new CompletionKey(j, -1L));
        }
        if (completion != null) {
            completion.cb.readEntryComplete(clientRc, j, j2, channelBuffer.slice(), completion.ctx);
            return;
        }
        LOG.error("Unexpected read response received from bookie: " + this.addr + " for ledger: " + j + ", entry: " + j2 + " , ignoring");
    }

    /**
     * Creates a completion key bound to this client, so that its timeout
     * deadline is derived from this client's configuration.
     *
     * @param j  ledger id
     * @param j2 entry id
     * @return a new key for the given ledger/entry pair
     */
    CompletionKey newCompletionKey(long j, long j2) {
        // Spelled out as an inner-class creation to make the enclosing instance explicit.
        return this.new CompletionKey(j, j2);
    }
}
