package org.apache.beam.sdk.io;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Path;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.zip.GZIPOutputStream;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.IOChannelFactory;
import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.beam.sdks.java.core.repackaged.com.google.common.base.MoreObjects;
import org.apache.beam.sdks.java.core.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.sdks.java.core.repackaged.com.google.common.base.Strings;
import org.apache.beam.sdks.java.core.repackaged.com.google.common.collect.ImmutableList;
import org.apache.beam.sdks.java.core.repackaged.com.google.common.collect.Lists;
import org.apache.beam.sdks.java.core.repackaged.com.google.common.collect.Ordering;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
import org.apache.commons.compress.compressors.deflate.DeflateCompressorOutputStream;
import org.joda.time.Instant;
import org.joda.time.format.DateTimeFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/sdk/io/FileBasedSink.class */
public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
    private static final Logger LOG = LoggerFactory.getLogger(FileBasedSink.class);
    private static final Pattern SHARD_FORMAT_RE = Pattern.compile("(S+|N+)");
    protected final WritableByteChannelFactory writableByteChannelFactory;
    protected FilenamePolicy fileNamePolicy;

    /* loaded from: input_file:org/apache/beam/sdk/io/FileBasedSink$CompressionType.class */
    public enum CompressionType implements WritableByteChannelFactory {
        UNCOMPRESSED("", null) { // from class: org.apache.beam.sdk.io.FileBasedSink.CompressionType.1
            @Override // org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory
            public WritableByteChannel create(WritableByteChannel writableByteChannel) throws IOException {
                return writableByteChannel;
            }
        },
        GZIP(".gz", MimeTypes.BINARY) { // from class: org.apache.beam.sdk.io.FileBasedSink.CompressionType.2
            @Override // org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory
            public WritableByteChannel create(WritableByteChannel writableByteChannel) throws IOException {
                return Channels.newChannel(new GZIPOutputStream(Channels.newOutputStream(writableByteChannel), true));
            }
        },
        BZIP2(".bz2", MimeTypes.BINARY) { // from class: org.apache.beam.sdk.io.FileBasedSink.CompressionType.3
            @Override // org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory
            public WritableByteChannel create(WritableByteChannel writableByteChannel) throws IOException {
                return Channels.newChannel((OutputStream) new BZip2CompressorOutputStream(Channels.newOutputStream(writableByteChannel)));
            }
        },
        DEFLATE(".deflate", MimeTypes.BINARY) { // from class: org.apache.beam.sdk.io.FileBasedSink.CompressionType.4
            @Override // org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory
            public WritableByteChannel create(WritableByteChannel writableByteChannel) throws IOException {
                return Channels.newChannel((OutputStream) new DeflateCompressorOutputStream(Channels.newOutputStream(writableByteChannel)));
            }
        };

        private String filenameSuffix;

        @Nullable
        private String mimeType;

        CompressionType(String str, @Nullable String str2) {
            this.filenameSuffix = str;
            this.mimeType = str2;
        }

        @Override // org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory
        public String getFilenameSuffix() {
            return this.filenameSuffix;
        }

        @Override // org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory
        @Nullable
        public String getMimeType() {
            return this.mimeType;
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/FileBasedSink$DefaultFilenamePolicy.class */
    protected class DefaultFilenamePolicy extends FilenamePolicy {
        ValueProvider<String> baseOutputFilename;
        String extension;
        String fileNamingTemplate;

        public DefaultFilenamePolicy(ValueProvider<String> valueProvider, String str, String str2) {
            this.baseOutputFilename = valueProvider;
            if (Strings.isNullOrEmpty(FileBasedSink.this.writableByteChannelFactory.getFilenameSuffix())) {
                this.extension = str;
            } else {
                this.extension = str + FileBasedSink.getFileExtension(FileBasedSink.this.writableByteChannelFactory.getFilenameSuffix());
            }
            this.fileNamingTemplate = str2;
        }

        @Override // org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy
        public String unwindowedFilename(FilenamePolicy.Context context) {
            if (context.numShards <= 0) {
                return null;
            }
            return FileBasedSink.constructName(this.baseOutputFilename.get(), this.fileNamingTemplate, FileBasedSink.getFileExtension(this.extension), context.getShardNumber(), context.getNumShards());
        }

        @Override // org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy
        public String windowedFilename(FilenamePolicy.WindowedContext windowedContext) {
            throw new UnsupportedOperationException("There is no default policy for windowed file output. Please provide an explicit FilenamePolicy to generate filenames.");
        }

        @Override // org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy
        public ValueProvider<String> getBaseOutputFilenameProvider() {
            return this.baseOutputFilename;
        }

        @Override // org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy
        public void populateDisplayData(DisplayData.Builder builder) {
            Object[] objArr = new Object[3];
            objArr[0] = this.baseOutputFilename.isAccessible() ? this.baseOutputFilename.get() : this.baseOutputFilename.toString();
            objArr[1] = this.fileNamingTemplate;
            objArr[2] = FileBasedSink.getFileExtension(this.extension);
            builder.add(DisplayData.item("fileNamePattern", String.format("%s%s%s", objArr)).withLabel("File Name Pattern"));
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/FileBasedSink$FileBasedWriteOperation.class */
    public static abstract class FileBasedWriteOperation<T> implements Serializable {
        protected final FileBasedSink<T> sink;
        protected final ValueProvider<String> tempDirectory;
        protected boolean windowedWrites;

        /* loaded from: input_file:org/apache/beam/sdk/io/FileBasedSink$FileBasedWriteOperation$TemporaryDirectoryBuilder.class */
        private static class TemporaryDirectoryBuilder implements SerializableFunction<String, String> {
            Instant now;

            private TemporaryDirectoryBuilder() {
                this.now = Instant.now();
            }

            @Override // org.apache.beam.sdk.transforms.SerializableFunction
            public String apply(String str) {
                try {
                    Path path = IOChannelUtils.getFactory(str).toPath(str);
                    return path.resolveSibling("temp-beam-" + path.getFileName() + "-" + this.now.toString(DateTimeFormat.forPattern("yyyy-MM-DD_HH-mm-ss"))).toString();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }

        protected static String buildTemporaryFilename(String str, String str2) throws IOException {
            return IOChannelUtils.getFactory(str).resolve(str, str2);
        }

        public FileBasedWriteOperation(FileBasedSink<T> fileBasedSink) {
            this(fileBasedSink, ValueProvider.NestedValueProvider.of(fileBasedSink.getBaseOutputFilenameProvider(), new TemporaryDirectoryBuilder()));
        }

        public FileBasedWriteOperation(FileBasedSink<T> fileBasedSink, String str) {
            this(fileBasedSink, ValueProvider.StaticValueProvider.of(str));
        }

        private FileBasedWriteOperation(FileBasedSink<T> fileBasedSink, ValueProvider<String> valueProvider) {
            this.sink = fileBasedSink;
            this.tempDirectory = valueProvider;
            this.windowedWrites = false;
        }

        public abstract FileBasedWriter<T> createWriter(PipelineOptions pipelineOptions) throws Exception;

        public void setWindowedWrites(boolean z) {
            this.windowedWrites = z;
        }

        public void finalize(Iterable<FileResult> iterable, PipelineOptions pipelineOptions) throws Exception {
            Map<String, String> buildOutputFilenames = buildOutputFilenames(iterable);
            copyToOutputFiles(buildOutputFilenames, pipelineOptions);
            removeTemporaryFiles(buildOutputFilenames.keySet(), !this.windowedWrites, pipelineOptions);
        }

        protected final Map<String, String> buildOutputFilenames(Iterable<FileResult> iterable) {
            HashMap hashMap = new HashMap();
            ArrayList arrayList = new ArrayList();
            for (FileResult fileResult : iterable) {
                if (fileResult.getDestinationFilename() != null) {
                    hashMap.put(fileResult.getFilename(), fileResult.getDestinationFilename());
                } else {
                    arrayList.add(fileResult.getFilename());
                }
            }
            if (arrayList.size() > 0) {
                Preconditions.checkArgument(hashMap.isEmpty());
                List sortedCopy = Ordering.natural().sortedCopy(arrayList);
                FilenamePolicy filenamePolicy = getSink().fileNamePolicy;
                for (int i = 0; i < sortedCopy.size(); i++) {
                    hashMap.put(sortedCopy.get(i), filenamePolicy.unwindowedFilename(new FilenamePolicy.Context(i, sortedCopy.size())));
                }
            }
            int size = new HashSet(hashMap.values()).size();
            Preconditions.checkState(size == hashMap.size(), "Only generated %s distinct file names for %s files.", size, hashMap.size());
            return hashMap;
        }

        protected final void copyToOutputFiles(Map<String, String> map, PipelineOptions pipelineOptions) throws IOException {
            int size = map.size();
            if (size <= 0) {
                FileBasedSink.LOG.info("No output files to write.");
            } else {
                FileBasedSink.LOG.debug("Copying {} files.", Integer.valueOf(size));
                IOChannelUtils.getFactory(map.values().iterator().next()).copy(map.keySet(), map.values());
            }
        }

        protected final void removeTemporaryFiles(Set<String> set, boolean z, PipelineOptions pipelineOptions) throws IOException {
            String str = this.tempDirectory.get();
            FileBasedSink.LOG.debug("Removing temporary bundle output files in {}.", str);
            IOChannelFactory factory = IOChannelUtils.getFactory(str);
            HashSet hashSet = new HashSet();
            if (z) {
                try {
                    hashSet.addAll(factory.match(factory.resolve(str, "*")));
                } catch (Exception e) {
                    FileBasedSink.LOG.warn("Failed to match temporary files under: [{}].", str);
                }
            }
            HashSet hashSet2 = new HashSet(hashSet);
            hashSet2.addAll(set);
            FileBasedSink.LOG.debug("Removing {} temporary files found under {} ({} matched glob, {} known files)", new Object[]{Integer.valueOf(hashSet2.size()), str, Integer.valueOf(hashSet.size()), Integer.valueOf(hashSet2.size() - hashSet.size())});
            try {
                factory.remove(hashSet2);
                factory.remove(ImmutableList.of(str));
            } catch (Exception e2) {
                FileBasedSink.LOG.warn("Failed to remove temporary directory: [{}].", str);
            }
        }

        public final Coder<FileResult> getFileResultCoder() {
            return FileResultCoder.of();
        }

        public FileBasedSink<T> getSink() {
            return this.sink;
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/FileBasedSink$FileBasedWriter.class */
    public static abstract class FileBasedWriter<T> {
        private static final Logger LOG = LoggerFactory.getLogger(FileBasedWriter.class);
        final FileBasedWriteOperation<T> writeOperation;
        private String id;
        private BoundedWindow window;
        private PaneInfo paneInfo;
        private int shard = -1;
        private int numShards = -1;
        private String filename;
        private WritableByteChannel channel;
        private final String mimeType;

        public FileBasedWriter(FileBasedWriteOperation<T> fileBasedWriteOperation, String str) {
            Preconditions.checkNotNull(fileBasedWriteOperation);
            this.writeOperation = fileBasedWriteOperation;
            this.mimeType = str;
        }

        protected abstract void prepareWrite(WritableByteChannel writableByteChannel) throws Exception;

        protected void writeHeader() throws Exception {
        }

        protected void writeFooter() throws Exception {
        }

        protected void finishWrite() throws Exception {
        }

        public final void openWindowed(String str, BoundedWindow boundedWindow, PaneInfo paneInfo, int i, int i2) throws Exception {
            if (!getWriteOperation().windowedWrites) {
                throw new IllegalStateException("openWindowed called a non-windowed sink.");
            }
            open(str, boundedWindow, paneInfo, i, i2);
        }

        public abstract void write(T t) throws Exception;

        public final void openUnwindowed(String str, int i, int i2) throws Exception {
            if (getWriteOperation().windowedWrites) {
                throw new IllegalStateException("openUnwindowed called a windowed sink.");
            }
            open(str, null, null, i, i2);
        }

        private void open(String str, @Nullable BoundedWindow boundedWindow, @Nullable PaneInfo paneInfo, int i, int i2) throws Exception {
            this.id = str;
            this.window = boundedWindow;
            this.paneInfo = paneInfo;
            this.shard = i;
            this.numShards = i2;
            this.filename = FileBasedWriteOperation.buildTemporaryFilename(getWriteOperation().tempDirectory.get(), str);
            LOG.debug("Opening {}.", this.filename);
            WritableByteChannelFactory writableByteChannelFactory = getWriteOperation().getSink().writableByteChannelFactory;
            this.channel = writableByteChannelFactory.create(IOChannelUtils.create(this.filename, (String) MoreObjects.firstNonNull(writableByteChannelFactory.getMimeType(), this.mimeType)));
            try {
                prepareWrite(this.channel);
                LOG.debug("Writing header to {}.", this.filename);
                writeHeader();
                LOG.debug("Starting write of bundle {} to {}.", this.id, this.filename);
            } catch (Exception e) {
                try {
                    LOG.error("Writing header to {} failed, closing channel.", this.filename);
                    this.channel.close();
                } catch (IOException e2) {
                    LOG.error("Closing channel for {} failed: {}", this.filename, e2.getMessage());
                }
                throw e;
            }
        }

        public final void cleanup() throws Exception {
            if (this.filename != null) {
                IOChannelUtils.getFactory(this.filename).remove(Lists.newArrayList(this.filename));
            }
        }

        public final FileResult close() throws Exception {
            WritableByteChannel writableByteChannel = this.channel;
            Throwable th = null;
            try {
                LOG.debug("Writing footer to {}.", this.filename);
                writeFooter();
                LOG.debug("Finishing write to {}.", this.filename);
                finishWrite();
                if (!this.channel.isOpen()) {
                    throw new IllegalStateException("Channel should only be closed by its owner: " + this.channel);
                }
                FilenamePolicy filenamePolicy = getWriteOperation().getSink().fileNamePolicy;
                String windowedFilename = this.window != null ? filenamePolicy.windowedFilename(new FilenamePolicy.WindowedContext(this.window, this.paneInfo, this.shard, this.numShards)) : filenamePolicy.unwindowedFilename(new FilenamePolicy.Context(this.shard, this.numShards));
                FileResult fileResult = new FileResult(this.filename, windowedFilename);
                LOG.debug("Result for bundle {}: {} {}", new Object[]{this.id, this.filename, windowedFilename});
                return fileResult;
            } finally {
                if (writableByteChannel != null) {
                    if (0 != 0) {
                        try {
                            writableByteChannel.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        writableByteChannel.close();
                    }
                }
            }
        }

        public FileBasedWriteOperation<T> getWriteOperation() {
            return this.writeOperation;
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/FileBasedSink$FileResult.class */
    public static final class FileResult {
        private final String filename;
        private final String destinationFilename;

        public FileResult(String str, String str2) {
            this.filename = str;
            this.destinationFilename = str2;
        }

        public String getFilename() {
            return this.filename;
        }

        public String getDestinationFilename() {
            return this.destinationFilename;
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/FileBasedSink$FileResultCoder.class */
    public static final class FileResultCoder extends CustomCoder<FileResult> {
        private static final FileResultCoder INSTANCE = new FileResultCoder();
        private final Coder<String> stringCoder = NullableCoder.of(StringUtf8Coder.of());

        public static FileResultCoder of() {
            return INSTANCE;
        }

        @Override // org.apache.beam.sdk.coders.Coder
        public void encode(FileResult fileResult, OutputStream outputStream, Coder.Context context) throws IOException {
            if (fileResult == null) {
                throw new CoderException("cannot encode a null value");
            }
            this.stringCoder.encode(fileResult.getFilename(), outputStream, context.nested());
            this.stringCoder.encode(fileResult.getDestinationFilename(), outputStream, context.nested());
        }

        @Override // org.apache.beam.sdk.coders.Coder
        public FileResult decode(InputStream inputStream, Coder.Context context) throws IOException {
            return new FileResult(this.stringCoder.decode(inputStream, context.nested()), this.stringCoder.decode(inputStream, context.nested()));
        }

        @Override // org.apache.beam.sdk.coders.CustomCoder, org.apache.beam.sdk.coders.Coder
        public void verifyDeterministic() throws Coder.NonDeterministicException {
            throw new Coder.NonDeterministicException(this, "TableRows are not deterministic.");
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/FileBasedSink$FilenamePolicy.class */
    public static abstract class FilenamePolicy implements Serializable {

        /* loaded from: input_file:org/apache/beam/sdk/io/FileBasedSink$FilenamePolicy$Context.class */
        public static class Context {
            private int shardNumber;
            private int numShards;

            public Context(int i, int i2) {
                this.shardNumber = i;
                this.numShards = i2;
            }

            public int getShardNumber() {
                return this.shardNumber;
            }

            public int getNumShards() {
                return this.numShards;
            }
        }

        /* loaded from: input_file:org/apache/beam/sdk/io/FileBasedSink$FilenamePolicy$WindowedContext.class */
        public static class WindowedContext {
            private int shardNumber;
            private int numShards;
            private BoundedWindow window;
            private PaneInfo paneInfo;

            public WindowedContext(BoundedWindow boundedWindow, PaneInfo paneInfo, int i, int i2) {
                this.window = boundedWindow;
                this.paneInfo = paneInfo;
                this.shardNumber = i;
                this.numShards = i2;
            }

            public BoundedWindow getWindow() {
                return this.window;
            }

            public PaneInfo getPaneInfo() {
                return this.paneInfo;
            }

            public int getShardNumber() {
                return this.shardNumber;
            }

            public int getNumShards() {
                return this.numShards;
            }
        }

        public abstract String windowedFilename(WindowedContext windowedContext);

        public abstract String unwindowedFilename(Context context);

        public abstract ValueProvider<String> getBaseOutputFilenameProvider();

        public void populateDisplayData(DisplayData.Builder builder) {
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/FileBasedSink$WritableByteChannelFactory.class */
    public interface WritableByteChannelFactory extends Serializable {
        WritableByteChannel create(WritableByteChannel writableByteChannel) throws IOException;

        @Nullable
        String getMimeType();

        @Nullable
        String getFilenameSuffix();
    }

    public static String constructName(String str, String str2, String str3, int i, int i2) {
        StringBuffer stringBuffer = new StringBuffer();
        stringBuffer.append(str);
        Matcher matcher = SHARD_FORMAT_RE.matcher(str2);
        while (matcher.find()) {
            boolean z = matcher.group(1).charAt(0) == 'S';
            char[] cArr = new char[matcher.end() - matcher.start()];
            Arrays.fill(cArr, '0');
            matcher.appendReplacement(stringBuffer, new DecimalFormat(String.valueOf(cArr)).format(z ? i : i2));
        }
        matcher.appendTail(stringBuffer);
        stringBuffer.append(str3);
        return stringBuffer.toString();
    }

    public FileBasedSink(String str, String str2) {
        this(str, str2, ShardNameTemplate.INDEX_OF_MAX);
    }

    public FileBasedSink(String str, String str2, WritableByteChannelFactory writableByteChannelFactory) {
        this(ValueProvider.StaticValueProvider.of(str), str2, ShardNameTemplate.INDEX_OF_MAX, writableByteChannelFactory);
    }

    public FileBasedSink(String str, String str2, String str3) {
        this(ValueProvider.StaticValueProvider.of(str), str2, str3, CompressionType.UNCOMPRESSED);
    }

    public FileBasedSink(ValueProvider<String> valueProvider, String str, String str2, WritableByteChannelFactory writableByteChannelFactory) {
        this.writableByteChannelFactory = writableByteChannelFactory;
        this.fileNamePolicy = new DefaultFilenamePolicy(valueProvider, str, str2);
    }

    public FileBasedSink(FilenamePolicy filenamePolicy) {
        this(filenamePolicy, CompressionType.UNCOMPRESSED);
    }

    public FileBasedSink(FilenamePolicy filenamePolicy, WritableByteChannelFactory writableByteChannelFactory) {
        this.fileNamePolicy = filenamePolicy;
        this.writableByteChannelFactory = writableByteChannelFactory;
    }

    public ValueProvider<String> getBaseOutputFilenameProvider() {
        return this.fileNamePolicy.getBaseOutputFilenameProvider();
    }

    public FilenamePolicy getFileNamePolicy() {
        return this.fileNamePolicy;
    }

    public void validate(PipelineOptions pipelineOptions) {
    }

    public abstract FileBasedWriteOperation<T> createWriteOperation();

    @Override // org.apache.beam.sdk.transforms.display.HasDisplayData
    public void populateDisplayData(DisplayData.Builder builder) {
        getFileNamePolicy().populateDisplayData(builder);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static String getFileExtension(String str) {
        return (str == null || str.isEmpty()) ? "" : str.startsWith(".") ? str : "." + str;
    }
}
