package org.apache.kafka.trogdor.workload;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.trogdor.common.JsonUtil;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.common.WorkerUtils;
import org.apache.kafka.trogdor.task.TaskWorker;
import org.apache.kafka.trogdor.task.WorkerStatusTracker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX WARN: Classes with same name are omitted:
  input_file:META-INF/bundled-dependencies/kafka-tools-2.7.0.jar:org/apache/kafka/trogdor/workload/SustainedConnectionWorker.class
 */
/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.2.1.1.17.jar:META-INF/bundled-dependencies/kafka-tools-2.7.0.jar:org/apache/kafka/trogdor/workload/SustainedConnectionWorker.class */
public class SustainedConnectionWorker implements TaskWorker {
    // ---- Static state ----

    /** Class-wide logger. */
    private static final Logger log = LoggerFactory.getLogger(SustainedConnectionWorker.class);

    /** Clock used for refresh deadlines, idle sleeps and status timestamps. */
    private static final SystemTime SYSTEM_TIME = new SystemTime();

    /** Idle back-off in milliseconds (the value MaintainLoop sleeps for when no connection is due). */
    private static final int BACKOFF_PERIOD_MS = 10;

    /**
     * Interval between periodic status reports, in milliseconds.
     * NOTE(review): presumably the period of the StatusUpdater schedule - confirm against start().
     */
    private static final int REPORT_INTERVAL_MS = 5000;

    // ---- Fixed per-worker state ----

    /** Identifier supplied at construction time; used as a prefix in log messages. */
    private final String id;

    /** Specification the worker reads its topic, client configs, counts and refresh rate from. */
    private final SustainedConnectionSpec spec;

    /** Guards start()/stop(): true between a successful start() and the matching stop(). */
    private final AtomicBoolean running = new AtomicBoolean(false);

    // ---- State assigned by start() ----

    /** Completion future handed to start(); maintenance threads loop until it is done. */
    private KafkaFutureImpl<String> doneFuture;

    /** Sink for the periodic StatusData snapshots. */
    private WorkerStatusTracker status;

    /** Every producer, consumer and metadata connection maintained by this worker. */
    private ArrayList<SustainedConnection> connections;

    // Counters published through StatusData.
    private AtomicLong totalProducerConnections;
    private AtomicLong totalProducerFailedConnections;
    private AtomicLong totalConsumerConnections;
    private AtomicLong totalConsumerFailedConnections;
    private AtomicLong totalMetadataConnections;
    private AtomicLong totalMetadataFailedConnections;
    private AtomicLong totalAbortedThreads;

    /** Single-threaded scheduler that runs StatusUpdater periodically. */
    private ScheduledExecutorService statusUpdaterExecutor;

    /** Handle of the periodic StatusUpdater task, kept so stop() can cancel it. */
    private Future<?> statusUpdaterFuture;

    /** Pool running the MaintainLoop tasks. */
    private ExecutorService workerExecutor;

    /* JADX WARN: Classes with same name are omitted:
      input_file:META-INF/bundled-dependencies/kafka-tools-2.7.0.jar:org/apache/kafka/trogdor/workload/SustainedConnectionWorker$ClaimableConnection.class
     */
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.2.1.1.17.jar:META-INF/bundled-dependencies/kafka-tools-2.7.0.jar:org/apache/kafka/trogdor/workload/SustainedConnectionWorker$ClaimableConnection.class */
    /**
     * Base class for connections that a maintenance thread claims, refreshes and then
     * releases with a new deadline.
     *
     * <p>{@link #needsRefresh(long)} and {@link #claim()} are invoked from the worker's
     * synchronized {@code findConnectionToMaintain()}; {@link #completeRefresh()} is invoked
     * by subclasses at the end of {@code refresh()}.
     * NOTE(review): completeRefresh() writes {@code nextUpdate}/{@code inUse} without holding
     * that lock - confirm the resulting visibility guarantees are sufficient.
     */
    private abstract class ClaimableConnection implements SustainedConnection {
        /** Time in milliseconds after which the connection is due for another refresh. */
        protected long nextUpdate = 0L;

        /** True from claim() until completeRefresh(). */
        protected boolean inUse = false;

        /** Milliseconds added to the current time by completeRefresh(); set by subclasses. */
        protected long refreshRate;

        private ClaimableConnection() {
        }

        /**
         * @param nowMs current time in milliseconds
         * @return true when the connection is unclaimed and {@code nowMs} is strictly past
         *         the stored deadline
         */
        @Override
        public boolean needsRefresh(long nowMs) {
            if (this.inUse) {
                return false;
            }
            return nowMs > this.nextUpdate;
        }

        /** Marks the connection as in use so that needsRefresh() reports false. */
        @Override
        public void claim() {
            this.inUse = true;
        }

        /** Closes the underlying client by delegating to closeQuietly(). */
        @Override
        public void close() throws Exception {
            closeQuietly();
        }

        /** Schedules the next refresh {@code refreshRate} ms from now and releases the claim. */
        protected void completeRefresh() {
            long nowMs = SustainedConnectionWorker.SYSTEM_TIME.milliseconds();
            this.nextUpdate = nowMs + this.refreshRate;
            this.inUse = false;
        }

        /** Releases the underlying client, if any. */
        protected abstract void closeQuietly();
    }

    /* JADX WARN: Classes with same name are omitted:
      input_file:META-INF/bundled-dependencies/kafka-tools-2.7.0.jar:org/apache/kafka/trogdor/workload/SustainedConnectionWorker$ConsumerSustainedConnection.class
     */
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.2.1.1.17.jar:META-INF/bundled-dependencies/kafka-tools-2.7.0.jar:org/apache/kafka/trogdor/workload/SustainedConnectionWorker$ConsumerSustainedConnection.class */
    /**
     * Sustained connection backed by a {@link KafkaConsumer} assigned to one randomly chosen
     * partition of the configured topic.
     */
    private class ConsumerSustainedConnection extends ClaimableConnection {
        private KafkaConsumer<byte[], byte[]> consumer;
        private TopicPartition activePartition;
        private final String topicName;
        private final Random rand;
        private final Properties props;

        /** Captures topic, refresh rate and consumer properties from the worker's spec. */
        ConsumerSustainedConnection() {
            super();
            this.topicName = SustainedConnectionWorker.this.spec.topicName();
            this.consumer = null;
            this.activePartition = null;
            this.rand = new Random();
            this.refreshRate = SustainedConnectionWorker.this.spec.refreshRateMs();

            Properties config = new Properties();
            config.put("bootstrap.servers", SustainedConnectionWorker.this.spec.bootstrapServers());
            config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
            config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
            config.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 1024);
            this.props = config;
            WorkerUtils.addConfigsToProperties(
                    this.props,
                    SustainedConnectionWorker.this.spec.commonClientConf(),
                    SustainedConnectionWorker.this.spec.consumerConf());
        }

        /**
         * Creates the consumer if there is none, then seeks to the end of the assigned
         * partition and polls once for up to 50 ms. Any failure closes the consumer, moves
         * one connection from the live counter to the failed counter and is logged; the
         * next refresh is scheduled in every case.
         */
        @Override
        public void refresh() {
            try {
                if (this.consumer == null) {
                    openConsumer();
                }
                // An empty collection means "all currently assigned partitions".
                this.consumer.seekToEnd(Collections.emptyList());
                this.consumer.poll(Duration.ofMillis(50L));
            } catch (Throwable th) {
                closeQuietly();
                SustainedConnectionWorker.this.totalConsumerConnections.decrementAndGet();
                SustainedConnectionWorker.this.totalConsumerFailedConnections.incrementAndGet();
                SustainedConnectionWorker.log.error("Error while refreshing sustained KafkaConsumer connection", th);
            }
            completeRefresh();
        }

        /** Counts a new connection, creates the consumer and assigns it one random partition. */
        private void openConsumer() {
            SustainedConnectionWorker.this.totalConsumerConnections.incrementAndGet();
            this.consumer = new KafkaConsumer<>(this.props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
            List<TopicPartition> candidates = this.consumer.partitionsFor(this.topicName).stream()
                    .map(info -> new TopicPartition(info.topic(), info.partition()))
                    .collect(Collectors.toList());
            int pick = this.rand.nextInt(candidates.size());
            this.activePartition = candidates.get(pick);
            this.consumer.assign(Collections.singletonList(this.activePartition));
        }

        /** Closes the consumer (if any) and forgets the assigned partition. */
        @Override
        protected void closeQuietly() {
            Utils.closeQuietly(this.consumer, "KafkaConsumer");
            this.consumer = null;
            this.activePartition = null;
        }
    }

    /* JADX WARN: Classes with same name are omitted:
      input_file:META-INF/bundled-dependencies/kafka-tools-2.7.0.jar:org/apache/kafka/trogdor/workload/SustainedConnectionWorker$MaintainLoop.class
     */
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.2.1.1.17.jar:META-INF/bundled-dependencies/kafka-tools-2.7.0.jar:org/apache/kafka/trogdor/workload/SustainedConnectionWorker$MaintainLoop.class */
    public class MaintainLoop implements Runnable {
        public MaintainLoop() {
        }

        @Override // java.lang.Runnable
        public void run() {
            while (!SustainedConnectionWorker.this.doneFuture.isDone()) {
                try {
                    Optional findConnectionToMaintain = SustainedConnectionWorker.this.findConnectionToMaintain();
                    if (findConnectionToMaintain.isPresent()) {
                        ((SustainedConnection) findConnectionToMaintain.get()).refresh();
                    } else {
                        SustainedConnectionWorker.SYSTEM_TIME.sleep(10L);
                    }
                } catch (Exception e) {
                    SustainedConnectionWorker.this.totalAbortedThreads.incrementAndGet();
                    SustainedConnectionWorker.log.error("Aborted thread while maintaining sustained connections", (Throwable) e);
                    return;
                }
            }
        }
    }

    /* JADX WARN: Classes with same name are omitted:
      input_file:META-INF/bundled-dependencies/kafka-tools-2.7.0.jar:org/apache/kafka/trogdor/workload/SustainedConnectionWorker$MetadataSustainedConnection.class
     */
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.2.1.1.17.jar:META-INF/bundled-dependencies/kafka-tools-2.7.0.jar:org/apache/kafka/trogdor/workload/SustainedConnectionWorker$MetadataSustainedConnection.class */
    /**
     * Sustained connection backed by an {@link Admin} client; each refresh issues a
     * describe-cluster request and waits for the node list.
     */
    private class MetadataSustainedConnection extends ClaimableConnection {
        private Admin client;
        private final Properties props;

        /** Captures the refresh rate and client properties from the worker's spec. */
        MetadataSustainedConnection() {
            super();
            this.client = null;
            this.refreshRate = SustainedConnectionWorker.this.spec.refreshRateMs();

            Properties config = new Properties();
            config.put("bootstrap.servers", SustainedConnectionWorker.this.spec.bootstrapServers());
            this.props = config;
            // NOTE(review): commonClientConf() is passed for both config arguments, whereas the
            // producer and consumer connections pass a client-specific config second. Possibly
            // an admin-client-specific config was intended - confirm against SustainedConnectionSpec.
            WorkerUtils.addConfigsToProperties(
                    this.props,
                    SustainedConnectionWorker.this.spec.commonClientConf(),
                    SustainedConnectionWorker.this.spec.commonClientConf());
        }

        /**
         * Creates the admin client if there is none, then blocks on a describe-cluster node
         * lookup. Any failure closes the client, moves one connection from the live counter
         * to the failed counter and is logged; the next refresh is scheduled in every case.
         */
        @Override
        public void refresh() {
            try {
                boolean needsClient = this.client == null;
                if (needsClient) {
                    SustainedConnectionWorker.this.totalMetadataConnections.incrementAndGet();
                    this.client = Admin.create(this.props);
                }
                this.client.describeCluster().nodes().get();
            } catch (Throwable th) {
                closeQuietly();
                SustainedConnectionWorker.this.totalMetadataConnections.decrementAndGet();
                SustainedConnectionWorker.this.totalMetadataFailedConnections.incrementAndGet();
                SustainedConnectionWorker.log.error("Error while refreshing sustained AdminClient connection", th);
            }
            completeRefresh();
        }

        /** Closes the admin client (if any) and clears the reference. */
        @Override
        protected void closeQuietly() {
            Utils.closeQuietly(this.client, "AdminClient");
            this.client = null;
        }
    }

    /* JADX WARN: Classes with same name are omitted:
      input_file:META-INF/bundled-dependencies/kafka-tools-2.7.0.jar:org/apache/kafka/trogdor/workload/SustainedConnectionWorker$ProducerSustainedConnection.class
     */
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.2.1.1.17.jar:META-INF/bundled-dependencies/kafka-tools-2.7.0.jar:org/apache/kafka/trogdor/workload/SustainedConnectionWorker$ProducerSustainedConnection.class */
    private class ProducerSustainedConnection extends ClaimableConnection {
        private KafkaProducer<byte[], byte[]> producer;
        private List<TopicPartition> partitions;
        private Iterator<TopicPartition> partitionsIterator;
        private final String topicName;
        private final PayloadIterator keys;
        private final PayloadIterator values;
        private final Properties props;

        ProducerSustainedConnection() {
            super();
            this.producer = null;
            this.partitions = null;
            this.topicName = SustainedConnectionWorker.this.spec.topicName();
            this.partitionsIterator = null;
            this.keys = new PayloadIterator(SustainedConnectionWorker.this.spec.keyGenerator());
            this.values = new PayloadIterator(SustainedConnectionWorker.this.spec.valueGenerator());
            this.refreshRate = SustainedConnectionWorker.this.spec.refreshRateMs();
            this.props = new Properties();
            this.props.put("bootstrap.servers", SustainedConnectionWorker.this.spec.bootstrapServers());
            WorkerUtils.addConfigsToProperties(this.props, SustainedConnectionWorker.this.spec.commonClientConf(), SustainedConnectionWorker.this.spec.producerConf());
        }

        @Override // org.apache.kafka.trogdor.workload.SustainedConnectionWorker.SustainedConnection
        public void refresh() {
            try {
                if (this.producer == null) {
                    SustainedConnectionWorker.this.totalProducerConnections.incrementAndGet();
                    this.producer = new KafkaProducer<>(this.props, (Serializer) new ByteArraySerializer(), (Serializer) new ByteArraySerializer());
                    this.partitions = (List) this.producer.partitionsFor(this.topicName).stream().map(partitionInfo -> {
                        return new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
                    }).collect(Collectors.toList());
                    Collections.shuffle(this.partitions);
                }
                if (this.partitionsIterator == null || !this.partitionsIterator.hasNext()) {
                    this.partitionsIterator = this.partitions.iterator();
                }
                TopicPartition next = this.partitionsIterator.next();
                this.producer.send(new ProducerRecord<>(next.topic(), Integer.valueOf(next.partition()), this.keys.next(), this.values.next())).get();
            } catch (Throwable th) {
                closeQuietly();
                SustainedConnectionWorker.this.totalProducerConnections.decrementAndGet();
                SustainedConnectionWorker.this.totalProducerFailedConnections.incrementAndGet();
                SustainedConnectionWorker.log.error("Error while refreshing sustained KafkaProducer connection", th);
            }
            completeRefresh();
        }

        @Override // org.apache.kafka.trogdor.workload.SustainedConnectionWorker.ClaimableConnection
        protected void closeQuietly() {
            Utils.closeQuietly(this.producer, "KafkaProducer");
            this.producer = null;
            this.partitions = null;
            this.partitionsIterator = null;
        }
    }

    /* JADX WARN: Classes with same name are omitted:
      input_file:META-INF/bundled-dependencies/kafka-tools-2.7.0.jar:org/apache/kafka/trogdor/workload/SustainedConnectionWorker$StatusData.class
     */
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.2.1.1.17.jar:META-INF/bundled-dependencies/kafka-tools-2.7.0.jar:org/apache/kafka/trogdor/workload/SustainedConnectionWorker$StatusData.class */
    /**
     * Immutable, JSON-serializable snapshot of the worker's counters together with the
     * time at which the snapshot was taken.
     */
    public static class StatusData {
        private final long totalProducerConnections;
        private final long totalProducerFailedConnections;
        private final long totalConsumerConnections;
        private final long totalConsumerFailedConnections;
        private final long totalMetadataConnections;
        private final long totalMetadataFailedConnections;
        private final long totalAbortedThreads;
        private final long updatedMs;

        /** Creates a snapshot; each argument is bound to the JSON property of the same name. */
        @JsonCreator
        StatusData(@JsonProperty("totalProducerConnections") long producerConnections,
                   @JsonProperty("totalProducerFailedConnections") long producerFailedConnections,
                   @JsonProperty("totalConsumerConnections") long consumerConnections,
                   @JsonProperty("totalConsumerFailedConnections") long consumerFailedConnections,
                   @JsonProperty("totalMetadataConnections") long metadataConnections,
                   @JsonProperty("totalMetadataFailedConnections") long metadataFailedConnections,
                   @JsonProperty("totalAbortedThreads") long abortedThreads,
                   @JsonProperty("updatedMs") long updatedAtMs) {
            this.totalProducerConnections = producerConnections;
            this.totalProducerFailedConnections = producerFailedConnections;
            this.totalConsumerConnections = consumerConnections;
            this.totalConsumerFailedConnections = consumerFailedConnections;
            this.totalMetadataConnections = metadataConnections;
            this.totalMetadataFailedConnections = metadataFailedConnections;
            this.totalAbortedThreads = abortedThreads;
            this.updatedMs = updatedAtMs;
        }

        /** @return the producer connection counter at snapshot time */
        @JsonProperty
        public long totalProducerConnections() {
            return totalProducerConnections;
        }

        /** @return the failed producer connection counter at snapshot time */
        @JsonProperty
        public long totalProducerFailedConnections() {
            return totalProducerFailedConnections;
        }

        /** @return the consumer connection counter at snapshot time */
        @JsonProperty
        public long totalConsumerConnections() {
            return totalConsumerConnections;
        }

        /** @return the failed consumer connection counter at snapshot time */
        @JsonProperty
        public long totalConsumerFailedConnections() {
            return totalConsumerFailedConnections;
        }

        /** @return the metadata connection counter at snapshot time */
        @JsonProperty
        public long totalMetadataConnections() {
            return totalMetadataConnections;
        }

        /** @return the failed metadata connection counter at snapshot time */
        @JsonProperty
        public long totalMetadataFailedConnections() {
            return totalMetadataFailedConnections;
        }

        /** @return the aborted maintenance thread counter at snapshot time */
        @JsonProperty
        public long totalAbortedThreads() {
            return totalAbortedThreads;
        }

        /** @return the time, in milliseconds, at which the snapshot was taken */
        @JsonProperty
        public long updatedMs() {
            return updatedMs;
        }
    }

    /* JADX WARN: Classes with same name are omitted:
      input_file:META-INF/bundled-dependencies/kafka-tools-2.7.0.jar:org/apache/kafka/trogdor/workload/SustainedConnectionWorker$StatusUpdater.class
     */
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.2.1.1.17.jar:META-INF/bundled-dependencies/kafka-tools-2.7.0.jar:org/apache/kafka/trogdor/workload/SustainedConnectionWorker$StatusUpdater.class */
    private class StatusUpdater implements Runnable {
        private StatusUpdater() {
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                SustainedConnectionWorker.this.status.update(JsonUtil.JSON_SERDE.valueToTree(new StatusData(SustainedConnectionWorker.this.totalProducerConnections.get(), SustainedConnectionWorker.this.totalProducerFailedConnections.get(), SustainedConnectionWorker.this.totalConsumerConnections.get(), SustainedConnectionWorker.this.totalConsumerFailedConnections.get(), SustainedConnectionWorker.this.totalMetadataConnections.get(), SustainedConnectionWorker.this.totalMetadataFailedConnections.get(), SustainedConnectionWorker.this.totalAbortedThreads.get(), SustainedConnectionWorker.SYSTEM_TIME.milliseconds())));
            } catch (Exception e) {
                SustainedConnectionWorker.log.error("Aborted test while running StatusUpdater", (Throwable) e);
                WorkerUtils.abort(SustainedConnectionWorker.log, "StatusUpdater", e, SustainedConnectionWorker.this.doneFuture);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Classes with same name are omitted:
      input_file:META-INF/bundled-dependencies/kafka-tools-2.7.0.jar:org/apache/kafka/trogdor/workload/SustainedConnectionWorker$SustainedConnection.class
     */
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.2.1.1.17.jar:META-INF/bundled-dependencies/kafka-tools-2.7.0.jar:org/apache/kafka/trogdor/workload/SustainedConnectionWorker$SustainedConnection.class */
    /**
     * A connection that the worker keeps alive by refreshing it periodically.
     * findConnectionToMaintain() checks needsRefresh() and then calls claim(); the
     * maintenance thread that receives the connection calls refresh().
     */
    public interface SustainedConnection extends AutoCloseable {
        /**
         * @param j the current time in milliseconds
         * @return whether the connection should be refreshed now
         */
        boolean needsRefresh(long j);

        /** Performs one refresh of the connection. */
        void refresh();

        /** Marks the connection as taken by a maintenance thread. */
        void claim();
    }

    /**
     * Creates a worker that is not yet running; all runtime state is set up by start().
     *
     * @param str                     identifier used as a prefix in this worker's log messages
     * @param sustainedConnectionSpec specification describing the connections to maintain
     */
    public SustainedConnectionWorker(String str, SustainedConnectionSpec sustainedConnectionSpec) {
        this.spec = sustainedConnectionSpec;
        this.id = str;
    }

    @Override // org.apache.kafka.trogdor.task.TaskWorker
    /**
     * Starts the worker: resets the counters, creates the configured number of producer,
     * consumer and metadata connections, schedules the periodic status report and submits
     * one MaintainLoop per configured thread.
     *
     * @param platform            the platform the task runs on (not used by this method)
     * @param workerStatusTracker receives the periodic StatusData snapshots
     * @param kafkaFutureImpl     done-future; the maintenance threads run until it completes
     * @throws IllegalStateException if the worker is already running
     */
    @Override
    public void start(Platform platform, WorkerStatusTracker workerStatusTracker, KafkaFutureImpl<String> kafkaFutureImpl) throws Exception {
        if (!this.running.compareAndSet(false, true)) {
            throw new IllegalStateException("SustainedConnectionWorker is already running.");
        }
        log.info("{}: Activating SustainedConnectionWorker with {}", this.id, this.spec);
        this.doneFuture = kafkaFutureImpl;
        this.status = workerStatusTracker;
        this.connections = new ArrayList<>();

        // Fresh counters for this run.
        this.totalProducerConnections = new AtomicLong(0L);
        this.totalProducerFailedConnections = new AtomicLong(0L);
        this.totalConsumerConnections = new AtomicLong(0L);
        this.totalConsumerFailedConnections = new AtomicLong(0L);
        this.totalMetadataConnections = new AtomicLong(0L);
        this.totalMetadataFailedConnections = new AtomicLong(0L);
        this.totalAbortedThreads = new AtomicLong(0L);

        // The connection objects are created here; each one opens its client on first refresh.
        for (int i = 0; i < this.spec.producerConnectionCount(); i++) {
            this.connections.add(new ProducerSustainedConnection());
        }
        for (int i = 0; i < this.spec.consumerConnectionCount(); i++) {
            this.connections.add(new ConsumerSustainedConnection());
        }
        for (int i = 0; i < this.spec.metadataConnectionCount(); i++) {
            this.connections.add(new MetadataSustainedConnection());
        }

        // Report status every REPORT_INTERVAL_MS. This worker's own constant is used here
        // rather than an unrelated Kafka Connect offset-commit timeout default.
        this.statusUpdaterExecutor = Executors.newScheduledThreadPool(1, ThreadUtils.createThreadFactory("StatusUpdaterWorkerThread%d", false));
        this.statusUpdaterFuture = this.statusUpdaterExecutor.scheduleAtFixedRate(new StatusUpdater(), 0L, REPORT_INTERVAL_MS, TimeUnit.MILLISECONDS);

        // One maintenance loop per configured thread.
        this.workerExecutor = Executors.newFixedThreadPool(this.spec.numThreads(), ThreadUtils.createThreadFactory("SustainedConnectionWorkerThread%d", false));
        for (int i = 0; i < this.spec.numThreads(); i++) {
            this.workerExecutor.submit(new MaintainLoop());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Finds the first connection that is due for a refresh at the current time and claims it.
     * The method is synchronized on the worker, so the check and the claim of a connection
     * happen under one lock.
     *
     * @return the claimed connection, or an empty Optional when none is due
     */
    public synchronized Optional<SustainedConnection> findConnectionToMaintain() {
        final long nowMs = SYSTEM_TIME.milliseconds();
        for (SustainedConnection candidate : this.connections) {
            if (!candidate.needsRefresh(nowMs)) {
                continue;
            }
            candidate.claim();
            return Optional.of(candidate);
        }
        return Optional.empty();
    }

    @Override // org.apache.kafka.trogdor.task.TaskWorker
    /**
     * Stops the worker: ends periodic status reporting, publishes one final status,
     * completes the done-future, terminates the maintenance threads and only then closes
     * every connection.
     *
     * <p>The maintenance threads are terminated before the connections are closed. Closing
     * first would let a thread that is still inside refresh() use a client while it is being
     * closed, or open a new client after close() has already run for that connection, in
     * which case the new client would never be closed.
     *
     * @param platform the platform the task runs on (not used by this method)
     * @throws IllegalStateException if the worker is not running
     */
    @Override
    public void stop(Platform platform) throws Exception {
        if (!this.running.compareAndSet(true, false)) {
            throw new IllegalStateException("SustainedConnectionWorker is not running.");
        }
        log.info("{}: Deactivating SustainedConnectionWorker.", this.id);

        // Stop the periodic reporter, then publish one last snapshot synchronously.
        this.statusUpdaterFuture.cancel(false);
        this.statusUpdaterExecutor.shutdown();
        this.statusUpdaterExecutor.awaitTermination(1L, TimeUnit.HOURS);
        this.statusUpdaterExecutor = null;
        new StatusUpdater().run();

        // Signals the MaintainLoop tasks to leave their loops.
        this.doneFuture.complete("");

        try {
            // Interrupt threads blocked in a refresh or sleep and wait for them to finish.
            this.workerExecutor.shutdownNow();
            this.workerExecutor.awaitTermination(1L, TimeUnit.HOURS);
        } finally {
            // Release the clients even if waiting for the threads was interrupted.
            for (SustainedConnection connection : this.connections) {
                connection.close();
            }
            this.workerExecutor = null;
            this.status = null;
            this.connections = null;
        }
    }
}
