package org.apache.kafka.trogdor.workload;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.node.TextNode;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.TreeSet;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.trogdor.common.JsonUtil;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.common.WorkerUtils;
import org.apache.kafka.trogdor.task.TaskWorker;
import org.apache.kafka.trogdor.task.WorkerStatusTracker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX WARN: Classes with same name are omitted:
  input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.2.1.1.27.jar:META-INF/bundled-dependencies/kafka-tools-2.7.0.jar:org/apache/kafka/trogdor/workload/RoundTripWorker.class
 */
/* loaded from: input_file:META-INF/bundled-dependencies/kafka-tools-2.7.0.jar:org/apache/kafka/trogdor/workload/RoundTripWorker.class */
/**
 * Trogdor task worker that produces keyed messages to the active topics of a
 * {@link RoundTripWorkloadSpec} and consumes them back again.
 *
 * <p>{@link #start} schedules a {@code Prepare} task which creates the topics and then launches a
 * producer task, a consumer task and a periodic status updater on a shared scheduled executor.
 * The consumer completes {@code doneFuture} once it has seen every unique message and all sends
 * have been acknowledged; {@link #stop} tears everything down.
 */
public class RoundTripWorker implements TaskWorker {
    // Throttle period in milliseconds; matches the literal 100 used when ProducerRunnable builds its Throttle.
    private static final int THROTTLE_PERIOD_MS = 100;
    // Interval in milliseconds; presumably how often the consumer logs pending messages — TODO confirm against the poll loop.
    private static final int LOG_INTERVAL_MS = 5000;
    // Matches the literal 10 that caps how many pending indexes ToReceiveTracker#log prints.
    private static final int LOG_NUM_MESSAGES = 10;
    private static final Logger log = LoggerFactory.getLogger((Class<?>) RoundTripWorker.class);
    // Generates the record key from the message index; the consumer decodes the key as a little-endian int.
    private static final PayloadGenerator KEY_GENERATOR = new SequentialPayloadGenerator(4, 0);
    // Tracks indexes that were sent but not yet consumed; created by Prepare.
    private ToReceiveTracker toReceiveTracker;
    // Worker id; used as a log prefix and inside the client ids / consumer group id.
    private final String id;
    private final RoundTripWorkloadSpec spec;
    // Flipped by start()/stop() to reject double starts and stops.
    private final AtomicBoolean running = new AtomicBoolean(false);
    // Guards unackedSends.
    private final Lock lock = new ReentrantLock();
    // Signalled by the producer callback when unackedSends drops to zero or below; awaited by the consumer.
    private final Condition unackedSendsAreZero = this.lock.newCondition();
    private ScheduledExecutorService executor;
    private WorkerStatusTracker status;
    // Completed when the round trip finishes, when a task aborts, or when stop() is called.
    private KafkaFutureImpl<String> doneFuture;
    // Created in the ProducerRunnable constructor and closed in stop().
    private KafkaProducer<byte[], byte[]> producer;
    // Created in the ConsumerRunnable constructor and closed in stop().
    private KafkaConsumer<byte[], byte[]> consumer;
    // Initialised to spec.maxMessages() in start(), decremented once per successful send callback, nulled in stop().
    private Long unackedSends;
    // Hands out the next message index to send (failed sends first); created by Prepare.
    private ToSendTracker toSendTracker;

    /* JADX WARN: Classes with same name are omitted:
      input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.2.1.1.27.jar:META-INF/bundled-dependencies/kafka-tools-2.7.0.jar:org/apache/kafka/trogdor/workload/RoundTripWorker$ConsumerRunnable.class
     */
    /* loaded from: input_file:META-INF/bundled-dependencies/kafka-tools-2.7.0.jar:org/apache/kafka/trogdor/workload/RoundTripWorker$ConsumerRunnable.class */
    /**
     * Consumer side of the round trip: polls the assigned partitions, ticks off every message
     * index it has not seen before, and completes the worker once all unique messages have been
     * received and all sends have been acknowledged.
     */
    class ConsumerRunnable implements Runnable {
        private final Properties props = new Properties();

        /**
         * Builds the consumer configuration, creates the worker's shared consumer and assigns it
         * the given partitions. Spec-provided client and consumer configs are applied last.
         *
         * @param partitions the topic partitions to consume from
         */
        ConsumerRunnable(HashSet<TopicPartition> partitions) {
            this.props.put("bootstrap.servers", RoundTripWorker.this.spec.bootstrapServers());
            this.props.put("client.id", "consumer." + RoundTripWorker.this.id);
            this.props.put("group.id", "round-trip-consumer-group-" + RoundTripWorker.this.id);
            this.props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            this.props.put("request.timeout.ms", 105000);
            this.props.put("max.poll.interval.ms", 100000);
            WorkerUtils.addConfigsToProperties(this.props, RoundTripWorker.this.spec.commonClientConf(), RoundTripWorker.this.spec.consumerConf());
            RoundTripWorker.this.consumer = new KafkaConsumer<>(this.props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
            RoundTripWorker.this.consumer.assign(partitions);
        }

        /**
         * Polls until {@code spec.maxMessages()} unique messages have been received, then waits
         * for every send to be acknowledged, publishes a final status and completes
         * {@code doneFuture}. Any failure aborts the worker through {@code WorkerUtils.abort}.
         * The exit log line always reports the real counters, including on failure.
         */
        @Override // java.lang.Runnable
        public void run() {
            long uniqueMessagesReceived = 0;
            long messagesReceived = 0;
            long pollInvoked = 0;
            RoundTripWorker.log.debug("{}: Starting RoundTripWorker#ConsumerRunnable.", RoundTripWorker.this.id);
            try {
                long lastLogTimeMs = Time.SYSTEM.milliseconds();
                polling:
                while (true) {
                    try {
                        pollInvoked++;
                        for (ConsumerRecord<byte[], byte[]> record : RoundTripWorker.this.consumer.poll(Duration.ofMillis(50L))) {
                            messagesReceived++;
                            // The key carries the message index as a little-endian int.
                            int messageIndex = ByteBuffer.wrap(record.key()).order(ByteOrder.LITTLE_ENDIAN).getInt();
                            if (RoundTripWorker.this.toReceiveTracker.removePending(messageIndex)) {
                                uniqueMessagesReceived++;
                                if (uniqueMessagesReceived >= RoundTripWorker.this.spec.maxMessages()) {
                                    break polling;
                                }
                            }
                        }
                        long nowMs = Time.SYSTEM.milliseconds();
                        if (nowMs > lastLogTimeMs + RoundTripWorker.LOG_INTERVAL_MS) {
                            RoundTripWorker.this.toReceiveTracker.log();
                            lastLogTimeMs = nowMs;
                        }
                    } catch (TimeoutException e) {
                        RoundTripWorker.log.debug("{}: Consumer got TimeoutException", RoundTripWorker.this.id, e);
                    } catch (WakeupException e) {
                        RoundTripWorker.log.debug("{}: Consumer got WakeupException", RoundTripWorker.this.id, e);
                    }
                }

                // Acquire outside the try so the finally only unlocks a lock we actually hold,
                // and release in finally so an interrupted await() cannot leak the lock.
                RoundTripWorker.this.lock.lock();
                try {
                    RoundTripWorker.log.info("{}: Consumer received the full count of {} unique messages.  Waiting for all {} sends to be acked...", RoundTripWorker.this.id, RoundTripWorker.this.spec.maxMessages(), RoundTripWorker.this.unackedSends);
                    while (RoundTripWorker.this.unackedSends.longValue() > 0) {
                        RoundTripWorker.this.unackedSendsAreZero.await();
                    }
                } finally {
                    RoundTripWorker.this.lock.unlock();
                }
                RoundTripWorker.log.info("{}: all sends have been acked.", RoundTripWorker.this.id);
                new StatusUpdater().update();
                RoundTripWorker.this.doneFuture.complete("");
            } catch (Throwable t) {
                WorkerUtils.abort(RoundTripWorker.log, "ConsumerRunnable", t, RoundTripWorker.this.doneFuture);
            } finally {
                RoundTripWorker.log.info("{}: ConsumerRunnable is exiting.  Invoked poll {} time(s).  messagesReceived = {}; uniqueMessagesReceived = {}.", RoundTripWorker.this.id, pollInvoked, messagesReceived, uniqueMessagesReceived);
            }
        }
    }

    /* JADX WARN: Classes with same name are omitted:
      input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.2.1.1.27.jar:META-INF/bundled-dependencies/kafka-tools-2.7.0.jar:org/apache/kafka/trogdor/workload/RoundTripWorker$Prepare.class
     */
    /* loaded from: input_file:META-INF/bundled-dependencies/kafka-tools-2.7.0.jar:org/apache/kafka/trogdor/workload/RoundTripWorker$Prepare.class */
    /**
     * First task run on the worker's executor: validates the spec, creates the active topics,
     * initialises the send/receive trackers and then launches the producer, consumer and status
     * updater tasks. Any failure aborts the worker through {@code WorkerUtils.abort}.
     */
    class Prepare implements Runnable {
        Prepare() {
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                if (RoundTripWorker.this.spec.targetMessagesPerSec() <= 0) {
                    throw new ConfigException("Can't have targetMessagesPerSec <= 0.");
                }

                // NOTE(review): left raw because the type returned by PartitionsSpec#newTopic is
                // not imported in this file; parameterize the map once that import is added.
                HashMap topicsToCreate = new HashMap();
                HashSet<TopicPartition> activePartitions = new HashSet<>();
                Map<String, PartitionsSpec> materialized = RoundTripWorker.this.spec.activeTopics().materialize();
                for (Map.Entry<String, PartitionsSpec> topicEntry : materialized.entrySet()) {
                    String topicName = topicEntry.getKey();
                    PartitionsSpec partitionsSpec = topicEntry.getValue();
                    topicsToCreate.put(topicName, partitionsSpec.newTopic(topicName));
                    for (Integer partitionNumber : partitionsSpec.partitionNumbers()) {
                        activePartitions.add(new TopicPartition(topicName, partitionNumber));
                    }
                }
                if (activePartitions.isEmpty()) {
                    throw new RuntimeException("You must specify at least one active topic.");
                }

                RoundTripWorker.this.status.update(new TextNode("Creating " + topicsToCreate.size() + " topic(s)"));
                WorkerUtils.createTopics(
                    RoundTripWorker.log,
                    RoundTripWorker.this.spec.bootstrapServers(),
                    RoundTripWorker.this.spec.commonClientConf(),
                    RoundTripWorker.this.spec.adminClientConf(),
                    topicsToCreate,
                    false);
                RoundTripWorker.this.status.update(new TextNode("Created " + topicsToCreate.size() + " topic(s)"));

                // Trackers must exist before any of the tasks below start touching them.
                RoundTripWorker.this.toSendTracker = new ToSendTracker(RoundTripWorker.this.spec.maxMessages());
                RoundTripWorker.this.toReceiveTracker = new ToReceiveTracker();

                RoundTripWorker.this.executor.submit(new ProducerRunnable(activePartitions));
                RoundTripWorker.this.executor.submit(new ConsumerRunnable(activePartitions));
                // One immediate status report, then one every 30 seconds.
                RoundTripWorker.this.executor.submit(new StatusUpdater());
                RoundTripWorker.this.executor.scheduleWithFixedDelay(new StatusUpdater(), 30L, 30L, TimeUnit.SECONDS);
            } catch (Throwable failure) {
                WorkerUtils.abort(RoundTripWorker.log, "Prepare", failure, RoundTripWorker.this.doneFuture);
            }
        }
    }

    /* JADX WARN: Classes with same name are omitted:
      input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.2.1.1.27.jar:META-INF/bundled-dependencies/kafka-tools-2.7.0.jar:org/apache/kafka/trogdor/workload/RoundTripWorker$ProducerRunnable.class
     */
    /* loaded from: input_file:META-INF/bundled-dependencies/kafka-tools-2.7.0.jar:org/apache/kafka/trogdor/workload/RoundTripWorker$ProducerRunnable.class */
    /**
     * Producer side of the round trip: sends every message index handed out by the
     * {@code ToSendTracker} (new indexes and retries of failed sends), cycling over the
     * partitions in turn and throttled to the spec's target rate.
     */
    class ProducerRunnable implements Runnable {
        private final HashSet<TopicPartition> partitions;
        private final Throttle throttle;

        /**
         * Builds the producer configuration, creates the worker's shared producer and the send
         * throttle. Spec-provided client and producer configs are applied last.
         *
         * @param partitions the topic partitions to send to
         */
        ProducerRunnable(HashSet<TopicPartition> partitions) {
            this.partitions = partitions;
            Properties props = new Properties();
            props.put("bootstrap.servers", RoundTripWorker.this.spec.bootstrapServers());
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 65536L);
            props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 1000L);
            props.put("client.id", "producer." + RoundTripWorker.this.id);
            props.put(ProducerConfig.ACKS_CONFIG, "all");
            props.put("request.timeout.ms", 105000);
            WorkerUtils.addConfigsToProperties(props, RoundTripWorker.this.spec.commonClientConf(), RoundTripWorker.this.spec.producerConf());
            RoundTripWorker.this.producer = new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer());
            this.throttle = new Throttle(
                WorkerUtils.perSecToPerPeriod(RoundTripWorker.this.spec.targetMessagesPerSec(), RoundTripWorker.THROTTLE_PERIOD_MS),
                RoundTripWorker.THROTTLE_PERIOD_MS);
        }

        /**
         * Sends messages until the tracker has nothing left to hand out. Each successful send
         * decrements {@code unackedSends} under the worker lock and signals the consumer when it
         * reaches zero; each failed send is queued for a retry. Any failure aborts the worker
         * through {@code WorkerUtils.abort}. The exit log line is written exactly once, under
         * the lock, on every path.
         */
        @Override // java.lang.Runnable
        public void run() {
            long messagesSent = 0;
            long uniqueMessagesSent = 0;
            RoundTripWorker.log.debug("{}: Starting RoundTripWorker#ProducerRunnable.", RoundTripWorker.this.id);
            try {
                Iterator<TopicPartition> partitionIter = this.partitions.iterator();
                while (true) {
                    ToSendTrackerResult toSend = RoundTripWorker.this.toSendTracker.next();
                    if (toSend == null) {
                        break;
                    }
                    this.throttle.increment();
                    // Declared final per iteration so the send callback can capture it.
                    final long messageIndex = toSend.index;
                    if (toSend.firstSend) {
                        RoundTripWorker.this.toReceiveTracker.addPending(messageIndex);
                        uniqueMessagesSent++;
                    }
                    messagesSent++;
                    // Round-robin over the partitions, restarting when the iterator is exhausted.
                    if (!partitionIter.hasNext()) {
                        partitionIter = this.partitions.iterator();
                    }
                    TopicPartition partition = partitionIter.next();
                    // Key and value are both derived from the message index.
                    ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
                        partition.topic(),
                        partition.partition(),
                        RoundTripWorker.KEY_GENERATOR.generate(messageIndex),
                        RoundTripWorker.this.spec.valueGenerator().generate(messageIndex));
                    RoundTripWorker.this.producer.send(record, (metadata, exception) -> {
                        if (exception != null) {
                            RoundTripWorker.log.info("{}: Got exception when sending message {}: {}", RoundTripWorker.this.id, messageIndex, exception.getMessage());
                            RoundTripWorker.this.toSendTracker.addFailed(messageIndex);
                            return;
                        }
                        // Acquire outside the try so the finally only unlocks a lock we hold.
                        RoundTripWorker.this.lock.lock();
                        try {
                            RoundTripWorker.this.unackedSends = RoundTripWorker.this.unackedSends.longValue() - 1;
                            if (RoundTripWorker.this.unackedSends.longValue() <= 0) {
                                RoundTripWorker.this.unackedSendsAreZero.signalAll();
                            }
                        } finally {
                            RoundTripWorker.this.lock.unlock();
                        }
                    });
                }
            } catch (Throwable t) {
                WorkerUtils.abort(RoundTripWorker.log, "ProducerRunnable", t, RoundTripWorker.this.doneFuture);
            } finally {
                // unackedSends is guarded by the lock; unlock exactly once even if logging throws.
                RoundTripWorker.this.lock.lock();
                try {
                    RoundTripWorker.log.info("{}: ProducerRunnable is exiting.  messagesSent={}; uniqueMessagesSent={}; ackedSends={}/{}.", RoundTripWorker.this.id, messagesSent, uniqueMessagesSent, RoundTripWorker.this.spec.maxMessages() - RoundTripWorker.this.unackedSends.longValue(), RoundTripWorker.this.spec.maxMessages());
                } finally {
                    RoundTripWorker.this.lock.unlock();
                }
            }
        }
    }

    /* JADX WARN: Classes with same name are omitted:
      input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.2.1.1.27.jar:META-INF/bundled-dependencies/kafka-tools-2.7.0.jar:org/apache/kafka/trogdor/workload/RoundTripWorker$StatusData.class
     */
    /* loaded from: input_file:META-INF/bundled-dependencies/kafka-tools-2.7.0.jar:org/apache/kafka/trogdor/workload/RoundTripWorker$StatusData.class */
    /**
     * Immutable status snapshot, serialised to and from JSON with the properties
     * {@code totalUniqueSent} and {@code totalReceived}.
     */
    public static class StatusData {
        private final long totalUniqueSent;
        private final long totalReceived;

        /**
         * @param totalUniqueSent value exposed by {@link #totalUniqueSent()}
         * @param totalReceived   value exposed by {@link #totalReceived()}
         */
        @JsonCreator
        public StatusData(
                @JsonProperty("totalUniqueSent") long totalUniqueSent,
                @JsonProperty("totalReceived") long totalReceived) {
            this.totalUniqueSent = totalUniqueSent;
            this.totalReceived = totalReceived;
        }

        /** @return the total-unique-sent count captured in this snapshot */
        @JsonProperty
        public long totalUniqueSent() {
            return totalUniqueSent;
        }

        /** @return the total-received count captured in this snapshot */
        @JsonProperty
        public long totalReceived() {
            return totalReceived;
        }
    }

    /* JADX WARN: Classes with same name are omitted:
      input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.2.1.1.27.jar:META-INF/bundled-dependencies/kafka-tools-2.7.0.jar:org/apache/kafka/trogdor/workload/RoundTripWorker$StatusUpdater.class
     */
    /* loaded from: input_file:META-INF/bundled-dependencies/kafka-tools-2.7.0.jar:org/apache/kafka/trogdor/workload/RoundTripWorker$StatusUpdater.class */
    /**
     * Publishes the current send/receive progress to the worker's status tracker. Used both as
     * a scheduled task and directly, via {@link #update()}.
     */
    public class StatusUpdater implements Runnable {
        public StatusUpdater() {
        }

        /** Runs {@link #update()} and aborts the worker if it throws an exception. */
        @Override // java.lang.Runnable
        public void run() {
            try {
                update();
            } catch (Exception failure) {
                WorkerUtils.abort(RoundTripWorker.log, "StatusUpdater", failure, RoundTripWorker.this.doneFuture);
            }
        }

        /**
         * Reads the send frontier and the received count, pushes them to the status tracker as
         * JSON and returns the snapshot.
         *
         * @return the snapshot that was published
         */
        StatusData update() {
            long uniqueSent = RoundTripWorker.this.toSendTracker.frontier();
            long received = RoundTripWorker.this.toReceiveTracker.totalReceived();
            StatusData snapshot = new StatusData(uniqueSent, received);
            RoundTripWorker.this.status.update(JsonUtil.JSON_SERDE.valueToTree(snapshot));
            return snapshot;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Classes with same name are omitted:
      input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.2.1.1.27.jar:META-INF/bundled-dependencies/kafka-tools-2.7.0.jar:org/apache/kafka/trogdor/workload/RoundTripWorker$ToReceiveTracker.class
     */
    /* loaded from: input_file:META-INF/bundled-dependencies/kafka-tools-2.7.0.jar:org/apache/kafka/trogdor/workload/RoundTripWorker$ToReceiveTracker.class */
    /**
     * Tracks message indexes that have been sent but not yet consumed, plus a running count of
     * indexes that were successfully ticked off. All state is guarded by this object's monitor.
     */
    public class ToReceiveTracker {
        private final TreeSet<Long> pending = new TreeSet<>();
        private long totalReceived = 0L;

        private ToReceiveTracker() {
        }

        /** Records {@code j} as awaiting consumption. */
        synchronized void addPending(long j) {
            pending.add(j);
        }

        /**
         * Ticks off {@code j} if it is currently pending.
         *
         * @return {@code true} if the index was pending (and the received count was bumped),
         *         {@code false} if it was unknown or already removed
         */
        synchronized boolean removePending(long j) {
            boolean wasPending = pending.remove(j);
            if (wasPending) {
                totalReceived++;
            }
            return wasPending;
        }

        /** @return how many pending indexes have been removed so far */
        synchronized long totalReceived() {
            return totalReceived;
        }

        /** Logs the number of pending indexes and the lowest few of them. */
        void log() {
            List<Long> lowestPending = new ArrayList<>(RoundTripWorker.LOG_NUM_MESSAGES);
            long pendingCount;
            // Copy under the monitor, format and log outside it.
            synchronized (this) {
                pendingCount = pending.size();
                for (Long index : pending) {
                    if (lowestPending.size() >= RoundTripWorker.LOG_NUM_MESSAGES) {
                        break;
                    }
                    lowestPending.add(index);
                }
            }
            RoundTripWorker.log.info("{}: consumer waiting for {} message(s), starting with: {}", RoundTripWorker.this.id, pendingCount, Utils.join(lowestPending, ", "));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Classes with same name are omitted:
      input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.2.1.1.27.jar:META-INF/bundled-dependencies/kafka-tools-2.7.0.jar:org/apache/kafka/trogdor/workload/RoundTripWorker$ToSendTracker.class
     */
    /* loaded from: input_file:META-INF/bundled-dependencies/kafka-tools-2.7.0.jar:org/apache/kafka/trogdor/workload/RoundTripWorker$ToSendTracker.class */
    /**
     * Decides which message index to send next: previously failed indexes first, in the order
     * they failed, then fresh indexes from a monotonically increasing frontier up to
     * {@code maxMessages}. All state is guarded by this object's monitor.
     */
    public static class ToSendTracker {
        private final long maxMessages;
        private final List<Long> failed = new ArrayList<>();
        private long frontier = 0;

        /** @param j the number of fresh indexes to hand out (indexes {@code 0} to {@code j - 1}) */
        ToSendTracker(long j) {
            this.maxMessages = j;
        }

        /** Queues index {@code j} to be handed out again by {@link #next()}. */
        synchronized void addFailed(long j) {
            failed.add(j);
        }

        /** @return the next fresh index that has not been handed out yet */
        synchronized long frontier() {
            return frontier;
        }

        /**
         * @return the next index to send, flagged as a first send or a retry, or {@code null}
         *         when there are no failed indexes queued and the frontier has reached the maximum
         */
        synchronized ToSendTrackerResult next() {
            if (failed.isEmpty()) {
                if (frontier < maxMessages) {
                    long fresh = frontier;
                    frontier = fresh + 1;
                    return new ToSendTrackerResult(fresh, true);
                }
                return null;
            }
            long retry = failed.remove(0);
            return new ToSendTrackerResult(retry, false);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Classes with same name are omitted:
      input_file:META-INF/bundled-dependencies/pulsar-io-kafka-connect-adaptor-2.7.2.1.1.27.jar:META-INF/bundled-dependencies/kafka-tools-2.7.0.jar:org/apache/kafka/trogdor/workload/RoundTripWorker$ToSendTrackerResult.class
     */
    /* loaded from: input_file:META-INF/bundled-dependencies/kafka-tools-2.7.0.jar:org/apache/kafka/trogdor/workload/RoundTripWorker$ToSendTrackerResult.class */
    /** A message index to send, together with whether this is its first send or a retry. */
    public static class ToSendTrackerResult {
        /** The message index. */
        final long index;
        /** {@code true} on the first send of this index, {@code false} on a retry. */
        final boolean firstSend;

        ToSendTrackerResult(long index, boolean firstSend) {
            this.index = index;
            this.firstSend = firstSend;
        }
    }

    /**
     * Creates an idle worker; nothing is started until {@link #start} is called.
     *
     * @param id   worker id, used in log lines, client ids and the consumer group id
     * @param spec the workload specification to execute
     */
    public RoundTripWorker(String id, RoundTripWorkloadSpec spec) {
        this.spec = spec;
        this.id = id;
    }

    /**
     * Activates the worker: records the status tracker and completion future, resets the
     * per-run state, creates a three-thread scheduled executor and submits the {@code Prepare}
     * task to it.
     *
     * @throws IllegalStateException if the worker is already running
     */
    @Override // org.apache.kafka.trogdor.task.TaskWorker
    public void start(Platform platform, WorkerStatusTracker workerStatusTracker, KafkaFutureImpl<String> kafkaFutureImpl) throws Exception {
        boolean claimed = this.running.compareAndSet(false, true);
        if (!claimed) {
            throw new IllegalStateException("RoundTripWorker is already running.");
        }
        log.info("{}: Activating RoundTripWorker.", this.id);
        this.executor = Executors.newScheduledThreadPool(
            3, ThreadUtils.createThreadFactory("RoundTripWorker%d", false));
        this.status = workerStatusTracker;
        this.doneFuture = kafkaFutureImpl;
        // Clients are created later, by the runnables that Prepare launches.
        this.producer = null;
        this.consumer = null;
        this.unackedSends = this.spec.maxMessages();
        this.executor.submit(new Prepare());
    }

    /**
     * Deactivates the worker: completes the done future, interrupts and waits for the executor
     * tasks, closes the consumer and producer, and clears the per-run state.
     *
     * @throws IllegalStateException if the worker is not running
     */
    @Override // org.apache.kafka.trogdor.task.TaskWorker
    public void stop(Platform platform) throws Exception {
        if (!this.running.compareAndSet(true, false)) {
            throw new IllegalStateException("RoundTripWorker is not running.");
        }
        log.info("{}: Deactivating RoundTripWorker.", this.id);
        this.doneFuture.complete("");
        // Stop the tasks before closing the clients they use.
        this.executor.shutdownNow();
        this.executor.awaitTermination(1L, TimeUnit.DAYS);
        // Plain labels for the close-failure log message; no dependency on consumer internals.
        Utils.closeQuietly(this.consumer, "consumer");
        Utils.closeQuietly(this.producer, "producer");
        this.consumer = null;
        this.producer = null;
        this.unackedSends = null;
        this.executor = null;
        this.doneFuture = null;
        log.info("{}: Deactivated RoundTripWorker.", this.id);
    }
}
