package kafka.tools;

import com.typesafe.scalalogging.Logger;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import joptsimple.AbstractOptionSpec;
import joptsimple.ArgumentAcceptingOptionSpec;
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;
import kafka.utils.CommandLineUtils$;
import kafka.utils.CoreUtils$;
import kafka.utils.Exit$;
import kafka.utils.IncludeList;
import kafka.utils.Log4jControllerRegistration$;
import kafka.utils.Logging;
import kafka.utils.ToolsUtils$;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Time;
import org.slf4j.event.Level;
import scala.C$less$colon$less$;
import scala.Console$;
import scala.Function0;
import scala.MatchError;
import scala.None$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.IterableOnce;
import scala.collection.IterableOnceOps;
import scala.collection.Seq;
import scala.collection.convert.AsJavaExtensions;
import scala.collection.convert.AsScalaExtensions;
import scala.collection.immutable.Iterable;
import scala.collection.immutable.Map;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Buffer$;
import scala.jdk.CollectionConverters$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ScalaRunTime$;
import scala.runtime.java8.JFunction0$mcV$sp;

/* JADX WARN: Classes with same name are omitted:
  input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.2.1.1.7.jar:META-INF/bundled-dependencies/kafka_2.13-2.7.0.jar:kafka/tools/ReplicaVerificationTool$.class
 */
/* compiled from: ReplicaVerificationTool.scala */
/* loaded from: input_file:META-INF/bundled-dependencies/kafka_2.13-2.7.0.jar:kafka/tools/ReplicaVerificationTool$.class */
public final class ReplicaVerificationTool$ implements Logging {
    public static final ReplicaVerificationTool$ MODULE$ = new ReplicaVerificationTool$();
    private static final String clientId;
    private static final String dateFormatString;
    private static final SimpleDateFormat dateFormat;
    private static Logger logger;
    private static String logIdent;
    private static volatile boolean bitmap$0;

    static {
        ReplicaVerificationTool$ replicaVerificationTool$ = MODULE$;
        Log4jControllerRegistration$ log4jControllerRegistration$ = Log4jControllerRegistration$.MODULE$;
        clientId = "replicaVerificationTool";
        dateFormatString = "yyyy-MM-dd HH:mm:ss,SSS";
        dateFormat = new SimpleDateFormat(MODULE$.dateFormatString());
    }

    @Override // kafka.utils.Logging
    public String loggerName() {
        String loggerName;
        loggerName = loggerName();
        return loggerName;
    }

    @Override // kafka.utils.Logging
    public String msgWithLogIdent(String str) {
        String msgWithLogIdent;
        msgWithLogIdent = msgWithLogIdent(str);
        return msgWithLogIdent;
    }

    @Override // kafka.utils.Logging
    public void trace(Function0<String> function0) {
        trace(function0);
    }

    @Override // kafka.utils.Logging
    public void trace(Function0<String> function0, Function0<Throwable> function02) {
        trace(function0, function02);
    }

    @Override // kafka.utils.Logging
    public boolean isDebugEnabled() {
        boolean isDebugEnabled;
        isDebugEnabled = isDebugEnabled();
        return isDebugEnabled;
    }

    @Override // kafka.utils.Logging
    public boolean isTraceEnabled() {
        boolean isTraceEnabled;
        isTraceEnabled = isTraceEnabled();
        return isTraceEnabled;
    }

    @Override // kafka.utils.Logging
    public void debug(Function0<String> function0) {
        debug(function0);
    }

    @Override // kafka.utils.Logging
    public void debug(Function0<String> function0, Function0<Throwable> function02) {
        debug(function0, function02);
    }

    @Override // kafka.utils.Logging
    public void info(Function0<String> function0) {
        info(function0);
    }

    @Override // kafka.utils.Logging
    public void info(Function0<String> function0, Function0<Throwable> function02) {
        info(function0, function02);
    }

    @Override // kafka.utils.Logging
    public void warn(Function0<String> function0) {
        warn(function0);
    }

    @Override // kafka.utils.Logging
    public void warn(Function0<String> function0, Function0<Throwable> function02) {
        warn(function0, function02);
    }

    @Override // kafka.utils.Logging
    public void error(Function0<String> function0) {
        error(function0);
    }

    @Override // kafka.utils.Logging
    public void error(Function0<String> function0, Function0<Throwable> function02) {
        error(function0, function02);
    }

    @Override // kafka.utils.Logging
    public void fatal(Function0<String> function0) {
        fatal(function0);
    }

    @Override // kafka.utils.Logging
    public void fatal(Function0<String> function0, Function0<Throwable> function02) {
        fatal(function0, function02);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v7 */
    private Logger logger$lzycompute() {
        Logger logger2;
        ?? r0 = this;
        synchronized (r0) {
            if (!bitmap$0) {
                logger2 = logger();
                logger = logger2;
                r0 = 1;
                bitmap$0 = true;
            }
            return logger;
        }
    }

    @Override // kafka.utils.Logging
    public Logger logger() {
        return !bitmap$0 ? logger$lzycompute() : logger;
    }

    @Override // kafka.utils.Logging
    public String logIdent() {
        return logIdent;
    }

    @Override // kafka.utils.Logging
    public void logIdent_$eq(String str) {
        logIdent = str;
    }

    public String clientId() {
        return clientId;
    }

    public String dateFormatString() {
        return dateFormatString;
    }

    public SimpleDateFormat dateFormat() {
        return dateFormat;
    }

    public String getCurrentTimeString() {
        return dateFormat().format(new Date(Time.SYSTEM.milliseconds()));
    }

    public void main(String[] strArr) {
        String msgWithLogIdent;
        String msgWithLogIdent2;
        String msgWithLogIdent3;
        String msgWithLogIdent4;
        String msgWithLogIdent5;
        OptionParser optionParser = new OptionParser(false);
        OptionSpec ofType = optionParser.accepts("broker-list", "REQUIRED: The list of hostname and port of the server to connect to.").withRequiredArg().describedAs("hostname:port,...,hostname:port").ofType(String.class);
        ArgumentAcceptingOptionSpec defaultsTo = optionParser.accepts("fetch-size", "The fetch size of each request.").withRequiredArg().describedAs("bytes").ofType(Integer.class).defaultsTo(1048576, new Integer[0]);
        ArgumentAcceptingOptionSpec defaultsTo2 = optionParser.accepts("max-wait-ms", "The max amount of time each fetch request waits.").withRequiredArg().describedAs("ms").ofType(Integer.class).defaultsTo(1000, new Integer[0]);
        ArgumentAcceptingOptionSpec defaultsTo3 = optionParser.accepts("topic-white-list", "White list of topics to verify replica consistency. Defaults to all topics.").withRequiredArg().describedAs("Java regex (String)").ofType(String.class).defaultsTo(JmxReporter.DEFAULT_INCLUDE, new String[0]);
        ArgumentAcceptingOptionSpec defaultsTo4 = optionParser.accepts("time", "Timestamp for getting the initial offsets.").withRequiredArg().describedAs("timestamp/-1(latest)/-2(earliest)").ofType(Long.class).defaultsTo(-1L, new Long[0]);
        ArgumentAcceptingOptionSpec defaultsTo5 = optionParser.accepts("report-interval-ms", "The reporting interval.").withRequiredArg().describedAs("ms").ofType(Long.class).defaultsTo(30000L, new Long[0]);
        AbstractOptionSpec<Void> forHelp = optionParser.accepts("help", "Print usage information.").forHelp();
        AbstractOptionSpec<Void> forHelp2 = optionParser.accepts("version", "Print version information and exit.").forHelp();
        OptionSet parse = optionParser.parse(strArr);
        if (strArr.length == 0 || parse.has(forHelp)) {
            throw CommandLineUtils$.MODULE$.printUsageAndDie(optionParser, "Validate that all replicas for a set of topics have the same data.");
        }
        if (parse.has(forHelp2)) {
            throw CommandLineUtils$.MODULE$.printVersionAndDie();
        }
        CommandLineUtils$.MODULE$.checkRequiredArgs(optionParser, parse, ScalaRunTime$.MODULE$.wrapRefArray(new OptionSpec[]{ofType}));
        String str = (String) parse.valueOf(defaultsTo3);
        IncludeList includeList = new IncludeList(str);
        try {
            Pattern.compile(str);
            int intValue = ((Integer) parse.valueOf(defaultsTo)).intValue();
            int intValue2 = ((Integer) parse.valueOf(defaultsTo2)).intValue();
            long longValue = ((Long) parse.valueOf(defaultsTo4)).longValue();
            long longValue2 = ((Long) parse.valueOf(defaultsTo5)).longValue();
            if (logger().underlying().isInfoEnabled()) {
                org.slf4j.Logger underlying = logger().underlying();
                msgWithLogIdent5 = msgWithLogIdent("Getting topic metadata...");
                underlying.info(msgWithLogIdent5);
            }
            String str2 = (String) parse.valueOf(ofType);
            ToolsUtils$.MODULE$.validatePortOrDie(optionParser, str2);
            Admin createAdminClient = createAdminClient(str2);
            try {
                Seq<TopicDescription> listTopicsMetadata = listTopicsMetadata(createAdminClient);
                Map<Object, Node> brokerDetails = brokerDetails(createAdminClient);
                CoreUtils$ coreUtils$ = CoreUtils$.MODULE$;
                JFunction0$mcV$sp jFunction0$mcV$sp = () -> {
                    createAdminClient.close();
                };
                CoreUtils$ coreUtils$2 = CoreUtils$.MODULE$;
                coreUtils$.swallow(jFunction0$mcV$sp, this, Level.WARN);
                Seq filter = listTopicsMetadata.filter(topicDescription -> {
                    return BoxesRunTime.boxToBoolean($anonfun$main$3(includeList, topicDescription));
                });
                if (filter.isEmpty()) {
                    if (logger().underlying().isErrorEnabled()) {
                        org.slf4j.Logger underlying2 = logger().underlying();
                        msgWithLogIdent4 = msgWithLogIdent($anonfun$main$4(defaultsTo3));
                        underlying2.error(msgWithLogIdent4);
                    }
                    Exit$ exit$ = Exit$.MODULE$;
                    Exit$ exit$2 = Exit$.MODULE$;
                    throw exit$.exit(1, None$.MODULE$);
                }
                Seq flatMap = filter.flatMap(topicDescription2 -> {
                    AsScalaExtensions.ListHasAsScala ListHasAsScala;
                    ListHasAsScala = CollectionConverters$.MODULE$.ListHasAsScala(topicDescription2.partitions());
                    return (Buffer) ListHasAsScala.asScala().flatMap(topicPartitionInfo -> {
                        AsScalaExtensions.ListHasAsScala ListHasAsScala2;
                        ListHasAsScala2 = CollectionConverters$.MODULE$.ListHasAsScala(topicPartitionInfo.replicas());
                        return (Buffer) ListHasAsScala2.asScala().map(node -> {
                            return new TopicPartitionReplica(topicDescription2.name(), topicPartitionInfo.partition(), node.id());
                        });
                    });
                });
                if (logger().underlying().isDebugEnabled()) {
                    org.slf4j.Logger underlying3 = logger().underlying();
                    msgWithLogIdent3 = msgWithLogIdent($anonfun$main$8(flatMap));
                    underlying3.debug(msgWithLogIdent3);
                }
                Map map = (Map) flatMap.groupBy(topicPartitionReplica -> {
                    return BoxesRunTime.boxToInteger(topicPartitionReplica.replicaId());
                }).map(tuple2 -> {
                    if (tuple2 == null) {
                        throw new MatchError(null);
                    }
                    int _1$mcI$sp = tuple2._1$mcI$sp();
                    Seq seq = (Seq) tuple2.mo6879_2();
                    Predef$ArrowAssoc$ predef$ArrowAssoc$ = Predef$ArrowAssoc$.MODULE$;
                    return new Tuple2(Integer.valueOf(_1$mcI$sp), seq.map(topicPartitionReplica2 -> {
                        return new TopicPartition(topicPartitionReplica2.topic(), topicPartitionReplica2.partitionId());
                    }));
                });
                if (logger().underlying().isDebugEnabled()) {
                    org.slf4j.Logger underlying4 = logger().underlying();
                    msgWithLogIdent2 = msgWithLogIdent($anonfun$main$12(map));
                    underlying4.debug(msgWithLogIdent2);
                }
                Map map2 = (Map) flatMap.groupBy(topicPartitionReplica2 -> {
                    return new TopicPartition(topicPartitionReplica2.topic(), topicPartitionReplica2.partitionId());
                }).map(tuple22 -> {
                    if (tuple22 == null) {
                        throw new MatchError(null);
                    }
                    TopicPartition topicPartition = (TopicPartition) tuple22.mo6880_1();
                    Seq seq = (Seq) tuple22.mo6879_2();
                    Predef$ArrowAssoc$ predef$ArrowAssoc$ = Predef$ArrowAssoc$.MODULE$;
                    if (seq == null) {
                        throw null;
                    }
                    return new Tuple2(topicPartition, Integer.valueOf(seq.length()));
                });
                if (logger().underlying().isDebugEnabled()) {
                    org.slf4j.Logger underlying5 = logger().underlying();
                    msgWithLogIdent = msgWithLogIdent($anonfun$main$15(map2));
                    underlying5.debug(msgWithLogIdent);
                }
                Seq flatMap2 = filter.flatMap(topicDescription3 -> {
                    AsScalaExtensions.ListHasAsScala ListHasAsScala;
                    ListHasAsScala = CollectionConverters$.MODULE$.ListHasAsScala(topicDescription3.partitions());
                    return (Buffer) ListHasAsScala.asScala().map(topicPartitionInfo -> {
                        return new TopicPartition(topicDescription3.name(), topicPartitionInfo.partition());
                    });
                });
                Properties consumerConfig = consumerConfig(str2);
                ReplicaBuffer replicaBuffer = new ReplicaBuffer(map2, initialOffsets(flatMap2, consumerConfig, longValue), map.size(), longValue2);
                int _1$mcI$sp = map.mo7085head()._1$mcI$sp();
                AtomicInteger atomicInteger = new AtomicInteger(0);
                Iterable iterable = (Iterable) map.map(tuple23 -> {
                    if (tuple23 == null) {
                        throw new MatchError(null);
                    }
                    int _1$mcI$sp2 = tuple23._1$mcI$sp();
                    return new ReplicaFetcher(new StringBuilder(15).append("ReplicaFetcher-").append(_1$mcI$sp2).toString(), (Node) brokerDetails.mo6899apply((Map) Integer.valueOf(_1$mcI$sp2)), (Seq) tuple23.mo6879_2(), replicaBuffer, 30000, 256000, intValue, intValue2, 1, _1$mcI$sp2 == _1$mcI$sp, consumerConfig, atomicInteger.incrementAndGet());
                });
                Exit$ exit$3 = Exit$.MODULE$;
                JFunction0$mcV$sp jFunction0$mcV$sp2 = () -> {
                    String msgWithLogIdent6;
                    ReplicaVerificationTool$ replicaVerificationTool$ = MODULE$;
                    if (replicaVerificationTool$.logger().underlying().isInfoEnabled()) {
                        org.slf4j.Logger underlying6 = replicaVerificationTool$.logger().underlying();
                        msgWithLogIdent6 = replicaVerificationTool$.msgWithLogIdent("Stopping all fetchers");
                        underlying6.info(msgWithLogIdent6);
                    }
                    iterable.foreach(replicaFetcher -> {
                        replicaFetcher.shutdown();
                        return BoxedUnit.UNIT;
                    });
                };
                Exit.addShutdownHook("ReplicaVerificationToolShutdownHook", () -> {
                    Exit$.$anonfun$addShutdownHook$1(r1);
                });
                iterable.foreach(replicaFetcher -> {
                    replicaFetcher.start();
                    return BoxedUnit.UNIT;
                });
                Console$.MODULE$.println(new StringBuilder(34).append(getCurrentTimeString()).append(": verification process is started.").toString());
            } catch (Throwable th) {
                CoreUtils$ coreUtils$3 = CoreUtils$.MODULE$;
                JFunction0$mcV$sp jFunction0$mcV$sp3 = () -> {
                    createAdminClient.close();
                };
                CoreUtils$ coreUtils$4 = CoreUtils$.MODULE$;
                coreUtils$3.swallow(jFunction0$mcV$sp3, this, Level.WARN);
                throw th;
            }
        } catch (PatternSyntaxException unused) {
            throw new RuntimeException(new StringBuilder(21).append(str).append(" is an invalid regex.").toString());
        }
    }

    private Seq<TopicDescription> listTopicsMetadata(Admin admin) {
        AsScalaExtensions.CollectionHasAsScala CollectionHasAsScala;
        CollectionHasAsScala = CollectionConverters$.MODULE$.CollectionHasAsScala(admin.describeTopics(admin.listTopics(new ListTopicsOptions().listInternal(true)).names().get()).all().get().values());
        scala.collection.Iterable asScala = CollectionHasAsScala.asScala();
        if (asScala == null) {
            throw null;
        }
        return Buffer$.MODULE$.from2((IterableOnce) asScala);
    }

    private Map<Object, Node> brokerDetails(Admin admin) {
        AsScalaExtensions.CollectionHasAsScala CollectionHasAsScala;
        CollectionHasAsScala = CollectionConverters$.MODULE$.CollectionHasAsScala(admin.describeCluster().nodes().get());
        return ((IterableOnceOps) CollectionHasAsScala.asScala().map(node -> {
            return new Tuple2(Integer.valueOf(node.id()), node);
        })).toMap(C$less$colon$less$.MODULE$.refl());
    }

    private Admin createAdminClient(String str) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", str);
        return Admin.create(properties);
    }

    private scala.collection.Map<TopicPartition, Object> initialOffsets(Seq<TopicPartition> seq, Properties properties, long j) {
        AsJavaExtensions.MapHasAsJava MapHasAsJava;
        AsScalaExtensions.MapHasAsScala MapHasAsScala;
        scala.collection.Map<TopicPartition, Object> map;
        AsJavaExtensions.SeqHasAsJava SeqHasAsJava;
        AsScalaExtensions.MapHasAsScala MapHasAsScala2;
        AsJavaExtensions.SeqHasAsJava SeqHasAsJava2;
        AsScalaExtensions.MapHasAsScala MapHasAsScala3;
        KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
        try {
            if (-1 == j) {
                CollectionConverters$ collectionConverters$ = CollectionConverters$.MODULE$;
                SeqHasAsJava2 = CollectionConverters$.MODULE$.SeqHasAsJava(seq);
                MapHasAsScala3 = collectionConverters$.MapHasAsScala(kafkaConsumer.endOffsets(SeqHasAsJava2.asJava()));
                map = (scala.collection.Map) MapHasAsScala3.asScala().map(tuple2 -> {
                    if (tuple2 == null) {
                        throw new MatchError(null);
                    }
                    TopicPartition topicPartition = (TopicPartition) tuple2.mo6880_1();
                    Long l = (Long) tuple2.mo6879_2();
                    Predef$ArrowAssoc$ predef$ArrowAssoc$ = Predef$ArrowAssoc$.MODULE$;
                    return new Tuple2(topicPartition, Long.valueOf(l.longValue()));
                });
            } else if (-2 == j) {
                CollectionConverters$ collectionConverters$2 = CollectionConverters$.MODULE$;
                SeqHasAsJava = CollectionConverters$.MODULE$.SeqHasAsJava(seq);
                MapHasAsScala2 = collectionConverters$2.MapHasAsScala(kafkaConsumer.beginningOffsets(SeqHasAsJava.asJava()));
                map = (scala.collection.Map) MapHasAsScala2.asScala().map(tuple22 -> {
                    if (tuple22 == null) {
                        throw new MatchError(null);
                    }
                    TopicPartition topicPartition = (TopicPartition) tuple22.mo6880_1();
                    Long l = (Long) tuple22.mo6879_2();
                    Predef$ArrowAssoc$ predef$ArrowAssoc$ = Predef$ArrowAssoc$.MODULE$;
                    return new Tuple2(topicPartition, Long.valueOf(l.longValue()));
                });
            } else {
                scala.collection.Map map2 = seq.map(topicPartition -> {
                    Predef$ArrowAssoc$ predef$ArrowAssoc$ = Predef$ArrowAssoc$.MODULE$;
                    return new Tuple2(topicPartition, Long.valueOf(j));
                }).toMap(C$less$colon$less$.MODULE$.refl());
                CollectionConverters$ collectionConverters$3 = CollectionConverters$.MODULE$;
                MapHasAsJava = CollectionConverters$.MODULE$.MapHasAsJava(map2);
                MapHasAsScala = collectionConverters$3.MapHasAsScala(kafkaConsumer.offsetsForTimes(MapHasAsJava.asJava()));
                map = (scala.collection.Map) MapHasAsScala.asScala().map(tuple23 -> {
                    if (tuple23 == null) {
                        throw new MatchError(null);
                    }
                    TopicPartition topicPartition2 = (TopicPartition) tuple23.mo6880_1();
                    OffsetAndTimestamp offsetAndTimestamp = (OffsetAndTimestamp) tuple23.mo6879_2();
                    Predef$ArrowAssoc$ predef$ArrowAssoc$ = Predef$ArrowAssoc$.MODULE$;
                    return new Tuple2(topicPartition2, Long.valueOf(offsetAndTimestamp.offset()));
                });
            }
            return map;
        } finally {
            kafkaConsumer.close();
        }
    }

    private Properties consumerConfig(String str) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", str);
        properties.put("group.id", "ReplicaVerification");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return properties;
    }

    private KafkaConsumer<String, String> createConsumer(Properties properties) {
        return new KafkaConsumer<>(properties);
    }

    public static final /* synthetic */ String $anonfun$main$1() {
        return "Getting topic metadata...";
    }

    public static final /* synthetic */ boolean $anonfun$main$3(IncludeList includeList, TopicDescription topicDescription) {
        return includeList.isTopicAllowed(topicDescription.name(), false);
    }

    public static final /* synthetic */ String $anonfun$main$4(ArgumentAcceptingOptionSpec argumentAcceptingOptionSpec) {
        return new StringBuilder(88).append("No topics found. ").append(argumentAcceptingOptionSpec).append(" if specified, is either filtering out all topics or there is no topic.").toString();
    }

    public static final /* synthetic */ String $anonfun$main$8(Seq seq) {
        return new StringBuilder(27).append("Selected topic partitions: ").append(seq).toString();
    }

    public static final /* synthetic */ String $anonfun$main$12(Map map) {
        return new StringBuilder(29).append("Topic partitions per broker: ").append(map).toString();
    }

    public static final /* synthetic */ String $anonfun$main$15(Map map) {
        return new StringBuilder(39).append("Expected replicas per topic partition: ").append(map).toString();
    }

    public static final /* synthetic */ String $anonfun$main$20() {
        return "Stopping all fetchers";
    }

    private ReplicaVerificationTool$() {
    }
}
