package alluxio.master.journal.raft;

import alluxio.conf.PropertyKey;
import alluxio.conf.ServerConfiguration;
import alluxio.exception.ExceptionMessage;
import alluxio.exception.status.CancelledException;
import alluxio.exception.status.UnavailableException;
import alluxio.grpc.AddQuorumServerRequest;
import alluxio.grpc.GrpcService;
import alluxio.grpc.JournalQueryRequest;
import alluxio.grpc.NetAddress;
import alluxio.grpc.QuorumServerInfo;
import alluxio.grpc.QuorumServerState;
import alluxio.grpc.ServiceType;
import alluxio.grpc.TransferLeaderMessage;
import alluxio.master.Master;
import alluxio.master.PrimarySelector;
import alluxio.master.journal.AbstractJournalSystem;
import alluxio.master.journal.AsyncJournalWriter;
import alluxio.master.journal.CatchupFuture;
import alluxio.master.journal.Journal;
import alluxio.master.journal.ufs.UfsJournal;
import alluxio.metrics.MetricKey;
import alluxio.metrics.MetricsSystem;
import alluxio.metrics.sink.RatisDropwizardExports;
import alluxio.proto.journal.Journal;
import alluxio.util.CommonUtils;
import alluxio.util.LogUtils;
import alluxio.util.WaitForOptions;
import alluxio.util.io.FileUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.net.HostAndPort;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.file.AccessDeniedException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.ratis.RaftConfigKeys;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.GroupInfoReply;
import org.apache.ratis.protocol.GroupInfoRequest;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.SetConfigurationRequest;
import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
import org.apache.ratis.protocol.exceptions.RaftException;
import org.apache.ratis.retry.ExponentialBackoffRetry;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.thirdparty.com.google.protobuf.UnsafeByteOperations;
import org.apache.ratis.util.LifeCycle;
import org.apache.ratis.util.NetUtils;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
/* loaded from: input_file:alluxio/master/journal/raft/RaftJournalSystem.class */
public class RaftJournalSystem extends AbstractJournalSystem {
    public static final UUID RAFT_GROUP_UUID = UUID.fromString("02511d47-d67c-49a3-9011-abb3109a44c1");
    public static final RaftGroupId RAFT_GROUP_ID = RaftGroupId.valueOf(RAFT_GROUP_UUID);
    private static final Logger LOG = LoggerFactory.getLogger(RaftJournalSystem.class);
    private static final AtomicLong CALL_ID_COUNTER = new AtomicLong();
    private static final long SINGLE_MASTER_ELECTION_TIMEOUT_MS = 500;
    private static final String WAITING_FOR_ELECTION = "WAITING_FOR_ELECTION";
    private final RaftJournalConfiguration mConf;
    private JournalStateMachine mStateMachine;
    private RaftServer mServer;
    private RaftJournalWriter mRaftJournalWriter;
    private RaftGroup mRaftGroup;
    private RaftPeerId mPeerId;
    private static final long JOURNAL_STAT_LOG_MAX_INTERVAL_MS = 30000;
    private final Map<String, RatisDropwizardExports> mRatisMetricsMap = new ConcurrentHashMap();
    private final ClientId mClientId = ClientId.randomId();
    private final ClientId mRawClientId = ClientId.randomId();
    private final ConcurrentHashMap<String, RaftJournal> mJournals = new ConcurrentHashMap<>();
    private final AtomicBoolean mSnapshotAllowed = new AtomicBoolean(true);
    private final AtomicBoolean mTransferLeaderAllowed = new AtomicBoolean(false);
    private final RaftPrimarySelector mPrimarySelector = new RaftPrimarySelector();
    private final AtomicReference<AsyncJournalWriter> mAsyncJournalWriter = new AtomicReference<>();
    private Map<String, TransferLeaderMessage> mErrorMessages = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    public static long nextCallId() {
        return CALL_ID_COUNTER.getAndIncrement() & UfsJournal.UNKNOWN_SEQUENCE_NUMBER;
    }

    private RaftJournalSystem(RaftJournalConfiguration raftJournalConfiguration) {
        this.mConf = processRaftConfiguration(raftJournalConfiguration);
    }

    private void maybeMigrateOldJournal() {
        File file = new File(this.mConf.getPath(), RAFT_GROUP_UUID.toString());
        File raftJournalDir = RaftJournalUtils.getRaftJournalDir(this.mConf.getPath());
        File file2 = new File(raftJournalDir, RAFT_GROUP_UUID.toString());
        if (!file.isDirectory() || raftJournalDir.exists()) {
            return;
        }
        LOG.info("Old journal detected at {} . moving journal to {}", file, file2);
        if (!raftJournalDir.mkdirs()) {
            LOG.warn("Cannot create journal directory {}", raftJournalDir);
        }
        if (file.renameTo(file2)) {
            return;
        }
        LOG.warn("Failed to move journal from {} to {}", file, file2);
    }

    public static RaftJournalSystem create(RaftJournalConfiguration raftJournalConfiguration) {
        return new RaftJournalSystem(raftJournalConfiguration);
    }

    private RaftJournalConfiguration processRaftConfiguration(RaftJournalConfiguration raftJournalConfiguration) {
        if (raftJournalConfiguration.getClusterAddresses().size() == 1 && !ServerConfiguration.isSetByUser(PropertyKey.MASTER_EMBEDDED_JOURNAL_MIN_ELECTION_TIMEOUT) && !ServerConfiguration.isSetByUser(PropertyKey.MASTER_EMBEDDED_JOURNAL_MAX_ELECTION_TIMEOUT)) {
            LOG.debug("Overriding election timeout to {}ms for single master cluster.", Long.valueOf(SINGLE_MASTER_ELECTION_TIMEOUT_MS));
            raftJournalConfiguration.setElectionMinTimeoutMs(SINGLE_MASTER_ELECTION_TIMEOUT_MS);
            raftJournalConfiguration.setElectionMaxTimeoutMs(1000L);
        }
        raftJournalConfiguration.validate();
        return raftJournalConfiguration;
    }

    public synchronized RaftPeerId getLocalPeerId() {
        return this.mPeerId;
    }

    private synchronized void initServer() throws IOException {
        LOG.debug("Creating journal with max segment size {}", Long.valueOf(this.mConf.getMaxLogSize()));
        if (this.mStateMachine != null) {
            this.mStateMachine.close();
        }
        this.mStateMachine = new JournalStateMachine(this.mJournals, this, this.mConf.getMaxConcurrencyPoolSize());
        RaftProperties raftProperties = new RaftProperties();
        Parameters parameters = new Parameters();
        RaftConfigKeys.Rpc.setType(raftProperties, SupportedRpcType.GRPC);
        GrpcConfigKeys.Server.setPort(raftProperties, this.mConf.getLocalAddress().getPort());
        maybeMigrateOldJournal();
        RaftServerConfigKeys.setStorageDir(raftProperties, Collections.singletonList(RaftJournalUtils.getRaftJournalDir(this.mConf.getPath())));
        RaftServerConfigKeys.Log.setSegmentSizeMax(raftProperties, SizeInBytes.valueOf(this.mConf.getMaxLogSize()));
        RaftServerConfigKeys.Log.Appender.setBufferByteLimit(raftProperties, SizeInBytes.valueOf(ServerConfiguration.global().getBytes(PropertyKey.MASTER_EMBEDDED_JOURNAL_ENTRY_SIZE_MAX)));
        RaftServerConfigKeys.Write.setByteLimit(raftProperties, SizeInBytes.valueOf(ServerConfiguration.global().getBytes(PropertyKey.MASTER_EMBEDDED_JOURNAL_FLUSH_SIZE_MAX)));
        RaftServerConfigKeys.Log.setQueueByteLimit(raftProperties, (int) ServerConfiguration.global().getBytes(PropertyKey.MASTER_EMBEDDED_JOURNAL_FLUSH_SIZE_MAX));
        TimeDuration valueOf = TimeDuration.valueOf(this.mConf.getMinElectionTimeoutMs(), TimeUnit.MILLISECONDS);
        TimeDuration valueOf2 = TimeDuration.valueOf(this.mConf.getMaxElectionTimeoutMs(), TimeUnit.MILLISECONDS);
        RaftServerConfigKeys.Rpc.setTimeoutMin(raftProperties, valueOf);
        RaftServerConfigKeys.Rpc.setTimeoutMax(raftProperties, valueOf2);
        RaftServerConfigKeys.Rpc.setRequestTimeout(raftProperties, TimeDuration.valueOf(ServerConfiguration.global().getMs(PropertyKey.MASTER_EMBEDDED_JOURNAL_TRANSPORT_REQUEST_TIMEOUT_MS), TimeUnit.MILLISECONDS));
        RaftServerConfigKeys.RetryCache.setExpiryTime(raftProperties, TimeDuration.valueOf(ServerConfiguration.getMs(PropertyKey.MASTER_EMBEDDED_JOURNAL_RETRY_CACHE_EXPIRY_TIME), TimeUnit.MILLISECONDS));
        RaftServerConfigKeys.Snapshot.setRetentionFileNum(raftProperties, 3);
        RaftServerConfigKeys.Snapshot.setAutoTriggerEnabled(raftProperties, true);
        RaftServerConfigKeys.Snapshot.setAutoTriggerThreshold(raftProperties, ServerConfiguration.global().getLong(PropertyKey.MASTER_JOURNAL_CHECKPOINT_PERIOD_ENTRIES));
        RaftServerConfigKeys.Log.Appender.setInstallSnapshotEnabled(raftProperties, false);
        RaftServerConfigKeys.Rpc.setSlownessTimeout(raftProperties, TimeDuration.valueOf(UfsJournal.UNKNOWN_SEQUENCE_NUMBER, TimeUnit.MILLISECONDS));
        RaftServerConfigKeys.LeaderElection.setLeaderStepDownWaitTime(raftProperties, TimeDuration.valueOf(UfsJournal.UNKNOWN_SEQUENCE_NUMBER, TimeUnit.MILLISECONDS));
        GrpcConfigKeys.setMessageSizeMax(raftProperties, SizeInBytes.valueOf(ServerConfiguration.global().getBytes(PropertyKey.MASTER_EMBEDDED_JOURNAL_TRANSPORT_MAX_INBOUND_MESSAGE_SIZE)));
        RatisDropwizardExports.registerRatisMetricReporters(this.mRatisMetricsMap);
        this.mServer = RaftServer.newBuilder().setServerId(this.mPeerId).setGroup(this.mRaftGroup).setStateMachine(this.mStateMachine).setProperties(raftProperties).setParameters(parameters).build();
        super.registerMetrics();
        MetricsSystem.registerGaugeIfAbsent(MetricKey.CLUSTER_LEADER_INDEX.getName(), () -> {
            return Integer.valueOf(getLeaderIndex());
        });
        MetricsSystem.registerGaugeIfAbsent(MetricKey.MASTER_ROLE_ID.getName(), () -> {
            return Integer.valueOf(getRoleId());
        });
        MetricsSystem.registerGaugeIfAbsent(MetricKey.CLUSTER_LEADER_ID.getName(), () -> {
            return getLeaderId();
        });
    }

    @VisibleForTesting
    public synchronized RaftGroup getCurrentGroup() {
        try {
            Iterator it = this.mServer.getGroups().iterator();
            Preconditions.checkState(it.hasNext(), "no group info found");
            RaftGroup raftGroup = (RaftGroup) it.next();
            Preconditions.checkState(raftGroup.getGroupId() == RAFT_GROUP_ID, String.format("Invalid group id %s, expecting %s", raftGroup.getGroupId(), RAFT_GROUP_ID));
            return raftGroup;
        } catch (IOException | IllegalStateException e) {
            LogUtils.warnWithException(LOG, "Failed to get raft group, falling back to initial group", new Object[]{e});
            return this.mRaftGroup;
        }
    }

    private RaftClient createClient() {
        RaftProperties raftProperties = new RaftProperties();
        Parameters parameters = new Parameters();
        RaftClientConfigKeys.Rpc.setRequestTimeout(raftProperties, TimeDuration.valueOf(15L, TimeUnit.SECONDS));
        return RaftClient.newBuilder().setRaftGroup(this.mRaftGroup).setClientId(this.mClientId).setLeaderId((RaftPeerId) null).setProperties(raftProperties).setParameters(parameters).setRetryPolicy(ExponentialBackoffRetry.newBuilder().setBaseSleepTime(TimeDuration.valueOf(100L, TimeUnit.MILLISECONDS)).setMaxAttempts(10).setMaxSleepTime(TimeDuration.valueOf(this.mConf.getMaxElectionTimeoutMs(), TimeUnit.MILLISECONDS)).build()).build();
    }

    @Override // alluxio.master.journal.JournalSystem
    public synchronized Journal createJournal(Master master) {
        RaftJournal raftJournal = new RaftJournal(master, this.mConf.getPath().toURI(), this.mAsyncJournalWriter);
        this.mJournals.put(master.getName(), raftJournal);
        return raftJournal;
    }

    @Override // alluxio.master.journal.JournalSystem
    public synchronized void gainPrimacy() {
        LOG.info("Gaining primacy.");
        this.mSnapshotAllowed.set(false);
        RaftJournalAppender raftJournalAppender = new RaftJournalAppender(this.mServer, this::createClient, this.mRawClientId, ServerConfiguration.global());
        Runnable runnable = () -> {
            try {
                raftJournalAppender.close();
            } catch (IOException e) {
                LOG.warn("Failed to close raft client: {}", e.toString());
            }
        };
        try {
            catchUp(this.mStateMachine, raftJournalAppender);
            long upgrade = this.mStateMachine.upgrade() + 1;
            Preconditions.checkState(this.mRaftJournalWriter == null);
            this.mRaftJournalWriter = new RaftJournalWriter(upgrade, raftJournalAppender);
            this.mAsyncJournalWriter.set(new AsyncJournalWriter(this.mRaftJournalWriter, () -> {
                return getJournalSinks(null);
            }));
            this.mTransferLeaderAllowed.set(true);
            LOG.info("Gained primacy.");
        } catch (InterruptedException e) {
            runnable.run();
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (TimeoutException e2) {
            runnable.run();
            throw new RuntimeException(e2);
        }
    }

    @Override // alluxio.master.journal.JournalSystem
    public synchronized void losePrimacy() {
        LOG.info("Losing primacy.");
        if (this.mServer.getLifeCycleState() != LifeCycle.State.RUNNING) {
            return;
        }
        this.mTransferLeaderAllowed.set(false);
        try {
            this.mAsyncJournalWriter.get().close();
            this.mRaftJournalWriter.close();
        } catch (IOException e) {
            LOG.warn("Error closing journal writer: {}", e.toString());
        } finally {
            this.mAsyncJournalWriter.set(null);
            this.mRaftJournalWriter = null;
        }
        LOG.info("Shutting down Raft server");
        try {
            this.mServer.close();
            LOG.info("Shut down Raft server");
            try {
                this.mSnapshotAllowed.set(true);
                initServer();
                LOG.info("Bootstrapping new Raft server");
                try {
                    this.mServer.start();
                    LOG.info("Raft server successfully restarted and lost primacy");
                } catch (IOException e2) {
                    throw new IllegalStateException(String.format("Fatal error: failed to start Raft cluster with addresses %s while stepping down", this.mConf.getClusterAddresses()), e2);
                }
            } catch (IOException e3) {
                throw new IllegalStateException(String.format("Fatal error: failed to init Raft cluster with addresses %s while stepping down", this.mConf.getClusterAddresses()), e3);
            }
        } catch (IOException e4) {
            throw new IllegalStateException("Fatal error: failed to leave Raft cluster while stepping down", e4);
        }
    }

    @Override // alluxio.master.journal.JournalSystem
    public synchronized Map<String, Long> getCurrentSequenceNumbers() {
        Preconditions.checkState(this.mStateMachine != null, "State machine not initialized");
        long lastAppliedSequenceNumber = this.mStateMachine.getLastAppliedSequenceNumber();
        HashMap hashMap = new HashMap();
        Iterator it = this.mJournals.keySet().iterator();
        while (it.hasNext()) {
            hashMap.put((String) it.next(), Long.valueOf(lastAppliedSequenceNumber));
        }
        return hashMap;
    }

    @Override // alluxio.master.journal.JournalSystem
    public synchronized void suspend(Runnable runnable) throws IOException {
        this.mSnapshotAllowed.set(false);
        this.mStateMachine.suspend(runnable);
    }

    @Override // alluxio.master.journal.JournalSystem
    public synchronized void resume() throws IOException {
        try {
            this.mStateMachine.resume();
        } finally {
            this.mSnapshotAllowed.set(true);
        }
    }

    public synchronized boolean isSuspended() {
        return this.mStateMachine.isSuspended();
    }

    @Override // alluxio.master.journal.JournalSystem
    public synchronized CatchupFuture catchup(Map<String, Long> map) {
        List list = (List) map.values().stream().distinct().collect(Collectors.toList());
        Preconditions.checkState(list.size() == 1, "incorrect journal sequences");
        return this.mStateMachine.catchup(((Long) list.get(0)).longValue());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static Message toRaftMessage(Journal.JournalEntry journalEntry) {
        return Message.valueOf(UnsafeByteOperations.unsafeWrap(new JournalEntryCommand(journalEntry).getSerializedJournalEntry()));
    }

    @Override // alluxio.master.journal.JournalSystem
    public synchronized void checkpoint() throws IOException {
        try {
            try {
                try {
                    RaftJournalAppender raftJournalAppender = new RaftJournalAppender(this.mServer, this::createClient, this.mRawClientId, ServerConfiguration.global());
                    Throwable th = null;
                    try {
                        this.mSnapshotAllowed.set(true);
                        catchUp(this.mStateMachine, raftJournalAppender);
                        this.mStateMachine.takeLocalSnapshot();
                        if (raftJournalAppender != null) {
                            if (0 != 0) {
                                try {
                                    raftJournalAppender.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                raftJournalAppender.close();
                            }
                        }
                    } catch (Throwable th3) {
                        if (raftJournalAppender != null) {
                            if (0 != 0) {
                                try {
                                    raftJournalAppender.close();
                                } catch (Throwable th4) {
                                    th.addSuppressed(th4);
                                }
                            } else {
                                raftJournalAppender.close();
                            }
                        }
                        throw th3;
                    }
                } catch (TimeoutException e) {
                    LOG.warn("Timeout while performing snapshot: {}", e.toString());
                    throw new IOException("Timeout while performing snapshot", e);
                }
            } catch (InterruptedException e2) {
                LOG.warn("Interrupted while performing snapshot: {}", e2.toString());
                Thread.currentThread().interrupt();
                throw new CancelledException("Interrupted while performing snapshot", e2);
            }
        } finally {
            this.mSnapshotAllowed.set(false);
        }
    }

    @Override // alluxio.master.journal.JournalSystem
    public synchronized Map<ServiceType, GrpcService> getJournalServices() {
        HashMap hashMap = new HashMap();
        hashMap.put(ServiceType.RAFT_JOURNAL_SERVICE, new GrpcService(new RaftJournalServiceHandler(this.mStateMachine.getSnapshotReplicationManager())));
        return hashMap;
    }

    private void catchUp(JournalStateMachine journalStateMachine, RaftJournalAppender raftJournalAppender) throws TimeoutException, InterruptedException {
        RaftException raftException;
        long currentTimeMillis = System.currentTimeMillis();
        long ms = ServerConfiguration.global().getMs(PropertyKey.MASTER_EMBEDDED_JOURNAL_CATCHUP_RETRY_WAIT);
        CommonUtils.waitFor("snapshotting to finish", () -> {
            return Boolean.valueOf(!journalStateMachine.isSnapshotting());
        }, WaitForOptions.defaults().setTimeoutMs(600000));
        OptionalLong empty = OptionalLong.empty();
        try {
            synchronized (this) {
                RaftPeerId id = this.mServer.getId();
                Optional findFirst = getGroupInfo().getCommitInfos().stream().filter(commitInfoProto -> {
                    return id.equals(RaftPeerId.valueOf(commitInfoProto.getServer().getId()));
                }).findFirst();
                if (!findFirst.isPresent()) {
                    throw new IOException("Commit info was not present. Couldn't find the current server's latest commit");
                }
                empty = OptionalLong.of(((RaftProtos.CommitInfoProto) findFirst.get()).getCommitIndex());
            }
        } catch (IOException e) {
            LogUtils.warnWithException(LOG, "Failed to get raft log information before replay. Replay statistics will not be available", new Object[]{e});
        }
        RaftJournalProgressLogger raftJournalProgressLogger = new RaftJournalProgressLogger(this.mStateMachine, empty);
        while (this.mPrimarySelector.getState() == PrimarySelector.State.PRIMARY) {
            long lastAppliedSequenceNumber = journalStateMachine.getLastAppliedSequenceNumber();
            long nextLong = ThreadLocalRandom.current().nextLong(Long.MIN_VALUE, 0L);
            LOG.info("Performing catchup. Last applied SN: {}. Catchup ID: {}", Long.valueOf(lastAppliedSequenceNumber), Long.valueOf(nextLong));
            try {
                raftException = raftJournalAppender.sendAsync(toRaftMessage(Journal.JournalEntry.newBuilder().setSequenceNumber(nextLong).build()), TimeDuration.valueOf(5L, TimeUnit.SECONDS)).get(5L, TimeUnit.SECONDS).getException();
            } catch (IOException | ExecutionException | TimeoutException e2) {
                raftException = e2;
            }
            if (raftException != null) {
                if (raftException instanceof LeaderNotReadyException) {
                    raftJournalProgressLogger.logProgress();
                } else {
                    LOG.info("Exception submitting term start entry: {}", raftException.toString());
                }
                Thread.sleep(ms);
            } else {
                try {
                    CommonUtils.waitFor("term start entry " + nextLong + " to be applied to state machine", () -> {
                        return Boolean.valueOf(journalStateMachine.getLastPrimaryStartSequenceNumber() == nextLong);
                    }, WaitForOptions.defaults().setInterval(1000).setTimeoutMs(5000));
                    try {
                        CommonUtils.waitFor("check primacySN " + nextLong + " and lastAppliedSN " + lastAppliedSequenceNumber + " to be applied to leader", () -> {
                            return Boolean.valueOf(journalStateMachine.getLastAppliedSequenceNumber() == lastAppliedSequenceNumber && journalStateMachine.getLastPrimaryStartSequenceNumber() == nextLong);
                        }, WaitForOptions.defaults().setInterval(1000).setTimeoutMs((int) this.mConf.getMaxElectionTimeoutMs()));
                        LOG.info("Caught up in {}ms. Last sequence number from previous term: {}.", Long.valueOf(System.currentTimeMillis() - currentTimeMillis), Long.valueOf(journalStateMachine.getLastAppliedSequenceNumber()));
                        return;
                    } catch (TimeoutException e3) {
                    }
                } catch (TimeoutException e4) {
                    LOG.info(e4.toString());
                }
            }
        }
    }

    @Override // alluxio.master.journal.AbstractJournalSystem
    public synchronized void startInternal() throws InterruptedException, IOException {
        LOG.info("Initializing Raft Journal System");
        this.mPeerId = RaftJournalUtils.getPeerId(this.mConf.getLocalAddress());
        this.mRaftGroup = RaftGroup.valueOf(RAFT_GROUP_ID, (Set) this.mConf.getClusterAddresses().stream().map(inetSocketAddress -> {
            return RaftPeer.newBuilder().setId(RaftJournalUtils.getPeerId(inetSocketAddress)).setAddress(inetSocketAddress).build();
        }).collect(Collectors.toSet()));
        initServer();
        super.registerMetrics();
        List<InetSocketAddress> clusterAddresses = this.mConf.getClusterAddresses();
        LOG.info("Starting Raft journal system. Cluster addresses: {}. Local address: {}", clusterAddresses, this.mConf.getLocalAddress());
        long currentTimeMillis = System.currentTimeMillis();
        try {
            this.mServer.start();
            LOG.info("Started Raft Journal System in {}ms", Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
            joinQuorum();
        } catch (IOException e) {
            ExceptionMessage exceptionMessage = ExceptionMessage.FAILED_RAFT_BOOTSTRAP;
            Object[] objArr = new Object[2];
            objArr[0] = Arrays.toString(clusterAddresses.toArray());
            objArr[1] = e.getCause() == null ? e : e.getCause().toString();
            throw new IOException(exceptionMessage.getMessage(objArr), e.getCause());
        }
    }

    private void joinQuorum() {
        InetSocketAddress localAddress = this.mConf.getLocalAddress();
        AddQuorumServerRequest build = AddQuorumServerRequest.newBuilder().setServerAddress(NetAddress.newBuilder().setHost(localAddress.getHostString()).setRpcPort(localAddress.getPort())).build();
        RaftClient createClient = createClient();
        createClient.async().sendReadOnly(Message.valueOf(UnsafeByteOperations.unsafeWrap(JournalQueryRequest.newBuilder().setAddQuorumServerRequest(build).build().toByteArray()))).whenComplete((raftClientReply, th) -> {
            if (th != null) {
                LogUtils.warnWithException(LOG, "Exception occurred while joining quorum", new Object[]{th});
            }
            if (raftClientReply != null && raftClientReply.getException() != null) {
                LogUtils.warnWithException(LOG, "Received an error while joining quorum", new Object[]{raftClientReply.getException()});
            }
            try {
                createClient.close();
            } catch (IOException e) {
                LogUtils.warnWithException(LOG, "Exception occurred closing raft client", new Object[]{e});
            }
        });
    }

    @Override // alluxio.master.journal.AbstractJournalSystem
    public synchronized void stopInternal() throws InterruptedException, IOException {
        LOG.info("Shutting down raft journal");
        if (this.mRaftJournalWriter != null) {
            this.mRaftJournalWriter.close();
        }
        try {
            this.mServer.close();
            LOG.info("Journal shutdown complete");
        } catch (IOException e) {
            throw new RuntimeException("Failed to shut down Raft server", e);
        }
    }

    public synchronized List<QuorumServerInfo> getQuorumServerInfoList() throws IOException {
        LinkedList linkedList = new LinkedList();
        GroupInfoReply groupInfo = getGroupInfo();
        if (groupInfo == null) {
            throw new UnavailableException("Cannot get raft group info");
        }
        if (groupInfo.getException() != null) {
            throw groupInfo.getException();
        }
        RaftProtos.RoleInfoProto roleInfoProto = groupInfo.getRoleInfoProto();
        if (roleInfoProto == null) {
            throw new UnavailableException("Cannot get server role info");
        }
        RaftProtos.LeaderInfoProto leaderInfo = roleInfoProto.getLeaderInfo();
        if (leaderInfo == null) {
            throw new UnavailableException("Cannot get server leader info");
        }
        for (RaftProtos.ServerRpcProto serverRpcProto : leaderInfo.getFollowerInfoList()) {
            HostAndPort fromString = HostAndPort.fromString(serverRpcProto.getId().getAddress());
            linkedList.add(QuorumServerInfo.newBuilder().setIsLeader(false).setPriority(serverRpcProto.getId().getPriority()).setServerAddress(NetAddress.newBuilder().setHost(fromString.getHost()).setRpcPort(fromString.getPort()).build()).setServerState(serverRpcProto.getLastRpcElapsedTimeMs() > this.mConf.getMaxElectionTimeoutMs() ? QuorumServerState.UNAVAILABLE : QuorumServerState.AVAILABLE).build());
        }
        InetSocketAddress localAddress = this.mConf.getLocalAddress();
        linkedList.add(QuorumServerInfo.newBuilder().setIsLeader(true).setPriority(roleInfoProto.getSelf().getPriority()).setServerAddress(NetAddress.newBuilder().setHost(localAddress.getHostString()).setRpcPort(localAddress.getPort()).build()).setServerState(QuorumServerState.AVAILABLE).build());
        linkedList.sort(Comparator.comparing(quorumServerInfo -> {
            return quorumServerInfo.getServerAddress().toString();
        }));
        return linkedList;
    }

    public synchronized CompletableFuture<RaftClientReply> sendMessageAsync(RaftPeerId raftPeerId, Message message) {
        RaftClient createClient = createClient();
        return createClient.getClientRpc().sendRequestAsync(RaftClientRequest.newBuilder().setClientId(this.mRawClientId).setServerId(raftPeerId).setGroupId(RAFT_GROUP_ID).setCallId(nextCallId()).setMessage(message).setType(RaftClientRequest.staleReadRequestType(0L)).setSlidingWindowEntry((RaftProtos.SlidingWindowEntry) null).build()).whenComplete((raftClientReply, th) -> {
            try {
                createClient.close();
            } catch (IOException e) {
                throw new CompletionException(e);
            }
        });
    }

    private GroupInfoReply getGroupInfo() throws IOException {
        return getRaftServer().getGroupInfo(new GroupInfoRequest(this.mRawClientId, getLocalPeerId(), RAFT_GROUP_ID, nextCallId()));
    }

    @VisibleForTesting
    public synchronized boolean isLeader() {
        return this.mServer != null && this.mServer.getLifeCycleState() == LifeCycle.State.RUNNING && this.mPrimarySelector.getState() == PrimarySelector.State.PRIMARY;
    }

    public synchronized void removeQuorumServer(NetAddress netAddress) throws IOException {
        RaftPeerId peerId = RaftJournalUtils.getPeerId(InetSocketAddress.createUnresolved(netAddress.getHost(), netAddress.getRpcPort()));
        RaftClient createClient = createClient();
        Throwable th = null;
        try {
            RaftClientReply configuration = createClient.admin().setConfiguration((List) ((RaftGroup) this.mServer.getGroups().iterator().next()).getPeers().stream().filter(raftPeer -> {
                return !raftPeer.getId().equals(peerId);
            }).collect(Collectors.toList()));
            if (configuration.getException() != null) {
                throw configuration.getException();
            }
            if (createClient != null) {
                if (0 == 0) {
                    createClient.close();
                    return;
                }
                try {
                    createClient.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (createClient != null) {
                if (0 != 0) {
                    try {
                        createClient.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createClient.close();
                }
            }
            throw th3;
        }
    }

    public synchronized void resetPriorities() throws IOException {
        ArrayList arrayList = new ArrayList();
        Iterator it = this.mRaftGroup.getPeers().iterator();
        while (it.hasNext()) {
            arrayList.add(RaftPeer.newBuilder((RaftPeer) it.next()).setPriority(1).build());
        }
        LOG.info("Resetting RaftPeer priorities");
        RaftClient createClient = createClient();
        Throwable th = null;
        try {
            try {
                processReply(createClient.admin().setConfiguration(arrayList), "failed to reset master priorities to 1");
                if (createClient != null) {
                    if (0 == 0) {
                        createClient.close();
                        return;
                    }
                    try {
                        createClient.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createClient != null) {
                if (th != null) {
                    try {
                        createClient.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createClient.close();
                }
            }
            throw th4;
        }
    }

    public synchronized String transferLeadership(NetAddress netAddress) {
        InetSocketAddress createUnresolved;
        ArrayList<RaftPeer> arrayList;
        String address2String;
        boolean andSet = this.mTransferLeaderAllowed.getAndSet(false);
        String uuid = UUID.randomUUID().toString();
        if (!andSet) {
            this.mErrorMessages.put(uuid, TransferLeaderMessage.newBuilder().setMsg("transfer is not allowed at the moment because the master is " + (this.mRaftJournalWriter == null ? "still gaining primacy" : "already transferring the ") + "leadership").build());
            return uuid;
        }
        try {
            createUnresolved = InetSocketAddress.createUnresolved(netAddress.getHost(), netAddress.getRpcPort());
            arrayList = new ArrayList(this.mRaftGroup.getPeers());
            address2String = NetUtils.address2String(createUnresolved);
        } catch (Throwable th) {
            this.mTransferLeaderAllowed.set(true);
            LOG.warn(th.getMessage());
            this.mErrorMessages.put(uuid, TransferLeaderMessage.newBuilder().setMsg(th.getMessage()).build());
        }
        if (arrayList.stream().map((v0) -> {
            return v0.getAddress();
        }).noneMatch(str -> {
            return str.equals(address2String);
        })) {
            throw new IOException(String.format("<%s> is not part of the quorum <%s>.", address2String, arrayList.stream().map((v0) -> {
                return v0.getAddress();
            }).collect(Collectors.toList())));
        }
        if (address2String.equals(this.mRaftGroup.getPeer(this.mPeerId).getAddress())) {
            throw new IOException(String.format("%s is already the leader", address2String));
        }
        RaftPeerId peerId = RaftJournalUtils.getPeerId(createUnresolved);
        ArrayList arrayList2 = new ArrayList();
        for (RaftPeer raftPeer : arrayList) {
            arrayList2.add(RaftPeer.newBuilder(raftPeer).setPriority(raftPeer.getId().equals(peerId) ? 2 : 1).build());
        }
        RaftClient createClient = createClient();
        Throwable th2 = null;
        try {
            try {
                LOG.info("Applying new peer state before transferring leadership: {}", "[" + ((String) arrayList2.stream().map((v0) -> {
                    return v0.toString();
                }).collect(Collectors.joining(", "))) + "]");
                processReply(createClient.admin().setConfiguration(arrayList2), "failed to set master priorities before initiating election");
                LOG.info("Transferring leadership to master with address <{}> and with RaftPeerId <{}>", createUnresolved, peerId);
                new Thread(() -> {
                    try {
                        Thread.sleep(3000L);
                        processReply(createClient.admin().transferLeadership(peerId, 30000L), "election failed");
                    } catch (Throwable th3) {
                        LOG.error("caught an error when executing transfer: {}", th3.getMessage());
                        this.mTransferLeaderAllowed.set(true);
                        this.mErrorMessages.put(uuid, TransferLeaderMessage.newBuilder().setMsg(th3.getMessage()).build());
                    }
                }).start();
                LOG.info("Transferring leadership initiated");
                if (createClient != null) {
                    if (0 != 0) {
                        try {
                            createClient.close();
                        } catch (Throwable th3) {
                            th2.addSuppressed(th3);
                        }
                    } else {
                        createClient.close();
                    }
                }
                return uuid;
            } finally {
            }
        } finally {
        }
    }

    private void processReply(RaftClientReply raftClientReply, String str) throws IOException {
        if (raftClientReply.isSuccess()) {
            return;
        }
        LOG.error("{}. Error: {}", str, raftClientReply.getException() != null ? raftClientReply.getException() : new IOException(String.format("reply <%s> failed", raftClientReply)));
        throw new IOException(str);
    }

    public synchronized TransferLeaderMessage getTransferLeaderMessage(String str) {
        return this.mErrorMessages.get(str) != null ? this.mErrorMessages.get(str) : TransferLeaderMessage.newBuilder().setMsg("").build();
    }

    public synchronized void addQuorumServer(NetAddress netAddress) throws IOException {
        InetSocketAddress createUnresolved = InetSocketAddress.createUnresolved(netAddress.getHost(), netAddress.getRpcPort());
        RaftPeerId peerId = RaftJournalUtils.getPeerId(createUnresolved);
        Collection peers = ((RaftGroup) this.mServer.getGroups().iterator().next()).getPeers();
        if (peers.stream().anyMatch(raftPeer -> {
            return raftPeer.getId().equals(peerId);
        })) {
            return;
        }
        RaftPeer build = RaftPeer.newBuilder().setId(peerId).setAddress(createUnresolved).build();
        ArrayList arrayList = new ArrayList(peers);
        arrayList.add(build);
        RaftClientReply configuration = this.mServer.setConfiguration(new SetConfigurationRequest(this.mRawClientId, this.mPeerId, RAFT_GROUP_ID, nextCallId(), arrayList));
        if (configuration.getException() != null) {
            throw configuration.getException();
        }
    }

    @Override // alluxio.master.journal.JournalSystem
    public synchronized boolean isEmpty() {
        return this.mRaftJournalWriter != null && this.mRaftJournalWriter.getNextSequenceNumberToWrite() == 0;
    }

    @Override // alluxio.master.journal.JournalSystem
    public boolean isFormatted() {
        return this.mConf.getPath().exists();
    }

    @Override // alluxio.master.journal.JournalSystem
    public void format() throws IOException {
        File path = this.mConf.getPath();
        if (path.isDirectory()) {
            if (!FileUtils.isStorageDirAccessible(path.getPath())) {
                throw new AccessDeniedException(path.getPath());
            }
            org.apache.commons.io.FileUtils.cleanDirectory(path);
        } else {
            if (path.exists()) {
                org.apache.commons.io.FileUtils.forceDelete(path);
            }
            if (!path.mkdirs()) {
                throw new AccessDeniedException(path.getPath());
            }
        }
    }

    public PrimarySelector getPrimarySelector() {
        return this.mPrimarySelector;
    }

    public boolean isSnapshotAllowed() {
        return this.mSnapshotAllowed.get();
    }

    public void notifyLeadershipStateChanged(boolean z) {
        this.mPrimarySelector.notifyStateChanged(z ? PrimarySelector.State.PRIMARY : PrimarySelector.State.STANDBY);
    }

    @VisibleForTesting
    synchronized RaftServer getRaftServer() {
        return this.mServer;
    }

    public synchronized void updateGroup() {
        RaftGroup currentGroup = getCurrentGroup();
        if (currentGroup.equals(this.mRaftGroup)) {
            return;
        }
        LOG.info("Raft group updated: old {}, new {}", this.mRaftGroup, currentGroup);
        this.mRaftGroup = currentGroup;
    }

    @Nullable
    private RaftProtos.RoleInfoProto getRaftRoleInfo() {
        GroupInfoReply groupInfoReply = null;
        try {
            groupInfoReply = getGroupInfo();
        } catch (IOException e) {
            LOG.error("Error while getting RAFT group info", e);
        }
        if (groupInfoReply == null || groupInfoReply.getException() != null) {
            return null;
        }
        return groupInfoReply.getRoleInfoProto();
    }

    public int getRoleId() {
        RaftProtos.RoleInfoProto raftRoleInfo = getRaftRoleInfo();
        if (raftRoleInfo != null) {
            return raftRoleInfo.getRoleValue();
        }
        return -1;
    }

    public String getLeaderId() {
        RaftProtos.RoleInfoProto raftRoleInfo = getRaftRoleInfo();
        if (raftRoleInfo == null) {
            return WAITING_FOR_ELECTION;
        }
        if (raftRoleInfo.getRole() == RaftProtos.RaftPeerRole.LEADER) {
            return getLocalPeerId().toString();
        }
        RaftProtos.FollowerInfoProto followerInfo = raftRoleInfo.getFollowerInfo();
        return (followerInfo == null || followerInfo.getLeaderInfo().getId() == null || followerInfo.getLeaderInfo().getId().getId() == null) ? WAITING_FOR_ELECTION : followerInfo.getLeaderInfo().getId().getId().toStringUtf8();
    }

    protected int getLeaderIndex() {
        String leaderId = getLeaderId();
        if (WAITING_FOR_ELECTION.equals(leaderId)) {
            return -1;
        }
        String replace = leaderId.replace('_', ':');
        int i = 0;
        Iterator<InetSocketAddress> it = this.mConf.getClusterAddresses().iterator();
        while (it.hasNext()) {
            if (it.next().toString().equals(replace)) {
                return i;
            }
            i++;
        }
        return -1;
    }
}
