package com.linkedin.venice.producer;

import com.linkedin.venice.ConfigKeys;
import com.linkedin.venice.SSLConfig;
import com.linkedin.venice.controllerapi.VersionCreationResponse;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.pubsub.api.PubSubProducerCallback;
import com.linkedin.venice.schema.SchemaReader;
import com.linkedin.venice.schema.writecompute.DerivedSchemaEntry;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.serialization.avro.SchemaPresenceChecker;
import com.linkedin.venice.serializer.FastSerializerDeserializerFactory;
import com.linkedin.venice.serializer.RecordSerializer;
import com.linkedin.venice.stats.ThreadPoolStats;
import com.linkedin.venice.utils.PartitionUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.utils.concurrent.BlockingQueueType;
import com.linkedin.venice.utils.concurrent.ThreadPoolFactory;
import com.linkedin.venice.writer.VeniceWriter;
import com.linkedin.venice.writer.VeniceWriterFactory;
import com.linkedin.venice.writer.VeniceWriterOptions;
import com.linkedin.venice.writer.update.UpdateBuilder;
import com.linkedin.venice.writer.update.UpdateBuilderImpl;
import io.tehuti.metrics.MetricsRepository;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.time.Instant;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericContainer;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/venice/producer/AbstractVeniceProducer.class */
public abstract class AbstractVeniceProducer<K, V> implements VeniceProducer<K, V> {
    private VeniceProperties producerConfigs;
    private boolean configured = false;
    private boolean closed = false;
    private VeniceProducerMetrics producerMetrics;
    private SchemaReader schemaReader;
    private ThreadPoolExecutor producerExecutor;
    private VeniceWriter<byte[], byte[], byte[]> veniceWriter;
    private RecordSerializer<Object> keySerializer;
    private static final Logger LOGGER = LogManager.getLogger((Class<?>) AbstractVeniceProducer.class);
    private static final DurableWrite DURABLE_WRITE = new DurableWrite();
    private static final Schema STRING_SCHEMA = Schema.create(Schema.Type.STRING);
    private static final Schema INT_SCHEMA = Schema.create(Schema.Type.INT);
    private static final Schema LONG_SCHEMA = Schema.create(Schema.Type.LONG);
    private static final Schema FLOAT_SCHEMA = Schema.create(Schema.Type.FLOAT);
    private static final Schema DOUBLE_SCHEMA = Schema.create(Schema.Type.DOUBLE);
    private static final Schema BYTES_SCHEMA = Schema.create(Schema.Type.BYTES);
    private static final Schema BOOL_SCHEMA = Schema.create(Schema.Type.BOOLEAN);

    /* JADX INFO: Access modifiers changed from: protected */
    public void configure(String str, VeniceProperties veniceProperties, MetricsRepository metricsRepository, SchemaReader schemaReader, SchemaReader schemaReader2) {
        this.configured = true;
        this.producerConfigs = veniceProperties;
        this.schemaReader = schemaReader;
        if (schemaReader2 != null) {
            new SchemaPresenceChecker(schemaReader2, AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE).verifySchemaVersionPresentOrExit();
            LOGGER.info("Successfully verified the latest protocols at runtime are valid in Venice backend.");
        }
        this.producerMetrics = new VeniceProducerMetrics(metricsRepository, str);
        this.producerExecutor = ThreadPoolFactory.createThreadPool(veniceProperties.getInt(ConfigKeys.CLIENT_PRODUCER_THREAD_NUM, 10), "ClientProducer", Integer.MAX_VALUE, BlockingQueueType.LINKED_BLOCKING_QUEUE);
        if (metricsRepository != null) {
            new ThreadPoolStats(metricsRepository, this.producerExecutor, "client_producer_thread_pool");
        }
        this.keySerializer = getSerializer(schemaReader.getKeySchema());
        this.veniceWriter = getVeniceWriter(requestTopic());
    }

    private VeniceWriter<byte[], byte[], byte[]> getVeniceWriter(VersionCreationResponse versionCreationResponse) {
        Properties properties = new Properties();
        if (versionCreationResponse.isEnableSSL()) {
            properties.put("kafka.over.ssl", "true");
            properties.put("ssl.kafka.bootstrap.servers", versionCreationResponse.getKafkaBootstrapServers());
            properties.put("security.protocol", this.producerConfigs.getString("security.protocol"));
            properties.putAll(new SSLConfig(this.producerConfigs).getKafkaSSLConfig());
        } else {
            properties.put("kafka.bootstrap.servers", versionCreationResponse.getKafkaBootstrapServers());
        }
        return getVeniceWriter(versionCreationResponse, properties);
    }

    private VeniceWriter<byte[], byte[], byte[]> getVeniceWriter(VersionCreationResponse versionCreationResponse, Properties properties) {
        int amplificationFactor = versionCreationResponse.getAmplificationFactor();
        Integer valueOf = Integer.valueOf(versionCreationResponse.getPartitions());
        Properties properties2 = new Properties();
        properties2.putAll(versionCreationResponse.getPartitionerParams());
        return constructVeniceWriter(properties, new VeniceWriterOptions.Builder(versionCreationResponse.getKafkaTopic()).setPartitioner(PartitionUtils.getVenicePartitioner(versionCreationResponse.getPartitionerClass(), amplificationFactor, new VeniceProperties(properties2))).setPartitionCount(valueOf).setChunkingEnabled(false).build());
    }

    protected VeniceWriter<byte[], byte[], byte[]> constructVeniceWriter(Properties properties, VeniceWriterOptions veniceWriterOptions) {
        return new VeniceWriterFactory(properties).createVeniceWriter(veniceWriterOptions);
    }

    protected RecordSerializer<Object> getSerializer(Schema schema) {
        return FastSerializerDeserializerFactory.getAvroGenericSerializer(schema);
    }

    private static Schema getSchemaFromObject(Object obj) {
        if (obj instanceof GenericContainer) {
            return ((GenericContainer) obj).getSchema();
        }
        if (obj instanceof CharSequence) {
            return STRING_SCHEMA;
        }
        if (obj instanceof Integer) {
            return INT_SCHEMA;
        }
        if (obj instanceof Long) {
            return LONG_SCHEMA;
        }
        if (obj instanceof Double) {
            return DOUBLE_SCHEMA;
        }
        if (obj instanceof Float) {
            return FLOAT_SCHEMA;
        }
        if ((obj instanceof byte[]) || (obj instanceof ByteBuffer)) {
            return BYTES_SCHEMA;
        }
        if (obj instanceof Boolean) {
            return BOOL_SCHEMA;
        }
        throw new VeniceException("Venice Producer only supports Avro objects, and primitives, found object of class: " + obj.getClass().toString());
    }

    @Override // com.linkedin.venice.producer.VeniceProducer
    public CompletableFuture<DurableWrite> asyncPut(K k, V v) {
        return asyncPutInternal(-2L, k, v);
    }

    @Override // com.linkedin.venice.producer.VeniceProducer
    public CompletableFuture<DurableWrite> asyncPut(long j, K k, V v) {
        return j < 0 ? getFutureCompletedExceptionally("Logical time must be a non-negative value. Got: " + j) : asyncPutInternal(j, k, v);
    }

    private CompletableFuture<DurableWrite> asyncPutInternal(long j, K k, V v) {
        String validateProducer = validateProducer();
        if (!StringUtils.isEmpty(validateProducer)) {
            return getFutureCompletedExceptionally(validateProducer);
        }
        this.producerMetrics.recordPutRequest();
        return CompletableFuture.supplyAsync(() -> {
            int i;
            try {
                Schema schemaFromObject = getSchemaFromObject(v);
                Exception exc = null;
                try {
                    i = this.schemaReader.getValueSchemaId(schemaFromObject);
                } catch (Exception e) {
                    i = -1;
                    exc = e;
                }
                if (i == -1) {
                    this.producerMetrics.recordFailedRequest();
                    throw new VeniceException("Could not find a registered schema id for schema: " + schemaFromObject + ". This might be transient if the schema has been registered recently.", exc);
                }
                CompletableFuture completableFuture = new CompletableFuture();
                Instant now = Instant.now();
                this.veniceWriter.put((VeniceWriter<byte[], byte[], byte[]>) this.keySerializer.serialize(k), getSerializer(schemaFromObject).serialize(v), i, j, (pubSubProduceResult, exc2) -> {
                    Duration between = Duration.between(now, Instant.now());
                    if (exc2 == null) {
                        this.producerMetrics.recordSuccessfulRequestWithLatency(between.toMillis());
                        completableFuture.complete(null);
                    } else {
                        this.producerMetrics.recordFailedRequest();
                        LOGGER.error("Failed to write the requested data to the PubSub system", (Throwable) exc2);
                        completableFuture.completeExceptionally(exc2);
                    }
                });
                try {
                    completableFuture.get();
                    return DURABLE_WRITE;
                } catch (InterruptedException | ExecutionException e2) {
                    throw new VeniceException(e2);
                }
            } catch (Exception e3) {
                this.producerMetrics.recordFailedRequest();
                throw e3;
            }
        }, this.producerExecutor);
    }

    @Override // com.linkedin.venice.producer.VeniceProducer
    public CompletableFuture<DurableWrite> asyncDelete(K k) {
        return asyncDeleteInternal(-2L, k);
    }

    @Override // com.linkedin.venice.producer.VeniceProducer
    public CompletableFuture<DurableWrite> asyncDelete(long j, K k) {
        return j < 0 ? getFutureCompletedExceptionally("Logical time must be a non-negative value. Got: " + j) : asyncDeleteInternal(j, k);
    }

    private CompletableFuture<DurableWrite> asyncDeleteInternal(long j, K k) {
        String validateProducer = validateProducer();
        if (!StringUtils.isEmpty(validateProducer)) {
            return getFutureCompletedExceptionally(validateProducer);
        }
        this.producerMetrics.recordDeleteRequest();
        return CompletableFuture.supplyAsync(() -> {
            CompletableFuture completableFuture = new CompletableFuture();
            Instant now = Instant.now();
            PubSubProducerCallback pubSubProducerCallback = (pubSubProduceResult, exc) -> {
                Duration between = Duration.between(now, Instant.now());
                if (exc == null) {
                    this.producerMetrics.recordSuccessfulRequestWithLatency(between.toMillis());
                    completableFuture.complete(null);
                } else {
                    this.producerMetrics.recordFailedRequest();
                    LOGGER.error("Failed to write the delete operation to the PubSub system", (Throwable) exc);
                    completableFuture.completeExceptionally(exc);
                }
            };
            this.veniceWriter.delete((VeniceWriter<byte[], byte[], byte[]>) this.keySerializer.serialize(k), j, pubSubProducerCallback);
            try {
                completableFuture.get();
                return DURABLE_WRITE;
            } catch (InterruptedException | ExecutionException e) {
                throw new VeniceException(e);
            }
        }, this.producerExecutor);
    }

    @Override // com.linkedin.venice.producer.VeniceProducer
    public CompletableFuture<DurableWrite> asyncUpdate(K k, Consumer<UpdateBuilder> consumer) {
        return asyncUpdateInternal(-2L, k, consumer);
    }

    @Override // com.linkedin.venice.producer.VeniceProducer
    public CompletableFuture<DurableWrite> asyncUpdate(long j, K k, Consumer<UpdateBuilder> consumer) {
        return j < 0 ? getFutureCompletedExceptionally("Logical time must be a non-negative value. Got: " + j) : asyncUpdateInternal(j, k, consumer);
    }

    private CompletableFuture<DurableWrite> asyncUpdateInternal(long j, K k, Consumer<UpdateBuilder> consumer) {
        String validateProducer = validateProducer();
        if (!StringUtils.isEmpty(validateProducer)) {
            return getFutureCompletedExceptionally(validateProducer);
        }
        this.producerMetrics.recordUpdateRequest();
        return CompletableFuture.supplyAsync(() -> {
            DerivedSchemaEntry latestUpdateSchema = this.schemaReader.getLatestUpdateSchema();
            if (latestUpdateSchema == null) {
                this.producerMetrics.recordFailedRequest();
                throw new VeniceException("Update schema not found. Check if partial update is enabled for the store. This error might also be transient if partial update has been enabled recently.");
            }
            Schema schema = latestUpdateSchema.getSchema();
            if (latestUpdateSchema.getValueSchemaID() == -1 || latestUpdateSchema.getId() == -1) {
                this.producerMetrics.recordFailedRequest();
                throw new VeniceException("Could not find a registered schema id for schema: " + schema + ". This might be transient if the schema has been registered recently.");
            }
            UpdateBuilderImpl updateBuilderImpl = new UpdateBuilderImpl(latestUpdateSchema.getSchema());
            consumer.accept(updateBuilderImpl);
            GenericRecord build = updateBuilderImpl.build();
            CompletableFuture completableFuture = new CompletableFuture();
            Instant now = Instant.now();
            this.veniceWriter.update(this.keySerializer.serialize(k), getSerializer(schema).serialize(build), latestUpdateSchema.getValueSchemaID(), latestUpdateSchema.getId(), (pubSubProduceResult, exc) -> {
                Duration between = Duration.between(now, Instant.now());
                if (exc == null) {
                    this.producerMetrics.recordSuccessfulRequestWithLatency(between.toMillis());
                    completableFuture.complete(null);
                } else {
                    this.producerMetrics.recordFailedRequest();
                    LOGGER.error("Failed to write the partial update record to the PubSub system", (Throwable) exc);
                    completableFuture.completeExceptionally(exc);
                }
            }, j);
            try {
                completableFuture.get();
                return DURABLE_WRITE;
            } catch (InterruptedException | ExecutionException e) {
                throw new VeniceException(e);
            }
        }, this.producerExecutor);
    }

    protected abstract VersionCreationResponse requestTopic();

    private String validateProducer() {
        return this.closed ? "Producer is already closed. New requests are not accepted." : !this.configured ? "Producer is not configured. Please call `configure`." : "";
    }

    private <D> CompletableFuture<D> getFutureCompletedExceptionally(String str) {
        CompletableFuture<D> completableFuture = new CompletableFuture<>();
        completableFuture.completeExceptionally(new VeniceException(str));
        return completableFuture;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.closed = true;
        this.producerExecutor.shutdownNow();
        try {
            this.producerExecutor.awaitTermination(60L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            LOGGER.warn("Caught InterruptedException while closing the Venice producer ExecutorService", (Throwable) e);
        }
        Utils.closeQuietlyWithErrorLogged(this.veniceWriter);
    }

    public boolean isClosed() {
        return this.closed;
    }
}
