package com.bazaarvoice.emodb.job.service;

import com.bazaarvoice.emodb.common.dropwizard.lifecycle.LifeCycleRegistry;
import com.bazaarvoice.emodb.job.JobZooKeeper;
import com.bazaarvoice.emodb.job.api.JobHandler;
import com.bazaarvoice.emodb.job.api.JobHandlerUtil;
import com.bazaarvoice.emodb.job.api.JobIdentifier;
import com.bazaarvoice.emodb.job.api.JobRequest;
import com.bazaarvoice.emodb.job.api.JobService;
import com.bazaarvoice.emodb.job.api.JobStatus;
import com.bazaarvoice.emodb.job.api.JobType;
import com.bazaarvoice.emodb.job.dao.JobStatusDAO;
import com.bazaarvoice.emodb.job.handler.JobHandlerRegistryInternal;
import com.bazaarvoice.emodb.job.handler.RegistryEntry;
import com.bazaarvoice.emodb.job.util.JobStatusUtil;
import com.bazaarvoice.emodb.queue.api.Message;
import com.bazaarvoice.emodb.queue.api.QueueService;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Queues;
import com.google.common.util.concurrent.Callables;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import io.dropwizard.lifecycle.Managed;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/bazaarvoice/emodb/job/service/DefaultJobService.class */
public class DefaultJobService implements JobService, Managed {
    /**
     * Callable that always returns the {@link DateTime} at millisecond instant 0.
     * NOTE(review): its only consumer is presumably {@code runNextJob()}, whose body was not recovered.
     */
    private static final Callable<DateTime> EPOCH = Callables.returning(new DateTime(0));

    /** Queue service used to send, peek and acknowledge job messages. */
    private final QueueService _queueService;

    /** Name of the queue that carries job identifiers. */
    private final String _queueName;

    /** Registry consulted by job type name to find the handler entry for a job. */
    private final JobHandlerRegistryInternal _jobHandlerRegistry;

    /** Persistence for job status records. */
    private final JobStatusDAO _jobStatusDAO;

    /** Curator client used to build the per-job {@link InterProcessMutex}. */
    private final CuratorFramework _curator;

    /** Number of worker tasks scheduled by {@link #start()}; zero disables job processing. */
    private final int _concurrencyLevel;

    /** Delay associated with a job whose handler reported it is not the owner; also the expiry of {@link #_recentNotOwnerDelays}. */
    private final Duration _notOwnerRetryDelay;

    /** Supplies a synchronized queue of messages peeked from the job queue, memoized when a refresh time is configured. */
    private final Supplier<Queue<Message>> _messageSupplier;

    /** Scheduler running the worker tasks; {@code null} before {@link #start()} and after {@link #stop()}. */
    private ScheduledExecutorService _service;

    /** Cache keyed by String holding a {@link DateTime}; entries expire {@code notOwnerRetryDelay} after being written. */
    private final Cache<String, DateTime> _recentNotOwnerDelays;

    private final Logger _log = LoggerFactory.getLogger(DefaultJobService.class);

    /**
     * Set by {@link #stop()} and polled by the worker tasks on the scheduler's threads.
     * Declared {@code volatile} so that the write made by the stopping thread is guaranteed
     * to become visible to the workers; a plain field gives no such guarantee.
     */
    private volatile boolean _stopped = false;

    /** Whether job processing is currently paused; toggled by {@link #pause()} and {@link #resume()}. */
    private final AtomicBoolean _paused = new AtomicBoolean(false);

    /**
     * Creates the job service, validates its configuration and hands the new instance to the
     * lifecycle registry.
     *
     * @param lifeCycleRegistry registry this instance is passed to via {@code manage}
     * @param queueService queue service used to send, peek and acknowledge job messages
     * @param str name of the job queue
     * @param jobHandlerRegistryInternal registry of job handlers, looked up by job type name
     * @param jobStatusDAO storage for job status records
     * @param curatorFramework Curator client used for the per-job mutex
     * @param num concurrency level; must not be negative, and zero disables job processing
     * @param duration queue refresh time; zero means the queue is peeked on every request,
     *                 otherwise the peeked messages are memoized for this long
     * @param num2 maximum number of messages requested per peek
     * @param duration2 not-owner retry delay; also used as the expiry of the not-owner cache
     * @throws NullPointerException if any argument is {@code null}
     * @throws IllegalArgumentException if the concurrency level is negative
     */
    @Inject
    public DefaultJobService(LifeCycleRegistry lifeCycleRegistry, QueueService queueService, @JobQueueName String str, JobHandlerRegistryInternal jobHandlerRegistryInternal, JobStatusDAO jobStatusDAO, @JobZooKeeper CuratorFramework curatorFramework, @JobConcurrencyLevel Integer num, @QueueRefreshTime Duration duration, @QueuePeekLimit final Integer num2, @NotOwnerRetryDelay Duration duration2) {
        this._queueService = (QueueService) Preconditions.checkNotNull(queueService, "queueService");
        this._queueName = (String) Preconditions.checkNotNull(str, "queueName");
        this._jobHandlerRegistry = (JobHandlerRegistryInternal) Preconditions.checkNotNull(jobHandlerRegistryInternal, "jobHandlerRegistry");
        this._jobStatusDAO = (JobStatusDAO) Preconditions.checkNotNull(jobStatusDAO, "jobStatusDAO");
        this._curator = (CuratorFramework) Preconditions.checkNotNull(curatorFramework, "curator");
        this._concurrencyLevel = ((Integer) Preconditions.checkNotNull(num, "concurrencyLevel")).intValue();
        Preconditions.checkArgument(this._concurrencyLevel >= 0, "Concurrency level cannot be negative");
        this._notOwnerRetryDelay = (Duration) Preconditions.checkNotNull(duration2, "notOwnerRetryDelay");
        Preconditions.checkNotNull(num2, "queuePeekLimit");
        Preconditions.checkNotNull(lifeCycleRegistry, "lifecycleRegistry");
        this._recentNotOwnerDelays = CacheBuilder.newBuilder().expireAfterWrite(duration2.getMillis(), TimeUnit.MILLISECONDS).build();

        // Unbox once; the value cannot change and the supplier may be invoked many times.
        final int peekLimit = num2.intValue();
        Supplier<Queue<Message>> supplier = new Supplier<Queue<Message>>() {
            @Override
            public Queue<Message> get() {
                // Copy the peeked messages into a synchronized queue (the memoized instance is shared).
                return Queues.synchronizedQueue(Queues.newArrayDeque(DefaultJobService.this._queueService.peek(DefaultJobService.this._queueName, peekLimit)));
            }
        };
        Preconditions.checkNotNull(duration, "queueRefreshTime");
        if (duration.getMillis() == 0) {
            // No refresh interval configured: peek the queue on every call.
            this._messageSupplier = supplier;
        } else {
            this._messageSupplier = Suppliers.memoizeWithExpiration(supplier, duration.getMillis(), TimeUnit.MILLISECONDS);
        }

        // Pass this instance as-is. The class implements JobService and Managed, so the former
        // "(LifeCycleRegistry) this" cast would fail with ClassCastException at runtime.
        // NOTE(review): presumably this is what causes start()/stop() to be driven by the
        // application lifecycle -- confirm against LifeCycleRegistry.
        lifeCycleRegistry.manage(this);
    }

    /**
     * Starts background job processing.
     *
     * <p>When the concurrency level is zero nothing is started and a message is logged.
     * Otherwise a scheduled pool with threads named {@code job-N} is created and one identical
     * worker task per concurrency slot is scheduled with a 5 second initial delay and a
     * 5 second delay between runs. On each run a worker keeps calling {@code runNextJob()}
     * until the service is stopped or paused, or {@code runNextJob()} returns {@code false}.
     */
    @Override // io.dropwizard.lifecycle.Managed
    public void start() throws Exception {
        final int workerCount = this._concurrencyLevel;
        if (workerCount != 0) {
            final ScheduledExecutorService pool = Executors.newScheduledThreadPool(workerCount, new ThreadFactoryBuilder().setNameFormat("job-%d").build());
            this._service = pool;

            final Runnable worker = new Runnable() {
                @Override // java.lang.Runnable
                public void run() {
                    for (;;) {
                        if (DefaultJobService.this._stopped || DefaultJobService.this._paused.get()) {
                            return;
                        }
                        if (!DefaultJobService.this.runNextJob()) {
                            return;
                        }
                    }
                }
            };

            int remaining = workerCount;
            while (remaining > 0) {
                pool.scheduleWithFixedDelay(worker, 5L, 5L, TimeUnit.SECONDS);
                remaining--;
            }
        } else {
            this._log.info("Job processing has been disabled");
        }
    }

    /**
     * Stops job processing: raises the stopped flag so workers leave their loop, then, if a
     * scheduler exists, shuts it down immediately and clears the reference.
     */
    @Override // io.dropwizard.lifecycle.Managed
    public void stop() throws Exception {
        this._stopped = true;

        final ScheduledExecutorService running = this._service;
        if (running == null) {
            // Never started (or already stopped): nothing to shut down.
            return;
        }
        running.shutdownNow();
        this._service = null;
    }

    /**
     * Submits a job: records a {@code SUBMITTED} status for a newly created job identifier and
     * then sends that identifier's string form to the job queue.
     *
     * @param jobRequest the job to submit; must not be {@code null}
     * @return the identifier created for the job
     * @throws NullPointerException if {@code jobRequest} is {@code null}
     * @throws IllegalArgumentException if no registry entry exists for the request's job type name
     */
    @Override // com.bazaarvoice.emodb.job.api.JobService
    public <Q, R> JobIdentifier<Q, R> submitJob(JobRequest<Q, R> jobRequest) {
        Preconditions.checkNotNull(jobRequest, "jobRequest");

        final JobType<Q, R> jobType = jobRequest.getType();
        final boolean handlerKnown = this._jobHandlerRegistry.getRegistryEntry(jobType.getName()) != null;
        if (!handlerKnown) {
            throw new IllegalArgumentException("Cannot handle job of type " + jobType);
        }

        final JobIdentifier<Q, R> jobId = JobIdentifier.createNew(jobType);
        // Persist the status before queueing so the record exists by the time a worker sees the message.
        final JobStatus<Q, R> submitted = new JobStatus<>(JobStatus.Status.SUBMITTED, jobRequest.getRequest(), null, null);
        this._jobStatusDAO.updateJobStatus(jobId, submitted);
        this._queueService.send(this._queueName, jobId.toString());
        return jobId;
    }

    /**
     * Looks up the stored status of a job.
     *
     * @param jobIdentifier identifier of the job; must not be {@code null}
     * @return the stored status passed through {@code JobStatusUtil.narrow} with the identifier's
     *         job type, or {@code null} when the DAO has no status for the job
     * @throws NullPointerException if {@code jobIdentifier} is {@code null}
     */
    @Override // com.bazaarvoice.emodb.job.api.JobService
    public <Q, R> JobStatus<Q, R> getJobStatus(JobIdentifier<Q, R> jobIdentifier) {
        Preconditions.checkNotNull(jobIdentifier);

        final JobStatus<Q, R> stored = this._jobStatusDAO.getJobStatus(jobIdentifier);
        if (stored != null) {
            return JobStatusUtil.narrow(stored, jobIdentifier.getJobType());
        }
        return null;
    }

    /* JADX WARN: Code restructure failed: missing block: B:14:0x008a, code lost:
    
        r0 = r8._jobHandlerRegistry.getRegistryEntry(com.bazaarvoice.emodb.job.api.JobIdentifier.getJobTypeNameFromId(r0));
        r8._log.info("Executing job {}... ", r0);
     */
    /* JADX WARN: Code restructure failed: missing block: B:15:0x00b4, code lost:
    
        if (run(r0, r0) == false) goto L14;
     */
    /* JADX WARN: Code restructure failed: missing block: B:16:0x00b7, code lost:
    
        acknowledgeQueueMessage(r0.getId());
        r8._log.info("Executing job {}... DONE", r0);
     */
    /* JADX WARN: Code restructure failed: missing block: B:18:0x00fd, code lost:
    
        r0.release();
     */
    /* JADX WARN: Code restructure failed: missing block: B:19:0x010d, code lost:
    
        return true;
     */
    /* JADX WARN: Code restructure failed: missing block: B:21:0x00ce, code lost:
    
        r8._recentNotOwnerDelays.put(r0, new org.joda.time.DateTime().plus(r8._notOwnerRetryDelay));
        r8._recentNotOwnerDelays.cleanUp();
        r8._log.info("Executing job {}... not local", r0);
     */
    /* JADX WARN: Finally extract failed */
    /**
     * Attempts to run the next job. The worker loop keeps calling this method for as long as
     * it returns {@code true}.
     *
     * <p><b>The body of this method was not recovered by the decompiler.</b> As written it
     * unconditionally throws {@link UnsupportedOperationException}; the real logic has to be
     * restored from the original sources or the bytecode before this class can process jobs.
     *
     * <p>NOTE(review): the code fragments preserved in the JADX warnings above suggest, but do
     * not prove, the following: the registry entry is looked up from the job type name embedded
     * in the job id; {@code run(...)} is invoked; when it reports success the queue message is
     * acknowledged, otherwise a time of "now plus the not-owner retry delay" is stored in
     * {@code _recentNotOwnerDelays}; something (presumably the job mutex) is released in a
     * finally block; and {@code true} is returned. How messages are fetched, how the mutex is
     * acquired, and when {@code false} is returned are not visible here.
     *
     * @return not determinable from the stub; see the note above
     * @throws UnsupportedOperationException always, in this decompiled form
     */
    @com.google.common.annotations.VisibleForTesting
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    boolean runNextJob() {
        /*
            Method dump skipped, instructions count: 300
            To view this dump add '--comments-level debug' option
        */
        // Decompiler placeholder: the original implementation is missing.
        throw new UnsupportedOperationException("Method not decompiled: com.bazaarvoice.emodb.job.service.DefaultJobService.runNextJob():boolean");
    }

    /**
     * Builds a Curator mutex whose path is {@code /leader/} followed by the given name.
     *
     * @param str name appended to the {@code /leader/} prefix
     * @return a new, not yet acquired mutex
     */
    private InterProcessMutex getMutex(String str) {
        final String lockPath = String.format("/leader/%s", str);
        return new InterProcessMutex(this._curator, lockPath);
    }

    /**
     * Tries to acquire the given mutex, waiting at most 200 milliseconds.
     *
     * @param interProcessMutex the mutex to acquire
     * @return the result of {@code acquire}; per the Curator documentation {@code true} means
     *         the mutex was acquired within the wait time
     * @throws Exception if the acquire call fails
     */
    private boolean acquireMutex(InterProcessMutex interProcessMutex) throws Exception {
        final boolean acquired = interProcessMutex.acquire(200L, TimeUnit.MILLISECONDS);
        return acquired;
    }

    /**
     * Executes a single job with the handler from the given registry entry.
     *
     * <p>Flow:
     * <ul>
     *   <li>A missing registry entry or a missing status record is logged as a warning and
     *       {@code true} is returned, as is any other unexpected exception raised before the
     *       handler is started.</li>
     *   <li>A job whose status is neither {@code SUBMITTED} nor {@code RUNNING} is treated as
     *       already run: it is logged and {@code true} is returned without running it.</li>
     *   <li>A job found in {@code RUNNING} state is logged as having failed previously and is
     *       then executed like a {@code SUBMITTED} one.</li>
     *   <li>Otherwise the status is set to {@code RUNNING} and the handler is run. If the
     *       handler reports it is not the owner, the status is put back to {@code SUBMITTED}
     *       and {@code false} is returned. If it completes, {@code FINISHED} is recorded with
     *       the result; if it throws, {@code FAILED} is recorded with the exception message.
     *       Both of those cases return {@code true}.</li>
     * </ul>
     *
     * @param str string form of the job identifier
     * @param registryEntry registry entry for the job's type; may be {@code null}
     * @return {@code false} only when the handler reported it is not the owner of the job;
     *         {@code true} in every other case
     */
    private <Q, R> boolean run(String str, RegistryEntry<Q, R> registryEntry) {
        try {
            if (registryEntry == null) {
                throw new IllegalArgumentException("No handler found for job type: " + JobIdentifier.getJobTypeNameFromId(str));
            }

            final JobIdentifier<Q, R> jobId = JobIdentifier.fromString(str, registryEntry.getJobType());
            final JobStatus<Q, R> initialStatus = this._jobStatusDAO.getJobStatus(jobId);
            if (initialStatus == null) {
                throw new IllegalArgumentException("Job not found: " + jobId);
            }

            final Q request = initialStatus.getRequest();
            final JobStatus.Status state = initialStatus.getStatus();
            if (state == JobStatus.Status.RUNNING) {
                // Left in RUNNING by an earlier attempt; log it and fall through to run again.
                this._log.info("Job failed previously for an unknown reason: [id={}, type={}]", str, JobIdentifier.getJobTypeNameFromId(str));
            } else if (state != JobStatus.Status.SUBMITTED) {
                this._log.info("Job has already run: [id={}, type={}]", str, JobIdentifier.getJobTypeNameFromId(str));
                return true;
            }

            try {
                this._jobStatusDAO.updateJobStatus(jobId, new JobStatus<>(JobStatus.Status.RUNNING, request, null, null));
                final JobHandler<Q, R> handler = registryEntry.newHandler();
                final R result = handler.run(request);

                if (!JobHandlerUtil.isNotOwner(handler)) {
                    recordFinalStatus(jobId, new JobStatus<>(JobStatus.Status.FINISHED, request, result, null));
                    return true;
                }

                // The handler declined ownership: restore SUBMITTED and report "not run here".
                this._jobStatusDAO.updateJobStatus(jobId, new JobStatus<>(JobStatus.Status.SUBMITTED, request, null, null));
                return false;
            } catch (Exception jobFailure) {
                this._log.error("Job failed: [id={}, type={}]", jobId, jobId.getJobType(), jobFailure);
                recordFinalStatus(jobId, new JobStatus<>(JobStatus.Status.FAILED, request, null, jobFailure.getMessage()));
                return true;
            }
        } catch (Exception setupFailure) {
            this._log.warn("Unable to execute job: [id={}, type={}]", str, JobIdentifier.getJobTypeNameFromId(str), setupFailure);
            return true;
        }
    }

    /**
     * Stores a job's final status on a best-effort basis: a failure to write is logged as an
     * error and not propagated.
     *
     * @param jobIdentifier the job whose status is being recorded
     * @param jobStatus the status to store
     */
    private <Q, R> void recordFinalStatus(JobIdentifier<Q, R> jobIdentifier, JobStatus<Q, R> jobStatus) {
        try {
            this._jobStatusDAO.updateJobStatus(jobIdentifier, jobStatus);
        } catch (Exception writeFailure) {
            // Deliberately swallowed after logging so the caller's flow is not interrupted.
            this._log.error("Failed to record final status for job: [id={}, status={}]", jobIdentifier, jobStatus.getStatus(), writeFailure);
        }
    }

    /**
     * Acknowledges a single message on the job queue on a best-effort basis: a failure is
     * logged as an error and not propagated.
     *
     * @param str identifier of the message to acknowledge
     */
    private void acknowledgeQueueMessage(String str) {
        try {
            final ImmutableList<String> messageIds = ImmutableList.of(str);
            this._queueService.acknowledge(this._queueName, messageIds);
        } catch (Exception ackFailure) {
            // Deliberately swallowed after logging.
            this._log.error("Failed to acknowledge message: [messageId={}]", str, ackFailure);
        }
    }

    /**
     * Pauses job processing.
     *
     * @return {@code true} if this call switched the service from running to paused,
     *         {@code false} if it was already paused
     */
    @Override // com.bazaarvoice.emodb.job.api.JobService
    public boolean pause() {
        if (!this._paused.compareAndSet(false, true)) {
            return false;
        }
        this._log.info("Job processing has been paused");
        return true;
    }

    /**
     * Resumes job processing after a pause.
     *
     * @return {@code true} if this call switched the service from paused to running,
     *         {@code false} if it was not paused
     */
    @Override // com.bazaarvoice.emodb.job.api.JobService
    public boolean resume() {
        if (!this._paused.compareAndSet(true, false)) {
            return false;
        }
        this._log.info("Job processing has been resumed");
        return true;
    }
}
