package com.linkedin.davinci.ingestion.main;

import com.linkedin.davinci.ingestion.utils.IsolatedIngestionUtils;
import com.linkedin.davinci.kafka.consumer.LeaderFollowerStateType;
import com.linkedin.davinci.notifier.VeniceNotifier;
import com.linkedin.davinci.stats.IsolatedIngestionProcessStats;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.ingestion.protocol.IngestionMetricsReport;
import com.linkedin.venice.ingestion.protocol.IngestionTaskReport;
import com.linkedin.venice.ingestion.protocol.enums.IngestionAction;
import com.linkedin.venice.ingestion.protocol.enums.IngestionReportType;
import com.linkedin.venice.kafka.protocol.state.StoreVersionState;
import com.linkedin.venice.utils.ExceptionUtils;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import java.util.Optional;
import java.util.function.Consumer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * Netty inbound handler that processes {@link FullHttpRequest}s carrying either an
 * {@link IngestionMetricsReport} ({@link IngestionAction#METRIC}) or an {@link IngestionTaskReport}
 * ({@link IngestionAction#REPORT}).
 *
 * <p>Metric reports are pushed into the {@link IsolatedIngestionProcessStats} exposed by the
 * {@link MainIngestionMonitorService}; task reports first update local storage metadata and are then
 * relayed to every {@link VeniceNotifier} registered with the monitor service.
 */
public class MainIngestionReportHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
    private static final Logger LOGGER = LogManager.getLogger(MainIngestionReportHandler.class);
    private final MainIngestionMonitorService mainIngestionMonitorService;

    /**
     * @param mainIngestionMonitorService service providing the stats object, notifier lists and storage
     *                                    services used while handling reports
     */
    public MainIngestionReportHandler(MainIngestionMonitorService mainIngestionMonitorService) {
        LOGGER.info("MainIngestionReportHandler created.");
        this.mainIngestionMonitorService = mainIngestionMonitorService;
    }

    /** Flushes any pending outbound data once the current read batch is complete. */
    @Override
    public void channelReadComplete(ChannelHandlerContext channelHandlerContext) {
        channelHandlerContext.flush();
    }

    /**
     * Dispatches the request according to its {@link IngestionAction} and answers with {@code 200 OK}
     * on success.
     *
     * <p>An {@link UnsupportedOperationException} (raised here for unrecognized actions, or by the
     * invoked handlers) is answered with {@code 400 BAD_REQUEST}. Any other exception is not caught here
     * and propagates out of this method.
     */
    @Override
    public void channelRead0(ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest) {
        IngestionAction action = IsolatedIngestionUtils.getIngestionActionFromRequest(fullHttpRequest);
        try {
            switch (action) {
                case METRIC: {
                    IngestionMetricsReport metricsReport = (IngestionMetricsReport) IsolatedIngestionUtils
                            .deserializeIngestionActionRequest(
                                    IngestionAction.METRIC,
                                    IsolatedIngestionUtils.readHttpRequestContent(fullHttpRequest));
                    handleMetricsReport(metricsReport);
                    break;
                }
                case REPORT: {
                    IngestionTaskReport taskReport = (IngestionTaskReport) IsolatedIngestionUtils
                            .deserializeIngestionActionRequest(
                                    IngestionAction.REPORT,
                                    IsolatedIngestionUtils.readHttpRequestContent(fullHttpRequest));
                    handleIngestionReport(taskReport);
                    break;
                }
                default:
                    throw new UnsupportedOperationException("Unrecognized ingestion action: " + action);
            }
            channelHandlerContext.writeAndFlush(
                    IsolatedIngestionUtils.buildHttpResponse(HttpResponseStatus.OK, IsolatedIngestionUtils.getDummyContent()));
        } catch (UnsupportedOperationException e) {
            LOGGER.error("Caught unrecognized request action:", e);
            channelHandlerContext.writeAndFlush(
                    IsolatedIngestionUtils.buildHttpResponse(
                            HttpResponseStatus.BAD_REQUEST,
                            ExceptionUtils.compactExceptionDescription(e, "channelRead0")));
        }
    }

    /**
     * Logs the failure, answers with {@code 500 INTERNAL_SERVER_ERROR} (body: exception simple class name,
     * an underscore, and the compact exception description) and closes the channel.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) {
        LOGGER.error("Encounter exception during ingestion task report handling.", th);
        String responseBody =
                th.getClass().getSimpleName() + "_" + ExceptionUtils.compactExceptionDescription(th, "exceptionCaught");
        channelHandlerContext.writeAndFlush(
                IsolatedIngestionUtils.buildHttpResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR, responseBody));
        channelHandlerContext.close();
    }

    /**
     * Feeds the reported aggregated metrics into the monitor service's stats object; the update is
     * skipped (with a warning) when the stats object is not available yet.
     */
    void handleMetricsReport(IngestionMetricsReport ingestionMetricsReport) {
        IsolatedIngestionProcessStats processStats = this.mainIngestionMonitorService.getIsolatedIngestionProcessStats();
        if (processStats == null) {
            LOGGER.warn("IsolatedIngestionProcessStats is not initialized yet, will skip metrics update.");
            return;
        }
        processStats.updateMetricMap(ingestionMetricsReport.aggregatedMetrics);
    }

    /**
     * Applies the storage metadata carried by the report and then forwards the event to all registered
     * notifiers according to the report type. For {@code COMPLETED} and {@code ERROR} reports the
     * partition is first switched to local ingestion on the monitor service. Unsupported report types
     * are logged and ignored.
     */
    void handleIngestionReport(IngestionTaskReport ingestionTaskReport) {
        IngestionReportType reportType = IngestionReportType.valueOf(ingestionTaskReport.reportType);
        String topicName = ingestionTaskReport.topicName.toString();
        int partitionId = ingestionTaskReport.partitionId;
        long offset = ingestionTaskReport.offset;
        LOGGER.info(
                "Received ingestion report {} for topic: {}, partition: {} from ingestion service. ",
                reportType.name(),
                topicName,
                partitionId);
        // Metadata is persisted before any notifier is told about the event.
        updateLocalStorageMetadata(ingestionTaskReport);
        switch (reportType) {
            case COMPLETED: {
                this.mainIngestionMonitorService.setVersionPartitionToLocalIngestion(topicName, partitionId);
                LeaderFollowerStateType leaderFollowerState =
                        LeaderFollowerStateType.valueOf(ingestionTaskReport.leaderFollowerState);
                notifierHelper(
                        notifier -> notifier.completed(topicName, partitionId, offset, "", Optional.of(leaderFollowerState)));
                break;
            }
            case ERROR:
                this.mainIngestionMonitorService.setVersionPartitionToLocalIngestion(topicName, partitionId);
                notifierHelper(notifier -> {
                    // Built per notifier so each one receives its own exception instance.
                    String errorMessage = ingestionTaskReport.message.toString();
                    notifier.error(topicName, partitionId, errorMessage, new VeniceException(errorMessage));
                });
                break;
            case STARTED:
                notifierHelper(notifier -> notifier.started(topicName, partitionId));
                break;
            case RESTARTED:
                notifierHelper(notifier -> notifier.restarted(topicName, partitionId, offset));
                break;
            case PROGRESS:
                notifierHelper(notifier -> notifier.progress(topicName, partitionId, offset));
                break;
            case END_OF_PUSH_RECEIVED:
                notifierHelper(notifier -> notifier.endOfPushReceived(topicName, partitionId, offset));
                break;
            case START_OF_INCREMENTAL_PUSH_RECEIVED:
                notifierHelper(
                        notifier -> notifier.startOfIncrementalPushReceived(
                                topicName,
                                partitionId,
                                offset,
                                ingestionTaskReport.message.toString()));
                break;
            case END_OF_INCREMENTAL_PUSH_RECEIVED:
                notifierHelper(
                        notifier -> notifier.endOfIncrementalPushReceived(
                                topicName,
                                partitionId,
                                offset,
                                ingestionTaskReport.message.toString()));
                break;
            case TOPIC_SWITCH_RECEIVED:
                notifierHelper(notifier -> notifier.topicSwitchReceived(topicName, partitionId, offset));
                break;
            default:
                LOGGER.warn("Received unsupported ingestion report: {} it will be ignored for now.", ingestionTaskReport);
                break;
        }
    }

    /** Runs the given action against the push-status notifiers first, then the ingestion notifiers. */
    private void notifierHelper(Consumer<VeniceNotifier> consumer) {
        this.mainIngestionMonitorService.getPushStatusNotifierList().forEach(consumer);
        this.mainIngestionMonitorService.getIngestionNotifier().forEach(consumer);
    }

    /**
     * Persists the offset records and store version state carried by the report. Nothing is done when
     * the monitor service has no storage metadata service.
     */
    private void updateLocalStorageMetadata(IngestionTaskReport ingestionTaskReport) {
        if (this.mainIngestionMonitorService.getStorageMetadataService() == null) {
            return;
        }
        String topicName = ingestionTaskReport.topicName.toString();
        int partitionId = ingestionTaskReport.partitionId;
        if (!ingestionTaskReport.offsetRecordArray.isEmpty()) {
            this.mainIngestionMonitorService.getStoreIngestionService()
                    .updatePartitionOffsetRecords(topicName, partitionId, ingestionTaskReport.offsetRecordArray);
        }
        if (ingestionTaskReport.storeVersionState != null) {
            // NOTE(review): assumes storeVersionState is a java.nio.ByteBuffer, as the previous array() call implies.
            // Copy only the readable bytes through a duplicate so the report's own buffer state is left untouched;
            // array() would hand over the whole backing array regardless of position/limit and throws for
            // read-only or direct buffers.
            byte[] serializedState = new byte[ingestionTaskReport.storeVersionState.remaining()];
            ingestionTaskReport.storeVersionState.duplicate().get(serializedState);
            StoreVersionState storeVersionState = IsolatedIngestionUtils.deserializeStoreVersionState(topicName, serializedState);
            this.mainIngestionMonitorService.getStorageMetadataService().putStoreVersionState(topicName, storeVersionState);
            LOGGER.info("Updated storeVersionState: {} for topic: {}", storeVersionState, topicName);
        }
    }
}
