package com.bazaarvoice.emodb.web.scanner.writer;

import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Stopwatch;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nullable;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/bazaarvoice/emodb/web/scanner/writer/TemporaryFileScanWriter.class */
public abstract class TemporaryFileScanWriter extends AbstractScanWriter {
    // Logger for this writer; assigned in the constructor.
    private final Logger _log;
    // Metric counter incremented when an async transfer is started and decremented in transferComplete().
    private final Counter _openTransfers;
    // Metric counter incremented while a caller is blocked in blockNewShardIfNecessary() and decremented afterwards.
    private final Counter _blockedNewShards;
    // Fallback value for the maximum number of simultaneously open shards.
    private static final int DEFAULT_MAX_OPEN_SHARDS = 20;
    // Shards that currently have temporary files; entries are added in getShardFiles() and removed in
    // closeShardFiles() and close(), all of which hold _lock while touching the map.
    private final Map<TransferKey, ShardFiles> _openShardFiles;
    // Upper bound on the number of entries in _openShardFiles before new shards block; validated > 0 in the constructor.
    private final int _maxOpenShards;
    // Lock associated with _shardFilesClosedOrExceptionCaught and held around _openShardFiles mutations.
    private final ReentrantLock _lock;
    // Signalled when a shard is closed, when the first upload exception is registered, and when the writer is closed.
    private final Condition _shardFilesClosedOrExceptionCaught;
    // First failure reported by an async transfer (see registerException()); rethrown by propagateExceptionIfPresent().
    private volatile IOException _uploadException;
    /**
     * Predicate that accepts only part files which were completed with content, i.e. whose
     * {@link ShardFile#isEmpty()} flag is {@code false}. Stateless, so it is shared and immutable.
     */
    private static final Predicate<ShardFile> _shardFileNotEmpty = new Predicate<ShardFile>() {
        @Override
        public boolean apply(ShardFile shardFile) {
            return !shardFile.isEmpty();
        }
    };
    /**
     * Function that maps a {@link ShardFile} to the temporary {@link File} backing it.
     * Stateless, so it is shared and immutable.
     */
    private static final Function<ShardFile, File> _getShardFileFile = new Function<ShardFile, File>() {
        @Override
        public File apply(ShardFile shardFile) {
            return shardFile.getFile();
        }
    };

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/bazaarvoice/emodb/web/scanner/writer/TemporaryFileScanWriter$ShardFile.class */
    /**
     * One temporary part file of a shard together with two flags: whether writing the part
     * has finished and, once finished, whether the part turned out to be empty.
     */
    public static class ShardFile {
        private final File _file;
        private boolean _complete;
        private boolean _isEmpty;

        /** Creates a part that is neither complete nor marked empty. */
        private ShardFile(File file) {
            this._file = file;
            this._complete = false;
            this._isEmpty = false;
        }

        /** Creates a part with explicit initial completion and emptiness flags. */
        private ShardFile(File file, boolean complete, boolean empty) {
            this._file = file;
            this._complete = complete;
            this._isEmpty = empty;
        }

        /** @return the temporary file backing this part */
        public File getFile() {
            return _file;
        }

        /** @return {@code true} once {@link #complete(boolean)} has been called (or the part was created complete) */
        public boolean isComplete() {
            return _complete;
        }

        /**
         * Marks this part as finished.
         *
         * @param empty value recorded as this part's emptiness flag
         */
        public void complete(boolean empty) {
            _isEmpty = empty;
            _complete = true;
        }

        /** @return the emptiness flag last recorded for this part */
        public boolean isEmpty() {
            return _isEmpty;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/bazaarvoice/emodb/web/scanner/writer/TemporaryFileScanWriter$ShardFiles.class */
    /**
     * The set of temporary part files written for a single shard, identified by a {@link TransferKey}.
     * The part list and the final part count are guarded by this object's monitor; the canceled flag
     * is volatile and may be read without it.
     */
    public class ShardFiles {
        private final TransferKey _key;
        private final List<ShardFile> _parts;
        private Optional<Integer> _finalPartCount;
        private volatile boolean _canceled;

        private ShardFiles(TransferKey transferKey) {
            this._parts = Lists.newArrayList();
            this._finalPartCount = Optional.absent();
            this._key = transferKey;
        }

        /** @return the key identifying the shard these files belong to */
        public TransferKey getKey() {
            return this._key;
        }

        /**
         * Records the total number of parts the shard will consist of. An absent value is ignored.
         *
         * @param optional the final part count, if known
         * @throws IllegalStateException if a different final part count was recorded earlier
         */
        public synchronized void setFinalPartCount(Optional<Integer> optional) {
            if (optional.isPresent()) {
                if (!this._finalPartCount.isPresent()) {
                    this._finalPartCount = optional;
                } else if (!this._finalPartCount.get().equals(optional.get())) {
                    throw new IllegalStateException("Shard set with inconsistent final part counts: " + this._key);
                }
            }
        }

        /**
         * Creates a new temporary file and registers it as an incomplete part of this shard.
         *
         * @return the newly created temporary file
         * @throws IOException if the temporary file cannot be created
         */
        public File addShardFile() throws IOException {
            // The file is created outside the monitor so file-system I/O does not block other callers.
            File createTemporaryShardFile = createTemporaryShardFile();
            ShardFile shardFile = new ShardFile(createTemporaryShardFile);
            synchronized (this) {
                this._parts.add(shardFile);
            }
            return createTemporaryShardFile;
        }

        /** Creates a temp file whose name encodes the shard id and table uuid and whose suffix is the compression extension. */
        private File createTemporaryShardFile() throws IOException {
            return File.createTempFile(String.format("emoshard_%02x_%016x", Integer.valueOf(this._key.getShardId()), Long.valueOf(this._key.getTableUuid())), TemporaryFileScanWriter.this._compression.getExtension());
        }

        /**
         * Marks the part backed by {@code file} as complete and, if that finishes the shard, returns
         * the file to transfer.
         *
         * @param z    emptiness flag recorded on the completed part
         * @param file the part file that has just been finished
         * @return {@code null} while the shard is not finished (final part count unknown or not yet
         *         reached, or some part still incomplete); an absent Optional when the shard is finished
         *         but every part is empty; otherwise the single file holding the shard's content
         *         (the lone non-empty part, or a new concatenation of all non-empty parts)
         * @throws IOException if creating the concatenation target or concatenating fails
         */
        @Nullable
        public synchronized Optional<File> shardFileComplete(boolean z, File file) throws IOException {
            // -1 can never equal a list size, so an unknown final count means "not all parts present".
            boolean allPartsPresent = this._finalPartCount.or(-1).intValue() == this._parts.size();
            boolean allPartsComplete = true;
            for (ShardFile shardFile : this._parts) {
                if (file.equals(shardFile.getFile())) {
                    shardFile.complete(z);
                }
                allPartsComplete = allPartsComplete && shardFile.isComplete();
            }
            if (!allPartsPresent || !allPartsComplete) {
                return null;
            }
            ImmutableList<File> nonEmptyFiles = FluentIterable.from(this._parts)
                    .filter(TemporaryFileScanWriter._shardFileNotEmpty)
                    .transform(TemporaryFileScanWriter._getShardFileFile)
                    .toList();
            switch (nonEmptyFiles.size()) {
                case 0:
                    return Optional.absent();
                case 1:
                    return Optional.of(nonEmptyFiles.get(0));
                default:
                    File concatenateFiles = TemporaryFileScanWriter.this.concatenateFiles(nonEmptyFiles, createTemporaryShardFile());
                    // The individual parts are no longer needed; the concatenated file replaces them.
                    deleteAllShardFiles();
                    this._parts.clear();
                    this._parts.add(new ShardFile(concatenateFiles, true, false));
                    return Optional.of(concatenateFiles);
            }
        }

        /**
         * Deletes the temporary file of every registered part. The part list itself is left unchanged.
         */
        public void deleteAllShardFiles() {
            // Snapshot under the monitor so a concurrent addShardFile() cannot invalidate the iteration.
            List<ShardFile> snapshot;
            synchronized (this) {
                snapshot = ImmutableList.copyOf(this._parts);
            }
            for (ShardFile part : snapshot) {
                deleteShardFile(part.getFile());
            }
        }

        /**
         * Deletes a single temporary file, logging the outcome; a failed deletion is logged, not thrown.
         *
         * @param file the file to delete
         */
        public void deleteShardFile(File file) {
            if (file.delete()) {
                TemporaryFileScanWriter.this._log.debug("Deleted temporary file : {}", file);
            } else {
                TemporaryFileScanWriter.this._log.warn("Failed to delete file: {}", file);
            }
        }

        /**
         * @return the file of the first registered part
         * @throws IndexOutOfBoundsException if no part has been registered
         */
        public synchronized File getFirstFile() {
            return this._parts.get(0).getFile();
        }

        /** @return whether this shard has been canceled */
        public boolean isCanceled() {
            return this._canceled;
        }

        /** @param z the new value of the canceled flag */
        public void setCanceled(boolean z) {
            this._canceled = z;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Initializes the writer's lock, open-shard bookkeeping and metrics.
     *
     * @param str            forwarded unchanged to the superclass constructor
     * @param i              forwarded unchanged to the superclass constructor
     * @param uri            forwarded unchanged to the superclass constructor
     * @param compression    forwarded unchanged to the superclass constructor
     * @param metricRegistry registry used for the "open-transfers" and "blocked-new-shards" counters;
     *                       also forwarded to the superclass constructor
     * @param optional       maximum number of simultaneously open shards; when absent,
     *                       {@link #DEFAULT_MAX_OPEN_SHARDS} is used
     * @throws NullPointerException     if {@code optional} is null
     * @throws IllegalArgumentException if the resulting maximum is not positive
     */
    public TemporaryFileScanWriter(String str, int i, URI uri, Compression compression, MetricRegistry metricRegistry, Optional<Integer> optional) {
        super(str, i, uri, compression, metricRegistry);
        this._log = LoggerFactory.getLogger(TemporaryFileScanWriter.class);
        this._openShardFiles = Maps.newHashMap();
        this._lock = new ReentrantLock();
        this._shardFilesClosedOrExceptionCaught = this._lock.newCondition();
        this._uploadException = null;
        Preconditions.checkNotNull(optional, "maxOpenShards");
        this._maxOpenShards = optional.or(DEFAULT_MAX_OPEN_SHARDS).intValue();
        Preconditions.checkArgument(this._maxOpenShards > 0, "maxOpenShards <= 0");
        this._openTransfers = metricRegistry.counter(MetricRegistry.name("bv.emodb.scan", "ScanUploader", "open-transfers"));
        this._blockedNewShards = metricRegistry.counter(MetricRegistry.name("bv.emodb.scan", "ScanUploader", "blocked-new-shards"));
    }

    /**
     * Starts transferring a finished shard file to its destination. This class calls it once per
     * finished shard that has content and attaches a callback to the returned future: on both success
     * and failure the shard's temporary files are closed, and a failure is additionally recorded as
     * the upload exception unless the writer is already closed.
     *
     * @param transferKey key of the shard being transferred
     * @param uri         destination URI computed for the shard
     * @param file        local temporary file holding the shard's content
     * @return a future that completes when the transfer has finished or failed
     */
    protected abstract ListenableFuture<?> transfer(TransferKey transferKey, URI uri, File file);

    /**
     * Reports the status of transfers that are still in progress. This class calls it from
     * {@code waitForAllTransfersComplete} when open shards remain after waiting, and wraps the
     * returned map in the result.
     *
     * @return status per transfer key for the active transfers
     */
    protected abstract Map<TransferKey, TransferStatus> getStatusForActiveTransfers();

    /**
     * Opens a writer for one part of a shard, backed by a new local temporary file. When the last
     * part of the shard is reported ready, the shard's content is transferred asynchronously via
     * {@link #transfer}.
     *
     * @param str  passed to {@code getUriForShard} to compute the destination URI
     * @param str2 passed to {@code getCounterForPlacement}; also logged as the placement if the transfer fails
     * @param i    first component used to look up the shard (presumably the shard id — confirm against TransferKey)
     * @param j    second component used to look up the shard (presumably the table uuid — confirm against TransferKey)
     * @return a writer whose {@code ready}/{@code cancel} hooks complete or discard the temporary file
     * @throws IOException          if an earlier async transfer failed, the shard was canceled, the writer
     *                              is closed, or the temporary file cannot be created or opened
     * @throws InterruptedException if interrupted while blocked waiting for open shards to close
     */
    @Override // com.bazaarvoice.emodb.web.scanner.writer.ScanWriter
    public ShardWriter writeShardRows(String str, final String str2, int i, long j) throws IOException, InterruptedException {
        // NOTE(review): getShardFiles registers the key in _openShardFiles before blockNewShardIfNecessary
        // runs, so the containsKey exemption in maxOpenShardsGreaterThan appears to apply every time —
        // confirm the open-shard limit is actually enforced.
        final ShardFiles shardFiles = getShardFiles(i, j);
        // Surface a failure from an earlier asynchronous transfer before doing more work.
        propagateExceptionIfPresent();
        blockNewShardIfNecessary(shardFiles);
        // Re-check state after the potentially long block above.
        if (shardFiles.isCanceled()) {
            throw new IOException("Shard file closed: " + shardFiles.getKey());
        }
        if (this._closed) {
            throw new IOException("Writer closed");
        }
        final File addShardFile = shardFiles.addShardFile();
        try {
            final URI uriForShard = getUriForShard(str, i, j);
            return new ShardWriter(open(addShardFile, getCounterForPlacement(str2))) { // from class: com.bazaarvoice.emodb.web.scanner.writer.TemporaryFileScanWriter.1
                /* JADX INFO: Access modifiers changed from: protected */
                // Marks this part complete (z is recorded as its emptiness flag; optional is the final
                // part count if known) and starts the transfer once the whole shard is finished.
                @Override // com.bazaarvoice.emodb.web.scanner.writer.ShardWriter
                public synchronized void ready(boolean z, Optional<Integer> optional) throws IOException {
                    if (shardFiles.isCanceled()) {
                        throw new IOException("Shard file closed: " + shardFiles.getKey());
                    }
                    if (optional.isPresent()) {
                        shardFiles.setFinalPartCount(optional);
                    }
                    Optional<File> shardFileComplete = shardFiles.shardFileComplete(z, addShardFile);
                    // null means the shard still has outstanding parts; nothing to do yet.
                    if (shardFileComplete == null) {
                        return;
                    }
                    if (shardFileComplete.isPresent()) {
                        asyncTransfer(shardFileComplete.get());
                    } else {
                        // Shard finished with no content: nothing to transfer, just clean up.
                        TemporaryFileScanWriter.this.closeShardFiles(shardFiles);
                    }
                }

                // Starts the transfer and arranges for the shard to be closed when it finishes either way.
                public void asyncTransfer(File file) {
                    TemporaryFileScanWriter.this._log.debug("Initiating async transfer: id={}, file={}, uri={}", Integer.valueOf(TemporaryFileScanWriter.this._taskId), file, uriForShard);
                    TemporaryFileScanWriter.this._openTransfers.inc();
                    Futures.addCallback(TemporaryFileScanWriter.this.transfer(shardFiles.getKey(), uriForShard, file), new FutureCallback<Object>() { // from class: com.bazaarvoice.emodb.web.scanner.writer.TemporaryFileScanWriter.1.1
                        @Override // com.google.common.util.concurrent.FutureCallback
                        public void onSuccess(Object obj) {
                            TemporaryFileScanWriter.this.transferComplete(shardFiles);
                        }

                        @Override // com.google.common.util.concurrent.FutureCallback
                        public void onFailure(Throwable th) {
                            // Failures after close() are not recorded as upload exceptions.
                            if (!TemporaryFileScanWriter.this._closed) {
                                TemporaryFileScanWriter.this.registerException(str2, th);
                            }
                            TemporaryFileScanWriter.this.transferComplete(shardFiles);
                        }
                    });
                }

                /* JADX INFO: Access modifiers changed from: protected */
                // Cancels the whole shard (not just this part) and deletes its temporary files.
                @Override // com.bazaarvoice.emodb.web.scanner.writer.ShardWriter
                public synchronized void cancel() {
                    shardFiles.setCanceled(true);
                    TemporaryFileScanWriter.this.closeShardFiles(shardFiles);
                }

                public String toString() {
                    return uriForShard.toString();
                }
            };
        } catch (IOException e) {
            // NOTE(review): the temporary file is deleted here, but the part registered by addShardFile()
            // stays in the shard's part list — confirm the shard can still complete after this failure.
            shardFiles.deleteShardFile(addShardFile);
            throw e;
        }
    }

    /**
     * Returns the open {@link ShardFiles} for the given shard, registering a new one under the lock
     * if none exists yet.
     *
     * @param i value passed as the second argument of the {@link TransferKey} constructor
     * @param j value passed as the first argument of the {@link TransferKey} constructor
     * @return the existing or newly registered entry for the key
     */
    private ShardFiles getShardFiles(int i, long j) {
        final TransferKey key = new TransferKey(j, i);
        this._lock.lock();
        try {
            final ShardFiles existing = this._openShardFiles.get(key);
            if (existing != null) {
                return existing;
            }
            final ShardFiles created = new ShardFiles(key);
            this._openShardFiles.put(key, created);
            return created;
        } finally {
            this._lock.unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Deletes the shard's temporary files, removes the shard from the open-shard map and wakes up
     * any threads waiting for a shard to close.
     *
     * @param shardFiles the shard to close
     */
    public void closeShardFiles(ShardFiles shardFiles) {
        // File deletion happens before taking the lock so waiters are not held up by file-system I/O.
        shardFiles.deleteAllShardFiles();
        final TransferKey closedKey = shardFiles.getKey();
        this._lock.lock();
        try {
            this._openShardFiles.remove(closedKey);
            this._shardFilesClosedOrExceptionCaught.signalAll();
        } finally {
            this._lock.unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Called when an async transfer has finished, successfully or not: logs it, closes the shard's
     * temporary files and decrements the open-transfer counter.
     *
     * @param shardFiles the shard whose transfer finished
     */
    public void transferComplete(ShardFiles shardFiles) {
        final Integer taskId = Integer.valueOf(this._taskId);
        final File transferredFile = shardFiles.getFirstFile();
        this._log.debug("Transfer complete: id={}, file={}", taskId, transferredFile);
        closeShardFiles(shardFiles);
        this._openTransfers.dec();
    }

    /**
     * Rethrows the exception recorded by a failed async transfer, if there is one.
     *
     * @throws IOException the recorded upload exception
     */
    private void propagateExceptionIfPresent() throws IOException {
        final IOException recorded = this._uploadException;
        if (recorded == null) {
            return;
        }
        throw recorded;
    }

    /**
     * Blocks the caller while the open-shard limit is reached, tracking the blocked state in the
     * "blocked-new-shards" counter for the duration of the wait.
     *
     * @param shardFiles the shard about to receive a new part; its key is exempt from the limit check
     * @throws IOException          if an async transfer failure is recorded while waiting
     * @throws InterruptedException if interrupted while waiting
     */
    private void blockNewShardIfNecessary(ShardFiles shardFiles) throws IOException, InterruptedException {
        final int threshold = this._maxOpenShards - 1;
        if (!maxOpenShardsGreaterThan(threshold, shardFiles.getKey())) {
            return;
        }
        this._blockedNewShards.inc();
        try {
            blockUntilOpenShardsAtMost(this._maxOpenShards - 1, shardFiles.getKey());
        } finally {
            this._blockedNewShards.dec();
        }
    }

    /**
     * Waits, without a practical deadline, until the number of open shards is at most {@code i}.
     *
     * @param i           the largest acceptable number of open shards
     * @param transferKey key exempt from the limit check, or {@code null} for none
     * @throws IOException          if an async transfer failure is recorded
     * @throws InterruptedException if interrupted while waiting
     */
    private void blockUntilOpenShardsAtMost(int i, @Nullable TransferKey transferKey) throws IOException, InterruptedException {
        // Long.MAX_VALUE as the deadline timestamp means the wait never times out in practice.
        final long noDeadline = Long.MAX_VALUE;
        blockUntilOpenShardsAtMost(i, transferKey, noDeadline);
    }

    /**
     * Waits until the number of open shards is at most {@code i}, the writer is closed, or the
     * deadline passes, whichever comes first.
     *
     * <p>The lock is held for the whole method (and released atomically only inside
     * {@code await}); it is released exactly once in the outer {@code finally}, on every exit path.
     *
     * @param i           the largest acceptable number of open shards
     * @param transferKey key exempt from the limit check, or {@code null} for none
     * @param j           deadline as epoch milliseconds (compared with {@link System#currentTimeMillis()})
     * @return {@code true} if the open-shard condition is satisfied on return, {@code false} otherwise
     * @throws IOException          if an async transfer failure has been recorded
     * @throws InterruptedException if interrupted while waiting
     */
    private boolean blockUntilOpenShardsAtMost(int i, @Nullable TransferKey transferKey, long j) throws IOException, InterruptedException {
        Stopwatch start = new Stopwatch().start();
        // Wake up at least this often so progress is logged while waiting.
        final long maxWaitMillis = Duration.standardSeconds(30L).getMillis();
        this._lock.lock();
        try {
            while (!this._closed && maxOpenShardsGreaterThan(i, transferKey)) {
                long currentTimeMillis = System.currentTimeMillis();
                if (currentTimeMillis >= j) {
                    break;
                }
                // Stop waiting as soon as an upload failure is known.
                propagateExceptionIfPresent();
                this._shardFilesClosedOrExceptionCaught.await(Math.min(maxWaitMillis, j - currentTimeMillis), TimeUnit.MILLISECONDS);
                if (!maxOpenShardsGreaterThan(i, transferKey)) {
                    propagateExceptionIfPresent();
                    return true;
                }
                this._log.debug("After {} seconds task {} still has {} open shards", Long.valueOf(start.elapsed(TimeUnit.SECONDS)), Integer.valueOf(this._taskId), Integer.valueOf(this._openShardFiles.size()));
            }
            propagateExceptionIfPresent();
            return !maxOpenShardsGreaterThan(i, transferKey);
        } finally {
            this._lock.unlock();
        }
    }

    /**
     * Tells whether the open-shard count exceeds {@code i} for a caller that is not exempt.
     *
     * <p>The map is read under the lock because other threads mutate it under the same lock;
     * the lock is reentrant, so callers that already hold it are unaffected.
     *
     * @param i           the threshold to compare the number of open shards against
     * @param transferKey key that is exempt when it is already among the open shards, or {@code null} for none
     * @return {@code true} if more than {@code i} shards are open and {@code transferKey} is
     *         {@code null} or not currently open
     */
    private boolean maxOpenShardsGreaterThan(int i, @Nullable TransferKey transferKey) {
        this._lock.lock();
        try {
            return this._openShardFiles.size() > i && (transferKey == null || !this._openShardFiles.containsKey(transferKey));
        } finally {
            this._lock.unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Logs a failed async transfer and records it as the upload exception if none has been recorded
     * yet, waking up any waiting threads so they can observe it.
     *
     * @param str the placement reported in the log message
     * @param th  the failure; wrapped in an {@link IOException} unless it already is one
     */
    public void registerException(String str, Throwable th) {
        this._log.error("Async transfer failed for task id={}, placement {}", Integer.valueOf(this._taskId), str, th);
        final IOException failure;
        if (th instanceof IOException) {
            failure = (IOException) th;
        } else {
            failure = new IOException(th);
        }
        this._lock.lock();
        try {
            // Only the first failure is kept; later ones are logged above and otherwise dropped.
            if (this._uploadException != null) {
                return;
            }
            this._uploadException = failure;
            this._shardFilesClosedOrExceptionCaught.signalAll();
        } finally {
            this._lock.unlock();
        }
    }

    /**
     * Waits up to {@code duration} for every open shard to be closed.
     *
     * @param duration maximum time to wait, measured from now
     * @return a result wrapping an empty map if no open shards remain, otherwise a result wrapping
     *         the status of the transfers that are still active
     * @throws IOException          if an async transfer failure has been recorded
     * @throws InterruptedException if interrupted while waiting
     */
    @Override // com.bazaarvoice.emodb.web.scanner.writer.ScanWriter
    public WaitForAllTransfersCompleteResult waitForAllTransfersComplete(Duration duration) throws IOException, InterruptedException {
        final long deadlineMillis = DateTime.now().plus(duration).getMillis();
        final boolean allClosed = blockUntilOpenShardsAtMost(0, null, deadlineMillis);
        if (allClosed) {
            return new WaitForAllTransfersCompleteResult(ImmutableMap.of());
        }
        return new WaitForAllTransfersCompleteResult(getStatusForActiveTransfers());
    }

    /**
     * Closes the writer: after the superclass close, every open shard is canceled and its temporary
     * files deleted, the open-shard map is emptied and all waiting threads are woken up.
     */
    @Override // com.bazaarvoice.emodb.web.scanner.writer.AbstractScanWriter, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        super.close();
        this._lock.lock();
        try {
            final Iterator<ShardFiles> openShards = this._openShardFiles.values().iterator();
            while (openShards.hasNext()) {
                final ShardFiles open = openShards.next();
                open.setCanceled(true);
                open.deleteAllShardFiles();
            }
            this._openShardFiles.clear();
            this._shardFilesClosedOrExceptionCaught.signalAll();
        } finally {
            this._lock.unlock();
        }
    }
}
