package com.linkedin.d2.discovery.stores.zk;

import com.linkedin.d2.discovery.stores.zk.ZKConnection;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/linkedin/d2/discovery/stores/zk/ZKPersistentConnection.class */
public class ZKPersistentConnection {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) ZKPersistentConnection.class);
    private final Object _mutex;
    private final ZKConnectionBuilder _zkConnectionBuilder;
    private ZKConnection _zkConnection;
    private Set<EventListener> _listeners;
    private State _state;
    private AtomicInteger _activeUserCount;
    private AtomicInteger _registeredUserCount;
    private volatile boolean _hasForcefullyShutdown;

    /* loaded from: input_file:com/linkedin/d2/discovery/stores/zk/ZKPersistentConnection$Event.class */
    public enum Event {
        SESSION_ESTABLISHED,
        SESSION_EXPIRED,
        DISCONNECTED,
        CONNECTED
    }

    /* loaded from: input_file:com/linkedin/d2/discovery/stores/zk/ZKPersistentConnection$EventListener.class */
    public interface EventListener {
        void notifyEvent(Event event);
    }

    /* loaded from: input_file:com/linkedin/d2/discovery/stores/zk/ZKPersistentConnection$EventListenerNotifiers.class */
    public static class EventListenerNotifiers implements EventListener {
        @Override // com.linkedin.d2.discovery.stores.zk.ZKPersistentConnection.EventListener
        public void notifyEvent(Event event) {
            switch (event) {
                case SESSION_ESTABLISHED:
                    sessionEstablished(event);
                    return;
                case SESSION_EXPIRED:
                    sessionExpired(event);
                    return;
                case CONNECTED:
                    connected(event);
                    return;
                case DISCONNECTED:
                    disconnected(event);
                    return;
                default:
                    return;
            }
        }

        public void sessionEstablished(Event event) {
        }

        public void sessionExpired(Event event) {
        }

        public void disconnected(Event event) {
        }

        public void connected(Event event) {
        }
    }

    /* loaded from: input_file:com/linkedin/d2/discovery/stores/zk/ZKPersistentConnection$Listener.class */
    private class Listener implements ZKConnection.StateListener {
        private long _sessionId;

        private Listener() {
        }

        @Override // com.linkedin.d2.discovery.stores.zk.ZKConnection.StateListener
        public void notifyStateChange(Watcher.Event.KeeperState keeperState) {
            long sessionId = ZKPersistentConnection.this.getZooKeeper().getSessionId();
            ZKPersistentConnection.LOG.info("Got event {} for session 0x{}", keeperState, Long.toHexString(sessionId));
            boolean z = false;
            if (keeperState == Watcher.Event.KeeperState.SyncConnected && sessionId != this._sessionId) {
                z = true;
                this._sessionId = sessionId;
            }
            switch (keeperState) {
                case SyncConnected:
                    deliver(z ? Event.SESSION_ESTABLISHED : Event.CONNECTED);
                    break;
                case Disconnected:
                    deliver(Event.DISCONNECTED);
                    break;
                case Expired:
                    deliver(Event.SESSION_EXPIRED);
                    break;
            }
            if (keeperState == Watcher.Event.KeeperState.Expired) {
                try {
                    synchronized (ZKPersistentConnection.this._mutex) {
                        if (ZKPersistentConnection.this._state == State.STARTED) {
                            ZKPersistentConnection.this._zkConnection.shutdown();
                            ZKPersistentConnection.this._zkConnection = ZKPersistentConnection.this._zkConnectionBuilder.build();
                            ZKPersistentConnection.this._zkConnection.addStateListener(new Listener());
                            ZKPersistentConnection.this._zkConnection.start();
                        }
                    }
                } catch (IOException e) {
                    ZKPersistentConnection.LOG.error("Failed to restart ZKConnection after expiration", (Throwable) e);
                } catch (InterruptedException e2) {
                    ZKPersistentConnection.LOG.error("Failed to shutdown ZKConnection after expiration", (Throwable) e2);
                }
            }
        }

        private void deliver(Event event) {
            Iterator it2 = ZKPersistentConnection.this._listeners.iterator();
            while (it2.hasNext()) {
                ((EventListener) it2.next()).notifyEvent(event);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linkedin/d2/discovery/stores/zk/ZKPersistentConnection$State.class */
    public enum State {
        INIT,
        STARTED,
        STOPPED
    }

    public ZKPersistentConnection(String str, int i, Collection<? extends EventListener> collection) {
        this(str, i, collection, false);
    }

    public ZKPersistentConnection(String str, int i, Collection<? extends EventListener> collection, boolean z) {
        this(str, i, collection, z, false);
    }

    public ZKPersistentConnection(String str, int i, Collection<? extends EventListener> collection, boolean z, boolean z2) {
        this(str, i, collection, z, z2, false);
    }

    public ZKPersistentConnection(String str, int i, Collection<? extends EventListener> collection, boolean z, boolean z2, boolean z3) {
        this(new ZKConnectionBuilder(str).setTimeout(i).setShutdownAsynchronously(z).setIsSymlinkAware(z2).setWaitForConnected(z3));
        addListeners(collection);
    }

    public ZKPersistentConnection(ZKConnectionBuilder zKConnectionBuilder) {
        this._mutex = new Object();
        this._state = State.INIT;
        this._zkConnectionBuilder = zKConnectionBuilder;
        this._zkConnection = this._zkConnectionBuilder.build();
        this._zkConnection.addStateListener(new Listener());
        this._listeners = new HashSet();
        this._activeUserCount = new AtomicInteger(0);
        this._registeredUserCount = new AtomicInteger(0);
        this._hasForcefullyShutdown = false;
    }

    public void addListeners(Collection<? extends EventListener> collection) {
        synchronized (this._mutex) {
            if (this._state != State.INIT) {
                throw new IllegalStateException("Listeners can be added only before connection starts, current state: " + this._state);
            }
            this._listeners.addAll(collection);
        }
    }

    public void incrementShareCount() {
        this._registeredUserCount.incrementAndGet();
    }

    public void start() throws IOException {
        synchronized (this._mutex) {
            this._activeUserCount.getAndIncrement();
            if (this._state != State.INIT) {
                return;
            }
            this._state = State.STARTED;
            this._listeners = Collections.unmodifiableSet(this._listeners);
            this._zkConnection.start();
        }
    }

    public void shutdown() throws InterruptedException {
        synchronized (this._mutex) {
            if (this._hasForcefullyShutdown) {
                LOG.warn("The connection has already been forcefully shutdown");
                return;
            }
            if (this._state != State.STARTED) {
                throw new IllegalStateException("Can not shutdown ZKConnection when " + this._state);
            }
            int decrementAndGet = this._activeUserCount.decrementAndGet();
            int decrementAndGet2 = this._registeredUserCount.decrementAndGet();
            if (decrementAndGet > 0 || decrementAndGet2 > 0) {
                return;
            }
            this._state = State.STOPPED;
            this._zkConnection.shutdown();
        }
    }

    public void forceShutdown() throws InterruptedException {
        synchronized (this._mutex) {
            if (this._state != State.STARTED) {
                LOG.warn("Unnecessary to forcefully shutdown a zkPersistentConnection that is either not started or already stopped");
                return;
            }
            this._hasForcefullyShutdown = true;
            int i = this._activeUserCount.get();
            if (i != 0) {
                LOG.warn("Forcefully shutting down ZkPersistentConnection when there still are" + i + " active users");
            }
            this._state = State.STOPPED;
            try {
                this._zkConnection.shutdown();
            } catch (IllegalStateException e) {
                LOG.warn("trying to forcefully shutdown zk connection but encountered:" + e.getMessage());
            }
        }
    }

    public ZooKeeper getZooKeeper() {
        ZooKeeper zooKeeper;
        synchronized (this._mutex) {
            zooKeeper = this._zkConnection.getZooKeeper();
        }
        return zooKeeper;
    }

    public ZKConnection getZKConnection() {
        ZKConnection zKConnection;
        synchronized (this._mutex) {
            zKConnection = this._zkConnection;
        }
        return zKConnection;
    }

    public boolean isConnectionStarted() {
        boolean z;
        synchronized (this._mutex) {
            z = this._state == State.STARTED;
        }
        return z;
    }

    public boolean isConnectionStopped() {
        boolean z;
        synchronized (this._mutex) {
            z = this._state == State.STOPPED;
        }
        return z;
    }
}
