package com.linkedin.davinci.kafka.consumer;

import com.linkedin.venice.service.AbstractVeniceService;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/davinci/kafka/consumer/RemoteIngestionRepairService.class */
public class RemoteIngestionRepairService extends AbstractVeniceService {
    private final Thread repairThread = new IngestionRepairServiceThread();
    private final Map<StoreIngestionTask, BlockingQueue<Runnable>> ingestionRepairTasks = new VeniceConcurrentHashMap();
    public static final int DEFAULT_REPAIR_THREAD_SLEEP_INTERVAL_SECONDS = 1800;
    private final int repairThreadSleepInterval;
    private static final Logger LOGGER = LogManager.getLogger(RemoteIngestionRepairService.class);

    /* loaded from: input_file:com/linkedin/davinci/kafka/consumer/RemoteIngestionRepairService$IngestionRepairServiceThread.class */
    private class IngestionRepairServiceThread extends Thread {
        IngestionRepairServiceThread() {
            super("Ingestion-Repair-Service-Thread");
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (!Thread.interrupted()) {
                RemoteIngestionRepairService.this.pollRepairTasks();
                try {
                    TimeUnit.SECONDS.sleep(RemoteIngestionRepairService.this.repairThreadSleepInterval);
                } catch (InterruptedException e) {
                }
            }
            RemoteIngestionRepairService.LOGGER.info("RemoteIngestionRepairService thread interrupted!  Shutting down...");
        }
    }

    public RemoteIngestionRepairService(int i) {
        this.repairThreadSleepInterval = i;
    }

    public boolean startInner() throws Exception {
        this.repairThread.start();
        return true;
    }

    public void stopInner() throws Exception {
        this.repairThread.interrupt();
    }

    public void registerRepairTask(StoreIngestionTask storeIngestionTask, Runnable runnable) {
        if (!this.repairThread.isAlive()) {
            throw new IllegalStateException("RemoteIngestionRepairService is no longer running!  Rejecting repair task!!");
        }
        this.ingestionRepairTasks.computeIfAbsent(storeIngestionTask, storeIngestionTask2 -> {
            return new LinkedBlockingDeque();
        }).offer(runnable);
    }

    public void unregisterRepairTasksForStoreIngestionTask(StoreIngestionTask storeIngestionTask) {
        this.ingestionRepairTasks.remove(storeIngestionTask).clear();
    }

    Map<StoreIngestionTask, BlockingQueue<Runnable>> getIngestionRepairTasks() {
        return this.ingestionRepairTasks;
    }

    void pollRepairTasks() {
        for (Map.Entry<StoreIngestionTask, BlockingQueue<Runnable>> entry : this.ingestionRepairTasks.entrySet()) {
            BlockingQueue<Runnable> value = entry.getValue();
            if (!value.isEmpty()) {
                Runnable poll = value.poll();
                try {
                    poll.run();
                } catch (Exception e) {
                    LOGGER.error("Failed to repair partition for ingestion task for store: {}", entry.getKey());
                    value.add(poll);
                }
            }
        }
    }
}
