package com.datastax.oss.driver.core.cql.reactive;

import com.datastax.dse.driver.api.core.cql.reactive.ReactiveRow;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.context.DriverContext;
import com.datastax.oss.driver.api.core.loadbalancing.LoadBalancingPolicy;
import com.datastax.oss.driver.api.core.loadbalancing.NodeDistance;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.core.metrics.DefaultNodeMetric;
import com.datastax.oss.driver.api.core.metrics.Metrics;
import com.datastax.oss.driver.api.core.servererrors.UnavailableException;
import com.datastax.oss.driver.api.core.session.Request;
import com.datastax.oss.driver.api.core.session.Session;
import com.datastax.oss.driver.api.testinfra.loadbalancing.NodeComparator;
import com.datastax.oss.driver.api.testinfra.session.SessionRule;
import com.datastax.oss.driver.api.testinfra.session.SessionUtils;
import com.datastax.oss.driver.api.testinfra.simulacron.SimulacronRule;
import com.datastax.oss.driver.categories.ParallelizableTests;
import com.datastax.oss.driver.core.retry.PerProfileRetryPolicyIT;
import com.datastax.oss.simulacron.common.cluster.ClusterSpec;
import com.datastax.oss.simulacron.common.codec.ConsistencyLevel;
import com.datastax.oss.simulacron.common.stubbing.PrimeDsl;
import com.datastax.oss.simulacron.server.BoundCluster;
import com.google.common.collect.Iterables;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import io.reactivex.Flowable;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.TreeSet;
import java.util.UUID;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;
import org.mockito.Mockito;
import org.mockito.internal.verification.VerificationModeFactory;

/**
 * Integration test checking that a reactive query can be retried at application level (here with
 * RxJava's {@code Flowable.retry}) when the driver itself is configured not to retry.
 *
 * <p>The session is configured with {@link CyclingLoadBalancingPolicy}, so each execution is sent
 * to a single coordinator, and with a no-retry policy, so an {@link UnavailableException} returned
 * by a coordinator surfaces to the subscriber instead of being retried by the driver.
 */
@Category({ParallelizableTests.class})
public class ReactiveRetryIT {
    private static final SimulacronRule SIMULACRON_RULE = new SimulacronRule(ClusterSpec.builder().withNodes(new int[]{3}));
    private static final SessionRule<CqlSession> SESSION_RULE =
            SessionRule.builder(SIMULACRON_RULE)
                    .withConfigLoader(
                            SessionUtils.configLoaderBuilder()
                                    .withBoolean(DefaultDriverOption.REQUEST_DEFAULT_IDEMPOTENCE, true)
                                    .withClass(DefaultDriverOption.LOAD_BALANCING_POLICY_CLASS, CyclingLoadBalancingPolicy.class)
                                    .withClass(DefaultDriverOption.RETRY_POLICY_CLASS, PerProfileRetryPolicyIT.NoRetryPolicy.class)
                                    .withStringList(DefaultDriverOption.METRICS_NODE_ENABLED, Collections.singletonList("errors.request.unavailables"))
                                    .build())
                    .build();

    /** Starts the simulated cluster first, then the session that connects to it. */
    @ClassRule
    public static final TestRule CHAIN = RuleChain.outerRule(SIMULACRON_RULE).around(SESSION_RULE);
    private static final String QUERY_STRING = "select * from foo";

    /** The session's nodes sorted with {@link NodeComparator}; rebuilt before each test. */
    private List<Node> nodes;

    /**
     * Load balancing policy that returns single-node query plans, cycling through the known nodes
     * in {@link NodeComparator} order.
     *
     * <p>NOTE(review): the shared iterator is advanced without synchronization; presumably this is
     * acceptable because the test issues its requests one after the other -- confirm before
     * reusing this policy elsewhere.
     */
    public static class CyclingLoadBalancingPolicy implements LoadBalancingPolicy {
        private final TreeSet<Node> nodes = new TreeSet<>(NodeComparator.INSTANCE);
        private volatile Iterator<Node> iterator = Iterables.cycle(this.nodes).iterator();

        /**
         * Creates the policy. Both arguments are ignored; the class is referenced from the session
         * configuration through {@code LOAD_BALANCING_POLICY_CLASS}.
         *
         * @param driverContext unused
         * @param profileName unused
         */
        public CyclingLoadBalancingPolicy(DriverContext driverContext, String profileName) {
        }

        /**
         * Records all given nodes, reports each of them as {@link NodeDistance#LOCAL}, and restarts
         * the cycle over the sorted node set.
         *
         * @param initialNodes the nodes to cycle through
         * @param distanceReporter callback used to report every node as local
         */
        @Override
        public void init(@NonNull Map<UUID, Node> initialNodes, @NonNull LoadBalancingPolicy.DistanceReporter distanceReporter) {
            this.nodes.addAll(initialNodes.values());
            for (Node node : this.nodes) {
                distanceReporter.setDistance(node, NodeDistance.LOCAL);
            }
            // Take a fresh iterator: the one created at field initialization was built while the
            // set was still empty.
            this.iterator = Iterables.cycle(this.nodes).iterator();
        }

        /**
         * Returns a query plan containing only the next node of the cycle.
         *
         * @param request ignored
         * @param session ignored
         * @return a new single-element queue
         */
        @NonNull
        @Override
        public Queue<Node> newQueryPlan(@Nullable Request request, @Nullable Session session) {
            return new ArrayDeque<>(Collections.singleton(this.iterator.next()));
        }

        /** No-op: topology events are ignored by this policy. */
        @Override
        public void onAdd(@NonNull Node node) {
        }

        /** No-op: topology events are ignored by this policy. */
        @Override
        public void onUp(@NonNull Node node) {
        }

        /** No-op: topology events are ignored by this policy. */
        @Override
        public void onDown(@NonNull Node node) {
        }

        /** No-op: topology events are ignored by this policy. */
        @Override
        public void onRemove(@NonNull Node node) {
        }

        /** No-op: this policy holds nothing that needs closing. */
        @Override
        public void close() {
        }
    }

    /** Clears the simulated cluster's logs and primes before each test. */
    @Before
    public void clearPrimes() {
        SIMULACRON_RULE.cluster().clearLogs();
        SIMULACRON_RULE.cluster().clearPrimes(true);
    }

    /** Snapshots the session's nodes into {@link #nodes}, sorted with {@link NodeComparator}. */
    @Before
    public void createNodesList() {
        this.nodes = new ArrayList<>(SESSION_RULE.session().getMetadata().getNodes().values());
        this.nodes.sort(NodeComparator.INSTANCE);
    }

    /**
     * Primes the first two simulated nodes to answer with "unavailable" and the third with a row,
     * then checks that retrying the deferred reactive execution ends up returning that row after
     * exactly three executions, with one unavailable error counted on each failing node.
     */
    @Test
    public void should_retry_at_application_level() {
        // Spy on the session so the number of executeReactive() calls can be verified below.
        CqlSession cqlSession = Mockito.spy(SESSION_RULE.session());
        BoundCluster cluster = SIMULACRON_RULE.cluster();
        cluster.node(0L).prime(PrimeDsl.when(QUERY_STRING).then(PrimeDsl.unavailable(ConsistencyLevel.ONE, 1, 0)));
        cluster.node(1L).prime(PrimeDsl.when(QUERY_STRING).then(PrimeDsl.unavailable(ConsistencyLevel.ONE, 1, 0)));
        cluster.node(2L).prime(PrimeDsl.when(QUERY_STRING).then(PrimeDsl.rows().row(new Object[]{"col1", "Yay!"})));

        // defer() makes every (re)subscription call executeReactive() again.
        ReactiveRow row =
                Flowable.defer(() -> cqlSession.executeReactive(QUERY_STRING))
                        .retry((attempt, error) -> {
                            Assertions.assertThat(error).isInstanceOf(UnavailableException.class);
                            Node coordinator = ((UnavailableException) error).getCoordinator();
                            if (attempt == 1) {
                                assertCoordinator(0, coordinator);
                                return true;
                            }
                            if (attempt == 2) {
                                assertCoordinator(1, coordinator);
                                return true;
                            }
                            Assert.fail("Unexpected retry attempt");
                            return false;
                        })
                        .blockingLast();

        Assertions.assertThat(row.getString(0)).isEqualTo("Yay!");
        Mockito.verify(cqlSession, Mockito.times(3)).executeReactive(QUERY_STRING);
        assertUnavailableMetric(0, 1L);
        assertUnavailableMetric(1, 1L);
        assertUnavailableMetric(2, 0L);
    }

    /**
     * Asserts that the given coordinator is the same instance as the node at {@code nodeIndex} in
     * the sorted node list.
     */
    private void assertCoordinator(int nodeIndex, Node coordinator) {
        Assertions.assertThat(coordinator).isSameAs(this.nodes.get(nodeIndex));
    }

    /**
     * Asserts that the "unavailables" node metric is present for the node at {@code nodeIndex} and
     * that its {@code count} property equals {@code expectedCount}.
     */
    private void assertUnavailableMetric(int nodeIndex, long expectedCount) {
        Metrics metrics = SESSION_RULE.session().getMetrics().orElseThrow(AssertionError::new);
        Node node = this.nodes.get(nodeIndex);
        Assertions.assertThat(metrics.getNodeMetric(node, DefaultNodeMetric.UNAVAILABLES))
                .isPresent()
                .hasValueSatisfying(metric -> {
                    Assertions.assertThat(metric).extracting("count").isEqualTo(Long.valueOf(expectedCount));
                });
    }
}
