package com.bazaarvoice.emodb.web.resources.databus;

import com.bazaarvoice.emodb.auth.jersey.Subject;
import com.bazaarvoice.emodb.databus.api.PollResult;
import com.bazaarvoice.emodb.web.auth.Permissions;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterators;
import com.google.inject.Inject;
import java.io.IOException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.servlet.AsyncContext;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/bazaarvoice/emodb/web/resources/databus/DatabusResourcePoller.class */
public class DatabusResourcePoller {
    // ---- Public constants ----

    /** Not referenced in this class; presumably the default size of the keep-alive thread pool (confirm at the place the executors are built). */
    public static final int DEFAULT_NUM_KEEP_ALIVE_THREADS = 4;
    /** Not referenced in this class; presumably the default size of the polling thread pool (confirm at the place the executors are built). */
    public static final int DEFAULT_NUM_POLLING_THREADS = 8;

    // ---- Private constants ----

    private static final Logger _log = LoggerFactory.getLogger(DatabusResourcePoller.class);

    /** Response header whose value is the negation of {@code PollResult.hasMoreEvents()}. */
    private static final String POLL_DATABUS_EMPTY_HEADER = "X-BV-Databus-Empty";

    /** Added to the current time at the start of {@code poll} to compute when long polling must stop. */
    private static final Duration MAX_LONG_POLL_TIME = Duration.standardSeconds(20);
    /** Delay used when scheduling the next databus poll attempt of a long poll. */
    private static final Duration LONG_POLL_RETRY_TIME = Duration.standardSeconds(2);
    /** Delay used when scheduling the next keep-alive write to the client. */
    private static final Duration LONG_POLL_SEND_REFRESH_TIME = Duration.millis(200);
    /** Added to the long-poll stop time to obtain the instant after which keep-alives are no longer sent. */
    private static final Duration KEEP_ALIVE_SAFETY_BUFFER_TIME = Duration.standardSeconds(10);

    // ---- Instance state ----

    /** Timer started on every call to {@code poll}. */
    private final Timer _pollTimer;
    /** Executor that runs keep-alive tasks; {@code null} when long polling is unavailable. */
    private final ScheduledExecutorService _keepAliveExecutorService;
    /** Executor that runs databus poll retries; {@code null} when long polling is unavailable. */
    private final ScheduledExecutorService _pollingExecutorService;
    /** Records how much later than {@link #LONG_POLL_SEND_REFRESH_TIME} a keep-alive task ran after its previous run. */
    private final Histogram _keepAliveThreadDelayHistogram;
    /** Records how much later than {@link #LONG_POLL_RETRY_TIME} a poll task ran after its previous run. */
    private final Histogram _pollingThreadDelayHistogram;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/bazaarvoice/emodb/web/resources/databus/DatabusResourcePoller$DatabusPollRunnable.class */
    public class DatabusPollRunnable implements Runnable {
        private final AsyncContext _asyncContext;
        private KeepAliveRunnable _keepAliveRunnable;
        private Subject _subject;
        private SubjectDatabus _databus;
        private Duration _claimTtl;
        private int _limit;
        private String _subscription;
        private PeekOrPollResponseHelper _helper;
        private long _longPollStopTime;
        private Timer.Context _timerContext;
        private volatile boolean _pollingActive = true;
        private long _lastRunTime = 0;

        DatabusPollRunnable(AsyncContext asyncContext, KeepAliveRunnable keepAliveRunnable, Subject subject, SubjectDatabus subjectDatabus, Duration duration, int i, String str, PeekOrPollResponseHelper peekOrPollResponseHelper, long j, Timer.Context context) {
            this._asyncContext = asyncContext;
            this._keepAliveRunnable = keepAliveRunnable;
            this._subject = subject;
            this._databus = subjectDatabus;
            this._claimTtl = duration;
            this._limit = i;
            this._subscription = str;
            this._helper = peekOrPollResponseHelper;
            this._longPollStopTime = j;
            this._timerContext = context;
        }

        @Override // java.lang.Runnable
        public void run() {
            PollResult pollResult;
            boolean z = false;
            try {
                if (this._lastRunTime > 0) {
                    DatabusResourcePoller.this._pollingThreadDelayHistogram.update((System.currentTimeMillis() - this._lastRunTime) - DatabusResourcePoller.LONG_POLL_RETRY_TIME.getMillis());
                }
                if (this._pollingActive) {
                    boolean z2 = false;
                    try {
                        pollResult = this._databus.poll(this._subject, this._subscription, this._claimTtl, this._limit);
                    } catch (Exception e) {
                        DatabusResourcePoller._log.error("Failed to perform asynchronous poll on subscription {}", this._subscription, e);
                        pollResult = new PollResult(Iterators.emptyIterator(), 0, false);
                        z2 = true;
                    }
                    if (pollResult.getEventIterator().hasNext() || System.currentTimeMillis() + DatabusResourcePoller.LONG_POLL_RETRY_TIME.getMillis() >= this._longPollStopTime || z2) {
                        synchronized (this._asyncContext) {
                            if (this._pollingActive) {
                                this._keepAliveRunnable.cancelKeepAlive();
                                DatabusResourcePoller.populateResponse(pollResult, (HttpServletResponse) this._asyncContext.getResponse(), this._helper);
                            }
                        }
                    } else {
                        this._lastRunTime = System.currentTimeMillis();
                        if (pollResult.hasMoreEvents()) {
                            DatabusResourcePoller.this._pollingExecutorService.execute(this);
                        } else {
                            DatabusResourcePoller.this._pollingExecutorService.schedule(this, DatabusResourcePoller.LONG_POLL_RETRY_TIME.getMillis(), TimeUnit.MILLISECONDS);
                        }
                        z = true;
                    }
                }
                if (z) {
                    return;
                }
                this._timerContext.stop();
                synchronized (this._asyncContext) {
                    this._asyncContext.complete();
                }
            } catch (Throwable th) {
                if (0 == 0) {
                    this._timerContext.stop();
                    synchronized (this._asyncContext) {
                        this._asyncContext.complete();
                    }
                }
                throw th;
            }
        }

        public void cancelPolling() {
            this._pollingActive = false;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/bazaarvoice/emodb/web/resources/databus/DatabusResourcePoller$KeepAliveRunnable.class */
    public class KeepAliveRunnable implements Runnable {
        private final AsyncContext _asyncContext;
        private DateTime _keepAliveStopTime;
        private DatabusPollRunnable _databusPollRunnable;
        private volatile boolean _keepAliveRequired = true;
        private long _lastRunTime = 0;

        public KeepAliveRunnable(AsyncContext asyncContext, long j) {
            this._asyncContext = asyncContext;
            this._keepAliveStopTime = new DateTime(j).plus(DatabusResourcePoller.KEEP_ALIVE_SAFETY_BUFFER_TIME.getMillis());
        }

        public void setDatabusPollRunnable(DatabusPollRunnable databusPollRunnable) {
            this._databusPollRunnable = databusPollRunnable;
        }

        public void cancelKeepAlive() {
            this._keepAliveRequired = false;
        }

        @Override // java.lang.Runnable
        public void run() {
            if (this._lastRunTime > 0) {
                DatabusResourcePoller.this._keepAliveThreadDelayHistogram.update((System.currentTimeMillis() - this._lastRunTime) - DatabusResourcePoller.LONG_POLL_SEND_REFRESH_TIME.getMillis());
            }
            synchronized (this._asyncContext) {
                if (this._keepAliveRequired && this._keepAliveStopTime.isAfterNow()) {
                    try {
                        DatabusResourcePoller.sendClientRefresh((HttpServletResponse) this._asyncContext.getResponse());
                        this._lastRunTime = System.currentTimeMillis();
                        DatabusResourcePoller.this._keepAliveExecutorService.schedule(this, DatabusResourcePoller.LONG_POLL_SEND_REFRESH_TIME.getMillis(), TimeUnit.MILLISECONDS);
                    } catch (Exception e) {
                        DatabusResourcePoller._log.error("Failed to send a keep-alive message to the client");
                        if (this._databusPollRunnable != null) {
                            this._databusPollRunnable.cancelPolling();
                        }
                    }
                }
            }
        }
    }

    @Inject
    public DatabusResourcePoller(Optional<LongPollingExecutorServices> optional, MetricRegistry metricRegistry) {
        Preconditions.checkNotNull(optional, "longPollingExecutorServices");
        if (optional.isPresent()) {
            this._keepAliveExecutorService = optional.get().getKeepAlive();
            this._pollingExecutorService = optional.get().getPoller();
            addThreadPoolMonitoring(this._keepAliveExecutorService, "inactiveKeepAliveThreads", metricRegistry);
            addThreadPoolMonitoring(this._pollingExecutorService, "inactivePollingThreads", metricRegistry);
        } else {
            this._keepAliveExecutorService = null;
            this._pollingExecutorService = null;
        }
        this._keepAliveThreadDelayHistogram = metricRegistry.histogram(MetricRegistry.name("bv.emodb.databus", "DatabusResource1", "keepAliveThreadDelay"));
        this._pollingThreadDelayHistogram = metricRegistry.histogram(MetricRegistry.name("bv.emodb.databus", "DatabusResource1", "pollingThreadDelay"));
        this._pollTimer = buildPollTimer(metricRegistry);
    }

    /**
     * Test-only constructor: builds just the poll timer. Both executors and both delay histograms
     * are {@code null}.
     *
     * @param metricRegistry registry in which the poll timer is created
     */
    @VisibleForTesting
    public DatabusResourcePoller(MetricRegistry metricRegistry) {
        _keepAliveExecutorService = null;
        _pollingExecutorService = null;
        _keepAliveThreadDelayHistogram = null;
        _pollingThreadDelayHistogram = null;
        _pollTimer = buildPollTimer(metricRegistry);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Serializes the events of a poll result as JSON into the servlet response's output stream.
     * An {@link IOException} raised while writing is logged together with its cause and not rethrown.
     *
     * @param pollResult               result whose event iterator is written
     * @param httpServletResponse      response to write to
     * @param peekOrPollResponseHelper helper that supplies the JSON writer
     */
    public static void populateResponse(PollResult pollResult, HttpServletResponse httpServletResponse, PeekOrPollResponseHelper peekOrPollResponseHelper) {
        try {
            peekOrPollResponseHelper.getJson().writeJson(httpServletResponse.getOutputStream(), pollResult.getEventIterator());
        } catch (IOException e) {
            // Deliberately best-effort, but keep the cause in the log so the failure can be diagnosed.
            _log.error("Failed to write response to the client", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Writes a single space character to the response and flushes the response buffer.
     * The keep-alive task calls this periodically, and it is also called once when a request
     * is switched to long polling.
     *
     * @param httpServletResponse response to write the keep-alive character to
     * @throws IOException if writing to or flushing the response fails
     */
    public static void sendClientRefresh(HttpServletResponse httpServletResponse) throws IOException {
        httpServletResponse.getOutputStream().print(' ');
        httpServletResponse.flushBuffer();
    }

    /**
     * Polls a subscription, answering immediately when possible and otherwise switching the
     * request to asynchronous long polling.
     *
     * <p>An immediate response is returned when {@code z} is {@code true}, when the first poll
     * returned events, or when either executor is {@code null}. In every other case the long-poll
     * tasks are scheduled and they become responsible for writing the response.
     *
     * <p>The poll timer is stopped here only for immediate responses (including failures). In
     * long-poll mode the timer context is handed to the poll task, which stops it when the
     * request completes; stopping it here as well would record a second, too-short sample.
     *
     * @param subject                  subject passed through to {@code SubjectDatabus.poll}
     * @param subjectDatabus           databus to poll
     * @param str                      subscription name
     * @param duration                 claim TTL passed through to {@code SubjectDatabus.poll}
     * @param i                        limit passed through to {@code SubjectDatabus.poll}
     * @param httpServletRequest       request to switch to asynchronous mode when long polling
     * @param z                        when {@code true}, long polling is skipped and the first poll result is returned as-is
     * @param peekOrPollResponseHelper helper used to turn the events into the response entity
     * @return the response to hand back to the container
     */
    public Response poll(Subject subject, SubjectDatabus subjectDatabus, String str, Duration duration, int i, HttpServletRequest httpServletRequest, boolean z, PeekOrPollResponseHelper peekOrPollResponseHelper) {
        Timer.Context time = this._pollTimer.time();
        boolean synchronousResponse = true;
        try {
            // Compute the deadline before the first poll so that poll counts against the total time.
            long longPollStopTime = System.currentTimeMillis() + MAX_LONG_POLL_TIME.getMillis();
            PollResult result = subjectDatabus.poll(subject, str, duration, i);

            boolean respondNow = z
                    || result.getEventIterator().hasNext()
                    || this._keepAliveExecutorService == null
                    || this._pollingExecutorService == null;
            if (respondNow) {
                return Response.ok()
                        .header(POLL_DATABUS_EMPTY_HEADER, String.valueOf(!result.hasMoreEvents()))
                        .entity(peekOrPollResponseHelper.asEntity(result.getEventIterator()))
                        .build();
            }

            Response asyncResponse = scheduleLongPollingRunnables(httpServletRequest, longPollStopTime, subject, subjectDatabus, duration, i, str, result.hasMoreEvents(), peekOrPollResponseHelper, time);
            // Scheduling succeeded: the poll task now owns the timer context.
            synchronousResponse = false;
            return asyncResponse;
        } finally {
            if (synchronousResponse) {
                time.stop();
            }
        }
    }

    /**
     * Switches the request to asynchronous mode, sends the response headers together with a first
     * keep-alive character, and schedules the poll and keep-alive tasks. If anything fails before
     * both tasks are scheduled, the async context is completed before the failure propagates.
     *
     * @param httpServletRequest       request to put into asynchronous mode
     * @param j                        epoch milliseconds at which long polling must stop
     * @param subject                  subject passed through to the poll task
     * @param subjectDatabus           databus passed through to the poll task
     * @param duration                 claim TTL passed through to the poll task
     * @param i                        limit passed through to the poll task
     * @param str                      subscription name
     * @param z                        whether the initial poll reported more events; its negation is sent in the empty header
     * @param peekOrPollResponseHelper helper passed through to the poll task
     * @param context                  timer context passed through to the poll task
     * @return an empty OK response
     */
    private Response scheduleLongPollingRunnables(HttpServletRequest httpServletRequest, long j, Subject subject, SubjectDatabus subjectDatabus, Duration duration, int i, String str, boolean z, PeekOrPollResponseHelper peekOrPollResponseHelper, Timer.Context context) {
        AsyncContext asyncContext = httpServletRequest.startAsync();
        boolean scheduled = false;
        try {
            // In async mode the headers have to be set through the async context's response.
            asyncContext.getResponse().setContentType(MediaType.APPLICATION_JSON);
            HttpServletResponse asyncResponse = (HttpServletResponse) asyncContext.getResponse();
            asyncResponse.setHeader(POLL_DATABUS_EMPTY_HEADER, String.valueOf(!z));
            sendClientRefresh(asyncResponse);

            KeepAliveRunnable keepAliveTask = new KeepAliveRunnable(asyncContext, j);
            DatabusPollRunnable pollTask = new DatabusPollRunnable(asyncContext, keepAliveTask, subject, subjectDatabus, duration, i, str, peekOrPollResponseHelper, j, context);
            keepAliveTask.setDatabusPollRunnable(pollTask);

            this._pollingExecutorService.schedule(pollTask, LONG_POLL_RETRY_TIME.getMillis(), TimeUnit.MILLISECONDS);
            this._keepAliveExecutorService.schedule(keepAliveTask, 0L, TimeUnit.MILLISECONDS);
            scheduled = true;
            return Response.ok().build();
        } catch (IOException e) {
            throw Throwables.propagate(e);
        } finally {
            // Once both tasks are scheduled, the poll task is responsible for completing the context.
            if (!scheduled) {
                asyncContext.complete();
            }
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Registers a gauge reporting the executor's pool size minus its active thread count.
     * Does nothing when the executor is not a {@link ThreadPoolExecutor}.
     *
     * @param scheduledExecutorService executor to monitor
     * @param str                      last component of the gauge's metric name
     * @param metricRegistry           registry to register the gauge in
     */
    private static void addThreadPoolMonitoring(ScheduledExecutorService scheduledExecutorService, String str, MetricRegistry metricRegistry) {
        if (!(scheduledExecutorService instanceof ThreadPoolExecutor)) {
            return;
        }
        final ThreadPoolExecutor pool = (ThreadPoolExecutor) scheduledExecutorService;
        String gaugeName = MetricRegistry.name("bv.emodb.databus", "DatabusResource1", str);
        Gauge<Integer> inactiveThreads = new Gauge<Integer>() {
            @Override
            public Integer getValue() {
                return pool.getPoolSize() - pool.getActiveCount();
            }
        };
        metricRegistry.register(gaugeName, inactiveThreads);
    }

    /**
     * Obtains the poll timer from the registry. The metric name is built from
     * {@code "bv.emodb.databus"}, {@code "DatabusResource1"} and {@code Permissions.POLL}.
     *
     * @param metricRegistry registry that owns the timer
     * @return the timer registered under that name
     */
    private static Timer buildPollTimer(MetricRegistry metricRegistry) {
        String timerName = MetricRegistry.name("bv.emodb.databus", "DatabusResource1", Permissions.POLL);
        return metricRegistry.timer(timerName);
    }
}
