package org.apache.pulsar.broker.transaction.buffer.impl;

import io.netty.buffer.ByteBuf;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Recycler;
import io.netty.util.ReferenceCountUtil;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.TransactionBufferClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.transaction.TransactionBufferHandler;
import org.apache.pulsar.common.api.proto.CommandEndTxnOnPartitionResponse;
import org.apache.pulsar.common.api.proto.CommandEndTxnOnSubscriptionResponse;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl.class */
public class TransactionBufferHandlerImpl implements TransactionBufferHandler {
    private final long operationTimeoutInMills;
    private final HashedWheelTimer timer;
    private final PulsarService pulsarService;
    private final PulsarClientImpl pulsarClient;
    private volatile int requestCredits;
    private static final Logger log = LoggerFactory.getLogger(TransactionBufferHandlerImpl.class);
    private static final AtomicIntegerFieldUpdater<TransactionBufferHandlerImpl> REQUEST_CREDITS_UPDATER = AtomicIntegerFieldUpdater.newUpdater(TransactionBufferHandlerImpl.class, "requestCredits");
    private final AtomicLong requestIdGenerator = new AtomicLong();
    private final ConcurrentSkipListMap<Long, OpRequestSend> outstandingRequests = new ConcurrentSkipListMap<>();
    private final GrowableArrayBlockingQueue<OpRequestSend> pendingRequests = new GrowableArrayBlockingQueue<>();

    /* loaded from: input_file:org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferHandlerImpl$OpRequestSend.class */
    public static final class OpRequestSend {
        long requestId;
        String topic;
        ByteBuf cmd;
        CompletableFuture<TxnID> cb;
        long createdAt;
        CompletableFuture<ClientCnx> cnx;
        private final Recycler.Handle<OpRequestSend> recyclerHandle;
        private static final Recycler<OpRequestSend> RECYCLER = new Recycler<OpRequestSend>() { // from class: org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferHandlerImpl.OpRequestSend.1
            protected OpRequestSend newObject(Recycler.Handle<OpRequestSend> handle) {
                return new OpRequestSend(handle);
            }

            /* renamed from: newObject, reason: collision with other method in class */
            protected /* bridge */ /* synthetic */ Object m532newObject(Recycler.Handle handle) {
                return newObject((Recycler.Handle<OpRequestSend>) handle);
            }
        };

        static OpRequestSend create(long j, String str, ByteBuf byteBuf, CompletableFuture<TxnID> completableFuture, CompletableFuture<ClientCnx> completableFuture2) {
            OpRequestSend opRequestSend = (OpRequestSend) RECYCLER.get();
            opRequestSend.requestId = j;
            opRequestSend.topic = str;
            opRequestSend.cmd = byteBuf;
            opRequestSend.cb = completableFuture;
            opRequestSend.createdAt = System.currentTimeMillis();
            opRequestSend.cnx = completableFuture2;
            return opRequestSend;
        }

        void recycle() {
            this.recyclerHandle.recycle(this);
        }

        private OpRequestSend(Recycler.Handle<OpRequestSend> handle) {
            this.recyclerHandle = handle;
        }
    }

    public TransactionBufferHandlerImpl(PulsarService pulsarService, HashedWheelTimer hashedWheelTimer, int i, long j) throws PulsarServerException {
        this.pulsarService = pulsarService;
        this.pulsarClient = pulsarService.getClient();
        this.operationTimeoutInMills = j;
        this.timer = hashedWheelTimer;
        this.requestCredits = Math.max(100, i);
    }

    public CompletableFuture<TxnID> endTxnOnTopic(String str, long j, long j2, TxnAction txnAction, long j3) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] endTxnOnTopic txnId: [{}], txnAction: [{}]", new Object[]{str, new TxnID(j, j2), Integer.valueOf(txnAction.getValue())});
        }
        CompletableFuture<TxnID> completableFuture = new CompletableFuture<>();
        long andIncrement = this.requestIdGenerator.getAndIncrement();
        OpRequestSend create = OpRequestSend.create(andIncrement, str, Commands.newEndTxnOnPartition(andIncrement, j2, j, str, txnAction, j3), completableFuture, getClientCnx(str));
        if (checkRequestCredits(create)) {
            endTxn(create);
        }
        return completableFuture;
    }

    public CompletableFuture<TxnID> endTxnOnSubscription(String str, String str2, long j, long j2, TxnAction txnAction, long j3) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] endTxnOnSubscription txnId: [{}], txnAction: [{}]", new Object[]{str, new TxnID(j, j2), Integer.valueOf(txnAction.getValue())});
        }
        CompletableFuture<TxnID> completableFuture = new CompletableFuture<>();
        long andIncrement = this.requestIdGenerator.getAndIncrement();
        OpRequestSend create = OpRequestSend.create(andIncrement, str, Commands.newEndTxnOnSubscription(andIncrement, j2, j, str, str2, txnAction, j3), completableFuture, getClientCnx(str));
        if (checkRequestCredits(create)) {
            endTxn(create);
        }
        return completableFuture;
    }

    private boolean checkRequestCredits(OpRequestSend opRequestSend) {
        int i = REQUEST_CREDITS_UPDATER.get(this);
        if (i <= 0 || this.pendingRequests.peek() != null) {
            this.pendingRequests.add(opRequestSend);
            return false;
        }
        if (REQUEST_CREDITS_UPDATER.compareAndSet(this, i, i - 1)) {
            return true;
        }
        return checkRequestCredits(opRequestSend);
    }

    public void endTxn(OpRequestSend opRequestSend) {
        opRequestSend.cnx.whenComplete((clientCnx, th) -> {
            if (th != null) {
                Throwable unwrapCompletionException = FutureUtil.unwrapCompletionException(th);
                log.error("endTxn error topic: [{}]", opRequestSend.topic, unwrapCompletionException);
                if (unwrapCompletionException instanceof PulsarClientException.BrokerMetadataException) {
                    opRequestSend.cb.complete(null);
                } else {
                    opRequestSend.cb.completeExceptionally(new PulsarClientException.LookupException(unwrapCompletionException.getMessage()));
                }
                onResponse(opRequestSend);
                return;
            }
            if (!clientCnx.ctx().channel().isActive()) {
                opRequestSend.cb.completeExceptionally(new PulsarClientException.LookupException(opRequestSend.topic + " endTxn channel is not active"));
                onResponse(opRequestSend);
                return;
            }
            clientCnx.registerTransactionBufferHandler(this);
            this.outstandingRequests.put(Long.valueOf(opRequestSend.requestId), opRequestSend);
            this.timer.newTimeout(timeout -> {
                OpRequestSend remove = this.outstandingRequests.remove(Long.valueOf(opRequestSend.requestId));
                if (remove == null || remove.cb.isDone() || remove.cb.isCompletedExceptionally()) {
                    return;
                }
                remove.cb.completeExceptionally(new TransactionBufferClientException.RequestTimeoutException());
                onResponse(remove);
            }, this.operationTimeoutInMills, TimeUnit.MILLISECONDS);
            opRequestSend.cmd.retain();
            clientCnx.ctx().writeAndFlush(opRequestSend.cmd, clientCnx.ctx().voidPromise());
        });
    }

    public void handleEndTxnOnTopicResponse(long j, CommandEndTxnOnPartitionResponse commandEndTxnOnPartitionResponse) {
        OpRequestSend remove = this.outstandingRequests.remove(Long.valueOf(j));
        if (remove == null) {
            if (log.isDebugEnabled()) {
                log.debug("Got end txn on topic response for timeout {} - {}", Long.valueOf(commandEndTxnOnPartitionResponse.getTxnidMostBits()), Long.valueOf(commandEndTxnOnPartitionResponse.getTxnidLeastBits()));
                return;
            }
            return;
        }
        try {
            if (commandEndTxnOnPartitionResponse.hasError()) {
                log.error("[{}] Got end txn on topic response for request {} error {}", new Object[]{remove.topic, Long.valueOf(commandEndTxnOnPartitionResponse.getRequestId()), commandEndTxnOnPartitionResponse.getError()});
                remove.cb.completeExceptionally(ClientCnx.getPulsarClientException(commandEndTxnOnPartitionResponse.getError(), commandEndTxnOnPartitionResponse.getMessage()));
            } else {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Got end txn on topic response for for request {}", remove.topic, Long.valueOf(commandEndTxnOnPartitionResponse.getRequestId()));
                }
                remove.cb.complete(new TxnID(commandEndTxnOnPartitionResponse.getTxnidMostBits(), commandEndTxnOnPartitionResponse.getTxnidLeastBits()));
            }
        } finally {
            onResponse(remove);
        }
    }

    public void handleEndTxnOnSubscriptionResponse(long j, CommandEndTxnOnSubscriptionResponse commandEndTxnOnSubscriptionResponse) {
        OpRequestSend remove = this.outstandingRequests.remove(Long.valueOf(j));
        if (remove == null) {
            if (log.isDebugEnabled()) {
                log.debug("Got end txn on subscription response for timeout {} - {}", Long.valueOf(commandEndTxnOnSubscriptionResponse.getTxnidMostBits()), Long.valueOf(commandEndTxnOnSubscriptionResponse.getTxnidLeastBits()));
                return;
            }
            return;
        }
        try {
            if (commandEndTxnOnSubscriptionResponse.hasError()) {
                log.error("[{}] Got end txn on subscription response for request {} error {}", new Object[]{remove.topic, Long.valueOf(commandEndTxnOnSubscriptionResponse.getRequestId()), commandEndTxnOnSubscriptionResponse.getError()});
                remove.cb.completeExceptionally(ClientCnx.getPulsarClientException(commandEndTxnOnSubscriptionResponse.getError(), commandEndTxnOnSubscriptionResponse.getMessage()));
            } else {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Got end txn on subscription response for for request {}", remove.topic, Long.valueOf(commandEndTxnOnSubscriptionResponse.getRequestId()));
                }
                remove.cb.complete(new TxnID(commandEndTxnOnSubscriptionResponse.getTxnidMostBits(), commandEndTxnOnSubscriptionResponse.getTxnidLeastBits()));
            }
        } finally {
            onResponse(remove);
        }
    }

    public void onResponse(OpRequestSend opRequestSend) {
        REQUEST_CREDITS_UPDATER.incrementAndGet(this);
        if (opRequestSend != null) {
            ReferenceCountUtil.safeRelease(opRequestSend.cmd);
            opRequestSend.recycle();
        }
        checkPendingRequests();
    }

    private void checkPendingRequests() {
        while (true) {
            int i = REQUEST_CREDITS_UPDATER.get(this);
            if (i <= 0 || this.pendingRequests.peek() == null) {
                return;
            }
            if (REQUEST_CREDITS_UPDATER.compareAndSet(this, i, i - 1)) {
                OpRequestSend opRequestSend = (OpRequestSend) this.pendingRequests.poll();
                if (opRequestSend != null) {
                    CompletableFuture<ClientCnx> clientCnx = getClientCnx(opRequestSend.topic);
                    if (opRequestSend.cnx != clientCnx) {
                        opRequestSend = OpRequestSend.create(opRequestSend.requestId, opRequestSend.topic, opRequestSend.cmd, opRequestSend.cb, clientCnx);
                        opRequestSend.recycle();
                    }
                    endTxn(opRequestSend);
                } else {
                    REQUEST_CREDITS_UPDATER.incrementAndGet(this);
                }
            }
        }
    }

    public CompletableFuture<ClientCnx> getClientCnxWithLookup(String str) {
        return this.pulsarClient.getConnection(str);
    }

    public CompletableFuture<ClientCnx> getClientCnx(String str) {
        NamespaceService namespaceService = this.pulsarService.getNamespaceService();
        return namespaceService.getBundleAsync(TopicName.get(str)).thenCompose(namespaceBundle -> {
            return namespaceService.getOwnerAsync(namespaceBundle);
        }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) optional -> {
            if (!optional.isPresent()) {
                return getClientCnxWithLookup(str);
            }
            NamespaceEphemeralData namespaceEphemeralData = (NamespaceEphemeralData) optional.get();
            try {
                if (namespaceEphemeralData.isDisabled()) {
                    return getClientCnxWithLookup(str);
                }
                URI uri = this.pulsarClient.getConfiguration().isUseTls() ? new URI(namespaceEphemeralData.getNativeUrlTls()) : new URI(namespaceEphemeralData.getNativeUrl());
                InetSocketAddress createUnresolved = InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
                return this.pulsarClient.getConnection(createUnresolved, createUnresolved);
            } catch (URISyntaxException e) {
                return getClientCnxWithLookup(str);
            }
        });
    }

    public void close() {
        this.timer.stop();
    }

    public int getAvailableRequestCredits() {
        return REQUEST_CREDITS_UPDATER.get(this);
    }

    public int getPendingRequestsCount() {
        return this.pendingRequests.size();
    }
}
