package com.linkedin.alpini.base.concurrency;

import com.linkedin.alpini.base.misc.DoublyLinkedList;
import com.linkedin.alpini.base.misc.Time;
import com.linkedin.alpini.base.registry.ResourceRegistry;
import com.linkedin.alpini.base.registry.ShutdownableExecutors;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/alpini/base/concurrency/TimeoutProcessor.class */
public class TimeoutProcessor implements Executor {
    /** Default period, in milliseconds, between two background ticks. */
    private static final long TICKING_INTERVAL_MILLISECONDS = 1000L;

    private static final Logger LOG = LogManager.getLogger(TimeoutProcessor.class);

    /** Sentinel task: a future whose task reference equals this instance has been run or cancelled. */
    private static final Runnable NOP = () -> { };

    /** Terminal delegate installed on a handle once its task was dispatched or cancelled. */
    private static final TimeoutInterface DEFUNCT = new TimeoutInterface() {
        @Override
        public boolean isDone() {
            return true;
        }

        @Override
        public boolean cancel() {
            return false;
        }
    };

    /** Registry created by this instance when the caller supplied none; {@code null} otherwise. */
    private final ResourceRegistry _registry;

    /** Runs the periodic tick, the scheduled drains and the timed-out tasks themselves. */
    private final ScheduledExecutorService _executor;

    /** Events for which a drain has already been scheduled on {@link #_executor}. */
    private final DoublyLinkedList<TimeoutEvent> _nextEvent;

    /** Pending events keyed by their deadline. */
    private final NavigableMap<Long, TimeoutEvent> _eventsMap;

    /** Chooses the map implementation and the locking applied around it. */
    private final EventStore _eventStore;

    /** Handed to the event store as the lock for {@link #_eventsMap}. */
    private final ReentrantLock _mapLock;

    /** Handed to the event store as the lock for {@link #_nextEvent}. */
    private final ReentrantLock _nextEventLock;

    /** Removes expired events, then schedules a drain for the new earliest event. */
    private final Runnable _drainingEvents;

    /** Periodic wrapper around {@link #_drainingEvents} that logs failures instead of propagating them. */
    private final Runnable _tick;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/linkedin/alpini/base/concurrency/TimeoutProcessor$EventStore.class */
    /**
     * Selects the {@link NavigableMap} implementation that holds the pending {@link TimeoutEvent}s
     * (keyed by deadline) together with the locking applied around that map.
     *
     * <p>The constants hold no state of their own; every operation receives the map, the list of
     * already scheduled events, the executor and the locks as arguments.
     */
    public enum EventStore {
        /** Backed by {@link java.util.TreeMap}; every access to the map happens while holding the map lock. */
        TreeMap {
            /**
             * Removes and returns the earliest event if its deadline is not after {@code now}.
             *
             * @return the removed event, or {@code null} when the map is empty or nothing is due yet
             */
            private TimeoutEvent nextEvent(
                    long now,
                    NavigableMap<Long, TimeoutEvent> events,
                    ReentrantLock mapLock,
                    ReentrantLock nextEventLock) {
                mapLock.lock();
                try {
                    Iterator<Map.Entry<Long, TimeoutEvent>> it = events.entrySet().iterator();
                    if (!it.hasNext()) {
                        return null;
                    }
                    TimeoutEvent earliest = it.next().getValue();
                    if (earliest._time.longValue() > now) {
                        return null;
                    }
                    it.remove();
                    nextEventLock.lock();
                    try {
                        earliest.unlink();
                    } finally {
                        nextEventLock.unlock();
                    }
                    return earliest;
                } finally {
                    // Single release point: unlocking on the "nothing due" path as well as here
                    // would throw IllegalMonitorStateException.
                    mapLock.unlock();
                }
            }

            @Override
            List<TimeoutEvent> removeTimeoutEvents(
                    long now,
                    NavigableMap<Long, TimeoutEvent> events,
                    ReentrantLock mapLock,
                    ReentrantLock nextEventLock) {
                List<TimeoutEvent> expired = new LinkedList<>();
                // Each event is detached under the locks by nextEvent(); its tasks are dispatched
                // afterwards, with neither lock held.
                for (TimeoutEvent event = nextEvent(now, events, mapLock, nextEventLock);
                        event != null;
                        event = nextEvent(now, events, mapLock, nextEventLock)) {
                    expired.add(event);
                    event.executeAll();
                }
                return expired;
            }

            @Override
            TimeoutFuture schedule(
                    Runnable task,
                    TimeoutEvent event,
                    Long time,
                    DoublyLinkedList<TimeoutEvent> nextEvents,
                    ScheduledExecutorService executor,
                    Runnable drain,
                    NavigableMap<Long, TimeoutEvent> events,
                    ReentrantLock mapLock,
                    ReentrantLock nextEventLock) {
                mapLock.lock();
                try {
                    return schedule(task, event, time, nextEvents, executor, drain, events, nextEventLock);
                } finally {
                    mapLock.unlock();
                }
            }

            @Override
            void scheduleFirstNodeInEvents(
                    DoublyLinkedList<TimeoutEvent> nextEvents,
                    ScheduledExecutorService executor,
                    Runnable drain,
                    NavigableMap<Long, TimeoutEvent> events,
                    ReentrantLock mapLock,
                    ReentrantLock nextEventLock) {
                mapLock.lock();
                try {
                    scheduleFirstNodeInEvents(nextEvents, executor, drain, events, nextEventLock);
                } finally {
                    mapLock.unlock();
                }
            }

            @Override
            NavigableMap<Long, TimeoutEvent> buildCorrespondentMap() {
                return new TreeMap<>();
            }
        },

        /** Backed by {@link ConcurrentSkipListMap}; the map lock argument is never acquired. */
        SkipList {
            @Override
            NavigableMap<Long, TimeoutEvent> buildCorrespondentMap() {
                return new ConcurrentSkipListMap<>();
            }

            @Override
            void scheduleFirstNodeInEvents(
                    DoublyLinkedList<TimeoutEvent> nextEvents,
                    ScheduledExecutorService executor,
                    Runnable drain,
                    NavigableMap<Long, TimeoutEvent> events,
                    ReentrantLock mapLock,
                    ReentrantLock nextEventLock) {
                scheduleFirstNodeInEvents(nextEvents, executor, drain, events, nextEventLock);
            }

            @Override
            TimeoutFuture schedule(
                    Runnable task,
                    TimeoutEvent event,
                    Long time,
                    DoublyLinkedList<TimeoutEvent> nextEvents,
                    ScheduledExecutorService executor,
                    Runnable drain,
                    NavigableMap<Long, TimeoutEvent> events,
                    ReentrantLock mapLock,
                    ReentrantLock nextEventLock) {
                return schedule(task, event, time, nextEvents, executor, drain, events, nextEventLock);
            }

            @Override
            List<TimeoutEvent> removeTimeoutEvents(
                    long now,
                    NavigableMap<Long, TimeoutEvent> events,
                    ReentrantLock mapLock,
                    ReentrantLock nextEventLock) {
                return removeTimeoutEvents(now, events, nextEventLock);
            }
        };

        /**
         * Removes every event whose deadline is at or before {@code now} and dispatches its tasks.
         * Takes no map lock, so it is only used directly by the {@link #SkipList} store.
         *
         * @param now           current time on the same clock as the map keys
         * @param events        deadline-keyed map of pending events
         * @param nextEventLock held while an event is unlinked and its tasks are dispatched
         * @return the events that were removed by this call
         */
        public List<TimeoutEvent> removeTimeoutEvents(
                long now, NavigableMap<Long, TimeoutEvent> events, ReentrantLock nextEventLock) {
            return events.headMap(now, true)
                    .entrySet()
                    .stream()
                    // remove() returns null when another thread already claimed the key.
                    .map(entry -> events.remove(entry.getKey()))
                    .filter(Objects::nonNull)
                    .map(event -> {
                        nextEventLock.lock();
                        try {
                            event.unlink();
                            event.executeAll();
                            return event;
                        } finally {
                            nextEventLock.unlock();
                        }
                    })
                    .collect(Collectors.toList());
        }

        /**
         * Registers {@code task} under the deadline {@code time} and makes sure a drain is scheduled
         * for the earliest pending event. Takes no map lock itself.
         *
         * @param task          work to run once the deadline has passed; wrapped with {@code RunOnce.make}
         * @param event         event to insert if {@code events} has no entry for {@code time} yet
         * @param time          deadline used as the map key
         * @param nextEvents    events for which a drain is already scheduled
         * @param executor      executor on which {@code drain} is scheduled
         * @param drain         action that removes expired events
         * @param events        deadline-keyed map of pending events
         * @param nextEventLock guards {@code nextEvents}
         * @return the handle through which the caller can query or cancel the timeout
         */
        public TimeoutFuture schedule(
                Runnable task,
                TimeoutEvent event,
                Long time,
                DoublyLinkedList<TimeoutEvent> nextEvents,
                ScheduledExecutorService executor,
                Runnable drain,
                NavigableMap<Long, TimeoutEvent> events,
                ReentrantLock nextEventLock) {
            TimeoutFuture handle = new TimeoutFuture();
            // Timeouts sharing a deadline share one event; the supplied event is only used when absent.
            events.computeIfAbsent(time, key -> event).add(handle, RunOnce.make(task));
            scheduleFirstNodeInEvents(nextEvents, executor, drain, events, nextEventLock);
            return handle;
        }

        /**
         * If the earliest event in {@code events} is due before the head of {@code nextEvents} (or that
         * list is empty), pushes it onto {@code nextEvents} and schedules {@code drain} for its deadline.
         * Takes no map lock itself.
         *
         * @param nextEvents    events for which a drain is already scheduled
         * @param executor      executor on which {@code drain} is scheduled
         * @param drain         action that removes expired events
         * @param events        deadline-keyed map of pending events
         * @param nextEventLock guards {@code nextEvents}
         */
        public void scheduleFirstNodeInEvents(
                DoublyLinkedList<TimeoutEvent> nextEvents,
                ScheduledExecutorService executor,
                Runnable drain,
                NavigableMap<Long, TimeoutEvent> events,
                ReentrantLock nextEventLock) {
            Optional.ofNullable(events.isEmpty() ? null : events.firstEntry())
                    .map(Map.Entry::getValue)
                    .ifPresent(earliest -> {
                        nextEventLock.lock();
                        try {
                            if (nextEvents.isEmpty()
                                    || nextEvents.peek()._time.longValue() > earliest._time.longValue()) {
                                nextEvents.push(earliest);
                                executor.schedule(
                                        drain, earliest._time.longValue() - Time.nanoTime(), TimeUnit.NANOSECONDS);
                            }
                        } finally {
                            nextEventLock.unlock();
                        }
                    });
        }

        /** Creates the empty map implementation that matches this store. */
        abstract NavigableMap<Long, TimeoutEvent> buildCorrespondentMap();

        /** Store-specific entry point for {@link #scheduleFirstNodeInEvents(DoublyLinkedList, ScheduledExecutorService, Runnable, NavigableMap, ReentrantLock)}. */
        abstract void scheduleFirstNodeInEvents(
                DoublyLinkedList<TimeoutEvent> nextEvents,
                ScheduledExecutorService executor,
                Runnable drain,
                NavigableMap<Long, TimeoutEvent> events,
                ReentrantLock mapLock,
                ReentrantLock nextEventLock);

        /** Store-specific entry point for {@link #schedule(Runnable, TimeoutEvent, Long, DoublyLinkedList, ScheduledExecutorService, Runnable, NavigableMap, ReentrantLock)}. */
        abstract TimeoutFuture schedule(
                Runnable task,
                TimeoutEvent event,
                Long time,
                DoublyLinkedList<TimeoutEvent> nextEvents,
                ScheduledExecutorService executor,
                Runnable drain,
                NavigableMap<Long, TimeoutEvent> events,
                ReentrantLock mapLock,
                ReentrantLock nextEventLock);

        /** Removes all events due at or before {@code now}, dispatches their tasks and returns them. */
        abstract List<TimeoutEvent> removeTimeoutEvents(
                long now,
                NavigableMap<Long, TimeoutEvent> events,
                ReentrantLock mapLock,
                ReentrantLock nextEventLock);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linkedin/alpini/base/concurrency/TimeoutProcessor$TimeoutEvent.class */
    /**
     * All timeouts that share one deadline. The event is stored in the deadline map under
     * {@link #_time} and may additionally be linked into the list of events for which a drain has
     * been scheduled.
     */
    public class TimeoutEvent extends DoublyLinkedList.Entry<TimeoutEvent> {
        /** Deadline of this event; also its key in the deadline map. */
        private final Long _time;

        /** Set while a task that unlinks this (emptied) event is queued on the executor. */
        private boolean _purgePending;

        /** Guards {@link #_futures}. */
        private final ReentrantLock _lock;

        /** Timeouts registered for this deadline that were neither run nor cancelled yet. */
        private final DoublyLinkedList<Future> _futures;

        /** Internal state of one registered timeout; the caller only sees the {@link TimeoutFuture} handle. */
        public class Future extends DoublyLinkedList.Entry<Future> implements TimeoutInterface {
            /** The pending task, or {@link TimeoutProcessor#NOP} once it was run or cancelled. */
            private final AtomicReference<Runnable> _task;

            /** Handle given to the caller; cleared once this future is finished. */
            private TimeoutFuture _public;

            /**
             * Binds {@code runnable} to {@code timeoutFuture} and installs this object as the handle's delegate.
             *
             * @param timeoutFuture handle that must not be bound to a delegate yet
             * @param runnable      task to run when the deadline passes
             */
            public Future(TimeoutFuture timeoutFuture, Runnable runnable) {
                assert timeoutFuture._future == null;
                _task = new AtomicReference<>(runnable);
                _public = timeoutFuture;
                timeoutFuture._future = this;
            }

            /** Hands the task to the executor unless it was already run or cancelled. */
            public void execute() {
                TimeoutFuture handle = _public;
                if (handle == null) {
                    return;
                }
                // The atomic swap guarantees that only one of execute()/cancel() obtains the real task.
                Runnable task = _task.getAndSet(TimeoutProcessor.NOP);
                if (task == TimeoutProcessor.NOP) {
                    return;
                }
                TimeoutProcessor.this._executor.execute(task);
                handle._future = TimeoutProcessor.DEFUNCT;
                _public = null;
            }

            @Override
            public boolean isDone() {
                return _task.get() == TimeoutProcessor.NOP;
            }

            /**
             * Cancels the timeout if its task has not been claimed yet.
             *
             * @return {@code true} if this call prevented the task from running, {@code false} if it had
             *         already been run or cancelled
             */
            @Override
            public boolean cancel() {
                TimeoutFuture handle = _public;
                TimeoutEvent.this._lock.lock();
                try {
                    if (_task.getAndSet(TimeoutProcessor.NOP) == TimeoutProcessor.NOP) {
                        return false;
                    }
                    unlink();
                    return true;
                } finally {
                    // Whatever the outcome, the handle is switched to the terminal delegate.
                    if (handle != null) {
                        handle._future = TimeoutProcessor.DEFUNCT;
                        _public = null;
                    }
                    // The only unlock in this method; an additional unlock on the early-return path
                    // would throw IllegalMonitorStateException.
                    TimeoutEvent.this._lock.unlock();
                }
            }
        }

        /** Dispatches every remaining timeout of this event to the executor. */
        public void executeAll() {
            for (Future future = pop(); future != null; future = pop()) {
                future.execute();
            }
        }

        private TimeoutEvent(Long l) {
            _lock = new ReentrantLock();
            _futures = new DoublyLinkedList<Future>() {
                /**
                 * Unlinks {@code entry}; when that empties the list, queues a task that detaches the
                 * enclosing event from the list it is linked into.
                 */
                @Override
                public boolean unlink(DoublyLinkedList.Entry<Future> entry) {
                    boolean unlinked = super.unlink(entry);
                    assert !unlinked || TimeoutEvent.this._lock.isHeldByCurrentThread();
                    if (unlinked && isEmpty() && !TimeoutEvent.this._purgePending) {
                        TimeoutEvent.this._purgePending = true;
                        TimeoutProcessor.this._executor.execute(() -> {
                            try {
                                // Re-check: a timeout may have been added since the purge was queued.
                                if (isEmpty()) {
                                    TimeoutProcessor.this._nextEventLock.lock();
                                    try {
                                        TimeoutEvent.this.unlink();
                                    } finally {
                                        TimeoutProcessor.this._nextEventLock.unlock();
                                    }
                                }
                            } finally {
                                TimeoutEvent.this._purgePending = false;
                            }
                        });
                    }
                    return unlinked;
                }
            };
            _time = l;
        }

        /**
         * Registers a new timeout on this event.
         *
         * @param timeoutFuture handle to bind to the new timeout
         * @param runnable      task to run when the deadline passes
         */
        public void add(TimeoutFuture timeoutFuture, Runnable runnable) {
            _lock.lock();
            try {
                _futures.add(new Future(timeoutFuture, runnable));
            } finally {
                _lock.unlock();
            }
        }

        /**
         * Removes and returns the next timeout that is not done yet, skipping finished ones.
         *
         * @return the next pending timeout, or {@code null} when none is left
         */
        public Future pop() {
            // The lock is held across the whole loop and released exactly once.
            _lock.lock();
            try {
                Future candidate;
                do {
                    if (_futures.isEmpty()) {
                        return null;
                    }
                    candidate = _futures.pop();
                } while (candidate.isDone());
                return candidate;
            } finally {
                _lock.unlock();
            }
        }
    }

    /* loaded from: input_file:com/linkedin/alpini/base/concurrency/TimeoutProcessor$TimeoutFuture.class */
    /**
     * Handle returned by {@code schedule}. Both operations are forwarded to the delegate currently
     * stored in {@link #_future}.
     */
    public static class TimeoutFuture extends DoublyLinkedList.Entry<TimeoutFuture> implements TimeoutInterface {
        /** Delegate that answers {@link #isDone()} and {@link #cancel()}; assigned by the processor. */
        private TimeoutInterface _future;

        /** Instances are only created by the enclosing processor. */
        private TimeoutFuture() {
        }

        @Override
        public boolean isDone() {
            final TimeoutInterface delegate = _future;
            return delegate.isDone();
        }

        @Override
        public boolean cancel() {
            final TimeoutInterface delegate = _future;
            return delegate.cancel();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/linkedin/alpini/base/concurrency/TimeoutProcessor$TimeoutInterface.class */
    /** Operations available on a scheduled timeout. */
    public interface TimeoutInterface {
        /**
         * @return {@code true} once the timeout no longer has a pending task
         */
        boolean isDone();

        /**
         * Attempts to cancel the timeout.
         *
         * @return {@code true} if this call cancelled a still-pending task, {@code false} otherwise
         */
        boolean cancel();
    }

    /** Creates a processor without a caller-supplied registry. */
    public TimeoutProcessor() {
        this((ResourceRegistry) null);
    }

    /**
     * Creates a processor that ticks at the default interval.
     *
     * @param resourceRegistry registry used to create the executor; may be {@code null}
     */
    public TimeoutProcessor(ResourceRegistry resourceRegistry) {
        this(resourceRegistry, TICKING_INTERVAL_MILLISECONDS);
    }

    TimeoutProcessor(ResourceRegistry resourceRegistry, long j, EventStore eventStore, int i) {
        ResourceRegistry resourceRegistry2;
        this._nextEvent = new DoublyLinkedList<>();
        this._mapLock = new ReentrantLock();
        this._nextEventLock = new ReentrantLock();
        this._drainingEvents = () -> {
            removeTimeoutNodes();
            scheduleFirstNodeInEvents();
        };
        this._tick = () -> {
            try {
                this._drainingEvents.run();
            } catch (RejectedExecutionException e) {
                LOG.debug("RejectedExecutionException: {}", e.getMessage());
            } catch (Throwable th) {
                LOG.warn("Caught a throwable. But next _tick would continue.", th);
            }
        };
        if (resourceRegistry == null) {
            resourceRegistry2 = new ResourceRegistry();
            resourceRegistry = resourceRegistry2;
        } else {
            resourceRegistry2 = null;
        }
        this._registry = resourceRegistry2;
        this._executor = ((ShutdownableExecutors) resourceRegistry.factory(ShutdownableExecutors.class)).newScheduledThreadPool(i, new NamedThreadFactory("timeout-processor"));
        this._eventStore = (EventStore) Objects.requireNonNull(eventStore);
        this._eventsMap = this._eventStore.buildCorrespondentMap();
        this._executor.scheduleAtFixedRate(this._tick, 0L, j, TimeUnit.MILLISECONDS);
    }

    /**
     * Creates a processor backed by {@link EventStore#TreeMap} with a thread-pool size of 1.
     *
     * @param resourceRegistry registry used to create the executor; may be {@code null}
     * @param j                period of the background tick, in milliseconds
     */
    TimeoutProcessor(ResourceRegistry resourceRegistry, long j) {
        this(resourceRegistry, j, EventStore.TreeMap, 1);
    }

    /**
     * Creates a processor that ticks at the default interval.
     *
     * @param resourceRegistry registry used to create the executor; may be {@code null}
     * @param z                {@code true} selects {@link EventStore#TreeMap}, {@code false}
     *                         selects {@link EventStore#SkipList}
     * @param i                size passed to the scheduled thread pool
     */
    public TimeoutProcessor(ResourceRegistry resourceRegistry, boolean z, int i) {
        this(
                resourceRegistry,
                TICKING_INTERVAL_MILLISECONDS,
                z ? EventStore.TreeMap : EventStore.SkipList,
                i);
    }

    /**
     * Creates a processor with an explicit tick period.
     *
     * @param resourceRegistry registry used to create the executor; may be {@code null}
     * @param j                period of the background tick, in milliseconds
     * @param z                {@code true} selects {@link EventStore#TreeMap}, {@code false}
     *                         selects {@link EventStore#SkipList}
     * @param i                size passed to the scheduled thread pool
     */
    public TimeoutProcessor(ResourceRegistry resourceRegistry, long j, boolean z, int i) {
        this(resourceRegistry, j, z ? EventStore.TreeMap : EventStore.SkipList, i);
    }

    /** @return the name of the event store in use */
    @Override
    public String toString() {
        return _eventStore.name();
    }

    /** Asks the event store to schedule a drain for the earliest pending event, if one is needed. */
    private void scheduleFirstNodeInEvents() {
        _eventStore.scheduleFirstNodeInEvents(
                _nextEvent,
                _executor,
                _drainingEvents,
                _eventsMap,
                _mapLock,
                _nextEventLock);
    }

    /**
     * Removes every event that is due at the current {@code Time.nanoTime()} and dispatches its tasks.
     *
     * @return the events that were removed
     */
    private List<TimeoutEvent> removeTimeoutNodes() {
        final long now = Time.nanoTime();
        return _eventStore.removeTimeoutEvents(now, _eventsMap, _mapLock, _nextEventLock);
    }

    /**
     * Schedules {@code runnable} to be handed to the executor once the given delay has elapsed.
     *
     * @param runnable task to run on timeout
     * @param j        delay, expressed in {@code timeUnit}
     * @param timeUnit unit of {@code j}
     * @return a handle that can be used to query or cancel the timeout
     */
    public TimeoutFuture schedule(Runnable runnable, long j, TimeUnit timeUnit) {
        final Long deadline = absoluteTime(j, timeUnit);
        // Only inserted into the map when no event exists for this deadline yet.
        final TimeoutEvent candidate = new TimeoutEvent(deadline);
        return _eventStore.schedule(
                runnable,
                candidate,
                deadline,
                _nextEvent,
                _executor,
                _drainingEvents,
                _eventsMap,
                _mapLock,
                _nextEventLock);
    }

    /**
     * Stops the processor. When this instance created its own registry, the registry is shut down
     * and awaited; otherwise only the executor is shut down.
     *
     * <p>If the calling thread is interrupted while waiting, the interruption is logged and the
     * thread's interrupt status is restored so that callers can still observe it.
     */
    public void shutdownNow() {
        if (_registry == null) {
            _executor.shutdownNow();
            return;
        }
        _registry.shutdown();
        try {
            _registry.waitForShutdown();
        } catch (InterruptedException e) {
            LOG.warn("Interrupted", (Throwable) e);
            // Catching InterruptedException clears the flag; set it again instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Waits for the shutdown to complete, for at most the given time.
     *
     * @param i        maximum time to wait, expressed in {@code timeUnit}
     * @param timeUnit unit of {@code i}
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void awaitTermination(int i, TimeUnit timeUnit) throws InterruptedException {
        if (_registry != null) {
            // Owned registry: wait on it; running out of time is only logged.
            try {
                _registry.waitForShutdown(timeUnit.toMillis(i));
            } catch (TimeoutException e) {
                LOG.warn("Timeout", (Throwable) e);
            }
            return;
        }
        _executor.awaitTermination(i, timeUnit);
    }

    /**
     * Runs {@code runnable} on the processor's own executor, without any delay.
     *
     * @param runnable task to execute
     */
    @Override
    public void execute(Runnable runnable) {
        _executor.execute(runnable);
    }

    /**
     * Converts a relative delay into an absolute deadline on the {@code Time.nanoTime()} clock.
     * The result always has its low 20 bits set, so all deadlines that fall into the same
     * 2^20 ns window yield the same value and therefore the same map key.
     *
     * @param j        delay, expressed in {@code timeUnit}
     * @param timeUnit unit of {@code j}
     * @return the quantized deadline in nanoseconds
     */
    private static long absoluteTime(long j, TimeUnit timeUnit) {
        final long roundingOffset = (1L << 19) - 1; // 524287
        final long lowBitsMask = (1L << 20) - 1;    // 1048575
        final long rawDeadline = Time.nanoTime() + timeUnit.toNanos(j);
        return (rawDeadline + roundingOffset) | lowBitsMask;
    }
}
