package kafka.tools;

import com.typesafe.scalalogging.Logger;
import java.io.PrintStream;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import kafka.tools.ConsoleConsumer;
import kafka.utils.CommandLineUtils$;
import kafka.utils.Exit$;
import kafka.utils.Implicits;
import kafka.utils.Implicits$;
import kafka.utils.Log4jControllerRegistration$;
import kafka.utils.Logging;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.MessageFormatter;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.pulsar.kafka.shade.org.tukaani.xz.common.Util;
import scala.Function0;
import scala.None$;
import scala.Option$;
import scala.runtime.BoxesRunTime;
import scala.runtime.java8.JFunction0$mcV$sp;

/* JADX WARN: Classes with same name are omitted:
  input_file:META-INF/bundled-dependencies/kafka_2.13-2.7.0.jar:kafka/tools/ConsoleConsumer$.class
 */
/* compiled from: ConsoleConsumer.scala */
/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.2.1.1.11.jar:META-INF/bundled-dependencies/kafka_2.13-2.7.0.jar:kafka/tools/ConsoleConsumer$.class */
public final class ConsoleConsumer$ implements Logging {
    public static final ConsoleConsumer$ MODULE$ = new ConsoleConsumer$();
    private static int messageCount;
    private static final CountDownLatch shutdownLatch;
    private static Logger logger;
    private static String logIdent;
    private static volatile boolean bitmap$0;

    static {
        ConsoleConsumer$ consoleConsumer$ = MODULE$;
        Log4jControllerRegistration$ log4jControllerRegistration$ = Log4jControllerRegistration$.MODULE$;
        messageCount = 0;
        shutdownLatch = new CountDownLatch(1);
    }

    @Override // kafka.utils.Logging
    public String loggerName() {
        String loggerName;
        loggerName = loggerName();
        return loggerName;
    }

    @Override // kafka.utils.Logging
    public String msgWithLogIdent(String str) {
        String msgWithLogIdent;
        msgWithLogIdent = msgWithLogIdent(str);
        return msgWithLogIdent;
    }

    @Override // kafka.utils.Logging
    public void trace(Function0<String> function0) {
        trace(function0);
    }

    @Override // kafka.utils.Logging
    public void trace(Function0<String> function0, Function0<Throwable> function02) {
        trace(function0, function02);
    }

    @Override // kafka.utils.Logging
    public boolean isDebugEnabled() {
        boolean isDebugEnabled;
        isDebugEnabled = isDebugEnabled();
        return isDebugEnabled;
    }

    @Override // kafka.utils.Logging
    public boolean isTraceEnabled() {
        boolean isTraceEnabled;
        isTraceEnabled = isTraceEnabled();
        return isTraceEnabled;
    }

    @Override // kafka.utils.Logging
    public void debug(Function0<String> function0) {
        debug(function0);
    }

    @Override // kafka.utils.Logging
    public void debug(Function0<String> function0, Function0<Throwable> function02) {
        debug(function0, function02);
    }

    @Override // kafka.utils.Logging
    public void info(Function0<String> function0) {
        info(function0);
    }

    @Override // kafka.utils.Logging
    public void info(Function0<String> function0, Function0<Throwable> function02) {
        info(function0, function02);
    }

    @Override // kafka.utils.Logging
    public void warn(Function0<String> function0) {
        warn(function0);
    }

    @Override // kafka.utils.Logging
    public void warn(Function0<String> function0, Function0<Throwable> function02) {
        warn(function0, function02);
    }

    @Override // kafka.utils.Logging
    public void error(Function0<String> function0) {
        error(function0);
    }

    @Override // kafka.utils.Logging
    public void error(Function0<String> function0, Function0<Throwable> function02) {
        error(function0, function02);
    }

    @Override // kafka.utils.Logging
    public void fatal(Function0<String> function0) {
        fatal(function0);
    }

    @Override // kafka.utils.Logging
    public void fatal(Function0<String> function0, Function0<Throwable> function02) {
        fatal(function0, function02);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v7 */
    private Logger logger$lzycompute() {
        Logger logger2;
        ?? r0 = this;
        synchronized (r0) {
            if (!bitmap$0) {
                logger2 = logger();
                logger = logger2;
                r0 = 1;
                bitmap$0 = true;
            }
            return logger;
        }
    }

    @Override // kafka.utils.Logging
    public Logger logger() {
        return !bitmap$0 ? logger$lzycompute() : logger;
    }

    @Override // kafka.utils.Logging
    public String logIdent() {
        return logIdent;
    }

    @Override // kafka.utils.Logging
    public void logIdent_$eq(String str) {
        logIdent = str;
    }

    public int messageCount() {
        return messageCount;
    }

    public void messageCount_$eq(int i) {
        messageCount = i;
    }

    private CountDownLatch shutdownLatch() {
        return shutdownLatch;
    }

    public void main(String[] strArr) {
        String msgWithLogIdent;
        String msgWithLogIdent2;
        try {
            run(new ConsoleConsumer.ConsumerConfig(strArr));
        } catch (AuthenticationException e) {
            if (logger().underlying().isErrorEnabled()) {
                org.slf4j.Logger underlying = logger().underlying();
                msgWithLogIdent2 = msgWithLogIdent("Authentication failed: terminating consumer process");
                underlying.error(msgWithLogIdent2, (Throwable) e);
            }
            Exit$ exit$ = Exit$.MODULE$;
            Exit$ exit$2 = Exit$.MODULE$;
            throw exit$.exit(1, None$.MODULE$);
        } catch (Throwable th) {
            if (logger().underlying().isErrorEnabled()) {
                org.slf4j.Logger underlying2 = logger().underlying();
                msgWithLogIdent = msgWithLogIdent("Unknown error when running consumer: ");
                underlying2.error(msgWithLogIdent, th);
            }
            Exit$ exit$3 = Exit$.MODULE$;
            Exit$ exit$4 = Exit$.MODULE$;
            throw exit$3.exit(1, None$.MODULE$);
        }
    }

    public void run(ConsoleConsumer.ConsumerConfig consumerConfig) {
        long timeoutMs = consumerConfig.timeoutMs() >= 0 ? consumerConfig.timeoutMs() : Util.VLI_MAX;
        KafkaConsumer kafkaConsumer = new KafkaConsumer(consumerProps(consumerConfig), (Deserializer) new ByteArrayDeserializer(), (Deserializer) new ByteArrayDeserializer());
        ConsoleConsumer.ConsumerWrapper consumerWrapper = consumerConfig.partitionArg().isDefined() ? new ConsoleConsumer.ConsumerWrapper(Option$.MODULE$.apply(consumerConfig.topicArg()), consumerConfig.partitionArg(), Option$.MODULE$.apply(Long.valueOf(consumerConfig.offsetArg())), None$.MODULE$, kafkaConsumer, timeoutMs) : new ConsoleConsumer.ConsumerWrapper(Option$.MODULE$.apply(consumerConfig.topicArg()), None$.MODULE$, None$.MODULE$, Option$.MODULE$.apply(consumerConfig.whitelistArg()), kafkaConsumer, timeoutMs);
        addShutdownHook(consumerWrapper, consumerConfig);
        try {
            process(Integer.valueOf(consumerConfig.maxMessages()), consumerConfig.formatter(), consumerWrapper, System.out, consumerConfig.skipMessageOnError());
        } finally {
            consumerWrapper.cleanup();
            consumerConfig.formatter().close();
            reportRecordCount();
            shutdownLatch().countDown();
        }
    }

    public void addShutdownHook(ConsoleConsumer.ConsumerWrapper consumerWrapper, ConsoleConsumer.ConsumerConfig consumerConfig) {
        Exit$ exit$ = Exit$.MODULE$;
        JFunction0$mcV$sp jFunction0$mcV$sp = () -> {
            consumerWrapper.wakeup();
            MODULE$.shutdownLatch().await();
            if (consumerConfig.enableSystestEventsLogging()) {
                System.out.println("shutdown_complete");
            }
        };
        Exit.addShutdownHook("consumer-shutdown-hook", () -> {
            Exit$.$anonfun$addShutdownHook$1(r1);
        });
    }

    public void process(Integer num, MessageFormatter messageFormatter, ConsoleConsumer.ConsumerWrapper consumerWrapper, PrintStream printStream, boolean z) {
        String msgWithLogIdent;
        String msgWithLogIdent2;
        do {
            if (messageCount() >= BoxesRunTime.unboxToInt(num) && !BoxesRunTime.equalsNumObject(num, -1)) {
                return;
            }
            try {
                ConsumerRecord<byte[], byte[]> receive = consumerWrapper.receive();
                messageCount_$eq(messageCount() + 1);
                try {
                    messageFormatter.writeTo(new ConsumerRecord<>(receive.topic(), receive.partition(), receive.offset(), receive.timestamp(), receive.timestampType(), 0L, 0, 0, receive.key(), receive.value(), receive.headers()), printStream);
                } finally {
                    if (z) {
                    }
                }
            } catch (WakeupException unused) {
                if (logger().underlying().isTraceEnabled()) {
                    org.slf4j.Logger underlying = logger().underlying();
                    msgWithLogIdent2 = msgWithLogIdent("Caught WakeupException because consumer is shutdown, ignore and terminate.");
                    underlying.trace(msgWithLogIdent2);
                    return;
                }
                return;
            } catch (Throwable th) {
                if (logger().underlying().isErrorEnabled()) {
                    org.slf4j.Logger underlying2 = logger().underlying();
                    msgWithLogIdent = msgWithLogIdent("Error processing message, terminating consumer process: ");
                    underlying2.error(msgWithLogIdent, th);
                    return;
                }
                return;
            }
        } while (!checkErr(printStream, messageFormatter));
    }

    public void reportRecordCount() {
        System.err.println(new StringBuilder(30).append("Processed a total of ").append(messageCount()).append(" messages").toString());
    }

    public boolean checkErr(PrintStream printStream, MessageFormatter messageFormatter) {
        boolean checkError = printStream.checkError();
        if (checkError) {
            System.err.println("Unable to write to standard out, closing consumer.");
        }
        return checkError;
    }

    public Properties consumerProps(ConsoleConsumer.ConsumerConfig consumerConfig) {
        Properties properties = new Properties();
        Implicits$ implicits$ = Implicits$.MODULE$;
        new Implicits.PropertiesOps(properties).$plus$plus$eq(consumerConfig.consumerProps());
        Implicits$ implicits$2 = Implicits$.MODULE$;
        new Implicits.PropertiesOps(properties).$plus$plus$eq(consumerConfig.extraConsumerProps());
        setAutoOffsetResetValue(consumerConfig, properties);
        properties.put("bootstrap.servers", consumerConfig.bootstrapServer());
        CommandLineUtils$.MODULE$.maybeMergeOptions(properties, ConsumerConfig.ISOLATION_LEVEL_CONFIG, consumerConfig.options(), consumerConfig.isolationLevelOpt());
        return properties;
    }

    public void setAutoOffsetResetValue(ConsoleConsumer.ConsumerConfig consumerConfig, Properties properties) {
        if (!properties.containsKey(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)) {
            properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, consumerConfig.options().has(consumerConfig.resetBeginningOpt()) ? "earliest" : "latest");
            return;
        }
        String property = properties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
        if (!consumerConfig.options().has(consumerConfig.resetBeginningOpt()) || "earliest".equals(property)) {
            return;
        }
        System.err.println(new StringBuilder(24).append(new StringBuilder(72).append("Can't simultaneously specify --from-beginning and 'auto.offset.reset=").append(property).append("', ").toString()).append("please remove one option").toString());
        Exit$ exit$ = Exit$.MODULE$;
        Exit$ exit$2 = Exit$.MODULE$;
        throw exit$.exit(1, None$.MODULE$);
    }

    public static final /* synthetic */ String $anonfun$main$1() {
        return "Authentication failed: terminating consumer process";
    }

    public static final /* synthetic */ AuthenticationException $anonfun$main$2(AuthenticationException authenticationException) {
        return authenticationException;
    }

    public static final /* synthetic */ String $anonfun$main$3() {
        return "Unknown error when running consumer: ";
    }

    public static final /* synthetic */ Throwable $anonfun$main$4(Throwable th) {
        return th;
    }

    public static final /* synthetic */ String $anonfun$process$1() {
        return "Caught WakeupException because consumer is shutdown, ignore and terminate.";
    }

    public static final /* synthetic */ String $anonfun$process$2() {
        return "Error processing message, terminating consumer process: ";
    }

    public static final /* synthetic */ Throwable $anonfun$process$3(Throwable th) {
        return th;
    }

    public static final /* synthetic */ String $anonfun$process$4() {
        return "Error processing message, skipping this message: ";
    }

    public static final /* synthetic */ Throwable $anonfun$process$5(Throwable th) {
        return th;
    }

    private ConsoleConsumer$() {
    }
}
