package com.linkedin.davinci.ingestion.main;

import com.linkedin.davinci.config.VeniceConfigLoader;
import com.linkedin.davinci.ingestion.IsolatedIngestionBackend;
import com.linkedin.davinci.ingestion.utils.IsolatedIngestionUtils;
import com.linkedin.davinci.kafka.consumer.KafkaStoreIngestionService;
import com.linkedin.davinci.notifier.VeniceNotifier;
import com.linkedin.davinci.stats.IsolatedIngestionProcessHeartbeatStats;
import com.linkedin.davinci.stats.IsolatedIngestionProcessStats;
import com.linkedin.venice.ConfigKeys;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
import com.linkedin.venice.service.AbstractVeniceService;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import com.linkedin.venice.utils.locks.AutoCloseableLock;
import com.linkedin.venice.utils.locks.AutoCloseableSingleLock;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.tehuti.metrics.MetricsRepository;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/davinci/ingestion/main/MainIngestionMonitorService.class */
/**
 * Service running in the main process that monitors the forked (isolated) ingestion process.
 *
 * <p>It binds a Netty server on the configured ingestion application port to receive reports, keeps
 * per-topic / per-partition ingestion and leader status, and (when a metrics repository has been
 * provided) periodically sends heartbeat requests to the forked process. When no heartbeat has
 * succeeded for longer than the configured connection timeout, the forked process is destroyed,
 * re-created, and the ingestion tasks recorded as {@link MainPartitionIngestionStatus#ISOLATED} are
 * resumed in the new process.
 */
public class MainIngestionMonitorService extends AbstractVeniceService {
    private static final Logger LOGGER = LogManager.getLogger(MainIngestionMonitorService.class);

    /** Sentinel stored in {@link #latestHeartbeatTimestamp} until the first heartbeat succeeds. */
    private static final long NO_HEARTBEAT_TIMESTAMP = -1L;
    /** Fallback used when the connection timeout config key is absent. */
    private static final long DEFAULT_CONNECTION_TIMEOUT_SECONDS = 180L;
    /** Period of the scheduled heartbeat check. */
    private static final long HEARTBEAT_CHECK_INTERVAL_SECONDS = 10L;
    /** How long {@link #shutdownScheduler} waits for an orderly executor termination. */
    private static final long EXECUTOR_SHUTDOWN_WAIT_SECONDS = 5L;

    private final IsolatedIngestionBackend ingestionBackend;
    private final MainIngestionRequestClient heartbeatClient;
    private IsolatedIngestionProcessHeartbeatStats heartbeatStats;
    private ChannelFuture serverFuture;
    private MetricsRepository metricsRepository;
    private IsolatedIngestionProcessStats isolatedIngestionProcessStats;
    private MainIngestionStorageMetadataService storageMetadataService;
    private KafkaStoreIngestionService storeIngestionService;
    private ReadOnlyStoreRepository storeRepository;
    private final VeniceConfigLoader configLoader;
    private long connectionTimeoutMs;
    private final ScheduledExecutorService heartbeatCheckScheduler = Executors.newScheduledThreadPool(1);
    private final ExecutorService longRunningTaskExecutor = Executors.newSingleThreadExecutor();
    private final Map<String, MainTopicIngestionStatus> topicIngestionStatusMap = new VeniceConcurrentHashMap<>();
    private final Map<String, Map<Integer, Boolean>> topicPartitionLeaderStatusMap = new VeniceConcurrentHashMap<>();
    private final List<VeniceNotifier> ingestionNotifierList = new ArrayList<>();
    private final List<VeniceNotifier> pushStatusNotifierList = new ArrayList<>();
    private volatile long latestHeartbeatTimestamp = NO_HEARTBEAT_TIMESTAMP;
    private final ReentrantReadWriteLock forkProcessActionLock = new ReentrantReadWriteLock();
    private final EventLoopGroup bossGroup = new NioEventLoopGroup();
    private final EventLoopGroup workerGroup = new NioEventLoopGroup();
    private final ServerBootstrap bootstrap = new ServerBootstrap();

    /**
     * Configures (but does not bind) the Netty report server and creates the heartbeat client.
     *
     * @param isolatedIngestionBackend backend that owns the handle of the forked ingestion process
     * @param veniceConfigLoader configuration source for ports, SSL and timeouts
     */
    public MainIngestionMonitorService(IsolatedIngestionBackend isolatedIngestionBackend, VeniceConfigLoader veniceConfigLoader) {
        this.configLoader = veniceConfigLoader;
        this.ingestionBackend = isolatedIngestionBackend;
        this.bootstrap.group(this.bossGroup, this.workerGroup)
            .channel(NioServerSocketChannel.class)
            .childHandler(new MainIngestionReportChannelInitializer(this, IsolatedIngestionUtils.getSSLFactory(veniceConfigLoader)))
            .option(ChannelOption.SO_BACKLOG, 1000)
            .childOption(ChannelOption.SO_KEEPALIVE, true)
            .option(ChannelOption.SO_REUSEADDR, true)
            .childOption(ChannelOption.TCP_NODELAY, true);
        this.heartbeatClient = new MainIngestionRequestClient(veniceConfigLoader);
    }

    /**
     * Binds the report server, reads the connection timeout and starts metrics/heartbeat collection.
     *
     * @return always {@code true}
     * @throws Exception if binding the server port fails or the bind is interrupted
     */
    @Override
    public boolean startInner() throws Exception {
        int port = this.configLoader.getVeniceServerConfig().getIngestionApplicationPort();
        this.serverFuture = this.bootstrap.bind(port).sync();
        LOGGER.info("Report listener service started on port: {}", port);
        long timeoutSeconds = this.configLoader.getCombinedProperties()
            .getLong(ConfigKeys.SERVER_INGESTION_ISOLATION_CONNECTION_TIMEOUT_SECONDS, DEFAULT_CONNECTION_TIMEOUT_SECONDS);
        this.connectionTimeoutMs = timeoutSeconds * 1000;
        setupMetricsCollection();
        return true;
    }

    /**
     * Stops the executors, closes the heartbeat client and shuts down the Netty server.
     *
     * @throws Exception if waiting for the server channel to close is interrupted or fails
     */
    @Override
    public void stopInner() throws Exception {
        shutdownScheduler(this.heartbeatCheckScheduler, "Heartbeat check");
        shutdownScheduler(this.longRunningTaskExecutor, "Long running task");
        this.heartbeatClient.close();
        // serverFuture is only assigned after a successful bind; guard so that stopping a service
        // that never started still releases the event loop groups instead of failing with an NPE.
        ChannelFuture closeFuture = this.serverFuture == null ? null : this.serverFuture.channel().closeFuture();
        this.workerGroup.shutdownGracefully();
        this.bossGroup.shutdownGracefully();
        if (closeFuture != null) {
            closeFuture.sync();
        }
    }

    /**
     * Registers an ingestion notifier; {@code null} is ignored.
     *
     * @param veniceNotifier notifier to add
     */
    public void addIngestionNotifier(VeniceNotifier veniceNotifier) {
        if (veniceNotifier != null) {
            this.ingestionNotifierList.add(veniceNotifier);
        }
    }

    /** @return the live (not copied) list of registered ingestion notifiers */
    public List<VeniceNotifier> getIngestionNotifier() {
        return this.ingestionNotifierList;
    }

    /**
     * Registers a push status notifier; {@code null} is ignored.
     *
     * @param veniceNotifier notifier to add
     */
    public void addPushStatusNotifier(VeniceNotifier veniceNotifier) {
        if (veniceNotifier != null) {
            this.pushStatusNotifierList.add(veniceNotifier);
        }
    }

    /** @return the live (not copied) list of registered push status notifiers */
    public List<VeniceNotifier> getPushStatusNotifierList() {
        return this.pushStatusNotifierList;
    }

    /**
     * Sets the metrics repository. Heartbeat checking is only scheduled by {@link #startInner()} when
     * a repository is present at that time.
     *
     * @param metricsRepository repository used to create the heartbeat and process stats
     */
    public void setMetricsRepository(MetricsRepository metricsRepository) {
        this.metricsRepository = metricsRepository;
    }

    /** @return the metrics repository, or {@code null} if none has been set */
    public MetricsRepository getMetricsRepository() {
        return this.metricsRepository;
    }

    /** @param mainIngestionStorageMetadataService storage metadata service to expose via the getter */
    public void setStorageMetadataService(MainIngestionStorageMetadataService mainIngestionStorageMetadataService) {
        this.storageMetadataService = mainIngestionStorageMetadataService;
    }

    /** @return the storage metadata service, or {@code null} if none has been set */
    public MainIngestionStorageMetadataService getStorageMetadataService() {
        return this.storageMetadataService;
    }

    /** @return the config loader supplied at construction */
    public VeniceConfigLoader getConfigLoader() {
        return this.configLoader;
    }

    /** @param kafkaStoreIngestionService store ingestion service to expose via the getter */
    public void setStoreIngestionService(KafkaStoreIngestionService kafkaStoreIngestionService) {
        this.storeIngestionService = kafkaStoreIngestionService;
    }

    /** @return the store ingestion service, or {@code null} if none has been set */
    public KafkaStoreIngestionService getStoreIngestionService() {
        return this.storeIngestionService;
    }

    /** @param readOnlyStoreRepository store repository to expose via the getter */
    public void setStoreRepository(ReadOnlyStoreRepository readOnlyStoreRepository) {
        this.storeRepository = readOnlyStoreRepository;
    }

    /** @return the store repository, or {@code null} if none has been set */
    public ReadOnlyStoreRepository getStoreRepository() {
        return this.storeRepository;
    }

    /**
     * @param topicName topic to look up
     * @param partition partition to look up
     * @return {@code true} only if the partition has been recorded as leader; unknown topics and
     *     partitions yield {@code false}
     */
    public boolean isTopicPartitionInLeaderState(String topicName, int partition) {
        Map<Integer, Boolean> leaderByPartition =
            getTopicPartitionLeaderStatusMap().getOrDefault(topicName, Collections.emptyMap());
        return leaderByPartition.getOrDefault(partition, false);
    }

    /**
     * Records the partition as being in leader state, creating the topic entry if needed.
     *
     * @param topicName topic of the partition
     * @param partition partition id
     */
    public void setTopicPartitionToLeaderState(String topicName, int partition) {
        getTopicPartitionLeaderStatusMap()
            .computeIfAbsent(topicName, key -> new VeniceConcurrentHashMap<>())
            .put(partition, true);
    }

    /**
     * Records the partition as being in follower state, creating the topic entry if needed.
     *
     * @param topicName topic of the partition
     * @param partition partition id
     */
    public void setTopicIngestionToFollowerState(String topicName, int partition) {
        getTopicPartitionLeaderStatusMap()
            .computeIfAbsent(topicName, key -> new VeniceConcurrentHashMap<>())
            .put(partition, false);
    }

    /**
     * @param topicName topic to look up
     * @param partition partition to look up
     * @return the status tracked for the partition, or {@link MainPartitionIngestionStatus#NOT_EXIST}
     *     when the topic itself is not tracked
     */
    public MainPartitionIngestionStatus getTopicPartitionIngestionStatus(String topicName, int partition) {
        MainTopicIngestionStatus topicStatus = getTopicIngestionStatusMap().get(topicName);
        if (topicStatus == null) {
            return MainPartitionIngestionStatus.NOT_EXIST;
        }
        return topicStatus.getPartitionIngestionStatus(partition);
    }

    /**
     * Marks the partition as ingested locally, creating the topic entry if needed.
     *
     * @param topicName topic of the partition
     * @param partition partition id
     */
    public void setVersionPartitionToLocalIngestion(String topicName, int partition) {
        getTopicIngestionStatusMap()
            .computeIfAbsent(topicName, MainTopicIngestionStatus::new)
            .setPartitionIngestionStatusToLocalIngestion(partition);
    }

    /**
     * Marks the partition as ingested in the isolated process, creating the topic entry if needed.
     *
     * @param topicName topic of the partition
     * @param partition partition id
     */
    public void setVersionPartitionToIsolatedIngestion(String topicName, int partition) {
        getTopicIngestionStatusMap()
            .computeIfAbsent(topicName, MainTopicIngestionStatus::new)
            .setPartitionIngestionStatusToIsolatedIngestion(partition);
    }

    /**
     * Removes the ingestion status and the leader flag of a single partition. The topic-level entries
     * are kept even if they become empty.
     *
     * @param topicName topic of the partition
     * @param partition partition id
     */
    public void cleanupTopicPartitionState(String topicName, int partition) {
        MainTopicIngestionStatus topicStatus = getTopicIngestionStatusMap().get(topicName);
        if (topicStatus != null) {
            topicStatus.removePartitionIngestionStatus(partition);
        }
        Map<Integer, Boolean> leaderByPartition = getTopicPartitionLeaderStatusMap().get(topicName);
        if (leaderByPartition != null) {
            leaderByPartition.remove(partition);
        }
        LOGGER.info("Ingestion status removed from main process for topic: {}, partition: {}", topicName, partition);
    }

    /**
     * Removes every piece of state tracked for the topic.
     *
     * @param topicName topic to forget
     */
    public void cleanupTopicState(String topicName) {
        getTopicIngestionStatusMap().remove(topicName);
        getTopicPartitionLeaderStatusMap().remove(topicName);
        LOGGER.info("Ingestion status removed from main process for topic: {}", topicName);
    }

    /**
     * @param topicName topic to look up
     * @return the ingesting partition count reported by the topic's status entry, or {@code 0} when
     *     the topic is not tracked
     */
    public long getTopicPartitionCount(String topicName) {
        MainTopicIngestionStatus topicStatus = getTopicIngestionStatusMap().get(topicName);
        return topicStatus == null ? 0L : topicStatus.getIngestingPartitionCount();
    }

    /**
     * Creates the stats objects and schedules the periodic heartbeat check. Does nothing (beyond a
     * warning) when no metrics repository has been set, which also means no heartbeat check runs.
     */
    private void setupMetricsCollection() {
        if (this.metricsRepository == null) {
            LOGGER.warn("No metrics repository is set up in ingestion report listener, skipping metrics collection");
            return;
        }
        this.heartbeatStats = new IsolatedIngestionProcessHeartbeatStats(this.metricsRepository);
        this.isolatedIngestionProcessStats = new IsolatedIngestionProcessStats(this.metricsRepository);
        this.heartbeatCheckScheduler
            .scheduleAtFixedRate(this::checkHeartbeatTimeout, 0L, HEARTBEAT_CHECK_INTERVAL_SECONDS, TimeUnit.SECONDS);
    }

    /**
     * Destroys and re-creates the forked ingestion process if the heartbeat is still timed out, then
     * asynchronously resumes the ingestion tasks that were running in it.
     */
    private synchronized void tryRestartForkedProcess() {
        // Re-check under the monitor: the timestamp may have been refreshed since the caller's check.
        if (System.currentTimeMillis() - this.latestHeartbeatTimestamp <= this.connectionTimeoutMs) {
            return;
        }
        LOGGER.warn(
            "Lost connection to forked ingestion process since timestamp {}, restarting forked process.",
            this.latestHeartbeatTimestamp);
        this.heartbeatStats.recordForkedProcessRestart();
        // The client is only needed to start the new process; try-with-resources closes it exactly
        // once regardless of whether the restart or any later step fails.
        try (MainIngestionRequestClient client = new MainIngestionRequestClient(this.configLoader)) {
            // The previous process has to be destroyed before a replacement is started.
            IsolatedIngestionUtils.destroyIsolatedIngestionProcess(this.ingestionBackend.getIsolatedIngestionServiceProcess());
            this.ingestionBackend.setIsolatedIngestionServiceProcess(client.startForkedIngestionProcess(this.configLoader));
            LOGGER.info("Forked process has been recovered.");
        }
        // Restart the timeout window for the fresh process.
        this.latestHeartbeatTimestamp = System.currentTimeMillis();
        this.heartbeatStats.recordHeartbeatAge(0L);
        // Run the recovery on the dedicated executor so the periodic heartbeat check is not blocked.
        this.longRunningTaskExecutor.execute(this::resumeOngoingIngestionTasks);
    }

    /**
     * Re-opens storage engines and restarts consumption in the forked process for every partition
     * tracked as {@link MainPartitionIngestionStatus#ISOLATED}, re-sending leader promotion for
     * partitions recorded as leader. Runs while holding the fork-process-action write lock. A failure
     * on one partition is logged and does not stop the recovery of the others.
     *
     * @return number of partitions whose consumption was successfully restarted
     */
    int resumeOngoingIngestionTasks() {
        AtomicInteger resumedCount = new AtomicInteger();
        // Both the lock and the client are released on every exit path, including exceptions.
        try (AutoCloseableLock ignored = AutoCloseableSingleLock.of(getForkProcessActionLock().writeLock());
            MainIngestionRequestClient client = createClient()) {
            Map<String, MainTopicIngestionStatus> statusByTopic = getTopicIngestionStatusMap();
            LOGGER.info("Start to recover ongoing ingestion tasks: {}", statusByTopic.keySet());
            // Storage engines must be opened in the new process before consumption can start.
            statusByTopic.keySet().forEach(client::openStorageEngine);
            statusByTopic.forEach(
                (topicName, topicStatus) -> topicStatus.getPartitionIngestionStatusSet().forEach((partition, status) -> {
                    if (!status.equals(MainPartitionIngestionStatus.ISOLATED)) {
                        return;
                    }
                    try {
                        client.startConsumption(topicName, partition.intValue());
                        LOGGER.info("Recovered ingestion task in isolated process for topic: {}, partition: {}", topicName, partition);
                        resumedCount.incrementAndGet();
                        if (isTopicPartitionInLeaderState(topicName, partition.intValue())) {
                            client.promoteToLeader(topicName, partition.intValue());
                            LOGGER.info("Delivered leader promotion message for topic: {}, partition: {}", topicName, partition);
                        }
                    } catch (Exception e) {
                        LOGGER.warn("Recovery of ingestion failed for topic: {}, partition: {}", topicName, partition, e);
                    }
                }));
            LOGGER.info("Resumed {} topic partition ingestion tasks.", resumedCount.get());
        }
        return resumedCount.get();
    }

    /**
     * Periodic task: sends one heartbeat request, records the heartbeat age and triggers a restart of
     * the forked process once a previously established connection has been silent for longer than
     * the connection timeout.
     */
    private void checkHeartbeatTimeout() {
        try {
            long checkStartMs = System.currentTimeMillis();
            LOGGER.info(
                "Checking heartbeat timeout at {}, latest heartbeat received: {}",
                checkStartMs,
                this.latestHeartbeatTimestamp);
            if (this.heartbeatClient.sendHeartbeatRequest()) {
                this.latestHeartbeatTimestamp = System.currentTimeMillis();
                this.heartbeatStats.recordHeartbeatAge(0L);
                LOGGER.info("Received forked process heartbeat ack at: {}", this.latestHeartbeatTimestamp);
            } else {
                long failureMs = System.currentTimeMillis();
                // NOTE(review): before the first successful heartbeat the timestamp is the -1 sentinel,
                // so the recorded age is roughly the current epoch millis - confirm this is intended.
                this.heartbeatStats.recordHeartbeatAge(checkStartMs - this.latestHeartbeatTimestamp);
                LOGGER.warn(
                    "Heartbeat request to forked process issued at {}, failed at {}, latest successful timestamp: {}",
                    checkStartMs,
                    failureMs,
                    this.latestHeartbeatTimestamp);
            }
            boolean everConnected = this.latestHeartbeatTimestamp != NO_HEARTBEAT_TIMESTAMP;
            if (everConnected && checkStartMs - this.latestHeartbeatTimestamp > this.connectionTimeoutMs) {
                tryRestartForkedProcess();
            }
        } catch (Exception e) {
            // An exception escaping a scheduleAtFixedRate task suppresses all subsequent executions,
            // which would silently disable heartbeat monitoring; log and let the next check retry.
            LOGGER.error("Heartbeat check of forked ingestion process failed, will retry at next scheduled check.", e);
        }
    }

    /**
     * Shuts an executor down, waiting briefly for running tasks before cancelling them.
     *
     * @param executorService executor to stop
     * @param name human readable name used in the log message
     */
    private void shutdownScheduler(ExecutorService executorService, String name) {
        executorService.shutdown();
        try {
            if (!executorService.awaitTermination(EXECUTOR_SHUTDOWN_WAIT_SECONDS, TimeUnit.SECONDS)) {
                executorService.shutdownNow();
                LOGGER.info("{} scheduler has been shutdown.", name);
            }
        } catch (InterruptedException e) {
            // Do not leave tasks running when the wait was cut short; then restore the interrupt flag.
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    /** @return a new request client built from this service's config loader */
    MainIngestionRequestClient createClient() {
        return new MainIngestionRequestClient(this.configLoader);
    }

    /** @return the live map from topic name to its ingestion status */
    public Map<String, MainTopicIngestionStatus> getTopicIngestionStatusMap() {
        return this.topicIngestionStatusMap;
    }

    /** @return the live map from topic name to per-partition leader flags */
    public Map<String, Map<Integer, Boolean>> getTopicPartitionLeaderStatusMap() {
        return this.topicPartitionLeaderStatusMap;
    }

    /** @return the isolated ingestion process stats, or {@code null} if metrics collection was skipped */
    public IsolatedIngestionProcessStats getIsolatedIngestionProcessStats() {
        return this.isolatedIngestionProcessStats;
    }

    /** @return the read/write lock guarding actions against the forked process */
    public ReentrantReadWriteLock getForkProcessActionLock() {
        return this.forkProcessActionLock;
    }
}
