package org.apache.pulsar.broker.service;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.concurrent.Future;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse;
import org.apache.pulsar.common.api.proto.CommandTopicMigrated;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.util.netty.NettyChannelUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/service/PulsarCommandSenderImpl.class */
/**
 * {@link PulsarCommandSender} implementation that builds protocol commands through
 * {@link Commands} and writes them to the channel context of a single {@link ServerCnx}.
 *
 * <p>Most responses go through {@link #interceptAndWriteCommand(BaseCommand)}, which offers the
 * command to the optional {@link BrokerInterceptor} before it is serialized and flushed. A few
 * notifications ({@code sendActiveConsumerChange}, {@code sendReachedEndOfTopic},
 * {@code sendTopicMigrated}) are written directly as buffers and are not offered to the
 * interceptor by this class.
 */
public class PulsarCommandSenderImpl implements PulsarCommandSender {
    private static final Logger log = LoggerFactory.getLogger(PulsarCommandSenderImpl.class);

    /** Optional interceptor; every use in this class is guarded by a null check. */
    private final BrokerInterceptor interceptor;

    /** Connection whose channel context receives every write issued by this sender. */
    private final ServerCnx cnx;

    /**
     * Creates a sender bound to one connection.
     *
     * @param brokerInterceptor interceptor notified about outgoing commands; may be {@code null}
     * @param serverCnx connection to write to
     */
    public PulsarCommandSenderImpl(BrokerInterceptor brokerInterceptor, ServerCnx serverCnx) {
        this.interceptor = brokerInterceptor;
        this.cnx = serverCnx;
    }

    /**
     * Sends the command built by
     * {@link Commands#newPartitionMetadataResponseCommand(ServerError, String, long)}.
     *
     * @param serverError error code forwarded to the command factory
     * @param str message forwarded to the command factory
     * @param j presumably the request id -- confirm against callers
     */
    @Override
    public void sendPartitionMetadataResponse(ServerError serverError, String str, long j) {
        interceptAndWriteCommand(Commands.newPartitionMetadataResponseCommand(serverError, str, j));
    }

    /**
     * Sends the command built by {@link Commands#newPartitionMetadataResponseCommand(int, long)}.
     *
     * @param i presumably the partition count -- confirm against callers
     * @param j presumably the request id -- confirm against callers
     */
    @Override
    public void sendPartitionMetadataResponse(int i, long j) {
        interceptAndWriteCommand(Commands.newPartitionMetadataResponseCommand(i, j));
    }

    /**
     * Sends the command built by {@link Commands#newSuccessCommand(long)}.
     *
     * @param j presumably the request id -- confirm against callers
     */
    @Override
    public void sendSuccessResponse(long j) {
        interceptAndWriteCommand(Commands.newSuccessCommand(j));
    }

    /**
     * Sends the command built by {@link Commands#newErrorCommand(long, ServerError, String)}.
     *
     * @param j presumably the request id -- confirm against callers
     * @param serverError error code forwarded to the command factory
     * @param str message forwarded to the command factory
     */
    @Override
    public void sendErrorResponse(long j, ServerError serverError, String str) {
        interceptAndWriteCommand(Commands.newErrorCommand(j, serverError, str));
    }

    /**
     * Sends the three-argument producer-success command built by {@link Commands}.
     *
     * @param j forwarded as the first argument of {@code Commands.newProducerSuccessCommand}
     * @param str forwarded as the second argument (presumably the producer name -- confirm)
     * @param schemaVersion schema version forwarded to the command factory
     */
    @Override
    public void sendProducerSuccessResponse(long j, String str, SchemaVersion schemaVersion) {
        interceptAndWriteCommand(Commands.newProducerSuccessCommand(j, str, schemaVersion));
    }

    /**
     * Sends the six-argument producer-success command built by {@link Commands}. All arguments
     * are forwarded unchanged and in the same order to
     * {@code Commands.newProducerSuccessCommand}.
     */
    @Override
    public void sendProducerSuccessResponse(long j, String str, long j2, SchemaVersion schemaVersion, Optional<Long> optional, boolean z) {
        interceptAndWriteCommand(Commands.newProducerSuccessCommand(j, str, j2, schemaVersion, optional, z));
    }

    /**
     * Sends the command built by {@code Commands.newSendReceiptCommand}; the five {@code long}
     * arguments are forwarded unchanged and in the same order.
     */
    @Override
    public void sendSendReceiptResponse(long j, long j2, long j3, long j4, long j5) {
        interceptAndWriteCommand(Commands.newSendReceiptCommand(j, j2, j3, j4, j5));
    }

    /**
     * Sends the command built by {@code Commands.newSendErrorCommand}; all arguments are
     * forwarded unchanged and in the same order.
     */
    @Override
    public void sendSendError(long j, long j2, ServerError serverError, String str) {
        interceptAndWriteCommand(Commands.newSendErrorCommand(j, j2, serverError, str));
    }

    /**
     * Sends the command built by {@code Commands.newGetTopicsOfNamespaceResponseCommand}; all
     * arguments are forwarded unchanged and in the same order.
     */
    @Override
    public void sendGetTopicsOfNamespaceResponse(List<String> list, String str, boolean z, boolean z2, long j) {
        interceptAndWriteCommand(Commands.newGetTopicsOfNamespaceResponseCommand(list, str, z, z2, j));
    }

    /**
     * Sends the command built by {@code Commands.newGetSchemaResponseCommand}.
     *
     * @param j presumably the request id -- confirm against callers
     * @param schemaInfo schema description forwarded to the command factory
     * @param schemaVersion schema version forwarded to the command factory
     */
    @Override
    public void sendGetSchemaResponse(long j, SchemaInfo schemaInfo, SchemaVersion schemaVersion) {
        interceptAndWriteCommand(Commands.newGetSchemaResponseCommand(j, schemaInfo, schemaVersion));
    }

    /**
     * Sends the command built by {@code Commands.newGetSchemaResponseErrorCommand}.
     *
     * @param j presumably the request id -- confirm against callers
     * @param serverError error code forwarded to the command factory
     * @param str message forwarded to the command factory
     */
    @Override
    public void sendGetSchemaErrorResponse(long j, ServerError serverError, String str) {
        interceptAndWriteCommand(Commands.newGetSchemaResponseErrorCommand(j, serverError, str));
    }

    /**
     * Sends the command built by {@code Commands.newGetOrCreateSchemaResponseCommand}.
     *
     * @param j presumably the request id -- confirm against callers
     * @param schemaVersion schema version forwarded to the command factory
     */
    @Override
    public void sendGetOrCreateSchemaResponse(long j, SchemaVersion schemaVersion) {
        interceptAndWriteCommand(Commands.newGetOrCreateSchemaResponseCommand(j, schemaVersion));
    }

    /**
     * Sends the command built by {@code Commands.newGetOrCreateSchemaResponseErrorCommand}.
     *
     * @param j presumably the request id -- confirm against callers
     * @param serverError error code forwarded to the command factory
     * @param str message forwarded to the command factory
     */
    @Override
    public void sendGetOrCreateSchemaErrorResponse(long j, ServerError serverError, String str) {
        interceptAndWriteCommand(Commands.newGetOrCreateSchemaResponseErrorCommand(j, serverError, str));
    }

    /**
     * Sends the command built by {@code Commands.newConnectedCommand}; all arguments are
     * forwarded unchanged and in the same order.
     */
    @Override
    public void sendConnectedResponse(int i, int i2, boolean z) {
        interceptAndWriteCommand(Commands.newConnectedCommand(i, i2, z));
    }

    /**
     * Sends the command built by {@code Commands.newLookupResponseCommand}; all arguments are
     * forwarded unchanged and in the same order.
     */
    @Override
    public void sendLookupResponse(String str, String str2, boolean z, CommandLookupTopicResponse.LookupType lookupType, long j, boolean z2) {
        interceptAndWriteCommand(Commands.newLookupResponseCommand(str, str2, z, lookupType, j, z2));
    }

    /**
     * Sends the command built by {@code Commands.newLookupErrorResponseCommand}.
     *
     * @param serverError error code forwarded to the command factory
     * @param str message forwarded to the command factory
     * @param j presumably the request id -- confirm against callers
     */
    @Override
    public void sendLookupResponse(ServerError serverError, String str, long j) {
        interceptAndWriteCommand(Commands.newLookupErrorResponseCommand(serverError, str, j));
    }

    /**
     * Writes an active-consumer-change notification, but only when
     * {@code Commands.peerSupportsActiveConsumerListener} accepts the peer's protocol version.
     * Otherwise nothing is written. The interceptor is not consulted.
     *
     * @param j forwarded as the first argument of {@code Commands.newActiveConsumerChange}
     * @param z forwarded as the second argument of {@code Commands.newActiveConsumerChange}
     */
    @Override
    public void sendActiveConsumerChange(long j, boolean z) {
        if (!Commands.peerSupportsActiveConsumerListener(this.cnx.getRemoteEndpointProtocolVersion())) {
            return;
        }
        writeAndFlush(Commands.newActiveConsumerChange(j, z));
    }

    /**
     * Writes a reached-end-of-topic notification when the peer's protocol version is at least
     * {@code v9}; older peers get nothing. The interceptor is not consulted.
     *
     * @param j forwarded to {@code Commands.newReachedEndOfTopic}
     */
    @Override
    public void sendReachedEndOfTopic(long j) {
        if (this.cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v9.getValue()) {
            return;
        }
        // NOTE(review): this class does not override toString(), so the placeholder is filled
        // with the default Object identity string rather than any connection details.
        log.info("[{}] Notifying consumer that end of topic has been reached", this);
        writeAndFlush(Commands.newReachedEndOfTopic(j));
    }

    /**
     * Writes a topic-migrated notification when the peer's protocol version is at least
     * {@code v20}. The interceptor is not consulted.
     *
     * @param resourceType kind of resource being notified; its name is logged
     * @param j resource identifier; logged and forwarded to the command factory
     * @param str forwarded as the third argument of {@code Commands.newTopicMigrated}
     * @param str2 forwarded as the fourth argument of {@code Commands.newTopicMigrated}
     * @return {@code false} if the peer is older than {@code v20} and nothing was written,
     *         {@code true} once the write has been issued
     */
    @Override
    public boolean sendTopicMigrated(CommandTopicMigrated.ResourceType resourceType, long j, String str, String str2) {
        if (this.cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v20.getValue()) {
            return false;
        }
        log.info("[{}] Notifying {} that topic is migrated", resourceType.name(), j);
        writeAndFlush(Commands.newTopicMigrated(resourceType, j, str, str2));
        return true;
    }

    /**
     * Schedules delivery of a batch of entries to a consumer on the channel's event loop.
     *
     * <p>The scheduled task walks {@code list} by index and skips {@code null} slots. An entry
     * whose batch size is greater than one is rejected when the connection reports that it is
     * not batch-compatible: a warning is logged, the channel context is closed and the entry is
     * released immediately. Every other entry is written with a void promise and remembered so
     * it can be released later. After the loop an empty buffer is written and flushed with the
     * returned promise; a listener on that promise releases the remembered entries. Finally
     * {@code entryBatchSizes} and, if present, {@code entryBatchIndexesAcks} are recycled.
     *
     * @param j consumer id; logged and forwarded to {@code newMessageAndIntercept}
     * @param str first label in the log prefix (presumably the topic name -- confirm); also
     *        forwarded to {@code newMessageAndIntercept}
     * @param subscription used only for logging in this method
     * @param i forwarded to {@code newMessageAndIntercept} (presumably the partition index --
     *        confirm)
     * @param list entries to deliver; may contain {@code null} slots
     * @param entryBatchSizes per-index batch sizes; recycled by the scheduled task
     * @param entryBatchIndexesAcks per-index ack sets, or {@code null} if there are none
     * @param redeliveryTracker source of the redelivery count for each entry
     * @param j2 forwarded as the last argument of {@code newMessageAndIntercept}
     * @return the promise attached to the final empty write-and-flush
     */
    public ChannelPromise sendMessagesToConsumer(long j, String str, Subscription subscription, int i, List<? extends Entry> list, EntryBatchSizes entryBatchSizes, EntryBatchIndexesAcks entryBatchIndexesAcks, RedeliveryTracker redeliveryTracker, long j2) {
        ChannelHandlerContext ctx = this.cnx.ctx();
        ChannelPromise newPromise = ctx.newPromise();
        ctx.channel().eventLoop().execute(() -> {
            // Typed list so the release callback below resolves Entry.release().
            List<Entry> writtenEntries = new ArrayList<>(list.size());
            for (int idx = 0; idx < list.size(); idx++) {
                Entry entry = list.get(idx);
                if (entry == null) {
                    continue;
                }
                int batchSize = entryBatchSizes.getBatchSize(idx);
                if (batchSize > 1 && !this.cnx.isBatchMessageCompatibleVersion()) {
                    log.warn("[{}-{}] Consumer doesn't support batch messages -  consumerId {}, msg id {}-{}",
                            str, subscription, j, entry.getLedgerId(), entry.getEntryId());
                    ctx.close();
                    entry.release();
                    continue;
                }

                ByteBuf dataBuffer = entry.getDataBuffer();
                // Extra reference for the outgoing write; the entry itself is released by the
                // promise listener registered after the loop.
                dataBuffer.retain();

                // Broker entry metadata is skipped unless the peer is at least v18, declares
                // support for it, and the broker configuration exposes it to clients.
                if (this.cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v18.getValue()
                        || !this.cnx.supportBrokerMetadata()
                        || !this.cnx.getBrokerService().getPulsar().getConfig().isExposingBrokerEntryMetadataToClientEnabled()) {
                    Commands.skipBrokerEntryMetadataIfExist(dataBuffer);
                }
                // For peers older than v11 the checksum is skipped when present.
                if (this.cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v11.getValue()) {
                    Commands.skipChecksumIfPresent(dataBuffer);
                }
                if (log.isDebugEnabled()) {
                    log.debug("[{}-{}] Sending message to consumerId {}, msg id {}-{} with batchSize {}",
                            str, subscription, j, entry.getLedgerId(), entry.getEntryId(), batchSize);
                }
                ctx.write(
                        this.cnx.newMessageAndIntercept(
                                j,
                                entry.getLedgerId(),
                                entry.getEntryId(),
                                i,
                                redeliveryTracker.getRedeliveryCount(entry.getLedgerId(), entry.getEntryId()),
                                dataBuffer,
                                entryBatchIndexesAcks == null ? null : entryBatchIndexesAcks.getAckSet(idx),
                                str,
                                j2),
                        ctx.voidPromise());
                writtenEntries.add(entry);
            }
            // A single empty write ties the flush of everything queued above to newPromise.
            ctx.writeAndFlush(Unpooled.EMPTY_BUFFER, newPromise);
            newPromise.addListener(future -> {
                writtenEntries.forEach(Entry::release);
            });
            entryBatchSizes.recyle();
            if (entryBatchIndexesAcks != null) {
                entryBatchIndexesAcks.recycle();
            }
        });
        return newPromise;
    }

    /**
     * Sends the command built by
     * {@code Commands.newTcClientConnectResponse(long, ServerError, String)}.
     *
     * @param j presumably the request id -- confirm against callers
     * @param serverError error code forwarded to the command factory; may be {@code null}
     * @param str message forwarded to the command factory; may be {@code null}
     */
    @Override
    public void sendTcClientConnectResponse(long j, ServerError serverError, String str) {
        interceptAndWriteCommand(Commands.newTcClientConnectResponse(j, serverError, str));
    }

    /**
     * Shorthand for {@link #sendTcClientConnectResponse(long, ServerError, String)} with a
     * {@code null} error and a {@code null} message.
     *
     * @param j forwarded unchanged to the three-argument overload
     */
    @Override
    public void sendTcClientConnectResponse(long j) {
        sendTcClientConnectResponse(j, null, null);
    }

    /**
     * Sends the new-transaction response and then, if an interceptor is configured, calls
     * {@code txnOpened(j2, txnID.toString())} on it.
     *
     * @param j forwarded as the first argument of {@code Commands.newTxnResponse}
     * @param txnID transaction id; its least and most significant bits go into the command
     * @param j2 forwarded as the first argument of {@code BrokerInterceptor.txnOpened}
     */
    @Override
    public void sendNewTxnResponse(long j, TxnID txnID, long j2) {
        interceptAndWriteCommand(Commands.newTxnResponse(j, txnID.getLeastSigBits(), txnID.getMostSigBits()));
        if (this.interceptor != null) {
            this.interceptor.txnOpened(j2, txnID.toString());
        }
    }

    /**
     * Sends the error variant of the new-transaction response; all arguments are forwarded
     * unchanged and in the same order to {@code Commands.newTxnResponse}.
     */
    @Override
    public void sendNewTxnErrorResponse(long j, long j2, ServerError serverError, String str) {
        interceptAndWriteCommand(Commands.newTxnResponse(j, j2, serverError, str));
    }

    /**
     * Sends the end-transaction response and then, if an interceptor is configured, calls
     * {@code txnEnded(txnID.toString(), i)} on it.
     *
     * @param j forwarded as the first argument of {@code Commands.newEndTxnResponse}
     * @param txnID transaction id; its least and most significant bits go into the command
     * @param i forwarded as the second argument of {@code BrokerInterceptor.txnEnded}
     */
    @Override
    public void sendEndTxnResponse(long j, TxnID txnID, int i) {
        interceptAndWriteCommand(Commands.newEndTxnResponse(j, txnID.getLeastSigBits(), txnID.getMostSigBits()));
        if (this.interceptor != null) {
            this.interceptor.txnEnded(txnID.toString(), i);
        }
    }

    /**
     * Sends the error variant of the end-transaction response.
     *
     * @param j forwarded as the first argument of {@code Commands.newEndTxnResponse}
     * @param txnID transaction id; its least and most significant bits go into the command
     * @param serverError error code forwarded to the command factory
     * @param str message forwarded to the command factory
     */
    @Override
    public void sendEndTxnErrorResponse(long j, TxnID txnID, ServerError serverError, String str) {
        interceptAndWriteCommand(
                Commands.newEndTxnResponse(j, txnID.getLeastSigBits(), txnID.getMostSigBits(), serverError, str));
    }

    /**
     * Sends the command built by {@code Commands.newWatchTopicListSuccess}; all arguments are
     * forwarded unchanged and in the same order.
     */
    @Override
    public void sendWatchTopicListSuccess(long j, long j2, String str, List<String> list) {
        interceptAndWriteCommand(Commands.newWatchTopicListSuccess(j, j2, str, list));
    }

    /**
     * Sends the command built by {@code Commands.newWatchTopicUpdate}; all arguments are
     * forwarded unchanged and in the same order.
     */
    @Override
    public void sendWatchTopicListUpdate(long j, List<String> list, List<String> list2, String str) {
        interceptAndWriteCommand(Commands.newWatchTopicUpdate(j, list, list2, str));
    }

    /**
     * Offers the command to the interceptor (if any), then serializes it with
     * {@code Commands.serializeWithSize} and writes it to the connection.
     */
    private void interceptAndWriteCommand(BaseCommand baseCommand) {
        safeIntercept(baseCommand, this.cnx);
        writeAndFlush(Commands.serializeWithSize(baseCommand));
    }

    /** Writes and flushes the buffer on the connection's channel context using a void promise. */
    private void writeAndFlush(ByteBuf byteBuf) {
        NettyChannelUtil.writeAndFlushWithVoidPromise(this.cnx.ctx(), byteBuf);
    }

    /**
     * Invokes {@code onPulsarCommand} on the interceptor when one is configured. Any
     * {@link Exception} it throws is logged and suppressed so that the caller still proceeds
     * to write the command.
     */
    private void safeIntercept(BaseCommand baseCommand, ServerCnx serverCnx) {
        if (this.interceptor == null) {
            return;
        }
        try {
            this.interceptor.onPulsarCommand(baseCommand, serverCnx);
        } catch (Exception e) {
            log.error("Failed to execute command {} on broker interceptor.", baseCommand.getType(), e);
        }
    }

    /**
     * Decompiler-emitted bridge for the covariant return type of
     * {@link #sendMessagesToConsumer}; it only delegates. The method name is the decompiler's
     * alias and is kept as-is because other decompiled sources may use the same alias.
     */
    @Override // org.apache.pulsar.broker.service.PulsarCommandSender
    /* renamed from: sendMessagesToConsumer, reason: collision with other method in class */
    public /* bridge */ /* synthetic */ Future mo276sendMessagesToConsumer(long j, String str, Subscription subscription, int i, List list, EntryBatchSizes entryBatchSizes, EntryBatchIndexesAcks entryBatchIndexesAcks, RedeliveryTracker redeliveryTracker, long j2) {
        return sendMessagesToConsumer(j, str, subscription, i, (List<? extends Entry>) list, entryBatchSizes, entryBatchIndexesAcks, redeliveryTracker, j2);
    }
}
