/*
 * Decompiled with CFR 0.152.
 */
package io.nosqlbench.nbvectors.jjq.bulkio;

import java.util.Iterator;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class ConcurrentSupplier<T>
implements Supplier<T>,
AutoCloseable {
    private final Iterator<T> source;
    private final LinkedBlockingDeque<T> buffer;
    private final Consumer<RuntimeException> errorChannel;
    private volatile boolean priming = true;

    public ConcurrentSupplier(Iterable<T> source, int bufferSize, Consumer<RuntimeException> errorChannel) {
        this(source.iterator(), bufferSize, errorChannel);
    }

    public ConcurrentSupplier(Iterator<T> iterator, int bufferSize, Consumer<RuntimeException> errorChannel) {
        this.source = iterator;
        this.errorChannel = errorChannel;
        this.buffer = new LinkedBlockingDeque(bufferSize);
        this.startFeeder();
    }

    private void startFeeder() {
        Thread feeder = Thread.ofVirtual().name("feeder").factory().newThread(() -> {
            while (this.source.hasNext()) {
                T next = this.source.next();
                try {
                    this.buffer.putFirst(next);
                }
                catch (InterruptedException interruptedException) {}
            }
            this.priming = false;
        });
        feeder.start();
    }

    @Override
    public void close() throws Exception {
        this.priming = false;
        if (!this.buffer.isEmpty()) {
            this.throwOrReport(new RuntimeException("queue was closed with " + this.buffer.size() + " pending elements"));
        }
    }

    private void throwOrReport(RuntimeException error) {
        if (this.errorChannel == null) {
            throw error;
        }
        this.errorChannel.accept(error);
    }

    @Override
    public T get() {
        T value = null;
        while (this.priming || !this.buffer.isEmpty()) {
            try {
                value = this.buffer.pollFirst(100L, TimeUnit.MILLISECONDS);
                if (value == null) continue;
                break;
            }
            catch (InterruptedException interruptedException) {
            }
        }
        return value;
    }
}

