package com.linkedin.venice.hadoop.heartbeat;

import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.SchemaResponse;
import com.linkedin.venice.controllerapi.StoreResponse;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.hadoop.VenicePushJob;
import com.linkedin.venice.meta.PartitionerConfig;
import com.linkedin.venice.meta.StoreInfo;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.status.BatchJobHeartbeatConfigs;
import com.linkedin.venice.utils.PartitionUtils;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.writer.VeniceWriter;
import com.linkedin.venice.writer.VeniceWriterFactory;
import com.linkedin.venice.writer.VeniceWriterOptions;
import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.avro.Schema;
import org.apache.commons.lang.Validate;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/venice/hadoop/heartbeat/DefaultPushJobHeartbeatSenderFactory.class */
public class DefaultPushJobHeartbeatSenderFactory implements PushJobHeartbeatSenderFactory {
    private static final Logger LOGGER = LogManager.getLogger(DefaultPushJobHeartbeatSenderFactory.class);

    @Override // com.linkedin.venice.hadoop.heartbeat.PushJobHeartbeatSenderFactory
    public PushJobHeartbeatSender createHeartbeatSender(String str, VeniceProperties veniceProperties, @Nonnull ControllerClient controllerClient, Optional<Properties> optional) {
        Validate.notNull(controllerClient);
        String systemStoreName = AvroProtocolDefinition.BATCH_JOB_HEARTBEAT.getSystemStoreName();
        int i = veniceProperties.getInt(VenicePushJob.CONTROLLER_REQUEST_RETRY_ATTEMPTS, 3);
        StoreResponse retryableRequest = ControllerClient.retryableRequest(controllerClient, i, controllerClient2 -> {
            return controllerClient2.getStore(systemStoreName);
        });
        if (retryableRequest.isError()) {
            throw new VeniceException("Could not get store info for store: " + systemStoreName + " with error: " + retryableRequest.getError());
        }
        StoreInfo store = retryableRequest.getStore();
        PartitionerConfig partitionerConfig = store.getPartitionerConfig();
        int partitionCount = store.getPartitionCount();
        LOGGER.info("Got [heartbeat store: {}] Store Info: {}", systemStoreName, store);
        String composeRealTimeTopic = Version.composeRealTimeTopic(systemStoreName);
        DefaultPushJobHeartbeatSender defaultPushJobHeartbeatSender = new DefaultPushJobHeartbeatSender(Duration.ofMillis(veniceProperties.getLong(BatchJobHeartbeatConfigs.HEARTBEAT_INITIAL_DELAY_CONFIG.getConfigName(), BatchJobHeartbeatConfigs.HEARTBEAT_INITIAL_DELAY_CONFIG.getDefaultValue() == null ? 0L : ((Long) BatchJobHeartbeatConfigs.HEARTBEAT_INITIAL_DELAY_CONFIG.getDefaultValue()).longValue())), Duration.ofMillis(veniceProperties.getLong(BatchJobHeartbeatConfigs.HEARTBEAT_INTERVAL_CONFIG.getConfigName(), BatchJobHeartbeatConfigs.HEARTBEAT_INTERVAL_CONFIG.getDefaultValue() == null ? 0L : ((Long) BatchJobHeartbeatConfigs.HEARTBEAT_INTERVAL_CONFIG.getDefaultValue()).longValue())), getVeniceWriter(composeRealTimeTopic, partitionerConfig, getVeniceWriterProperties(optional, str), partitionCount), getHeartbeatKeySchema(controllerClient, i, systemStoreName), getHeartbeatValueSchemas(controllerClient, i, systemStoreName), composeRealTimeTopic, veniceProperties.getBoolean(BatchJobHeartbeatConfigs.HEARTBEAT_LAST_HEARTBEAT_IS_DELETE_CONFIG.getConfigName(), ((Boolean) BatchJobHeartbeatConfigs.HEARTBEAT_LAST_HEARTBEAT_IS_DELETE_CONFIG.getDefaultValue()).booleanValue()));
        LOGGER.info("Successfully created a default push job heartbeat sender with heartbeat store name {}", systemStoreName);
        return defaultPushJobHeartbeatSender;
    }

    private Properties getVeniceWriterProperties(Optional<Properties> optional, String str) {
        Properties properties = new Properties();
        properties.put("kafka.bootstrap.servers", str);
        if (optional.isPresent()) {
            properties.putAll(optional.get());
        }
        return properties;
    }

    private Map<Integer, Schema> getHeartbeatValueSchemas(ControllerClient controllerClient, int i, String str) {
        return (Map) Arrays.stream(ControllerClient.retryableRequest(controllerClient, i, controllerClient2 -> {
            return controllerClient2.getAllValueSchema(str);
        }).getSchemas()).collect(Collectors.toMap((v0) -> {
            return v0.getId();
        }, schema -> {
            return Schema.parse(schema.getSchemaStr());
        }));
    }

    private Schema getHeartbeatKeySchema(ControllerClient controllerClient, int i, String str) {
        SchemaResponse retryableRequest = ControllerClient.retryableRequest(controllerClient, i, controllerClient2 -> {
            return controllerClient2.getKeySchema(str);
        });
        LOGGER.info("Got [heartbeat store: {}] SchemaResponse for key schema: {}", str, retryableRequest);
        return Schema.parse(retryableRequest.getSchemaStr());
    }

    protected VeniceWriter<byte[], byte[], byte[]> getVeniceWriter(String str, PartitionerConfig partitionerConfig, Properties properties, int i) {
        Properties properties2 = new Properties();
        properties2.putAll(partitionerConfig.getPartitionerParams());
        return new VeniceWriterFactory(properties).createVeniceWriter(new VeniceWriterOptions.Builder(str).setPartitioner(PartitionUtils.getVenicePartitioner(partitionerConfig.getPartitionerClass(), partitionerConfig.getAmplificationFactor(), new VeniceProperties(properties2))).setPartitionCount(Integer.valueOf(i)).build());
    }
}
