package org.apache.flume.source;

import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flume.CounterGroup;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.Source;
import org.apache.flume.SourceRunner;
import org.apache.flume.lifecycle.LifecycleState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/flume-ng-core-1.9.0.jar:org/apache/flume/source/PollableSourceRunner.class */
public class PollableSourceRunner extends SourceRunner {
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) PollableSourceRunner.class);
    private PollingRunner runner;
    private Thread runnerThread;
    private AtomicBoolean shouldStop = new AtomicBoolean();
    private CounterGroup counterGroup = new CounterGroup();
    private LifecycleState lifecycleState = LifecycleState.IDLE;

    /* loaded from: input_file:META-INF/bundled-dependencies/flume-ng-core-1.9.0.jar:org/apache/flume/source/PollableSourceRunner$PollingRunner.class */
    public static class PollingRunner implements Runnable {
        private PollableSource source;
        private AtomicBoolean shouldStop;
        private CounterGroup counterGroup;

        @Override // java.lang.Runnable
        public void run() {
            PollableSourceRunner.logger.debug("Polling runner starting. Source:{}", this.source);
            while (!this.shouldStop.get()) {
                this.counterGroup.incrementAndGet("runner.polls");
                try {
                    if (this.source.process().equals(PollableSource.Status.BACKOFF)) {
                        this.counterGroup.incrementAndGet("runner.backoffs");
                        Thread.sleep(Math.min(this.counterGroup.incrementAndGet("runner.backoffs.consecutive").longValue() * this.source.getBackOffSleepIncrement(), this.source.getMaxBackOffSleepInterval()));
                    } else {
                        this.counterGroup.set("runner.backoffs.consecutive", 0L);
                    }
                } catch (InterruptedException e) {
                    PollableSourceRunner.logger.info("Source runner interrupted. Exiting");
                    this.counterGroup.incrementAndGet("runner.interruptions");
                } catch (EventDeliveryException e2) {
                    PollableSourceRunner.logger.error("Unable to deliver event. Exception follows.", (Throwable) e2);
                    this.counterGroup.incrementAndGet("runner.deliveryErrors");
                } catch (Exception e3) {
                    this.counterGroup.incrementAndGet("runner.errors");
                    PollableSourceRunner.logger.error("Unhandled exception, logging and sleeping for " + this.source.getMaxBackOffSleepInterval() + "ms", (Throwable) e3);
                    try {
                        Thread.sleep(this.source.getMaxBackOffSleepInterval());
                    } catch (InterruptedException e4) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
            PollableSourceRunner.logger.debug("Polling runner exiting. Metrics:{}", this.counterGroup);
        }
    }

    @Override // org.apache.flume.lifecycle.LifecycleAware
    public void start() {
        PollableSource pollableSource = (PollableSource) getSource();
        pollableSource.getChannelProcessor().initialize();
        pollableSource.start();
        this.runner = new PollingRunner();
        this.runner.source = pollableSource;
        this.runner.counterGroup = this.counterGroup;
        this.runner.shouldStop = this.shouldStop;
        this.runnerThread = new Thread(this.runner);
        this.runnerThread.setName(getClass().getSimpleName() + "-" + pollableSource.getClass().getSimpleName() + "-" + pollableSource.getName());
        this.runnerThread.start();
        this.lifecycleState = LifecycleState.START;
    }

    @Override // org.apache.flume.lifecycle.LifecycleAware
    public void stop() {
        this.runner.shouldStop.set(true);
        try {
            this.runnerThread.interrupt();
            this.runnerThread.join();
        } catch (InterruptedException e) {
            logger.warn("Interrupted while waiting for polling runner to stop. Please report this.", (Throwable) e);
            Thread.currentThread().interrupt();
        }
        Source source = getSource();
        source.stop();
        source.getChannelProcessor().close();
        this.lifecycleState = LifecycleState.STOP;
    }

    public String toString() {
        return "PollableSourceRunner: { source:" + getSource() + " counterGroup:" + this.counterGroup + " }";
    }

    @Override // org.apache.flume.lifecycle.LifecycleAware
    public LifecycleState getLifecycleState() {
        return this.lifecycleState;
    }
}
