package com.linkedin.venice.pushmonitor;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkedin.venice.client.store.transport.D2TransportClient;
import com.linkedin.venice.client.store.transport.TransportClientResponse;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.routerapi.PushStatusResponse;
import com.linkedin.venice.samza.SamzaExitMode;
import com.linkedin.venice.samza.VeniceSystemFactory;
import com.linkedin.venice.utils.DaemonThreadFactory;
import com.linkedin.venice.utils.ObjectMapperFactory;
import com.linkedin.venice.utils.Utils;
import java.io.Closeable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.samza.system.SystemProducer;

/* loaded from: input_file:com/linkedin/venice/pushmonitor/RouterBasedPushMonitor.class */
/**
 * Monitors the push status of a single store-version topic by periodically querying a router
 * through a {@link D2TransportClient}.
 *
 * <p>The polling itself is done by a {@link PushMonitorTask} that runs on a dedicated
 * single-thread executor owned by this object. Call {@link #start()} to begin polling and
 * {@link #close()} to request that polling stop and to release the executor.
 *
 * <p>Thread-safety: {@link #getCurrentStatus()} and
 * {@link #setStreamReprocessingExitMode(SamzaExitMode)} may be called from any thread; the
 * fields they touch are {@code volatile} because they are shared with the polling thread.
 */
public class RouterBasedPushMonitor implements Closeable {
    private static final Logger LOGGER = LogManager.getLogger(RouterBasedPushMonitor.class);

    /** Delay, in milliseconds, between two consecutive polling cycles. */
    private static final int POLL_CYCLE_DELAY_MS = 10000;

    /** Maximum time, in milliseconds, to wait for the router to answer one push-status request. */
    private static final long POLL_TIMEOUT_MS = 10000;

    /** Pause, in milliseconds, taken before the exit mode is invoked once no producer is active. */
    private static final long PRE_EXIT_PAUSE_MS = 30000L;

    private final String topicName;
    private final PushMonitorTask pushMonitorTask;

    /** Last status reported by the router; written by the polling thread, read by callers. */
    private volatile ExecutionStatus currentStatus = ExecutionStatus.UNKNOWN;

    private final ExecutorService executor = Executors.newSingleThreadExecutor(new DaemonThreadFactory("RouterBasedPushMonitor"));

    /**
     * Polling loop that queries the router for the push status of one topic and reacts to
     * terminal states by notifying the {@link VeniceSystemFactory}.
     */
    public static class PushMonitorTask implements Runnable, Closeable {
        private static final ObjectMapper MAPPER = ObjectMapperFactory.getInstance();
        private final String topicName;
        private final D2TransportClient transportClient;
        private final String requestPath;
        private final RouterBasedPushMonitor pushMonitorService;
        private final VeniceSystemFactory factory;
        private final SystemProducer producer;

        /** Set from the owner's thread, read from the polling thread, hence volatile. */
        private volatile SamzaExitMode exitMode = SamzaExitMode.SUCCESS_EXIT;

        private final AtomicBoolean isRunning = new AtomicBoolean(true);

        /**
         * @param d2TransportClient client used to send the push-status request to the router
         * @param str name of the store-version topic being monitored
         * @param routerBasedPushMonitor owner whose current status is updated after each successful poll
         * @param veniceSystemFactory factory notified when the stream reprocessing job ends
         * @param systemProducer producer handed back to the factory when the job ends
         */
        public PushMonitorTask(D2TransportClient d2TransportClient, String str, RouterBasedPushMonitor routerBasedPushMonitor, VeniceSystemFactory veniceSystemFactory, SystemProducer systemProducer) {
            this.transportClient = d2TransportClient;
            this.topicName = str;
            this.requestPath = buildPushStatusRequestPath(str);
            this.pushMonitorService = routerBasedPushMonitor;
            this.factory = veniceSystemFactory;
            this.producer = systemProducer;
        }

        /**
         * Polls the router until a terminal state is handled, {@link #close()} is called, or the
         * thread is interrupted. Every non-terminal cycle (including router errors and polling
         * failures) is followed by a {@code POLL_CYCLE_DELAY_MS} pause so that a failing router
         * is not hammered in a tight loop.
         */
        @Override // java.lang.Runnable
        public void run() {
            LOGGER.info("Running {}", getClass().getSimpleName());
            while (this.isRunning.get()) {
                try {
                    if (pollOnce()) {
                        return;
                    }
                } catch (InterruptedException e) {
                    // Restore the flag so the owner of this thread can observe the interruption, and stop polling.
                    Thread.currentThread().interrupt();
                    LOGGER.warn("Interrupted while polling push status from router for store version: {}", this.topicName);
                    return;
                } catch (Exception e) {
                    // NOTE(review): this also catches the VeniceException raised by handleSuccessfulPush(),
                    // so a failed overall status is logged and polling resumes - confirm this is intended.
                    LOGGER.error("Error when polling push status from router for store version: {}", this.topicName, e);
                }
                Utils.sleep(POLL_CYCLE_DELAY_MS);
            }
        }

        /**
         * Performs one poll of the router and reacts to the reported status.
         *
         * @return {@code true} when polling must stop, {@code false} to poll again after the cycle delay
         * @throws Exception if the request times out or fails, or the response cannot be parsed
         */
        private boolean pollOnce() throws Exception {
            TransportClientResponse response = this.transportClient.get(this.requestPath).get(POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
            PushStatusResponse pushStatusResponse = MAPPER.readValue(response.getBody(), PushStatusResponse.class);
            if (pushStatusResponse.isError()) {
                LOGGER.error("Router was not able to get push status: {}", pushStatusResponse.getError());
                return false;
            }

            ExecutionStatus status = pushStatusResponse.getExecutionStatus();
            this.pushMonitorService.setCurrentStatus(status);
            switch (status) {
                case END_OF_PUSH_RECEIVED:
                case COMPLETED:
                    return handleSuccessfulPush();
                case ERROR:
                    LOGGER.info("Stream reprocessing job failed for store version: {}", this.topicName);
                    this.factory.endStreamReprocessingSystemProducer(this.producer, false);
                    return true;
                default:
                    LOGGER.info("Current stream reprocessing job state: {} for store version: {}", status, this.topicName);
                    return false;
            }
        }

        /**
         * Reports a successful end of stream reprocessing to the factory and, when no producer is
         * active any more, pauses and then invokes the configured exit mode.
         *
         * @return {@code true} when polling must stop; {@code false} when a producer became active
         *         again during the pause and polling should continue
         * @throws VeniceException if no producer is active and the factory reports that the
         *         overall execution status is not successful
         */
        private boolean handleSuccessfulPush() {
            LOGGER.info("Samza stream reprocessing has finished successfully for store version: {}", this.topicName);
            this.factory.endStreamReprocessingSystemProducer(this.producer, true);
            if (this.factory.getNumberOfActiveSystemProducers() != 0) {
                // Other producers are still active; this monitor is done.
                return true;
            }

            LOGGER.info("Pause 30 seconds before exiting the Samza process.");
            Utils.sleep(PRE_EXIT_PAUSE_MS);
            if (this.factory.getNumberOfActiveSystemProducers() != 0) {
                // A producer showed up during the pause; keep polling instead of exiting.
                return false;
            }
            if (!this.factory.getOverallExecutionStatus()) {
                throw new VeniceException("Not all stream reprocessing jobs succeeded.");
            }
            LOGGER.info("Exiting Samza process after all stream reprocessing jobs succeeded.");
            this.exitMode.exit();
            return true;
        }

        /** Requests that the polling loop stop; takes effect the next time the loop condition is checked. */
        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() {
            this.isRunning.set(false);
        }

        /** Builds the router request path for the push status of the given topic. */
        private static String buildPushStatusRequestPath(String str) {
            return "push_status/" + str;
        }
    }

    /**
     * @param d2TransportClient client used to query the router
     * @param str name of the store-version topic to monitor
     * @param veniceSystemFactory factory notified when the stream reprocessing job ends
     * @param systemProducer producer associated with this monitor
     */
    public RouterBasedPushMonitor(D2TransportClient d2TransportClient, String str, VeniceSystemFactory veniceSystemFactory, SystemProducer systemProducer) {
        this.topicName = str;
        this.pushMonitorTask = new PushMonitorTask(d2TransportClient, this.topicName, this, veniceSystemFactory, systemProducer);
    }

    /**
     * Submits the polling task to the executor. Calling this after {@link #close()} is a no-op
     * apart from a warning, because the executor no longer accepts tasks.
     */
    public void start() {
        if (this.executor.isShutdown()) {
            LOGGER.warn("Push monitor for store version: {} is already closed; not starting.", this.topicName);
            return;
        }
        this.executor.submit(this.pushMonitorTask);
    }

    /**
     * Asks the polling task to stop and shuts the executor down so its worker thread can
     * terminate. The shutdown is non-interrupting: a poll or pause already in progress is
     * allowed to finish.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.pushMonitorTask.close();
        this.executor.shutdown();
    }

    /** Records the latest status reported by the router. */
    public void setCurrentStatus(ExecutionStatus executionStatus) {
        this.currentStatus = executionStatus;
    }

    /** @return the most recently recorded status; {@code ExecutionStatus.UNKNOWN} until one is recorded */
    public ExecutionStatus getCurrentStatus() {
        return this.currentStatus;
    }

    /** Sets the exit mode the polling task invokes once all stream reprocessing jobs have succeeded. */
    public void setStreamReprocessingExitMode(SamzaExitMode samzaExitMode) {
        this.pushMonitorTask.exitMode = samzaExitMode;
    }
}
