package com.linkedin.alpini.base.concurrency;

import com.linkedin.alpini.base.misc.Time;
import java.util.AbstractQueue;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import java.util.stream.Stream;
import org.testng.Assert;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = {"unit"})
/* loaded from: input_file:com/linkedin/alpini/base/concurrency/TestBlockingQueues.class */
public class TestBlockingQueues {
    private ExecutorService threadPool;

    /* renamed from: com.linkedin.alpini.base.concurrency.TestBlockingQueues$1Pred, reason: invalid class name */
    /* loaded from: input_file:com/linkedin/alpini/base/concurrency/TestBlockingQueues$1Pred.class */
    class C1Pred implements Predicate<Object> {
        int count = 0;
        final /* synthetic */ Object[] val$arr;

        C1Pred(Object[] objArr) {
            this.val$arr = objArr;
        }

        @Override // java.util.function.Predicate
        public boolean test(Object obj) {
            Assert.assertNotNull(obj);
            this.count++;
            return obj == this.val$arr[50];
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linkedin/alpini/base/concurrency/TestBlockingQueues$Interrupter.class */
    public static class Interrupter implements Runnable {
        final Thread threadToInterrupt;

        Interrupter(Thread thread) {
            this.threadToInterrupt = thread;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                try {
                    Time.sleep(100L);
                    this.threadToInterrupt.interrupt();
                } catch (InterruptedException e) {
                    throw new AssertionError();
                }
            } catch (Throwable th) {
                this.threadToInterrupt.interrupt();
                throw th;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linkedin/alpini/base/concurrency/TestBlockingQueues$Producer.class */
    public static class Producer implements Callable<Void> {
        final BlockingQueue<Object> q;
        final int elements;
        final CountDownLatch doneProducing = new CountDownLatch(1);

        Producer(BlockingQueue<Object> blockingQueue, int i) {
            this.q = blockingQueue;
            this.elements = i;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Void call() throws InterruptedException {
            for (int i = 0; i < this.elements; i++) {
                try {
                    this.q.put(new Object());
                } finally {
                    this.doneProducing.countDown();
                }
            }
            return null;
        }
    }

    @DataProvider
    public Object[][] blockingQueues() {
        return (Object[][]) Stream.of((Object[]) new AbstractQueue[]{new LinkedBlockingQueue(), new ConcurrentLinkedBlockingQueue()}).map(abstractQueue -> {
            return new Object[]{abstractQueue, abstractQueue.getClass().getSimpleName()};
        }).toArray(i -> {
            return new Object[i];
        });
    }

    @BeforeMethod
    public void setUp() {
        this.threadPool = Executors.newCachedThreadPool();
    }

    @AfterMethod(alwaysRun = true)
    public void tearDown() throws InterruptedException {
        if (this.threadPool == null) {
            return;
        }
        this.threadPool.shutdown();
        AssertJUnit.assertTrue("Some worker didn't finish in time", this.threadPool.awaitTermination(1L, TimeUnit.SECONDS));
        this.threadPool = null;
    }

    private static <T> int drain(BlockingQueue blockingQueue, Collection collection, int i, long j, TimeUnit timeUnit, boolean z) throws InterruptedException {
        return z ? drain(blockingQueue, collection, i, j, timeUnit) : drainUninterruptibly(blockingQueue, collection, i, j, timeUnit);
    }

    public static <E> int drain(BlockingQueue<E> blockingQueue, Collection<? super E> collection, int i, long j, TimeUnit timeUnit) throws InterruptedException {
        Assert.assertNotNull(collection);
        long nanoTime = Time.nanoTime() + timeUnit.toNanos(j);
        int i2 = 0;
        while (i2 < i) {
            i2 += blockingQueue.drainTo(collection, i - i2);
            if (i2 < i) {
                E poll = blockingQueue.poll(nanoTime - Time.nanoTime(), TimeUnit.NANOSECONDS);
                if (poll == null) {
                    break;
                }
                collection.add(poll);
                i2++;
            }
        }
        return i2;
    }

    public static <E> int drainUninterruptibly(BlockingQueue<E> blockingQueue, Collection<? super E> collection, int i, long j, TimeUnit timeUnit) {
        E poll;
        Assert.assertNotNull(collection);
        long nanoTime = Time.nanoTime() + timeUnit.toNanos(j);
        int i2 = 0;
        boolean z = false;
        while (i2 < i) {
            try {
                i2 += blockingQueue.drainTo(collection, i - i2);
                if (i2 < i) {
                    while (true) {
                        try {
                            poll = blockingQueue.poll(nanoTime - Time.nanoTime(), TimeUnit.NANOSECONDS);
                            break;
                        } catch (InterruptedException e) {
                            z = true;
                        }
                    }
                    if (poll == null) {
                        break;
                    }
                    collection.add(poll);
                    i2++;
                }
            } finally {
                if (z) {
                    Thread.currentThread().interrupt();
                }
            }
        }
        return i2;
    }

    private static <E> List<E> newArrayList() {
        return new ArrayList();
    }

    @Test(dataProvider = "blockingQueues")
    public void testMultipleProducers(BlockingQueue<Object> blockingQueue, String str) throws InterruptedException {
        for (boolean z : new boolean[]{true, false}) {
            this.threadPool.submit(new Producer(blockingQueue, 20));
            this.threadPool.submit(new Producer(blockingQueue, 20));
            this.threadPool.submit(new Producer(blockingQueue, 20));
            this.threadPool.submit(new Producer(blockingQueue, 20));
            this.threadPool.submit(new Producer(blockingQueue, 20));
            List newArrayList = newArrayList();
            AssertJUnit.assertEquals(100, drain(blockingQueue, newArrayList, 100, Long.MAX_VALUE, TimeUnit.NANOSECONDS, z));
            AssertJUnit.assertEquals(100, newArrayList.size());
            assertDrained(blockingQueue);
        }
    }

    @Test(dataProvider = "blockingQueues")
    public void testMultipleProducersIterator(BlockingQueue<Object> blockingQueue, String str) throws InterruptedException, TimeoutException, ExecutionException {
        for (boolean z : new boolean[]{true, false}) {
            Iterator it = Arrays.asList(this.threadPool.submit(new Producer(blockingQueue, 20)), this.threadPool.submit(new Producer(blockingQueue, 20)), this.threadPool.submit(new Producer(blockingQueue, 20)), this.threadPool.submit(new Producer(blockingQueue, 20)), this.threadPool.submit(new Producer(blockingQueue, 20))).iterator();
            while (it.hasNext()) {
                ((Future) it.next()).get(1L, TimeUnit.SECONDS);
            }
            Assert.assertFalse(blockingQueue.isEmpty());
            Assert.assertEquals(100, blockingQueue.size());
            Object[] objArr = new Object[100];
            int i = 0;
            Iterator it2 = blockingQueue.iterator();
            while (it2.hasNext()) {
                int i2 = i;
                i++;
                Object next = it2.next();
                objArr[i2] = next;
                Assert.assertNotNull(next);
            }
            Assert.assertEquals(100, i);
            Objects.requireNonNull(it2);
            Assert.assertThrows(NoSuchElementException.class, it2::next);
            C1Pred c1Pred = new C1Pred(objArr);
            Assert.assertTrue(blockingQueue.removeIf(c1Pred));
            Assert.assertEquals(c1Pred.count, 100);
            Assert.assertEquals(blockingQueue.size(), 99);
            Assert.assertTrue(blockingQueue.remove(objArr[75]));
            Assert.assertEquals(blockingQueue.size(), 98);
            Assert.assertTrue(blockingQueue.contains(objArr[25]));
            Assert.assertFalse(blockingQueue.contains(objArr[75]));
            List newArrayList = newArrayList();
            AssertJUnit.assertEquals(98, drain(blockingQueue, newArrayList, 98, Long.MAX_VALUE, TimeUnit.NANOSECONDS, z));
            AssertJUnit.assertEquals(98, newArrayList.size());
            assertDrained(blockingQueue);
            Assert.assertFalse(newArrayList.contains(objArr[50]));
            Assert.assertFalse(newArrayList.contains(objArr[75]));
        }
        this.threadPool.shutdown();
        Assert.assertTrue(this.threadPool.awaitTermination(1L, TimeUnit.SECONDS));
        System.gc();
        Time.sleep(1000L);
        System.gc();
        Assert.assertNull(blockingQueue.poll());
        blockingQueue.clear();
        Assert.assertThrows(NullPointerException.class, () -> {
            blockingQueue.offer(null, 1L, TimeUnit.SECONDS);
        });
    }

    @Test(dataProvider = "blockingQueues")
    public void testDrainTimesOut(BlockingQueue<Object> blockingQueue, String str) throws Exception {
        for (boolean z : new boolean[]{true, false}) {
            AssertJUnit.assertEquals(0, drain(blockingQueue, Collections.emptyList(), 1, 10L, TimeUnit.MILLISECONDS));
            Producer producer = new Producer(blockingQueue, 1);
            AsyncFuture submit = this.threadPool.submit(producer);
            long nanoTime = System.nanoTime();
            int drain = drain(blockingQueue, newArrayList(), 2, 10L, TimeUnit.MILLISECONDS, z);
            AssertJUnit.assertTrue(drain <= 1);
            AssertJUnit.assertTrue(System.nanoTime() - nanoTime >= TimeUnit.MILLISECONDS.toNanos(10L));
            submit.cancel(true);
            producer.doneProducing.await();
            if (drain == 0) {
                blockingQueue.poll();
            }
        }
    }

    @Test(dataProvider = "blockingQueues")
    public void testZeroElements(BlockingQueue<Object> blockingQueue, String str) throws InterruptedException {
        Assert.assertTrue(blockingQueue.remainingCapacity() > 0);
        for (boolean z : new boolean[]{true, false}) {
            Assert.assertTrue(blockingQueue.isEmpty());
            AssertJUnit.assertEquals(0, drain(blockingQueue, Collections.emptyList(), 0, 10L, TimeUnit.MILLISECONDS, z));
            Assert.assertEquals(blockingQueue.size(), 0);
        }
        blockingQueue.clear();
        blockingQueue.drainTo(Collections.emptyList());
        Assert.assertFalse(blockingQueue.remove(new Object()));
    }

    @Test(dataProvider = "blockingQueues")
    public void testEmpty(BlockingQueue<Object> blockingQueue, String str) {
        assertDrained(blockingQueue);
    }

    @Test(dataProvider = "blockingQueues")
    public void testNegativeMaxElements(BlockingQueue<Object> blockingQueue, String str) throws InterruptedException {
        this.threadPool.submit(new Producer(blockingQueue, 1));
        List newArrayList = newArrayList();
        AssertJUnit.assertEquals(drain(blockingQueue, newArrayList, -1, Long.MAX_VALUE, TimeUnit.NANOSECONDS), 0);
        AssertJUnit.assertTrue(newArrayList.isEmpty());
        blockingQueue.take();
    }

    @Test(dataProvider = "blockingQueues")
    public void testDrain_throws(BlockingQueue<Object> blockingQueue, String str) {
        this.threadPool.submit(new Interrupter(Thread.currentThread()));
        try {
            drain(blockingQueue, Collections.emptyList(), 100, Long.MAX_VALUE, TimeUnit.NANOSECONDS);
            AssertJUnit.fail();
        } catch (InterruptedException e) {
        }
    }

    @Test(dataProvider = "blockingQueues")
    public void testDrainUninterruptibly_doesNotThrow(final BlockingQueue<Object> blockingQueue, String str) {
        final Thread currentThread = Thread.currentThread();
        this.threadPool.submit(new Callable<Void>() { // from class: com.linkedin.alpini.base.concurrency.TestBlockingQueues.1
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Void call() throws InterruptedException {
                new Producer(blockingQueue, 50).call();
                new Interrupter(currentThread).run();
                new Producer(blockingQueue, 50).call();
                return null;
            }
        });
        List newArrayList = newArrayList();
        int drainUninterruptibly = drainUninterruptibly(blockingQueue, newArrayList, 100, Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        AssertJUnit.assertTrue(Thread.interrupted());
        AssertJUnit.assertEquals(100, drainUninterruptibly);
        AssertJUnit.assertEquals(100, newArrayList.size());
    }

    private void assertDrained(BlockingQueue<Object> blockingQueue) {
        AssertJUnit.assertNull(blockingQueue.peek());
        assertInterruptibleDrained(blockingQueue);
        assertUninterruptibleDrained(blockingQueue);
    }

    private void assertInterruptibleDrained(BlockingQueue<Object> blockingQueue) {
        try {
            AssertJUnit.assertEquals(0, drain(blockingQueue, Collections.emptyList(), 0, 10L, TimeUnit.MILLISECONDS));
            this.threadPool.submit(new Interrupter(Thread.currentThread()));
            try {
                drain(blockingQueue, newArrayList(), 1, Long.MAX_VALUE, TimeUnit.NANOSECONDS);
                AssertJUnit.fail();
            } catch (InterruptedException e) {
            }
        } catch (InterruptedException e2) {
            throw new AssertionError();
        }
    }

    private void assertUninterruptibleDrained(BlockingQueue<Object> blockingQueue) {
        AssertJUnit.assertEquals(0, drainUninterruptibly(blockingQueue, Collections.emptyList(), 0, 10L, TimeUnit.MILLISECONDS));
        this.threadPool.submit(new Interrupter(Thread.currentThread()));
        long nanoTime = System.nanoTime();
        drainUninterruptibly(blockingQueue, newArrayList(), 1, 10L, TimeUnit.MILLISECONDS);
        AssertJUnit.assertTrue(System.nanoTime() - nanoTime >= TimeUnit.MILLISECONDS.toNanos(10L));
        while (!Thread.interrupted()) {
            Thread.yield();
        }
    }
}
