package org.apache.beam.sdk.testing;

import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.DurationCoder;
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.flink.api.python.shaded.org.joda.time.Duration;
import org.apache.flink.api.python.shaded.org.joda.time.Instant;
import org.apache.flink.api.python.shaded.org.joda.time.ReadableDuration;

/* loaded from: input_file:org/apache/beam/sdk/testing/TestStream.class */
public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> {
    private final List<Event<T>> events;
    private final Coder<T> coder;

    /* loaded from: input_file:org/apache/beam/sdk/testing/TestStream$Builder.class */
    public static class Builder<T> {
        private final Coder<T> coder;
        private final ImmutableList<Event<T>> events;
        private final Instant currentWatermark;

        private Builder(Coder<T> coder) {
            this(coder, ImmutableList.of(), BoundedWindow.TIMESTAMP_MIN_VALUE);
        }

        private Builder(Coder<T> coder, ImmutableList<Event<T>> immutableList, Instant instant) {
            this.coder = coder;
            this.events = immutableList;
            this.currentWatermark = instant;
        }

        @SafeVarargs
        public final Builder<T> addElements(T t, T... tArr) {
            TimestampedValue<T> of = TimestampedValue.of(t, this.currentWatermark);
            TimestampedValue<T>[] timestampedValueArr = new TimestampedValue[tArr.length];
            for (int i = 0; i < tArr.length; i++) {
                timestampedValueArr[i] = TimestampedValue.of(tArr[i], this.currentWatermark);
            }
            return addElements((TimestampedValue) of, (TimestampedValue[]) timestampedValueArr);
        }

        @SafeVarargs
        public final Builder<T> addElements(TimestampedValue<T> timestampedValue, TimestampedValue<T>... timestampedValueArr) {
            Preconditions.checkArgument(timestampedValue.getTimestamp().isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE), "Elements must have timestamps before %s. Got: %s", BoundedWindow.TIMESTAMP_MAX_VALUE, timestampedValue.getTimestamp());
            for (TimestampedValue<T> timestampedValue2 : timestampedValueArr) {
                Preconditions.checkArgument(timestampedValue2.getTimestamp().isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE), "Elements must have timestamps before %s. Got: %s", BoundedWindow.TIMESTAMP_MAX_VALUE, timestampedValue2.getTimestamp());
            }
            return new Builder<>(this.coder, ImmutableList.builder().addAll((Iterable) this.events).add((ImmutableList.Builder) ElementEvent.add(timestampedValue, timestampedValueArr)).build(), this.currentWatermark);
        }

        public Builder<T> advanceWatermarkTo(Instant instant) {
            Preconditions.checkArgument(!instant.isBefore(this.currentWatermark), "The watermark must monotonically advance");
            Preconditions.checkArgument(instant.isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE), "The Watermark cannot progress beyond the maximum. Got: %s. Maximum: %s", instant, BoundedWindow.TIMESTAMP_MAX_VALUE);
            return new Builder<>(this.coder, ImmutableList.builder().addAll((Iterable) this.events).add((ImmutableList.Builder) WatermarkEvent.advanceTo(instant)).build(), instant);
        }

        public Builder<T> advanceProcessingTime(Duration duration) {
            Preconditions.checkArgument(duration.getMillis() > 0, "Must advance the processing time by a positive amount. Got: ", duration);
            return new Builder<>(this.coder, ImmutableList.builder().addAll((Iterable) this.events).add((ImmutableList.Builder) ProcessingTimeEvent.advanceBy(duration)).build(), this.currentWatermark);
        }

        public TestStream<T> advanceWatermarkToInfinity() {
            return new TestStream<>(this.coder, ImmutableList.builder().addAll((Iterable) this.events).add((ImmutableList.Builder) WatermarkEvent.advanceTo(BoundedWindow.TIMESTAMP_MAX_VALUE)).build());
        }
    }

    @AutoValue
    /* loaded from: input_file:org/apache/beam/sdk/testing/TestStream$ElementEvent.class */
    public static abstract class ElementEvent<T> implements Event<T> {
        public abstract Iterable<TimestampedValue<T>> getElements();

        @SafeVarargs
        static <T> Event<T> add(TimestampedValue<T> timestampedValue, TimestampedValue<T>... timestampedValueArr) {
            return add(ImmutableList.builder().add((ImmutableList.Builder) timestampedValue).add((Object[]) timestampedValueArr).build());
        }

        @Internal
        public static <T> Event<T> add(Iterable<TimestampedValue<T>> iterable) {
            return new AutoValue_TestStream_ElementEvent(EventType.ELEMENT, iterable);
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/testing/TestStream$Event.class */
    public interface Event<T> {
        EventType getType();
    }

    /* loaded from: input_file:org/apache/beam/sdk/testing/TestStream$EventType.class */
    public enum EventType {
        ELEMENT,
        WATERMARK,
        PROCESSING_TIME
    }

    @AutoValue
    /* loaded from: input_file:org/apache/beam/sdk/testing/TestStream$ProcessingTimeEvent.class */
    public static abstract class ProcessingTimeEvent<T> implements Event<T> {
        public abstract Duration getProcessingTimeAdvance();

        @Internal
        public static <T> Event<T> advanceBy(Duration duration) {
            return new AutoValue_TestStream_ProcessingTimeEvent(EventType.PROCESSING_TIME, duration);
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/testing/TestStream$TestStreamCoder.class */
    public static class TestStreamCoder<T> extends StructuredCoder<TestStream<T>> {
        private final TimestampedValue.TimestampedValueCoder<T> elementCoder;

        public static <T> TestStreamCoder<T> of(Coder<T> coder) {
            return new TestStreamCoder<>(coder);
        }

        private TestStreamCoder(Coder<T> coder) {
            this.elementCoder = TimestampedValue.TimestampedValueCoder.of(coder);
        }

        @Override // org.apache.beam.sdk.coders.Coder
        public void encode(TestStream<T> testStream, OutputStream outputStream) throws IOException {
            List<Event<T>> events = testStream.getEvents();
            VarIntCoder.of().encode(Integer.valueOf(events.size()), outputStream);
            for (Event<T> event : events) {
                if (event instanceof ElementEvent) {
                    outputStream.write(event.getType().ordinal());
                    Iterable<TimestampedValue<T>> elements = ((ElementEvent) event).getElements();
                    VarIntCoder.of().encode(Integer.valueOf(Iterables.size(elements)), outputStream);
                    Iterator<TimestampedValue<T>> it = elements.iterator();
                    while (it.hasNext()) {
                        this.elementCoder.encode((TimestampedValue) it.next(), outputStream);
                    }
                } else if (event instanceof WatermarkEvent) {
                    outputStream.write(event.getType().ordinal());
                    InstantCoder.of().encode(((WatermarkEvent) event).getWatermark(), outputStream);
                } else if (event instanceof ProcessingTimeEvent) {
                    outputStream.write(event.getType().ordinal());
                    DurationCoder.of().encode((ReadableDuration) ((ProcessingTimeEvent) event).getProcessingTimeAdvance(), outputStream);
                }
            }
        }

        @Override // org.apache.beam.sdk.coders.Coder
        public TestStream<T> decode(InputStream inputStream) throws IOException {
            Integer decode = VarIntCoder.of().decode(inputStream);
            ArrayList arrayList = new ArrayList(decode.intValue());
            for (int i = 0; i < decode.intValue(); i++) {
                EventType eventType = EventType.values()[inputStream.read()];
                switch (eventType) {
                    case ELEMENT:
                        int intValue = VarIntCoder.of().decode(inputStream).intValue();
                        ArrayList arrayList2 = new ArrayList(intValue);
                        for (int i2 = 0; i2 < intValue; i2++) {
                            arrayList2.add(this.elementCoder.decode(inputStream));
                        }
                        arrayList.add(ElementEvent.add(arrayList2));
                        break;
                    case WATERMARK:
                        arrayList.add(WatermarkEvent.advanceTo(InstantCoder.of().decode(inputStream)));
                        break;
                    case PROCESSING_TIME:
                        arrayList.add(ProcessingTimeEvent.advanceBy(DurationCoder.of().decode(inputStream).toDuration()));
                        break;
                    default:
                        throw new IllegalStateException("Unknown event type + " + eventType);
                }
            }
            return TestStream.fromRawEvents(this.elementCoder.getValueCoder(), arrayList);
        }

        @Override // org.apache.beam.sdk.coders.Coder
        public List<? extends Coder<?>> getCoderArguments() {
            return Collections.singletonList(this.elementCoder);
        }

        @Override // org.apache.beam.sdk.coders.Coder
        public void verifyDeterministic() throws Coder.NonDeterministicException {
        }
    }

    @AutoValue
    /* loaded from: input_file:org/apache/beam/sdk/testing/TestStream$WatermarkEvent.class */
    public static abstract class WatermarkEvent<T> implements Event<T> {
        public abstract Instant getWatermark();

        @Internal
        public static <T> Event<T> advanceTo(Instant instant) {
            return new AutoValue_TestStream_WatermarkEvent(EventType.WATERMARK, instant);
        }
    }

    public static <T> Builder<T> create(Coder<T> coder) {
        return new Builder<>(coder);
    }

    public static <T> Builder<T> create(Schema schema, SerializableFunction<T, Row> serializableFunction, SerializableFunction<Row, T> serializableFunction2) {
        return create(SchemaCoder.of(schema, serializableFunction, serializableFunction2));
    }

    private TestStream(Coder<T> coder, List<Event<T>> list) {
        this.coder = coder;
        this.events = (List) Preconditions.checkNotNull(list);
    }

    @Override // org.apache.beam.sdk.transforms.PTransform
    public PCollection<T> expand(PBegin pBegin) {
        return PCollection.createPrimitiveOutputInternal(pBegin.getPipeline(), WindowingStrategy.globalDefault(), PCollection.IsBounded.UNBOUNDED, this.coder);
    }

    public Coder<T> getValueCoder() {
        return this.coder;
    }

    public List<Event<T>> getEvents() {
        return this.events;
    }

    @Internal
    public static <T> TestStream<T> fromRawEvents(Coder<T> coder, List<Event<T>> list) {
        return new TestStream<>(coder, list);
    }

    public boolean equals(Object obj) {
        if (!(obj instanceof TestStream)) {
            return false;
        }
        TestStream testStream = (TestStream) obj;
        return getValueCoder().equals(testStream.getValueCoder()) && getEvents().equals(testStream.getEvents());
    }

    public int hashCode() {
        return Objects.hash(TestStream.class, getValueCoder(), getEvents());
    }
}
