package com.bazaarvoice.emodb.web.scanner.control;

import com.bazaarvoice.emodb.common.dropwizard.lifecycle.LifeCycleRegistry;
import com.bazaarvoice.emodb.sor.db.ScanRange;
import com.bazaarvoice.emodb.web.scanner.rangescan.RangeScanUploader;
import com.bazaarvoice.emodb.web.scanner.rangescan.RangeScanUploaderResult;
import com.bazaarvoice.emodb.web.scanner.scanstatus.ScanRangeStatus;
import com.bazaarvoice.emodb.web.scanner.scanstatus.ScanStatus;
import com.bazaarvoice.emodb.web.scanner.scanstatus.ScanStatusDAO;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import io.dropwizard.lifecycle.Managed;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/bazaarvoice/emodb/web/scanner/control/DistributedScanRangeMonitor.class */
public class DistributedScanRangeMonitor implements Managed {
    private static final Duration QUEUE_CLAIM_TTL = Duration.standardMinutes(2);
    private static final Duration QUEUE_RENEW_TTL = Duration.standardMinutes(4);
    private static final Duration CLAIM_START_TIMEOUT = QUEUE_CLAIM_TTL.minus(Duration.standardSeconds(15));
    private final ScanWorkflow _scanWorkflow;
    private final ScanStatusDAO _scanStatusDAO;
    private final RangeScanUploader _rangeScanUploader;
    private final ScanTableSetManager _scanTableSetManager;
    private final int _maxConcurrentScans;
    private ExecutorService _scanningService;
    private ScheduledExecutorService _backgroundService;
    private final Logger _log = LoggerFactory.getLogger(DistributedScanRangeMonitor.class);
    private final ConcurrentMap<Integer, ClaimedTask> _claimedTasks = Maps.newConcurrentMap();
    private final Runnable _startScansIfAvailableRunnable = new Runnable() { // from class: com.bazaarvoice.emodb.web.scanner.control.DistributedScanRangeMonitor.2
        @Override // java.lang.Runnable
        public void run() {
            DistributedScanRangeMonitor.this.startScansIfAvailable();
        }
    };

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/bazaarvoice/emodb/web/scanner/control/DistributedScanRangeMonitor$ClaimedTask.class */
    public class ClaimedTask {
        private final ScanRangeTask _task;
        private final Date _claimTime;
        private Date _startTime;
        private boolean _allowStart;
        private boolean _complete;

        private ClaimedTask(ScanRangeTask scanRangeTask, Date date) {
            this._allowStart = true;
            this._complete = false;
            this._task = scanRangeTask;
            this._claimTime = date;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public int getTaskId() {
            return this._task.getId();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public ScanRangeTask getTask() {
            return this._task;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public Date getClaimTime() {
            return this._claimTime;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public synchronized boolean setStartTime(Date date) {
            if (!this._allowStart) {
                return false;
            }
            this._startTime = date;
            return true;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public synchronized boolean unclaimIfNotStarted() {
            if (this._startTime != null) {
                return false;
            }
            this._allowStart = false;
            return true;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public boolean isStarted() {
            return this._startTime != null;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public boolean isComplete() {
            return this._complete;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void setComplete(boolean z) {
            this._complete = z;
        }

        public boolean equals(Object obj) {
            return obj == this || ((obj instanceof ClaimedTask) && this._task.equals(((ClaimedTask) obj)._task));
        }

        public int hashCode() {
            return this._task.hashCode();
        }
    }

    @Inject
    public DistributedScanRangeMonitor(ScanWorkflow scanWorkflow, ScanStatusDAO scanStatusDAO, RangeScanUploader rangeScanUploader, ScanTableSetManager scanTableSetManager, @MaxConcurrentScans int i, LifeCycleRegistry lifeCycleRegistry) {
        this._scanWorkflow = (ScanWorkflow) Preconditions.checkNotNull(scanWorkflow, "scanWorkflow");
        this._scanStatusDAO = (ScanStatusDAO) Preconditions.checkNotNull(scanStatusDAO, "scanStatusDAO");
        this._rangeScanUploader = (RangeScanUploader) Preconditions.checkNotNull(rangeScanUploader, "rangeScanUploader");
        this._scanTableSetManager = (ScanTableSetManager) Preconditions.checkNotNull(scanTableSetManager, "scanTableSetManager");
        Preconditions.checkArgument(i > 0, "maxConcurrentScans <= 0");
        this._maxConcurrentScans = i;
        lifeCycleRegistry.manage((LifeCycleRegistry) this);
    }

    @VisibleForTesting
    public void setExecutorServices(ExecutorService executorService, ScheduledExecutorService scheduledExecutorService) {
        this._scanningService = executorService;
        this._backgroundService = scheduledExecutorService;
    }

    @Override // io.dropwizard.lifecycle.Managed
    public void start() throws Exception {
        this._log.info("Distributed scan range monitor is starting");
        if (this._scanningService == null) {
            this._scanningService = Executors.newFixedThreadPool(this._maxConcurrentScans, new ThreadFactoryBuilder().setNameFormat("ScanRange-%d").build());
        }
        if (this._backgroundService == null) {
            this._backgroundService = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setNameFormat("ScanRangeRenewal-%d").build());
        }
        this._backgroundService.scheduleWithFixedDelay(this._startScansIfAvailableRunnable, 5L, 5L, TimeUnit.SECONDS);
        this._backgroundService.scheduleWithFixedDelay(new Runnable() { // from class: com.bazaarvoice.emodb.web.scanner.control.DistributedScanRangeMonitor.1
            @Override // java.lang.Runnable
            public void run() {
                DistributedScanRangeMonitor.this.renewClaimedTasks();
            }
        }, 1L, 1L, TimeUnit.MINUTES);
    }

    @Override // io.dropwizard.lifecycle.Managed
    public void stop() throws Exception {
        this._log.info("Distributed scan range monitor is stopping");
        if (this._backgroundService != null) {
            this._backgroundService.shutdownNow();
            this._backgroundService = null;
        }
        if (this._scanningService != null) {
            this._scanningService.shutdownNow();
            this._scanningService = null;
        }
    }

    @VisibleForTesting
    public void startScansIfAvailable() {
        while (true) {
            try {
                int availableScanThreadCount = getAvailableScanThreadCount();
                if (availableScanThreadCount <= 0) {
                    return;
                }
                List<ClaimedTask> claimScanRangeTasks = claimScanRangeTasks(availableScanThreadCount);
                if (claimScanRangeTasks.isEmpty()) {
                    return;
                }
                for (final ClaimedTask claimedTask : claimScanRangeTasks) {
                    this._scanningService.submit(new Runnable() { // from class: com.bazaarvoice.emodb.web.scanner.control.DistributedScanRangeMonitor.3
                        @Override // java.lang.Runnable
                        public void run() {
                            DistributedScanRangeMonitor.this.executeClaimedTask(claimedTask);
                        }
                    });
                }
            } catch (Exception e) {
                this._log.error("Failed to start available scans", (Throwable) e);
                return;
            }
        }
    }

    private int getAvailableScanThreadCount() {
        return this._maxConcurrentScans - this._claimedTasks.size();
    }

    private List<ClaimedTask> claimScanRangeTasks(int i) {
        try {
            Date date = new Date();
            List<ScanRangeTask> claimScanRangeTasks = this._scanWorkflow.claimScanRangeTasks(i, QUEUE_CLAIM_TTL);
            if (claimScanRangeTasks.isEmpty()) {
                return ImmutableList.of();
            }
            ArrayList newArrayListWithCapacity = Lists.newArrayListWithCapacity(claimScanRangeTasks.size());
            for (ScanRangeTask scanRangeTask : claimScanRangeTasks) {
                final ClaimedTask claimedTask = new ClaimedTask(scanRangeTask, date);
                if (this._claimedTasks.putIfAbsent(Integer.valueOf(scanRangeTask.getId()), claimedTask) != null) {
                    this._log.warn("Workflow returned scan range task that is already claimed: {}", scanRangeTask);
                } else {
                    this._log.info("Claimed scan range task: {}", scanRangeTask);
                    newArrayListWithCapacity.add(claimedTask);
                    this._backgroundService.schedule(new Runnable() { // from class: com.bazaarvoice.emodb.web.scanner.control.DistributedScanRangeMonitor.4
                        @Override // java.lang.Runnable
                        public void run() {
                            DistributedScanRangeMonitor.this.validateClaimedTaskHasStarted(claimedTask);
                        }
                    }, CLAIM_START_TIMEOUT.getMillis(), TimeUnit.MILLISECONDS);
                }
            }
            return newArrayListWithCapacity;
        } catch (Exception e) {
            this._log.error("Failed to start next available scan range", (Throwable) e);
            return ImmutableList.of();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void validateClaimedTaskHasStarted(ClaimedTask claimedTask) {
        if (claimedTask.unclaimIfNotStarted()) {
            this._log.warn("Claimed task has not started since it was scheduled at {}; unclaiming task: {}", claimedTask.getClaimTime(), claimedTask.getTask());
            unclaimTask(claimedTask, false);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void renewClaimedTasks() {
        try {
            ImmutableList<ClaimedTask> copyOf = ImmutableList.copyOf((Collection) this._claimedTasks.values());
            ArrayList newArrayList = Lists.newArrayList();
            for (ClaimedTask claimedTask : copyOf) {
                if (claimedTask.isComplete()) {
                    this._log.info("Complete claimed task found during renew: id={}", Integer.valueOf(claimedTask.getTaskId()));
                    this._claimedTasks.remove(Integer.valueOf(claimedTask.getTaskId()));
                } else if (claimedTask.isStarted()) {
                    newArrayList.add(claimedTask.getTask());
                }
            }
            if (!newArrayList.isEmpty()) {
                this._scanWorkflow.renewScanRangeTasks(newArrayList, QUEUE_RENEW_TTL);
                Iterator it2 = newArrayList.iterator();
                while (it2.hasNext()) {
                    this._log.info("Renewed scan range task: {}", (ScanRangeTask) it2.next());
                }
            }
        } catch (Exception e) {
            this._log.error("Failed to renew scan ranges", (Throwable) e);
        }
    }

    private void unclaimTask(ClaimedTask claimedTask, boolean z) {
        this._claimedTasks.remove(Integer.valueOf(claimedTask.getTaskId()));
        claimedTask.setComplete(true);
        if (z) {
            try {
                this._scanWorkflow.releaseScanRangeTask(claimedTask.getTask());
                this._log.info("Released scan range task: {}", claimedTask.getTask());
            } catch (Exception e) {
                this._log.error("Failed to release scan range", (Throwable) e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void executeClaimedTask(ClaimedTask claimedTask) {
        if (!claimedTask.setStartTime(new Date())) {
            this._log.info("Claimed task is overdue; range not scanned: {}", claimedTask.getTask());
            return;
        }
        ScanRangeTask task = claimedTask.getTask();
        boolean z = false;
        try {
            this._scanWorkflow.renewScanRangeTasks(ImmutableList.of(task), QUEUE_RENEW_TTL);
            z = asyncRangeScan(task);
            unclaimTask(claimedTask, z);
            this._backgroundService.submit(this._startScansIfAvailableRunnable);
        } catch (Throwable th) {
            unclaimTask(claimedTask, z);
            this._backgroundService.submit(this._startScansIfAvailableRunnable);
            throw th;
        }
    }

    private boolean asyncRangeScan(ScanRangeTask scanRangeTask) {
        RangeScanUploaderResult failure;
        ScanStatus scanStatus;
        String scanId = scanRangeTask.getScanId();
        final int id = scanRangeTask.getId();
        String placement = scanRangeTask.getPlacement();
        ScanRange range = scanRangeTask.getRange();
        try {
            scanStatus = this._scanStatusDAO.getScanStatus(scanId);
        } catch (Throwable th) {
            this._log.error("Scan range task failed: {}", scanRangeTask, th);
            failure = RangeScanUploaderResult.failure();
        }
        if (scanStatus.isCanceled()) {
            this._log.info("Ignoring scan range from canceled task: [task={}]", scanRangeTask);
            return true;
        }
        ScanRangeStatus scanRangeStatus = (ScanRangeStatus) Iterables.getOnlyElement(Iterables.filter(scanStatus.getCompleteScanRanges(), new Predicate<ScanRangeStatus>() { // from class: com.bazaarvoice.emodb.web.scanner.control.DistributedScanRangeMonitor.5
            @Override // com.google.common.base.Predicate
            public boolean apply(ScanRangeStatus scanRangeStatus2) {
                return scanRangeStatus2.getTaskId() == id;
            }
        }), null);
        if (scanRangeStatus != null) {
            this._log.info("Ignoring duplicate post of completed scan range task: [task={}, completeTime={}]", scanRangeTask, scanRangeStatus.getScanCompleteTime());
            return true;
        }
        this._log.info("Started scan range task: {}", scanRangeTask);
        this._scanStatusDAO.setScanRangeTaskActive(scanId, id, new Date());
        failure = this._rangeScanUploader.scanAndUpload(id, scanStatus.getOptions(), placement, range, this._scanTableSetManager.getTableSetForScan(scanId));
        this._log.info("Completed scan range task: {}", scanRangeTask);
        try {
            switch (failure.getStatus()) {
                case SUCCESS:
                    this._scanStatusDAO.setScanRangeTaskComplete(scanId, id, new Date());
                    break;
                case FAILURE:
                    this._scanStatusDAO.setScanRangeTaskInactive(scanId, id);
                    break;
                case REPSPLIT:
                    this._scanStatusDAO.setScanRangeTaskPartiallyComplete(scanId, id, ScanRange.create(range.getFrom(), failure.getResplitRange().getFrom()), failure.getResplitRange(), new Date());
                    break;
            }
            return true;
        } catch (Throwable th2) {
            this._log.error("Failed to mark scan range result: [id={}, placement={}, range={}, result={}]", scanId, placement, range, failure, th2);
            return false;
        }
    }
}
