package org.apache.activemq.artemis.protocol.amqp.connect.mirror;

import java.util.List;
import java.util.function.BooleanSupplier;
import java.util.function.ToIntFunction;
import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.RunnableCallback;
import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
import org.apache.activemq.artemis.core.security.SecurityAuth;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessageBrokerAccessor;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonAbstractReceiver;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.pools.MpscPool;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver;
import org.jboss.logging.Logger;

/* loaded from: input_file:org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.class */
public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implements MirrorController {
    private static final Logger logger = Logger.getLogger(AMQPMirrorControllerTarget.class);
    private static final ThreadLocal<MirrorController> CONTROLLER_THREAD_LOCAL = new ThreadLocal<>();
    private final MpscPool<ACKMessageOperation> ackMessageMpscPool;
    final RoutingContextImpl routingContext;
    final BasicMirrorController<Receiver> basicController;
    final ActiveMQServer server;
    DuplicateIDCache lruduplicateIDCache;
    String lruDuplicateIDKey;
    private final ReferenceNodeStore referenceNodeStore;
    OperationContext mirrorContext;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerTarget$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$activemq$artemis$core$server$impl$AckReason = new int[AckReason.values().length];

        static {
            try {
                $SwitchMap$org$apache$activemq$artemis$core$server$impl$AckReason[AckReason.EXPIRED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget$ACKMessageOperation.class */
    public class ACKMessageOperation implements IOCallback, Runnable {
        Delivery delivery;
        public TransactionOperationAbstract tx = new TransactionOperationAbstract() { // from class: org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerTarget.ACKMessageOperation.1
            public void afterCommit(Transaction transaction) {
                ACKMessageOperation.this.connectionRun();
            }
        };

        ACKMessageOperation() {
        }

        void reset() {
            this.delivery = null;
        }

        ACKMessageOperation setDelivery(Delivery delivery) {
            this.delivery = delivery;
            return this;
        }

        @Override // java.lang.Runnable
        public void run() {
            if (AMQPMirrorControllerTarget.logger.isTraceEnabled()) {
                AMQPMirrorControllerTarget.logger.trace("Delivery settling for " + this.delivery + ", context=" + this.delivery.getContext());
            }
            this.delivery.disposition(Accepted.getInstance());
            AMQPMirrorControllerTarget.this.settle(this.delivery);
            AMQPMirrorControllerTarget.this.connection.flush();
            AMQPMirrorControllerTarget.this.ackMessageMpscPool.release(this);
        }

        public void done() {
            connectionRun();
        }

        public void connectionRun() {
            AMQPMirrorControllerTarget.this.connection.runNow(this);
        }

        public void onError(int i, String str) {
            AMQPMirrorControllerTarget.logger.warn(i + "-" + str);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget$PageAck.class */
    public class PageAck implements ToIntFunction<PagedReference>, BooleanSupplier, Runnable {
        final Queue targetQueue;
        final String nodeID;
        final long messageID;
        final IOCallback operation;

        PageAck(Queue queue, String str, long j, IOCallback iOCallback) {
            this.targetQueue = queue;
            this.nodeID = str;
            this.messageID = j;
            this.operation = iOCallback;
        }

        @Override // java.util.function.BooleanSupplier
        public boolean getAsBoolean() {
            try {
                AMQPMirrorControllerTarget.this.recoverContext();
                MessageReference removeWithSuppliedID = this.targetQueue.removeWithSuppliedID(this.nodeID, this.messageID, AMQPMirrorControllerTarget.this.referenceNodeStore);
                if (removeWithSuppliedID == null) {
                    return false;
                }
                this.targetQueue.acknowledge((Transaction) null, removeWithSuppliedID, AckReason.NORMAL, (ServerConsumer) null, false);
                OperationContextImpl.getContext().executeOnCompletion(this.operation);
                return true;
            } catch (Throwable th) {
                AMQPMirrorControllerTarget.logger.warn(th.getMessage(), th);
                return false;
            }
        }

        @Override // java.util.function.ToIntFunction
        public int applyAsInt(PagedReference pagedReference) {
            String serverID = AMQPMirrorControllerTarget.this.referenceNodeStore.getServerID(pagedReference);
            long id = AMQPMirrorControllerTarget.this.referenceNodeStore.getID(pagedReference);
            if (serverID == null) {
                serverID = AMQPMirrorControllerTarget.this.referenceNodeStore.getDefaultNodeID();
            }
            if (!serverID.equals(this.nodeID)) {
                return -1;
            }
            long j = id - this.messageID;
            if (j == 0) {
                return 0;
            }
            return j > 0 ? 1 : -1;
        }

        @Override // java.lang.Runnable
        public void run() {
            this.operation.done();
        }
    }

    public static void setControllerInUse(MirrorController mirrorController) {
        CONTROLLER_THREAD_LOCAL.set(mirrorController);
    }

    public static MirrorController getControllerInUse() {
        return CONTROLLER_THREAD_LOCAL.get();
    }

    public AMQPMirrorControllerTarget(AMQPSessionCallback aMQPSessionCallback, AMQPConnectionContext aMQPConnectionContext, AMQPSessionContext aMQPSessionContext, Receiver receiver, ActiveMQServer activeMQServer) {
        super(aMQPSessionCallback, aMQPConnectionContext, aMQPSessionContext, receiver);
        this.ackMessageMpscPool = new MpscPool<>(this.amqpCredits, (v0) -> {
            v0.reset();
        }, () -> {
            return new ACKMessageOperation();
        });
        this.routingContext = new RoutingContextImpl((Transaction) null);
        this.basicController = new BasicMirrorController<>(activeMQServer);
        this.basicController.setLink(receiver);
        this.server = activeMQServer;
        this.referenceNodeStore = aMQPSessionCallback.getProtocolManager().getReferenceIDSupplier();
        this.mirrorContext = aMQPSessionContext.getSessionSPI().getSessionContext();
    }

    public String getRemoteMirrorId() {
        return this.basicController.getRemoteMirrorId();
    }

    @Override // org.apache.activemq.artemis.protocol.amqp.proton.ProtonAbstractReceiver
    public void flow() {
        this.creditRunnable.run();
    }

    @Override // org.apache.activemq.artemis.protocol.amqp.proton.ProtonAbstractReceiver
    protected void actualDelivery(AMQPMessage aMQPMessage, Delivery delivery, Receiver receiver, Transaction transaction) {
        recoverContext();
        incrementSettle();
        if (logger.isTraceEnabled()) {
            logger.trace(this.server + "::actualdelivery call for " + aMQPMessage);
        }
        setControllerInUse(this);
        delivery.setContext(aMQPMessage);
        ACKMessageOperation delivery2 = ((ACKMessageOperation) this.ackMessageMpscPool.borrow()).setDelivery(delivery);
        try {
            try {
                Object messageAnnotationProperty = AMQPMessageBrokerAccessor.getMessageAnnotationProperty(aMQPMessage, AMQPMirrorControllerSource.EVENT_TYPE);
                if (messageAnnotationProperty != null) {
                    if (messageAnnotationProperty.equals(AMQPMirrorControllerSource.ADD_ADDRESS)) {
                        addAddress(parseAddress(aMQPMessage));
                    } else if (messageAnnotationProperty.equals(AMQPMirrorControllerSource.DELETE_ADDRESS)) {
                        deleteAddress(parseAddress(aMQPMessage));
                    } else if (messageAnnotationProperty.equals(AMQPMirrorControllerSource.CREATE_QUEUE)) {
                        createQueue(parseQueue(aMQPMessage));
                    } else if (messageAnnotationProperty.equals(AMQPMirrorControllerSource.DELETE_QUEUE)) {
                        deleteQueue(SimpleString.toSimpleString((String) AMQPMessageBrokerAccessor.getMessageAnnotationProperty(aMQPMessage, AMQPMirrorControllerSource.ADDRESS)), SimpleString.toSimpleString((String) AMQPMessageBrokerAccessor.getMessageAnnotationProperty(aMQPMessage, AMQPMirrorControllerSource.QUEUE)));
                    } else if (messageAnnotationProperty.equals(AMQPMirrorControllerSource.POST_ACK)) {
                        String str = (String) AMQPMessageBrokerAccessor.getMessageAnnotationProperty(aMQPMessage, AMQPMirrorControllerSource.BROKER_ID);
                        AckReason messageAnnotationAckReason = AMQPMessageBrokerAccessor.getMessageAnnotationAckReason(aMQPMessage);
                        if (str == null) {
                            str = getRemoteMirrorId();
                        }
                        if (postAcknowledge((String) AMQPMessageBrokerAccessor.getMessageAnnotationProperty(aMQPMessage, AMQPMirrorControllerSource.QUEUE), str, ((Long) aMQPMessage.getBody().getValue()).longValue(), delivery2, messageAnnotationAckReason)) {
                            delivery2 = null;
                        }
                    }
                } else if (sendMessage(aMQPMessage, delivery2)) {
                    delivery2 = null;
                }
                setControllerInUse(null);
                if (delivery2 != null) {
                    this.server.getStorageManager().afterCompleteOperations(delivery2);
                }
            } catch (Throwable th) {
                logger.warn(th.getMessage(), th);
                setControllerInUse(null);
                if (delivery2 != null) {
                    this.server.getStorageManager().afterCompleteOperations(delivery2);
                }
            }
        } catch (Throwable th2) {
            setControllerInUse(null);
            if (delivery2 != null) {
                this.server.getStorageManager().afterCompleteOperations(delivery2);
            }
            throw th2;
        }
    }

    @Override // org.apache.activemq.artemis.protocol.amqp.proton.ProtonInitializable
    public void initialize() throws Exception {
        super.initialize();
        this.receiver.setSenderSettleMode(this.receiver.getRemoteSenderSettleMode());
        this.receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
        flow();
    }

    private QueueConfiguration parseQueue(AMQPMessage aMQPMessage) {
        return QueueConfiguration.fromJSON((String) aMQPMessage.getBody().getValue());
    }

    private AddressInfo parseAddress(AMQPMessage aMQPMessage) {
        return AddressInfo.fromJSON((String) aMQPMessage.getBody().getValue());
    }

    public void addAddress(AddressInfo addressInfo) throws Exception {
        if (logger.isDebugEnabled()) {
            logger.debug(this.server + " adding address " + addressInfo);
        }
        this.server.addAddressInfo(addressInfo);
    }

    public void deleteAddress(AddressInfo addressInfo) throws Exception {
        if (logger.isDebugEnabled()) {
            logger.debug(this.server + " delete address " + addressInfo);
        }
        try {
            this.server.removeAddressInfo(addressInfo.getName(), (SecurityAuth) null, true);
        } catch (ActiveMQAddressDoesNotExistException e) {
            logger.debug(e.getMessage(), e);
        } catch (Exception e2) {
            logger.warn(e2.getMessage(), e2);
        }
    }

    public void createQueue(QueueConfiguration queueConfiguration) throws Exception {
        if (logger.isDebugEnabled()) {
            logger.debug(this.server + " adding queue " + queueConfiguration);
        }
        try {
            this.server.createQueue(queueConfiguration, true);
        } catch (Exception e) {
            logger.debug("Queue could not be created, already existed " + queueConfiguration, e);
        }
    }

    public void deleteQueue(SimpleString simpleString, SimpleString simpleString2) throws Exception {
        if (logger.isDebugEnabled()) {
            logger.debug(this.server + " destroy queue " + simpleString2 + " on address = " + simpleString + " server " + this.server.getIdentity());
        }
        try {
            this.server.destroyQueue(simpleString2, (SecurityAuth) null, false, true, false, false);
        } catch (ActiveMQNonExistentQueueException e) {
            logger.debug(this.server + " queue " + simpleString2 + " was previously removed", e);
        }
    }

    public boolean postAcknowledge(String str, String str2, long j, ACKMessageOperation aCKMessageOperation, AckReason ackReason) throws Exception {
        Queue locateQueue = this.server.locateQueue(str);
        if (locateQueue == null) {
            Logger logger2 = logger;
            logger2.warn("Queue " + str + " not found on mirror target, ignoring ack for queue=" + str + ", messageID=" + j + ", nodeID=" + logger2);
            return false;
        }
        if (logger.isDebugEnabled() && locateQueue.getConsumerCount() > 0) {
            logger.debug("server " + this.server.getIdentity() + ", queue " + locateQueue.getName() + " has consumers while delivering ack for " + j);
        }
        if (logger.isTraceEnabled()) {
            Logger logger3 = logger;
            logger3.trace("Server " + this.server.getIdentity() + " with queue = " + str + " being acked for " + j + " coming from " + logger3 + " targetQueue = " + j);
        }
        performAck(str2, j, locateQueue, aCKMessageOperation, ackReason, (short) 0);
        return true;
    }

    public void performAckOnPage(String str, long j, Queue queue, IOCallback iOCallback) {
        PageAck pageAck = new PageAck(queue, str, j, iOCallback);
        queue.getPageSubscription().scanAck(pageAck, pageAck, pageAck, pageAck);
    }

    private void performAck(String str, long j, Queue queue, ACKMessageOperation aCKMessageOperation, AckReason ackReason, short s) {
        if (logger.isTraceEnabled()) {
            Logger logger2 = logger;
            queue.getName();
            logger2.trace("performAck (nodeID=" + str + ", messageID=" + j + "), targetQueue=" + logger2);
        }
        MessageReference removeWithSuppliedID = queue.removeWithSuppliedID(str, j, this.referenceNodeStore);
        if (removeWithSuppliedID == null) {
            if (logger.isDebugEnabled()) {
                Logger logger3 = logger;
                logger3.debug("Retrying Reference not found on messageID=" + j + " nodeID=" + logger3 + ", currentRetry=" + str);
            }
            switch (s) {
                case 0:
                    this.sessionSPI.getSessionContext().executeOnCompletion(new RunnableCallback(() -> {
                        performAck(str, j, queue, aCKMessageOperation, ackReason, (short) 1);
                    }));
                    return;
                case 1:
                    queue.flushOnIntermediate(() -> {
                        recoverContext();
                        performAck(str, j, queue, aCKMessageOperation, ackReason, (short) 2);
                    });
                    return;
                case 2:
                    if (ackReason == AckReason.EXPIRED) {
                        aCKMessageOperation.run();
                        break;
                    } else {
                        performAckOnPage(str, j, queue, aCKMessageOperation);
                        return;
                    }
            }
        }
        if (removeWithSuppliedID != null) {
            if (logger.isTraceEnabled()) {
                Logger logger4 = logger;
                logger4.trace("Post ack Server " + this.server + " worked well for messageID=" + j + " nodeID=" + logger4);
            }
            try {
                switch (AnonymousClass1.$SwitchMap$org$apache$activemq$artemis$core$server$impl$AckReason[ackReason.ordinal()]) {
                    case 1:
                        queue.expire(removeWithSuppliedID, (ServerConsumer) null, false);
                        break;
                    default:
                        queue.acknowledge((Transaction) null, removeWithSuppliedID, ackReason, (ServerConsumer) null, false);
                        break;
                }
                OperationContextImpl.getContext().executeOnCompletion(aCKMessageOperation);
            } catch (Exception e) {
                logger.warn(e.getMessage(), e);
            }
        }
    }

    private boolean sendMessage(AMQPMessage aMQPMessage, ACKMessageOperation aCKMessageOperation) throws Exception {
        DuplicateIDCache duplicateIDCache;
        if (aMQPMessage.getMessageID() <= 0) {
            aMQPMessage.setMessageID(this.server.getStorageManager().generateID());
        }
        String str = (String) AMQPMessageBrokerAccessor.getDeliveryAnnotationProperty(aMQPMessage, AMQPMirrorControllerSource.BROKER_ID);
        if (str == null) {
            str = getRemoteMirrorId();
        }
        Long l = (Long) AMQPMessageBrokerAccessor.getDeliveryAnnotationProperty(aMQPMessage, AMQPMirrorControllerSource.INTERNAL_ID);
        String str2 = (String) AMQPMessageBrokerAccessor.getDeliveryAnnotationProperty(aMQPMessage, AMQPMirrorControllerSource.INTERNAL_DESTINATION);
        long j = 0;
        if (l != null) {
            j = l.longValue();
        }
        if (logger.isTraceEnabled()) {
            logger.trace("sendMessage on server " + this.server + " for message " + aMQPMessage + " with internalID = " + l + " mirror id " + str);
        }
        this.routingContext.setDuplicateDetection(false);
        if (this.lruDuplicateIDKey == null || !this.lruDuplicateIDKey.equals(str)) {
            if (logger.isDebugEnabled()) {
                logger.trace("Setting up duplicate detection cache on $ACTIVEMQ_ARTEMIS_MIRROR, ServerID=" + str + " with " + this.connection.getAmqpCredits() + " elements, being the number of credits");
            }
            this.lruDuplicateIDKey = str;
            this.lruduplicateIDCache = this.server.getPostOffice().getDuplicateIDCache(SimpleString.toSimpleString("$ACTIVEMQ_ARTEMIS_MIRROR_" + str), this.connection.getAmqpCredits());
            duplicateIDCache = this.lruduplicateIDCache;
        } else {
            duplicateIDCache = this.lruduplicateIDCache;
        }
        byte[] longToBytes = ByteUtil.longToBytes(l.longValue());
        if (duplicateIDCache.contains(longToBytes)) {
            flow();
            return false;
        }
        aMQPMessage.setBrokerProperty(AMQPMirrorControllerSource.INTERNAL_ID_EXTRA_PROPERTY, Long.valueOf(j));
        aMQPMessage.setBrokerProperty(AMQPMirrorControllerSource.INTERNAL_BROKER_ID_EXTRA_PROPERTY, str);
        if (str2 != null) {
            aMQPMessage.m5setAddress(str2);
        }
        MirrorTransaction mirrorTransaction = new MirrorTransaction(this.server.getStorageManager());
        mirrorTransaction.addOperation(aCKMessageOperation.tx);
        this.routingContext.setTransaction(mirrorTransaction);
        duplicateIDCache.addToCache(longToBytes, mirrorTransaction);
        this.routingContext.clear().setMirrorSource(this).setLoadBalancingType(MessageLoadBalancingType.OFF);
        this.server.getPostOffice().route(aMQPMessage, this.routingContext, false);
        mirrorTransaction.commit();
        flow();
        return true;
    }

    public void postAcknowledge(MessageReference messageReference, AckReason ackReason) {
    }

    public void sendMessage(Message message, RoutingContext routingContext, List<MessageReference> list) {
    }
}
