package org.apache.beam.runners.direct;

import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.runners.direct.WatermarkManager;
import org.apache.beam.runners.direct.repackaged.com.google.common.base.MoreObjects;
import org.apache.beam.runners.direct.repackaged.com.google.common.base.Optional;
import org.apache.beam.runners.direct.repackaged.com.google.common.cache.CacheBuilder;
import org.apache.beam.runners.direct.repackaged.com.google.common.cache.CacheLoader;
import org.apache.beam.runners.direct.repackaged.com.google.common.cache.LoadingCache;
import org.apache.beam.runners.direct.repackaged.com.google.common.collect.ImmutableList;
import org.apache.beam.runners.direct.repackaged.com.google.common.collect.Iterables;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.KeyedWorkItems;
import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PValue;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.class */
public final class ExecutorServiceParallelExecutor implements PipelineExecutor {
    private static final Logger LOG = LoggerFactory.getLogger(ExecutorServiceParallelExecutor.class);

    /** Underlying executor that runs both the monitor task and all transform executors. */
    private final ExecutorService executorService;

    /** Maps each value to the applied transforms that consume it. */
    private final Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers;

    /** Values whose bundles are routed to a per-step-and-key executor service instead of the parallel one. */
    private final Set<PValue> keyedPValues;

    private final TransformEvaluatorRegistry registry;

    /** Enforcement factories looked up by the concrete class of the transform being executed. */
    private final Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>> transformEnforcements;

    private final EvaluationContext evaluationContext;

    /** Executor service used for bundles that are absent or not keyed. */
    private final TransformExecutorService parallelExecutorService;

    /** Root transforms of the pipeline; assigned in {@link #start(Collection)}. */
    private Collection<AppliedPTransform<?, ?, ?>> rootNodes;

    /**
     * Per-(step, key) executor services, held with weak values.
     *
     * <p>This initializer runs before the constructor body assigns {@code executorService}, so the
     * loader must read that field when an entry is loaded, not when the loader is created.
     */
    private final LoadingCache<StepAndKey, TransformExecutorService> executorServices = CacheBuilder.newBuilder().weakValues().build(serialTransformExecutorServiceCacheLoader());

    /** Internal updates posted by completion callbacks and drained by the monitor task. */
    private final Queue<ExecutorUpdate> allUpdates = new ConcurrentLinkedQueue<>();

    /** Bounded queue (capacity 20) of updates consumed by {@link #awaitCompletion()}. */
    private final BlockingQueue<VisibleExecutorUpdate> visibleUpdates = new ArrayBlockingQueue<>(20);

    /** Callback used for bundles that were not produced by timer delivery. */
    private final CompletionCallback defaultCompletionCallback = new DefaultCompletionCallback();

    /* loaded from: input_file:org/apache/beam/runners/direct/ExecutorServiceParallelExecutor$CompletionCallbackBase.class */
    /**
     * Shared completion handling: commits the result via the subclass hook, then enqueues one
     * {@link ExecutorUpdate} per output bundle and, if any inputs were left unprocessed, one more
     * update that hands those inputs back to the transform that produced the result.
     */
    private abstract class CompletionCallbackBase implements CompletionCallback {
        private CompletionCallbackBase() {
        }

        /** Produces the committed result for a finished bundle; implemented by subclasses. */
        protected abstract CommittedResult getCommittedResult(DirectRunner.CommittedBundle<?> committedBundle, TransformResult transformResult);

        @Override // org.apache.beam.runners.direct.CompletionCallback
        public final CommittedResult handleResult(DirectRunner.CommittedBundle<?> committedBundle, TransformResult transformResult) {
            final CommittedResult committed = getCommittedResult(committedBundle, transformResult);

            // Each output bundle is paired with the consumers registered for its PCollection.
            for (DirectRunner.CommittedBundle<?> output : committed.getOutputs()) {
                Collection<AppliedPTransform<?, ?, ?>> consumers = valueToConsumers.get(output.getPCollection());
                allUpdates.offer(ExecutorUpdate.fromBundle(output, consumers));
            }

            // Non-empty leftover input is re-queued for the same transform only.
            final DirectRunner.CommittedBundle<?> leftover = committed.getUnprocessedInputs();
            final boolean hasLeftover = leftover != null && !Iterables.isEmpty(leftover.getElements());
            if (hasLeftover) {
                allUpdates.offer(ExecutorUpdate.fromBundle(leftover, Collections.<AppliedPTransform<?, ?, ?>>singleton(committed.getTransform())));
            }
            return committed;
        }

        @Override // org.apache.beam.runners.direct.CompletionCallback
        public final void handleThrowable(DirectRunner.CommittedBundle<?> committedBundle, Throwable th) {
            // The failure is only enqueued here; the monitor task surfaces it later.
            ExecutorUpdate failure = ExecutorUpdate.fromThrowable(th);
            allUpdates.offer(failure);
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/direct/ExecutorServiceParallelExecutor$DefaultCompletionCallback.class */
    private class DefaultCompletionCallback extends CompletionCallbackBase {
        private DefaultCompletionCallback() {
            super();
        }

        @Override // org.apache.beam.runners.direct.ExecutorServiceParallelExecutor.CompletionCallbackBase
        public CommittedResult getCommittedResult(DirectRunner.CommittedBundle<?> committedBundle, TransformResult transformResult) {
            return ExecutorServiceParallelExecutor.this.evaluationContext.handleResult(committedBundle, Collections.emptyList(), transformResult);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/direct/ExecutorServiceParallelExecutor$ExecutorUpdate.class */
    /**
     * An internal update for the monitor task: either a committed bundle together with the
     * transforms that should consume it, or a throwable raised while processing.
     */
    public static abstract class ExecutorUpdate {
        /** Creates an update carrying {@code committedBundle} and its consumers, with no exception. */
        public static ExecutorUpdate fromBundle(DirectRunner.CommittedBundle<?> committedBundle, Collection<AppliedPTransform<?, ?, ?>> collection) {
            Optional<Throwable> noFailure = Optional.absent();
            return new AutoValue_ExecutorServiceParallelExecutor_ExecutorUpdate(Optional.of(committedBundle), collection, noFailure);
        }

        /** Creates an update carrying only {@code th}: no bundle and an empty consumer list. */
        public static ExecutorUpdate fromThrowable(Throwable th) {
            Optional<Throwable> failure = Optional.of(th);
            return new AutoValue_ExecutorServiceParallelExecutor_ExecutorUpdate(
                    Optional.absent(),
                    Collections.emptyList(),
                    failure);
        }

        /** The bundle to schedule, if this update carries one. */
        public abstract Optional<? extends DirectRunner.CommittedBundle<?>> getBundle();

        /** The transforms that should consume {@link #getBundle()}. */
        public abstract Collection<AppliedPTransform<?, ?, ?>> getConsumers();

        /** The failure to report, if this update carries one. */
        public abstract Optional<? extends Throwable> getException();
    }

    /* loaded from: input_file:org/apache/beam/runners/direct/ExecutorServiceParallelExecutor$MonitorRunnable.class */
    private class MonitorRunnable implements Runnable {
        private final long maxTimeProcessingUpdatesNanos;
        private final String runnableName;
        private boolean exceptionThrown;

        private MonitorRunnable() {
            this.maxTimeProcessingUpdatesNanos = TimeUnit.MILLISECONDS.toNanos(5L);
            this.runnableName = String.format("%s$%s-monitor", ExecutorServiceParallelExecutor.this.evaluationContext.getPipelineOptions().getAppName(), ExecutorServiceParallelExecutor.class.getSimpleName());
            this.exceptionThrown = false;
        }

        @Override // java.lang.Runnable
        public void run() {
            String name = Thread.currentThread().getName();
            Thread.currentThread().setName(this.runnableName);
            try {
                try {
                    ExecutorUpdate executorUpdate = (ExecutorUpdate) ExecutorServiceParallelExecutor.this.allUpdates.poll();
                    long nanoTime = System.nanoTime();
                    while (executorUpdate != null) {
                        ExecutorServiceParallelExecutor.LOG.debug("Executor Update: {}", executorUpdate);
                        if (executorUpdate.getBundle().isPresent()) {
                            ExecutorServiceParallelExecutor.this.scheduleConsumers(executorUpdate);
                        } else if (executorUpdate.getException().isPresent()) {
                            ExecutorServiceParallelExecutor.this.visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(executorUpdate.getException().get()));
                            this.exceptionThrown = true;
                        }
                        if (System.nanoTime() - nanoTime > this.maxTimeProcessingUpdatesNanos) {
                            break;
                        } else {
                            executorUpdate = (ExecutorUpdate) ExecutorServiceParallelExecutor.this.allUpdates.poll();
                        }
                    }
                    addWorkIfNecessary(fireTimers());
                    if (!shouldShutdown()) {
                        ExecutorServiceParallelExecutor.this.executorService.submit(this);
                    }
                    Thread.currentThread().setName(name);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    ExecutorServiceParallelExecutor.LOG.error("Monitor died due to being interrupted");
                    while (!ExecutorServiceParallelExecutor.this.visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(e))) {
                        ExecutorServiceParallelExecutor.this.visibleUpdates.poll();
                    }
                    if (!shouldShutdown()) {
                        ExecutorServiceParallelExecutor.this.executorService.submit(this);
                    }
                    Thread.currentThread().setName(name);
                } catch (Throwable th) {
                    ExecutorServiceParallelExecutor.LOG.error("Monitor thread died due to throwable", th);
                    while (!ExecutorServiceParallelExecutor.this.visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(th))) {
                        ExecutorServiceParallelExecutor.this.visibleUpdates.poll();
                    }
                    if (!shouldShutdown()) {
                        ExecutorServiceParallelExecutor.this.executorService.submit(this);
                    }
                    Thread.currentThread().setName(name);
                }
            } catch (Throwable th2) {
                if (!shouldShutdown()) {
                    ExecutorServiceParallelExecutor.this.executorService.submit(this);
                }
                Thread.currentThread().setName(name);
                throw th2;
            }
        }

        private boolean fireTimers() throws Exception {
            try {
                boolean z = false;
                for (Map.Entry<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, WatermarkManager.FiredTimers>> entry : ExecutorServiceParallelExecutor.this.evaluationContext.extractFiredTimers().entrySet()) {
                    AppliedPTransform<?, ?, ?> key = entry.getKey();
                    for (Map.Entry<StructuralKey<?>, WatermarkManager.FiredTimers> entry2 : entry.getValue().entrySet()) {
                        for (TimeDomain timeDomain : TimeDomain.values()) {
                            Collection<TimerInternals.TimerData> timers = entry2.getValue().getTimers(timeDomain);
                            if (!timers.isEmpty()) {
                                ExecutorServiceParallelExecutor.this.scheduleConsumption(key, ExecutorServiceParallelExecutor.this.evaluationContext.createKeyedBundle(null, entry2.getKey(), key.getInput()).add(WindowedValue.valueInEmptyWindows(KeyedWorkItems.timersWorkItem(entry2.getKey().getKey(), timers))).commit(Instant.now()), new TimerCompletionCallback(timers));
                                z = true;
                            }
                        }
                    }
                }
                return z;
            } catch (Exception e) {
                ExecutorServiceParallelExecutor.LOG.error("Internal Error while delivering timers", e);
                throw e;
            }
        }

        private boolean shouldShutdown() {
            boolean z = this.exceptionThrown || ExecutorServiceParallelExecutor.this.evaluationContext.isDone();
            if (z) {
                if (ExecutorServiceParallelExecutor.this.evaluationContext.isDone()) {
                    ExecutorServiceParallelExecutor.LOG.debug("Pipeline is finished. Shutting down. {}");
                    while (!ExecutorServiceParallelExecutor.this.visibleUpdates.offer(VisibleExecutorUpdate.finished())) {
                        ExecutorServiceParallelExecutor.this.visibleUpdates.poll();
                    }
                }
                ExecutorServiceParallelExecutor.this.executorService.shutdown();
            }
            return z;
        }

        private void addWorkIfNecessary(boolean z) {
            if (z) {
                return;
            }
            for (AppliedPTransform<?, ?, ?> appliedPTransform : ExecutorServiceParallelExecutor.this.rootNodes) {
                if (!ExecutorServiceParallelExecutor.this.evaluationContext.isDone(appliedPTransform)) {
                    ExecutorServiceParallelExecutor.this.scheduleConsumption(appliedPTransform, null, ExecutorServiceParallelExecutor.this.defaultCompletionCallback);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/direct/ExecutorServiceParallelExecutor$TimerCompletionCallback.class */
    /** Completion callback for timer-delivery bundles: reports the delivered timers when committing. */
    public class TimerCompletionCallback extends CompletionCallbackBase {
        /** The timers that were delivered in the bundle this callback completes. */
        private final Iterable<TimerInternals.TimerData> timers;

        private TimerCompletionCallback(Iterable<TimerInternals.TimerData> deliveredTimers) {
            super();
            this.timers = deliveredTimers;
        }

        @Override // org.apache.beam.runners.direct.ExecutorServiceParallelExecutor.CompletionCallbackBase
        public CommittedResult getCommittedResult(DirectRunner.CommittedBundle<?> committedBundle, TransformResult transformResult) {
            return evaluationContext.handleResult(
                    committedBundle,
                    this.timers,
                    transformResult);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/direct/ExecutorServiceParallelExecutor$VisibleExecutorUpdate.class */
    /**
     * An update visible to {@code awaitCompletion()}: carries either a throwable to rethrow or a
     * flag saying the pipeline has finished.
     */
    public static class VisibleExecutorUpdate {
        private final Optional<? extends Throwable> throwable;
        private final boolean done;

        private VisibleExecutorUpdate(boolean finished, @Nullable Throwable failure) {
            this.done = finished;
            this.throwable = Optional.fromNullable(failure);
        }

        /** Creates a not-done update that wraps {@code th}. */
        public static VisibleExecutorUpdate fromThrowable(Throwable th) {
            return new VisibleExecutorUpdate(false, th);
        }

        /** Creates a done update with no throwable. */
        public static VisibleExecutorUpdate finished() {
            return new VisibleExecutorUpdate(true, null);
        }

        /** Returns whether this update marks the pipeline as finished. */
        public boolean isDone() {
            return this.done;
        }
    }

    /**
     * Static factory; forwards all arguments unchanged to the private constructor.
     *
     * @return a new executor that has not been started
     */
    public static ExecutorServiceParallelExecutor create(ExecutorService executorService, Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> map, Set<PValue> set, TransformEvaluatorRegistry transformEvaluatorRegistry, Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>> map2, EvaluationContext evaluationContext) {
        ExecutorServiceParallelExecutor executor = new ExecutorServiceParallelExecutor(
                executorService,
                map,
                set,
                transformEvaluatorRegistry,
                map2,
                evaluationContext);
        return executor;
    }

    /**
     * Stores the collaborators and derives the parallel executor service from {@code executorService}.
     *
     * @param map consumers of each value (stored as {@code valueToConsumers})
     * @param set the keyed values (stored as {@code keyedPValues})
     * @param map2 enforcement factories by transform class (stored as {@code transformEnforcements})
     */
    private ExecutorServiceParallelExecutor(ExecutorService executorService, Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> map, Set<PValue> set, TransformEvaluatorRegistry transformEvaluatorRegistry, Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>> map2, EvaluationContext evaluationContext) {
        // Execution infrastructure.
        this.executorService = executorService;
        this.parallelExecutorService = TransformExecutorServices.parallel(executorService);

        // Pipeline structure.
        this.valueToConsumers = map;
        this.keyedPValues = set;

        // Evaluation collaborators.
        this.registry = transformEvaluatorRegistry;
        this.transformEnforcements = map2;
        this.evaluationContext = evaluationContext;
    }

    /**
     * Builds the loader for the per-(step, key) executor cache. Every loaded entry is a new serial
     * executor service on top of the shared {@code executorService}.
     */
    private CacheLoader<StepAndKey, TransformExecutorService> serialTransformExecutorServiceCacheLoader() {
        CacheLoader<StepAndKey, TransformExecutorService> loader = new CacheLoader<StepAndKey, TransformExecutorService>() { // from class: org.apache.beam.runners.direct.ExecutorServiceParallelExecutor.1
            @Override // org.apache.beam.runners.direct.repackaged.com.google.common.cache.CacheLoader
            public TransformExecutorService load(StepAndKey stepAndKey) throws Exception {
                // Read the field at load time: this loader is created from a field initializer,
                // which runs before the constructor body has assigned executorService.
                return TransformExecutorServices.serial(executorService);
            }
        };
        return loader;
    }

    /**
     * Records an immutable copy of the root transforms and submits the first monitor task.
     *
     * @param collection the root transforms of the pipeline
     */
    @Override // org.apache.beam.runners.direct.PipelineExecutor
    public void start(Collection<AppliedPTransform<?, ?, ?>> collection) {
        Collection<AppliedPTransform<?, ?, ?>> roots = ImmutableList.copyOf(collection);
        this.rootNodes = roots;
        // The roots must be in place before the monitor is submitted, since it reads them.
        MonitorRunnable monitor = new MonitorRunnable();
        this.executorService.submit(monitor);
    }

    /**
     * Schedules {@code appliedPTransform} to process {@code committedBundle}.
     *
     * @param committedBundle the input bundle, or {@code null} when there is no input bundle
     * @param completionCallback invoked by the executor when evaluation completes or fails
     */
    public void scheduleConsumption(AppliedPTransform<?, ?, ?> appliedPTransform, @Nullable DirectRunner.CommittedBundle<?> committedBundle, CompletionCallback completionCallback) {
        // Thin wrapper: the generic helper captures the bundle's wildcard element type.
        this.evaluateBundle(appliedPTransform, committedBundle, completionCallback);
    }

    /**
     * Creates a {@code TransformExecutor} for the transform and bundle and schedules it on the
     * appropriate executor service.
     *
     * <p>A non-null bundle whose PCollection is keyed goes to the cached executor service for its
     * (step, key) pair; every other bundle, including {@code null}, goes to the parallel executor
     * service.
     *
     * @param appliedPTransform the transform to evaluate
     * @param committedBundle the input bundle, or {@code null} when there is no input bundle
     * @param completionCallback invoked when evaluation completes or fails
     */
    private <T> void evaluateBundle(AppliedPTransform<?, ?, ?> appliedPTransform, @Nullable DirectRunner.CommittedBundle<T> committedBundle, CompletionCallback completionCallback) {
        final TransformExecutorService transformExecutorService;
        if (committedBundle != null && isKeyed(committedBundle.getPCollection())) {
            // The bundle is known to be non-null here, so its key can be read directly.
            StepAndKey stepAndKey = StepAndKey.of(appliedPTransform, committedBundle.getKey());
            transformExecutorService = this.executorServices.getUnchecked(stepAndKey);
        } else {
            transformExecutorService = this.parallelExecutorService;
        }

        // Transforms with no registered enforcements get an empty list rather than null.
        Collection<ModelEnforcementFactory> enforcements = MoreObjects.firstNonNull(
                this.transformEnforcements.get(appliedPTransform.getTransform().getClass()),
                Collections.<ModelEnforcementFactory>emptyList());

        transformExecutorService.schedule(TransformExecutor.create(this.registry, enforcements, this.evaluationContext, committedBundle, appliedPTransform, completionCallback, transformExecutorService));
    }

    /** Returns whether {@code pValue} is one of the values registered as keyed. */
    private boolean isKeyed(PValue pValue) {
        final boolean keyed = keyedPValues.contains(pValue);
        return keyed;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Schedules the update's bundle once for each of its consumers, using the default completion
     * callback. The update must carry a bundle.
     */
    public void scheduleConsumers(ExecutorUpdate executorUpdate) {
        final DirectRunner.CommittedBundle<?> bundle = executorUpdate.getBundle().get();
        for (AppliedPTransform<?, ?, ?> consumer : executorUpdate.getConsumers()) {
            scheduleConsumption(consumer, bundle, this.defaultCompletionCallback);
        }
    }

    /**
     * Blocks until the pipeline finishes or fails, polling the visible-update queue in two-second
     * intervals.
     *
     * <p>Returns when a "done" update arrives (shutting the executor down first) or when a poll
     * times out and the executor has already been shut down.
     *
     * @throws Throwable the throwable carried by a visible update, or an interruption while polling
     */
    @Override // org.apache.beam.runners.direct.PipelineExecutor
    public void awaitCompletion() throws Throwable {
        for (;;) {
            VisibleExecutorUpdate update = this.visibleUpdates.poll(2L, TimeUnit.SECONDS);
            if (update == null) {
                // Timed out with nothing to report: stop only if the executor is already shut down.
                if (this.executorService.isShutdown()) {
                    return;
                }
                continue;
            }
            if (update.throwable.isPresent()) {
                throw ((Throwable) update.throwable.get());
            }
            if (update.isDone()) {
                this.executorService.shutdown();
                return;
            }
        }
    }
}
