package com.linkedin.venice.writer;

import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import com.linkedin.venice.ConfigKeys;
import com.linkedin.venice.annotation.Threadsafe;
import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.exceptions.RecordTooLargeException;
import com.linkedin.venice.exceptions.TopicAuthorizationVeniceException;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.guid.GuidUtils;
import com.linkedin.venice.kafka.protocol.ControlMessage;
import com.linkedin.venice.kafka.protocol.Delete;
import com.linkedin.venice.kafka.protocol.EndOfIncrementalPush;
import com.linkedin.venice.kafka.protocol.EndOfSegment;
import com.linkedin.venice.kafka.protocol.GUID;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.kafka.protocol.LeaderMetadata;
import com.linkedin.venice.kafka.protocol.ProducerMetadata;
import com.linkedin.venice.kafka.protocol.Put;
import com.linkedin.venice.kafka.protocol.StartOfIncrementalPush;
import com.linkedin.venice.kafka.protocol.StartOfPush;
import com.linkedin.venice.kafka.protocol.StartOfSegment;
import com.linkedin.venice.kafka.protocol.TopicSwitch;
import com.linkedin.venice.kafka.protocol.Update;
import com.linkedin.venice.kafka.protocol.VersionSwap;
import com.linkedin.venice.kafka.protocol.enums.ControlMessageType;
import com.linkedin.venice.kafka.protocol.enums.MessageType;
import com.linkedin.venice.kafka.validation.Segment;
import com.linkedin.venice.kafka.validation.checksum.CheckSumType;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.partitioner.VenicePartitioner;
import com.linkedin.venice.pubsub.api.PubSubMessageHeaders;
import com.linkedin.venice.pubsub.api.PubSubProduceResult;
import com.linkedin.venice.pubsub.api.PubSubProducerAdapter;
import com.linkedin.venice.pubsub.api.PubSubProducerCallback;
import com.linkedin.venice.pubsub.kafka.KafkaPubSubMessageDeserializer;
import com.linkedin.venice.serialization.KeyWithChunkingSuffixSerializer;
import com.linkedin.venice.serialization.VeniceKafkaSerializer;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.serialization.avro.ChunkedValueManifestSerializer;
import com.linkedin.venice.utils.ByteUtils;
import com.linkedin.venice.utils.ExceptionUtils;
import com.linkedin.venice.utils.LatencyUtils;
import com.linkedin.venice.utils.Time;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import javax.annotation.Nonnull;
import org.apache.avro.Schema;
import org.apache.avro.specific.FixedSize;
import org.apache.avro.util.Utf8;
import org.apache.commons.lang.Validate;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@Threadsafe
/* loaded from: input_file:com/linkedin/venice/writer/VeniceWriter.class */
public class VeniceWriter<K, V, U> extends AbstractVeniceWriter<K, V, U> {
    private final Logger logger;
    public static final String VENICE_WRITER_CONFIG_PREFIX = "venice.writer.";
    public static final String CLOSE_TIMEOUT_MS = "venice.writer.close.timeout.ms";
    public static final String CHECK_SUM_TYPE = "venice.writer.checksum.type";
    public static final String ENABLE_CHUNKING = "venice.writer.chunking.enabled";
    public static final String ENABLE_RMD_CHUNKING = "venice.writer.replication.metadata.chunking.enabled";
    public static final String MAX_ATTEMPTS_WHEN_TOPIC_MISSING = "venice.writer.max.attemps.when.topic.missing";
    public static final String SLEEP_TIME_MS_WHEN_TOPIC_MISSING = "venice.writer.sleep.time.ms.when.topic.missing";
    public static final String MAX_ELAPSED_TIME_FOR_SEGMENT_IN_MS = "venice.writer.max.elapsed.time.for.segment.in.ms";
    public static final String MAX_SIZE_FOR_USER_PAYLOAD_PER_MESSAGE_IN_BYTES = "venice.writer.max.size.for.user.payload.per.message.in.bytes";
    public static final int DEFAULT_MAX_SIZE_FOR_USER_PAYLOAD_PER_MESSAGE_IN_BYTES = 972800;
    public static final int DEFAULT_CLOSE_TIMEOUT_MS = 30000;
    public static final int DEFAULT_MAX_ATTEMPTS_WHEN_TOPIC_MISSING = 30;
    public static final int DEFAULT_SLEEP_TIME_MS_WHEN_TOPIC_MISSING = 10000;
    public static final long VENICE_DEFAULT_LOGICAL_TS = -1;
    public static final long APP_DEFAULT_LOGICAL_TS = -2;
    public static final int VENICE_DEFAULT_TIMESTAMP_METADATA_VERSION_ID = -1;
    public static final int VENICE_DEFAULT_VALUE_SCHEMA_ID = -1;
    private final PubSubMessageHeaders protocolSchemaHeaders;
    private final VeniceKafkaSerializer<K> keySerializer;
    private final VeniceKafkaSerializer<V> valueSerializer;
    private final VeniceKafkaSerializer<U> writeComputeSerializer;
    private final PubSubProducerAdapter producerAdapter;
    private final GUID producerGUID;
    private final Time time;
    private final VenicePartitioner partitioner;
    private final int numberOfPartitions;
    private final int closeTimeOut;
    private final CheckSumType checkSumType;
    private final int maxSizeForUserPayloadPerMessageInBytes;
    private final int maxAttemptsWhenTopicMissing;
    private final long sleepTimeMsWhenTopicMissing;
    private final long maxElapsedTimeForSegmentInMs;
    private final Segment[] segments;
    private final long[] segmentsCreationTimeArray;
    private final KeyWithChunkingSuffixSerializer keyWithChunkingSuffixSerializer;
    private final ChunkedValueManifestSerializer chunkedValueManifestSerializer;
    private final Map<CharSequence, CharSequence> defaultDebugInfo;
    private final boolean elapsedTimeForClosingSegmentEnabled;
    private final Object[] partitionLocks;
    private String writerId;
    private volatile boolean isChunkingEnabled;
    private volatile boolean isChunkingSet;
    private volatile boolean isChunkingFlagInvoked;
    private final boolean isRmdChunkingEnabled;
    private static final ChunkedPayloadAndManifest EMPTY_CHUNKED_PAYLOAD_AND_MANIFEST = new ChunkedPayloadAndManifest(null, null);
    public static final String DEFAULT_CHECK_SUM_TYPE = CheckSumType.MD5.name();
    public static final long DEFAULT_UPSTREAM_OFFSET = ((Long) AvroCompatibilityHelper.getSpecificDefaultValue(LeaderMetadata.SCHEMA$.getField("upstreamOffset"))).longValue();
    public static final int DEFAULT_UPSTREAM_KAFKA_CLUSTER_ID = ((Integer) AvroCompatibilityHelper.getSpecificDefaultValue(LeaderMetadata.SCHEMA$.getField("upstreamKafkaClusterId"))).intValue();
    public static final AtomicLong OPEN_VENICE_WRITER_COUNT = new AtomicLong(0);
    public static final AtomicLong VENICE_WRITER_CLOSE_FAILED_COUNT = new AtomicLong(0);
    private static final long DEFAULT_MAX_ELAPSED_TIME_FOR_SEGMENT_IN_MS = TimeUnit.MILLISECONDS.convert(5, TimeUnit.MINUTES);
    private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
    private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.wrap(EMPTY_BYTE_ARRAY);
    public static final LeaderMetadataWrapper DEFAULT_LEADER_METADATA_WRAPPER = new LeaderMetadataWrapper(DEFAULT_UPSTREAM_OFFSET, DEFAULT_UPSTREAM_KAFKA_CLUSTER_ID);
    private static final PubSubMessageHeaders EMPTY_MSG_HEADERS = new PubSubMessageHeaders();
    private static final int CONTROL_MESSAGE_KAFKA_KEY_LENGTH = GUID.class.getAnnotation(FixedSize.class).value() + 8;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linkedin/venice/writer/VeniceWriter$KafkaMessageEnvelopeProvider.class */
    public interface KafkaMessageEnvelopeProvider {
        KafkaMessageEnvelope getKafkaMessageEnvelope();
    }

    /* loaded from: input_file:com/linkedin/venice/writer/VeniceWriter$KeyProvider.class */
    public interface KeyProvider {
        KafkaKey getKey(ProducerMetadata producerMetadata);
    }

    public VeniceWriter(VeniceWriterOptions veniceWriterOptions, VeniceProperties veniceProperties, PubSubProducerAdapter pubSubProducerAdapter) {
        this(veniceWriterOptions, veniceProperties, pubSubProducerAdapter, KafkaMessageEnvelope.SCHEMA$);
    }

    public VeniceWriter(VeniceWriterOptions veniceWriterOptions, VeniceProperties veniceProperties, PubSubProducerAdapter pubSubProducerAdapter, Schema schema) {
        super(veniceWriterOptions.getTopicName());
        this.keyWithChunkingSuffixSerializer = new KeyWithChunkingSuffixSerializer();
        this.chunkedValueManifestSerializer = new ChunkedValueManifestSerializer(true);
        this.keySerializer = veniceWriterOptions.getKeySerializer();
        this.valueSerializer = veniceWriterOptions.getValueSerializer();
        this.writeComputeSerializer = veniceWriterOptions.getWriteComputeSerializer();
        this.time = veniceWriterOptions.getTime();
        this.partitioner = veniceWriterOptions.getPartitioner();
        this.closeTimeOut = veniceProperties.getInt(CLOSE_TIMEOUT_MS, 30000);
        this.checkSumType = CheckSumType.valueOf(veniceProperties.getString(CHECK_SUM_TYPE, DEFAULT_CHECK_SUM_TYPE));
        this.isChunkingEnabled = veniceWriterOptions.isChunkingEnabled();
        this.isChunkingSet = true;
        this.isRmdChunkingEnabled = veniceWriterOptions.isRmdChunkingEnabled();
        this.maxSizeForUserPayloadPerMessageInBytes = veniceProperties.getInt(MAX_SIZE_FOR_USER_PAYLOAD_PER_MESSAGE_IN_BYTES, DEFAULT_MAX_SIZE_FOR_USER_PAYLOAD_PER_MESSAGE_IN_BYTES);
        if (this.maxSizeForUserPayloadPerMessageInBytes > 972800) {
            if (!this.isChunkingEnabled) {
                throw new VeniceException("venice.writer.max.size.for.user.payload.per.message.in.bytes cannot be set higher than 972800 unless venice.writer.chunking.enabled is true");
            }
            if (this.isChunkingEnabled && !Version.isVersionTopic(this.topicName)) {
                throw new VeniceException("venice.writer.max.size.for.user.payload.per.message.in.bytes cannot be set higher than 972800 unless venice.writer.chunking.enabled is true and the topic is Version Topic");
            }
        }
        this.isChunkingFlagInvoked = false;
        this.maxAttemptsWhenTopicMissing = veniceProperties.getInt(MAX_ATTEMPTS_WHEN_TOPIC_MISSING, 30);
        this.sleepTimeMsWhenTopicMissing = veniceProperties.getInt(SLEEP_TIME_MS_WHEN_TOPIC_MISSING, DEFAULT_SLEEP_TIME_MS_WHEN_TOPIC_MISSING);
        this.maxElapsedTimeForSegmentInMs = veniceProperties.getLong(MAX_ELAPSED_TIME_FOR_SEGMENT_IN_MS, DEFAULT_MAX_ELAPSED_TIME_FOR_SEGMENT_IN_MS);
        this.elapsedTimeForClosingSegmentEnabled = this.maxElapsedTimeForSegmentInMs > 0;
        this.defaultDebugInfo = Utils.getDebugInfo();
        if (veniceProperties.containsKey(ConfigKeys.INSTANCE_ID)) {
            this.writerId = veniceProperties.getString(ConfigKeys.INSTANCE_ID);
        } else {
            this.writerId = Utils.getHostName();
            if (veniceProperties.containsKey(ConfigKeys.LISTENER_PORT)) {
                this.writerId += ":" + veniceProperties.getInt(ConfigKeys.LISTENER_PORT);
            }
        }
        this.producerGUID = GuidUtils.getGUID(veniceProperties);
        this.logger = LogManager.getLogger("VeniceWriter [" + GuidUtils.getHexFromGuid(this.producerGUID) + "]");
        this.protocolSchemaHeaders = schema == null ? EMPTY_MSG_HEADERS : new PubSubMessageHeaders().add(KafkaPubSubMessageDeserializer.VENICE_TRANSPORT_PROTOCOL_HEADER, schema.toString().getBytes(StandardCharsets.UTF_8));
        try {
            this.producerAdapter = pubSubProducerAdapter;
            if (veniceWriterOptions.getPartitionCount() != null) {
                this.numberOfPartitions = veniceWriterOptions.getPartitionCount().intValue();
            } else {
                this.numberOfPartitions = this.producerAdapter.getNumberOfPartitions(this.topicName, 30, TimeUnit.SECONDS);
            }
            this.segmentsCreationTimeArray = new long[this.numberOfPartitions];
            this.partitionLocks = new Object[this.numberOfPartitions];
            for (int i = 0; i < this.numberOfPartitions; i++) {
                this.partitionLocks[i] = new Object();
                this.segmentsCreationTimeArray[i] = -1;
            }
            this.segments = new Segment[this.numberOfPartitions];
            OPEN_VENICE_WRITER_COUNT.incrementAndGet();
        } catch (Exception e) {
            this.logger.error("VeniceWriter cannot be constructed with the props: {}", veniceProperties);
            throw new VeniceException("Error while constructing VeniceWriter for store name: " + this.topicName, e);
        }
    }

    @Override // com.linkedin.venice.writer.AbstractVeniceWriter
    public void close(boolean z) {
        if (z) {
            try {
                endAllSegments(true);
            } catch (Exception e) {
                this.logger.warn("Swallowed an exception while trying to close the VeniceWriter for topic: {}", this.topicName, e);
                VENICE_WRITER_CLOSE_FAILED_COUNT.incrementAndGet();
                return;
            }
        }
        this.producerAdapter.close(this.topicName, this.closeTimeOut, z);
        OPEN_VENICE_WRITER_COUNT.decrementAndGet();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        close(true);
    }

    PubSubProducerAdapter getProducerAdapter() {
        return this.producerAdapter;
    }

    @Override // com.linkedin.venice.writer.AbstractVeniceWriter
    public void flush() {
        this.producerAdapter.flush();
    }

    public String toString() {
        return getClass().getSimpleName() + "{topicName: " + this.topicName + ", producerGUID: " + this.producerGUID + ", numberOfPartitions: " + this.numberOfPartitions + "}";
    }

    public GUID getProducerGUID() {
        return this.producerGUID;
    }

    @Override // com.linkedin.venice.writer.AbstractVeniceWriter
    public String getTopicName() {
        return this.topicName;
    }

    public Future<PubSubProduceResult> delete(K k, PubSubProducerCallback pubSubProducerCallback) {
        return delete((VeniceWriter<K, V, U>) k, pubSubProducerCallback, DEFAULT_LEADER_METADATA_WRAPPER, -2L, (DeleteMetadata) null);
    }

    public Future<PubSubProduceResult> delete(K k, long j, PubSubProducerCallback pubSubProducerCallback) {
        return delete((VeniceWriter<K, V, U>) k, pubSubProducerCallback, DEFAULT_LEADER_METADATA_WRAPPER, j, (DeleteMetadata) null);
    }

    public Future<PubSubProduceResult> delete(K k, PubSubProducerCallback pubSubProducerCallback, LeaderMetadataWrapper leaderMetadataWrapper) {
        return delete((VeniceWriter<K, V, U>) k, pubSubProducerCallback, leaderMetadataWrapper, -2L, (DeleteMetadata) null);
    }

    public Future<PubSubProduceResult> delete(K k, PubSubProducerCallback pubSubProducerCallback, LeaderMetadataWrapper leaderMetadataWrapper, long j) {
        return delete((VeniceWriter<K, V, U>) k, pubSubProducerCallback, leaderMetadataWrapper, j, (DeleteMetadata) null);
    }

    public Future<PubSubProduceResult> delete(K k, PubSubProducerCallback pubSubProducerCallback, LeaderMetadataWrapper leaderMetadataWrapper, DeleteMetadata deleteMetadata) {
        return delete((VeniceWriter<K, V, U>) k, pubSubProducerCallback, leaderMetadataWrapper, -2L, deleteMetadata);
    }

    @Override // com.linkedin.venice.writer.AbstractVeniceWriter
    public Future<PubSubProduceResult> delete(K k, PubSubProducerCallback pubSubProducerCallback, DeleteMetadata deleteMetadata) {
        return delete((VeniceWriter<K, V, U>) k, pubSubProducerCallback, DEFAULT_LEADER_METADATA_WRAPPER, -2L, deleteMetadata);
    }

    private Future<PubSubProduceResult> delete(K k, PubSubProducerCallback pubSubProducerCallback, LeaderMetadataWrapper leaderMetadataWrapper, long j, DeleteMetadata deleteMetadata) {
        byte[] serialize = this.keySerializer.serialize(this.topicName, k);
        int partition = getPartition(serialize);
        this.isChunkingFlagInvoked = true;
        int serializedSize = deleteMetadata == null ? 0 : deleteMetadata.getSerializedSize();
        if (serialize.length + serializedSize > this.maxSizeForUserPayloadPerMessageInBytes) {
            throw new RecordTooLargeException("This record exceeds the maximum size. " + getSizeReport(serialize.length, 0, serializedSize));
        }
        if (this.isChunkingEnabled) {
            serialize = this.keyWithChunkingSuffixSerializer.serializeNonChunkedKey(serialize);
        }
        if (pubSubProducerCallback instanceof ChunkAwareCallback) {
            ((ChunkAwareCallback) pubSubProducerCallback).setChunkingInfo(serialize, null, null, null, null);
        }
        KafkaKey kafkaKey = new KafkaKey(MessageType.DELETE, serialize);
        Delete delete = new Delete();
        if (deleteMetadata == null) {
            delete.schemaId = -1;
            delete.replicationMetadataVersionId = -1;
            delete.replicationMetadataPayload = EMPTY_BYTE_BUFFER;
        } else {
            delete.schemaId = deleteMetadata.getValueSchemaId();
            delete.replicationMetadataVersionId = deleteMetadata.getRmdVersionId();
            delete.replicationMetadataPayload = deleteMetadata.getRmdPayload();
        }
        return sendMessage(producerMetadata -> {
            return kafkaKey;
        }, MessageType.DELETE, delete, partition, pubSubProducerCallback, leaderMetadataWrapper, j);
    }

    @Override // com.linkedin.venice.writer.AbstractVeniceWriter
    public Future<PubSubProduceResult> put(K k, V v, int i, PubSubProducerCallback pubSubProducerCallback) {
        return put(k, v, i, pubSubProducerCallback, DEFAULT_LEADER_METADATA_WRAPPER, -2L, null);
    }

    @Override // com.linkedin.venice.writer.AbstractVeniceWriter
    public Future<PubSubProduceResult> put(K k, V v, int i, PubSubProducerCallback pubSubProducerCallback, PutMetadata putMetadata) {
        return put(k, v, i, pubSubProducerCallback, DEFAULT_LEADER_METADATA_WRAPPER, -2L, putMetadata);
    }

    public Future<PubSubProduceResult> put(K k, V v, int i, long j, PubSubProducerCallback pubSubProducerCallback) {
        return put(k, v, i, pubSubProducerCallback, DEFAULT_LEADER_METADATA_WRAPPER, j, null);
    }

    public Future<PubSubProduceResult> put(K k, V v, int i, PubSubProducerCallback pubSubProducerCallback, LeaderMetadataWrapper leaderMetadataWrapper) {
        return put(k, v, i, pubSubProducerCallback, leaderMetadataWrapper, -2L, null);
    }

    public Future<PubSubProduceResult> put(K k, V v, int i, PubSubProducerCallback pubSubProducerCallback, LeaderMetadataWrapper leaderMetadataWrapper, long j, PutMetadata putMetadata) {
        byte[] serialize = this.keySerializer.serialize(this.topicName, k);
        byte[] serialize2 = this.valueSerializer.serialize(this.topicName, v);
        int partition = getPartition(serialize);
        int serializedSize = putMetadata == null ? 0 : putMetadata.getSerializedSize();
        this.isChunkingFlagInvoked = true;
        if (serialize.length + serialize2.length + serializedSize > this.maxSizeForUserPayloadPerMessageInBytes) {
            if (this.isChunkingEnabled) {
                return putLargeValue(serialize, serialize2, i, pubSubProducerCallback, partition, leaderMetadataWrapper, j, putMetadata);
            }
            throw new RecordTooLargeException("This record exceeds the maximum size. " + getSizeReport(serialize.length, serialize2.length, serializedSize));
        }
        if (this.isChunkingEnabled) {
            serialize = this.keyWithChunkingSuffixSerializer.serializeNonChunkedKey(serialize);
        }
        if (pubSubProducerCallback instanceof ChunkAwareCallback) {
            ((ChunkAwareCallback) pubSubProducerCallback).setChunkingInfo(serialize, null, null, null, null);
        }
        KafkaKey kafkaKey = new KafkaKey(MessageType.PUT, serialize);
        Put put = new Put();
        put.putValue = ByteBuffer.wrap(serialize2);
        put.schemaId = i;
        if (putMetadata == null) {
            put.replicationMetadataVersionId = -1;
            put.replicationMetadataPayload = EMPTY_BYTE_BUFFER;
        } else {
            put.replicationMetadataVersionId = putMetadata.getRmdVersionId();
            put.replicationMetadataPayload = putMetadata.getRmdPayload();
        }
        return sendMessage(producerMetadata -> {
            return kafkaKey;
        }, MessageType.PUT, put, partition, pubSubProducerCallback, leaderMetadataWrapper, j);
    }

    @Deprecated
    public Future<PubSubProduceResult> put(KafkaKey kafkaKey, KafkaMessageEnvelope kafkaMessageEnvelope, PubSubProducerCallback pubSubProducerCallback, int i, LeaderMetadataWrapper leaderMetadataWrapper) {
        verifyChunkingSetting(kafkaMessageEnvelope);
        byte[] key = kafkaKey.getKey();
        KafkaMessageEnvelopeProvider kafkaMessageEnvelopeProvider = getKafkaMessageEnvelopeProvider(kafkaMessageEnvelope, leaderMetadataWrapper);
        if (pubSubProducerCallback instanceof ChunkAwareCallback) {
            ((ChunkAwareCallback) pubSubProducerCallback).setChunkingInfo(key, null, null, null, null);
        }
        return sendMessage(producerMetadata -> {
            return kafkaKey;
        }, kafkaMessageEnvelopeProvider, i, pubSubProducerCallback, false);
    }

    private KafkaMessageEnvelopeProvider getKafkaMessageEnvelopeProvider(KafkaMessageEnvelope kafkaMessageEnvelope, LeaderMetadataWrapper leaderMetadataWrapper) {
        LeaderMetadata leaderMetadata = new LeaderMetadata();
        leaderMetadata.upstreamOffset = leaderMetadataWrapper.getUpstreamOffset();
        leaderMetadata.upstreamKafkaClusterId = leaderMetadataWrapper.getUpstreamKafkaClusterId();
        leaderMetadata.hostName = this.writerId;
        kafkaMessageEnvelope.leaderMetadataFooter = leaderMetadata;
        return () -> {
            return kafkaMessageEnvelope;
        };
    }

    public Future<PubSubProduceResult> delete(KafkaKey kafkaKey, KafkaMessageEnvelope kafkaMessageEnvelope, PubSubProducerCallback pubSubProducerCallback, int i, LeaderMetadataWrapper leaderMetadataWrapper) {
        verifyChunkingSetting(kafkaMessageEnvelope);
        KafkaMessageEnvelopeProvider kafkaMessageEnvelopeProvider = getKafkaMessageEnvelopeProvider(kafkaMessageEnvelope, leaderMetadataWrapper);
        if (pubSubProducerCallback instanceof ChunkAwareCallback) {
            ((ChunkAwareCallback) pubSubProducerCallback).setChunkingInfo(kafkaKey.getKey(), null, null, null, null);
        }
        return sendMessage(producerMetadata -> {
            return kafkaKey;
        }, kafkaMessageEnvelopeProvider, i, pubSubProducerCallback, false);
    }

    @Override // com.linkedin.venice.writer.AbstractVeniceWriter
    public Future<PubSubProduceResult> update(K k, U u, int i, int i2, PubSubProducerCallback pubSubProducerCallback) {
        return update(k, u, i, i2, pubSubProducerCallback, -2L);
    }

    public Future<PubSubProduceResult> update(K k, U u, int i, int i2, PubSubProducerCallback pubSubProducerCallback, long j) {
        this.isChunkingFlagInvoked = true;
        if (this.isChunkingEnabled) {
            throw new VeniceException("Chunking is not supported for update operation in VeniceWriter");
        }
        byte[] serialize = this.keySerializer.serialize(this.topicName, k);
        byte[] serialize2 = this.writeComputeSerializer.serialize(this.topicName, u);
        int partition = getPartition(serialize);
        if (serialize.length + serialize2.length > 972800) {
            throw new RecordTooLargeException("This partial update exceeds the maximum size. " + getSizeReport(serialize.length, serialize2.length, 0));
        }
        KafkaKey kafkaKey = new KafkaKey(MessageType.UPDATE, serialize);
        Update update = new Update();
        update.updateValue = ByteBuffer.wrap(serialize2);
        update.schemaId = i;
        update.updateSchemaId = i2;
        return sendMessage(producerMetadata -> {
            return kafkaKey;
        }, MessageType.UPDATE, update, partition, pubSubProducerCallback, DEFAULT_LEADER_METADATA_WRAPPER, j);
    }

    public void broadcastStartOfPush(Map<String, String> map) {
        broadcastStartOfPush(false, map);
    }

    public void broadcastStartOfPush(boolean z, Map<String, String> map) {
        broadcastStartOfPush(z, false, CompressionStrategy.NO_OP, map);
    }

    public void broadcastStartOfPush(boolean z, boolean z2, CompressionStrategy compressionStrategy, Map<String, String> map) {
        broadcastStartOfPush(z, z2, compressionStrategy, Optional.empty(), map);
    }

    public void broadcastStartOfPush(boolean z, boolean z2, CompressionStrategy compressionStrategy, Optional<ByteBuffer> optional, Map<String, String> map) {
        ControlMessage emptyControlMessage = getEmptyControlMessage(ControlMessageType.START_OF_PUSH);
        StartOfPush startOfPush = new StartOfPush();
        startOfPush.sorted = z;
        startOfPush.chunked = z2;
        startOfPush.compressionStrategy = compressionStrategy.getValue();
        startOfPush.compressionDictionary = optional.orElse(null);
        emptyControlMessage.controlMessageUnion = startOfPush;
        broadcastControlMessage(emptyControlMessage, map);
        this.producerAdapter.flush();
    }

    public void broadcastEndOfPush(Map<String, String> map) {
        broadcastControlMessage(getEmptyControlMessage(ControlMessageType.END_OF_PUSH), map);
        endAllSegments(true);
    }

    public void broadcastTopicSwitch(@Nonnull List<CharSequence> list, @Nonnull String str, @Nonnull Long l, Map<String, String> map) {
        Validate.notNull(list);
        Validate.notEmpty(str);
        Validate.notNull(l);
        ControlMessage emptyControlMessage = getEmptyControlMessage(ControlMessageType.TOPIC_SWITCH);
        TopicSwitch topicSwitch = new TopicSwitch();
        topicSwitch.sourceKafkaServers = list;
        topicSwitch.sourceTopicName = str;
        topicSwitch.rewindStartTimestamp = l.longValue();
        emptyControlMessage.controlMessageUnion = topicSwitch;
        broadcastControlMessage(emptyControlMessage, map);
        this.producerAdapter.flush();
    }

    public void broadcastVersionSwap(@Nonnull String str, @Nonnull String str2, Map<String, String> map) {
        Validate.notEmpty(str);
        Validate.notEmpty(str2);
        ControlMessage emptyControlMessage = getEmptyControlMessage(ControlMessageType.VERSION_SWAP);
        VersionSwap versionSwap = new VersionSwap();
        versionSwap.oldServingVersionTopic = str;
        versionSwap.newServingVersionTopic = str2;
        emptyControlMessage.controlMessageUnion = versionSwap;
        broadcastControlMessage(emptyControlMessage, map);
        this.producerAdapter.flush();
    }

    public void broadcastStartOfIncrementalPush(String str, Map<String, String> map) {
        ControlMessage emptyControlMessage = getEmptyControlMessage(ControlMessageType.START_OF_INCREMENTAL_PUSH);
        StartOfIncrementalPush startOfIncrementalPush = new StartOfIncrementalPush();
        startOfIncrementalPush.version = str;
        emptyControlMessage.controlMessageUnion = startOfIncrementalPush;
        broadcastControlMessage(emptyControlMessage, map);
        this.producerAdapter.flush();
    }

    public void broadcastEndOfIncrementalPush(String str, Map<String, String> map) {
        ControlMessage emptyControlMessage = getEmptyControlMessage(ControlMessageType.END_OF_INCREMENTAL_PUSH);
        EndOfIncrementalPush endOfIncrementalPush = new EndOfIncrementalPush();
        endOfIncrementalPush.version = str;
        emptyControlMessage.controlMessageUnion = endOfIncrementalPush;
        broadcastControlMessage(emptyControlMessage, map);
        this.producerAdapter.flush();
    }

    private void verifyChunkingSetting(KafkaMessageEnvelope kafkaMessageEnvelope) {
        if (this.isChunkingSet || !MessageType.valueOf(kafkaMessageEnvelope.messageType).equals(MessageType.CONTROL_MESSAGE)) {
            return;
        }
        ControlMessage controlMessage = (ControlMessage) kafkaMessageEnvelope.payloadUnion;
        if (ControlMessageType.valueOf(controlMessage).equals(ControlMessageType.START_OF_PUSH)) {
            StartOfPush startOfPush = (StartOfPush) controlMessage.controlMessageUnion;
            synchronized (this) {
                if (!this.isChunkingSet) {
                    if (this.isChunkingFlagInvoked) {
                        throw new VeniceException("Chunking enabled config shouldn't be updated after VeniceWriter has explicitly produced a regular or chunked message");
                    }
                    this.logger.info("Chunking enabled config is updated from {} to {}", Boolean.valueOf(this.isChunkingEnabled), Boolean.valueOf(startOfPush.chunked));
                    if (this.isChunkingEnabled != startOfPush.chunked) {
                        this.isChunkingEnabled = startOfPush.chunked;
                    }
                    this.isChunkingSet = true;
                }
            }
        }
    }

    public void closePartition(int i) {
        if (this.segments[i] != null) {
            this.logger.info("Closing partition: {} in VeniceWriter.", Integer.valueOf(i));
            endSegment(i, true);
        }
    }

    private Future<PubSubProduceResult> sendMessage(KeyProvider keyProvider, MessageType messageType, Object obj, int i, PubSubProducerCallback pubSubProducerCallback, LeaderMetadataWrapper leaderMetadataWrapper, long j) {
        return sendMessage(keyProvider, messageType, obj, false, i, pubSubProducerCallback, true, leaderMetadataWrapper, j);
    }

    private Future<PubSubProduceResult> sendMessage(KeyProvider keyProvider, MessageType messageType, Object obj, boolean z, int i, PubSubProducerCallback pubSubProducerCallback, boolean z2, LeaderMetadataWrapper leaderMetadataWrapper, long j) {
        return sendMessage(keyProvider, () -> {
            KafkaMessageEnvelope kafkaMessageEnvelope = getKafkaMessageEnvelope(messageType, z, i, z2, leaderMetadataWrapper, j);
            kafkaMessageEnvelope.payloadUnion = obj;
            return kafkaMessageEnvelope;
        }, i, pubSubProducerCallback, z2);
    }

    private Future<PubSubProduceResult> sendMessage(KeyProvider keyProvider, KafkaMessageEnvelopeProvider kafkaMessageEnvelopeProvider, int i, PubSubProducerCallback pubSubProducerCallback, boolean z) {
        Future<PubSubProduceResult> sendMessage;
        synchronized (this.partitionLocks[i]) {
            KafkaMessageEnvelope kafkaMessageEnvelope = kafkaMessageEnvelopeProvider.getKafkaMessageEnvelope();
            KafkaKey key = keyProvider.getKey(kafkaMessageEnvelope.producerMetadata);
            if (z) {
                Segment segment = this.segments[i];
                if (segment == null) {
                    throw new VeniceException("segmentMap does not contain partition " + i + " for topic " + this.topicName);
                }
                segment.addToCheckSum(key, kafkaMessageEnvelope);
            }
            PubSubProducerCallback pubSubProducerCallback2 = pubSubProducerCallback;
            if (pubSubProducerCallback == null) {
                pubSubProducerCallback2 = new SendMessageErrorLoggerCallback(kafkaMessageEnvelope, this.logger);
            } else if (pubSubProducerCallback instanceof CompletableFutureCallback) {
                CompletableFutureCallback completableFutureCallback = (CompletableFutureCallback) pubSubProducerCallback;
                if (completableFutureCallback.getCallback() == null) {
                    completableFutureCallback.setCallback(new SendMessageErrorLoggerCallback(kafkaMessageEnvelope, this.logger));
                }
            }
            try {
                sendMessage = this.producerAdapter.sendMessage(this.topicName, Integer.valueOf(i), key, kafkaMessageEnvelope, getHeaders(kafkaMessageEnvelope.getProducerMetadata()), pubSubProducerCallback2);
            } catch (Exception e) {
                if (ExceptionUtils.recursiveClassEquals(e, TopicAuthorizationException.class)) {
                    throw new TopicAuthorizationVeniceException("You do not have permission to write to this store. Please check that ACLs are set correctly.", e);
                }
                throw e;
            }
        }
        return sendMessage;
    }

    private PubSubMessageHeaders getHeaders(ProducerMetadata producerMetadata) {
        return (producerMetadata.getSegmentNumber() == 0 && producerMetadata.getMessageSequenceNumber() == 0) ? this.protocolSchemaHeaders : EMPTY_MSG_HEADERS;
    }

    private Future<PubSubProduceResult> putLargeValue(byte[] bArr, byte[] bArr2, int i, PubSubProducerCallback pubSubProducerCallback, int i2, LeaderMetadataWrapper leaderMetadataWrapper, long j, PutMetadata putMetadata) {
        ChunkedPayloadAndManifest chunkedPayloadAndManifest;
        int serializedSize = putMetadata == null ? 0 : putMetadata.getSerializedSize();
        Supplier supplier = () -> {
            return getSizeReport(bArr.length, bArr2.length, serializedSize);
        };
        ErrorPropagationCallback errorPropagationCallback = pubSubProducerCallback == null ? null : new ErrorPropagationCallback(pubSubProducerCallback);
        BiConsumer biConsumer = (keyProvider, put) -> {
            sendMessage(keyProvider, MessageType.PUT, put, i2, errorPropagationCallback, DEFAULT_LEADER_METADATA_WRAPPER, -1L);
        };
        ChunkedPayloadAndManifest chunkPayloadAndSend = WriterChunkingHelper.chunkPayloadAndSend(bArr, bArr2, true, i, 0, pubSubProducerCallback instanceof ChunkAwareCallback, supplier, this.maxSizeForUserPayloadPerMessageInBytes, this.keyWithChunkingSuffixSerializer, biConsumer);
        int size = chunkPayloadAndSend.getChunkedValueManifest().keysWithChunkIdSuffix.size();
        if (this.isRmdChunkingEnabled) {
            chunkedPayloadAndManifest = WriterChunkingHelper.chunkPayloadAndSend(bArr, putMetadata == null ? EMPTY_BYTE_ARRAY : ByteUtils.extractByteArray(putMetadata.getRmdPayload()), false, i, size, pubSubProducerCallback instanceof ChunkAwareCallback, supplier, this.maxSizeForUserPayloadPerMessageInBytes, this.keyWithChunkingSuffixSerializer, biConsumer);
        } else {
            chunkedPayloadAndManifest = EMPTY_CHUNKED_PAYLOAD_AND_MANIFEST;
        }
        ChunkedPayloadAndManifest chunkedPayloadAndManifest2 = chunkedPayloadAndManifest;
        byte[] serializeNonChunkedKey = this.keyWithChunkingSuffixSerializer.serializeNonChunkedKey(bArr);
        KeyProvider keyProvider2 = producerMetadata -> {
            return new KafkaKey(MessageType.PUT, serializeNonChunkedKey);
        };
        Put put2 = new Put();
        put2.putValue = this.chunkedValueManifestSerializer.serialize(chunkPayloadAndSend.getChunkedValueManifest());
        put2.schemaId = AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion();
        if (putMetadata == null) {
            put2.replicationMetadataVersionId = -1;
            put2.replicationMetadataPayload = EMPTY_BYTE_BUFFER;
        } else {
            put2.replicationMetadataVersionId = putMetadata.getRmdVersionId();
            put2.replicationMetadataPayload = this.isRmdChunkingEnabled ? this.chunkedValueManifestSerializer.serialize(chunkedPayloadAndManifest2.getChunkedValueManifest()) : putMetadata.getRmdPayload();
        }
        if (put2.putValue.remaining() + put2.replicationMetadataPayload.remaining() > this.maxSizeForUserPayloadPerMessageInBytes - bArr.length) {
            throw new VeniceException("This message cannot be chunked, because even its manifest is too big to go through. Please reconsider your life choices. " + ((String) supplier.get()));
        }
        if (pubSubProducerCallback instanceof ChunkAwareCallback) {
            ((ChunkAwareCallback) pubSubProducerCallback).setChunkingInfo(serializeNonChunkedKey, chunkPayloadAndSend.getPayloadChunks(), chunkPayloadAndSend.getChunkedValueManifest(), chunkedPayloadAndManifest2.getPayloadChunks(), chunkedPayloadAndManifest2.getChunkedValueManifest());
        }
        return sendMessage(keyProvider2, MessageType.PUT, put2, i2, pubSubProducerCallback, leaderMetadataWrapper, j);
    }

    private String getSizeReport(int i, int i2, int i3) {
        return "Key size: " + i + " bytes, Value size: " + i2 + " bytes, Replication Metadata size: " + i3 + " bytes, Total payload size: " + (i + i2 + i3) + " bytes, Max available payload size: " + this.maxSizeForUserPayloadPerMessageInBytes + " bytes.";
    }

    private void sendStartOfSegment(int i, Map<String, String> map) {
        ControlMessage controlMessage = new ControlMessage();
        controlMessage.controlMessageType = ControlMessageType.START_OF_SEGMENT.getValue();
        StartOfSegment startOfSegment = new StartOfSegment();
        startOfSegment.checksumType = this.checkSumType.getValue();
        startOfSegment.upcomingAggregates = new ArrayList();
        controlMessage.controlMessageUnion = startOfSegment;
        sendControlMessage(controlMessage, i, map, null, DEFAULT_LEADER_METADATA_WRAPPER);
    }

    private void sendEndOfSegment(int i, Map<String, String> map, boolean z) {
        ControlMessage controlMessage = new ControlMessage();
        controlMessage.controlMessageType = ControlMessageType.END_OF_SEGMENT.getValue();
        EndOfSegment endOfSegment = new EndOfSegment();
        endOfSegment.checksumValue = ByteBuffer.wrap(this.segments[i].getFinalCheckSum());
        endOfSegment.computedAggregates = new ArrayList();
        endOfSegment.finalSegment = z;
        controlMessage.controlMessageUnion = endOfSegment;
        sendControlMessage(controlMessage, i, map, null, DEFAULT_LEADER_METADATA_WRAPPER);
    }

    private ControlMessage getEmptyControlMessage(ControlMessageType controlMessageType) {
        ControlMessage controlMessage = new ControlMessage();
        controlMessage.controlMessageType = controlMessageType.getValue();
        controlMessage.controlMessageUnion = controlMessageType.getNewInstance();
        return controlMessage;
    }

    private void broadcastControlMessage(ControlMessage controlMessage, Map<String, String> map) {
        for (int i = 0; i < this.numberOfPartitions; i++) {
            sendControlMessage(controlMessage, i, map, null, DEFAULT_LEADER_METADATA_WRAPPER);
        }
        this.logger.info("Successfully broadcast {} Control Message for topic: {}", ControlMessageType.valueOf(controlMessage), this.topicName);
    }

    private Map<CharSequence, CharSequence> getDebugInfo(Map<String, String> map) {
        if (map == null || map.isEmpty()) {
            return this.defaultDebugInfo;
        }
        HashMap hashMap = new HashMap(this.defaultDebugInfo);
        map.entrySet().stream().forEach(entry -> {
            String str = (String) entry.getKey();
            String str2 = (String) entry.getValue();
            CharSequence charSequence = (CharSequence) hashMap.get(str);
            if (charSequence == null || charSequence.equals(new Utf8(str2))) {
                hashMap.put(str, str2);
            } else {
                this.logger.warn("Debug info key: '{}' will be omitted because it is already part of the default {} debug info. Default value: '{}', supplied (omitted) value: '{}'", str, getClass().getSimpleName(), charSequence, str2);
            }
        });
        return hashMap;
    }

    public void sendControlMessage(ControlMessage controlMessage, int i, Map<String, String> map, PubSubProducerCallback pubSubProducerCallback, LeaderMetadataWrapper leaderMetadataWrapper) {
        synchronized (this.partitionLocks[i]) {
            controlMessage.debugInfo = getDebugInfo(map);
            int i2 = 1;
            boolean z = true;
            while (true) {
                try {
                    sendMessage(this::getControlMessageKey, MessageType.CONTROL_MESSAGE, controlMessage, ControlMessageType.valueOf(controlMessage).equals(ControlMessageType.END_OF_SEGMENT), i, pubSubProducerCallback, z, leaderMetadataWrapper, -1L).get();
                } catch (InterruptedException | ExecutionException e) {
                    if (e.getMessage() == null || !e.getMessage().contains(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())) {
                        if (e.getCause() != null && e.getCause().getClass().equals(TopicAuthorizationException.class)) {
                            throw new TopicAuthorizationVeniceException("You do not have permission to write to this store. Please check that ACLs are set correctly.", e);
                        }
                        throw new VeniceException("Got an exception while trying to send a control message (" + ControlMessageType.valueOf(controlMessage).name() + ")", e);
                    }
                    String str = "Caught a UNKNOWN_TOPIC_OR_PARTITION error, attempt " + i2 + "/" + this.maxAttemptsWhenTopicMissing;
                    if (i2 >= this.maxAttemptsWhenTopicMissing) {
                        throw new VeniceException(str + ", will bubble up.");
                    }
                    i2++;
                    z = false;
                    this.logger.warn("{}, will sleep {} ms before the next attempt.", str, Long.valueOf(this.sleepTimeMsWhenTopicMissing));
                }
            }
        }
    }

    public Future<PubSubProduceResult> asyncSendControlMessage(ControlMessage controlMessage, int i, Map<String, String> map, PubSubProducerCallback pubSubProducerCallback, LeaderMetadataWrapper leaderMetadataWrapper) {
        Future<PubSubProduceResult> sendMessage;
        synchronized (this.partitionLocks[i]) {
            controlMessage.debugInfo = getDebugInfo(map);
            sendMessage = sendMessage(this::getControlMessageKey, MessageType.CONTROL_MESSAGE, controlMessage, ControlMessageType.valueOf(controlMessage).equals(ControlMessageType.END_OF_SEGMENT), i, pubSubProducerCallback, true, leaderMetadataWrapper, -1L);
        }
        return sendMessage;
    }

    private KafkaKey getControlMessageKey(ProducerMetadata producerMetadata) {
        return new KafkaKey(MessageType.CONTROL_MESSAGE, ByteBuffer.allocate(CONTROL_MESSAGE_KAFKA_KEY_LENGTH).put(producerMetadata.producerGUID.bytes()).putInt(producerMetadata.segmentNumber).putInt(producerMetadata.messageSequenceNumber).array());
    }

    protected KafkaMessageEnvelope getKafkaMessageEnvelope(MessageType messageType, boolean z, int i, boolean z2, LeaderMetadataWrapper leaderMetadataWrapper, long j) {
        KafkaMessageEnvelope kafkaMessageEnvelope = new KafkaMessageEnvelope();
        kafkaMessageEnvelope.messageType = messageType.getValue();
        ProducerMetadata producerMetadata = new ProducerMetadata();
        producerMetadata.producerGUID = this.producerGUID;
        Segment segment = getSegment(i, z);
        producerMetadata.segmentNumber = segment.getSegmentNumber();
        if (z2) {
            producerMetadata.messageSequenceNumber = segment.getAndIncrementSequenceNumber();
        } else {
            producerMetadata.messageSequenceNumber = segment.getSequenceNumber();
        }
        producerMetadata.messageTimestamp = this.time.getMilliseconds();
        producerMetadata.logicalTimestamp = j;
        kafkaMessageEnvelope.producerMetadata = producerMetadata;
        kafkaMessageEnvelope.leaderMetadataFooter = new LeaderMetadata();
        kafkaMessageEnvelope.leaderMetadataFooter.hostName = this.writerId;
        kafkaMessageEnvelope.leaderMetadataFooter.upstreamOffset = leaderMetadataWrapper.getUpstreamOffset();
        kafkaMessageEnvelope.leaderMetadataFooter.upstreamKafkaClusterId = leaderMetadataWrapper.getUpstreamKafkaClusterId();
        return kafkaMessageEnvelope;
    }

    private int getPartition(byte[] bArr) {
        return this.partitioner.getPartitionId(bArr, this.numberOfPartitions);
    }

    private Segment getSegment(int i, boolean z) {
        Segment segment = this.segments[i];
        if (segment == null || segment.isEnded()) {
            segment = startSegment(i);
        } else if (this.elapsedTimeForClosingSegmentEnabled && !z) {
            long j = this.segmentsCreationTimeArray[i];
            if (j != -1 && LatencyUtils.getElapsedTimeInMs(j) > this.maxElapsedTimeForSegmentInMs) {
                this.segmentsCreationTimeArray[i] = -1;
                endSegment(i, true);
                segment = startSegment(i);
            }
        }
        return segment;
    }

    private Segment startSegment(int i) {
        Segment segment;
        synchronized (this.partitionLocks[i]) {
            Segment segment2 = this.segments[i];
            if (segment2 == null) {
                segment2 = new Segment(i, 0, this.checkSumType);
                this.segments[i] = segment2;
            } else if (segment2.isEnded()) {
                segment2 = new Segment(i, segment2.getSegmentNumber() + 1, this.checkSumType);
                this.segments[i] = segment2;
            }
            this.segmentsCreationTimeArray[i] = this.time.getMilliseconds();
            if (!segment2.isStarted()) {
                sendStartOfSegment(i, null);
                segment2.start();
            }
            segment = segment2;
        }
        return segment;
    }

    private void endAllSegments(boolean z) {
        for (int i = 0; i < this.segments.length; i++) {
            if (this.segments[i] != null) {
                endSegment(i, z);
            }
        }
    }

    public void endSegment(int i, boolean z) {
        synchronized (this.partitionLocks[i]) {
            Segment segment = this.segments[i];
            if (segment == null) {
                this.logger.warn("endSegment(partition {}) called but currentSegment == null. Ignoring.", Integer.valueOf(i));
            } else if (!segment.isStarted()) {
                this.logger.warn("endSegment(partition {}) called but currentSegment.begun == false. Ignoring.", Integer.valueOf(i));
            } else if (segment.isEnded()) {
                this.logger.warn("endSegment(partition {}) called but currentSegment.ended == true. Ignoring.", Integer.valueOf(i));
            } else {
                sendEndOfSegment(i, new HashMap(), z);
                segment.end(z);
            }
        }
    }

    public Time getTime() {
        return this.time;
    }

    public int getMaxSizeForUserPayloadPerMessageInBytes() {
        return this.maxSizeForUserPayloadPerMessageInBytes;
    }

    public String getDestination() {
        return this.topicName + "@" + this.producerAdapter.getBrokerAddress();
    }
}
