package org.springframework.integration.store;

import java.util.AbstractQueue;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;

/* loaded from: input_file:WEB-INF/lib/spring-integration-core-4.3.14.RELEASE.jar:org/springframework/integration/store/MessageGroupQueue.class */
/**
 * A {@link BlockingQueue} whose elements are the messages of a single group held in a
 * {@link BasicMessageGroupStore}.
 *
 * <p>The queue keeps no elements itself: every read and write is delegated to the store, using
 * the {@code groupId} supplied at construction time. {@link #peek()}, the {@code poll},
 * {@code offer}, {@code drainTo}, {@link #put(Message)} and {@link #take()} operations run
 * while holding the configured {@link Lock}; {@link #iterator()}, {@link #size()} and
 * {@link #remainingCapacity()} query the store without taking it.
 *
 * <p>NOTE(review): the lock only coordinates callers that share this {@code Lock} instance;
 * whether other writers can touch the same group in the store concurrently depends on the
 * store and deployment - confirm before relying on strict capacity enforcement.
 */
public class MessageGroupQueue extends AbstractQueue<Message<?>> implements BlockingQueue<Message<?>> {

    /** Capacity value that marks the queue as unbounded; capacity waits are skipped for it. */
    private static final int DEFAULT_CAPACITY = Integer.MAX_VALUE;

    private final Log logger = LogFactory.getLog(getClass());

    /** Store that physically holds the queued messages. */
    private final BasicMessageGroupStore messageGroupStore;

    /** Identifier of the group inside the store that represents this queue. */
    private final Object groupId;

    /** Maximum number of messages accepted, or {@link #DEFAULT_CAPACITY} for unbounded. */
    private final int capacity;

    /** Guards the locked store operations and owns the two conditions below. */
    private final Lock storeLock;

    /** Signalled after a message has been polled, i.e. space may be available. */
    private final Condition messageStoreNotFull;

    /** Signalled after a message has been added, i.e. a message may be available. */
    private final Condition messageStoreNotEmpty;

    /**
     * Creates an unbounded queue guarded by a new fair {@link ReentrantLock}.
     *
     * @param basicMessageGroupStore the backing store; must not be {@code null}
     * @param obj the id of the group used as the queue; must not be {@code null}
     */
    public MessageGroupQueue(BasicMessageGroupStore basicMessageGroupStore, Object obj) {
        this(basicMessageGroupStore, obj, DEFAULT_CAPACITY, new ReentrantLock(true));
    }

    /**
     * Creates a bounded queue guarded by a new fair {@link ReentrantLock}.
     *
     * @param basicMessageGroupStore the backing store; must not be {@code null}
     * @param obj the id of the group used as the queue; must not be {@code null}
     * @param i the capacity; must be greater than 0
     */
    public MessageGroupQueue(BasicMessageGroupStore basicMessageGroupStore, Object obj, int i) {
        this(basicMessageGroupStore, obj, i, new ReentrantLock(true));
    }

    /**
     * Creates an unbounded queue guarded by the supplied lock.
     *
     * @param basicMessageGroupStore the backing store; must not be {@code null}
     * @param obj the id of the group used as the queue; must not be {@code null}
     * @param lock the lock guarding store access; must not be {@code null}
     */
    public MessageGroupQueue(BasicMessageGroupStore basicMessageGroupStore, Object obj, Lock lock) {
        this(basicMessageGroupStore, obj, DEFAULT_CAPACITY, lock);
    }

    /**
     * Creates a queue with an explicit capacity and lock.
     *
     * <p>Logs a warning when the store is not a {@code ChannelMessageStore}.
     *
     * @param basicMessageGroupStore the backing store; must not be {@code null}
     * @param obj the id of the group used as the queue; must not be {@code null}
     * @param i the capacity; must be greater than 0 ({@link Integer#MAX_VALUE} means unbounded)
     * @param lock the lock guarding store access; must not be {@code null}
     * @throws IllegalArgumentException if any argument violates the constraints above
     */
    public MessageGroupQueue(BasicMessageGroupStore basicMessageGroupStore, Object obj, int i, Lock lock) {
        Assert.isTrue(i > 0, "'capacity' must be greater than 0");
        Assert.notNull(lock, "'storeLock' must not be null");
        Assert.notNull(basicMessageGroupStore, "'messageGroupStore' must not be null");
        Assert.notNull(obj, "'groupId' must not be null");
        this.storeLock = lock;
        this.messageStoreNotFull = this.storeLock.newCondition();
        this.messageStoreNotEmpty = this.storeLock.newCondition();
        this.messageGroupStore = basicMessageGroupStore;
        this.groupId = obj;
        this.capacity = i;
        if (this.logger.isWarnEnabled() && !(basicMessageGroupStore instanceof ChannelMessageStore)) {
            this.logger.warn(basicMessageGroupStore.getClass().getSimpleName() + " is not optimized for use in a 'MessageGroupQueue'; consider using a `ChannelMessageStore'");
        }
    }

    /**
     * Declares whether this queue is meant to be priority based and validates the store
     * against that intent.
     *
     * <p>The flag itself is not stored. With {@code true} the store must be a
     * {@code PriorityCapableChannelMessageStore} that has priority enabled; with {@code false}
     * a warning is logged if the store nevertheless has priority enabled.
     *
     * @param z {@code true} if priority ordering is intended
     * @throws IllegalArgumentException if {@code z} is {@code true} and the store does not
     *         support, or has not enabled, priority
     */
    public void setPriority(boolean z) {
        if (z) {
            Assert.isInstanceOf(PriorityCapableChannelMessageStore.class, this.messageGroupStore);
            Assert.isTrue(((PriorityCapableChannelMessageStore) this.messageGroupStore).isPriorityEnabled(), "When using priority, the 'PriorityCapableChannelMessageStore' must have priority enabled.");
            return;
        }
        if (!this.logger.isWarnEnabled()) {
            return;
        }
        if (!(this.messageGroupStore instanceof PriorityCapableChannelMessageStore)) {
            return;
        }
        if (((PriorityCapableChannelMessageStore) this.messageGroupStore).isPriorityEnabled()) {
            this.logger.warn("It's not recommended to use a priority-based message store when declaring a non-priority 'MessageGroupQueue'; message retrieval may not be FIFO; set 'priority' to 'true' if that is your intent. If you are using the namespace to define a channel, use '<priority-queue message-store.../> instead.");
        }
    }

    /**
     * Returns an iterator over the messages the store currently reports for the group.
     * The store is queried without taking the lock.
     */
    @Override
    public Iterator<Message<?>> iterator() {
        return getMessages().iterator();
    }

    /**
     * Returns the group size as reported by the store. The lock is not taken.
     */
    @Override
    public int size() {
        return this.messageGroupStore.messageGroupSize(this.groupId);
    }

    /**
     * Returns, without removing it, the first message the store lists for the group.
     *
     * @return the first message, or {@code null} if the group is empty or the calling thread
     *         was interrupted while acquiring the lock (the interrupt flag is restored)
     */
    @Override
    public Message<?> peek() {
        Lock lock = this.storeLock;
        try {
            lock.lockInterruptibly();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
        try {
            Collection<Message<?>> messages = getMessages();
            return messages.isEmpty() ? null : messages.iterator().next();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the next message, waiting up to the given time for one to arrive.
     *
     * @param j how long to wait, in units of {@code timeUnit}
     * @param timeUnit the unit of {@code j}
     * @return the polled message, or {@code null} if the store had none to hand out
     * @throws InterruptedException if interrupted while acquiring the lock or waiting
     */
    @Override
    public Message<?> poll(long j, TimeUnit timeUnit) throws InterruptedException {
        long nanos = timeUnit.toNanos(j);
        Lock lock = this.storeLock;
        lock.lockInterruptibly();
        // Everything after the acquisition sits inside try/finally so that a failing store
        // call (size() or the poll itself) can never leave the lock held.
        try {
            while (size() == 0 && nanos > 0) {
                nanos = this.messageStoreNotEmpty.awaitNanos(nanos);
            }
            return doPoll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the next message without waiting for one to arrive.
     *
     * @return the polled message, or {@code null} if the store had none or the calling thread
     *         was interrupted while acquiring the lock (the interrupt flag is restored)
     */
    @Override
    public Message<?> poll() {
        Lock lock = this.storeLock;
        try {
            lock.lockInterruptibly();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
        try {
            return doPoll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Moves all available messages into {@code collection}.
     *
     * @param collection the target collection; must not be {@code null}
     * @return the number of elements by which {@code collection} grew
     */
    @Override
    public int drainTo(Collection<? super Message<?>> collection) {
        return drainTo(collection, Integer.MAX_VALUE);
    }

    /**
     * Moves at most {@code i} messages from the store into {@code collection}.
     *
     * <p>If the calling thread is interrupted while acquiring the lock, a warning is logged,
     * the interrupt flag is restored and nothing is drained.
     *
     * @param collection the target collection; must not be {@code null}
     * @param i the maximum number of messages to move; values {@code <= 0} move nothing
     * @return the number of elements by which {@code collection} grew
     * @throws IllegalArgumentException if {@code collection} is {@code null}
     */
    @Override
    public int drainTo(Collection<? super Message<?>> collection, int i) {
        Assert.notNull(collection, "'collection' must not be null");
        int sizeBefore = collection.size();
        Collection<Message<?>> drained = new ArrayList<Message<?>>();
        Lock lock = this.storeLock;
        try {
            lock.lockInterruptibly();
            try {
                // Poll only while there is still room in the batch: a message removed from
                // the store must always end up in 'drained', never be discarded.
                while (drained.size() < i) {
                    Message<?> next = this.messageGroupStore.pollMessageFromGroup(this.groupId);
                    if (next == null) {
                        break;
                    }
                    drained.add(next);
                }
                this.messageStoreNotFull.signal();
            } finally {
                lock.unlock();
            }
        } catch (InterruptedException e) {
            this.logger.warn("Queue may not have drained completely since this operation was interrupted", e);
            Thread.currentThread().interrupt();
        }
        collection.addAll(drained);
        return collection.size() - sizeBefore;
    }

    /**
     * Adds the message if the queue has room, without waiting for space.
     *
     * @param message the message to add
     * @return {@code true} if the message was stored; {@code false} if the queue is full or
     *         the calling thread was interrupted while acquiring the lock (the interrupt flag
     *         is restored and the message is not stored)
     */
    @Override
    public boolean offer(Message<?> message) {
        Lock lock = this.storeLock;
        try {
            lock.lockInterruptibly();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            // Nothing was stored, so success must not be reported to the caller.
            return false;
        }
        try {
            return doOffer(message);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Adds the message, waiting up to the given time for space if the queue is bounded and full.
     *
     * <p>An insert is always attempted once the wait ends, so a zero or already-expired
     * timeout still succeeds when there is room.
     *
     * @param message the message to add
     * @param j how long to wait, in units of {@code timeUnit}
     * @param timeUnit the unit of {@code j}
     * @return {@code true} if the message was stored, {@code false} if no space was available
     * @throws InterruptedException if interrupted while acquiring the lock or waiting
     */
    @Override
    public boolean offer(Message<?> message, long j, TimeUnit timeUnit) throws InterruptedException {
        long nanos = timeUnit.toNanos(j);
        Lock lock = this.storeLock;
        lock.lockInterruptibly();
        try {
            if (this.capacity != DEFAULT_CAPACITY) {
                // '>=' mirrors the 'size() < capacity' admission test in doOffer.
                while (size() >= this.capacity && nanos > 0) {
                    nanos = this.messageStoreNotFull.awaitNanos(nanos);
                }
            }
            // doOffer re-checks the capacity itself, so it is safe to call unconditionally.
            return doOffer(message);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Adds the message, waiting as long as necessary for space if the queue is bounded.
     *
     * @param message the message to add
     * @throws InterruptedException if interrupted while acquiring the lock or waiting
     */
    @Override
    public void put(Message<?> message) throws InterruptedException {
        Lock lock = this.storeLock;
        lock.lockInterruptibly();
        try {
            if (this.capacity != DEFAULT_CAPACITY) {
                // '>=' mirrors the admission test in doOffer; waiting only on '==' would let
                // an over-full store fall through and doOffer would then drop the message.
                while (size() >= this.capacity) {
                    this.messageStoreNotFull.await();
                }
            }
            doOffer(message);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns {@link Integer#MAX_VALUE} for an unbounded queue, otherwise the configured
     * capacity minus the size currently reported by the store. The lock is not taken.
     */
    @Override
    public int remainingCapacity() {
        return this.capacity == DEFAULT_CAPACITY ? Integer.MAX_VALUE : this.capacity - size();
    }

    /**
     * Removes and returns the next message, waiting as long as necessary for one to arrive.
     *
     * @return the message handed out by the store
     * @throws InterruptedException if interrupted while acquiring the lock or waiting
     */
    @Override
    public Message<?> take() throws InterruptedException {
        Lock lock = this.storeLock;
        lock.lockInterruptibly();
        // The lock must stay held across every await() and the poll, and be released exactly
        // once on the way out - hence a single try/finally around the whole sequence.
        try {
            while (size() == 0) {
                this.messageStoreNotEmpty.await();
            }
            return doPoll();
        } finally {
            lock.unlock();
        }
    }

    /** Fetches the group from the store and returns its messages. */
    private Collection<Message<?>> getMessages() {
        return this.messageGroupStore.getMessageGroup(this.groupId).getMessages();
    }

    /**
     * Polls one message from the store and signals one thread waiting for space.
     * Callers in this class invoke it while holding {@code storeLock}.
     */
    private Message<?> doPoll() {
        Message<?> polled = this.messageGroupStore.pollMessageFromGroup(this.groupId);
        this.messageStoreNotFull.signal();
        return polled;
    }

    /**
     * Stores the message if the queue is unbounded or below capacity, and signals one thread
     * waiting for a message. Callers in this class invoke it while holding {@code storeLock}.
     *
     * @return {@code true} if the message was stored
     */
    private boolean doOffer(Message<?> message) {
        if (this.capacity != DEFAULT_CAPACITY && size() >= this.capacity) {
            return false;
        }
        this.messageGroupStore.addMessageToGroup(this.groupId, message);
        this.messageStoreNotEmpty.signal();
        return true;
    }
}
