package org.apache.flume.sink.hdfs;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import java.io.IOException;
import java.lang.reflect.Method;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flume.Clock;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.SystemClock;
import org.apache.flume.auth.PrivilegedExecutor;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.hdfs.HDFSEventSink;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:META-INF/bundled-dependencies/flume-hdfs-sink-1.9.0.jar:org/apache/flume/sink/hdfs/BucketWriter.class */
public class BucketWriter {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) BucketWriter.class);
    private static final Integer staticLock = new Integer(1);
    private Method isClosedMethod;
    private final HDFSWriter writer;
    private final long rollInterval;
    private final long rollSize;
    private final long rollCount;
    private final long batchSize;
    private final CompressionCodec codeC;
    private final SequenceFile.CompressionType compType;
    private final ScheduledExecutorService timedRollerPool;
    private final PrivilegedExecutor proxyUser;
    private final AtomicLong fileExtensionCounter;
    private long eventCounter;
    private long processSize;
    private FileSystem fileSystem;
    private volatile String filePath;
    private volatile String fileName;
    private volatile String inUsePrefix;
    private volatile String inUseSuffix;
    private volatile String fileSuffix;
    private volatile String bucketPath;
    private volatile String targetPath;
    private volatile long batchCounter;
    private volatile boolean isOpen;
    private volatile boolean isUnderReplicated;
    private volatile int consecutiveUnderReplRotateCount;
    private volatile ScheduledFuture<Void> timedRollFuture;
    private SinkCounter sinkCounter;
    private final int idleTimeout;
    private volatile ScheduledFuture<Void> idleFuture;
    private final HDFSEventSink.WriterCallback onCloseCallback;
    private final String onCloseCallbackPath;
    private final long callTimeout;
    private final ExecutorService callTimeoutPool;
    private final int maxConsecUnderReplRotations = 30;
    private boolean mockFsInjected;
    private final long retryInterval;
    private final int maxRetries;
    protected AtomicBoolean closed;
    AtomicInteger renameTries;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:META-INF/bundled-dependencies/flume-hdfs-sink-1.9.0.jar:org/apache/flume/sink/hdfs/BucketWriter$CallRunner.class */
    public interface CallRunner<T> {
        T call() throws Exception;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:META-INF/bundled-dependencies/flume-hdfs-sink-1.9.0.jar:org/apache/flume/sink/hdfs/BucketWriter$CloseHandler.class */
    public class CloseHandler implements Callable<Void> {
        private final String path;
        private int closeTries;

        private CloseHandler() {
            this.path = BucketWriter.this.bucketPath;
            this.closeTries = 0;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Void call() throws Exception {
            close(false);
            return null;
        }

        public void close(boolean z) {
            this.closeTries++;
            boolean z2 = this.closeTries < BucketWriter.this.maxRetries && !z;
            try {
                BucketWriter.this.callWithTimeout(BucketWriter.this.createCloseCallRunner());
                BucketWriter.this.sinkCounter.incrementConnectionClosedCount();
            } catch (IOException | InterruptedException e) {
                BucketWriter.LOG.warn("Closing file: " + this.path + " failed. Will retry again in " + BucketWriter.this.retryInterval + " seconds.", e);
                if (BucketWriter.this.timedRollerPool == null || BucketWriter.this.timedRollerPool.isTerminated()) {
                    BucketWriter.LOG.warn("Cannot retry close any more timedRollerPool is null or terminated");
                } else if (z2) {
                    BucketWriter.this.timedRollerPool.schedule(this, BucketWriter.this.retryInterval, TimeUnit.SECONDS);
                }
                if (z2) {
                    return;
                }
                BucketWriter.LOG.warn("Unsuccessfully attempted to close " + this.path + " " + BucketWriter.this.maxRetries + " times. Initializing lease recovery.");
                BucketWriter.this.sinkCounter.incrementConnectionFailedCount();
                BucketWriter.this.recoverLease();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:META-INF/bundled-dependencies/flume-hdfs-sink-1.9.0.jar:org/apache/flume/sink/hdfs/BucketWriter$ScheduledRenameCallable.class */
    public class ScheduledRenameCallable implements Callable<Void> {
        private final String path;
        private final String finalPath;
        private FileSystem fs;
        private int renameTries;

        private ScheduledRenameCallable() {
            this.path = BucketWriter.this.bucketPath;
            this.finalPath = BucketWriter.this.targetPath;
            this.fs = BucketWriter.this.fileSystem;
            this.renameTries = 1;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Void call() throws Exception {
            if (this.renameTries >= BucketWriter.this.maxRetries) {
                BucketWriter.LOG.warn("Unsuccessfully attempted to rename " + this.path + " " + BucketWriter.this.maxRetries + " times. File may still be open.");
                return null;
            }
            this.renameTries++;
            try {
                BucketWriter.this.renameBucket(this.path, this.finalPath, this.fs);
                return null;
            } catch (Exception e) {
                BucketWriter.LOG.warn("Renaming file: " + this.path + " failed. Will retry again in " + BucketWriter.this.retryInterval + " seconds.", (Throwable) e);
                BucketWriter.this.timedRollerPool.schedule(this, BucketWriter.this.retryInterval, TimeUnit.SECONDS);
                return null;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public BucketWriter(long j, long j2, long j3, long j4, Context context, String str, String str2, String str3, String str4, String str5, CompressionCodec compressionCodec, SequenceFile.CompressionType compressionType, HDFSWriter hDFSWriter, ScheduledExecutorService scheduledExecutorService, PrivilegedExecutor privilegedExecutor, SinkCounter sinkCounter, int i, HDFSEventSink.WriterCallback writerCallback, String str6, long j5, ExecutorService executorService, long j6, int i2) {
        this(j, j2, j3, j4, context, str, str2, str3, str4, str5, compressionCodec, compressionType, hDFSWriter, scheduledExecutorService, privilegedExecutor, sinkCounter, i, writerCallback, str6, j5, executorService, j6, i2, new SystemClock());
    }

    BucketWriter(long j, long j2, long j3, long j4, Context context, String str, String str2, String str3, String str4, String str5, CompressionCodec compressionCodec, SequenceFile.CompressionType compressionType, HDFSWriter hDFSWriter, ScheduledExecutorService scheduledExecutorService, PrivilegedExecutor privilegedExecutor, SinkCounter sinkCounter, int i, HDFSEventSink.WriterCallback writerCallback, String str6, long j5, ExecutorService executorService, long j6, int i2, Clock clock) {
        this.isClosedMethod = null;
        this.consecutiveUnderReplRotateCount = 0;
        this.maxConsecUnderReplRotations = 30;
        this.mockFsInjected = false;
        this.closed = new AtomicBoolean();
        this.renameTries = new AtomicInteger(0);
        this.rollInterval = j;
        this.rollSize = j2;
        this.rollCount = j3;
        this.batchSize = j4;
        this.filePath = str;
        this.fileName = str2;
        this.inUsePrefix = str3;
        this.inUseSuffix = str4;
        this.fileSuffix = str5;
        this.codeC = compressionCodec;
        this.compType = compressionType;
        this.writer = hDFSWriter;
        this.timedRollerPool = scheduledExecutorService;
        this.proxyUser = privilegedExecutor;
        this.sinkCounter = sinkCounter;
        this.idleTimeout = i;
        this.onCloseCallback = writerCallback;
        this.onCloseCallbackPath = str6;
        this.callTimeout = j5;
        this.callTimeoutPool = executorService;
        this.fileExtensionCounter = new AtomicLong(clock.currentTimeMillis());
        this.retryInterval = j6;
        this.maxRetries = i2;
        this.isOpen = false;
        this.isUnderReplicated = false;
        this.writer.configure(context);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    public void setFileSystem(FileSystem fileSystem) {
        this.fileSystem = fileSystem;
        this.mockFsInjected = true;
    }

    private void resetCounters() {
        this.eventCounter = 0L;
        this.processSize = 0L;
        this.batchCounter = 0L;
    }

    private Method getRefIsClosed() {
        try {
            return this.fileSystem.getClass().getMethod("isFileClosed", Path.class);
        } catch (Exception e) {
            LOG.info("isFileClosed() is not available in the version of the distributed filesystem being used. Flume will not attempt to re-close files if the close fails on the first attempt");
            return null;
        }
    }

    private Boolean isFileClosed(FileSystem fileSystem, Path path) throws Exception {
        return (Boolean) this.isClosedMethod.invoke(fileSystem, path);
    }

    private void open() throws IOException, InterruptedException {
        if (this.filePath == null || this.writer == null) {
            throw new IOException("Invalid file settings");
        }
        final Configuration configuration = new Configuration();
        configuration.setBoolean("fs.automatic.close", false);
        synchronized (staticLock) {
            checkAndThrowInterruptedException();
            try {
                String str = this.fileName + "." + this.fileExtensionCounter.incrementAndGet();
                if (this.fileSuffix != null && this.fileSuffix.length() > 0) {
                    str = str + this.fileSuffix;
                } else if (this.codeC != null) {
                    str = str + this.codeC.getDefaultExtension();
                }
                this.bucketPath = this.filePath + "/" + this.inUsePrefix + str + this.inUseSuffix;
                this.targetPath = this.filePath + "/" + str;
                LOG.info("Creating " + this.bucketPath);
                callWithTimeout(new CallRunner<Void>() { // from class: org.apache.flume.sink.hdfs.BucketWriter.1
                    /* JADX WARN: Can't rename method to resolve collision */
                    @Override // org.apache.flume.sink.hdfs.BucketWriter.CallRunner
                    public Void call() throws Exception {
                        if (BucketWriter.this.codeC == null) {
                            if (!BucketWriter.this.mockFsInjected) {
                                BucketWriter.this.fileSystem = new Path(BucketWriter.this.bucketPath).getFileSystem(configuration);
                            }
                            BucketWriter.this.writer.open(BucketWriter.this.bucketPath);
                            return null;
                        }
                        if (!BucketWriter.this.mockFsInjected) {
                            BucketWriter.this.fileSystem = new Path(BucketWriter.this.bucketPath).getFileSystem(configuration);
                        }
                        BucketWriter.this.writer.open(BucketWriter.this.bucketPath, BucketWriter.this.codeC, BucketWriter.this.compType);
                        return null;
                    }
                });
            } catch (Exception e) {
                this.sinkCounter.incrementConnectionFailedCount();
                if (!(e instanceof IOException)) {
                    throw Throwables.propagate(e);
                }
                throw ((IOException) e);
            }
        }
        this.isClosedMethod = getRefIsClosed();
        this.sinkCounter.incrementConnectionCreatedCount();
        resetCounters();
        if (this.rollInterval > 0) {
            this.timedRollFuture = this.timedRollerPool.schedule(new Callable<Void>() { // from class: org.apache.flume.sink.hdfs.BucketWriter.2
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.util.concurrent.Callable
                public Void call() throws Exception {
                    BucketWriter.LOG.debug("Rolling file ({}): Roll scheduled after {} sec elapsed.", BucketWriter.this.bucketPath, Long.valueOf(BucketWriter.this.rollInterval));
                    try {
                        BucketWriter.this.close(true);
                        return null;
                    } catch (Throwable th) {
                        BucketWriter.LOG.error("Unexpected error", th);
                        return null;
                    }
                }
            }, this.rollInterval, TimeUnit.SECONDS);
        }
        this.isOpen = true;
    }

    public void close() throws InterruptedException {
        close(false);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public CallRunner<Void> createCloseCallRunner() {
        return new CallRunner<Void>() { // from class: org.apache.flume.sink.hdfs.BucketWriter.3
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // org.apache.flume.sink.hdfs.BucketWriter.CallRunner
            public Void call() throws Exception {
                BucketWriter.this.writer.close();
                return null;
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized void recoverLease() {
        if (this.bucketPath == null || !(this.fileSystem instanceof DistributedFileSystem)) {
            return;
        }
        try {
            LOG.debug("Starting lease recovery for {}", this.bucketPath);
            this.fileSystem.recoverLease(new Path(this.bucketPath));
        } catch (IOException e) {
            LOG.warn("Lease recovery failed for {}", this.bucketPath, e);
        }
    }

    public void close(boolean z) throws InterruptedException {
        close(z, false);
    }

    public void close(boolean z, boolean z2) throws InterruptedException {
        if (z) {
            if (this.closed.compareAndSet(false, true)) {
                runCloseAction();
            } else {
                LOG.warn("This bucketWriter is already closing or closed.");
            }
        }
        doClose(z2);
    }

    private synchronized void doClose(boolean z) throws InterruptedException {
        checkAndThrowInterruptedException();
        try {
            flush();
        } catch (IOException e) {
            LOG.warn("pre-close flush failed", (Throwable) e);
        }
        LOG.info("Closing {}", this.bucketPath);
        if (this.isOpen) {
            new CloseHandler().close(z);
            this.isOpen = false;
        } else {
            LOG.info("HDFSWriter is already closed: {}", this.bucketPath);
        }
        if (this.timedRollFuture != null && !this.timedRollFuture.isDone()) {
            this.timedRollFuture.cancel(false);
            this.timedRollFuture = null;
        }
        if (this.idleFuture != null && !this.idleFuture.isDone()) {
            this.idleFuture.cancel(false);
            this.idleFuture = null;
        }
        if (this.bucketPath == null || this.fileSystem == null) {
            return;
        }
        try {
            renameBucket(this.bucketPath, this.targetPath, this.fileSystem);
        } catch (Exception e2) {
            LOG.warn("failed to rename() file (" + this.bucketPath + "). Exception follows.", (Throwable) e2);
            this.sinkCounter.incrementConnectionFailedCount();
            this.timedRollerPool.schedule(new ScheduledRenameCallable(), this.retryInterval, TimeUnit.SECONDS);
        }
    }

    public synchronized void flush() throws IOException, InterruptedException {
        checkAndThrowInterruptedException();
        if (isBatchComplete()) {
            return;
        }
        doFlush();
        if (this.idleTimeout > 0) {
            if (this.idleFuture == null || this.idleFuture.cancel(false)) {
                this.idleFuture = this.timedRollerPool.schedule(new Callable<Void>() { // from class: org.apache.flume.sink.hdfs.BucketWriter.4
                    /* JADX WARN: Can't rename method to resolve collision */
                    @Override // java.util.concurrent.Callable
                    public Void call() throws Exception {
                        BucketWriter.LOG.info("Closing idle bucketWriter {} at {}", BucketWriter.this.bucketPath, Long.valueOf(System.currentTimeMillis()));
                        if (!BucketWriter.this.isOpen) {
                            return null;
                        }
                        BucketWriter.this.close(true);
                        return null;
                    }
                }, this.idleTimeout, TimeUnit.SECONDS);
            }
        }
    }

    private void runCloseAction() {
        try {
            if (this.onCloseCallback != null) {
                this.onCloseCallback.run(this.onCloseCallbackPath);
            }
        } catch (Throwable th) {
            LOG.error("Unexpected error", th);
        }
    }

    private void doFlush() throws IOException, InterruptedException {
        callWithTimeout(new CallRunner<Void>() { // from class: org.apache.flume.sink.hdfs.BucketWriter.5
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // org.apache.flume.sink.hdfs.BucketWriter.CallRunner
            public Void call() throws Exception {
                BucketWriter.this.writer.sync();
                return null;
            }
        });
        this.batchCounter = 0L;
    }

    public synchronized void append(final Event event) throws IOException, InterruptedException {
        checkAndThrowInterruptedException();
        if (this.idleFuture != null) {
            this.idleFuture.cancel(false);
            if (!this.idleFuture.isDone()) {
                try {
                    this.idleFuture.get(this.callTimeout, TimeUnit.MILLISECONDS);
                } catch (TimeoutException e) {
                    LOG.warn("Timeout while trying to cancel closing of idle file. Idle file close may have failed", (Throwable) e);
                } catch (Exception e2) {
                    LOG.warn("Error while trying to cancel closing of idle file. ", (Throwable) e2);
                }
            }
            this.idleFuture = null;
        }
        if (!this.isOpen) {
            if (this.closed.get()) {
                throw new BucketClosedException("This bucket writer was closed and this handle is thus no longer valid");
            }
            open();
        }
        if (shouldRotate()) {
            boolean z = true;
            if (this.isUnderReplicated) {
                if (this.consecutiveUnderReplRotateCount >= 30) {
                    z = false;
                    if (this.consecutiveUnderReplRotateCount == 30) {
                        LOG.error("Hit max consecutive under-replication rotations ({}); will not continue rolling files under this path due to under-replication", (Object) 30);
                    }
                } else {
                    LOG.warn("Block Under-replication detected. Rotating file.");
                }
                this.consecutiveUnderReplRotateCount++;
            } else {
                this.consecutiveUnderReplRotateCount = 0;
            }
            if (z) {
                close();
                open();
            }
        }
        try {
            this.sinkCounter.incrementEventDrainAttemptCount();
            callWithTimeout(new CallRunner<Void>() { // from class: org.apache.flume.sink.hdfs.BucketWriter.6
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // org.apache.flume.sink.hdfs.BucketWriter.CallRunner
                public Void call() throws Exception {
                    BucketWriter.this.writer.append(event);
                    return null;
                }
            });
            this.processSize += event.getBody().length;
            this.eventCounter++;
            this.batchCounter++;
            if (this.batchCounter == this.batchSize) {
                flush();
            }
        } catch (IOException e3) {
            LOG.warn("Caught IOException writing to HDFSWriter ({}). Closing file (" + this.bucketPath + ") and rethrowing exception.", e3.getMessage());
            close(true);
            throw e3;
        }
    }

    private boolean shouldRotate() {
        boolean z = false;
        if (this.writer.isUnderReplicated()) {
            this.isUnderReplicated = true;
            z = true;
        } else {
            this.isUnderReplicated = false;
        }
        if (this.rollCount > 0 && this.rollCount <= this.eventCounter) {
            LOG.debug("rolling: rollCount: {}, events: {}", Long.valueOf(this.rollCount), Long.valueOf(this.eventCounter));
            z = true;
        }
        if (this.rollSize > 0 && this.rollSize <= this.processSize) {
            LOG.debug("rolling: rollSize: {}, bytes: {}", Long.valueOf(this.rollSize), Long.valueOf(this.processSize));
            z = true;
        }
        return z;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void renameBucket(String str, String str2, final FileSystem fileSystem) throws IOException, InterruptedException {
        if (str.equals(str2)) {
            return;
        }
        final Path path = new Path(str);
        final Path path2 = new Path(str2);
        callWithTimeout(new CallRunner<Void>() { // from class: org.apache.flume.sink.hdfs.BucketWriter.7
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // org.apache.flume.sink.hdfs.BucketWriter.CallRunner
            public Void call() throws Exception {
                if (!fileSystem.exists(path)) {
                    return null;
                }
                BucketWriter.LOG.info("Renaming " + path + " to " + path2);
                BucketWriter.this.renameTries.incrementAndGet();
                fileSystem.rename(path, path2);
                return null;
            }
        });
    }

    public String toString() {
        return "[ " + getClass().getSimpleName() + " targetPath = " + this.targetPath + ", bucketPath = " + this.bucketPath + " ]";
    }

    private boolean isBatchComplete() {
        return this.batchCounter == 0;
    }

    private static void checkAndThrowInterruptedException() throws InterruptedException {
        Thread.currentThread();
        if (Thread.interrupted()) {
            throw new InterruptedException("Timed out before HDFS call was made. Your hdfs.callTimeout might be set too low or HDFS calls are taking too long.");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public <T> T callWithTimeout(final CallRunner<T> callRunner) throws IOException, InterruptedException {
        Future<T> submit = this.callTimeoutPool.submit(new Callable<T>() { // from class: org.apache.flume.sink.hdfs.BucketWriter.8
            @Override // java.util.concurrent.Callable
            public T call() throws Exception {
                return (T) BucketWriter.this.proxyUser.execute(new PrivilegedExceptionAction<T>() { // from class: org.apache.flume.sink.hdfs.BucketWriter.8.1
                    @Override // java.security.PrivilegedExceptionAction
                    public T run() throws Exception {
                        return (T) callRunner.call();
                    }
                });
            }
        });
        try {
            return this.callTimeout > 0 ? submit.get(this.callTimeout, TimeUnit.MILLISECONDS) : submit.get();
        } catch (InterruptedException e) {
            LOG.warn("Unexpected Exception " + e.getMessage(), (Throwable) e);
            throw e;
        } catch (CancellationException e2) {
            throw new InterruptedException("Blocked callable interrupted by rotation event");
        } catch (ExecutionException e3) {
            this.sinkCounter.incrementConnectionFailedCount();
            Throwable cause = e3.getCause();
            if (cause instanceof IOException) {
                throw ((IOException) cause);
            }
            if (cause instanceof InterruptedException) {
                throw ((InterruptedException) cause);
            }
            if (cause instanceof RuntimeException) {
                throw ((RuntimeException) cause);
            }
            if (cause instanceof Error) {
                throw ((Error) cause);
            }
            throw new RuntimeException(e3);
        } catch (TimeoutException e4) {
            submit.cancel(true);
            this.sinkCounter.incrementConnectionFailedCount();
            throw new IOException("Callable timed out after " + this.callTimeout + " ms on file: " + this.bucketPath, e4);
        }
    }
}
