package com.linkedin.venice.pulsar.sink;

import com.linkedin.venice.meta.Version;
import com.linkedin.venice.samza.VeniceSystemFactory;
import com.linkedin.venice.samza.VeniceSystemProducer;
import com.linkedin.venice.utils.Utils;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.samza.config.MapConfig;
import org.apache.samza.metrics.MetricsRegistry;

/* loaded from: input_file:com/linkedin/venice/pulsar/sink/VenicePulsarSink.class */
public class VenicePulsarSink implements Sink<GenericObject> {
    private static final Logger LOGGER = LogManager.getLogger(VenicePulsarSink.class);
    VenicePulsarSinkConfig config;
    VeniceSystemProducer producer;
    private final AtomicInteger pendingRecordsCount = new AtomicInteger(0);
    private final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor(runnable -> {
        return new Thread(runnable, "pulsar-venice-sink-flush-thread");
    });
    private int maxNumberUnflushedRecords = 10;
    private long flushIntervalMs = 500;
    private volatile long lastFlush = 0;
    private volatile Throwable flushException = null;
    private volatile boolean doThrottle = false;

    public void open(Map<String, Object> map, SinkContext sinkContext) throws Exception {
        VenicePulsarSinkConfig load = VenicePulsarSinkConfig.load(map, sinkContext);
        LOGGER.info("Starting, Venice config {}", load);
        VeniceSystemProducer closableProducer = new VeniceSystemFactory().getClosableProducer("venice", new MapConfig(getConfig(load, "venice")), (MetricsRegistry) null);
        closableProducer.start();
        LOGGER.info("Kafka bootstrap for Venice producer {}", closableProducer.getKafkaBootstrapServers());
        LOGGER.info("Kafka topic name is {}", closableProducer.getTopicName());
        open(load, closableProducer, sinkContext);
    }

    public void open(VenicePulsarSinkConfig venicePulsarSinkConfig, VeniceSystemProducer veniceSystemProducer, SinkContext sinkContext) throws Exception {
        this.config = venicePulsarSinkConfig;
        this.producer = veniceSystemProducer;
        this.maxNumberUnflushedRecords = this.config.getMaxNumberUnflushedRecords();
        this.flushIntervalMs = this.config.getFlushIntervalMs();
        this.scheduledExecutor.scheduleAtFixedRate(() -> {
            flush(false);
        }, this.flushIntervalMs, this.flushIntervalMs, TimeUnit.MILLISECONDS);
    }

    public void write(Record<GenericObject> record) throws Exception {
        Object key;
        Object extract;
        if (this.flushException != null) {
            LOGGER.error("Error while flushing records, stopping processing", this.flushException);
            throw new RuntimeException("Error while flushing records", this.flushException);
        }
        if (this.doThrottle) {
            LOGGER.warn("Throttling, not accepting new records; {} records pending", Integer.valueOf(this.pendingRecordsCount.get()));
            long currentTimeMillis = System.currentTimeMillis();
            throttle();
            this.doThrottle = false;
            LOGGER.warn("Throttling is done, took {} ms", Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
        }
        Object nativeObject = ((GenericObject) record.getValue()).getNativeObject();
        if (nativeObject instanceof KeyValue) {
            KeyValue keyValue = (KeyValue) nativeObject;
            key = extract(keyValue.getKey());
            extract = extract(keyValue.getValue());
        } else {
            key = record.getKey();
            extract = extract(record.getValue());
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Processing key: {}, value: {}", key, extract);
        }
        if (extract == null) {
            Object obj = key;
            this.producer.delete(key).whenComplete((r8, th) -> {
                this.pendingRecordsCount.decrementAndGet();
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("Deleted key: {}", obj);
                }
                if (th == null) {
                    record.ack();
                    return;
                }
                LOGGER.error("Error deleting record with key {}", obj, th);
                this.flushException = th;
                record.fail();
            });
        } else {
            Object obj2 = key;
            Object obj3 = extract;
            this.producer.put(key, extract).whenComplete((r9, th2) -> {
                this.pendingRecordsCount.decrementAndGet();
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("Processed key: {}, value: {}", obj2, obj3);
                }
                if (th2 == null) {
                    record.ack();
                    return;
                }
                LOGGER.error("Error handling record with key {}", obj2, th2);
                this.flushException = th2;
                record.fail();
            });
        }
        this.pendingRecordsCount.incrementAndGet();
        maybeSubmitFlush();
    }

    protected void throttle() throws InterruptedException {
        while (this.pendingRecordsCount.get() > this.maxNumberUnflushedRecords) {
            Thread.sleep(1L);
        }
    }

    private void maybeSubmitFlush() {
        int i = this.pendingRecordsCount.get();
        if (i >= this.maxNumberUnflushedRecords) {
            this.scheduledExecutor.submit(() -> {
                flush(false);
            });
            if (i > 2 * this.maxNumberUnflushedRecords) {
                LOGGER.info("Too many records pending: {}. Will throttle.", Integer.valueOf(i));
                this.doThrottle = true;
            }
        }
    }

    private void flush(boolean z) {
        long currentTimeMillis = System.currentTimeMillis();
        int i = this.pendingRecordsCount.get();
        if (!z && i < this.maxNumberUnflushedRecords && currentTimeMillis - this.lastFlush <= this.flushIntervalMs) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Skipping flush of {} records", Integer.valueOf(i));
                return;
            }
            return;
        }
        this.lastFlush = System.currentTimeMillis();
        if (i == 0) {
            LOGGER.debug("Nothing to flush");
            return;
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Flushing {} records", Integer.valueOf(i));
        }
        try {
            this.producer.flush("not used");
            LOGGER.info("Flush of {} records took {} ms", Integer.valueOf(i), Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
        } catch (Throwable th) {
            LOGGER.error("Error flushing", th);
            this.flushException = th;
            LOGGER.error("Error while flushing records", th);
            throw new RuntimeException("Error while flushing records", this.flushException);
        }
    }

    private static Object extract(Object obj) {
        return obj instanceof GenericRecord ? ((GenericRecord) obj).getNativeObject() : obj;
    }

    public void close() throws Exception {
        if (this.producer != null) {
            Future<?> submit = this.scheduledExecutor.submit(() -> {
                flush(true);
            });
            this.scheduledExecutor.shutdown();
            try {
                submit.get();
            } finally {
                if (!this.scheduledExecutor.awaitTermination(1L, TimeUnit.MINUTES)) {
                    LOGGER.error("Failed to shutdown scheduledExecutor");
                    this.scheduledExecutor.shutdownNow();
                }
                this.producer.close();
            }
        }
    }

    public static Map<String, String> getConfig(VenicePulsarSinkConfig venicePulsarSinkConfig, String str) {
        HashMap hashMap = new HashMap();
        String str2 = "systems." + str + ".";
        hashMap.put(str2 + "push.type", Version.PushType.INCREMENTAL.toString());
        hashMap.put(str2 + "store", venicePulsarSinkConfig.getStoreName());
        hashMap.put(str2 + "aggregate", "false");
        hashMap.put("venice.discover.urls", venicePulsarSinkConfig.getVeniceDiscoveryUrl());
        hashMap.put("venice.controller.discovery.url", venicePulsarSinkConfig.getVeniceDiscoveryUrl());
        hashMap.put("venice.router.url", venicePulsarSinkConfig.getVeniceRouterUrl());
        hashMap.put("deployment.id", Utils.getUniqueString("venice-push-id-pulsar-sink"));
        hashMap.put("ssl.enabled", "false");
        if (venicePulsarSinkConfig.getKafkaSaslConfig() != null && !venicePulsarSinkConfig.getKafkaSaslConfig().isEmpty()) {
            hashMap.put("kafka.sasl.jaas.config", venicePulsarSinkConfig.getKafkaSaslConfig());
        }
        if (venicePulsarSinkConfig.getVeniceToken() != null && !venicePulsarSinkConfig.getVeniceToken().isEmpty()) {
            hashMap.put("authentication.token", venicePulsarSinkConfig.getVeniceToken());
        }
        hashMap.put("kafka.sasl.mechanism", venicePulsarSinkConfig.getKafkaSaslMechanism());
        hashMap.put("kafka.security.protocol", venicePulsarSinkConfig.getKafkaSecurityProtocol());
        if (venicePulsarSinkConfig.getWriterConfig() != null && !venicePulsarSinkConfig.getWriterConfig().isEmpty()) {
            LOGGER.info("Additional WriterConfig: {}", venicePulsarSinkConfig.getWriterConfig());
            hashMap.putAll(venicePulsarSinkConfig.getWriterConfig());
        }
        LOGGER.info("CONFIG: {}", hashMap);
        return hashMap;
    }
}
