package com.linkedin.davinci.ingestion.isolated;

import com.linkedin.davinci.config.VeniceServerConfig;
import com.linkedin.davinci.config.VeniceStoreVersionConfig;
import com.linkedin.davinci.ingestion.utils.IsolatedIngestionUtils;
import com.linkedin.davinci.kafka.consumer.KafkaStoreIngestionService;
import com.linkedin.davinci.store.record.ValueRecord;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.ingestion.protocol.IngestionMetricsReport;
import com.linkedin.venice.ingestion.protocol.IngestionStorageMetadata;
import com.linkedin.venice.ingestion.protocol.IngestionTaskCommand;
import com.linkedin.venice.ingestion.protocol.IngestionTaskReport;
import com.linkedin.venice.ingestion.protocol.ProcessShutdownCommand;
import com.linkedin.venice.ingestion.protocol.enums.IngestionAction;
import com.linkedin.venice.ingestion.protocol.enums.IngestionCommandType;
import com.linkedin.venice.ingestion.protocol.enums.IngestionComponentType;
import com.linkedin.venice.meta.IngestionMetadataUpdateType;
import com.linkedin.venice.meta.SubscriptionBasedReadOnlyStoreRepository;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.offsets.OffsetRecord;
import com.linkedin.venice.utils.ExceptionUtils;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import java.util.HashMap;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/linkedin/davinci/ingestion/isolated/IsolatedIngestionServerHandler.class */
public class IsolatedIngestionServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
    private static final Logger LOGGER = LogManager.getLogger(IsolatedIngestionServerHandler.class);
    private final IsolatedIngestionServer isolatedIngestionServer;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.linkedin.davinci.ingestion.isolated.IsolatedIngestionServerHandler$1, reason: invalid class name */
    /* loaded from: input_file:com/linkedin/davinci/ingestion/isolated/IsolatedIngestionServerHandler$1.class */
    /**
     * Compiler-generated lookup tables, one per enum type. Each table is indexed by
     * {@code ordinal()} and holds a small positive case number for the constants listed
     * below; slots for any other constant keep the array default of 0.
     */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$com$linkedin$venice$ingestion$protocol$enums$IngestionAction;
        static final /* synthetic */ int[] $SwitchMap$com$linkedin$venice$ingestion$protocol$enums$IngestionCommandType;
        static final /* synthetic */ int[] $SwitchMap$com$linkedin$venice$meta$IngestionMetadataUpdateType;
        static final /* synthetic */ int[] $SwitchMap$com$linkedin$venice$ingestion$protocol$enums$IngestionComponentType;

        static {
            // Every constant is resolved in its own try block: a constant missing at link time
            // raises NoSuchFieldError, which is ignored on purpose so its slot simply stays 0.

            // --- IngestionComponentType ---
            int[] componentTypeCases = new int[IngestionComponentType.values().length];
            $SwitchMap$com$linkedin$venice$ingestion$protocol$enums$IngestionComponentType = componentTypeCases;
            try {
                componentTypeCases[IngestionComponentType.KAFKA_INGESTION_SERVICE.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
            }
            try {
                componentTypeCases[IngestionComponentType.STORAGE_SERVICE.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
            }

            // --- IngestionMetadataUpdateType ---
            int[] metadataUpdateCases = new int[IngestionMetadataUpdateType.values().length];
            $SwitchMap$com$linkedin$venice$meta$IngestionMetadataUpdateType = metadataUpdateCases;
            try {
                metadataUpdateCases[IngestionMetadataUpdateType.PUT_OFFSET_RECORD.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
            }
            try {
                metadataUpdateCases[IngestionMetadataUpdateType.CLEAR_OFFSET_RECORD.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
            }
            try {
                metadataUpdateCases[IngestionMetadataUpdateType.PUT_STORE_VERSION_STATE.ordinal()] = 3;
            } catch (NoSuchFieldError ignored) {
            }
            try {
                metadataUpdateCases[IngestionMetadataUpdateType.CLEAR_STORE_VERSION_STATE.ordinal()] = 4;
            } catch (NoSuchFieldError ignored) {
            }

            // --- IngestionCommandType ---
            int[] commandTypeCases = new int[IngestionCommandType.values().length];
            $SwitchMap$com$linkedin$venice$ingestion$protocol$enums$IngestionCommandType = commandTypeCases;
            try {
                commandTypeCases[IngestionCommandType.START_CONSUMPTION.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
            }
            try {
                commandTypeCases[IngestionCommandType.STOP_CONSUMPTION.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
            }
            try {
                commandTypeCases[IngestionCommandType.KILL_CONSUMPTION.ordinal()] = 3;
            } catch (NoSuchFieldError ignored) {
            }
            try {
                commandTypeCases[IngestionCommandType.SHUTDOWN_INGESTION_TASK.ordinal()] = 4;
            } catch (NoSuchFieldError ignored) {
            }
            try {
                commandTypeCases[IngestionCommandType.IS_PARTITION_CONSUMING.ordinal()] = 5;
            } catch (NoSuchFieldError ignored) {
            }
            try {
                commandTypeCases[IngestionCommandType.REMOVE_STORAGE_ENGINE.ordinal()] = 6;
            } catch (NoSuchFieldError ignored) {
            }
            try {
                commandTypeCases[IngestionCommandType.REMOVE_PARTITION.ordinal()] = 7;
            } catch (NoSuchFieldError ignored) {
            }
            try {
                commandTypeCases[IngestionCommandType.OPEN_STORAGE_ENGINE.ordinal()] = 8;
            } catch (NoSuchFieldError ignored) {
            }
            try {
                commandTypeCases[IngestionCommandType.PROMOTE_TO_LEADER.ordinal()] = 9;
            } catch (NoSuchFieldError ignored) {
            }
            try {
                commandTypeCases[IngestionCommandType.DEMOTE_TO_STANDBY.ordinal()] = 10;
            } catch (NoSuchFieldError ignored) {
            }
            try {
                commandTypeCases[IngestionCommandType.RESET_PARTITION.ordinal()] = 11;
            } catch (NoSuchFieldError ignored) {
            }

            // --- IngestionAction ---
            int[] actionCases = new int[IngestionAction.values().length];
            $SwitchMap$com$linkedin$venice$ingestion$protocol$enums$IngestionAction = actionCases;
            try {
                actionCases[IngestionAction.COMMAND.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
            }
            try {
                actionCases[IngestionAction.METRIC.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
            }
            try {
                actionCases[IngestionAction.HEARTBEAT.ordinal()] = 3;
            } catch (NoSuchFieldError ignored) {
            }
            try {
                actionCases[IngestionAction.UPDATE_METADATA.ordinal()] = 4;
            } catch (NoSuchFieldError ignored) {
            }
            try {
                actionCases[IngestionAction.SHUTDOWN_COMPONENT.ordinal()] = 5;
            } catch (NoSuchFieldError ignored) {
            }
        }
    }

    /**
     * Creates a handler bound to the given isolated ingestion server.
     *
     * @param isolatedIngestionServer server instance every request handled here is delegated to
     */
    public IsolatedIngestionServerHandler(IsolatedIngestionServer isolatedIngestionServer) {
        this.isolatedIngestionServer = isolatedIngestionServer;
        // The message is a constant, so no level guard is needed: nothing is emitted unless DEBUG is on.
        LOGGER.debug("IsolatedIngestionServerHandler created for listener service.");
    }

    /**
     * Flushes the channel context once a read has completed.
     *
     * @param channelHandlerContext context of the channel to flush
     */
    public void channelReadComplete(ChannelHandlerContext channelHandlerContext) {
        channelHandlerContext.flush();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Dispatches one HTTP request to the handler matching its {@link IngestionAction} and writes
     * the serialized result back with status 200.
     *
     * <p>The response body starts out as the dummy content and is replaced by the serialized
     * report for every action except {@code HEARTBEAT}, which only refreshes the server's
     * heartbeat time.
     *
     * <p>Only {@link UnsupportedOperationException} is handled here (answered with 400); any
     * other exception, including the {@link VeniceException} raised when the server is not yet
     * initiated, propagates to the caller.
     *
     * @param channelHandlerContext context used to write the response
     * @param fullHttpRequest the incoming request
     * @throws Exception any failure other than an unrecognized action
     */
    public void channelRead0(ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest) throws Exception {
        try {
            IngestionAction action = IsolatedIngestionUtils.getIngestionActionFromRequest(fullHttpRequest);
            byte[] responseBody = IsolatedIngestionUtils.getDummyContent();
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Received {} message: {}", action.name(), fullHttpRequest);
            }
            if (!this.isolatedIngestionServer.isInitiated()) {
                throw new VeniceException("Isolated ingestion server is not initialized yet!");
            }
            // Switch on the enum itself rather than on ordinal-indexed tables with numeric labels,
            // so dispatch cannot be altered by unrelated constants changing value.
            switch (action) {
                case COMMAND: {
                    IngestionTaskCommand command = (IngestionTaskCommand) IsolatedIngestionUtils
                            .deserializeIngestionActionRequest(action, IsolatedIngestionUtils.readHttpRequestContent(fullHttpRequest));
                    responseBody = IsolatedIngestionUtils.serializeIngestionActionResponse(action, handleIngestionTaskCommand(command));
                    break;
                }
                case METRIC:
                    responseBody = IsolatedIngestionUtils.serializeIngestionActionResponse(action, handleMetricsRequest());
                    break;
                case HEARTBEAT:
                    this.isolatedIngestionServer.updateHeartbeatTime();
                    break;
                case UPDATE_METADATA: {
                    IngestionStorageMetadata metadata = (IngestionStorageMetadata) IsolatedIngestionUtils
                            .deserializeIngestionActionRequest(action, IsolatedIngestionUtils.readHttpRequestContent(fullHttpRequest));
                    responseBody = IsolatedIngestionUtils.serializeIngestionActionResponse(action, handleIngestionStorageMetadataUpdate(metadata));
                    break;
                }
                case SHUTDOWN_COMPONENT: {
                    ProcessShutdownCommand shutdownCommand = (ProcessShutdownCommand) IsolatedIngestionUtils
                            .deserializeIngestionActionRequest(action, IsolatedIngestionUtils.readHttpRequestContent(fullHttpRequest));
                    responseBody = IsolatedIngestionUtils.serializeIngestionActionResponse(action, handleProcessShutdownCommand(shutdownCommand));
                    break;
                }
                default:
                    throw new UnsupportedOperationException("Unrecognized ingestion action: " + action);
            }
            channelHandlerContext.writeAndFlush(IsolatedIngestionUtils.buildHttpResponse(HttpResponseStatus.OK, responseBody));
        } catch (UnsupportedOperationException e) {
            LOGGER.error("Caught unrecognized request action:", e);
            channelHandlerContext.writeAndFlush(IsolatedIngestionUtils.buildHttpResponse(HttpResponseStatus.BAD_REQUEST, ExceptionUtils.compactExceptionDescription(e, "channelRead0")));
        }
    }

    /**
     * Logs the failure, answers with a 500 response carrying a compact description of it, and
     * then closes the channel context.
     *
     * @param channelHandlerContext context used to send the error response and then closed
     * @param th the failure being reported
     */
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) {
        final String failureMessage = th.getMessage();
        LOGGER.error("Encounter exception -  message: {}, cause: {}", failureMessage, th);

        channelHandlerContext.writeAndFlush(
                IsolatedIngestionUtils.buildHttpResponse(
                        HttpResponseStatus.INTERNAL_SERVER_ERROR,
                        ExceptionUtils.compactExceptionDescription(th, "exceptionCaught")));
        channelHandlerContext.close();
    }

    /**
     * Executes one ingestion command for a topic partition and reports the outcome.
     *
     * <p>All command handling runs inside a single try block: any exception, including the one
     * raised when the server is not yet initiated, is logged and turned into a negative report
     * ({@code isPositive = false}, {@code exceptionThrown = true}, message set) instead of
     * escaping to the caller.
     *
     * <p>Commands routed through {@link #validateAndExecuteCommand} are run only while the
     * partition is still subscribed; the remaining commands run unconditionally.
     *
     * @param ingestionTaskCommand command carrying the topic name, partition id and command type
     * @return the report for this command; never thrown away on failure
     */
    private IngestionTaskReport handleIngestionTaskCommand(IngestionTaskCommand ingestionTaskCommand) {
        long startTimeMs = System.currentTimeMillis();
        String topicName = ingestionTaskCommand.topicName.toString();
        int partitionId = ingestionTaskCommand.partitionId;
        String storeName = Version.parseStoreFromKafkaTopicName(topicName);
        IngestionTaskReport report = IsolatedIngestionUtils.createIngestionTaskReport(topicName, partitionId);
        IngestionCommandType commandType = IngestionCommandType.valueOf(ingestionTaskCommand.commandType);
        LOGGER.info("Received ingestion command {} for topic: {}, partition: {} in timestamp: {}", commandType, topicName, Integer.valueOf(partitionId), Long.valueOf(startTimeMs));
        try {
            if (!this.isolatedIngestionServer.isInitiated()) {
                throw new VeniceException("IsolatedIngestionServer has not been initiated.");
            }
            KafkaStoreIngestionService storeIngestionService = this.isolatedIngestionServer.getStoreIngestionService();
            VeniceStoreVersionConfig storeConfig = this.isolatedIngestionServer.getConfigLoader().getStoreConfig(topicName);
            storeConfig.setRestoreDataPartitions(false);
            switch (commandType) {
                case START_CONSUMPTION:
                    this.isolatedIngestionServer.maybeSubscribeNewResource(topicName, partitionId);
                    validateAndExecuteCommand(commandType, report, () -> {
                        // Held as Object so the instanceof/cast pair is valid whatever type
                        // getStoreRepository() is declared to return.
                        Object storeRepository = this.isolatedIngestionServer.getStoreRepository();
                        if (storeRepository instanceof SubscriptionBasedReadOnlyStoreRepository) {
                            LOGGER.info("Ingestion Service subscribing to store: {}", storeName);
                            try {
                                ((SubscriptionBasedReadOnlyStoreRepository) storeRepository).subscribe(storeName);
                            } catch (InterruptedException e) {
                                // NOTE(review): the interrupt flag is not restored here; consumption
                                // is still started below. Confirm this best-effort behaviour is intended.
                                LOGGER.warn("Subscription to store: {} is interrupted. ", storeName);
                            }
                        }
                        LOGGER.info("Start ingesting partition: {} of topic: {}", Integer.valueOf(partitionId), topicName);
                        this.isolatedIngestionServer.setResourceToBeSubscribed(topicName, partitionId);
                        this.isolatedIngestionServer.getIngestionBackend().startConsumption(storeConfig, partitionId);
                    });
                    break;
                case STOP_CONSUMPTION:
                    validateAndExecuteCommand(commandType, report, () ->
                            this.isolatedIngestionServer.getIngestionBackend().stopConsumption(storeConfig, partitionId));
                    break;
                case KILL_CONSUMPTION:
                    this.isolatedIngestionServer.getIngestionBackend().killConsumptionTask(topicName);
                    this.isolatedIngestionServer.cleanupTopicState(topicName);
                    break;
                case SHUTDOWN_INGESTION_TASK:
                    this.isolatedIngestionServer.getIngestionBackend().shutdownIngestionTask(topicName);
                    this.isolatedIngestionServer.cleanupTopicState(topicName);
                    break;
                case IS_PARTITION_CONSUMING:
                    // The report's positive flag doubles as the query answer for this command.
                    report.isPositive = storeIngestionService.isPartitionConsuming(topicName, partitionId);
                    break;
                case REMOVE_STORAGE_ENGINE:
                    this.isolatedIngestionServer.getIngestionBackend().removeStorageEngine(topicName);
                    this.isolatedIngestionServer.cleanupTopicState(topicName);
                    break;
                case REMOVE_PARTITION:
                    validateAndExecuteCommand(commandType, report, () -> {
                        this.isolatedIngestionServer.getIngestionBackend().dropStoragePartitionGracefully(storeConfig, partitionId, this.isolatedIngestionServer.getStopConsumptionWaitRetriesNum(), false);
                        this.isolatedIngestionServer.cleanupTopicPartitionState(topicName, partitionId);
                    });
                    break;
                case OPEN_STORAGE_ENGINE:
                    // Only the metadata partition is restored when opening the store.
                    storeConfig.setRestoreDataPartitions(false);
                    storeConfig.setRestoreMetadataPartition(true);
                    this.isolatedIngestionServer.getStorageService().openStore(storeConfig, () -> null);
                    LOGGER.info("Metadata partition of topic: {} restored.", ingestionTaskCommand.topicName);
                    break;
                case PROMOTE_TO_LEADER:
                    validateAndExecuteCommand(commandType, report, () ->
                            this.isolatedIngestionServer.getIngestionBackend().promoteToLeader(storeConfig, partitionId, this.isolatedIngestionServer.getLeaderSectionIdChecker(topicName, partitionId)));
                    break;
                case DEMOTE_TO_STANDBY:
                    validateAndExecuteCommand(commandType, report, () ->
                            this.isolatedIngestionServer.getIngestionBackend().demoteToStandby(storeConfig, partitionId, this.isolatedIngestionServer.getLeaderSectionIdChecker(topicName, partitionId)));
                    break;
                case RESET_PARTITION:
                    this.isolatedIngestionServer.cleanupTopicPartitionState(topicName, partitionId);
                    break;
                default:
                    // Any other command type is a no-op.
                    break;
            }
        } catch (Exception e) {
            LOGGER.error("Encounter exception while handling ingestion command", e);
            report.isPositive = false;
            report.exceptionThrown = true;
            report.message = e.getClass().getSimpleName() + "_" + ExceptionUtils.compactExceptionDescription(e, "handleIngestionTaskCommand");
        }
        LOGGER.info("Completed ingestion command {} for topic: {}, partition: {} in {} ms.", commandType, topicName, Integer.valueOf(partitionId), Long.valueOf(System.currentTimeMillis() - startTimeMs));
        return report;
    }

    /**
     * Builds a metrics report containing only the metrics whose value differs from the value
     * last recorded in the server's metrics map, then records the new value in that map.
     *
     * <p>If the server has no metrics repository the report is returned with an empty map.
     * A failure while processing one metric is logged (unless the redundant-exception filter
     * says it was already reported) and does not stop the remaining metrics from being processed.
     *
     * @return the metrics report holding the changed values
     */
    private IngestionMetricsReport handleMetricsRequest() {
        IngestionMetricsReport ingestionMetricsReport = new IngestionMetricsReport();
        ingestionMetricsReport.aggregatedMetrics = new HashMap<>();
        if (this.isolatedIngestionServer.getMetricsRepository() != null) {
            this.isolatedIngestionServer.getMetricsRepository().metrics().forEach((metricName, metric) -> {
                if (metric == null) {
                    return;
                }
                try {
                    Double previousValue = this.isolatedIngestionServer.getMetricsMap().get(metricName);
                    // Read the metric once so the comparison, the report and the cache all see the same value.
                    Double currentValue = Double.valueOf(metric.value());
                    if (previousValue == null || !previousValue.equals(currentValue)) {
                        ingestionMetricsReport.aggregatedMetrics.put(metricName, currentValue);
                    }
                    this.isolatedIngestionServer.getMetricsMap().put(metricName, currentValue);
                } catch (Exception e) {
                    String errorMessage = "Encounter exception when retrieving value of metric: " + metricName;
                    if (!this.isolatedIngestionServer.getRedundantExceptionFilter().isRedundantException(errorMessage)) {
                        LOGGER.error(errorMessage, e);
                    }
                }
            });
        }
        return ingestionMetricsReport;
    }

    /**
     * Applies one storage-metadata update (offset record or store-version state, put or clear)
     * for a topic partition and reports the outcome.
     *
     * <p>If the server is not yet initiated, a negative report with an explanatory message is
     * returned without touching any metadata. Any exception raised while applying the update
     * is logged and turned into a negative report ({@code isPositive = false}, message set).
     *
     * @param ingestionStorageMetadata update carrying the topic name, partition id, update type and payload
     * @return the report for this update
     */
    private IngestionTaskReport handleIngestionStorageMetadataUpdate(IngestionStorageMetadata ingestionStorageMetadata) {
        String topicName = ingestionStorageMetadata.topicName.toString();
        int partitionId = ingestionStorageMetadata.partitionId;
        IngestionTaskReport report = IsolatedIngestionUtils.createIngestionTaskReport(topicName, partitionId);
        try {
            if (!this.isolatedIngestionServer.isInitiated()) {
                // Short-circuit: nothing can be updated before the server is initiated.
                LOGGER.error("IsolatedIngestionServer has not been initiated.");
                report.isPositive = false;
                report.message = "IsolatedIngestionServer has not been initiated.";
                return report;
            }
            switch (IngestionMetadataUpdateType.valueOf(ingestionStorageMetadata.metadataUpdateType)) {
                case PUT_OFFSET_RECORD:
                    this.isolatedIngestionServer.getStorageMetadataService().put(topicName, partitionId, new OffsetRecord(ingestionStorageMetadata.payload.array(), this.isolatedIngestionServer.getPartitionStateSerializer()));
                    break;
                case CLEAR_OFFSET_RECORD:
                    this.isolatedIngestionServer.getStorageMetadataService().clearOffset(topicName, partitionId);
                    break;
                case PUT_STORE_VERSION_STATE:
                    // The previous state is ignored: it is always replaced by the one in the payload.
                    this.isolatedIngestionServer.getStorageMetadataService().computeStoreVersionState(topicName, previousState ->
                            IsolatedIngestionUtils.deserializeStoreVersionState(topicName, ingestionStorageMetadata.payload.array()));
                    break;
                case CLEAR_STORE_VERSION_STATE:
                    this.isolatedIngestionServer.getStorageMetadataService().clearStoreVersionState(topicName);
                    break;
                default:
                    // Any other update type is a no-op.
                    break;
            }
        } catch (Exception e) {
            LOGGER.error("Encounter exception while updating storage metadata", e);
            // A failed update must be reported as negative, matching the other command handlers.
            report.isPositive = false;
            report.message = e.getClass().getSimpleName() + "_" + ExceptionUtils.compactExceptionDescription(e, "handleIngestionStorageMetadataUpdate");
        }
        return report;
    }

    /**
     * Stops the component named by the shutdown command and reports the outcome.
     *
     * <p>Any exception, including the one raised when the server is not yet initiated, is
     * logged and turned into a negative report ({@code isPositive = false}, message set)
     * instead of escaping to the caller.
     *
     * @param processShutdownCommand command carrying the type of the component to stop
     * @return the report for this shutdown request
     */
    private IngestionTaskReport handleProcessShutdownCommand(ProcessShutdownCommand processShutdownCommand) {
        IngestionTaskReport report = IsolatedIngestionUtils.createIngestionTaskReport();
        try {
            if (!this.isolatedIngestionServer.isInitiated()) {
                throw new VeniceException("IsolatedIngestionServer has not been initiated.");
            }
            switch (IngestionComponentType.valueOf(processShutdownCommand.componentType)) {
                case KAFKA_INGESTION_SERVICE:
                    this.isolatedIngestionServer.getStoreIngestionService().stop();
                    break;
                case STORAGE_SERVICE:
                    this.isolatedIngestionServer.getStorageService().stop();
                    break;
                default:
                    // Any other component type is a no-op.
                    break;
            }
        } catch (Exception e) {
            LOGGER.error("Encounter exception while shutting down ingestion components in forked process", e);
            report.isPositive = false;
            report.message = e.getClass().getSimpleName() + "_" + ExceptionUtils.compactExceptionDescription(e, "handleProcessShutdownCommand");
        }
        return report;
    }

    /**
     * Runs {@code runnable} only if the topic partition named in the report is currently
     * subscribed on the server; otherwise marks the report negative and logs the rejection.
     *
     * @param ingestionCommandType command being validated; used only in the rejection log line
     * @param ingestionTaskReport report supplying the topic name and partition id, and flagged
     *        negative when the command is rejected
     * @param runnable the command body to execute when the resource is subscribed
     */
    protected void validateAndExecuteCommand(IngestionCommandType ingestionCommandType, IngestionTaskReport ingestionTaskReport, Runnable runnable) {
        final String topic = ingestionTaskReport.topicName.toString();
        final int partition = ingestionTaskReport.partitionId;

        if (!this.isolatedIngestionServer.isResourceSubscribed(topic, partition)) {
            // Reject: the resource is no longer subscribed.
            ingestionTaskReport.isPositive = false;
            LOGGER.info("Topic: {}, partition {} is being unsubscribed, will reject command {}", topic, Integer.valueOf(partition), ingestionCommandType.name());
            return;
        }
        runnable.run();
    }
}
