package org.apache.beam.sdk.io;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableList;
import org.apache.beam.sdk.repackaged.com.google.common.collect.Lists;
import org.apache.beam.sdk.repackaged.com.google.common.collect.Maps;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PDone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Experimental(Experimental.Kind.SOURCE_SINK)
/* loaded from: input_file:org/apache/beam/sdk/io/WriteFiles.class */
public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
    private static final Logger LOG = LoggerFactory.getLogger(WriteFiles.class);
    static final int UNKNOWN_SHARDNUM = -1;
    private FileBasedSink<T> sink;
    private FileBasedSink.WriteOperation<T> writeOperation;

    @Nullable
    private final PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards;

    @Nullable
    private final ValueProvider<Integer> numShardsProvider;
    private boolean windowedWrites;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/WriteFiles$ApplyShardingKey.class */
    public static class ApplyShardingKey<T> extends DoFn<T, KV<Integer, T>> {
        private final PCollectionView<Integer> numShardsView;
        private final ValueProvider<Integer> numShardsProvider;
        private int shardNumber = -1;

        ApplyShardingKey(PCollectionView<Integer> pCollectionView, ValueProvider<Integer> valueProvider) {
            this.numShardsView = pCollectionView;
            this.numShardsProvider = valueProvider;
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<T, KV<Integer, T>>.ProcessContext processContext) {
            int intValue;
            if (this.numShardsView != null) {
                intValue = ((Integer) processContext.sideInput(this.numShardsView)).intValue();
            } else {
                Preconditions.checkNotNull(this.numShardsProvider);
                intValue = this.numShardsProvider.get().intValue();
            }
            Preconditions.checkArgument(intValue > 0, "Must have a positive number of shards specified for non-runner-determined sharding. Got %s", intValue);
            if (this.shardNumber == -1) {
                this.shardNumber = ThreadLocalRandom.current().nextInt(intValue);
            } else {
                this.shardNumber = (this.shardNumber + 1) % intValue;
            }
            processContext.output(KV.of(Integer.valueOf(this.shardNumber), processContext.element()));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/WriteFiles$WriteShardedBundles.class */
    public class WriteShardedBundles extends DoFn<KV<Integer, Iterable<T>>, FileBasedSink.FileResult> {
        private WriteShardedBundles() {
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<KV<Integer, Iterable<T>>, FileBasedSink.FileResult>.ProcessContext processContext, BoundedWindow boundedWindow) throws Exception {
            WriteFiles.LOG.info("Opening writer for write operation {}", WriteFiles.this.writeOperation);
            FileBasedSink.Writer<T> createWriter = WriteFiles.this.writeOperation.createWriter();
            if (WriteFiles.this.windowedWrites) {
                createWriter.openWindowed(UUID.randomUUID().toString(), boundedWindow, processContext.pane(), processContext.element().getKey().intValue());
            } else {
                createWriter.openUnwindowed(UUID.randomUUID().toString(), -1);
            }
            WriteFiles.LOG.debug("Done opening writer");
            try {
                Iterator<T> it = processContext.element().getValue().iterator();
                while (it.hasNext()) {
                    WriteFiles.writeOrClose(createWriter, it.next());
                }
                processContext.output(createWriter.close());
            } catch (Exception e) {
                createWriter.cleanup();
                throw e;
            }
        }

        @Override // org.apache.beam.sdk.transforms.DoFn, org.apache.beam.sdk.transforms.display.HasDisplayData
        public void populateDisplayData(DisplayData.Builder builder) {
            builder.delegate(WriteFiles.this);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/WriteFiles$WriteUnwindowedBundles.class */
    public class WriteUnwindowedBundles extends DoFn<T, FileBasedSink.FileResult> {
        private FileBasedSink.Writer<T> writer;
        private BoundedWindow window;

        private WriteUnwindowedBundles() {
            this.writer = null;
            this.window = null;
        }

        @DoFn.StartBundle
        public void startBundle(DoFn<T, FileBasedSink.FileResult>.StartBundleContext startBundleContext) {
            this.writer = null;
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<T, FileBasedSink.FileResult>.ProcessContext processContext, BoundedWindow boundedWindow) throws Exception {
            if (this.writer == null) {
                WriteFiles.LOG.info("Opening writer for write operation {}", WriteFiles.this.writeOperation);
                this.writer = WriteFiles.this.writeOperation.createWriter();
                this.writer.openUnwindowed(UUID.randomUUID().toString(), -1);
                WriteFiles.LOG.debug("Done opening writer");
            }
            this.window = boundedWindow;
            WriteFiles.writeOrClose(this.writer, processContext.element());
        }

        @DoFn.FinishBundle
        public void finishBundle(DoFn<T, FileBasedSink.FileResult>.FinishBundleContext finishBundleContext) throws Exception {
            if (this.writer == null) {
                return;
            }
            finishBundleContext.output(this.writer.close(), this.window.maxTimestamp(), this.window);
        }

        @Override // org.apache.beam.sdk.transforms.DoFn, org.apache.beam.sdk.transforms.display.HasDisplayData
        public void populateDisplayData(DisplayData.Builder builder) {
            builder.delegate(WriteFiles.this);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/WriteFiles$WriteWindowedBundles.class */
    public class WriteWindowedBundles extends DoFn<T, FileBasedSink.FileResult> {
        private Map<KV<BoundedWindow, PaneInfo>, FileBasedSink.Writer<T>> windowedWriters;

        private WriteWindowedBundles() {
        }

        @DoFn.StartBundle
        public void startBundle(DoFn<T, FileBasedSink.FileResult>.StartBundleContext startBundleContext) {
            this.windowedWriters = Maps.newHashMap();
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<T, FileBasedSink.FileResult>.ProcessContext processContext, BoundedWindow boundedWindow) throws Exception {
            PaneInfo pane = processContext.pane();
            KV<BoundedWindow, PaneInfo> of = KV.of(boundedWindow, pane);
            FileBasedSink.Writer<T> writer = this.windowedWriters.get(of);
            if (writer == null) {
                String uuid = UUID.randomUUID().toString();
                WriteFiles.LOG.info("Opening writer {} for write operation {}, window {} pane {}", new Object[]{uuid, WriteFiles.this.writeOperation, boundedWindow, pane});
                writer = WriteFiles.this.writeOperation.createWriter();
                writer.openWindowed(uuid, boundedWindow, pane, -1);
                this.windowedWriters.put(of, writer);
                WriteFiles.LOG.debug("Done opening writer");
            }
            WriteFiles.writeOrClose(writer, processContext.element());
        }

        @DoFn.FinishBundle
        public void finishBundle(DoFn<T, FileBasedSink.FileResult>.FinishBundleContext finishBundleContext) throws Exception {
            for (Map.Entry<KV<BoundedWindow, PaneInfo>, FileBasedSink.Writer<T>> entry : this.windowedWriters.entrySet()) {
                FileBasedSink.FileResult close = entry.getValue().close();
                BoundedWindow key = entry.getKey().getKey();
                finishBundleContext.output(close, key.maxTimestamp(), key);
            }
        }

        @Override // org.apache.beam.sdk.transforms.DoFn, org.apache.beam.sdk.transforms.display.HasDisplayData
        public void populateDisplayData(DisplayData.Builder builder) {
            builder.delegate(WriteFiles.this);
        }
    }

    public static <T> WriteFiles<T> to(FileBasedSink<T> fileBasedSink) {
        Preconditions.checkNotNull(fileBasedSink, "sink");
        return new WriteFiles<>(fileBasedSink, null, null, false);
    }

    private WriteFiles(FileBasedSink<T> fileBasedSink, @Nullable PTransform<PCollection<T>, PCollectionView<Integer>> pTransform, @Nullable ValueProvider<Integer> valueProvider, boolean z) {
        this.sink = fileBasedSink;
        this.computeNumShards = pTransform;
        this.numShardsProvider = valueProvider;
        this.windowedWrites = z;
    }

    @Override // org.apache.beam.sdk.transforms.PTransform
    public PDone expand(PCollection<T> pCollection) {
        if (pCollection.isBounded() == PCollection.IsBounded.UNBOUNDED) {
            Preconditions.checkArgument(this.windowedWrites, "Must use windowed writes when applying %s to an unbounded PCollection", WriteFiles.class.getSimpleName());
            Preconditions.checkArgument((this.computeNumShards == null && this.numShardsProvider == null) ? false : true, "When applying %s to an unbounded PCollection, must specify number of output shards explicitly", WriteFiles.class.getSimpleName());
        }
        this.writeOperation = this.sink.createWriteOperation();
        this.writeOperation.setWindowedWrites(this.windowedWrites);
        return createWrite(pCollection);
    }

    @Override // org.apache.beam.sdk.transforms.PTransform
    public void validate(PipelineOptions pipelineOptions) {
        this.sink.validate(pipelineOptions);
    }

    @Override // org.apache.beam.sdk.transforms.PTransform, org.apache.beam.sdk.transforms.display.HasDisplayData
    public void populateDisplayData(DisplayData.Builder builder) {
        super.populateDisplayData(builder);
        builder.add(DisplayData.item("sink", this.sink.getClass()).withLabel("WriteFiles Sink")).include("sink", this.sink);
        if (getSharding() != null) {
            builder.include("sharding", getSharding());
        } else if (getNumShards() != null) {
            builder.add(DisplayData.item("numShards", getNumShards().isAccessible() ? getNumShards().get().toString() : getNumShards().toString()).withLabel("Fixed Number of Shards"));
        }
    }

    public FileBasedSink<T> getSink() {
        return this.sink;
    }

    @Nullable
    public PTransform<PCollection<T>, PCollectionView<Integer>> getSharding() {
        return this.computeNumShards;
    }

    public ValueProvider<Integer> getNumShards() {
        return this.numShardsProvider;
    }

    public WriteFiles<T> withNumShards(int i) {
        return i > 0 ? withNumShards(ValueProvider.StaticValueProvider.of(Integer.valueOf(i))) : withRunnerDeterminedSharding();
    }

    public WriteFiles<T> withNumShards(ValueProvider<Integer> valueProvider) {
        return new WriteFiles<>(this.sink, null, valueProvider, this.windowedWrites);
    }

    public WriteFiles<T> withSharding(PTransform<PCollection<T>, PCollectionView<Integer>> pTransform) {
        Preconditions.checkNotNull(pTransform, "Cannot provide null sharding. Use withRunnerDeterminedSharding() instead");
        return new WriteFiles<>(this.sink, pTransform, null, this.windowedWrites);
    }

    public WriteFiles<T> withRunnerDeterminedSharding() {
        return new WriteFiles<>(this.sink, null, null, this.windowedWrites);
    }

    public WriteFiles<T> withWindowedWrites() {
        return new WriteFiles<>(this.sink, this.computeNumShards, this.numShardsProvider, true);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static <T> void writeOrClose(FileBasedSink.Writer<T> writer, T t) throws Exception {
        try {
            writer.write(t);
        } catch (Exception e) {
            try {
                writer.close();
            } catch (Exception e2) {
                if (e2 instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                e.addSuppressed(e2);
            }
            throw e;
        }
    }

    private PDone createWrite(PCollection<T> pCollection) {
        PCollectionView pCollectionView;
        PCollection pCollection2;
        Pipeline pipeline = pCollection.getPipeline();
        if (!this.windowedWrites) {
            pCollection = (PCollection) pCollection.apply(Window.into(new GlobalWindows()).triggering(DefaultTrigger.of()).discardingFiredPanes());
        }
        Coder<?> windowCoder = pCollection.getWindowingStrategy().getWindowFn().windowCoder();
        if (this.computeNumShards == null && this.numShardsProvider == null) {
            pCollectionView = null;
            pCollection2 = (PCollection) pCollection.apply("WriteBundles", ParDo.of(this.windowedWrites ? new WriteWindowedBundles() : new WriteUnwindowedBundles()));
        } else {
            ArrayList newArrayList = Lists.newArrayList();
            if (this.computeNumShards != null) {
                pCollectionView = (PCollectionView) pCollection.apply(this.computeNumShards);
                newArrayList.add(pCollectionView);
            } else {
                pCollectionView = null;
            }
            PCollection pCollection3 = (PCollection) ((PCollection) pCollection.apply("ApplyShardLabel", ParDo.of(new ApplyShardingKey(pCollectionView, pCollectionView != null ? null : this.numShardsProvider)).withSideInputs(newArrayList))).apply("GroupIntoShards", GroupByKey.create());
            windowCoder = pCollection3.getWindowingStrategy().getWindowFn().windowCoder();
            pCollection2 = (PCollection) pCollection3.apply("WriteShardedBundles", ParDo.of(new WriteShardedBundles()));
        }
        pCollection2.setCoder(FileBasedSink.FileResultCoder.of(windowCoder));
        if (this.windowedWrites) {
            PCollection pCollection4 = (PCollection) pCollection2.apply("AttachSingletonKey", WithKeys.of((Void) null));
            pCollection4.setCoder(KvCoder.of(VoidCoder.of(), FileBasedSink.FileResultCoder.of(windowCoder)));
            ((PCollection) pCollection4.apply("FinalizeGroupByKey", GroupByKey.create())).apply("Finalize", ParDo.of(new DoFn<KV<Void, Iterable<FileBasedSink.FileResult>>, Integer>() { // from class: org.apache.beam.sdk.io.WriteFiles.1
                @DoFn.ProcessElement
                public void processElement(DoFn<KV<Void, Iterable<FileBasedSink.FileResult>>, Integer>.ProcessContext processContext) throws Exception {
                    WriteFiles.LOG.info("Finalizing write operation {}.", WriteFiles.this.writeOperation);
                    WriteFiles.this.writeOperation.finalize(Lists.newArrayList(processContext.element().getValue()));
                    WriteFiles.LOG.debug("Done finalizing write operation");
                }
            }));
        } else {
            final PCollectionView pCollectionView2 = (PCollectionView) pCollection2.apply(View.asIterable());
            ImmutableList.Builder add = ImmutableList.builder().add((ImmutableList.Builder) pCollectionView2);
            if (pCollectionView != null) {
                add.add((ImmutableList.Builder) pCollectionView);
            }
            final PCollectionView pCollectionView3 = pCollectionView;
            ((PCollection) pipeline.apply(Create.of((Void) null, new Void[0]))).apply("Finalize", ParDo.of(new DoFn<Void, Integer>() { // from class: org.apache.beam.sdk.io.WriteFiles.2
                @DoFn.ProcessElement
                public void processElement(DoFn<Void, Integer>.ProcessContext processContext) throws Exception {
                    WriteFiles.LOG.info("Finalizing write operation {}.", WriteFiles.this.writeOperation);
                    ArrayList newArrayList2 = Lists.newArrayList((Iterable) processContext.sideInput(pCollectionView2));
                    WriteFiles.LOG.debug("Side input initialized to finalize write operation {}.", WriteFiles.this.writeOperation);
                    int intValue = pCollectionView3 != null ? ((Integer) processContext.sideInput(pCollectionView3)).intValue() : WriteFiles.this.numShardsProvider != null ? ((Integer) WriteFiles.this.numShardsProvider.get()).intValue() : 1;
                    int size = intValue - newArrayList2.size();
                    if (size > 0) {
                        WriteFiles.LOG.info("Creating {} empty output shards in addition to {} written for a total of {}.", new Object[]{Integer.valueOf(size), Integer.valueOf(newArrayList2.size()), Integer.valueOf(intValue)});
                        for (int i = 0; i < size; i++) {
                            FileBasedSink.Writer<T> createWriter = WriteFiles.this.writeOperation.createWriter();
                            createWriter.openUnwindowed(UUID.randomUUID().toString(), -1);
                            newArrayList2.add(createWriter.close());
                        }
                        WriteFiles.LOG.debug("Done creating extra shards.");
                    }
                    WriteFiles.this.writeOperation.finalize(newArrayList2);
                    WriteFiles.LOG.debug("Done finalizing write operation {}", WriteFiles.this.writeOperation);
                }
            }).withSideInputs(add.build()));
        }
        return PDone.in(pCollection.getPipeline());
    }
}
