package com.linkedin.alpini.base.concurrency;

import com.linkedin.alpini.base.concurrency.TimeoutProcessor;
import com.linkedin.alpini.base.registry.ResourceRegistry;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/alpini/base/concurrency/TestTimeoutProcessor.class */
public class TestTimeoutProcessor {
    private static final Logger LOG = LogManager.getLogger(TestTimeoutProcessor.class);
    TimeoutProcessor _timeout;

    @DataProvider(name = "TimeoutProcessor")
    public static Object[] processor() {
        return new Object[]{new TimeoutProcessor((ResourceRegistry) null, 300L, TimeoutProcessor.EventStore.SkipList, 1), new TimeoutProcessor((ResourceRegistry) null, 300L, TimeoutProcessor.EventStore.TreeMap, 1)};
    }

    @AfterMethod(groups = {"unit"})
    protected void finiProcessor() throws InterruptedException {
        this._timeout.shutdownNow();
        this._timeout.awaitTermination(30, TimeUnit.SECONDS);
    }

    @Test(groups = {"unit"}, dataProvider = "TimeoutProcessor")
    public void testOneFire(TimeoutProcessor timeoutProcessor) throws InterruptedException {
        this._timeout = timeoutProcessor;
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        TimeoutProcessor.TimeoutFuture schedule = this._timeout.schedule(new Runnable() { // from class: com.linkedin.alpini.base.concurrency.TestTimeoutProcessor.1
            @Override // java.lang.Runnable
            public void run() {
                TestTimeoutProcessor.LOG.debug("future is running.");
                atomicInteger.incrementAndGet();
            }
        }, 1L, TimeUnit.SECONDS);
        Thread.sleep(900L);
        Assert.assertEquals(atomicInteger.get(), 0);
        Thread.sleep(700L);
        Assert.assertEquals(atomicInteger.get(), 1);
        Assert.assertTrue(schedule.isDone());
        Thread.sleep(100L);
        Assert.assertEquals(atomicInteger.get(), 1);
    }

    @Test(groups = {"unit"}, dataProvider = "TimeoutProcessor")
    public void testOneCancel(TimeoutProcessor timeoutProcessor) throws InterruptedException {
        this._timeout = timeoutProcessor;
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        TimeoutProcessor.TimeoutFuture schedule = this._timeout.schedule(new Runnable() { // from class: com.linkedin.alpini.base.concurrency.TestTimeoutProcessor.2
            @Override // java.lang.Runnable
            public void run() {
                atomicInteger.incrementAndGet();
            }
        }, 1L, TimeUnit.SECONDS);
        Thread.sleep(900L);
        Assert.assertEquals(atomicInteger.get(), 0);
        schedule.cancel();
        Thread.sleep(200L);
        Assert.assertTrue(schedule.isDone());
        Assert.assertEquals(atomicInteger.get(), 0);
        Thread.sleep(100L);
        Assert.assertEquals(atomicInteger.get(), 0);
    }

    @Test(groups = {"unit"}, dataProvider = "TimeoutProcessor")
    public void testStormCancel(TimeoutProcessor timeoutProcessor) throws InterruptedException {
        this._timeout = timeoutProcessor;
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        TimeoutProcessor.TimeoutFuture[] timeoutFutureArr = new TimeoutProcessor.TimeoutFuture[1000];
        int i = 0;
        int i2 = 0;
        while (i < 100000) {
            if (i2 >= timeoutFutureArr.length) {
                i2 = 0;
            }
            if (timeoutFutureArr[i2] != null) {
                timeoutFutureArr[i2].cancel();
            }
            timeoutFutureArr[i2] = this._timeout.schedule(new Runnable() { // from class: com.linkedin.alpini.base.concurrency.TestTimeoutProcessor.3
                @Override // java.lang.Runnable
                public void run() {
                    atomicInteger.incrementAndGet();
                }
            }, 1L, TimeUnit.SECONDS);
            i++;
            i2++;
        }
        Assert.assertEquals(atomicInteger.get(), 0);
        for (TimeoutProcessor.TimeoutFuture timeoutFuture : timeoutFutureArr) {
            timeoutFuture.cancel();
        }
        Thread.sleep(900L);
        Assert.assertEquals(atomicInteger.get(), 0);
        Thread.sleep(900L);
        Assert.assertEquals(atomicInteger.get(), 0);
        for (TimeoutProcessor.TimeoutFuture timeoutFuture2 : timeoutFutureArr) {
            Assert.assertTrue(timeoutFuture2.isDone());
        }
    }

    @Test(groups = {"unit"}, dataProvider = "TimeoutProcessor")
    public void testStormFib(TimeoutProcessor timeoutProcessor) throws InterruptedException {
        this._timeout = timeoutProcessor;
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        TimeoutProcessor.TimeoutFuture[] timeoutFutureArr = new TimeoutProcessor.TimeoutFuture[1000];
        int i = 0;
        int i2 = 1;
        int i3 = 0;
        int i4 = 0;
        int i5 = 0;
        int i6 = 0;
        int i7 = 0;
        while (i6 < 100000 + timeoutFutureArr.length) {
            if (i7 >= timeoutFutureArr.length) {
                i7 = 0;
            }
            if (timeoutFutureArr[i7] != null) {
                i++;
                if (i == i2 + i3) {
                    if (timeoutFutureArr[i7].cancel()) {
                        i5++;
                    }
                    i3 = i2;
                    i2 = i;
                }
            }
            if (i6 < 100000) {
                timeoutFutureArr[i7] = this._timeout.schedule(new Runnable() { // from class: com.linkedin.alpini.base.concurrency.TestTimeoutProcessor.4
                    @Override // java.lang.Runnable
                    public void run() {
                        atomicInteger.incrementAndGet();
                    }
                }, 1L, TimeUnit.SECONDS);
                i4++;
            }
            i6++;
            i7++;
        }
        Assert.assertEquals(atomicInteger.get(), 0);
        LOG.debug("Created {} timeouts, cancelled {} of them", Integer.valueOf(i4), Integer.valueOf(i5));
        waitForStorm(atomicInteger, i4 - i5);
        Assert.assertEquals(atomicInteger.get(), i4 - i5);
    }

    @Test(groups = {"unit"}, dataProvider = "TimeoutProcessor")
    public void testStormFib2(TimeoutProcessor timeoutProcessor) throws InterruptedException {
        this._timeout = timeoutProcessor;
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        TimeoutProcessor.TimeoutFuture[] timeoutFutureArr = new TimeoutProcessor.TimeoutFuture[1000];
        int i = 0;
        int i2 = 1;
        int i3 = 0;
        int i4 = 0;
        int i5 = 0;
        int i6 = 0;
        int i7 = 0;
        while (i6 < 100000 + timeoutFutureArr.length) {
            if (i7 >= timeoutFutureArr.length) {
                i7 = 0;
            }
            if (timeoutFutureArr[i7] != null) {
                i++;
                if (i == i2 + i3) {
                    i3 = i2;
                    i2 = i;
                } else if (timeoutFutureArr[i7].cancel()) {
                    i5++;
                }
            }
            if (i6 < 100000) {
                timeoutFutureArr[i7] = this._timeout.schedule(new Runnable() { // from class: com.linkedin.alpini.base.concurrency.TestTimeoutProcessor.5
                    @Override // java.lang.Runnable
                    public void run() {
                        atomicInteger.incrementAndGet();
                    }
                }, 1L, TimeUnit.SECONDS);
                i4++;
            }
            i6++;
            i7++;
        }
        Assert.assertEquals(atomicInteger.get(), 0);
        LOG.info(String.format("Created %d timeouts, cancelled %d of them", Integer.valueOf(i4), Integer.valueOf(i5)));
        waitForStorm(atomicInteger, i4 - i5);
        Assert.assertEquals(atomicInteger.get(), i4 - i5);
    }

    @Test(groups = {"unit"}, dataProvider = "TimeoutProcessor")
    public void testStormFire(TimeoutProcessor timeoutProcessor) throws InterruptedException {
        this._timeout = timeoutProcessor;
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        TimeoutProcessor.TimeoutFuture[] timeoutFutureArr = new TimeoutProcessor.TimeoutFuture[1000];
        int i = 0;
        int i2 = 0;
        int i3 = 0;
        while (i2 < 100000) {
            if (i3 >= timeoutFutureArr.length) {
                i3 = 0;
            }
            timeoutFutureArr[i3] = this._timeout.schedule(new Runnable() { // from class: com.linkedin.alpini.base.concurrency.TestTimeoutProcessor.6
                @Override // java.lang.Runnable
                public void run() {
                    atomicInteger.incrementAndGet();
                }
            }, 1L, TimeUnit.SECONDS);
            i++;
            i2++;
            i3++;
        }
        Assert.assertEquals(atomicInteger.get(), 0);
        waitForStorm(atomicInteger, i);
        Assert.assertEquals(atomicInteger.get(), i);
    }

    private void waitForStorm(AtomicInteger atomicInteger, long j) throws InterruptedException {
        int i;
        int i2 = 0;
        do {
            Thread.sleep(500L);
            if (atomicInteger.get() == j) {
                return;
            }
            i = i2;
            i2++;
        } while (i < 24);
    }

    @Test(groups = {"unit"}, dataProvider = "TimeoutProcessor", successPercentage = 20, invocationCount = 5)
    void testFiringAccuracy(TimeoutProcessor timeoutProcessor) throws Exception {
        this._timeout = timeoutProcessor;
        HashMap hashMap = new HashMap();
        Iterator it = Arrays.asList(30, 90, 60).iterator();
        while (it.hasNext()) {
            long intValue = ((Integer) it.next()).intValue();
            CompletableFuture completableFuture = new CompletableFuture();
            long nanoTime = System.nanoTime();
            this._timeout.schedule(() -> {
                completableFuture.complete(Long.valueOf(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nanoTime)));
            }, intValue, TimeUnit.MILLISECONDS);
            hashMap.put(Long.valueOf(intValue), completableFuture);
        }
        hashMap.forEach((l, completableFuture2) -> {
            Long l = null;
            try {
                l = (Long) completableFuture2.get(3L, TimeUnit.SECONDS);
            } catch (Exception e) {
                Assert.fail("Event has not fired as expected within 3 seconds", e);
            }
            Assert.assertTrue(Math.abs(1.0d - (((double) l.longValue()) / ((double) l.longValue()))) < 0.5d, "Event fired after " + l + "ms (" + l + "ms expected).");
        });
    }
}
