package com.bazaarvoice.emodb.web.scanner.control;

import com.bazaarvoice.emodb.plugin.stash.StashStateListener;
import com.bazaarvoice.emodb.sor.core.DataTools;
import com.bazaarvoice.emodb.sor.db.ScanRange;
import com.bazaarvoice.emodb.sor.db.ScanRangeSplits;
import com.bazaarvoice.emodb.web.scanner.notifications.ScanCountListener;
import com.bazaarvoice.emodb.web.scanner.scanstatus.ScanRangeStatus;
import com.bazaarvoice.emodb.web.scanner.scanstatus.ScanStatus;
import com.bazaarvoice.emodb.web.scanner.scanstatus.ScanStatusDAO;
import com.bazaarvoice.emodb.web.scanner.writer.ScanWriterGenerator;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.AbstractService;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/bazaarvoice/emodb/web/scanner/control/LocalScanUploadMonitor.class */
public class LocalScanUploadMonitor extends AbstractService {
    private static final Duration OVERRUN_SCAN_TIME = Duration.standardDays(1);
    private final ScanWorkflow _scanWorkflow;
    private final ScanStatusDAO _scanStatusDAO;
    private final ScanTableSetManager _scanTableSetManager;
    private final ScanWriterGenerator _scanWriterGenerator;
    private final StashStateListener _stashStateListener;
    private final ScanCountListener _scanCountListener;
    private final DataTools _dataTools;
    private ScheduledExecutorService _service;
    private final Logger _log = LoggerFactory.getLogger(LocalScanUploadMonitor.class);
    private final Set<String> _activeScans = Sets.newHashSet();
    private final Runnable _processCompleteRangeScansExecution = new Runnable() { // from class: com.bazaarvoice.emodb.web.scanner.control.LocalScanUploadMonitor.2
        @Override // java.lang.Runnable
        public void run() {
            try {
                LocalScanUploadMonitor.this.processCompleteRangeScans();
            } catch (Exception e) {
                LocalScanUploadMonitor.this._log.error("Unexpected exception caught processing complete range scans", (Throwable) e);
            }
            LocalScanUploadMonitor.this._service.schedule(LocalScanUploadMonitor.this._processCompleteRangeScansExecution, LocalScanUploadMonitor.this._activeScans.isEmpty() ? 3L : 1L, TimeUnit.SECONDS);
        }
    };

    public LocalScanUploadMonitor(ScanWorkflow scanWorkflow, ScanStatusDAO scanStatusDAO, ScanTableSetManager scanTableSetManager, ScanWriterGenerator scanWriterGenerator, StashStateListener stashStateListener, ScanCountListener scanCountListener, DataTools dataTools) {
        this._scanWorkflow = (ScanWorkflow) Preconditions.checkNotNull(scanWorkflow, "scanWorkflow");
        this._scanStatusDAO = (ScanStatusDAO) Preconditions.checkNotNull(scanStatusDAO, "scanStatusDAO");
        this._scanTableSetManager = (ScanTableSetManager) Preconditions.checkNotNull(scanTableSetManager, "scanTableSetManager");
        this._scanWriterGenerator = (ScanWriterGenerator) Preconditions.checkNotNull(scanWriterGenerator, "scanWriterGenerator");
        this._stashStateListener = (StashStateListener) Preconditions.checkNotNull(stashStateListener, "stashStateListener");
        this._scanCountListener = (ScanCountListener) Preconditions.checkNotNull(scanCountListener, "scanCountListener");
        this._dataTools = (DataTools) Preconditions.checkNotNull(dataTools, "dataTools");
    }

    @VisibleForTesting
    public void setExecutorService(ScheduledExecutorService scheduledExecutorService) {
        this._service = scheduledExecutorService;
    }

    @Override // com.google.common.util.concurrent.AbstractService
    protected void doStart() {
        this._log.info("Starting scan upload monitor service");
        if (this._service == null) {
            this._service = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setNameFormat("scan-upload-monitor-%d").build());
        }
        this._service.execute(new Runnable() { // from class: com.bazaarvoice.emodb.web.scanner.control.LocalScanUploadMonitor.1
            @Override // java.lang.Runnable
            public void run() {
                try {
                    LocalScanUploadMonitor.this.initializeAllActiveScans();
                    LocalScanUploadMonitor.this.notifyActiveScanCountChanged();
                    LocalScanUploadMonitor.this._service.schedule(LocalScanUploadMonitor.this._processCompleteRangeScansExecution, 1L, TimeUnit.SECONDS);
                    LocalScanUploadMonitor.this._service.scheduleAtFixedRate(new Runnable() { // from class: com.bazaarvoice.emodb.web.scanner.control.LocalScanUploadMonitor.1.1
                        @Override // java.lang.Runnable
                        public void run() {
                            LocalScanUploadMonitor.this.cleanupOrphanedScans();
                        }
                    }, 1L, TimeUnit.DAYS.toMinutes(1L), TimeUnit.MINUTES);
                    LocalScanUploadMonitor.this.notifyStarted();
                } catch (Exception e) {
                    LocalScanUploadMonitor.this._log.error("Failed to start local scan upload monitor", (Throwable) e);
                    LocalScanUploadMonitor.this.notifyFailed(e);
                }
            }
        });
    }

    @Override // com.google.common.util.concurrent.AbstractService
    protected void doStop() {
        this._log.info("Stopping scan upload monitor service");
        if (this._service != null) {
            this._service.shutdownNow();
            try {
                if (!this._service.awaitTermination(30L, TimeUnit.SECONDS)) {
                    this._log.warn("Service still has running threads after shutdown request");
                }
            } catch (InterruptedException e) {
                this._log.warn("Service interrupted while waiting for shutdown", (Throwable) e);
            }
            this._service = null;
        }
        this._activeScans.clear();
        notifyActiveScanCountChanged();
        notifyStopped();
    }

    protected void processCompleteRangeScans() {
        try {
            for (Map.Entry entry : Multimaps.index(this._scanWorkflow.claimCompleteScanRanges(Duration.standardMinutes(5L)), new Function<ScanRangeComplete, String>() { // from class: com.bazaarvoice.emodb.web.scanner.control.LocalScanUploadMonitor.3
                @Override // com.google.common.base.Function
                public String apply(ScanRangeComplete scanRangeComplete) {
                    return scanRangeComplete.getScanId();
                }
            }).asMap().entrySet()) {
                String str = (String) entry.getKey();
                Collection<ScanRangeComplete> collection = (Collection) entry.getValue();
                try {
                    refreshScan(str);
                    this._scanWorkflow.releaseCompleteScanRanges(collection);
                } catch (Exception e) {
                    this._log.error("Failed to process scan range complete: id={}", str, e);
                }
            }
        } catch (Exception e2) {
            this._log.error("Failed to claim complete scan ranges", (Throwable) e2);
        }
    }

    @VisibleForTesting
    public void refreshScan(String str) throws IOException {
        ScanStatus scanStatus = this._scanStatusDAO.getScanStatus(str);
        if (scanStatus == null) {
            this._log.warn("Refresh scan called for unknown scan: {}", str);
            return;
        }
        if (scanStatus.isDone()) {
            if (scanStatus.isCanceled()) {
                scanCanceled(scanStatus);
                return;
            } else {
                completeScan(scanStatus);
                return;
            }
        }
        if (this._activeScans.add(str)) {
            notifyActiveScanCountChanged();
            scheduleOverrunCheck(scanStatus);
        }
        ScanStatus resplitPartiallyCompleteTasks = resplitPartiallyCompleteTasks(scanStatus);
        Set<Integer> incompleteBatches = getIncompleteBatches(resplitPartiallyCompleteTasks);
        Multimap<Integer, ScanRangeStatus> queuedRangeScansByConcurrencyId = getQueuedRangeScansByConcurrencyId(resplitPartiallyCompleteTasks);
        int maxConcurrentSubRangeScans = resplitPartiallyCompleteTasks.getOptions().getMaxConcurrentSubRangeScans();
        Date date = new Date();
        for (ScanRangeStatus scanRangeStatus : getUnqueuedRangeScans(resplitPartiallyCompleteTasks)) {
            Optional<Integer> blockedByBatchId = scanRangeStatus.getBlockedByBatchId();
            Optional<Integer> concurrencyId = scanRangeStatus.getConcurrencyId();
            if (!blockedByBatchId.isPresent() || !incompleteBatches.contains(blockedByBatchId.get())) {
                if (!concurrencyId.isPresent() || queuedRangeScansByConcurrencyId.get(concurrencyId.get()).size() < maxConcurrentSubRangeScans) {
                    int taskId = scanRangeStatus.getTaskId();
                    ScanRangeTask addScanRangeTask = this._scanWorkflow.addScanRangeTask(str, taskId, scanRangeStatus.getPlacement(), scanRangeStatus.getScanRange());
                    this._scanStatusDAO.setScanRangeTaskQueued(str, taskId, date);
                    if (concurrencyId.isPresent()) {
                        queuedRangeScansByConcurrencyId.put(concurrencyId.get(), scanRangeStatus);
                    }
                    this._log.info("Queued scan range task: {}", addScanRangeTask);
                }
            }
        }
    }

    private ScanStatus resplitPartiallyCompleteTasks(ScanStatus scanStatus) {
        boolean z = false;
        int i = -1;
        for (ScanRangeStatus scanRangeStatus : scanStatus.getCompleteScanRanges()) {
            if (scanRangeStatus.getResplitRange().isPresent()) {
                if (i == -1) {
                    i = getNextTaskId(scanStatus);
                }
                List<ScanRange> resplit = resplit(scanRangeStatus.getPlacement(), scanRangeStatus.getResplitRange().get(), scanStatus.getOptions().getRangeScanSplitSize());
                ArrayList newArrayListWithCapacity = Lists.newArrayListWithCapacity(resplit.size());
                Iterator<ScanRange> it2 = resplit.iterator();
                while (it2.hasNext()) {
                    int i2 = i;
                    i++;
                    newArrayListWithCapacity.add(new ScanRangeStatus(i2, scanRangeStatus.getPlacement(), it2.next(), scanRangeStatus.getBatchId(), scanRangeStatus.getBlockedByBatchId(), scanRangeStatus.getConcurrencyId()));
                }
                this._scanStatusDAO.resplitScanRangeTask(scanStatus.getScanId(), scanRangeStatus.getTaskId(), newArrayListWithCapacity);
                z = true;
            }
        }
        return !z ? scanStatus : this._scanStatusDAO.getScanStatus(scanStatus.getScanId());
    }

    private int getNextTaskId(ScanStatus scanStatus) {
        Iterable<ScanRangeStatus> allScanRanges = scanStatus.getAllScanRanges();
        if (Iterables.isEmpty(allScanRanges)) {
            return 0;
        }
        return ((Integer) Ordering.natural().max(FluentIterable.from(allScanRanges).transform(new Function<ScanRangeStatus, Integer>() { // from class: com.bazaarvoice.emodb.web.scanner.control.LocalScanUploadMonitor.4
            @Override // com.google.common.base.Function
            public Integer apply(ScanRangeStatus scanRangeStatus) {
                return Integer.valueOf(scanRangeStatus.getTaskId());
            }
        }))).intValue() + 1;
    }

    private List<ScanRange> resplit(String str, ScanRange scanRange, int i) {
        ScanRangeSplits scanRangeSplits = this._dataTools.getScanRangeSplits(str, i, Optional.of(scanRange));
        ImmutableList.Builder builder = ImmutableList.builder();
        Iterator<ScanRangeSplits.SplitGroup> it2 = scanRangeSplits.getSplitGroups().iterator();
        while (it2.hasNext()) {
            Iterator<ScanRangeSplits.TokenRange> it3 = it2.next().getTokenRanges().iterator();
            while (it3.hasNext()) {
                builder.addAll((Iterable) it3.next().getScanRanges());
            }
        }
        return builder.build();
    }

    private Set<Integer> getIncompleteBatches(ScanStatus scanStatus) {
        return FluentIterable.from(Iterables.concat(scanStatus.getPendingScanRanges(), scanStatus.getActiveScanRanges())).transform(new Function<ScanRangeStatus, Integer>() { // from class: com.bazaarvoice.emodb.web.scanner.control.LocalScanUploadMonitor.5
            @Override // com.google.common.base.Function
            public Integer apply(ScanRangeStatus scanRangeStatus) {
                return Integer.valueOf(scanRangeStatus.getBatchId());
            }
        }).toSet();
    }

    private Multimap<Integer, ScanRangeStatus> getQueuedRangeScansByConcurrencyId(ScanStatus scanStatus) {
        Iterable<ScanRangeStatus> concat = Iterables.concat(scanStatus.getPendingScanRanges(), scanStatus.getActiveScanRanges());
        ArrayListMultimap create = ArrayListMultimap.create();
        for (ScanRangeStatus scanRangeStatus : concat) {
            if (hasBeenQueued(scanRangeStatus) && scanRangeStatus.getConcurrencyId().isPresent()) {
                create.put(scanRangeStatus.getConcurrencyId().get(), scanRangeStatus);
            }
        }
        return create;
    }

    public List<ScanRangeStatus> getUnqueuedRangeScans(ScanStatus scanStatus) {
        return FluentIterable.from(scanStatus.getPendingScanRanges()).filter(new Predicate<ScanRangeStatus>() { // from class: com.bazaarvoice.emodb.web.scanner.control.LocalScanUploadMonitor.7
            @Override // com.google.common.base.Predicate
            public boolean apply(ScanRangeStatus scanRangeStatus) {
                return !LocalScanUploadMonitor.this.hasBeenQueued(scanRangeStatus);
            }
        }).toSortedList(new Comparator<ScanRangeStatus>() { // from class: com.bazaarvoice.emodb.web.scanner.control.LocalScanUploadMonitor.6
            @Override // java.util.Comparator
            public int compare(ScanRangeStatus scanRangeStatus, ScanRangeStatus scanRangeStatus2) {
                return ComparisonChain.start().compare(scanRangeStatus.getBatchId(), scanRangeStatus2.getBatchId()).compare(scanRangeStatus.getPlacement(), scanRangeStatus2.getPlacement()).compare(scanRangeStatus.getScanRange(), scanRangeStatus2.getScanRange()).result();
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean hasBeenQueued(ScanRangeStatus scanRangeStatus) {
        return scanRangeStatus.getScanQueuedTime() != null;
    }

    private void scanCanceled(ScanStatus scanStatus) {
        this._stashStateListener.stashCanceled(scanStatus.asPluginStashMetadata(), new Date());
        cleanupScan(scanStatus.getScanId());
    }

    private void completeScan(ScanStatus scanStatus) throws IOException {
        String scanId = scanStatus.getScanId();
        if (scanStatus.getCompleteTime() != null) {
            this._log.info("Scan already marked complete: {}", scanId);
            return;
        }
        this._log.info("Scan complete: {}", scanId);
        try {
            this._scanWriterGenerator.createScanWriter(-1, scanStatus.getOptions().getDestinations()).writeScanComplete(scanId, this._scanStatusDAO.getScanStatus(scanId).getStartTime());
            Date date = new Date();
            this._scanStatusDAO.setCompleteTime(scanId, date);
            scanStatus.setCompleteTime(date);
            this._stashStateListener.stashCompleted(scanStatus.asPluginStashMetadata(), scanStatus.getCompleteTime());
            cleanupScan(scanId);
        } catch (Throwable th) {
            cleanupScan(scanId);
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void cleanupScan(String str) {
        if (this._activeScans.remove(str)) {
            notifyActiveScanCountChanged();
        }
        try {
            this._scanTableSetManager.cleanupTableSetForScan(str);
        } catch (Exception e) {
            this._log.error("Failed to clean up table set for scan {}", str, e);
        }
    }

    @VisibleForTesting
    public void cleanupOrphanedScans() {
        try {
            for (final String str : this._scanTableSetManager.getAvailableTableSets()) {
                final ScanStatus scanStatus = this._scanStatusDAO.getScanStatus(str);
                if (scanStatus == null || scanStatus.isDone()) {
                    this._service.submit(new Runnable() { // from class: com.bazaarvoice.emodb.web.scanner.control.LocalScanUploadMonitor.8
                        @Override // java.lang.Runnable
                        public void run() {
                            if (scanStatus == null) {
                                LocalScanUploadMonitor.this._log.warn("Cleaning table set from unknown scan: {}", str);
                            } else {
                                LocalScanUploadMonitor.this._log.info("Cleaning orphaned table set for scan: {}", str);
                            }
                            LocalScanUploadMonitor.this.cleanupScan(str);
                        }
                    });
                }
            }
        } catch (Exception e) {
            this._log.error("Failed to clean up orphaned table sets", (Throwable) e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void initializeAllActiveScans() {
        try {
            Iterator<ScanStatus> list = this._scanStatusDAO.list(null, Long.MAX_VALUE);
            while (list.hasNext()) {
                ScanStatus next = list.next();
                if (!next.isDone()) {
                    this._activeScans.add(next.getScanId());
                    scheduleOverrunCheck(next);
                }
            }
        } catch (Exception e) {
            this._log.warn("Failed to initialize active scan count", (Throwable) e);
        }
    }

    private void scheduleOverrunCheck(ScanStatus scanStatus) {
        final String scanId = scanStatus.getScanId();
        DateTime now = DateTime.now();
        DateTime plus = new DateTime(scanStatus.getStartTime()).plus(OVERRUN_SCAN_TIME);
        long j = 0;
        if (now.isBefore(plus)) {
            j = new Duration(now, plus).getMillis();
        }
        this._service.schedule(new Runnable() { // from class: com.bazaarvoice.emodb.web.scanner.control.LocalScanUploadMonitor.9
            @Override // java.lang.Runnable
            public void run() {
                if (LocalScanUploadMonitor.this._scanStatusDAO.getScanStatus(scanId).isDone()) {
                    return;
                }
                LocalScanUploadMonitor.this._log.warn("Overrun scan detected, canceling scan: {}", scanId);
                LocalScanUploadMonitor.this._scanStatusDAO.setCanceled(scanId);
                LocalScanUploadMonitor.this._scanWorkflow.scanStatusUpdated(scanId);
            }
        }, j, TimeUnit.MILLISECONDS);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void notifyActiveScanCountChanged() {
        this._scanCountListener.activeScanCountChanged(this._activeScans.size());
    }
}
