package org.apache.kafka.tools;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.debezium.converters.spi.CloudEventsMaker;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.impl.Arguments;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.utils.Exit;

/* loaded from: input_file:META-INF/bundled-dependencies/kafka-tools-2.7.2.jar:org/apache/kafka/tools/TransactionalMessageCopier.class */
public class TransactionalMessageCopier {
    // Formatter for the timestamp embedded in the JSON lines built by statusAsJson and
    // shutDownString. SimpleDateFormat is not thread-safe; both of those methods are declared
    // static synchronized, so every use of FORMAT in this class happens under the class monitor.
    private static final DateFormat FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss:SSS");

    /**
     * Builds the command-line parser for the transactional message copier.
     *
     * <p>Required options: {@code --input-topic}, {@code --input-partition},
     * {@code --output-topic}, {@code --broker-list} and {@code --transactional-id}.
     * Optional options with defaults: {@code --max-messages} (-1), {@code --consumer-group}
     * ("-1"), {@code --transaction-size} (200) and {@code --transaction-timeout} (60000).
     * The flags {@code --enable-random-aborts}, {@code --group-mode} and
     * {@code --use-group-metadata} are stored as booleans via {@code storeTrue()}.
     *
     * @return the configured parser; values are stored under the {@code dest} names
     *         given below
     */
    private static ArgumentParser argParser() {
        ArgumentParser parser = ArgumentParsers.newArgumentParser("transactional-message-copier")
                .defaultHelp(true)
                .description("This tool copies messages transactionally from an input partition to an output topic, committing the consumed offsets along with the output messages");

        parser.addArgument("--input-topic")
                .action(Arguments.store())
                .required(true)
                .type(String.class)
                .metavar("INPUT-TOPIC")
                .dest("inputTopic")
                .help("Consume messages from this topic");

        parser.addArgument("--input-partition")
                .action(Arguments.store())
                .required(true)
                .type(Integer.class)
                .metavar("INPUT-PARTITION")
                .dest("inputPartition")
                .help("Consume messages from this partition of the input topic.");

        parser.addArgument("--output-topic")
                .action(Arguments.store())
                .required(true)
                .type(String.class)
                .metavar("OUTPUT-TOPIC")
                .dest("outputTopic")
                .help("Produce messages to this topic");

        parser.addArgument("--broker-list")
                .action(Arguments.store())
                .required(true)
                .type(String.class)
                .metavar("HOST1:PORT1[,HOST2:PORT2[...]]")
                .dest("brokerList")
                .help("Comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,...");

        parser.addArgument("--max-messages")
                .action(Arguments.store())
                .required(false)
                .setDefault(-1)
                .type(Integer.class)
                .metavar("MAX-MESSAGES")
                .dest("maxMessages")
                .help("Process these many messages upto the end offset at the time this program was launched. If set to -1 we will just read to the end offset of the input partition (as of the time the program was launched).");

        // The argument is declared as a String, so its default must be a String as well.
        // The previous Integer default (-1) only worked because this file reads the value
        // through Namespace.getString(); the textual value "-1" is unchanged.
        parser.addArgument("--consumer-group")
                .action(Arguments.store())
                .required(false)
                .setDefault("-1")
                .type(String.class)
                .metavar("CONSUMER-GROUP")
                .dest("consumerGroup")
                .help("The consumer group id to use for storing the consumer offsets.");

        parser.addArgument("--transaction-size")
                .action(Arguments.store())
                .required(false)
                .setDefault(200)
                .type(Integer.class)
                .metavar("TRANSACTION-SIZE")
                .dest("messagesPerTransaction")
                .help("The number of messages to put in each transaction. Default is 200.");

        parser.addArgument("--transaction-timeout")
                .action(Arguments.store())
                .required(false)
                .setDefault(60000)
                .type(Integer.class)
                .metavar("TRANSACTION-TIMEOUT")
                .dest("transactionTimeout")
                .help("The transaction timeout in milliseconds. Default is 60000(1 minute).");

        parser.addArgument("--transactional-id")
                .action(Arguments.store())
                .required(true)
                .type(String.class)
                .metavar("TRANSACTIONAL-ID")
                .dest("transactionalId")
                .help("The transactionalId to assign to the producer");

        parser.addArgument("--enable-random-aborts")
                .action(Arguments.storeTrue())
                .type(Boolean.class)
                .metavar("ENABLE-RANDOM-ABORTS")
                .dest("enableRandomAborts")
                .help("Whether or not to enable random transaction aborts (for system testing)");

        parser.addArgument("--group-mode")
                .action(Arguments.storeTrue())
                .type(Boolean.class)
                .metavar("GROUP-MODE")
                .dest("groupMode")
                .help("Whether to let consumer subscribe to the input topic or do manual assign. If we do subscription based consumption, the input partition shall be ignored");

        parser.addArgument("--use-group-metadata")
                .action(Arguments.storeTrue())
                .type(Boolean.class)
                .metavar("USE-GROUP-METADATA")
                .dest("useGroupMetadata")
                .help("Whether to use the new transactional commit API with group metadata");

        return parser;
    }

    /**
     * Creates the transactional producer used to write the copied records.
     *
     * <p>The broker list, transactional id and transaction timeout are taken from the
     * parsed arguments; keys and values are serialized as strings, and batch size and
     * max in-flight requests are fixed at "512" and "5".
     *
     * @param namespace parsed command-line arguments (reads {@code brokerList},
     *                  {@code transactionalId} and {@code transactionTimeout})
     * @return a new producer built from those settings
     */
    private static KafkaProducer<String, String> createProducer(Namespace namespace) {
        final String stringSerializer = "org.apache.kafka.common.serialization.StringSerializer";
        final Properties producerProps = new Properties();

        // Settings derived from the command line.
        producerProps.put("bootstrap.servers", namespace.getString("brokerList"));
        producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, namespace.getString("transactionalId"));
        producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, namespace.getInt("transactionTimeout"));

        // Fixed settings.
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, stringSerializer);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, stringSerializer);
        producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, "512");
        producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");

        return new KafkaProducer<>(producerProps);
    }

    /**
     * Creates the consumer that reads the input records.
     *
     * <p>The consumer uses the {@code read_committed} isolation level, has auto-commit
     * disabled, resets to the earliest offset, and caps each poll at the configured
     * transaction size so that one poll maps to one transaction.
     *
     * @param namespace parsed command-line arguments (reads {@code consumerGroup},
     *                  {@code brokerList} and {@code messagesPerTransaction})
     * @return a new consumer built from those settings
     */
    private static KafkaConsumer<String, String> createConsumer(Namespace namespace) {
        final String stringDeserializer = "org.apache.kafka.common.serialization.StringDeserializer";
        final String groupId = namespace.getString("consumerGroup");
        final String bootstrapServers = namespace.getString("brokerList");
        final String maxPollRecords = namespace.getInt("messagesPerTransaction").toString();

        final Properties consumerProps = new Properties();
        consumerProps.put("group.id", groupId);
        consumerProps.put("bootstrap.servers", bootstrapServers);
        consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);

        // Transactional-read semantics and manual offset management.
        consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Group-membership timing.
        consumerProps.put("session.timeout.ms", "10000");
        consumerProps.put("max.poll.interval.ms", "180000");
        consumerProps.put("heartbeat.interval.ms", "3000");

        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, stringDeserializer);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, stringDeserializer);
        return new KafkaConsumer<>(consumerProps);
    }

    /**
     * Converts a consumed record into a record for the output topic, keeping the
     * source partition number, key and value.
     *
     * @param str            name of the topic the new record is addressed to
     * @param consumerRecord the record that was read from the input
     * @return a producer record targeting {@code str} on the same partition number
     */
    private static ProducerRecord<String, String> producerRecordFromConsumerRecord(String str, ConsumerRecord<String, String> consumerRecord) {
        final Integer sourcePartition = consumerRecord.partition();
        final String key = consumerRecord.key();
        final String value = consumerRecord.value();
        return new ProducerRecord<>(str, sourcePartition, key, value);
    }

    /**
     * Captures the consumer's current position on every assigned partition.
     *
     * @param kafkaConsumer the consumer whose assignment and positions are read
     * @return a new map from each assigned partition to its current position, wrapped
     *         in an {@link OffsetAndMetadata} with {@code null} metadata
     */
    private static Map<TopicPartition, OffsetAndMetadata> consumerPositions(KafkaConsumer<String, String> kafkaConsumer) {
        // Parameterized map: the raw HashMap used previously relied on an unchecked conversion.
        Map<TopicPartition, OffsetAndMetadata> positions = new HashMap<>();
        for (TopicPartition topicPartition : kafkaConsumer.assignment()) {
            positions.put(topicPartition, new OffsetAndMetadata(kafkaConsumer.position(topicPartition), null));
        }
        return positions;
    }

    /**
     * Rewinds the consumer to its last committed offsets.
     *
     * <p>Each assigned partition with a committed offset is sought to that offset; a
     * partition without one is sought to its beginning.
     *
     * @param kafkaConsumer the consumer to rewind
     */
    private static void resetToLastCommittedPositions(KafkaConsumer<String, String> kafkaConsumer) {
        final Map<TopicPartition, OffsetAndMetadata> lastCommitted = kafkaConsumer.committed(kafkaConsumer.assignment());
        for (TopicPartition partition : kafkaConsumer.assignment()) {
            final OffsetAndMetadata committedOffset = lastCommitted.get(partition);
            if (committedOffset == null) {
                // Nothing committed yet for this partition: start over from the beginning.
                kafkaConsumer.seekToBeginning(Collections.singleton(partition));
                continue;
            }
            kafkaConsumer.seek(partition, committedOffset.offset());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Computes the distance between the consumer's current position and the end offset
     * of a partition.
     *
     * @param kafkaConsumer  the consumer whose position and end offsets are queried
     * @param topicPartition the partition to inspect
     * @return end offset minus current position, or {@code 0} when the end-offset
     *         lookup has no entry for the partition
     */
    public static long messagesRemaining(KafkaConsumer<String, String> kafkaConsumer, TopicPartition topicPartition) {
        final long currentPosition = kafkaConsumer.position(topicPartition);
        final Map<TopicPartition, Long> partitionEndOffsets = kafkaConsumer.endOffsets(Collections.singleton(topicPartition));
        if (!partitionEndOffsets.containsKey(topicPartition)) {
            return 0L;
        }
        final long endOffset = partitionEndOffsets.get(topicPartition);
        return endOffset - currentPosition;
    }

    /**
     * Serializes a map to a JSON string.
     *
     * @param map the data to serialize
     * @return the JSON text, or a plain-text message containing the serializer's error
     *         message when serialization fails
     */
    private static String toJsonString(Map<String, Object> map) {
        try {
            return new ObjectMapper().writeValueAsString(map);
        } catch (JsonProcessingException e) {
            // Deliberately best-effort: report the failure in the returned text instead of throwing.
            return "Bad data can't be written as json: " + e.getMessage();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds a JSON status line describing the copier's progress.
     *
     * <p>Declared {@code synchronized} so that the shared {@code FORMAT} instance is
     * only used by one thread at a time.
     *
     * @param j    value reported as {@code totalProcessed}
     * @param j2   value reported as {@code consumed}
     * @param j3   value reported as {@code remaining}
     * @param str  value reported as {@code progress}
     * @param str2 value reported as {@code stage}
     * @return the status serialized by {@code toJsonString}, including a {@code time}
     *         field holding the current timestamp
     */
    public static synchronized String statusAsJson(long j, long j2, long j3, String str, String str2) {
        // Parameterized map: the raw HashMap used previously was passed to
        // toJsonString(Map<String, Object>) through an unchecked conversion.
        Map<String, Object> status = new HashMap<>();
        status.put("progress", str);
        status.put("totalProcessed", j);
        status.put("consumed", j2);
        status.put("remaining", j3);
        // Plain "time" key: the decompiled code borrowed an identically-valued constant from an
        // unrelated library (Debezium's CloudEventsMaker), which this tool has no reason to depend on.
        status.put("time", FORMAT.format(new Date()));
        status.put("stage", str2);
        return toJsonString(status);
    }

    /**
     * Builds the JSON line printed once the shutdown hook has finished.
     *
     * <p>Declared {@code synchronized} so that the shared {@code FORMAT} instance is
     * only used by one thread at a time.
     *
     * @param j   value reported as {@code totalProcessed}
     * @param j2  value reported as {@code consumed}
     * @param j3  value reported as {@code remaining}
     * @param str value reported as {@code shutdown_complete}
     * @return the summary serialized by {@code toJsonString}, including a {@code time}
     *         field holding the current timestamp
     */
    private static synchronized String shutDownString(long j, long j2, long j3, String str) {
        // Parameterized map: the raw HashMap used previously was passed to
        // toJsonString(Map<String, Object>) through an unchecked conversion.
        Map<String, Object> summary = new HashMap<>();
        summary.put("shutdown_complete", str);
        summary.put("totalProcessed", j);
        summary.put("consumed", j2);
        summary.put("remaining", j3);
        // Plain "time" key: the decompiled code borrowed an identically-valued constant from an
        // unrelated library (Debezium's CloudEventsMaker), which this tool has no reason to depend on.
        summary.put("time", FORMAT.format(new Date()));
        return toJsonString(summary);
    }

    /**
     * Entry point: copies records from the input to the output topic, one transaction
     * per non-empty poll, committing the consumed offsets inside the same transaction.
     *
     * <p>Flow:
     * <ol>
     *   <li>Parse arguments and create the producer and consumer.</li>
     *   <li>In group mode, subscribe to the input topic and recompute the remaining
     *       message count on every partition assignment; otherwise assign the single
     *       input partition and cap the remaining count by {@code --max-messages}.</li>
     *   <li>Register a shutdown hook that flags shutdown, closes both clients and prints
     *       a final summary line.</li>
     *   <li>Loop while messages remain: poll, send the polled records and the consumer
     *       positions in one transaction, then commit (or, with random aborts enabled,
     *       occasionally abort on purpose).</li>
     *   <li>Always close both clients when the loop ends, then exit with status 0 if
     *       the loop ended without an exception.</li>
     * </ol>
     *
     * @param strArr command-line arguments, see {@code argParser()}
     */
    public static void main(String[] strArr) {
        Namespace parsedArgs = argParser().parseArgsOrFail(strArr);
        final String transactionalId = parsedArgs.getString("transactionalId");
        final String outputTopic = parsedArgs.getString("outputTopic");
        final String consumerGroup = parsedArgs.getString("consumerGroup");

        final KafkaProducer<String, String> producer = createProducer(parsedArgs);
        final KafkaConsumer<String, String> consumer = createConsumer(parsedArgs);

        // -1 means "no cap": copy everything up to the end offset.
        final int maxMessagesArg = parsedArgs.getInt("maxMessages");
        final AtomicLong remainingMessages = new AtomicLong(maxMessagesArg == -1 ? Long.MAX_VALUE : maxMessagesArg);

        final boolean groupMode = parsedArgs.getBoolean("groupMode");
        final String inputTopic = parsedArgs.getString("inputTopic");
        final AtomicLong numMessagesProcessedSinceLastRebalance = new AtomicLong(0L);
        final AtomicLong totalMessageProcessed = new AtomicLong(0L);

        if (groupMode) {
            consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    // Nothing to do on revocation.
                }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    // Recompute the outstanding work for the newly assigned partitions and
                    // restart the per-rebalance counter.
                    remainingMessages.set(partitions.stream()
                            .mapToLong(partition -> messagesRemaining(consumer, partition))
                            .sum());
                    numMessagesProcessedSinceLastRebalance.set(0L);
                    System.out.println(statusAsJson(totalMessageProcessed.get(),
                            numMessagesProcessedSinceLastRebalance.get(), remainingMessages.get(),
                            transactionalId, "RebalanceComplete"));
                }
            });
        } else {
            TopicPartition inputPartition = new TopicPartition(inputTopic, parsedArgs.getInt("inputPartition"));
            consumer.assign(Collections.singleton(inputPartition));
            remainingMessages.set(Math.min(messagesRemaining(consumer, inputPartition), remainingMessages.get()));
        }

        final boolean enableRandomAborts = parsedArgs.getBoolean("enableRandomAborts");
        producer.initTransactions();

        final AtomicBoolean isShuttingDown = new AtomicBoolean(false);
        Exit.addShutdownHook("transactional-message-copier-shutdown-hook", () -> {
            isShuttingDown.set(true);
            producer.close();
            synchronized (consumer) {
                consumer.close();
            }
            System.out.println(shutDownString(totalMessageProcessed.get(),
                    numMessagesProcessedSinceLastRebalance.get(), remainingMessages.get(), transactionalId));
        });

        final boolean useGroupMetadata = parsedArgs.getBoolean("useGroupMetadata");
        try {
            Random random = new Random();
            while (remainingMessages.get() > 0) {
                System.out.println(statusAsJson(totalMessageProcessed.get(),
                        numMessagesProcessedSinceLastRebalance.get(), remainingMessages.get(),
                        transactionalId, "ProcessLoop"));
                if (isShuttingDown.get()) {
                    break;
                }

                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200L));
                if (records.count() > 0) {
                    try {
                        producer.beginTransaction();
                        for (ConsumerRecord<String, String> record : records) {
                            producer.send(producerRecordFromConsumerRecord(outputTopic, record));
                        }
                        long messagesSentWithinCurrentTxn = records.count();

                        if (useGroupMetadata) {
                            producer.sendOffsetsToTransaction(consumerPositions(consumer), consumer.groupMetadata());
                        } else {
                            producer.sendOffsetsToTransaction(consumerPositions(consumer), consumerGroup);
                        }

                        if (enableRandomAborts && random.nextInt() % 3 == 0) {
                            // Deliberate failure for system testing; handled by the
                            // KafkaException branch below (abort + rewind).
                            throw new KafkaException("Aborting transaction");
                        }

                        producer.commitTransaction();
                        remainingMessages.getAndAdd(-messagesSentWithinCurrentTxn);
                        numMessagesProcessedSinceLastRebalance.getAndAdd(messagesSentWithinCurrentTxn);
                        totalMessageProcessed.getAndAdd(messagesSentWithinCurrentTxn);
                    } catch (ProducerFencedException | OutOfOrderSequenceException e) {
                        // Unrecoverable for this producer: let the process fail. These must be
                        // sibling catch clauses of the same try; when the rethrow sat in a nested
                        // try, the generic KafkaException handler caught it and retried instead.
                        throw e;
                    } catch (KafkaException e) {
                        // Recoverable failure: abort and re-read from the last committed offsets.
                        producer.abortTransaction();
                        resetToLastCommittedPositions(consumer);
                    }
                }
            }
        } finally {
            // Single cleanup path for both normal completion and failure.
            producer.close();
            synchronized (consumer) {
                consumer.close();
            }
        }
        Exit.exit(0);
    }
}
