package com.linkedin.alpini.base.concurrency;

import com.linkedin.alpini.base.concurrency.ConcurrentAccumulator;
import com.linkedin.alpini.base.misc.Time;
import com.linkedin.alpini.base.statistics.Welfords;
import java.util.Objects;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/**
 * Unit test for {@link ConcurrentAccumulator}, run once per accumulator mode supplied by
 * {@link #accumulatorModes()}.
 *
 * <p>The test feeds random doubles into an accumulator from a dedicated {@link ForkJoinPool}
 * and, when the accumulator can be unwrapped to a {@code ThreadedAccumulator}, checks the size
 * of its thread map before the pool is used, while the pool is alive, and after the pool has
 * been terminated and garbage collection has been requested.
 */
@Test(groups = {"unit"})
public class TestConcurrentAccumulator {
    private static final Object[][] ACCUMULATOR_MODES = {new Object[]{ConcurrentAccumulator.Mode.COMPLEX}, new Object[]{ConcurrentAccumulator.Mode.THREADED}};

    /** Number of times the whole scenario is repeated against the same accumulator. */
    private static final int ROUNDS = 2;

    /** Parallelism of the pool that drives the concurrent writes. */
    private static final int POOL_PARALLELISM = 8;

    /** Thread map size expected when only the test thread has touched the accumulator. */
    private static final int IDLE_THREAD_MAP_SIZE = 1;

    /**
     * Thread map size expected right after the pool has fed the accumulator.
     * NOTE(review): presumably one entry per pool worker plus the calling test thread;
     * verify against ThreadedAccumulator.
     */
    private static final int ACTIVE_THREAD_MAP_SIZE = POOL_PARALLELISM + 1;

    /** Exclusive upper bound of the index range streamed per workload (starts at 1). */
    private static final int SAMPLE_RANGE_END = 5000000;

    /** Exclusive upper bound of each random sample. */
    private static final double SAMPLE_BOUND = 100.0d;

    /** Maximum time to wait for the pool to terminate, in seconds. */
    private static final long TERMINATION_TIMEOUT_SECONDS = 5L;

    /** Pause between the two garbage collection requests, in milliseconds. */
    private static final long GC_SETTLE_MILLIS = 1000L;

    /**
     * Supplies the accumulator modes under test.
     *
     * @return one row per {@link ConcurrentAccumulator.Mode} exercised by the test
     */
    @DataProvider
    public Object[][] accumulatorModes() {
        return ACCUMULATOR_MODES;
    }

    /**
     * Asserts the thread map size of the accumulator, if it exposes one.
     *
     * <p>When {@code unwrap} returns {@code null} for {@code ThreadedAccumulator} the check is
     * skipped, so modes without a thread map pass trivially.
     *
     * @param concurrentAccumulator accumulator under test
     * @param i expected thread map size
     */
    private void assertThreadMap(ConcurrentAccumulator<?, ?, ?> concurrentAccumulator, int i) {
        ThreadedAccumulator unwrap = concurrentAccumulator.unwrap(ThreadedAccumulator.class);
        if (unwrap != null) {
            Assert.assertEquals(unwrap.threadMapSize(), i);
        }
    }

    /**
     * Feeds random samples into the accumulator from a parallel stream that is started inside
     * the given pool, and blocks until the whole workload has completed.
     *
     * @param pool pool in which the parallel stream is submitted
     * @param accumulator accumulator receiving the samples
     */
    private static void feedRandomSamples(ForkJoinPool pool, ConcurrentAccumulator<Double, ?, ?> accumulator) {
        pool.submit(() -> {
            IntStream.range(1, SAMPLE_RANGE_END).parallel().forEach(ignored -> {
                accumulator.accept(ThreadLocalRandom.current().nextDouble(SAMPLE_BOUND));
            });
        }).join();
    }

    /**
     * Runs the accumulate / reset / pack scenario twice against one accumulator, using a fresh
     * pool for each round.
     *
     * @param mode accumulator mode under test
     * @throws InterruptedException if interrupted while waiting for the pool to terminate
     */
    @Test(dataProvider = "accumulatorModes")
    public void basicTest(ConcurrentAccumulator.Mode mode) throws InterruptedException {
        // The element type is pinned to Double so that accept(Double) type-checks; the two
        // remaining type arguments are inferred from the collector.
        ConcurrentAccumulator<Double, ?, ?> accumulator = new ConcurrentAccumulator<>(mode, Welfords.DoubleWelford.COLLECTOR);
        for (int round = 0; round < ROUNDS; round++) {
            ForkJoinPool pool = new ForkJoinPool(POOL_PARALLELISM);
            try {
                assertThreadMap(accumulator, IDLE_THREAD_MAP_SIZE);
                accumulator.reset();

                feedRandomSamples(pool, accumulator);
                assertThreadMap(accumulator, ACTIVE_THREAD_MAP_SIZE);
                System.out.println(accumulator.getThenReset());

                // pack() is invoked from a pool thread, then the same workload is repeated.
                pool.submit(accumulator::pack).join();
                feedRandomSamples(pool, accumulator);
                assertThreadMap(accumulator, ACTIVE_THREAD_MAP_SIZE);
                System.out.println(accumulator);
            } finally {
                // Always stop the pool, even when an assertion above has failed.
                pool.shutdownNow();
                Assert.assertTrue(pool.awaitTermination(TERMINATION_TIMEOUT_SECONDS, TimeUnit.SECONDS));
            }

            // NOTE(review): presumably entries belonging to the terminated worker threads are
            // only dropped after garbage collection; confirm against ThreadedAccumulator.
            System.gc();
            Time.sleepUninterruptably(GC_SETTLE_MILLIS);
            System.gc();
            System.out.println(accumulator);
            assertThreadMap(accumulator, IDLE_THREAD_MAP_SIZE);
        }
    }
}
