package org.apache.distributedlog.admin;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.apache.distributedlog.LogRecordWithDLSN;
import org.apache.distributedlog.LogSegmentMetadata;
import org.apache.distributedlog.ReadUtils;
import org.apache.distributedlog.ZooKeeperClient;
import org.apache.distributedlog.ZooKeeperClientBuilder;
import org.apache.distributedlog.api.DistributedLogManager;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.common.util.SchedulerUtils;
import org.apache.distributedlog.exceptions.DLIllegalStateException;
import org.apache.distributedlog.impl.acl.ZKAccessControl;
import org.apache.distributedlog.impl.metadata.BKDLConfig;
import org.apache.distributedlog.metadata.DLMetadata;
import org.apache.distributedlog.metadata.DryrunLogSegmentMetadataStoreUpdater;
import org.apache.distributedlog.metadata.LogSegmentMetadataStoreUpdater;
import org.apache.distributedlog.metadata.MetadataUpdater;
import org.apache.distributedlog.namespace.NamespaceDriver;
import org.apache.distributedlog.thrift.AccessControlEntry;
import org.apache.distributedlog.tools.DistributedLogTool;
import org.apache.distributedlog.tools.Tool;
import org.apache.pulsar.shade.com.google.common.base.Preconditions;
import org.apache.pulsar.shade.com.google.common.collect.Lists;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.pulsar.shade.org.apache.bookkeeper.stream.protocol.ProtocolConstants;
import org.apache.pulsar.shade.org.apache.bookkeeper.util.IOUtils;
import org.apache.pulsar.shade.org.apache.commons.cli.CommandLine;
import org.apache.pulsar.shade.org.apache.commons.cli.Options;
import org.apache.pulsar.shade.org.apache.commons.cli.ParseException;
import org.apache.pulsar.shade.org.apache.commons.configuration.tree.DefaultExpressionEngine;
import org.apache.pulsar.shade.org.apache.zookeeper.KeeperException;
import org.apache.pulsar.shade.org.glassfish.jersey.message.internal.Quality;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/distributedlog/admin/DistributedLogAdmin.class */
public class DistributedLogAdmin extends DistributedLogTool {
    /** Logger shared by the admin tool and its nested commands. */
    static final Logger LOG = LoggerFactory.getLogger(DistributedLogAdmin.class);

    /**
     * Orders {@link LogSegmentCandidate}s by their segment metadata, delegating the actual
     * comparison to {@link LogSegmentMetadata#COMPARATOR}.
     */
    private static final Comparator<LogSegmentCandidate> LOG_SEGMENT_CANDIDATE_COMPARATOR =
            (left, right) -> LogSegmentMetadata.COMPARATOR.compare(left.metadata, right.metadata);

    /* loaded from: input_file:org/apache/distributedlog/admin/DistributedLogAdmin$BindCommand.class */
    /**
     * {@code bind} command: registers the command-line options used to bind bookkeeper
     * environment settings to a distributedlog instance.
     *
     * <p>NOTE(review): the body of {@link #runCmd} was not recovered by the decompiler; in this
     * source it unconditionally throws {@link UnsupportedOperationException}.
     */
    class BindCommand extends Tool.OptsCommand {
        /** Options accepted by this command; populated once in the constructor. */
        Options options;

        /** Creates the command and registers every supported option. */
        BindCommand() {
            super("bind", "bind the bookkeeper environment settings for a given distributedlog instance.");
            this.options = new Options();
            this.options.addOption("l", "bkLedgers", true, "ZooKeeper ledgers path for bookkeeper instance.");
            this.options.addOption("s", "bkZkServers", true, "ZooKeeper servers used for bookkeeper for writers.");
            this.options.addOption("bkzr", "bkZkServersForReader", true, "ZooKeeper servers used for bookkeeper for readers.");
            this.options.addOption("dlzw", "dlZkServersForWriter", true, "ZooKeeper servers used for distributedlog for writers.");
            this.options.addOption("dlzr", "dlZkServersForReader", true, "ZooKeeper servers used for distributedlog for readers.");
            this.options.addOption("i", "sanityCheckTxnID", true, "Flag to sanity check highest txn id.");
            this.options.addOption("r", "encodeRegionID", true, "Flag to encode region id.");
            this.options.addOption("seqno", "firstLogSegmentSeqNo", true, "The first log segment sequence number to use after upgrade");
            this.options.addOption("fns", "federatedNamespace", false, "Flag to turn a namespace to federated namespace");
            this.options.addOption("f", "force", false, "Force binding without prompt.");
            this.options.addOption("c", "creation", false, "Whether is it a creation binding.");
            // NOTE(review): the short option name comes from an unrelated Jersey constant, which looks
            // like a decompiler constant substitution (presumably the literal "q") - TODO confirm.
            this.options.addOption(Quality.QUALITY_PARAMETER_NAME, "query", false, "Query the bookkeeper bindings");
        }

        /** @return the options registered in the constructor */
        @Override // org.apache.distributedlog.tools.Tool.OptsCommand
        protected Options getOptions() {
            return this.options;
        }

        /** @return the one-line usage string for the {@code bind} command */
        @Override // org.apache.distributedlog.tools.Tool.OptsCommand
        protected String getUsage() {
            return "bind [options] <distributedlog uri>";
        }

        /**
         * Placeholder for the real {@code bind} implementation.
         *
         * <p>The decompiler (JADX) reported that this method was decompiled incorrectly
         * (850 instructions skipped, a duplicated region removed), so the original logic is
         * not present here.
         *
         * @param r9 the parsed command line (unused by this placeholder)
         * @return never returns normally
         * @throws UnsupportedOperationException always
         */
        @Override // org.apache.distributedlog.tools.Tool.OptsCommand
        protected int runCmd(org.apache.pulsar.shade.org.apache.commons.cli.CommandLine r9) throws java.lang.Exception {
            // The original bytecode must be consulted to restore this method.
            throw new UnsupportedOperationException("Method not decompiled: org.apache.distributedlog.admin.DistributedLogAdmin.BindCommand.runCmd(org.apache.pulsar.shade.org.apache.commons.cli.CommandLine):int");
        }
    }

    /* loaded from: input_file:org/apache/distributedlog/admin/DistributedLogAdmin$DLCKCommand.class */
    /**
     * {@code dlck} command: checks every stream of a namespace and repairs the log segments
     * whose recorded metadata disagrees with the last record actually readable from them.
     */
    static class DLCKCommand extends DistributedLogTool.PerDLCommand {
        /** When set, repairs go through a dry-run updater instead of the real one. */
        boolean dryrun;
        /** When set, progress messages are printed to stdout. */
        boolean verbose;
        /** Number of threads used to check streams; always positive after parsing. */
        int concurrency;

        DLCKCommand() {
            super("dlck", "Check and repair a distributedlog namespace");
            this.dryrun = false;
            this.verbose = false;
            this.concurrency = 1;
            this.options.addOption("d", "dryrun", false, "Dry run without repairing");
            this.options.addOption("v", "verbose", false, "Print verbose messages");
            this.options.addOption("cy", "concurrency", true, "Concurrency on checking streams");
        }

        /**
         * Reads the {@code d}, {@code v} and {@code cy} options.
         *
         * @param commandLine the parsed command line
         * @throws ParseException if the concurrency value is not a positive integer
         */
        @Override // org.apache.distributedlog.tools.DistributedLogTool.PerDLCommand
        public void parseCommandLine(CommandLine commandLine) throws ParseException {
            super.parseCommandLine(commandLine);
            this.dryrun = commandLine.hasOption("d");
            this.verbose = commandLine.hasOption("v");
            if (commandLine.hasOption("cy")) {
                String rawConcurrency = commandLine.getOptionValue("cy");
                int parsedConcurrency;
                try {
                    parsedConcurrency = Integer.parseInt(rawConcurrency);
                } catch (NumberFormatException e) {
                    throw new ParseException("Invalid concurrency value : " + rawConcurrency);
                }
                // Reject non-positive values here so the user gets a usage error instead of the
                // IllegalArgumentException raised later by checkAndRepairDLNamespace.
                if (parsedConcurrency <= 0) {
                    throw new ParseException("Invalid concurrency value : " + rawConcurrency);
                }
                this.concurrency = parsedConcurrency;
            }
        }

        /**
         * Runs the namespace check/repair and always releases the scheduler created for it.
         *
         * @return 0 on success
         * @throws Exception if checking or repairing fails
         */
        @Override // org.apache.distributedlog.tools.DistributedLogTool.PerDLCommand
        protected int runCmd() throws Exception {
            MetadataUpdater metadataUpdater = this.dryrun
                    ? new DryrunLogSegmentMetadataStoreUpdater(getConf(), getLogSegmentMetadataStore())
                    : LogSegmentMetadataStoreUpdater.createMetadataUpdater(getConf(), getLogSegmentMetadataStore());
            OrderedScheduler scheduler = OrderedScheduler.newSchedulerBuilder()
                    .name("dlck-scheduler")
                    .numThreads(Runtime.getRuntime().availableProcessors())
                    .build();
            try {
                DistributedLogAdmin.checkAndRepairDLNamespace(getUri(), getNamespace(), metadataUpdater, scheduler, this.verbose, !getForce(), this.concurrency);
                return 0;
            } finally {
                // Shut down the scheduler that was actually handed to the check; previously only an
                // unused cached thread pool was shut down and the scheduler threads were leaked.
                SchedulerUtils.shutdownScheduler(scheduler, 5L, TimeUnit.MINUTES);
            }
        }

        /** @return the one-line usage string for the {@code dlck} command */
        @Override // org.apache.distributedlog.tools.Tool.OptsCommand
        protected String getUsage() {
            return "dlck [options]";
        }
    }

    /* loaded from: input_file:org/apache/distributedlog/admin/DistributedLogAdmin$DeleteStreamACLCommand.class */
    /**
     * {@code delete_stream_acl} command: removes the ACL entry stored for a single stream.
     */
    static class DeleteStreamACLCommand extends DistributedLogTool.PerDLCommand {
        /** Name of the stream whose ACL is deleted; set by {@link #parseCommandLine}. */
        String stream;

        DeleteStreamACLCommand() {
            super("delete_stream_acl", "Delete ACL for a given stream");
            this.stream = null;
            this.options.addOption("s", ProtocolConstants.SCHEME, true, "Stream to delete ACL");
        }

        /**
         * Reads the mandatory {@code s} option.
         *
         * @param commandLine the parsed command line
         * @throws ParseException if no stream was given
         */
        @Override // org.apache.distributedlog.tools.DistributedLogTool.PerDLCommand
        public void parseCommandLine(CommandLine commandLine) throws ParseException {
            super.parseCommandLine(commandLine);
            if (!commandLine.hasOption("s")) {
                throw new ParseException("No stream to delete ACL");
            }
            this.stream = commandLine.getOptionValue("s");
        }

        /**
         * Deletes the stream's ACL node, if ACLs are enabled for the namespace.
         *
         * @return 0 on success, -1 when the namespace has no ACL root path
         * @throws Exception if resolving the configuration or issuing the delete fails
         */
        @Override // org.apache.distributedlog.tools.DistributedLogTool.PerDLCommand
        protected int runCmd() throws Exception {
            BKDLConfig bkdlConfig = BKDLConfig.resolveDLConfig(getZooKeeperClient(), getUri());
            if (null == bkdlConfig.getACLRootPath()) {
                System.err.println("ACL isn't enabled for namespace " + getUri());
                return -1;
            }
            // Build the node path from the URI's path component, exactly as SetACLCommand does when
            // it writes the entry. Using the whole URI string would prepend the scheme and authority,
            // which is not a ZooKeeper path.
            String zkPath = getUri().getPath() + "/" + bkdlConfig.getACLRootPath() + "/" + this.stream;
            // NOTE(review): the return value of delete() is discarded; if it is asynchronous the
            // command does not wait for completion - confirm against ZKAccessControl.
            ZKAccessControl.delete(getZooKeeperClient(), zkPath);
            return 0;
        }

        /** @return the one-line usage string for the {@code delete_stream_acl} command */
        @Override // org.apache.distributedlog.tools.Tool.OptsCommand
        protected String getUsage() {
            return "delete_stream_acl [options]";
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/distributedlog/admin/DistributedLogAdmin$LogSegmentCandidate.class */
    /**
     * A log segment flagged for repair, paired with the last record that was read from it.
     */
    public static class LogSegmentCandidate {
        /** Metadata currently stored for the segment. */
        final LogSegmentMetadata metadata;
        /** Last record read back from the segment. */
        final LogRecordWithDLSN lastRecord;

        LogSegmentCandidate(LogSegmentMetadata segmentMetadata, LogRecordWithDLSN lastRecordRead) {
            metadata = segmentMetadata;
            lastRecord = lastRecordRead;
        }

        /** Renders both fields for log and console output. */
        public String toString() {
            StringBuilder text = new StringBuilder("LogSegmentCandidate[ metadata = ");
            text.append(metadata);
            text.append(", last record = ");
            text.append(lastRecord);
            text.append(" ]");
            return text.toString();
        }
    }

    /* loaded from: input_file:org/apache/distributedlog/admin/DistributedLogAdmin$RepairSeqNoCommand.class */
    /**
     * {@code repairseqno} command: for each listed stream, repairs an in-progress log segment
     * whose sequence number is not higher than that of the completed segments.
     */
    static class RepairSeqNoCommand extends DistributedLogTool.PerDLCommand {
        /** When set, repairs go through a dry-run updater instead of the real one. */
        boolean dryrun;
        /** When set, per-stream details are printed to stdout. */
        boolean verbose;
        /** Streams to repair, in the order given on the command line. */
        final List<String> streams;

        RepairSeqNoCommand() {
            super("repairseqno", "Repair a stream whose inprogress log segment has lower sequence number.");
            dryrun = false;
            verbose = false;
            streams = new ArrayList<>();
            options.addOption("d", "dryrun", false, "Dry run without repairing");
            options.addOption("l", "list", true, "List of streams to repair, separated by comma");
            options.addOption("v", "verbose", false, "Print verbose messages");
        }

        /**
         * Reads the {@code d}, {@code v}, {@code f} and mandatory {@code l} options.
         *
         * @param commandLine the parsed command line
         * @throws ParseException if no stream list was given
         */
        @Override // org.apache.distributedlog.tools.DistributedLogTool.PerDLCommand
        public void parseCommandLine(CommandLine commandLine) throws ParseException {
            super.parseCommandLine(commandLine);
            dryrun = commandLine.hasOption("d");
            verbose = commandLine.hasOption("v");
            // Force is only honoured when this is not a dry run.
            force = !dryrun && commandLine.hasOption("f");
            if (!commandLine.hasOption("l")) {
                throw new ParseException("No streams provided to repair");
            }
            for (String streamName : commandLine.getOptionValue("l").split(",")) {
                streams.add(streamName);
            }
        }

        /**
         * Lists the streams, asks for confirmation, then repairs them one by one.
         *
         * @return 0 when all streams were processed, -1 when the confirmation prompt was declined
         * @throws Exception if any repair fails
         */
        @Override // org.apache.distributedlog.tools.DistributedLogTool.PerDLCommand
        protected int runCmd() throws Exception {
            MetadataUpdater updater;
            if (dryrun) {
                updater = new DryrunLogSegmentMetadataStoreUpdater(getConf(), getLogSegmentMetadataStore());
            } else {
                updater = LogSegmentMetadataStoreUpdater.createMetadataUpdater(getConf(), getLogSegmentMetadataStore());
            }
            System.out.println("List of streams : ");
            System.out.println(streams);
            boolean confirmed = IOUtils.confirmPrompt("Do you want to repair all these streams (Y/N):");
            if (!confirmed) {
                return -1;
            }
            for (String streamName : streams) {
                DistributedLogAdmin.fixInprogressSegmentWithLowerSequenceNumber(getNamespace(), updater, streamName, verbose, !getForce());
            }
            return 0;
        }

        /** @return the one-line usage string for the {@code repairseqno} command */
        @Override // org.apache.distributedlog.tools.Tool.OptsCommand
        protected String getUsage() {
            return "repairseqno [options]";
        }
    }

    /* loaded from: input_file:org/apache/distributedlog/admin/DistributedLogAdmin$SetACLCommand.class */
    /**
     * Base class for commands that write an ACL entry; subclasses decide which ZooKeeper node
     * the entry lives at via {@link #getZKPath(String)}.
     */
    abstract static class SetACLCommand extends DistributedLogTool.PerDLCommand {
        boolean denyWrite;
        boolean denyTruncate;
        boolean denyDelete;
        boolean denyAcquire;
        boolean denyRelease;

        /**
         * @param name command name
         * @param description command description
         */
        protected SetACLCommand(String name, String description) {
            super(name, description);
            denyWrite = false;
            denyTruncate = false;
            denyDelete = false;
            denyAcquire = false;
            denyRelease = false;
            options.addOption("dw", "deny-write", false, "Deny write/bulkWrite requests");
            options.addOption("dt", "deny-truncate", false, "Deny truncate requests");
            options.addOption("dd", "deny-delete", false, "Deny delete requests");
            options.addOption("da", "deny-acquire", false, "Deny acquire requests");
            options.addOption("dr", "deny-release", false, "Deny release requests");
        }

        /**
         * Records which deny flags were present on the command line.
         *
         * @param commandLine the parsed command line
         * @throws ParseException if the parent parser rejects the command line
         */
        @Override // org.apache.distributedlog.tools.DistributedLogTool.PerDLCommand
        public void parseCommandLine(CommandLine commandLine) throws ParseException {
            super.parseCommandLine(commandLine);
            denyWrite = commandLine.hasOption("dw");
            denyTruncate = commandLine.hasOption("dt");
            denyDelete = commandLine.hasOption("dd");
            denyAcquire = commandLine.hasOption("da");
            denyRelease = commandLine.hasOption("dr");
        }

        /**
         * Maps the namespace ACL root path to the node this command writes.
         *
         * @param aclRootPath the namespace path joined with its ACL root path
         * @return the ZooKeeper path of the ACL node
         */
        protected abstract String getZKPath(String aclRootPath);

        /**
         * Reads the ACL stored at {@code zkPath}, or builds an empty one when the node is absent.
         *
         * @param zkc ZooKeeper client used for the read
         * @param zkPath path of the ACL node
         * @return the stored access control, or a fresh one wrapping a new {@link AccessControlEntry}
         * @throws Exception if the read fails for any reason other than a missing node
         */
        protected ZKAccessControl getZKAccessControl(ZooKeeperClient zkc, String zkPath) throws Exception {
            try {
                return (ZKAccessControl) FutureUtils.result(ZKAccessControl.read(zkc, zkPath, null));
            } catch (KeeperException.NoNodeException missingNode) {
                // A missing node is expected the first time an ACL is set: start from an empty entry.
                return new ZKAccessControl(new AccessControlEntry(), zkPath);
            }
        }

        /**
         * Creates the ACL node when it does not exist yet, otherwise updates it.
         *
         * <p>NOTE(review): the results of {@code create}/{@code update} are discarded; if they are
         * asynchronous, completion is not awaited here - confirm against ZKAccessControl.
         *
         * @param zkc ZooKeeper client used for the existence check and the write
         * @param accessControl the access control to persist
         * @throws Exception if the existence check or the write fails
         */
        protected void setZKAccessControl(ZooKeeperClient zkc, ZKAccessControl accessControl) throws Exception {
            boolean nodeExists = null != zkc.get().exists(accessControl.getZKPath(), false);
            if (nodeExists) {
                accessControl.update(zkc);
            } else {
                accessControl.create(zkc);
            }
        }

        /**
         * Loads the current ACL entry, overwrites its deny flags with the parsed values and
         * writes it back.
         *
         * @return 0 on success, -1 when the namespace has no ACL root path
         * @throws Exception if reading or writing the ACL fails
         */
        @Override // org.apache.distributedlog.tools.DistributedLogTool.PerDLCommand
        protected int runCmd() throws Exception {
            BKDLConfig bkdlConfig = BKDLConfig.resolveDLConfig(getZooKeeperClient(), getUri());
            if (null == bkdlConfig.getACLRootPath()) {
                System.err.println("ACL isn't enabled for namespace " + getUri());
                return -1;
            }
            ZooKeeperClient zkc = getZooKeeperClient();
            String zkPath = getZKPath(getUri().getPath() + "/" + bkdlConfig.getACLRootPath());
            ZKAccessControl accessControl = getZKAccessControl(zkc, zkPath);
            AccessControlEntry entry = accessControl.getAccessControlEntry();
            entry.setDenyWrite(denyWrite);
            entry.setDenyTruncate(denyTruncate);
            entry.setDenyDelete(denyDelete);
            entry.setDenyAcquire(denyAcquire);
            entry.setDenyRelease(denyRelease);
            setZKAccessControl(getZooKeeperClient(), accessControl);
            return 0;
        }
    }

    /* loaded from: input_file:org/apache/distributedlog/admin/DistributedLogAdmin$SetDefaultACLCommand.class */
    /**
     * {@code set_default_acl} command: writes the ACL entry stored directly at the namespace's
     * ACL root path.
     */
    static class SetDefaultACLCommand extends SetACLCommand {
        SetDefaultACLCommand() {
            super("set_default_acl", "Set Default ACL for a namespace");
        }

        /**
         * The default ACL lives at the ACL root itself, so the path is returned unchanged.
         *
         * @param aclRootPath the namespace path joined with its ACL root path
         * @return {@code aclRootPath}
         */
        @Override // org.apache.distributedlog.admin.DistributedLogAdmin.SetACLCommand
        protected String getZKPath(String aclRootPath) {
            return aclRootPath;
        }

        /** @return the one-line usage string for the {@code set_default_acl} command */
        @Override // org.apache.distributedlog.tools.Tool.OptsCommand
        protected String getUsage() {
            return "set_default_acl [options]";
        }
    }

    /* loaded from: input_file:org/apache/distributedlog/admin/DistributedLogAdmin$SetStreamACLCommand.class */
    /**
     * {@code set_stream_acl} command: writes the ACL entry stored under the ACL root path for
     * one named stream.
     */
    static class SetStreamACLCommand extends SetACLCommand {
        /** Name of the stream whose ACL is written; set by {@link #parseCommandLine}. */
        String stream;

        SetStreamACLCommand() {
            super("set_stream_acl", "Set Default ACL for a given stream");
            stream = null;
            options.addOption("s", ProtocolConstants.SCHEME, true, "Stream to set ACL");
        }

        /**
         * Reads the deny flags (via the parent) and the mandatory {@code s} option.
         *
         * @param commandLine the parsed command line
         * @throws ParseException if no stream was given
         */
        @Override // org.apache.distributedlog.admin.DistributedLogAdmin.SetACLCommand, org.apache.distributedlog.tools.DistributedLogTool.PerDLCommand
        public void parseCommandLine(CommandLine commandLine) throws ParseException {
            super.parseCommandLine(commandLine);
            boolean streamGiven = commandLine.hasOption("s");
            if (!streamGiven) {
                throw new ParseException("No stream to set ACL");
            }
            stream = commandLine.getOptionValue("s");
        }

        /**
         * Appends the stream name to the ACL root path.
         *
         * @param aclRootPath the namespace path joined with its ACL root path
         * @return {@code aclRootPath}, a slash, and the stream name
         */
        @Override // org.apache.distributedlog.admin.DistributedLogAdmin.SetACLCommand
        protected String getZKPath(String aclRootPath) {
            StringBuilder path = new StringBuilder();
            path.append(aclRootPath).append("/").append(stream);
            return path.toString();
        }

        /** @return the one-line usage string for the {@code set_stream_acl} command */
        @Override // org.apache.distributedlog.tools.Tool.OptsCommand
        protected String getUsage() {
            return "set_stream_acl [options]";
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/distributedlog/admin/DistributedLogAdmin$StreamCandidate.class */
    /**
     * A stream together with the log segments of it that were flagged for repair, kept sorted
     * by {@code LOG_SEGMENT_CANDIDATE_COMPARATOR}.
     */
    public static class StreamCandidate {
        final String streamName;
        final SortedSet<LogSegmentCandidate> segmentCandidates =
                new TreeSet<>(DistributedLogAdmin.LOG_SEGMENT_CANDIDATE_COMPARATOR);

        StreamCandidate(String name) {
            streamName = name;
        }

        /**
         * Adds a flagged segment; guarded by this object's monitor.
         *
         * @param candidate the segment candidate to record
         */
        void addLogSegmentCandidate(LogSegmentCandidate candidate) {
            synchronized (this) {
                segmentCandidates.add(candidate);
            }
        }

        /** Renders the stream name and its candidates for log and console output. */
        public String toString() {
            StringBuilder text = new StringBuilder("StreamCandidate[ name = ");
            text.append(streamName);
            text.append(", segments = ");
            text.append(segmentCandidates);
            text.append(" ]");
            return text.toString();
        }
    }

    /* loaded from: input_file:org/apache/distributedlog/admin/DistributedLogAdmin$UnbindCommand.class */
    class UnbindCommand extends Tool.OptsCommand {
        Options options;

        UnbindCommand() {
            super("unbind", "unbind the bookkeeper environment bound for a given distributedlog instance.");
            this.options = new Options();
            this.options.addOption("f", "force", false, "Force unbinding without prompt.");
        }

        @Override // org.apache.distributedlog.tools.Tool.OptsCommand
        protected Options getOptions() {
            return this.options;
        }

        @Override // org.apache.distributedlog.tools.Tool.OptsCommand
        protected String getUsage() {
            return "unbind [options] <distributedlog uri>";
        }

        @Override // org.apache.distributedlog.tools.Tool.OptsCommand
        protected int runCmd(CommandLine commandLine) throws Exception {
            BKDLConfig bKDLConfig;
            String[] args = commandLine.getArgs();
            if (args.length <= 0) {
                System.err.println("No distributedlog uri specified.");
                printUsage();
                return -1;
            }
            boolean hasOption = commandLine.hasOption("f");
            URI create = URI.create(args[0]);
            try {
                bKDLConfig = BKDLConfig.resolveDLConfig(ZooKeeperClientBuilder.newBuilder().uri(create).zkAclId(null).sessionTimeoutMs(10000).build(), create);
            } catch (IOException e) {
                bKDLConfig = null;
            }
            if (null == bKDLConfig) {
                System.out.println("No bookkeeper is bound to " + create);
                return 0;
            }
            System.out.println("There is bookkeeper bound to " + create + " : ");
            System.out.println("");
            System.out.println(bKDLConfig.toString());
            System.out.println("");
            if (!hasOption && !IOUtils.confirmPrompt("Do you want to unbind " + create + " :\n")) {
                return 0;
            }
            DLMetadata.unbind(create);
            System.out.println("Unbound on " + create + DefaultExpressionEngine.DEFAULT_PROPERTY_DELIMITER);
            return 0;
        }
    }

    /**
     * Repairs a stream whose in-progress log segment carries a sequence number that is not
     * higher than the highest sequence number among its completed segments. The in-progress
     * segment is moved to {@code highest completed + 1}.
     *
     * <p>The log manager opened for the stream is closed exactly once, on every exit path.
     *
     * @param namespace namespace used to open the stream
     * @param metadataUpdater updater used to change the segment's sequence number
     * @param streamName name of the stream to inspect
     * @param verbose whether to print segment details and the applied fix to stdout
     * @param interactive whether to ask for confirmation before applying the fix
     * @throws DLIllegalStateException if the stream has more than one in-progress segment
     * @throws Exception if reading the segments or updating the metadata fails
     */
    public static void fixInprogressSegmentWithLowerSequenceNumber(Namespace namespace, MetadataUpdater metadataUpdater, String streamName, boolean verbose, boolean interactive) throws Exception {
        DistributedLogManager dlm = namespace.openLog(streamName);
        try {
            List<LogSegmentMetadata> segments = dlm.getLogSegments();
            if (verbose) {
                System.out.println("LogSegments for " + streamName + " : ");
                for (LogSegmentMetadata segment : segments) {
                    System.out.println(segment.getLogSegmentSequenceNumber() + "\t: " + segment);
                }
            }
            LOG.info("Get log segments for {} : {}", streamName, segments);

            long maxCompletedSeqNo = -1;
            LogSegmentMetadata inprogressSegment = null;
            for (LogSegmentMetadata segment : segments) {
                if (!segment.isInProgress()) {
                    maxCompletedSeqNo = Math.max(maxCompletedSeqNo, segment.getLogSegmentSequenceNumber());
                } else {
                    if (null != inprogressSegment) {
                        throw new DLIllegalStateException("Multiple inprogress segments found for stream " + streamName + " : " + segments);
                    }
                    inprogressSegment = segment;
                }
            }

            // Nothing to fix: no in-progress segment, or it is already ahead of the completed ones.
            if (null == inprogressSegment || inprogressSegment.getLogSegmentSequenceNumber() > maxCompletedSeqNo) {
                return;
            }
            long newSeqNo = maxCompletedSeqNo + 1;
            if (interactive && !IOUtils.confirmPrompt("Confirm to fix (Y/N), Ctrl+C to break : ")) {
                return;
            }

            LogSegmentMetadata fixedSegment = (LogSegmentMetadata) FutureUtils.result(metadataUpdater.changeSequenceNumber(inprogressSegment, newSeqNo));
            LOG.info("Fixed {} : {} -> {} ", new Object[]{streamName, inprogressSegment, fixedSegment});
            if (verbose) {
                System.out.println("Fixed " + streamName + " : " + inprogressSegment.getZNodeName() + " -> " + fixedSegment.getZNodeName());
                System.out.println("\t old: " + inprogressSegment);
                System.out.println("\t new: " + fixedSegment);
                System.out.println();
            }
        } finally {
            // Single close point; the extra close() calls that used to precede some returns made
            // the manager get closed twice.
            dlm.close();
        }
    }

    /**
     * Checks and repairs a namespace using a single checking thread.
     *
     * @param uri namespace URI, used for console messages
     * @param namespace namespace whose streams are checked
     * @param metadataUpdater updater used to repair segment metadata
     * @param scheduler scheduler used when reading the last record of each segment
     * @param verbose whether to print progress to stdout
     * @param interactive whether to ask for confirmation before repairing
     * @throws Exception if checking or repairing fails
     */
    public static void checkAndRepairDLNamespace(URI uri, Namespace namespace, MetadataUpdater metadataUpdater, OrderedScheduler scheduler, boolean verbose, boolean interactive) throws Exception {
        final int singleThread = 1;
        checkAndRepairDLNamespace(uri, namespace, metadataUpdater, scheduler, verbose, interactive, singleThread);
    }

    /**
     * Checks every stream of a namespace and repairs the ones found to be corrupted.
     *
     * <p>Step 0 collects and checks the streams; step 1 repairs each corrupted stream and stops
     * at the first stream whose repair is declined.
     *
     * @param uri namespace URI, used for console messages
     * @param namespace namespace whose streams are checked
     * @param metadataUpdater updater used to repair segment metadata
     * @param scheduler scheduler used when reading the last record of each segment
     * @param verbose whether to print progress to stdout
     * @param interactive whether to ask for confirmation before repairing
     * @param concurrency number of threads used to check streams; must be positive
     * @throws IllegalArgumentException if {@code concurrency} is not positive
     * @throws Exception if checking or repairing fails
     */
    public static void checkAndRepairDLNamespace(URI uri, Namespace namespace, MetadataUpdater metadataUpdater, OrderedScheduler scheduler, boolean verbose, boolean interactive, int concurrency) throws Exception {
        Preconditions.checkArgument(concurrency > 0, "Invalid concurrency " + concurrency + " found.");

        List<String> streams = Lists.newArrayList();
        for (Iterator<String> logs = namespace.getLogs(); logs.hasNext(); ) {
            streams.add(logs.next());
        }
        if (verbose) {
            System.out.println("- 0. checking streams under " + uri);
        }
        if (streams.isEmpty()) {
            System.out.println("+ 0. nothing to check. quit.");
            return;
        }

        Map<String, StreamCandidate> corruptedStreams = checkStreams(namespace, streams, scheduler, concurrency);
        if (verbose) {
            System.out.println("+ 0. " + corruptedStreams.size() + " corrupted streams found.");
        }
        // In interactive mode the operator must approve the repair of the whole batch first.
        if (interactive && !IOUtils.confirmPrompt("Do you want to fix all " + corruptedStreams.size() + " corrupted streams (Y/N) : ")) {
            return;
        }

        if (verbose) {
            System.out.println("- 1. repairing " + corruptedStreams.size() + " corrupted streams.");
        }
        for (StreamCandidate corruptedStream : corruptedStreams.values()) {
            if (repairStream(metadataUpdater, corruptedStream, verbose, interactive)) {
                continue;
            }
            // A declined stream aborts the remaining repairs.
            if (verbose) {
                System.out.println("* 1. aborted repairing corrupted streams.");
            }
            return;
        }
        if (verbose) {
            System.out.println("+ 1. repaired " + corruptedStreams.size() + " corrupted streams.");
        }
    }

    /**
     * Checks the given streams with {@code concurrency} worker threads and returns the ones
     * that have at least one log segment needing repair, keyed by stream name.
     *
     * <p>Workers drain a shared queue with non-blocking {@code poll()}, so none of them can get
     * stuck waiting on an already-empty queue. When a check fails, or the calling thread is
     * interrupted, the remaining work is dropped and every worker is interrupted and joined
     * before this method returns or throws, so no worker thread is left behind.
     *
     * @param namespace namespace used to open the streams
     * @param streams names of the streams to check
     * @param scheduler scheduler used when reading the last record of each segment
     * @param concurrency number of worker threads to start
     * @return map from stream name to its repair candidate; empty when nothing needs repair
     * @throws IOException if at least one stream could not be checked
     */
    private static Map<String, StreamCandidate> checkStreams(final Namespace namespace, Collection<String> streams, final OrderedScheduler scheduler, int concurrency) throws IOException {
        final ConcurrentSkipListMap<String, StreamCandidate> candidates = new ConcurrentSkipListMap<>();
        if (streams.isEmpty()) {
            // With nothing to check no worker would ever release the latch below.
            return candidates;
        }
        final LinkedBlockingQueue<String> pending = new LinkedBlockingQueue<>(streams);
        final AtomicInteger remaining = new AtomicInteger(streams.size());
        final CountDownLatch done = new CountDownLatch(1);

        Runnable worker = () -> {
            String streamName;
            while ((streamName = pending.poll()) != null) {
                try {
                    LOG.info("Checking stream {}.", streamName);
                    StreamCandidate candidate = checkStream(namespace, streamName, scheduler);
                    LOG.info("Checked stream {} - {}.", streamName, candidate);
                    if (null != candidate) {
                        candidates.put(streamName, candidate);
                    }
                    if (remaining.decrementAndGet() == 0) {
                        done.countDown();
                    }
                } catch (Throwable t) {
                    LOG.error("Error on checking stream {} : ", streamName, t);
                    // Abort: stop the other workers from picking up more streams and wake the
                    // caller. The failed stream is never counted down, so the caller sees the failure.
                    pending.clear();
                    done.countDown();
                    return;
                }
            }
        };

        Thread[] workers = new Thread[concurrency];
        for (int idx = 0; idx < concurrency; idx++) {
            workers[idx] = new Thread(worker, "check-thread-" + idx);
            workers[idx].start();
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        // Stop and reap the workers on every path, not only on success.
        pending.clear();
        for (Thread thread : workers) {
            thread.interrupt();
        }
        for (Thread thread : workers) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        if (remaining.get() != 0) {
            throw new IOException(remaining.get() + " streams left w/o checked");
        }
        return candidates;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Checks all log segments of one stream and collects the ones that need repair.
     *
     * <p>The log manager opened for the stream is closed exactly once, on every exit path.
     *
     * @param namespace namespace used to open the stream
     * @param streamName name of the stream to check
     * @param scheduler scheduler used when reading the last record of each segment
     * @return the stream's repair candidate, or {@code null} when the stream has no segments or
     *         none of them needs repair
     * @throws IOException if the segments cannot be listed or any segment check fails
     */
    public static StreamCandidate checkStream(Namespace namespace, String streamName, OrderedScheduler scheduler) throws IOException {
        DistributedLogManager dlm = namespace.openLog(streamName);
        try {
            List<LogSegmentMetadata> segments = dlm.getLogSegments();
            if (segments.isEmpty()) {
                return null;
            }
            List<CompletableFuture<LogSegmentCandidate>> segmentChecks = new ArrayList<>(segments.size());
            for (LogSegmentMetadata segment : segments) {
                segmentChecks.add(checkLogSegment(namespace, streamName, segment, scheduler));
            }
            try {
                List<LogSegmentCandidate> checkedSegments = FutureUtils.result(FutureUtils.collect(segmentChecks));
                StreamCandidate streamCandidate = new StreamCandidate(streamName);
                for (LogSegmentCandidate segmentCandidate : checkedSegments) {
                    // A null entry means the segment was in progress or did not need repair.
                    if (null != segmentCandidate) {
                        streamCandidate.addLogSegmentCandidate(segmentCandidate);
                    }
                }
                return streamCandidate.segmentCandidates.isEmpty() ? null : streamCandidate;
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the caller's thread while still reporting an IOException.
                Thread.currentThread().interrupt();
                throw new IOException("Failed on checking stream " + streamName, e);
            } catch (Exception e) {
                throw new IOException("Failed on checking stream " + streamName, e);
            }
        } finally {
            // Single close point; the extra close() calls that used to precede the returns made
            // the manager get closed twice.
            dlm.close();
        }
    }

    /**
     * Asynchronously reads the last record of a completed log segment and compares it with the
     * segment's metadata.
     *
     * <p>A segment becomes a repair candidate when the record read back has a DLSN greater than
     * the metadata's last DLSN, a transaction id greater than the metadata's last transaction id,
     * or a position outside the segment scope.
     *
     * @param namespace namespace providing the reader-side log segment entry store
     * @param streamName name of the stream the segment belongs to
     * @param segment metadata of the segment to check
     * @param scheduler scheduler passed to the last-record read
     * @return a future holding the candidate, or {@code null} when the segment is in progress,
     *         no record was read, or the metadata is consistent with the record
     */
    private static CompletableFuture<LogSegmentCandidate> checkLogSegment(Namespace namespace, String streamName, final LogSegmentMetadata segment, OrderedScheduler scheduler) {
        if (segment.isInProgress()) {
            // In-progress segments are not checked.
            return FutureUtils.value(null);
        }
        // NOTE(review): the boolean/int arguments are forwarded exactly as in the original call;
        // their meaning is defined by ReadUtils.asyncReadLastRecord - confirm there before changing.
        CompletableFuture<LogRecordWithDLSN> lastRecordFuture = ReadUtils.asyncReadLastRecord(streamName, segment, true, false, true, 4, 16, new AtomicInteger(0), scheduler, namespace.getNamespaceDriver().getLogSegmentEntryStore(NamespaceDriver.Role.READER));
        return lastRecordFuture.thenApply(lastRecord -> {
            if (null == lastRecord) {
                return null;
            }
            boolean metadataIsStale = lastRecord.getDlsn().compareTo(segment.getLastDLSN()) > 0
                    || lastRecord.getTransactionId() > segment.getLastTxId()
                    || !segment.isRecordPositionWithinSegmentScope(lastRecord);
            return metadataIsStale ? new LogSegmentCandidate(segment, lastRecord) : null;
        });
    }

    /**
     * Repairs every flagged segment of one stream by writing the last record read from the
     * segment into its metadata.
     *
     * @param metadataUpdater updater used to rewrite the segment metadata
     * @param streamCandidate the stream and its flagged segments
     * @param verbose whether to print the segments and the applied fixes to stdout
     * @param interactive whether to ask for confirmation before repairing the stream
     * @return {@code false} when the confirmation prompt was declined, {@code true} otherwise
     * @throws Exception if a metadata update fails
     */
    private static boolean repairStream(MetadataUpdater metadataUpdater, StreamCandidate streamCandidate, boolean verbose, boolean interactive) throws Exception {
        final String separator = "-------------------------------------------";
        if (verbose) {
            System.out.println("Stream " + streamCandidate.streamName + " : ");
            for (LogSegmentCandidate flagged : streamCandidate.segmentCandidates) {
                System.out.println("  " + flagged.metadata.getLogSegmentSequenceNumber() + " : metadata = " + flagged.metadata + ", last dlsn = " + flagged.lastRecord.getDlsn());
            }
            System.out.println(separator);
        }
        if (interactive) {
            boolean approved = IOUtils.confirmPrompt("Do you want to fix the stream " + streamCandidate.streamName + " (Y/N) : ");
            if (!approved) {
                return false;
            }
        }
        for (LogSegmentCandidate flagged : streamCandidate.segmentCandidates) {
            LogSegmentMetadata repaired = (LogSegmentMetadata) FutureUtils.result(metadataUpdater.updateLastRecord(flagged.metadata, flagged.lastRecord));
            if (verbose) {
                System.out.println(" Fixed segment " + flagged.metadata.getLogSegmentSequenceNumber() + " : ");
                System.out.println("    old metadata : " + flagged.metadata);
                System.out.println("    new metadata : " + repaired);
            }
        }
        if (verbose) {
            System.out.println(separator);
        }
        return true;
    }

    /**
     * Creates the admin tool: clears the inherited {@code commands} collection, then registers
     * the help command followed by the admin commands, in the order listed below.
     */
    public DistributedLogAdmin() {
        // Start from an empty command set before adding the admin-specific commands.
        this.commands.clear();
        addCommand(new Tool.HelpCommand());
        addCommand(new BindCommand());
        addCommand(new UnbindCommand());
        addCommand(new RepairSeqNoCommand());
        addCommand(new DLCKCommand());
        addCommand(new SetDefaultACLCommand());
        addCommand(new SetStreamACLCommand());
        addCommand(new DeleteStreamACLCommand());
    }

    /** @return the name of this tool, {@code "dlog_admin"} */
    @Override // org.apache.distributedlog.tools.DistributedLogTool, org.apache.distributedlog.tools.Tool
    protected String getName() {
        return "dlog_admin";
    }
}
