package com.linkedin.venice.router;

import com.linkedin.venice.client.store.AvroGenericStoreClient;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.client.store.ClientFactory;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceRouterWrapper;
import com.linkedin.venice.integration.utils.VeniceServerWrapper;
import com.linkedin.venice.listener.request.RouterRequest;
import com.linkedin.venice.listener.response.HttpShortcutResponse;
import com.linkedin.venice.utils.Utils;
import io.netty.handler.codec.http.HttpResponseStatus;
import java.io.Closeable;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/**
 * Integration test that checks single-get requests keep meeting latency and throughput
 * requirements while one storage replica misbehaves (stopped, disconnecting, hung, slow
 * or answering with errors).
 *
 * <p>A cluster is created once per class; before every test method the custom request
 * handlers are removed from all servers and every router is restarted, so each scenario
 * starts without handlers left over from the previous one.
 */
public class ReplicaFailoverTest {
    private static final Logger LOGGER = LogManager.getLogger(ReplicaFailoverTest.class);

    /** Number of keys written to the store and read back by every workload. */
    private static final int KEY_COUNT = 1000;
    /** Per-test timeout in milliseconds. */
    private static final int TEST_TIMEOUT = 15000;
    private static final int REPLICATION_FACTOR = 2;
    private static final int MAX_CONCURRENT_REQUESTS = 20;
    /** Percentage of requests that must complete within the latency budget. */
    private static final double REQUEST_PERCENTILE = 90.0d;
    /** Latency budget in milliseconds for a workload with a single request in flight. */
    private static final long MAX_REQUEST_LATENCY_QD1 = 50;
    /** Router long-tail retry threshold; below MAX_REQUEST_LATENCY_QD1 so a retry fits the budget. */
    private static final long LONG_TAIL_RETRY_THRESHOLD_MS = 25;
    /** Lowest acceptable throughput, in requests per second. */
    private static final double MIN_QPS = 1.0d;
    private static final double MILLIS_PER_SECOND = 1000.0d;
    private static final String MIN_QPS_NOT_MET = "Minimal QPS requirement not met";

    private VeniceClusterWrapper cluster;
    private String storeName;
    /** Server port -> number of times the custom request handler on that server was invoked. */
    private final Map<Integer, AtomicInteger> errorHitCountMap = new HashMap<>();

    /**
     * Starts the cluster, adds a router configured for stateful health checks and
     * long-tail retries, and creates the store read by the workloads.
     */
    @BeforeClass
    public void setUp() throws Exception {
        Utils.thisIsLocalhost();
        cluster = ServiceFactory.getVeniceCluster(1, REPLICATION_FACTOR, 0, REPLICATION_FACTOR);

        Properties routerProperties = new Properties();
        routerProperties.setProperty("router.stateful.healthcheck.enabled", "true");
        routerProperties.setProperty("router.async.start.enabled", "true");
        routerProperties.setProperty(
                "router.unhealthy.pending.connection.threshold.per.host",
                String.valueOf(MAX_CONCURRENT_REQUESTS));
        routerProperties.setProperty(
                "router.long.tail.retry.for.single.get.threshold.ms",
                String.valueOf(LONG_TAIL_RETRY_THRESHOLD_MS));
        cluster.addVeniceRouter(routerProperties);

        storeName = cluster.createStore(KEY_COUNT);
    }

    /** Shuts the cluster down, logging (not propagating) any error raised while closing. */
    @AfterClass
    public void cleanUp() {
        Utils.closeQuietlyWithErrorLogged(cluster);
    }

    /**
     * Resets per-test state: clears custom request handlers and hit counters, then
     * restarts every router.
     */
    @BeforeMethod
    public void setupTestCase() {
        for (VeniceServerWrapper server : cluster.getVeniceServers()) {
            server.getVeniceServer().setRequestHandler(null);
        }
        errorHitCountMap.clear();

        for (VeniceRouterWrapper router : cluster.getVeniceRouters()) {
            cluster.stopVeniceRouter(router.getPort());
            cluster.restartVeniceRouter(router.getPort());
        }
    }

    /**
     * Supplies the maximum number of concurrent requests for each run: a strictly
     * sequential workload and a workload with MAX_CONCURRENT_REQUESTS in flight.
     */
    @DataProvider(name = "workloadParams")
    public static Object[][] workloadParams() {
        return new Object[][] { { 1 }, { MAX_CONCURRENT_REQUESTS } };
    }

    /** Runs the workload while the selected replica is stopped, then restarts it. */
    @Test(timeOut = TEST_TIMEOUT, dataProvider = "workloadParams")
    public void testDeadReplica(int maxConcurrentRequests) {
        List<VeniceServerWrapper> stoppedServers = serversToBreak();
        for (VeniceServerWrapper server : stoppedServers) {
            cluster.stopVeniceServer(server.getPort());
        }
        try {
            runWorkload(maxConcurrentRequests, REQUEST_PERCENTILE, MAX_REQUEST_LATENCY_QD1);
        } finally {
            // Restart even when the workload fails so that the remaining tests of this
            // class do not inherit a cluster with a stopped replica.
            for (VeniceServerWrapper server : stoppedServers) {
                cluster.restartVeniceServer(server.getPort());
            }
        }
    }

    /** Runs the workload while the selected replica closes the channel on every message. */
    @Test(timeOut = TEST_TIMEOUT, dataProvider = "workloadParams")
    public void testDisconnectedReplica(int maxConcurrentRequests) {
        for (VeniceServerWrapper server : serversToBreak()) {
            AtomicInteger hitCount = registerHitCounter(server);
            server.getVeniceServer().setRequestHandler((ctx, message) -> {
                hitCount.incrementAndGet();
                ctx.close();
                return true;
            });
        }
        runWorkload(maxConcurrentRequests, REQUEST_PERCENTILE, MAX_REQUEST_LATENCY_QD1);
    }

    /** Runs the workload while the selected replica never writes a response to a RouterRequest. */
    @Test(timeOut = TEST_TIMEOUT, dataProvider = "workloadParams")
    public void testHungReplica(int maxConcurrentRequests) {
        for (VeniceServerWrapper server : serversToBreak()) {
            AtomicInteger hitCount = registerHitCounter(server);
            server.getVeniceServer().setRequestHandler((ctx, message) -> {
                if (!(message instanceof RouterRequest)) {
                    return false;
                }
                hitCount.incrementAndGet();
                // Claim the request but write nothing back.
                return true;
            });
        }
        runWorkload(maxConcurrentRequests, REQUEST_PERCENTILE, MAX_REQUEST_LATENCY_QD1);
    }

    /** Runs the workload while the selected replica sleeps 500 ms on every message. */
    @Test(timeOut = TEST_TIMEOUT, dataProvider = "workloadParams")
    public void testSlowReplica(int maxConcurrentRequests) {
        for (VeniceServerWrapper server : serversToBreak()) {
            AtomicInteger hitCount = registerHitCounter(server);
            server.getVeniceServer().setRequestHandler((ctx, message) -> {
                hitCount.incrementAndGet();
                Utils.sleep(500L);
                // NOTE(review): false presumably lets the regular handling continue - confirm.
                return false;
            });
        }
        runWorkload(maxConcurrentRequests, REQUEST_PERCENTILE, MAX_REQUEST_LATENCY_QD1);
    }

    /** Runs the workload while the selected replica answers every RouterRequest with HTTP 500. */
    @Test(timeOut = TEST_TIMEOUT, dataProvider = "workloadParams")
    public void testFaultyReplica(int maxConcurrentRequests) {
        for (VeniceServerWrapper server : serversToBreak()) {
            AtomicInteger hitCount = registerHitCounter(server);
            server.getVeniceServer().setRequestHandler((ctx, message) -> {
                if (!(message instanceof RouterRequest)) {
                    return false;
                }
                hitCount.incrementAndGet();
                ctx.writeAndFlush(new HttpShortcutResponse("", HttpResponseStatus.INTERNAL_SERVER_ERROR));
                return true;
            });
        }
        runWorkload(maxConcurrentRequests, REQUEST_PERCENTILE, MAX_REQUEST_LATENCY_QD1);
    }

    /**
     * Returns the time budget, in milliseconds, for completing the given number of
     * requests at the MIN_QPS rate.
     */
    long getMaxTimeToCompleteRequests(int numberOfRequests) {
        return Math.round(MILLIS_PER_SECOND * numberOfRequests / MIN_QPS);
    }

    /**
     * Reads keys {@code 0..KEY_COUNT-1} through a store client, keeping at most
     * {@code maxConcurrentRequests} requests in flight, and fails when:
     * <ul>
     *   <li>a request completes exceptionally or returns a value other than 1,</li>
     *   <li>a permit cannot be obtained within the MIN_QPS time budget,</li>
     *   <li>more than {@code (100 - requestPercentile)} percent of KEY_COUNT requests
     *       take longer than {@code maxMeanLatency * maxConcurrentRequests} ms,</li>
     *   <li>a registered custom request handler was never invoked.</li>
     * </ul>
     * A latency summary is logged whether the workload passes or fails.
     *
     * @param maxConcurrentRequests maximum number of requests in flight at once
     * @param requestPercentile     percentage of requests that must meet the latency budget
     * @param maxMeanLatency        latency budget in ms per concurrent request
     */
    void runWorkload(int maxConcurrentRequests, double requestPercentile, long maxMeanLatency) {
        LatencyStats stats = new LatencyStats(maxMeanLatency * maxConcurrentRequests);
        double maxOutlierCount = KEY_COUNT * (1.0 - requestPercentile / 100.0);

        try (AvroGenericStoreClient<Integer, Integer> client = ClientFactory.getAndStartGenericAvroClient(
                ClientConfig.defaultGenericClientConfig(storeName)
                        .setD2ServiceName(VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME)
                        .setVeniceURL(cluster.getZk().getAddress()))) {
            Semaphore inFlightPermits = new Semaphore(maxConcurrentRequests);
            AtomicReference<Throwable> clientError = new AtomicReference<>();

            for (int key = 0; key < KEY_COUNT; key++) {
                Assert.assertTrue(
                        inFlightPermits.tryAcquire(1, getMaxTimeToCompleteRequests(1), TimeUnit.MILLISECONDS),
                        MIN_QPS_NOT_MET);

                final long startNanos = System.nanoTime();
                client.get(key).whenComplete((value, error) -> {
                    try {
                        // Keep the first failure only, and never let the value check hide
                        // the exception of a request that failed outright.
                        if (error != null) {
                            clientError.compareAndSet(null, error);
                        } else if (!Objects.equals(value, 1)) {
                            clientError.compareAndSet(
                                    null,
                                    new AssertionError("Unexpected single-get result, expected=1, actual=" + value));
                        }
                        stats.record(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
                    } finally {
                        inFlightPermits.release();
                    }
                });

                failOnClientError(clientError);
                failOnTooManyOutliers(stats, maxOutlierCount);
            }

            // Wait for the requests that are still in flight, then re-check: their
            // failures and latencies are only known once they have completed.
            Assert.assertTrue(
                    inFlightPermits.tryAcquire(
                            maxConcurrentRequests,
                            getMaxTimeToCompleteRequests(maxConcurrentRequests),
                            TimeUnit.MILLISECONDS),
                    MIN_QPS_NOT_MET);
            failOnClientError(clientError);
            failOnTooManyOutliers(stats, maxOutlierCount);

            for (Map.Entry<Integer, AtomicInteger> entry : errorHitCountMap.entrySet()) {
                Assert.assertNotEquals(
                        entry.getValue().get(),
                        0,
                        "Custom request handler of the server on port " + entry.getKey() + " was never invoked");
            }
        } catch (InterruptedException e) {
            // Restore the flag so that the test runner can observe the interruption.
            Thread.currentThread().interrupt();
            throw new AssertionError(e);
        } finally {
            LOGGER.info(stats.summary());
            if (!errorHitCountMap.isEmpty()) {
                int totalHits = errorHitCountMap.values().stream().mapToInt(AtomicInteger::get).sum();
                LOGGER.info("{} errors were triggered in total", totalHits);
            }
        }
    }

    /**
     * Selects the replicas a test is allowed to break: all but one, so that a healthy
     * replica remains (a single server with REPLICATION_FACTOR = 2).
     */
    private List<VeniceServerWrapper> serversToBreak() {
        return cluster.getVeniceServers().subList(0, REPLICATION_FACTOR - 1);
    }

    /** Creates the invocation counter of the given server's handler and registers it by port. */
    private AtomicInteger registerHitCounter(VeniceServerWrapper server) {
        AtomicInteger hitCount = new AtomicInteger();
        errorHitCountMap.put(server.getPort(), hitCount);
        return hitCount;
    }

    /** Throws an AssertionError caused by the recorded client failure, if there is one. */
    private static void failOnClientError(AtomicReference<Throwable> clientError) {
        Throwable error = clientError.get();
        if (error != null) {
            throw new AssertionError(error);
        }
    }

    /** Fails the test once the number of slow requests exceeds the allowed maximum. */
    private static void failOnTooManyOutliers(LatencyStats stats, double maxOutlierCount) {
        if (stats.outlierCount() > maxOutlierCount) {
            Assert.fail(stats.summary());
        }
    }

    /** Latency counters updated from request completion callbacks. */
    private static final class LatencyStats {
        private final long outlierThresholdMs;
        private final AtomicInteger completedCount = new AtomicInteger();
        private final AtomicInteger outlierCount = new AtomicInteger();
        private final AtomicLong totalLatencyMs = new AtomicLong();
        private final AtomicLong outlierLatencyMs = new AtomicLong();

        LatencyStats(long outlierThresholdMs) {
            this.outlierThresholdMs = outlierThresholdMs;
        }

        /** Records one completed request; it is an outlier when slower than the threshold. */
        void record(long latencyMs) {
            completedCount.incrementAndGet();
            totalLatencyMs.addAndGet(latencyMs);
            if (latencyMs > outlierThresholdMs) {
                outlierCount.incrementAndGet();
                outlierLatencyMs.addAndGet(latencyMs);
            }
        }

        int outlierCount() {
            return outlierCount.get();
        }

        /** Describes the counters; the divisors are clamped to 1 to avoid division by zero. */
        String summary() {
            int outliers = outlierCount.get();
            int completed = completedCount.get();
            return outliers + " out of " + completed + " single-get requests exceeded " + outlierThresholdMs
                    + " ms, average outlier latency is " + (outlierLatencyMs.get() / Math.max(1, outliers))
                    + " ms, average latency is " + (totalLatencyMs.get() / Math.max(1, completed)) + " ms";
        }
    }
}
