package org.apache.flink.runtime.io.network.netty;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.netty.NettyMessage;
import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.util.AtomicDisposableReferenceCounter;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/io/network/netty/PartitionRequestClient.class */
public class PartitionRequestClient {
    private static final Logger LOG = LoggerFactory.getLogger(PartitionRequestClient.class);
    private final Channel tcpChannel;
    private final PartitionRequestClientHandler partitionRequestHandler;
    private final ConnectionID connectionId;
    private final PartitionRequestClientFactory clientFactory;
    private final AtomicDisposableReferenceCounter closeReferenceCounter = new AtomicDisposableReferenceCounter();

    /* JADX INFO: Access modifiers changed from: package-private */
    public PartitionRequestClient(Channel channel, PartitionRequestClientHandler partitionRequestClientHandler, ConnectionID connectionID, PartitionRequestClientFactory partitionRequestClientFactory) {
        this.tcpChannel = (Channel) Preconditions.checkNotNull(channel);
        this.partitionRequestHandler = (PartitionRequestClientHandler) Preconditions.checkNotNull(partitionRequestClientHandler);
        this.connectionId = (ConnectionID) Preconditions.checkNotNull(connectionID);
        this.clientFactory = (PartitionRequestClientFactory) Preconditions.checkNotNull(partitionRequestClientFactory);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean disposeIfNotUsed() {
        return this.closeReferenceCounter.disposeIfNotUsed();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean incrementReferenceCounter() {
        return this.closeReferenceCounter.increment();
    }

    public ChannelFuture requestSubpartition(ResultPartitionID resultPartitionID, int i, final RemoteInputChannel remoteInputChannel, int i2) throws IOException {
        checkNotClosed();
        LOG.debug("Requesting subpartition {} of partition {} with {} ms delay.", new Object[]{Integer.valueOf(i), resultPartitionID, Integer.valueOf(i2)});
        this.partitionRequestHandler.addInputChannel(remoteInputChannel);
        final NettyMessage.PartitionRequest partitionRequest = new NettyMessage.PartitionRequest(resultPartitionID, i, remoteInputChannel.getInputChannelId(), remoteInputChannel.getInitialCredit());
        final ChannelFutureListener channelFutureListener = new ChannelFutureListener() { // from class: org.apache.flink.runtime.io.network.netty.PartitionRequestClient.1
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                if (channelFuture.isSuccess()) {
                    return;
                }
                PartitionRequestClient.this.partitionRequestHandler.removeInputChannel(remoteInputChannel);
                remoteInputChannel.onError(new LocalTransportException("Sending the partition request failed.", channelFuture.channel().localAddress(), channelFuture.cause()));
            }
        };
        if (i2 == 0) {
            ChannelFuture writeAndFlush = this.tcpChannel.writeAndFlush(partitionRequest);
            writeAndFlush.addListener(channelFutureListener);
            return writeAndFlush;
        }
        final ChannelFuture[] channelFutureArr = new ChannelFuture[1];
        this.tcpChannel.eventLoop().schedule(new Runnable() { // from class: org.apache.flink.runtime.io.network.netty.PartitionRequestClient.2
            @Override // java.lang.Runnable
            public void run() {
                channelFutureArr[0] = PartitionRequestClient.this.tcpChannel.writeAndFlush(partitionRequest);
                channelFutureArr[0].addListener(channelFutureListener);
            }
        }, i2, TimeUnit.MILLISECONDS);
        return channelFutureArr[0];
    }

    public void sendTaskEvent(ResultPartitionID resultPartitionID, TaskEvent taskEvent, final RemoteInputChannel remoteInputChannel) throws IOException {
        checkNotClosed();
        this.tcpChannel.writeAndFlush(new NettyMessage.TaskEventRequest(taskEvent, resultPartitionID, remoteInputChannel.getInputChannelId())).addListener(new ChannelFutureListener() { // from class: org.apache.flink.runtime.io.network.netty.PartitionRequestClient.3
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                if (channelFuture.isSuccess()) {
                    return;
                }
                remoteInputChannel.onError(new LocalTransportException("Sending the task event failed.", channelFuture.channel().localAddress(), channelFuture.cause()));
            }
        });
    }

    public void close(RemoteInputChannel remoteInputChannel) throws IOException {
        this.partitionRequestHandler.removeInputChannel(remoteInputChannel);
        if (!this.closeReferenceCounter.decrement()) {
            this.partitionRequestHandler.cancelRequestFor(remoteInputChannel.getInputChannelId());
        } else {
            this.tcpChannel.writeAndFlush(new NettyMessage.CloseRequest()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            this.clientFactory.destroyPartitionRequestClient(this.connectionId, this);
        }
    }

    private void checkNotClosed() throws IOException {
        if (this.closeReferenceCounter.isDisposed()) {
            throw new LocalTransportException("Channel closed.", this.tcpChannel.localAddress());
        }
    }
}
