package com.linkedin.davinci.kafka.consumer;

import com.linkedin.davinci.notifier.VeniceNotifier;
import com.linkedin.davinci.store.cache.backend.ObjectCacheBackendTest;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.offsets.OffsetRecord;
import com.linkedin.venice.pushmonitor.ExecutionStatus;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:com/linkedin/davinci/kafka/consumer/StatusReportAdapterTest.class */
public class StatusReportAdapterTest {
    private static final Logger LOGGER = LogManager.getLogger(StatusReportAdapterTest.class);
    private final List<ExecutionStatus> recordStatusList = new ArrayList();
    private final Random random = new Random();

    /* renamed from: com.linkedin.davinci.kafka.consumer.StatusReportAdapterTest$2, reason: invalid class name */
    /* loaded from: input_file:com/linkedin/davinci/kafka/consumer/StatusReportAdapterTest$2.class */
    static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$com$linkedin$venice$pushmonitor$ExecutionStatus = new int[ExecutionStatus.values().length];

        static {
            try {
                $SwitchMap$com$linkedin$venice$pushmonitor$ExecutionStatus[ExecutionStatus.STARTED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$linkedin$venice$pushmonitor$ExecutionStatus[ExecutionStatus.END_OF_PUSH_RECEIVED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$com$linkedin$venice$pushmonitor$ExecutionStatus[ExecutionStatus.COMPLETED.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    @Test
    public void testStatusReportWithAmpFactor() {
        this.recordStatusList.clear();
        VeniceNotifier notifier = getNotifier();
        ArrayDeque arrayDeque = new ArrayDeque();
        arrayDeque.add(notifier);
        IngestionNotificationDispatcher ingestionNotificationDispatcher = new IngestionNotificationDispatcher(arrayDeque, "test_v1", () -> {
            return true;
        });
        ConcurrentMap<Integer, PartitionConsumptionState> generateMockedPcsMap = generateMockedPcsMap(3);
        AmplificationFactorAdapter amplificationFactorAdapter = (AmplificationFactorAdapter) Mockito.mock(AmplificationFactorAdapter.class);
        Mockito.when(Integer.valueOf(amplificationFactorAdapter.getAmplificationFactor())).thenReturn(3);
        StatusReportAdapter statusReportAdapter = new StatusReportAdapter(ingestionNotificationDispatcher, amplificationFactorAdapter);
        statusReportAdapter.initializePartitionReportStatus(0);
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(3);
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        arrayList2.add(ExecutionStatus.STARTED);
        arrayList2.add(ExecutionStatus.END_OF_PUSH_RECEIVED);
        arrayList2.add(ExecutionStatus.COMPLETED);
        for (int i = 0; i < 3; i++) {
            arrayList.add(getReportCallable(statusReportAdapter, generateMockedPcsMap.get(Integer.valueOf(i)), arrayList2));
        }
        try {
            newFixedThreadPool.invokeAll(arrayList, 10L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        Assert.assertEquals(this.recordStatusList, arrayList2, "Reported status " + this.recordStatusList);
    }

    private ConcurrentMap<Integer, PartitionConsumptionState> generateMockedPcsMap(int i) {
        VeniceConcurrentHashMap veniceConcurrentHashMap = new VeniceConcurrentHashMap();
        OffsetRecord offsetRecord = (OffsetRecord) Mockito.mock(OffsetRecord.class);
        Mockito.when(offsetRecord.toBytes()).thenReturn(new byte[0]);
        for (int i2 = 0; i2 < i; i2++) {
            PartitionConsumptionState partitionConsumptionState = (PartitionConsumptionState) Mockito.mock(PartitionConsumptionState.class);
            ((PartitionConsumptionState) Mockito.doReturn(Integer.valueOf(i2)).when(partitionConsumptionState)).getPartition();
            ((PartitionConsumptionState) Mockito.doReturn(0).when(partitionConsumptionState)).getUserPartition();
            ((PartitionConsumptionState) Mockito.doReturn(LeaderFollowerStateType.STANDBY).when(partitionConsumptionState)).getLeaderFollowerState();
            ((PartitionConsumptionState) Mockito.doReturn(true).when(partitionConsumptionState)).isComplete();
            ((PartitionConsumptionState) Mockito.doReturn(offsetRecord).when(partitionConsumptionState)).getOffsetRecord();
            veniceConcurrentHashMap.put(Integer.valueOf(i2), partitionConsumptionState);
        }
        return veniceConcurrentHashMap;
    }

    private Callable<Void> getReportCallable(StatusReportAdapter statusReportAdapter, PartitionConsumptionState partitionConsumptionState, List<ExecutionStatus> list) {
        return () -> {
            try {
                Iterator it = list.iterator();
                while (it.hasNext()) {
                    ExecutionStatus executionStatus = (ExecutionStatus) it.next();
                    Thread.sleep(this.random.nextInt(100));
                    LOGGER.info("Sending report status: {} to partition {}", executionStatus, Integer.valueOf(partitionConsumptionState.getPartition()));
                    switch (AnonymousClass2.$SwitchMap$com$linkedin$venice$pushmonitor$ExecutionStatus[executionStatus.ordinal()]) {
                        case ObjectCacheBackendTest.STORE_VERSION /* 1 */:
                            statusReportAdapter.reportStarted(partitionConsumptionState);
                        case 2:
                            statusReportAdapter.reportEndOfPushReceived(partitionConsumptionState);
                        case 3:
                            statusReportAdapter.reportCompleted(partitionConsumptionState);
                        default:
                            throw new VeniceException("Unsupported execution status: " + executionStatus);
                    }
                }
                return null;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        };
    }

    private VeniceNotifier getNotifier() {
        return new VeniceNotifier() { // from class: com.linkedin.davinci.kafka.consumer.StatusReportAdapterTest.1
            public void started(String str, int i) {
                StatusReportAdapterTest.this.recordStatus(ExecutionStatus.STARTED);
            }

            public void endOfPushReceived(String str, int i, long j) {
                StatusReportAdapterTest.this.recordStatus(ExecutionStatus.END_OF_PUSH_RECEIVED);
            }

            public void completed(String str, int i, long j, String str2) {
                StatusReportAdapterTest.this.recordStatus(ExecutionStatus.COMPLETED);
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void recordStatus(ExecutionStatus executionStatus) {
        this.recordStatusList.add(executionStatus);
        LOGGER.info("Write push status: {} to mocked push monitor.", executionStatus);
    }
}
