package org.apache.qpid.server.protocol.v1_0;

import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValueSection;
import org.apache.qpid.server.protocol.v1_0.type.messaging.EncodingRetainingSection;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
import org.apache.qpid.server.protocol.v1_0.type.transaction.Coordinator;
import org.apache.qpid.server.protocol.v1_0.type.transaction.Declare;
import org.apache.qpid.server.protocol.v1_0.type.transaction.Declared;
import org.apache.qpid.server.protocol.v1_0.type.transaction.Discharge;
import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionError;
import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.class */
public class TxnCoordinatorReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint<Coordinator> {
    private static final Logger LOGGER = LoggerFactory.getLogger(TxnCoordinatorReceivingLinkEndpoint.class);
    private final Map<Integer, ServerTransaction> _createdTransactions;

    public TxnCoordinatorReceivingLinkEndpoint(Session_1_0 session_1_0, Link_1_0<Source, Coordinator> link_1_0) {
        super(session_1_0, link_1_0);
        this._createdTransactions = new ConcurrentHashMap();
    }

    @Override // org.apache.qpid.server.protocol.v1_0.LinkEndpoint
    public void start() {
        setLinkCredit(UnsignedInteger.ONE);
        setCreditWindow();
    }

    @Override // org.apache.qpid.server.protocol.v1_0.AbstractReceivingLinkEndpoint
    protected Error receiveDelivery(Delivery delivery) {
        DeliveryState deliveryState;
        try {
            QpidByteBuffer payload = delivery.getPayload();
            try {
                boolean z = false;
                Iterator<EncodingRetainingSection<?>> it = getSectionDecoder().parseAll(payload).iterator();
                while (it.hasNext()) {
                    EncodingRetainingSection<?> next = it.next();
                    try {
                        if (next instanceof AmqpValueSection) {
                            if (z) {
                                throw new ConnectionScopedRuntimeException("Received more than one AmqpValue sections");
                            }
                            z = true;
                            Object value = next.getValue();
                            AMQPConnection_1_0<?> connection = getSession().getConnection();
                            connection.receivedComplete();
                            if (!(value instanceof Declare)) {
                                if (!(value instanceof Discharge)) {
                                    throw new ConnectionScopedRuntimeException(String.format("Received unknown command '%s'", value.getClass().getSimpleName()));
                                }
                                Discharge discharge = (Discharge) value;
                                Error discharge2 = discharge(discharge.getTxnId(), Boolean.TRUE.equals(discharge.getFail()));
                                if (discharge2 == null) {
                                    deliveryState = new Accepted();
                                } else if (Arrays.asList(((Source) getSource()).getOutcomes()).contains(Rejected.REJECTED_SYMBOL)) {
                                    Rejected rejected = new Rejected();
                                    rejected.setError(discharge2);
                                    deliveryState = rejected;
                                    discharge2 = null;
                                } else {
                                    deliveryState = null;
                                }
                                if (discharge2 == null) {
                                    updateDisposition(delivery.getDeliveryTag(), deliveryState, true);
                                }
                                Error error = discharge2;
                                if (payload != null) {
                                    payload.close();
                                }
                                return error;
                            }
                            IdentifiedTransaction createIdentifiedTransaction = connection.createIdentifiedTransaction();
                            this._createdTransactions.put(Integer.valueOf(createIdentifiedTransaction.getId()), createIdentifiedTransaction.getServerTransaction());
                            connection.registerTransactionTickers(createIdentifiedTransaction.getServerTransaction(), this::doTimeoutAction, ((Long) getSession().getContextValue(Long.class, "qpid.session.transactionTimeoutNotificationRepeatPeriod")).longValue());
                            Declared declared = new Declared();
                            declared.setTxnId(Session_1_0.integerToTransactionId(createIdentifiedTransaction.getId()));
                            updateDisposition(delivery.getDeliveryTag(), declared, true);
                        }
                        next.dispose();
                    } finally {
                        next.dispose();
                    }
                }
                if (!z) {
                    throw new ConnectionScopedRuntimeException("Received no AmqpValue section");
                }
                if (payload != null) {
                    payload.close();
                }
                return null;
            } finally {
            }
        } catch (AmqpErrorException e) {
            return e.getError();
        }
    }

    private Error discharge(Binary binary, boolean z) {
        Error error = null;
        Integer num = null;
        ServerTransaction serverTransaction = null;
        try {
            num = Session_1_0.transactionIdToInteger(binary);
            serverTransaction = this._createdTransactions.get(num);
        } catch (IllegalArgumentException | UnknownTransactionException e) {
        }
        if (serverTransaction != null) {
            AMQPConnection_1_0<?> connection = getSession().getConnection();
            if (z) {
                serverTransaction.rollback();
                connection.incrementTransactionRollbackCounter();
            } else if ((serverTransaction instanceof LocalTransaction) && ((LocalTransaction) serverTransaction).isRollbackOnly()) {
                error = forceRollback(serverTransaction, connection);
            } else {
                try {
                    serverTransaction.commit();
                } catch (Exception e2) {
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("Transaction {} commit failed", num, e2);
                    } else {
                        LOGGER.info("Transaction {} commit failed: {}", num, e2.getMessage());
                    }
                    error = forceRollback(serverTransaction, connection);
                } catch (ServerScopedRuntimeException e3) {
                    throw e3;
                }
            }
            this._createdTransactions.remove(num);
            connection.unregisterTransactionTickers(serverTransaction);
            connection.removeTransaction(num.intValue());
            connection.decrementTransactionOpenCounter();
        } else {
            error = new Error();
            error.setCondition(TransactionError.UNKNOWN_ID);
            error.setDescription("Unknown transactionId " + binary.toString());
        }
        return error;
    }

    private Error forceRollback(ServerTransaction serverTransaction, AMQPConnection_1_0<?> aMQPConnection_1_0) {
        serverTransaction.rollback();
        aMQPConnection_1_0.incrementTransactionRollbackCounter();
        Error error = new Error();
        error.setCondition(TransactionError.TRANSACTION_ROLLBACK);
        error.setDescription("The transaction was rolled back due to an earlier issue (e.g. a published message was sent settled but could not be enqueued)");
        return error;
    }

    @Override // org.apache.qpid.server.protocol.v1_0.AbstractLinkEndpoint
    protected void remoteDetachedPerformDetach(Detach detach) {
        rollbackOpenTransactions();
        close();
    }

    @Override // org.apache.qpid.server.protocol.v1_0.AbstractLinkEndpoint
    protected Map<Binary, DeliveryState> getLocalUnsettled() {
        return Collections.emptyMap();
    }

    @Override // org.apache.qpid.server.protocol.v1_0.AbstractLinkEndpoint
    protected void reattachLink(Attach attach) throws AmqpErrorException {
        throw new AmqpErrorException(new Error(AmqpError.NOT_IMPLEMENTED, "Cannot reattach a Coordinator Link."));
    }

    @Override // org.apache.qpid.server.protocol.v1_0.AbstractLinkEndpoint
    protected void resumeLink(Attach attach) throws AmqpErrorException {
        throw new AmqpErrorException(new Error(AmqpError.NOT_IMPLEMENTED, "Cannot resume a Coordinator Link."));
    }

    @Override // org.apache.qpid.server.protocol.v1_0.AbstractLinkEndpoint
    protected void establishLink(Attach attach) throws AmqpErrorException {
        if (getSource() != null || getTarget() != 0) {
            throw new IllegalStateException("LinkEndpoint and Termini should be null when establishing a Link.");
        }
        Coordinator coordinator = new Coordinator();
        getLink().setTermini((Source) attach.getSource(), coordinator);
        attachReceived(attach);
    }

    @Override // org.apache.qpid.server.protocol.v1_0.AbstractLinkEndpoint
    protected void recoverLink(Attach attach) throws AmqpErrorException {
        throw new AmqpErrorException(new Error(AmqpError.NOT_IMPLEMENTED, "Cannot recover a Coordinator Link."));
    }

    @Override // org.apache.qpid.server.protocol.v1_0.AbstractLinkEndpoint
    public void attachReceived(Attach attach) throws AmqpErrorException {
        super.attachReceived(attach);
        setDeliveryCount(new SequenceNumber(attach.getInitialDeliveryCount().intValue()));
    }

    @Override // org.apache.qpid.server.protocol.v1_0.LinkEndpoint
    public void receiveComplete() {
    }

    private void doTimeoutAction(String str) {
        rollbackOpenTransactions();
        getSession().getConnection().close(new Error(TransactionError.TRANSACTION_TIMEOUT, str));
    }

    private void rollbackOpenTransactions() {
        for (Map.Entry<Integer, ServerTransaction> entry : this._createdTransactions.entrySet()) {
            entry.getValue().rollback();
            AMQPConnection_1_0<?> connection = getSession().getConnection();
            connection.decrementTransactionOpenCounter();
            connection.incrementTransactionRollbackCounter();
            connection.removeTransaction(entry.getKey().intValue());
        }
        this._createdTransactions.clear();
    }
}
