package org.apache.pulsar.metadata.coordination.impl;

import com.fasterxml.jackson.databind.type.TypeFactory;
import java.util.EnumSet;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.common.util.SafeRunnable;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataCacheConfig;
import org.apache.pulsar.metadata.api.MetadataSerde;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.coordination.LeaderElection;
import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.apache.pulsar.metadata.cache.impl.JSONMetadataSerdeSimpleType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-metadata-2.10.2.1.jar:org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.class */
/**
 * Leader election built on an ephemeral node stored at a single path of a
 * {@link MetadataStoreExtended}.
 *
 * <p>A participant proposes a value through {@link #elect(Object)}. If no value is stored at
 * the path, the participant tries to create it as an ephemeral node and, on success, moves to
 * {@link LeaderElectionState#Leading}. If a value written by someone else is already stored,
 * the participant moves to {@link LeaderElectionState#Following}. Deletions of the path and
 * re-established sessions trigger a new election round.
 *
 * <p>Mutable state is guarded by the monitor of this instance.
 *
 * @param <T> type of the value published by the leader
 */
class LeaderElectionImpl<T> implements LeaderElection<T> {
    private static final Logger log = LoggerFactory.getLogger(LeaderElectionImpl.class);

    /** Delay before a failed re-election (triggered by a node deletion) is attempted again. */
    private static final int LEADER_ELECTION_RETRY_DELAY_SECONDS = 5;

    private final String path;
    private final MetadataSerde<T> serde;
    private final MetadataStoreExtended store;
    private final MetadataCache<T> cache;
    private final Consumer<LeaderElectionState> stateChangesListener;
    private final ScheduledFuture<?> updateCachedValueFuture;
    private final ScheduledExecutorService executor;

    private LeaderElectionState leaderElectionState;
    // Never reassigned in this class, so the delete issued by asyncClose() is unconditional.
    private Optional<Long> version = Optional.empty();
    // Stays null until elect(T) has been called at least once.
    private Optional<T> proposedValue;
    private InternalState internalState;

    /** Lifecycle of the election state machine, independent of the externally visible state. */
    /* JADX INFO: Access modifiers changed from: private */
    public enum InternalState {
        Init,
        ElectionInProgress,
        LeaderIsPresent,
        Closed
    }

    /**
     * Creates the election, registers path and session listeners on the store and schedules a
     * periodic read of the leader value through the cache.
     *
     * @param metadataStoreExtended store holding the election node
     * @param cls class of the value published by the leader
     * @param str path of the election node
     * @param consumer listener invoked on state changes
     * @param scheduledExecutorService executor used for the periodic cache read, session event
     *        handling, post-election cache refresh and election retries
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public LeaderElectionImpl(MetadataStoreExtended metadataStoreExtended, Class<T> cls, String str, Consumer<LeaderElectionState> consumer, ScheduledExecutorService scheduledExecutorService) {
        this.path = str;
        this.serde = new JSONMetadataSerdeSimpleType<>(TypeFactory.defaultInstance().constructSimpleType(cls, null));
        this.store = metadataStoreExtended;
        MetadataCacheConfig cacheConfig = MetadataCacheConfig.builder().expireAfterWriteMillis(-1L).build();
        this.cache = metadataStoreExtended.getMetadataCache(cls, cacheConfig);
        this.leaderElectionState = LeaderElectionState.NoLeader;
        this.internalState = InternalState.Init;
        this.stateChangesListener = consumer;
        this.executor = scheduledExecutorService;
        metadataStoreExtended.registerListener(this::handlePathNotification);
        metadataStoreExtended.registerSessionListener(this::handleSessionNotification);
        // Periodically read the leader value through the cache.
        this.updateCachedValueFuture = scheduledExecutorService.scheduleWithFixedDelay(
                SafeRunnable.safeRun(this::getLeaderValue),
                cacheConfig.getRefreshAfterWriteMillis() / 2,
                cacheConfig.getRefreshAfterWriteMillis(),
                TimeUnit.MILLISECONDS);
    }

    /**
     * Proposes {@code t} as the leader value and starts an election round.
     *
     * @param t value to publish if this participant becomes the leader; must not be null
     * @return the current state if it is already {@code Leading} or {@code Following},
     *         otherwise a future completed with the outcome of the election round
     */
    @Override // org.apache.pulsar.metadata.api.coordination.LeaderElection
    public synchronized CompletableFuture<LeaderElectionState> elect(T t) {
        if (this.leaderElectionState != LeaderElectionState.NoLeader) {
            return CompletableFuture.completedFuture(this.leaderElectionState);
        }
        this.proposedValue = Optional.of(t);
        return elect();
    }

    /**
     * Runs one election round: reads the node, then either reconciles with the existing value
     * or tries to create the node. Afterwards the cache entry is refreshed and read on the
     * executor before the resulting state is returned.
     */
    private synchronized CompletableFuture<LeaderElectionState> elect() {
        this.internalState = InternalState.ElectionInProgress;
        return this.store.get(this.path)
                .thenCompose(existing -> existing.isPresent()
                        ? handleExistingLeaderValue(existing.get())
                        : tryToBecomeLeader())
                .thenComposeAsync(electedState -> {
                    this.cache.refresh(this.path);
                    return this.cache.get(this.path).thenApply(ignored -> electedState);
                }, this.executor);
    }

    /**
     * Decides what to do when the election node already exists.
     *
     * <ul>
     *   <li>same value, created by this session: this participant is the leader;</li>
     *   <li>same value, created by another session: delete the node and try to become leader;</li>
     *   <li>different value, created by this session: delete the node and try to become leader;</li>
     *   <li>different value, created by another session: follow the existing leader.</li>
     * </ul>
     */
    private synchronized CompletableFuture<LeaderElectionState> handleExistingLeaderValue(GetResult getResult) {
        try {
            T existingValue = this.serde.deserialize(this.path, getResult.getValue(), getResult.getStat());
            boolean sameValue = existingValue.equals(this.proposedValue.orElse(null));
            boolean createdBySelf = getResult.getStat().isCreatedBySelf();

            if (sameValue && createdBySelf) {
                // Return here: falling through would immediately demote the leader to Following.
                changeState(LeaderElectionState.Leading);
                return CompletableFuture.completedFuture(LeaderElectionState.Leading);
            }
            if (sameValue || createdBySelf) {
                // Either our value written by another session, or another value written by this
                // session: remove the node and run for leadership again.
                return this.store.delete(this.path, Optional.of(getResult.getStat().getVersion()))
                        .thenCompose(ignored -> tryToBecomeLeader());
            }
            changeState(LeaderElectionState.Following);
            return CompletableFuture.completedFuture(LeaderElectionState.Following);
        } catch (Throwable th) {
            return FutureUtils.exception(th);
        }
    }

    /**
     * Marks a leader as present and, if the externally visible state actually changed, records
     * it and notifies the listener. Listener failures are logged and otherwise ignored.
     */
    private synchronized void changeState(LeaderElectionState leaderElectionState) {
        this.internalState = InternalState.LeaderIsPresent;
        if (this.leaderElectionState == leaderElectionState) {
            return;
        }
        this.leaderElectionState = leaderElectionState;
        try {
            this.stateChangesListener.accept(this.leaderElectionState);
        } catch (Throwable th) {
            log.warn("Exception in state change listener", th);
        }
    }

    /**
     * Tries to create the election node as an ephemeral node holding the proposed value.
     *
     * <p>On a {@code BadVersionException} from the put, a new election round is started and its
     * outcome is returned. If the election is no longer in progress once the node has been
     * created, the node is deleted again and the returned future fails with
     * {@code AlreadyClosedException}.
     */
    private synchronized CompletableFuture<LeaderElectionState> tryToBecomeLeader() {
        try {
            byte[] payload = this.serde.serialize(this.path, this.proposedValue.get());
            CompletableFuture<LeaderElectionState> result = new CompletableFuture<>();
            // The put is issued with an expected version of -1.
            this.store.put(this.path, payload, Optional.of(-1L), EnumSet.of(CreateOption.Ephemeral)).thenAccept(stat -> {
                synchronized (this) {
                    if (this.internalState == InternalState.ElectionInProgress) {
                        this.cache.get(this.path).thenRun(() -> {
                            synchronized (this) {
                                log.info("Acquired leadership on {}", this.path);
                                changeState(LeaderElectionState.Leading);
                                result.complete(this.leaderElectionState);
                            }
                        }).exceptionally(getError -> {
                            // Reading the value back failed: delete the node and fail the election.
                            this.store.delete(this.path, Optional.of(stat.getVersion()))
                                    .thenRun(() -> result.completeExceptionally(getError))
                                    .exceptionally(deleteError -> {
                                        result.completeExceptionally(deleteError);
                                        return null;
                                    });
                            return null;
                        });
                    } else {
                        // The election is no longer in progress: delete the node we just created.
                        this.store.delete(this.path, Optional.of(stat.getVersion()))
                                .thenRun(() -> result.completeExceptionally(
                                        new MetadataStoreException.AlreadyClosedException("The leader election was already closed")))
                                .exceptionally(deleteError -> {
                                    result.completeExceptionally(deleteError);
                                    return null;
                                });
                    }
                }
            }).exceptionally(putError -> {
                // Fall back to the throwable itself so the result is always completed.
                Throwable cause = putError.getCause() != null ? putError.getCause() : putError;
                if (cause instanceof MetadataStoreException.BadVersionException) {
                    // The put was rejected with a version conflict: run another election round.
                    elect().thenAccept(result::complete).exceptionally(electError -> {
                        result.completeExceptionally(electError);
                        return null;
                    });
                } else {
                    result.completeExceptionally(cause);
                }
                return null;
            });
            return result;
        } catch (Throwable th) {
            return FutureUtils.exception(th);
        }
    }

    /**
     * Cancels the periodic cache read and waits for {@link #asyncClose()} to complete.
     *
     * @throws Exception the unwrapped metadata store failure if closing fails
     */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        this.updateCachedValueFuture.cancel(true);
        try {
            asyncClose().join();
        } catch (CompletionException e) {
            throw MetadataStoreException.unwrap(e);
        }
    }

    /**
     * Marks the election as closed and, if this participant is the leader, deletes the election
     * node. Calling it again after the first call returns an already-completed future.
     */
    @Override // org.apache.pulsar.metadata.api.coordination.LeaderElection
    public synchronized CompletableFuture<Void> asyncClose() {
        if (this.internalState == InternalState.Closed) {
            return CompletableFuture.completedFuture(null);
        }
        this.internalState = InternalState.Closed;
        // NOTE(review): the executor is injected through the constructor; confirm it is not
        // shared with other components before relying on it being shut down here.
        this.executor.shutdownNow();
        if (this.leaderElectionState != LeaderElectionState.Leading) {
            return CompletableFuture.completedFuture(null);
        }
        return this.store.delete(this.path, this.version);
    }

    /** Returns the externally visible election state. */
    @Override // org.apache.pulsar.metadata.api.coordination.LeaderElection
    public synchronized LeaderElectionState getState() {
        return this.leaderElectionState;
    }

    /** Returns the leader value for the election path, read through the metadata cache. */
    @Override // org.apache.pulsar.metadata.api.coordination.LeaderElection
    public CompletableFuture<Optional<T>> getLeaderValue() {
        return this.cache.get(this.path);
    }

    /** Returns the leader value only if it is currently held by the metadata cache. */
    @Override // org.apache.pulsar.metadata.api.coordination.LeaderElection
    public Optional<T> getLeaderValueIfPresent() {
        return this.cache.getIfCached(this.path);
    }

    /**
     * On {@code SessionReestablished}, runs a new election round on the executor and waits for
     * its outcome. Other session events are ignored.
     */
    private void handleSessionNotification(SessionEvent sessionEvent) {
        this.executor.execute(SafeRunnable.safeRun(() -> {
            if (sessionEvent != SessionEvent.SessionReestablished) {
                return;
            }
            log.info("Revalidating leadership for {}", this.path);
            try {
                log.info("Resynced leadership for {} - State: {}", this.path, elect().get());
            } catch (InterruptedException e) {
                // Restore the flag so the interruption remains visible to the running thread.
                Thread.currentThread().interrupt();
                log.warn("Failure when processing session event", e);
            } catch (ExecutionException e) {
                log.warn("Failure when processing session event", e);
            }
        }));
    }

    /**
     * Reacts to the deletion of the election node while a leader is known: resets the state to
     * {@code NoLeader} and, if a value has been proposed, starts a new election round. If that
     * round fails, the listener is notified of the current state and a single retry is
     * scheduled, unless the election has been closed.
     */
    private void handlePathNotification(Notification notification) {
        if (!this.path.equals(notification.getPath())) {
            return;
        }
        synchronized (this) {
            if (this.internalState != InternalState.LeaderIsPresent
                    || notification.getType() != NotificationType.Deleted) {
                return;
            }
            if (this.leaderElectionState == LeaderElectionState.Leading) {
                log.warn("Leadership released for {}", this.path);
            }
            this.leaderElectionState = LeaderElectionState.NoLeader;
            if (!this.proposedValue.isPresent()) {
                return;
            }
            elect().exceptionally(electionError -> {
                log.warn("Leader election for path {} has failed", this.path, electionError);
                synchronized (this) {
                    try {
                        this.stateChangesListener.accept(this.leaderElectionState);
                    } catch (Throwable listenerError) {
                        log.warn("Exception in state change listener", listenerError);
                    }
                    if (this.internalState != InternalState.Closed) {
                        this.executor.schedule(() -> {
                            log.info("Retrying Leader election for path {}", this.path);
                            elect();
                        }, LEADER_ELECTION_RETRY_DELAY_SECONDS, TimeUnit.SECONDS);
                    }
                }
                return null;
            });
        }
    }
}
