/*
 * Decompiled with CFR 0.152.
 */
package io.nosqlbench.nbvectors.datasource.parquet.traversal.functional;

import io.nosqlbench.nbdatatools.api.iteration.ConvertingIterable;
import io.nosqlbench.nbvectors.datasource.parquet.layout.PageSupplier;
import io.nosqlbench.nbvectors.datasource.parquet.layout.PathAggregator;
import io.nosqlbench.nbvectors.datasource.parquet.traversal.functional.BoundedPageStore;
import io.nosqlbench.nbvectors.datasource.parquet.traversal.functional.ParquetVisitor;
import io.nosqlbench.nbvectors.datasource.parquet.traversal.functional.RecordSupplier;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.io.InputFile;
import org.apache.parquet.io.LocalInputFile;

public class ParquetTraversal {
    private static final Logger logger = LogManager.getLogger(ParquetTraversal.class);
    private final Iterable<PathAggregator> rootTraversers;
    private final int concurrency;

    public ParquetTraversal(List<Path> roots, int concurrency) {
        this.rootTraversers = new ConvertingIterable(roots, r -> new PathAggregator((Path)r, true));
        this.concurrency = concurrency;
    }

    public void traverse(ParquetVisitor visitor) {
        if (visitor.getTraversalDepth().isEnabledFor(ParquetVisitor.Depth.CALL)) {
            visitor.beforeAll();
            if (visitor.getTraversalDepth().isEnabledFor(ParquetVisitor.Depth.ROOTS)) {
                ExecutorService executorService = Executors.newFixedThreadPool(this.concurrency);
                ConcurrentLinkedQueue futures = new ConcurrentLinkedQueue();
                AtomicInteger submittedCount = new AtomicInteger(0);
                for (PathAggregator rootTraverser : this.rootTraversers) {
                    visitor.beforeRoot(rootTraverser);
                    if (visitor.getTraversalDepth().isEnabledFor(ParquetVisitor.Depth.FILES)) {
                        ConvertingIterable inputFilesIter = new ConvertingIterable(rootTraverser.getFileList(), LocalInputFile::new);
                        for (InputFile inputFile : inputFilesIter) {
                            int taskOrder = submittedCount.getAndIncrement();
                            logger.info("Submitting file traversal task for file: {} with task order: {}", (Object)inputFile, (Object)taskOrder);
                            Future<?> future = executorService.submit(() -> {
                                try {
                                    this.traverseFile(inputFile, visitor, taskOrder);
                                }
                                catch (Exception e) {
                                    logger.error("Error traversing file: " + String.valueOf(inputFile), (Throwable)e);
                                }
                            });
                            futures.add(future);
                        }
                    }
                    try {
                        for (int i = 0; i < submittedCount.get(); ++i) {
                            System.err.println("Waiting for file traversal task: " + i + "/" + submittedCount.get());
                            Future future = (Future)futures.poll();
                            if (future == null) continue;
                            future.get();
                            logger.info("Retired file traversal task: {} task order: {}", (Object)rootTraverser, (Object)i);
                        }
                    }
                    catch (InterruptedException | ExecutionException e) {
                        logger.error("Error waiting for file traversal tasks to complete.", (Throwable)e);
                        Thread.currentThread().interrupt();
                    }
                    submittedCount.set(0);
                    futures.clear();
                    visitor.afterRoot(rootTraverser);
                }
                executorService.shutdown();
                try {
                    if (!executorService.awaitTermination(60L, TimeUnit.SECONDS)) {
                        executorService.shutdownNow();
                        logger.warn("Executor did not terminate in time.");
                    }
                }
                catch (InterruptedException e) {
                    executorService.shutdownNow();
                    Thread.currentThread().interrupt();
                    logger.error("Executor termination interrupted.", (Throwable)e);
                }
            }
            visitor.afterAll();
        }
    }

    private void traverseFile(InputFile inputFile, ParquetVisitor visitor, int taskOrder) {
        visitor.beforeInputFile(inputFile);
        logger.debug("Starting traversal of file {} with task order {}", (Object)inputFile, (Object)taskOrder);
        if (visitor.getTraversalDepth().isEnabledFor(ParquetVisitor.Depth.PAGES)) {
            try (PageSupplier pageSupplier = new PageSupplier(inputFile);){
                BoundedPageStore pageStore;
                while ((pageStore = pageSupplier.get()) != null) {
                    visitor.beforePage(pageStore);
                    if (visitor.getTraversalDepth().isEnabledFor(ParquetVisitor.Depth.GROUPS)) {
                        Group group;
                        RecordSupplier groupRecordSupplier = pageStore.get();
                        while ((group = (Group)groupRecordSupplier.get()) != null) {
                            visitor.onGroup(group);
                        }
                    }
                    visitor.afterPage(pageStore);
                }
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        visitor.afterInputFile(inputFile);
        logger.debug("Finished traversal of file {} with task order {}", (Object)inputFile, (Object)taskOrder);
    }
}

