package com.linkedin.venice.samza;

import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import com.linkedin.d2.balancer.D2Client;
import com.linkedin.d2.balancer.D2ClientBuilder;
import com.linkedin.venice.D2.D2ClientUtils;
import com.linkedin.venice.authentication.ClientAuthenticationProvider;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.client.store.ClientFactory;
import com.linkedin.venice.client.store.transport.D2TransportClient;
import com.linkedin.venice.client.store.transport.HttpTransportClient;
import com.linkedin.venice.client.store.transport.HttpsTransportClient;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.ControllerResponse;
import com.linkedin.venice.controllerapi.D2ControllerClient;
import com.linkedin.venice.controllerapi.D2ServiceDiscoveryResponse;
import com.linkedin.venice.controllerapi.MultiSchemaResponse;
import com.linkedin.venice.controllerapi.SchemaResponse;
import com.linkedin.venice.controllerapi.StoreResponse;
import com.linkedin.venice.controllerapi.VersionCreationResponse;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pushmonitor.ExecutionStatus;
import com.linkedin.venice.pushmonitor.HybridStoreQuotaStatus;
import com.linkedin.venice.pushmonitor.RouterBasedHybridStoreQuotaMonitor;
import com.linkedin.venice.pushmonitor.RouterBasedPushMonitor;
import com.linkedin.venice.schema.AvroSchemaParseUtils;
import com.linkedin.venice.schema.writecompute.WriteComputeHandlerV1;
import com.linkedin.venice.security.SSLFactory;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.serialization.avro.SchemaPresenceChecker;
import com.linkedin.venice.serializer.FastSerializerDeserializerFactory;
import com.linkedin.venice.service.ICProvider;
import com.linkedin.venice.utils.BoundedHashMap;
import com.linkedin.venice.utils.Pair;
import com.linkedin.venice.utils.PartitionUtils;
import com.linkedin.venice.utils.SystemTime;
import com.linkedin.venice.utils.Time;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import com.linkedin.venice.writer.CompletableFutureCallback;
import com.linkedin.venice.writer.VeniceWriter;
import com.linkedin.venice.writer.VeniceWriterFactory;
import com.linkedin.venice.writer.VeniceWriterOptions;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLParameters;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.util.Utf8;
import org.apache.commons.io.FileUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.samza.SamzaException;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemProducer;

/* loaded from: input_file:com/linkedin/venice/samza/VeniceSystemProducer.class */
public class VeniceSystemProducer implements SystemProducer, Closeable {
    private static final Logger LOGGER = LogManager.getLogger(VeniceSystemProducer.class);

    // Avro schemas for the supported primitive types; getSchemaFromObject() hands these out for non-record objects.
    private static final Schema STRING_SCHEMA = Schema.create(Schema.Type.STRING);
    private static final Schema INT_SCHEMA = Schema.create(Schema.Type.INT);
    private static final Schema LONG_SCHEMA = Schema.create(Schema.Type.LONG);
    private static final Schema FLOAT_SCHEMA = Schema.create(Schema.Type.FLOAT);
    private static final Schema DOUBLE_SCHEMA = Schema.create(Schema.Type.DOUBLE);
    private static final Schema BYTES_SCHEMA = Schema.create(Schema.Type.BYTES);
    private static final Schema BOOL_SCHEMA = Schema.create(Schema.Type.BOOLEAN);

    // One generic Avro datum writer per primitive schema above.
    private static final DatumWriter<Utf8> STRING_DATUM_WRITER = new GenericDatumWriter(STRING_SCHEMA);
    private static final DatumWriter<Integer> INT_DATUM_WRITER = new GenericDatumWriter(INT_SCHEMA);
    private static final DatumWriter<Long> LONG_DATUM_WRITER = new GenericDatumWriter(LONG_SCHEMA);
    private static final DatumWriter<Float> FLOAT_DATUM_WRITER = new GenericDatumWriter(FLOAT_SCHEMA);
    private static final DatumWriter<Double> DOUBLE_DATUM_WRITER = new GenericDatumWriter(DOUBLE_SCHEMA);
    private static final DatumWriter<ByteBuffer> BYTES_DATUM_WRITER = new GenericDatumWriter(BYTES_SCHEMA);
    private static final DatumWriter<Boolean> BOOL_DATUM_WRITER = new GenericDatumWriter(BOOL_SCHEMA);
    private static final WriteComputeHandlerV1 writeComputeHandlerV1 = new WriteComputeHandlerV1();

    // Immutable configuration supplied through the constructors.
    // The three D2 fields are null when the producer is built with a discovery URL instead.
    private final String veniceChildD2ZkHost;
    private final String primaryControllerColoD2ZKHost;
    private final String primaryControllerD2ServiceName;
    private final String storeName;
    private final String samzaJobId;
    private final Version.PushType pushType;
    private final Optional<SSLFactory> sslFactory;
    private final VeniceSystemFactory factory;
    private final Optional<String> partitioners;
    private final Time time;
    private final String runningFabric;
    private final boolean verifyLatestProtocolPresent;

    // D2 client envelopes keyed by ZK host; every envelope is closed in stop().
    private final Map<String, D2ClientEnvelope> d2ZkHostToClientEnvelopeMap;
    // Value schema -> (value schema id, derived schema id), and the reverse mapping.
    private final VeniceConcurrentHashMap<Schema, Pair<Integer, Integer>> valueSchemaToIdsMap;
    private final VeniceConcurrentHashMap<Pair<Integer, Integer>, Schema> valueSchemaIdsToSchemaMap;
    // Store key schema and its parsing-form string; both are populated by start().
    private Schema keySchema;
    private String canonicalKeySchemaStr;
    // Bounded cache of schema -> parsing-form string, consulted when validating keys in send().
    private final Map<Schema, String> canonicalSchemaStrCache;

    // Runtime state created by start().
    private D2Client primaryControllerColoD2Client;
    private D2Client childColoD2Client;
    private ControllerClient controllerClient;
    private String topicName;
    private String kafkaBootstrapServers;
    private boolean isWriteComputeEnabled;
    private boolean isChunkingEnabled;
    // Set to true at the beginning of start() and back to false in stop().
    private boolean isStarted;
    // When present, start() talks to the controller through this URL instead of D2.
    private Optional<String> discoveryUrl;
    private Optional<String> routerUrl;
    private ClientAuthenticationProvider authenticationProvider;
    private VeniceWriter<byte[], byte[], byte[]> veniceWriter;
    // Only populated for STREAM_REPROCESSING pushes (see start()).
    private Optional<RouterBasedPushMonitor> pushMonitor;
    // Only populated for STREAM / STREAM_REPROCESSING pushes on stores with hybrid disk quota enabled (see start()).
    private Optional<RouterBasedHybridStoreQuotaMonitor> hybridStoreQuotaMonitor;
    // Extra properties layered on top of the writer properties in constructVeniceWriter().
    private Map<String, String> additionalWriterConfigs;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.linkedin.venice.samza.VeniceSystemProducer$1, reason: invalid class name */
    /* loaded from: input_file:com/linkedin/venice/samza/VeniceSystemProducer$1.class */
    /**
     * Compiler-generated lookup tables backing the {@code switch} statements over
     * {@link ExecutionStatus} and {@link HybridStoreQuotaStatus} in {@code stop()} and {@code send(...)}.
     *
     * <p>Each table is indexed by the enum constant's {@code ordinal()} and stores the numeric case label
     * used by those switches. Every assignment is individually guarded: if a constant is missing at runtime
     * ({@link NoSuchFieldError}) its slot keeps the default value 0, which none of the numbered case labels use.
     */
    public static /* synthetic */ class AnonymousClass1 {
        // ExecutionStatus ordinal -> case label (COMPLETED=1, END_OF_PUSH_RECEIVED=2, ERROR=3).
        static final /* synthetic */ int[] $SwitchMap$com$linkedin$venice$pushmonitor$ExecutionStatus;
        // HybridStoreQuotaStatus ordinal -> case label (QUOTA_VIOLATED=1, QUOTA_NOT_VIOLATED=2).
        static final /* synthetic */ int[] $SwitchMap$com$linkedin$venice$pushmonitor$HybridStoreQuotaStatus = new int[HybridStoreQuotaStatus.values().length];

        static {
            try {
                $SwitchMap$com$linkedin$venice$pushmonitor$HybridStoreQuotaStatus[HybridStoreQuotaStatus.QUOTA_VIOLATED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // Constant absent at runtime: leave the slot at 0.
            }
            try {
                $SwitchMap$com$linkedin$venice$pushmonitor$HybridStoreQuotaStatus[HybridStoreQuotaStatus.QUOTA_NOT_VIOLATED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
                // Constant absent at runtime: leave the slot at 0.
            }
            $SwitchMap$com$linkedin$venice$pushmonitor$ExecutionStatus = new int[ExecutionStatus.values().length];
            try {
                $SwitchMap$com$linkedin$venice$pushmonitor$ExecutionStatus[ExecutionStatus.COMPLETED.ordinal()] = 1;
            } catch (NoSuchFieldError e3) {
                // Constant absent at runtime: leave the slot at 0.
            }
            try {
                $SwitchMap$com$linkedin$venice$pushmonitor$ExecutionStatus[ExecutionStatus.END_OF_PUSH_RECEIVED.ordinal()] = 2;
            } catch (NoSuchFieldError e4) {
                // Constant absent at runtime: leave the slot at 0.
            }
            try {
                $SwitchMap$com$linkedin$venice$pushmonitor$ExecutionStatus[ExecutionStatus.ERROR.ordinal()] = 3;
            } catch (NoSuchFieldError e5) {
                // Constant absent at runtime: leave the slot at 0.
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linkedin/venice/samza/VeniceSystemProducer$D2ClientEnvelope.class */
    public static final class D2ClientEnvelope implements Closeable {
        D2Client d2Client;
        String fsBasePath;

        D2ClientEnvelope(D2Client d2Client, String str) {
            this.d2Client = d2Client;
            this.fsBasePath = str;
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            D2ClientUtils.shutdownClient(this.d2Client);
            try {
                FileUtils.deleteDirectory(new File(this.fsBasePath));
            } catch (IOException e) {
                VeniceSystemProducer.LOGGER.info("Error in cleaning up: {}", this.fsBasePath);
            }
        }
    }

    /**
     * Convenience constructor that uses a single D2 ZK host for both the child colo and the primary
     * controller colo; it delegates to the constructor taking both hosts separately.
     *
     * @param str                 D2 ZK host, used as both the child-colo host and the primary-controller-colo host
     * @param str2                D2 service name of the primary controller
     * @param str3                store name
     * @param pushType            push type used when requesting the topic to write to
     * @param str4                Samza job id
     * @param str5                running fabric
     * @param z                   whether to verify that the latest protocol is present in the backend on start
     * @param veniceSystemFactory owning system factory
     * @param optional            optional SSL factory
     * @param optional2           optional partitioners
     * @deprecated superseded by the constructor that takes the two ZK hosts separately
     */
    @Deprecated
    public VeniceSystemProducer(String str, String str2, String str3, Version.PushType pushType, String str4, String str5, boolean z, VeniceSystemFactory veniceSystemFactory, Optional<SSLFactory> optional, Optional<String> optional2) {
        this(str, str, str2, str3, pushType, str4, str5, z, veniceSystemFactory, optional, optional2);
    }

    /**
     * Creates a D2-based producer that uses {@link SystemTime#INSTANCE} as its time source.
     *
     * @param str                 D2 ZK host of the child colo
     * @param str2                D2 ZK host of the primary controller colo
     * @param str3                D2 service name of the primary controller
     * @param str4                store name
     * @param pushType            push type used when requesting the topic to write to
     * @param str5                Samza job id
     * @param str6                running fabric
     * @param z                   whether to verify that the latest protocol is present in the backend on start
     * @param veniceSystemFactory owning system factory
     * @param optional            optional SSL factory
     * @param optional2           optional partitioners
     */
    public VeniceSystemProducer(String str, String str2, String str3, String str4, Version.PushType pushType, String str5, String str6, boolean z, VeniceSystemFactory veniceSystemFactory, Optional<SSLFactory> optional, Optional<String> optional2) {
        this(str, str2, str3, str4, pushType, str5, str6, z, veniceSystemFactory, optional, optional2, SystemTime.INSTANCE);
    }

    /**
     * Convenience constructor that uses a single D2 ZK host for both the child colo and the primary
     * controller colo and accepts an explicit time source; it delegates to the constructor taking both
     * hosts separately.
     *
     * @param str                 D2 ZK host, used as both the child-colo host and the primary-controller-colo host
     * @param str2                D2 service name of the primary controller
     * @param str3                store name
     * @param pushType            push type used when requesting the topic to write to
     * @param str4                Samza job id
     * @param str5                running fabric
     * @param z                   whether to verify that the latest protocol is present in the backend on start
     * @param veniceSystemFactory owning system factory
     * @param optional            optional SSL factory
     * @param optional2           optional partitioners
     * @param time                time source (used for retry back-off and handed to the writer)
     * @deprecated superseded by the constructor that takes the two ZK hosts separately
     */
    @Deprecated
    public VeniceSystemProducer(String str, String str2, String str3, Version.PushType pushType, String str4, String str5, boolean z, VeniceSystemFactory veniceSystemFactory, Optional<SSLFactory> optional, Optional<String> optional2, Time time) {
        this(str, str, str2, str3, pushType, str4, str5, z, veniceSystemFactory, optional, optional2, time);
    }

    /**
     * Creates a D2-based producer. Nothing is contacted here; all remote work happens in {@link #start()}.
     *
     * @param str                 D2 ZK host of the child colo
     * @param str2                D2 ZK host of the primary controller colo
     * @param str3                D2 service name of the primary controller
     * @param str4                store name
     * @param pushType            push type used when requesting the topic to write to
     * @param str5                Samza job id
     * @param str6                running fabric
     * @param z                   whether to verify that the latest protocol is present in the backend on start
     * @param veniceSystemFactory owning system factory
     * @param optional            optional SSL factory
     * @param optional2           optional partitioners
     * @param time                time source (used for retry back-off and handed to the writer)
     */
    public VeniceSystemProducer(String str, String str2, String str3, String str4, Version.PushType pushType, String str5, String str6, boolean z, VeniceSystemFactory veniceSystemFactory, Optional<SSLFactory> optional, Optional<String> optional2, Time time) {
        // Caller-supplied configuration.
        this.veniceChildD2ZkHost = str;
        this.primaryControllerColoD2ZKHost = str2;
        this.primaryControllerD2ServiceName = str3;
        this.storeName = str4;
        this.pushType = pushType;
        this.samzaJobId = str5;
        this.runningFabric = str6;
        this.verifyLatestProtocolPresent = z;
        this.factory = veniceSystemFactory;
        this.sslFactory = optional;
        this.partitioners = optional2;
        this.time = time;

        // Caches and bookkeeping collections start out empty.
        this.d2ZkHostToClientEnvelopeMap = new HashMap<>();
        this.valueSchemaToIdsMap = new VeniceConcurrentHashMap<>();
        this.valueSchemaIdsToSchemaMap = new VeniceConcurrentHashMap<>();
        this.canonicalSchemaStrCache = new BoundedHashMap<>(10, true);
        this.additionalWriterConfigs = new HashMap<>();

        // Runtime state that start() fills in later.
        this.veniceWriter = null;
        this.isStarted = false;
        this.isWriteComputeEnabled = false;
        this.isChunkingEnabled = false;
        this.discoveryUrl = Optional.empty();
        this.routerUrl = Optional.empty();
        this.pushMonitor = Optional.empty();
        this.hybridStoreQuotaMonitor = Optional.empty();
    }

    public VeniceSystemProducer(String str, String str2, Version.PushType pushType, String str3, String str4, boolean z, VeniceSystemFactory veniceSystemFactory, Optional<SSLFactory> optional, Optional<String> optional2, Time time) {
        this.d2ZkHostToClientEnvelopeMap = new HashMap();
        this.valueSchemaToIdsMap = new VeniceConcurrentHashMap<>();
        this.valueSchemaIdsToSchemaMap = new VeniceConcurrentHashMap<>();
        this.canonicalSchemaStrCache = new BoundedHashMap(10, true);
        this.isWriteComputeEnabled = false;
        this.isChunkingEnabled = false;
        this.isStarted = false;
        this.discoveryUrl = Optional.empty();
        this.routerUrl = Optional.empty();
        this.veniceWriter = null;
        this.pushMonitor = Optional.empty();
        this.hybridStoreQuotaMonitor = Optional.empty();
        this.additionalWriterConfigs = new HashMap();
        if (str == null || str.trim().isEmpty()) {
            throw new IllegalStateException("Discovery URL is not present");
        }
        this.discoveryUrl = Optional.of(str);
        this.veniceChildD2ZkHost = null;
        this.primaryControllerColoD2ZKHost = null;
        this.primaryControllerD2ServiceName = null;
        this.storeName = str2;
        this.pushType = pushType;
        this.samzaJobId = str3;
        this.runningFabric = str4;
        this.verifyLatestProtocolPresent = z;
        this.factory = veniceSystemFactory;
        this.sslFactory = optional;
        this.partitioners = optional2;
        this.time = time;
    }

    /**
     * Adds extra writer configs; they are merged on top of the base writer properties when the writer is
     * constructed (see {@code constructVeniceWriter}). Entries with an existing key are overwritten.
     *
     * @param map configs to add; must not be null
     */
    public void applyAdditionalWriterConfigs(Map<String, String> map) {
        this.additionalWriterConfigs.putAll(map);
    }

    /**
     * Sets the authentication provider passed to the clients created in {@code start()}.
     *
     * @param clientAuthenticationProvider provider to use
     */
    public void setAuthenticationProvider(ClientAuthenticationProvider clientAuthenticationProvider) {
        this.authenticationProvider = clientAuthenticationProvider;
    }

    /**
     * Sets the router URL. In discovery-URL mode, {@code start()} only performs the protocol presence check
     * when a router URL has been set.
     *
     * @param str router URL; must not be null ({@link Optional#of} throws {@link NullPointerException} on null)
     */
    public void setRouterUrl(String str) {
        this.routerUrl = Optional.of(str);
    }

    /**
     * @return the running fabric supplied at construction time (may be null)
     */
    public String getRunningFabric() {
        return this.runningFabric;
    }

    /**
     * Issues a controller request, retrying with a linearly growing back-off until it succeeds or the
     * attempt budget is exhausted.
     *
     * <p>An attempt fails when the supplier throws or when the returned response reports an error. After
     * every failed attempt the method sleeps {@code 1000 * attemptNumber} milliseconds (1s, 2s, ...) on the
     * configured time source before the next attempt.
     *
     * @param supplier produces the controller response; invoked once per attempt
     * @param i        maximum number of attempts
     * @return the first response that does not report an error
     * @throws VeniceException if the thread is interrupted while retrying; the interrupt flag is restored
     * @throws SamzaException  if no attempt succeeded; carries the last error message and, when the last
     *                         attempt failed with an exception, that exception as the cause
     */
    protected ControllerResponse controllerRequestWithRetry(Supplier<ControllerResponse> supplier, int i) {
        String lastError = "";
        Exception lastException = null;
        for (int attempt = 0; attempt < i; attempt++) {
            // Only the most recent attempt's exception is reported as the cause.
            lastException = null;
            try {
                ControllerResponse response = supplier.get();
                if (!response.isError()) {
                    return response;
                }
                lastError = response.getError();
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // Keep the interruption visible to callers further up the stack.
                    Thread.currentThread().interrupt();
                    throw new VeniceException(e);
                }
                lastError = e.getMessage();
                lastException = e;
            }
            // Single back-off point shared by both failure modes, so the response is never read after a throw.
            try {
                this.time.sleep(1000L * (attempt + 1));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new VeniceException(e);
            }
        }
        throw new SamzaException("Failed to send request to Controller, error: " + lastError, lastException);
    }

    /**
     * @return the Kafka topic obtained from the controller in {@code start()}; null before that
     */
    public String getTopicName() {
        return this.topicName;
    }

    /**
     * Builds a writer for the topic described by the given response, using only the Kafka bootstrap
     * servers from that response as base writer properties.
     *
     * @param versionCreationResponse controller response describing the topic to write to
     * @return a writer for that topic
     */
    protected VeniceWriter<byte[], byte[], byte[]> getVeniceWriter(VersionCreationResponse versionCreationResponse) {
        final Properties bootstrapOnlyProps = new Properties();
        bootstrapOnlyProps.put("kafka.bootstrap.servers", versionCreationResponse.getKafkaBootstrapServers());
        // Everything else is derived from the response by the two-argument overload.
        return getVeniceWriter(versionCreationResponse, bootstrapOnlyProps);
    }

    /**
     * Builds a writer for the topic described by the given response on top of the supplied base properties.
     *
     * <p>The partitioner is created from the response's partitioner class, amplification factor and
     * partitioner params. An explicit partition count (partitions times amplification factor) is only set
     * for batch or stream-reprocessing pushes; otherwise it is left null.
     *
     * @param versionCreationResponse controller response describing the topic to write to
     * @param properties              base writer properties
     * @return a writer for that topic
     */
    protected VeniceWriter<byte[], byte[], byte[]> getVeniceWriter(VersionCreationResponse versionCreationResponse, Properties properties) {
        final int amplificationFactor = versionCreationResponse.getAmplificationFactor();

        Integer partitionCount = null;
        if (this.pushType.isBatchOrStreamReprocessing()) {
            partitionCount = Integer.valueOf(versionCreationResponse.getPartitions() * amplificationFactor);
        }

        final Properties partitionerProps = new Properties();
        partitionerProps.putAll(versionCreationResponse.getPartitionerParams());

        final VeniceWriterOptions writerOptions = new VeniceWriterOptions.Builder(versionCreationResponse.getKafkaTopic())
                .setTime(this.time)
                .setPartitioner(PartitionUtils.getVenicePartitioner(versionCreationResponse.getPartitionerClass(), amplificationFactor, new VeniceProperties(partitionerProps)))
                .setPartitionCount(partitionCount)
                .setChunkingEnabled(this.isChunkingEnabled)
                .build();
        return constructVeniceWriter(properties, writerOptions);
    }

    /**
     * Creates the writer from the given base properties overlaid with the additional writer configs.
     * On a key clash the additional configs win because they are applied last.
     *
     * @param properties          base writer properties; not modified
     * @param veniceWriterOptions options describing the writer to create
     * @return the newly created writer
     */
    VeniceWriter<byte[], byte[], byte[]> constructVeniceWriter(Properties properties, VeniceWriterOptions veniceWriterOptions) {
        final Properties effectiveProps = new Properties();
        effectiveProps.putAll(properties);
        effectiveProps.putAll(this.additionalWriterConfigs);

        final VeniceWriterFactory writerFactory = new VeniceWriterFactory(effectiveProps);
        return writerFactory.createVeniceWriter(veniceWriterOptions);
    }

    /**
     * Starts the producer; returns immediately when it is already started.
     *
     * <p>In order, this method:
     * <ol>
     *   <li>creates the controller client and a transport client, either through the discovery URL or
     *       through D2, optionally verifying that the latest KAFKA_MESSAGE_ENVELOPE protocol is present
     *       in the backend;</li>
     *   <li>requests the topic to write to and records the topic name and Kafka bootstrap servers;</li>
     *   <li>fetches the store info and key schema and fills the value schema cache;</li>
     *   <li>for STREAM_REPROCESSING pushes, starts a push monitor;</li>
     *   <li>creates the Venice writer;</li>
     *   <li>for STREAM / STREAM_REPROCESSING pushes on stores with hybrid disk quota enabled, starts a
     *       hybrid store quota monitor.</li>
     * </ol>
     *
     * @throws VeniceException if the push monitor already reports ERROR, or if version info needed to
     *                         determine chunking is missing from the store response
     */
    public synchronized void start() {
        // NOTE(review): this local is declared as HttpsTransportClient but is also assigned an
        // HttpTransportClient and a D2TransportClient below; looks like a decompiler artifact and the
        // declared type should presumably be their common supertype -- confirm.
        HttpsTransportClient d2TransportClient;
        if (this.isStarted) {
            return;
        }
        // NOTE(review): the flag is raised before any of the setup below has succeeded, so a failure
        // part-way through leaves the producer marked as started -- confirm this is intended.
        this.isStarted = true;
        if (this.discoveryUrl.isPresent()) {
            // Discovery-URL mode: the controller client is built directly from the URL.
            this.controllerClient = ControllerClient.discoverAndConstructControllerClient(this.storeName, this.discoveryUrl.get(), this.sslFactory, 1, this.authenticationProvider);
            // The protocol check needs a router to read the system store schema from, hence the routerUrl condition.
            if (this.verifyLatestProtocolPresent && this.routerUrl.isPresent()) {
                LOGGER.info("Start verifying the latest protocols at runtime are valid in Venice backend.");
                ClientConfig authenticationProvider = ClientConfig.defaultGenericClientConfig(AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE.getSystemStoreName()).setAuthenticationProvider(this.authenticationProvider);
                authenticationProvider.setVeniceURL(this.routerUrl.get());
                new SchemaPresenceChecker(ClientFactory.getSchemaReader(authenticationProvider, (ICProvider) null), AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE).verifySchemaVersionPresentOrExit();
                LOGGER.info("Successfully verified the latest protocols at runtime are valid in Venice backend.");
            } else {
                LOGGER.info("Skip verifying the latest protocols at runtime are valid in Venice backend.");
            }
            // HTTPS transport when an SSL factory is configured, plain HTTP otherwise.
            d2TransportClient = this.sslFactory.isPresent() ? new HttpsTransportClient(this.discoveryUrl.get(), this.sslFactory.get(), this.authenticationProvider) : new HttpTransportClient(this.discoveryUrl.get(), this.authenticationProvider);
        } else {
            // D2 mode: one D2 client for the primary controller colo and one for the child colo.
            this.primaryControllerColoD2Client = getStartedD2Client(this.primaryControllerColoD2ZKHost);
            this.childColoD2Client = getStartedD2Client(this.veniceChildD2ZkHost);
            // Find the cluster hosting the store (up to 10 attempts).
            D2ServiceDiscoveryResponse controllerRequestWithRetry = controllerRequestWithRetry(() -> {
                return D2ControllerClient.discoverCluster(this.primaryControllerColoD2Client, this.primaryControllerD2ServiceName, this.storeName);
            }, 10);
            String cluster = controllerRequestWithRetry.getCluster();
            LOGGER.info("Found cluster: {} for store: {}", cluster, this.storeName);
            if (this.verifyLatestProtocolPresent) {
                LOGGER.info("Start verifying the latest protocols at runtime are valid in Venice backend.");
                String systemStoreName = AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE.getSystemStoreName();
                // The system store may live in a different cluster, so it is discovered separately.
                D2ServiceDiscoveryResponse controllerRequestWithRetry2 = controllerRequestWithRetry(() -> {
                    return D2ControllerClient.discoverCluster(this.primaryControllerColoD2Client, this.primaryControllerD2ServiceName, systemStoreName);
                }, 2);
                ClientConfig authenticationProvider2 = ClientConfig.defaultGenericClientConfig(systemStoreName).setAuthenticationProvider(this.authenticationProvider);
                authenticationProvider2.setD2ServiceName(controllerRequestWithRetry2.getD2Service());
                authenticationProvider2.setD2Client(this.childColoD2Client);
                new SchemaPresenceChecker(ClientFactory.getSchemaReader(authenticationProvider2, (ICProvider) null), AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE).verifySchemaVersionPresentOrExit();
                LOGGER.info("Successfully verified the latest protocols at runtime are valid in Venice backend.");
            }
            this.controllerClient = new D2ControllerClient(this.primaryControllerD2ServiceName, cluster, this.primaryControllerColoD2Client, this.sslFactory);
            d2TransportClient = new D2TransportClient(controllerRequestWithRetry.getD2Service(), this.childColoD2Client, this.authenticationProvider);
        }
        // Ask the controller which topic this job should write to.
        VersionCreationResponse versionCreationResponse = (VersionCreationResponse) controllerRequestWithRetry(() -> {
            return this.controllerClient.requestTopicForWrites(this.storeName, 1L, this.pushType, this.samzaJobId, true, false, false, this.partitioners, Optional.empty(), Optional.ofNullable(this.runningFabric), false, -1L);
        }, 2);
        LOGGER.info("Got [store: {}] VersionCreationResponse: {}", this.storeName, versionCreationResponse);
        this.topicName = versionCreationResponse.getKafkaTopic();
        this.kafkaBootstrapServers = versionCreationResponse.getKafkaBootstrapServers();
        // Store-level settings: write-compute and hybrid disk quota.
        StoreResponse controllerRequestWithRetry3 = controllerRequestWithRetry(() -> {
            return this.controllerClient.getStore(this.storeName);
        }, 2);
        this.isWriteComputeEnabled = controllerRequestWithRetry3.getStore().isWriteComputationEnabled();
        boolean isHybridStoreDiskQuotaEnabled = controllerRequestWithRetry3.getStore().isHybridStoreDiskQuotaEnabled();
        // Key schema; its parsing form is what send() compares incoming key schemas against.
        SchemaResponse controllerRequestWithRetry4 = controllerRequestWithRetry(() -> {
            return this.controllerClient.getKeySchema(this.storeName);
        }, 2);
        LOGGER.info("Got [store: {}] SchemaResponse for key schema: {}", this.storeName, controllerRequestWithRetry4);
        this.keySchema = AvroSchemaParseUtils.parseSchemaFromJSONStrictValidation(controllerRequestWithRetry4.getSchemaStr());
        this.canonicalKeySchemaStr = AvroCompatibilityHelper.toParsingForm(this.keySchema);
        refreshSchemaCache();
        // Stream reprocessing: monitor the push status of the corresponding version topic.
        if (this.pushType.equals(Version.PushType.STREAM_REPROCESSING)) {
            this.pushMonitor = Optional.of(new RouterBasedPushMonitor(d2TransportClient, Version.composeVersionTopicFromStreamReprocessingTopic(this.topicName), this.factory, this));
            this.pushMonitor.get().start();
        }
        // Chunking is taken from the created version for batch / stream-reprocessing pushes, and is off otherwise.
        if (this.pushType.isBatchOrStreamReprocessing()) {
            int version = versionCreationResponse.getVersion();
            this.isChunkingEnabled = ((Version) controllerRequestWithRetry3.getStore().getVersion(version).orElseThrow(() -> {
                return new VeniceException("Version info for version " + version + " not available in store response");
            })).isChunkingEnabled();
        } else {
            this.isChunkingEnabled = false;
        }
        // Must come after isChunkingEnabled is decided, since the writer options read it.
        this.veniceWriter = getVeniceWriter(versionCreationResponse);
        // Fail fast when the push is already known to be in error.
        if (this.pushMonitor.isPresent()) {
            if (ExecutionStatus.ERROR.equals(this.pushMonitor.get().getCurrentStatus())) {
                throw new VeniceException("Push job for resource " + this.topicName + " is in error state; please reach out to Venice team.");
            }
        }
        // Quota monitoring only applies to streaming pushes on stores with hybrid disk quota enabled.
        if ((this.pushType.equals(Version.PushType.STREAM) || this.pushType.equals(Version.PushType.STREAM_REPROCESSING)) && isHybridStoreDiskQuotaEnabled) {
            this.hybridStoreQuotaMonitor = Optional.of(new RouterBasedHybridStoreQuotaMonitor(d2TransportClient, this.storeName, this.pushType, this.topicName));
            this.hybridStoreQuotaMonitor.get().start();
        }
    }

    /**
     * Fetches all value and derived schemas of the store from the controller (up to 2 attempts) and
     * records each one in both lookup directions: schema to (value schema id, derived schema id) and back.
     */
    private void refreshSchemaCache() {
        final MultiSchemaResponse allSchemas = (MultiSchemaResponse) controllerRequestWithRetry(
                () -> this.controllerClient.getAllValueAndDerivedSchema(this.storeName), 2);
        LOGGER.info("Got [store: {}] SchemaResponse for value schemas: {}", this.storeName, allSchemas);

        for (MultiSchemaResponse.Schema entry : allSchemas.getSchemas()) {
            final Schema parsedSchema = AvroSchemaParseUtils.parseSchemaFromJSONLooseValidation(entry.getSchemaStr());
            final Pair<Integer, Integer> schemaIds =
                    new Pair<>(Integer.valueOf(entry.getId()), Integer.valueOf(entry.getDerivedSchemaId()));
            this.valueSchemaToIdsMap.put(parsedSchema, schemaIds);
            this.valueSchemaIdsToSchemaMap.put(schemaIds, parsedSchema);
        }
    }

    /**
     * Closes the producer by delegating to {@link #stop()}.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        stop();
    }

    /**
     * Stops the producer and releases its resources.
     *
     * <p>The writer is closed first. For a STREAM_REPROCESSING push with an active push monitor, the
     * current push status is then inspected: terminal states are only logged, while any other state is
     * treated as "still in progress" and the offline push job for the version topic is killed (after a
     * random delay of up to 30 seconds, with up to 3 attempts) before the monitor is closed. Finally the
     * controller client, the hybrid store quota monitor (if any) and all D2 client envelopes are closed.
     * All closes are quiet: failures are logged by the helper rather than thrown.
     */
    public synchronized void stop() {
        this.isStarted = false;
        Utils.closeQuietlyWithErrorLogged(new Closeable[]{this.veniceWriter});
        if (Version.PushType.STREAM_REPROCESSING.equals(this.pushType) && this.pushMonitor.isPresent()) {
            final RouterBasedPushMonitor monitor = this.pushMonitor.get();
            final String versionTopic = Version.composeVersionTopicFromStreamReprocessingTopic(this.topicName);
            // Switch directly on the enum instead of going through the ordinal-indexed lookup table.
            switch (monitor.getCurrentStatus()) {
                case COMPLETED:
                    LOGGER.info("Push job for {} is COMPLETED.", this.topicName);
                    break;
                case END_OF_PUSH_RECEIVED:
                    LOGGER.info("Batch load for {} has finished.", this.topicName);
                    break;
                case ERROR:
                    LOGGER.info("Push job for {} encountered error.", this.topicName);
                    break;
                default:
                    LOGGER.warn("Push job in Venice backend is still in progress... Will clean up resources in Venice");
                    // Random delay before issuing the kill request.
                    Utils.sleep(ThreadLocalRandom.current().nextInt(30000));
                    this.controllerClient.retryableRequest(3, client -> {
                        return client.killOfflinePushJob(versionTopic);
                    });
                    LOGGER.info("Offline push job has been killed, topic: {}", versionTopic);
                    break;
            }
            Utils.closeQuietlyWithErrorLogged(new Closeable[]{monitor});
        }
        Utils.closeQuietlyWithErrorLogged(new Closeable[]{this.controllerClient});
        this.hybridStoreQuotaMonitor.ifPresent(quotaMonitor -> {
            Utils.closeQuietlyWithErrorLogged(new Closeable[]{quotaMonitor});
        });
        this.d2ZkHostToClientEnvelopeMap.values().forEach(envelope -> {
            Utils.closeQuietlyWithErrorLogged(new Closeable[]{envelope});
        });
    }

    /**
     * Intentionally does nothing; the argument is ignored.
     *
     * @param str unused
     */
    public void register(String str) {
    }

    /**
     * Sends the key and message of a Samza envelope to the Venice store.
     *
     * <p>Before writing, the method checks that the producer is started and that the envelope targets this
     * producer's store. For STREAM_REPROCESSING pushes with an active push monitor, the message is silently
     * dropped once the push has completed or received end-of-push, and the call fails when the push is in
     * error. When a hybrid store quota monitor is active, the call fails while the quota is violated.
     *
     * @param str                     unused
     * @param outgoingMessageEnvelope envelope carrying the destination stream, key and message
     * @throws SamzaException  if the producer is not started or the envelope targets a different store
     * @throws VeniceException if the push job is in error state or the hybrid store quota is violated
     */
    public void send(String str, OutgoingMessageEnvelope outgoingMessageEnvelope) {
        if (!this.isStarted) {
            throw new SamzaException("Send called on Venice System Producer that is not started yet!");
        }
        String stream = outgoingMessageEnvelope.getSystemStream().getStream();
        if (!stream.equals(this.storeName)) {
            throw new SamzaException("The store of the incoming message: " + stream + " is unexpected, and it should be " + this.storeName);
        }
        if (this.pushMonitor.isPresent() && Version.PushType.STREAM_REPROCESSING.equals(this.pushType)) {
            // Switch directly on the enum instead of going through the ordinal-indexed lookup table.
            switch (this.pushMonitor.get().getCurrentStatus()) {
                case COMPLETED:
                case END_OF_PUSH_RECEIVED:
                    LOGGER.info("Stream reprocessing for resource {} has finished. No message will be sent.", this.topicName);
                    return;
                case ERROR:
                    throw new VeniceException("Push job for resource " + this.topicName + " is in error state; please reach out to Venice team.");
                default:
                    // Push still in progress: keep writing.
                    break;
            }
        }
        if (this.hybridStoreQuotaMonitor.isPresent() && (Version.PushType.STREAM.equals(this.pushType) || Version.PushType.STREAM_REPROCESSING.equals(this.pushType))) {
            HybridStoreQuotaStatus currentStatus = this.hybridStoreQuotaMonitor.get().getCurrentStatus();
            switch (currentStatus) {
                case QUOTA_VIOLATED:
                    LOGGER.error("Current hybrid store quota status: {}, should throw exception to kill the job.", currentStatus);
                    throw new VeniceException("Push job for resource " + this.topicName + " is in hybrid quota violated mode; please reach out to Venice team.");
                default:
                    // Quota not violated (or status unknown): keep writing.
                    break;
            }
        }
        send(outgoingMessageEnvelope.getKey(), outgoingMessageEnvelope.getMessage());
    }

    /**
     * Writes one record to Venice and returns a future tied to the writer callback.
     *
     * <p>The key's schema must match the store's key schema (compared by parsing form). The value decides
     * the operation: {@code null} is a delete; a value whose derived schema id is -1 is a put; any other
     * value is a partial update, which requires write-compute to be enabled. When the topic is versioned,
     * a partial update is first converted into a full put. A {@code VeniceObjectWithTimestamp} value is
     * unwrapped only when the topic is a real-time topic, and its timestamp is then passed to the writer.
     *
     * @param obj  key object
     * @param obj2 value object, a {@code VeniceObjectWithTimestamp} wrapper, or null for a delete
     * @return future created together with the callback handed to the writer
     * @throws SamzaException if the key schema does not match, the supplied timestamp is not positive, or a
     *                        partial update is sent to a store without write-compute enabled
     */
    protected CompletableFuture<Void> send(Object obj, Object obj2) {
        // 1. Validate the key schema against the store's key schema.
        final Schema keyObjectSchema = getSchemaFromObject(obj);
        final String keyObjectSchemaStr = this.canonicalSchemaStrCache.computeIfAbsent(
                keyObjectSchema, ignored -> AvroCompatibilityHelper.toParsingForm(keyObjectSchema));
        if (!this.canonicalKeySchemaStr.equals(keyObjectSchemaStr)) {
            throw new SamzaException("Cannot write record to Venice store " + this.storeName + ", key object has schema " + keyObjectSchemaStr + " which does not match Venice key schema " + this.canonicalKeySchemaStr + VeniceSystemFactory.DOT);
        }

        final byte[] keyBytes = serializeObject(obj);
        final CompletableFuture<Void> result = new CompletableFuture<>();
        final CompletableFutureCallback callback = new CompletableFutureCallback(result);

        // 2. Unwrap an explicit timestamp; only honoured for real-time topics.
        Object value = obj2;
        long logicalTimestamp = -1;
        if ((value instanceof VeniceObjectWithTimestamp) && Version.isRealTimeTopic(this.topicName)) {
            final VeniceObjectWithTimestamp timestamped = (VeniceObjectWithTimestamp) value;
            logicalTimestamp = timestamped.getTimestamp();
            if (logicalTimestamp <= 0) {
                throw new SamzaException("Timestamp specified in passed `VeniceObjectWithTimestamp` object should be positive, but received: " + logicalTimestamp);
            }
            value = timestamped.getObject();
        }
        final boolean hasTimestamp = logicalTimestamp > 0;

        // 3. A null value is a delete.
        if (value == null) {
            if (hasTimestamp) {
                this.veniceWriter.delete(keyBytes, logicalTimestamp, callback);
            } else {
                this.veniceWriter.delete(keyBytes, callback);
            }
            return result;
        }

        // 4. Resolve (value schema id, derived schema id), asking the controller on a cache miss.
        Pair<Integer, Integer> schemaIds = this.valueSchemaToIdsMap.computeIfAbsent(getSchemaFromObject(value), valueSchema -> {
            SchemaResponse idResponse = (SchemaResponse) controllerRequestWithRetry(
                    () -> this.controllerClient.getValueOrDerivedSchemaId(this.storeName, valueSchema.toString()), 2);
            LOGGER.info("Got [store: {}] SchemaResponse for schema: {}", this.storeName, valueSchema);
            return new Pair<>(Integer.valueOf(idResponse.getId()), Integer.valueOf(idResponse.getDerivedSchemaId()));
        });

        // 5. On a versioned topic, a partial update is turned into a full put.
        if (Version.isATopicThatIsVersioned(this.topicName) && schemaIds.getSecond().intValue() != -1) {
            final int fullPutSchemaId = schemaIds.getFirst().intValue();
            value = convertPartialUpdateToFullPut(schemaIds, value);
            schemaIds = new Pair<>(Integer.valueOf(fullPutSchemaId), -1);
        }

        final byte[] valueBytes = serializeObject(value);
        final int derivedSchemaId = schemaIds.getSecond().intValue();
        final int valueSchemaId = schemaIds.getFirst().intValue();

        // 6. Full put.
        if (derivedSchemaId == -1) {
            if (hasTimestamp) {
                this.veniceWriter.put(keyBytes, valueBytes, valueSchemaId, logicalTimestamp, callback);
            } else {
                this.veniceWriter.put(keyBytes, valueBytes, valueSchemaId, callback);
            }
            return result;
        }

        // 7. Partial update.
        if (!this.isWriteComputeEnabled) {
            throw new SamzaException("Cannot write partial update record to Venice store " + this.storeName + " because write-compute is not enabled for it. Please contact Venice team to configure it.");
        }
        if (hasTimestamp) {
            this.veniceWriter.update(keyBytes, valueBytes, valueSchemaId, derivedSchemaId, callback, logicalTimestamp);
        } else {
            this.veniceWriter.update(keyBytes, valueBytes, valueSchemaId, derivedSchemaId, callback);
        }
        return result;
    }

    /**
     * Submits a write for the given pair by delegating to {@code send}.
     *
     * @param obj  first argument forwarded to {@code send} (presumably the record key)
     * @param obj2 second argument forwarded to {@code send} (the record value)
     * @return the future returned by {@code send}
     */
    public CompletableFuture<Void> put(Object obj, Object obj2) {
        final CompletableFuture<Void> pendingWrite = send(obj, obj2);
        return pendingWrite;
    }

    /**
     * Submits a delete by delegating to {@code send} with a {@code null} value.
     *
     * @param obj first argument forwarded to {@code send} (presumably the record key)
     * @return the future returned by {@code send}
     */
    public CompletableFuture<Void> delete(Object obj) {
        // The explicit Object cast keeps overload resolution pinned to send(Object, Object).
        final CompletableFuture<Void> pendingDelete = send(obj, (Object) null);
        return pendingDelete;
    }

    /**
     * Flushes the underlying {@code veniceWriter}.
     *
     * @param str not used by this implementation
     */
    public void flush(String str) {
        veniceWriter.flush();
    }

    /**
     * Resolves the schema that corresponds to the runtime type of the given object.
     *
     * <p>An {@code IndexedRecord} supplies its own schema; character sequences, integers, longs,
     * doubles, floats, byte arrays / byte buffers and booleans map to the matching schema constant.
     *
     * @param obj the object to inspect
     * @return the schema matching the object's runtime type
     * @throws SamzaException if the object is of none of the supported types
     */
    private static Schema getSchemaFromObject(Object obj) {
        final Schema resolved;
        if (obj instanceof IndexedRecord) {
            resolved = ((IndexedRecord) obj).getSchema();
        } else if (obj instanceof CharSequence) {
            resolved = STRING_SCHEMA;
        } else if (obj instanceof Integer) {
            resolved = INT_SCHEMA;
        } else if (obj instanceof Long) {
            resolved = LONG_SCHEMA;
        } else if (obj instanceof Double) {
            resolved = DOUBLE_SCHEMA;
        } else if (obj instanceof Float) {
            resolved = FLOAT_SCHEMA;
        } else if (obj instanceof byte[] || obj instanceof ByteBuffer) {
            // Both raw byte arrays and byte buffers share the bytes schema.
            resolved = BYTES_SCHEMA;
        } else if (obj instanceof Boolean) {
            resolved = BOOL_SCHEMA;
        } else {
            throw new SamzaException("Venice System Producer only supports Avro objects, and primitives, found object of class: " + obj.getClass().toString());
        }
        return resolved;
    }

    /**
     * Serializes the given object to bytes according to its runtime type.
     *
     * <p>An {@code IndexedRecord} is serialized with the serializer obtained from
     * {@code FastSerializerDeserializerFactory} for the record's own schema. The supported
     * primitive-like types go through {@code serializePrimitive} with the matching datum writer;
     * character sequences are first converted to {@code Utf8} and byte arrays are wrapped in a
     * {@code ByteBuffer}.
     *
     * @param obj the object to serialize
     * @return the serialized bytes
     * @throws SamzaException if the object is of none of the supported types
     */
    private byte[] serializeObject(Object obj) {
        final byte[] serialized;
        if (obj instanceof IndexedRecord) {
            Schema recordSchema = ((IndexedRecord) obj).getSchema();
            serialized = FastSerializerDeserializerFactory.getFastAvroGenericSerializer(recordSchema).serialize(obj);
        } else if (obj instanceof CharSequence) {
            serialized = serializePrimitive(new Utf8(obj.toString()), STRING_DATUM_WRITER);
        } else if (obj instanceof Integer) {
            serialized = serializePrimitive((Integer) obj, INT_DATUM_WRITER);
        } else if (obj instanceof Long) {
            serialized = serializePrimitive((Long) obj, LONG_DATUM_WRITER);
        } else if (obj instanceof Double) {
            serialized = serializePrimitive((Double) obj, DOUBLE_DATUM_WRITER);
        } else if (obj instanceof Float) {
            serialized = serializePrimitive((Float) obj, FLOAT_DATUM_WRITER);
        } else if (obj instanceof ByteBuffer) {
            serialized = serializePrimitive((ByteBuffer) obj, BYTES_DATUM_WRITER);
        } else if (obj instanceof byte[]) {
            // The bytes datum writer is fed a ByteBuffer, so wrap the raw array.
            serialized = serializePrimitive(ByteBuffer.wrap((byte[]) obj), BYTES_DATUM_WRITER);
        } else if (obj instanceof Boolean) {
            serialized = serializePrimitive((Boolean) obj, BOOL_DATUM_WRITER);
        } else {
            throw new SamzaException("Can only serialize avro objects, and primitives, cannot serialize: " + obj.getClass().toString());
        }
        return serialized;
    }

    /**
     * Writes a single datum through the supplied writer into an in-memory binary encoder and
     * returns the bytes produced.
     *
     * @param t           the datum to write
     * @param datumWriter the writer used to encode {@code t}
     * @param <T>         the datum type accepted by the writer
     * @return the encoded bytes
     * @throws RuntimeException wrapping the {@code IOException} if writing or flushing fails
     */
    private static <T> byte[] serializePrimitive(T t, DatumWriter<T> datumWriter) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        BinaryEncoder encoder = AvroCompatibilityHelper.newBinaryEncoder(buffer);
        try {
            datumWriter.write(t, encoder);
            // The encoder may hold pending bytes; flush before reading the buffer.
            encoder.flush();
        } catch (IOException e) {
            throw new RuntimeException("Failed to write input: " + t + " to binary encoder", e);
        }
        return buffer.toByteArray();
    }

    /**
     * Converts a partial-update record into a full value record.
     *
     * <p>The base schema is looked up in {@code valueSchemaIdsToSchemaMap} under the key
     * {@code (pair.getFirst(), -1)}. On a miss the schema cache is refreshed once and the lookup
     * is retried. The result comes from {@code writeComputeHandlerV1.updateValueRecord}, called
     * with a {@code null} second argument (presumably "no existing value" -- confirm against the
     * handler) and the given object cast to {@code GenericRecord}.
     *
     * @param pair the schema ids of the incoming record; the first element is used as the base
     *             schema id, the second is only reported in the error message
     * @param obj  the partial-update record; must be a {@code GenericRecord}
     * @return the record returned by {@code updateValueRecord}
     * @throws SamzaException if the base schema is still unknown after refreshing the cache
     */
    protected Object convertPartialUpdateToFullPut(Pair<Integer, Integer> pair, Object obj) {
        Pair<Integer, Integer> baseSchemaKey = new Pair<>(pair.getFirst(), -1);
        Schema baseSchema = (Schema) this.valueSchemaIdsToSchemaMap.get(baseSchemaKey);
        if (baseSchema == null) {
            // Cache miss: refresh once and retry before giving up.
            refreshSchemaCache();
            baseSchema = (Schema) this.valueSchemaIdsToSchemaMap.get(baseSchemaKey);
        }
        if (baseSchema == null) {
            throw new SamzaException("Unable to find base schema with id: " + pair.getFirst() + " for write compute schema with id " + pair.getSecond());
        }
        GenericRecord updateRecord = (GenericRecord) obj;
        return writeComputeHandlerV1.updateValueRecord(baseSchema, (GenericRecord) null, updateRecord);
    }

    /**
     * Forwards the given exit mode to the push monitor, if one is present; otherwise does nothing.
     *
     * @param samzaExitMode the exit mode passed to
     *                      {@code setStreamReprocessingExitMode} on the push monitor
     */
    public void setExitMode(SamzaExitMode samzaExitMode) {
        this.pushMonitor.ifPresent(monitor -> monitor.setStreamReprocessingExitMode(samzaExitMode));
    }

    /**
     * @return the current value of the {@code kafkaBootstrapServers} field
     */
    public String getKafkaBootstrapServers() {
        return kafkaBootstrapServers;
    }

    /**
     * Exposes the {@code VeniceWriter} this producer writes through.
     *
     * @return the current value of the {@code veniceWriter} field
     */
    public VeniceWriter<byte[], byte[], byte[]> getInternalProducer() {
        return veniceWriter;
    }

    /**
     * Replaces the controller client used by this producer.
     *
     * @param d2ControllerClient the controller client to store in the {@code controllerClient} field
     */
    protected void setControllerClient(D2ControllerClient d2ControllerClient) {
        controllerClient = d2ControllerClient;
    }

    /**
     * Returns the D2 client registered for the given ZooKeeper host string, building and starting
     * one on first request.
     *
     * <p>New clients are cached in {@code d2ZkHostToClientEnvelopeMap}, keyed by the host string.
     * Each is built with a fresh temp path from {@code Utils.getUniqueTempPath("d2")} as its file
     * system base path and with saving URI data on disk enabled. SSL is enabled, and the SSL
     * context and parameters supplied, only when {@code sslFactory} is present.
     *
     * @param str the ZooKeeper host string identifying the D2 cluster
     * @return the cached or newly started D2 client
     */
    private D2Client getStartedD2Client(String str) {
        D2ClientEnvelope envelope = this.d2ZkHostToClientEnvelopeMap.computeIfAbsent(str, zkHosts -> {
            String fsBasePath = Utils.getUniqueTempPath("d2");
            D2Client client = new D2ClientBuilder()
                    .setZkHosts(zkHosts)
                    .setSSLContext(this.sslFactory.map(factory -> factory.getSSLContext()).orElse(null))
                    .setIsSSLEnabled(this.sslFactory.isPresent())
                    .setSSLParameters(this.sslFactory.map(factory -> factory.getSSLParameters()).orElse(null))
                    .setFsBasePath(fsBasePath)
                    .setEnableSaveUriDataOnDisk(true)
                    .build();
            D2ClientUtils.startClient(client);
            // The temp path is stored in the envelope alongside the client.
            return new D2ClientEnvelope(client, fsBasePath);
        });
        return envelope.d2Client;
    }
}
