package com.linkedin.venice.hadoop.heartbeat;

import com.linkedin.venice.pubsub.api.PubSubProducerCallback;
import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer;
import com.linkedin.venice.status.protocol.BatchJobHeartbeatKey;
import com.linkedin.venice.status.protocol.BatchJobHeartbeatValue;
import com.linkedin.venice.utils.DaemonThreadFactory;
import com.linkedin.venice.writer.VeniceWriter;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.avro.Schema;
import org.apache.commons.lang.Validate;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * {@link PushJobHeartbeatSender} that periodically writes a liveness heartbeat record for one
 * store version through a {@link VeniceWriter}, using a dedicated single-thread scheduler.
 *
 * <p>Each heartbeat is a {@link BatchJobHeartbeatKey} (store name + version) mapped to a
 * {@link BatchJobHeartbeatValue} carrying the current wall-clock timestamp. When the sender is
 * stopped, one last record is written: either a delete of the key or a final put, depending on
 * the flag given at construction time.
 *
 * <p>Lifecycle: {@link #start(String, int)} once, then {@link #stop()} once. {@code start} and
 * {@code stop} themselves are not safe to call concurrently. The send statistics and the first
 * recorded failure are kept in atomics because they are updated from the producer callback and
 * from the heartbeat thread while being read by the owner of this object. An instance cannot be
 * restarted after {@code stop()}, since the scheduler is shut down and the writer is closed.
 */
@NotThreadSafe
class DefaultPushJobHeartbeatSender implements PushJobHeartbeatSender {
    private static final Logger LOGGER = LogManager.getLogger(DefaultPushJobHeartbeatSender.class);

    /** Upper bound on how long a single send waits for its producer callback. */
    private static final Duration DEFAULT_SEND_CALLBACK_AWAIT_TIMEOUT = Duration.ofSeconds(10);

    private final Duration interval;
    private final Duration initialDelay;
    private final ScheduledExecutorService executorService;
    private final VeniceWriter<byte[], byte[], byte[]> veniceWriter;
    private final int valueSchemaId;
    private final String heartbeatKafkaTopicName;
    private final VeniceAvroKafkaSerializer keySerializer;
    private final VeniceAvroKafkaSerializer valueSerializer;
    private final boolean sendDeleteAsLastHeartbeat;

    // Written from the producer callback / heartbeat thread, read from the caller's thread.
    private final AtomicLong successfulHeartbeatCount = new AtomicLong();
    private final AtomicLong failedHeartbeatCount = new AtomicLong();
    private final AtomicReference<Exception> firstSendHeartbeatException = new AtomicReference<>();

    private boolean running;
    private String storeName;
    private int storeVersion;
    private Instant heartbeatStartTime;

    /**
     * Creates a sender that is not yet running.
     *
     * @param initialDelay delay before the first scheduled heartbeat
     * @param interval period between scheduled heartbeats
     * @param veniceWriter writer used to produce heartbeat records; closed by {@link #stop()}
     * @param heartbeatKeySchema key schema in use; must equal {@code BatchJobHeartbeatKey.SCHEMA$}
     * @param valueSchemasById known value schemas by id; must contain
     *        {@code BatchJobHeartbeatValue.SCHEMA$}
     * @param heartbeatKafkaTopicName topic name handed to the serializers
     * @param sendDeleteAsLastHeartbeat when {@code true}, {@link #stop()} writes a delete for the
     *        heartbeat key instead of a final put
     * @throws IllegalArgumentException if an argument is null/empty, the key schema does not
     *         match, or the value schema is not present in {@code valueSchemasById}
     */
    public DefaultPushJobHeartbeatSender(
            @Nonnull Duration initialDelay,
            @Nonnull Duration interval,
            @Nonnull VeniceWriter<byte[], byte[], byte[]> veniceWriter,
            @Nonnull Schema heartbeatKeySchema,
            @Nonnull Map<Integer, Schema> valueSchemasById,
            @Nonnull String heartbeatKafkaTopicName,
            boolean sendDeleteAsLastHeartbeat) {
        Validate.notNull(initialDelay);
        Validate.notNull(interval);
        Validate.notNull(veniceWriter);
        Validate.notNull(heartbeatKeySchema);
        Validate.notNull(valueSchemasById);
        Validate.notEmpty(heartbeatKafkaTopicName);
        this.initialDelay = initialDelay;
        this.interval = interval;
        this.veniceWriter = veniceWriter;
        validateSchemasMatch(BatchJobHeartbeatKey.SCHEMA$, heartbeatKeySchema);
        this.valueSchemaId = getSchemaIdForSchemaOrFail(BatchJobHeartbeatValue.SCHEMA$, valueSchemasById);
        this.heartbeatKafkaTopicName = heartbeatKafkaTopicName;
        this.keySerializer = new VeniceAvroKafkaSerializer(heartbeatKeySchema);
        this.valueSerializer = new VeniceAvroKafkaSerializer(BatchJobHeartbeatValue.SCHEMA$);
        this.running = false;
        // Created last so that a validation failure above never leaves an executor behind.
        this.executorService =
                Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("push-job-heartbeat-thread"));
        this.sendDeleteAsLastHeartbeat = sendDeleteAsLastHeartbeat;
    }

    /** Returns the writer this sender produces heartbeats with. */
    VeniceWriter<byte[], byte[], byte[]> getVeniceWriter() {
        return this.veniceWriter;
    }

    /**
     * Looks up the id under which {@code schema} is registered.
     *
     * @throws IllegalArgumentException if no entry of {@code valueSchemasById} equals {@code schema}
     */
    private int getSchemaIdForSchemaOrFail(Schema schema, Map<Integer, Schema> valueSchemasById) {
        for (Map.Entry<Integer, Schema> entry : valueSchemasById.entrySet()) {
            if (Objects.equals(schema, entry.getValue())) {
                return entry.getKey();
            }
        }
        throw new IllegalArgumentException(
                String.format("No schema %s found in valueSchemasById %s", schema, valueSchemasById));
    }

    /**
     * Ensures the two schemas are equal.
     *
     * @throws IllegalArgumentException if they differ (including when either one is null)
     */
    private void validateSchemasMatch(Schema expected, Schema actual) {
        if (!Objects.equals(expected, actual)) {
            // %s formatting is null-tolerant, so a null schema reports a mismatch instead of an NPE.
            throw new IllegalArgumentException(
                    String.format("Expected schema %s and actual schema %s", expected, actual));
        }
    }

    /**
     * Starts periodic heartbeats for the given store version. A second call while already running
     * only logs a warning.
     *
     * @param storeName name of the store being pushed; must not be empty
     * @param storeVersion version of the store being pushed
     */
    @Override
    public void start(@Nonnull String storeName, int storeVersion) {
        Validate.notEmpty(storeName);
        if (this.running) {
            LOGGER.warn("Already started");
            return;
        }
        this.running = true;
        this.storeName = storeName;
        this.storeVersion = storeVersion;
        this.heartbeatStartTime = Instant.now();
        LOGGER.info(
                "Start sending liveness heartbeats for [store={}, version={}] with initial delay {} ms and interval {} ms...",
                this.storeName,
                this.storeVersion,
                this.initialDelay.toMillis(),
                this.interval.toMillis());
        this.executorService.scheduleAtFixedRate(
                this,
                this.initialDelay.toMillis(),
                this.interval.toMillis(),
                TimeUnit.MILLISECONDS);
    }

    /**
     * Stops periodic heartbeats, sends the last heartbeat (a delete or a put, per the constructor
     * flag) and closes the writer. A call while not running only logs a warning.
     *
     * <p>The writer is closed even if sending the last heartbeat throws; the exception still
     * propagates to the caller afterwards.
     */
    @Override
    public void stop() {
        if (!this.running) {
            LOGGER.warn("Already stopped or never started");
            return;
        }
        this.running = false;
        this.executorService.shutdown();
        // shutdown() does not wait for a heartbeat that is already executing; give it a bounded
        // chance to finish so it cannot interleave with (or land after) the last heartbeat.
        awaitInFlightHeartbeat();
        try {
            LOGGER.info("Sending last heartbeat...");
            sendHeartbeat(createHeartbeatKey(), createHeartbeatValue(), DEFAULT_SEND_CALLBACK_AWAIT_TIMEOUT, true);
        } finally {
            LOGGER.info("Closing the heartbeat VeniceWriter");
            this.veniceWriter.close();
        }
        LOGGER.info(
                "Liveness heartbeat stopped for [store={}, version={}] with {} successful heartbeat(s) and {} failed heartbeat(s) and in total took {} second(s)",
                this.storeName,
                this.storeVersion,
                this.successfulHeartbeatCount.get(),
                this.failedHeartbeatCount.get(),
                Duration.between(this.heartbeatStartTime, Instant.now()).getSeconds());
    }

    /** Waits, for a bounded time, until the already shut down scheduler has no task running. */
    private void awaitInFlightHeartbeat() {
        long timeoutMs = DEFAULT_SEND_CALLBACK_AWAIT_TIMEOUT.toMillis();
        try {
            if (!this.executorService.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
                LOGGER.warn("Heartbeat thread did not terminate within {} ms; proceeding with the last heartbeat", timeoutMs);
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller; the shutdown sequence itself still continues.
            Thread.currentThread().interrupt();
            LOGGER.warn("Interrupted while waiting for the heartbeat thread to terminate", e);
        }
    }

    /** @return the period between scheduled heartbeats */
    @Override
    @Nonnull
    public Duration getHeartbeatSendInterval() {
        return this.interval;
    }

    /** @return the delay before the first scheduled heartbeat */
    @Override
    @Nonnull
    public Duration getHeartbeatInitialDelay() {
        return this.initialDelay;
    }

    /**
     * Sends one regular heartbeat; this is the task executed by the scheduler.
     *
     * <p>Runtime failures are caught, counted and recorded rather than rethrown: an exception
     * escaping a task scheduled with {@code scheduleAtFixedRate} suppresses all subsequent
     * executions, which would silently end the heartbeat.
     */
    @Override
    public void run() {
        try {
            sendHeartbeat(createHeartbeatKey(), createHeartbeatValue(), DEFAULT_SEND_CALLBACK_AWAIT_TIMEOUT, false);
        } catch (RuntimeException e) {
            this.failedHeartbeatCount.incrementAndGet();
            this.firstSendHeartbeatException.compareAndSet(null, e);
            LOGGER.warn("Unexpected failure while sending one heartbeat event", e);
        }
    }

    /** @return the first exception recorded while sending a heartbeat, if any */
    @Override
    public Optional<Exception> getFirstSendHeartbeatException() {
        return Optional.ofNullable(this.firstSendHeartbeatException.get());
    }

    /** Builds the heartbeat key for the store name and version given to {@code start}. */
    private BatchJobHeartbeatKey createHeartbeatKey() {
        BatchJobHeartbeatKey key = new BatchJobHeartbeatKey();
        key.storeName = this.storeName;
        key.storeVersion = this.storeVersion;
        return key;
    }

    /** Builds a heartbeat value stamped with the current wall-clock time in milliseconds. */
    private BatchJobHeartbeatValue createHeartbeatValue() {
        BatchJobHeartbeatValue value = new BatchJobHeartbeatValue();
        value.timestamp = System.currentTimeMillis();
        return value;
    }

    /**
     * Serializes and produces one heartbeat record, flushes the writer, and waits up to
     * {@code ackTimeout} for the producer callback.
     *
     * @param key heartbeat key
     * @param value heartbeat value; unused when a delete is written
     * @param ackTimeout maximum time to wait for the producer callback
     * @param isLastHeartbeat whether this is the final heartbeat sent by {@link #stop()}; combined
     *        with the constructor flag it selects a delete instead of a put
     */
    private void sendHeartbeat(
            BatchJobHeartbeatKey key,
            BatchJobHeartbeatValue value,
            Duration ackTimeout,
            boolean isLastHeartbeat) {
        byte[] keyBytes = this.keySerializer.serialize(this.heartbeatKafkaTopicName, key);
        byte[] valueBytes = this.valueSerializer.serialize(this.heartbeatKafkaTopicName, value);
        CountDownLatch ackLatch = new CountDownLatch(1);
        Instant sendStart = Instant.now();
        PubSubProducerCallback callback = (produceResult, exception) -> {
            long elapsedMs = Duration.between(sendStart, Instant.now()).toMillis();
            if (exception == null) {
                this.successfulHeartbeatCount.incrementAndGet();
                LOGGER.info("Sending one heartbeat event successfully. Took: {} ms", elapsedMs);
            } else {
                this.failedHeartbeatCount.incrementAndGet();
                // Only the first failure is retained for getFirstSendHeartbeatException().
                this.firstSendHeartbeatException.compareAndSet(null, exception);
                LOGGER.warn("Failed to send one heartbeat event after {} ms", elapsedMs, exception);
            }
            ackLatch.countDown();
        };
        if (isLastHeartbeat && this.sendDeleteAsLastHeartbeat) {
            this.veniceWriter.delete(keyBytes, callback);
        } else {
            this.veniceWriter.put(keyBytes, valueBytes, this.valueSchemaId, callback);
        }
        this.veniceWriter.flush();
        try {
            if (!ackLatch.await(ackTimeout.toMillis(), TimeUnit.MILLISECONDS)) {
                LOGGER.warn("Liveness heartbeat sent did not get ack-ed by remote server after {} ms", ackTimeout.toMillis());
            }
        } catch (InterruptedException e) {
            // Restore the flag so the interrupted thread's owner can still observe the interrupt.
            Thread.currentThread().interrupt();
            LOGGER.warn("Liveness heartbeat sent was interrupted", e);
        }
    }
}
