package org.apache.beam.sdk.testing;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.options.ApplicationNameOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.testing.Annotations;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Optional;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Predicate;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Predicates;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.FluentIterable;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.apache.flink.api.python.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;

/* loaded from: input_file:org/apache/beam/sdk/testing/TestPipeline.class */
public class TestPipeline extends Pipeline implements TestRule {
    private final PipelineOptions options;
    public static final String PROPERTY_BEAM_TEST_PIPELINE_OPTIONS = "beamTestPipelineOptions";
    static final String PROPERTY_USE_DEFAULT_DUMMY_RUNNER = "beamUseDummyRunner";
    private static final ObjectMapper MAPPER = new ObjectMapper().registerModules(ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
    private Optional<? extends PipelineRunEnforcement> enforcement;
    private final Map<String, Object> providerRuntimeValues;

    /* loaded from: input_file:org/apache/beam/sdk/testing/TestPipeline$AbandonedNodeException.class */
    public static class AbandonedNodeException extends RuntimeException {
        AbandonedNodeException(String str) {
            super(str);
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/testing/TestPipeline$GetFromRuntimeValues.class */
    private static class GetFromRuntimeValues<T> implements SerializableFunction<Map<String, Object>, T> {
        private final String key;

        private GetFromRuntimeValues(String str) {
            this.key = str;
        }

        @Override // org.apache.beam.sdk.transforms.SerializableFunction, org.apache.beam.sdk.transforms.ProcessFunction
        public T apply(Map<String, Object> map) {
            return (T) map.get(this.key);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/testing/TestPipeline$IsEmptyVisitor.class */
    public static class IsEmptyVisitor extends Pipeline.PipelineVisitor.Defaults {
        private boolean empty;

        private IsEmptyVisitor() {
            this.empty = true;
        }

        public boolean isEmpty() {
            return this.empty;
        }

        @Override // org.apache.beam.sdk.Pipeline.PipelineVisitor.Defaults, org.apache.beam.sdk.Pipeline.PipelineVisitor
        public void visitPrimitiveTransform(TransformHierarchy.Node node) {
            this.empty = false;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/testing/TestPipeline$PipelineAbandonedNodeEnforcement.class */
    public static class PipelineAbandonedNodeEnforcement extends PipelineRunEnforcement {

        @Nullable
        private List<TransformHierarchy.Node> runVisitedNodes;
        private final Predicate<TransformHierarchy.Node> isPAssertNode;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/beam/sdk/testing/TestPipeline$PipelineAbandonedNodeEnforcement$NodeRecorder.class */
        public static class NodeRecorder extends Pipeline.PipelineVisitor.Defaults {
            private final List<TransformHierarchy.Node> visited;

            private NodeRecorder() {
                this.visited = new ArrayList();
            }

            @Override // org.apache.beam.sdk.Pipeline.PipelineVisitor.Defaults, org.apache.beam.sdk.Pipeline.PipelineVisitor
            public void leaveCompositeTransform(TransformHierarchy.Node node) {
                this.visited.add(node);
            }

            @Override // org.apache.beam.sdk.Pipeline.PipelineVisitor.Defaults, org.apache.beam.sdk.Pipeline.PipelineVisitor
            public void visitPrimitiveTransform(TransformHierarchy.Node node) {
                this.visited.add(node);
            }
        }

        private PipelineAbandonedNodeEnforcement(TestPipeline testPipeline) {
            super(testPipeline);
            this.isPAssertNode = node -> {
                return (node.getTransform() instanceof PAssert.GroupThenAssert) || (node.getTransform() instanceof PAssert.GroupThenAssertForSingleton) || (node.getTransform() instanceof PAssert.OneSideInputAssert);
            };
            this.runVisitedNodes = null;
        }

        private List<TransformHierarchy.Node> recordPipelineNodes(Pipeline pipeline) {
            NodeRecorder nodeRecorder = new NodeRecorder();
            pipeline.traverseTopologically(nodeRecorder);
            return nodeRecorder.visited;
        }

        private boolean isEmptyPipeline(Pipeline pipeline) {
            IsEmptyVisitor isEmptyVisitor = new IsEmptyVisitor();
            pipeline.traverseTopologically(isEmptyVisitor);
            return isEmptyVisitor.isEmpty();
        }

        private void verifyPipelineExecution() {
            if (isEmptyPipeline(this.pipeline)) {
                return;
            }
            if (!this.runAttempted && !this.enableAutoRunIfMissing) {
                throw new PipelineRunMissingException("The pipeline has not been run.");
            }
            List<TransformHierarchy.Node> recordPipelineNodes = recordPipelineNodes(this.pipeline);
            if (!pipelineRunSucceeded() || visitedAll(recordPipelineNodes)) {
                return;
            }
            if (!FluentIterable.from(recordPipelineNodes).filter(Predicates.not(Predicates.in(this.runVisitedNodes))).anyMatch(this.isPAssertNode)) {
                throw new AbandonedNodeException("The pipeline contains abandoned PTransform(s).");
            }
            throw new AbandonedNodeException("The pipeline contains abandoned PAssert(s).");
        }

        private boolean visitedAll(List<TransformHierarchy.Node> list) {
            return this.runVisitedNodes.equals(list);
        }

        private boolean pipelineRunSucceeded() {
            return this.runVisitedNodes != null;
        }

        @Override // org.apache.beam.sdk.testing.TestPipeline.PipelineRunEnforcement
        protected void afterPipelineExecution() {
            this.runVisitedNodes = recordPipelineNodes(this.pipeline);
            super.afterPipelineExecution();
        }

        @Override // org.apache.beam.sdk.testing.TestPipeline.PipelineRunEnforcement
        protected void afterUserCodeFinished() {
            super.afterUserCodeFinished();
            verifyPipelineExecution();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/testing/TestPipeline$PipelineRunEnforcement.class */
    public static class PipelineRunEnforcement {
        protected boolean enableAutoRunIfMissing;
        protected final Pipeline pipeline;
        protected boolean runAttempted;

        private PipelineRunEnforcement(Pipeline pipeline) {
            this.pipeline = pipeline;
        }

        protected void enableAutoRunIfMissing(boolean z) {
            this.enableAutoRunIfMissing = z;
        }

        protected void beforePipelineExecution() {
            this.runAttempted = true;
        }

        protected void afterPipelineExecution() {
        }

        protected void afterUserCodeFinished() {
            if (this.runAttempted || !this.enableAutoRunIfMissing) {
                return;
            }
            this.pipeline.run().waitUntilFinish();
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/testing/TestPipeline$PipelineRunMissingException.class */
    public static class PipelineRunMissingException extends RuntimeException {
        PipelineRunMissingException(String str) {
            super(str);
        }
    }

    @Internal
    /* loaded from: input_file:org/apache/beam/sdk/testing/TestPipeline$TestValueProviderOptions.class */
    public interface TestValueProviderOptions extends PipelineOptions {
        ValueProvider<Map<String, Object>> getProviderRuntimeValues();

        void setProviderRuntimeValues(ValueProvider<Map<String, Object>> valueProvider);
    }

    public static TestPipeline create() {
        return fromOptions(testingPipelineOptions());
    }

    public static TestPipeline fromOptions(PipelineOptions pipelineOptions) {
        return new TestPipeline(pipelineOptions);
    }

    private TestPipeline(PipelineOptions pipelineOptions) {
        super(pipelineOptions);
        this.enforcement = Optional.absent();
        this.providerRuntimeValues = Maps.newHashMap();
        this.options = pipelineOptions;
    }

    @Override // org.apache.beam.sdk.Pipeline
    public PipelineOptions getOptions() {
        return this.options;
    }

    public Statement apply(final Statement statement, final Description description) {
        return new Statement() { // from class: org.apache.beam.sdk.testing.TestPipeline.1
            private void setDeducedEnforcementLevel() {
                if (TestPipeline.this.enforcement.isPresent()) {
                    return;
                }
                boolean anyMatch = FluentIterable.from(description.getAnnotations()).filter(Annotations.Predicates.isAnnotationOfType(Category.class)).anyMatch(Annotations.Predicates.isCategoryOf(NeedsRunner.class, true));
                boolean isAssignableFrom = CrashingRunner.class.isAssignableFrom(TestPipeline.this.options.getRunner());
                Preconditions.checkState((anyMatch && isAssignableFrom) ? false : true, "The test was annotated with a [@%s] / [@%s] while the runner was set to [%s]. Please re-check your configuration.", NeedsRunner.class.getSimpleName(), ValidatesRunner.class.getSimpleName(), CrashingRunner.class.getSimpleName());
                TestPipeline.this.enableAbandonedNodeEnforcement(anyMatch || !isAssignableFrom);
            }

            public void evaluate() throws Throwable {
                ((ApplicationNameOptions) TestPipeline.this.options.as(ApplicationNameOptions.class)).setAppName(TestPipeline.this.getAppName(description));
                setDeducedEnforcementLevel();
                statement.evaluate();
                ((PipelineRunEnforcement) TestPipeline.this.enforcement.get()).afterUserCodeFinished();
            }
        };
    }

    @Override // org.apache.beam.sdk.Pipeline
    public PipelineResult run() {
        return run(getOptions());
    }

    @Override // org.apache.beam.sdk.Pipeline
    public PipelineResult run(PipelineOptions pipelineOptions) {
        Preconditions.checkState(this.enforcement.isPresent(), "Is your TestPipeline declaration missing a @Rule annotation? Usage: @Rule public final transient TestPipeline pipeline = TestPipeline.create();");
        try {
            this.enforcement.get().beforePipelineExecution();
            PipelineOptions pipelineOptions2 = (PipelineOptions) MAPPER.convertValue(MAPPER.valueToTree(pipelineOptions), PipelineOptions.class);
            ((TestValueProviderOptions) pipelineOptions2.as(TestValueProviderOptions.class)).setProviderRuntimeValues(ValueProvider.StaticValueProvider.of(this.providerRuntimeValues));
            PipelineResult run = super.run(pipelineOptions2);
            verifyPAssertsSucceeded(this, run);
            this.enforcement.get().afterPipelineExecution();
            return run;
        } catch (RuntimeException e) {
            Throwable cause = e.getCause();
            if (cause instanceof AssertionError) {
                throw ((AssertionError) cause);
            }
            throw e;
        }
    }

    public <T> ValueProvider<T> newProvider(T t) {
        String uuid = UUID.randomUUID().toString();
        this.providerRuntimeValues.put(uuid, t);
        return ValueProvider.NestedValueProvider.of(((TestValueProviderOptions) this.options.as(TestValueProviderOptions.class)).getProviderRuntimeValues(), new GetFromRuntimeValues(uuid));
    }

    public TestPipeline enableAbandonedNodeEnforcement(boolean z) {
        this.enforcement = z ? Optional.of(new PipelineAbandonedNodeEnforcement()) : Optional.of(new PipelineRunEnforcement(this));
        return this;
    }

    public TestPipeline enableAutoRunIfMissing(boolean z) {
        this.enforcement.get().enableAutoRunIfMissing(z);
        return this;
    }

    @Override // org.apache.beam.sdk.Pipeline
    public String toString() {
        return "TestPipeline#" + ((ApplicationNameOptions) this.options.as(ApplicationNameOptions.class)).getAppName();
    }

    public static PipelineOptions testingPipelineOptions() {
        try {
            String property = System.getProperty(PROPERTY_BEAM_TEST_PIPELINE_OPTIONS);
            PipelineOptions create = Strings.isNullOrEmpty(property) ? PipelineOptionsFactory.create() : PipelineOptionsFactory.fromArgs((String[]) MAPPER.readValue(property, String[].class)).as(TestPipelineOptions.class);
            if (Strings.isNullOrEmpty(property)) {
                String property2 = System.getProperty(PROPERTY_USE_DEFAULT_DUMMY_RUNNER);
                if (!Strings.isNullOrEmpty(property2) && Boolean.valueOf(property2).booleanValue()) {
                    create.setRunner(CrashingRunner.class);
                }
            }
            create.setStableUniqueNames(PipelineOptions.CheckEnabled.ERROR);
            FileSystems.setDefaultPipelineOptions(create);
            return create;
        } catch (IOException e) {
            throw new RuntimeException("Unable to instantiate test options from system property beamTestPipelineOptions:" + System.getProperty(PROPERTY_BEAM_TEST_PIPELINE_OPTIONS), e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public String getAppName(Description description) {
        String methodName = description.getMethodName();
        Class testClass = description.getTestClass();
        return testClass.isMemberClass() ? String.format("%s$%s-%s", testClass.getEnclosingClass().getSimpleName(), testClass.getSimpleName(), methodName) : String.format("%s-%s", testClass.getSimpleName(), methodName);
    }

    public static void verifyPAssertsSucceeded(Pipeline pipeline, PipelineResult pipelineResult) {
        if (MetricsEnvironment.isMetricsSupported()) {
            long countAsserts = PAssert.countAsserts(pipeline);
            long j = 0;
            Iterator<MetricResult<Long>> it = pipelineResult.metrics().queryMetrics(MetricsFilter.builder().addNameFilter(MetricNameFilter.named((Class<?>) PAssert.class, PAssert.SUCCESS_COUNTER)).build()).getCounters().iterator();
            while (it.hasNext()) {
                if (it.next().getAttempted().longValue() > 0) {
                    j++;
                }
            }
            MatcherAssert.assertThat(String.format("Expected %d successful assertions, but found %d.", Long.valueOf(countAsserts), Long.valueOf(j)), Long.valueOf(j), Matchers.is(Long.valueOf(countAsserts)));
        }
    }
}
