package com.datastax.dse.driver.api.core.cql.continuous;

import com.datastax.dse.driver.api.core.config.DseDriverOption;
import com.datastax.dse.driver.api.core.cql.continuous.ContinuousPagingITBase;
import com.datastax.dse.driver.api.core.metrics.DseSessionMetric;
import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.DriverTimeoutException;
import com.datastax.oss.driver.api.core.Version;
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.datastax.oss.driver.api.core.metrics.DefaultNodeMetric;
import com.datastax.oss.driver.api.testinfra.DseRequirement;
import com.datastax.oss.driver.api.testinfra.ccm.CcmRule;
import com.datastax.oss.driver.api.testinfra.session.SessionRule;
import com.datastax.oss.driver.api.testinfra.session.SessionUtils;
import com.datastax.oss.driver.categories.ParallelizableTests;
import com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures;
import com.datastax.oss.driver.shaded.guava.common.util.concurrent.Uninterruptibles;
import com.tngtech.java.junit.dataprovider.DataProviderRunner;
import com.tngtech.java.junit.dataprovider.UseDataProvider;
import java.time.Duration;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Fail;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;
import org.junit.runner.RunWith;

@RunWith(DataProviderRunner.class)
@DseRequirement(min = "5.1.0", description = "Continuous paging is only available from 5.1.0 onwards")
@Category({ParallelizableTests.class})
/* loaded from: input_file:com/datastax/dse/driver/api/core/cql/continuous/ContinuousPagingIT.class */
public class ContinuousPagingIT extends ContinuousPagingITBase {
    /** CCM-backed cluster rule obtained from {@link CcmRule#getInstance()}. */
    private static final CcmRule ccmRule = CcmRule.getInstance();

    /**
     * Session rule built on top of {@link #ccmRule}. Its config loader enables the continuous CQL
     * requests session metric and the CQL messages node metric.
     */
    private static final SessionRule<CqlSession> sessionRule =
        SessionRule.builder(ccmRule)
            .withConfigLoader(
                SessionUtils.configLoaderBuilder()
                    .withStringList(
                        DefaultDriverOption.METRICS_SESSION_ENABLED,
                        Collections.singletonList(DseSessionMetric.CONTINUOUS_CQL_REQUESTS.getPath()))
                    .withStringList(
                        DefaultDriverOption.METRICS_NODE_ENABLED,
                        Collections.singletonList(DefaultNodeMetric.CQL_MESSAGES.getPath()))
                    .build())
            .build();

    /** Applies the CCM rule first (outermost), then the session rule around it. */
    @ClassRule
    public static TestRule chain = RuleChain.outerRule(ccmRule).around(sessionRule);

    /**
     * Continuation that walks an asynchronous continuous result set page by page.
     *
     * <p>For every row of the current page it checks that column {@code v} equals the running row
     * index. If more pages are available, it chains a new instance of itself onto the next page,
     * carrying the row count forward; otherwise it completes with the final {@link PageStatistics}.
     */
    private static class AsyncContinuousPagingFunction
        implements Function<ContinuousAsyncResultSet, CompletionStage<PageStatistics>> {

        /** Number of rows already verified on the pages preceding the one handled by this instance. */
        private final int rowsSoFar;

        /** Creates a function for the first page (no rows seen yet). */
        AsyncContinuousPagingFunction() {
            this(0);
        }

        /**
         * Creates a function for a subsequent page.
         *
         * @param rowsSoFar number of rows already consumed on previous pages
         */
        AsyncContinuousPagingFunction(int rowsSoFar) {
            this.rowsSoFar = rowsSoFar;
        }

        /**
         * Verifies the current page and either schedules the next page or produces the final tally.
         *
         * @param result the page that has just been received
         * @return a stage that completes with the total rows and pages seen
         */
        @Override
        public CompletionStage<PageStatistics> apply(ContinuousAsyncResultSet result) {
            int rows = rowsSoFar;
            for (Row row : result.currentPage()) {
                int actual = row.getInt("v");
                if (actual != rows) {
                    Fail.fail(String.format("Expected v == %d, got %d.", rows, actual));
                }
                rows++;
            }
            if (result.hasMorePages()) {
                return result.fetchNextPage().thenCompose(new AsyncContinuousPagingFunction(rows));
            }
            // A last page that contributed no rows is not counted as a page.
            int pages = rows == rowsSoFar ? result.pageNumber() - 1 : result.pageNumber();
            return CompletableFuture.completedFuture(new PageStatistics(rows, pages));
        }
    }

    /** Immutable pair holding the number of rows and the number of pages read from a result set. */
    public static class PageStatistics {
        /** Total number of rows read. */
        final int rows;

        /** Total number of pages read. */
        final int pages;

        /**
         * @param rows total number of rows read
         * @param pages total number of pages read
         */
        PageStatistics(int rows, int pages) {
            this.rows = rows;
            this.pages = pages;
        }
    }

    /**
     * Runs once before the tests: hands the shared session and the rule's slow execution profile to
     * {@code initialize} (not defined in this file; presumably inherited from the base class).
     */
    @BeforeClass
    public static void setUp() {
        CqlSession sharedSession = sessionRule.session();
        DriverExecutionProfile slowProfile = sessionRule.slowProfile();
        initialize(sharedSession, slowProfile);
    }

    /**
     * Runs a simple statement with synchronous continuous paging and checks that column {@code v}
     * comes back as 0, 1, 2, ... and that the row count matches the data provider's expectation.
     *
     * @param options paging options supplied by the {@code pagingOptions} data provider
     */
    @Test
    @UseDataProvider("pagingOptions")
    public void should_execute_synchronously(ContinuousPagingITBase.Options options) {
        CqlSession session = sessionRule.session();
        SimpleStatement statement =
            SimpleStatement.newInstance("SELECT v from test where k=?", "k")
                .setExecutionProfile(options.asProfile(session));
        int rowCount = 0;
        for (Row row : session.executeContinuously(statement)) {
            Assertions.assertThat(row.getInt("v")).isEqualTo(rowCount);
            rowCount++;
        }
        Assertions.assertThat(rowCount).isEqualTo(options.expectedRows);
        validateMetrics(session);
    }

    /**
     * Same check as the simple-statement variant, but binds the shared {@code prepared} statement
     * (not defined in this file) and pages through it synchronously.
     *
     * @param options paging options supplied by the {@code pagingOptions} data provider
     */
    @Test
    @UseDataProvider("pagingOptions")
    public void should_execute_prepared_statement_synchronously(ContinuousPagingITBase.Options options) {
        CqlSession session = sessionRule.session();
        DriverExecutionProfile profile = options.asProfile(session);
        int rowCount = 0;
        for (Row row : session.executeContinuously(prepared.bind("k").setExecutionProfile(profile))) {
            Assertions.assertThat(row.getInt("v")).isEqualTo(rowCount);
            rowCount++;
        }
        Assertions.assertThat(rowCount).isEqualTo(options.expectedRows);
        validateMetrics(session);
    }

    /**
     * Pages asynchronously through the test partition by chaining {@link
     * AsyncContinuousPagingFunction}, then compares the resulting row and page totals with the
     * expectations carried by {@code options}.
     *
     * @param options paging options supplied by the {@code pagingOptions} data provider
     */
    @Test
    @UseDataProvider("pagingOptions")
    public void should_execute_asynchronously(ContinuousPagingITBase.Options options) {
        CqlSession session = sessionRule.session();
        SimpleStatement statement =
            SimpleStatement.newInstance("SELECT v from test where k=?", "k")
                .setExecutionProfile(options.asProfile(session));
        CompletionStage<PageStatistics> outcome =
            session.executeContinuouslyAsync(statement).thenCompose(new AsyncContinuousPagingFunction());
        PageStatistics stats = CompletableFutures.getUninterruptibly(outcome);
        Assertions.assertThat(stats.rows).isEqualTo(options.expectedRows);
        Assertions.assertThat(stats.pages).isEqualTo(options.expectedPages);
        validateMetrics(session);
    }

    /**
     * Starts a continuous query on {@code test_prepare}, adds column {@code b} from a second session
     * while the query is in flight, and checks that:
     *
     * <ul>
     *   <li>the in-flight result set keeps returning rows without column {@code b};
     *   <li>a fresh execution of the same statement exposes {@code b} (as null).
     * </ul>
     */
    @Test
    public void simple_statement_paging_should_be_resilient_to_schema_change() {
        CqlSession session = sessionRule.session();
        SimpleStatement simple = SimpleStatement.newInstance("select * from test_prepare");
        // One row per page and a single enqueued page, so that rows are still pending when the
        // schema change happens.
        // NOTE(review): REQUEST_TIMEOUT is set through withInt; presumably the value is read as
        // milliseconds -- confirm.
        DriverExecutionProfile profile =
            session
                .getContext()
                .getConfig()
                .getDefaultProfile()
                .withInt(DseDriverOption.CONTINUOUS_PAGING_MAX_ENQUEUED_PAGES, 1)
                .withInt(DseDriverOption.CONTINUOUS_PAGING_PAGE_SIZE, 1)
                .withInt(DefaultDriverOption.REQUEST_TIMEOUT, 120000000);
        Iterator<Row> rows = session.executeContinuously(simple.setExecutionProfile(profile)).iterator();

        Row first = rows.next();
        Assertions.assertThat(first.getString("k")).isNotNull();
        Assertions.assertThat(first.isNull("v")).isFalse();

        // Alter the table from a dedicated session; close it once the change is done so that it
        // does not leak past the end of the test.
        CqlIdentifier keyspace = session.getKeyspace().orElseThrow(IllegalStateException::new);
        try (CqlSession schemaChangeSession =
            SessionUtils.newSession(
                ccmRule,
                keyspace,
                SessionUtils.configLoaderBuilder()
                    .withDuration(DefaultDriverOption.REQUEST_TIMEOUT, Duration.ofSeconds(30L))
                    .build())) {
            schemaChangeSession.execute(
                SimpleStatement.newInstance("ALTER TABLE test_prepare add b int")
                    .setExecutionProfile(sessionRule.slowProfile()));
            schemaChangeSession.checkSchemaAgreement();
        }

        // The query that was already running must not see the new column.
        while (rows.hasNext()) {
            Row row = rows.next();
            Assertions.assertThat(row.getString("k")).isNotNull();
            Assertions.assertThat(row.isNull("v")).isFalse();
            Assertions.assertThat(row.getColumnDefinitions().contains("b")).isFalse();
        }

        // A new execution must see the new column, with no value in it.
        for (Row row : session.executeContinuously(simple)) {
            Assertions.assertThat(row.getString("k")).isNotNull();
            Assertions.assertThat(row.isNull("v")).isFalse();
            Assertions.assertThat(row.isNull("b")).isTrue();
            Assertions.assertThat(row.getColumnDefinitions().contains("b")).isTrue();
        }
    }

    /**
     * Starts a continuous query from a prepared statement on {@code test_prep}, drops column {@code
     * v} from a second session while the query is in flight, and checks that the in-flight result
     * set still advertises {@code v} whereas a re-prepared statement no longer does.
     */
    @Test
    public void prepared_statement_paging_should_be_resilient_to_schema_change() {
        CqlSession session = sessionRule.session();
        session.execute("CREATE TABLE test_prep (k text PRIMARY KEY, v int)");
        // NOTE(review): every insert targets the same primary key 'foo', so the table presumably
        // ends up with a single row and the loop after the schema change may never execute --
        // confirm whether more rows were intended.
        for (int i = 0; i < 100; i++) {
            session.execute(String.format("INSERT INTO test_prep (k, v) VALUES ('foo', %d)", i));
        }
        PreparedStatement prepared = session.prepare("SELECT * FROM test_prep WHERE k = ?");
        DriverExecutionProfile profile =
            session
                .getContext()
                .getConfig()
                .getDefaultProfile()
                .withInt(DseDriverOption.CONTINUOUS_PAGING_MAX_ENQUEUED_PAGES, 1)
                .withInt(DseDriverOption.CONTINUOUS_PAGING_PAGE_SIZE, 1);
        Iterator<Row> rows =
            session.executeContinuously(prepared.bind("foo").setExecutionProfile(profile)).iterator();

        Row first = rows.next();
        Assertions.assertThat(first.getString("k")).isNotNull();
        Assertions.assertThat(first.isNull("v")).isFalse();

        // Drop the column from a dedicated session; close it afterwards so that it does not leak
        // past the end of the test.
        CqlIdentifier keyspace = session.getKeyspace().orElseThrow(IllegalStateException::new);
        try (CqlSession schemaChangeSession =
            SessionUtils.newSession(
                ccmRule,
                keyspace,
                SessionUtils.configLoaderBuilder()
                    .withDuration(DefaultDriverOption.REQUEST_TIMEOUT, Duration.ofSeconds(30L))
                    .build())) {
            schemaChangeSession.execute("ALTER TABLE test_prep DROP v;");
        }

        // The query that was already running keeps the original column definitions.
        while (rows.hasNext()) {
            Row row = rows.next();
            Assertions.assertThat(row.getString("k")).isNotNull();
            Version dseVersion = ccmRule.getDseVersion().orElseThrow(IllegalStateException::new);
            // The null check on the dropped column is only applied for DSE 6.0.0 and later.
            if (dseVersion.compareTo(Version.parse("6.0.0")) >= 0) {
                Assertions.assertThat(row.isNull("v")).isTrue();
            }
            Assertions.assertThat(row.getColumnDefinitions().contains("v")).isTrue();
        }

        // Preparing again after the change must yield results without the dropped column.
        PreparedStatement reprepared = session.prepare("SELECT * FROM test_prep WHERE k = ?");
        for (Row row : session.executeContinuously(reprepared.bind("foo").setExecutionProfile(profile))) {
            Assertions.assertThat(row.getString("k")).isNotNull();
            Assertions.assertThat(row.getColumnDefinitions().contains("v")).isFalse();
        }
    }

    /**
     * Cancels a synchronous continuous query right after it starts, checks that exactly the first
     * page (10 rows) can still be iterated, then resumes from the returned paging state and checks
     * that the remaining rows bring the total to 100.
     */
    @Test
    public void should_cancel_with_synchronous_paging() {
        CqlSession session = sessionRule.session();
        SimpleStatement statement = SimpleStatement.newInstance("SELECT v from test where k=?", "k");
        // Pages of 10 rows, throttled to one page per second.
        DriverExecutionProfile profile =
            session
                .getContext()
                .getConfig()
                .getDefaultProfile()
                .withInt(DseDriverOption.CONTINUOUS_PAGING_MAX_ENQUEUED_PAGES, 1)
                .withInt(DseDriverOption.CONTINUOUS_PAGING_PAGE_SIZE, 10)
                .withInt(DseDriverOption.CONTINUOUS_PAGING_MAX_PAGES_PER_SECOND, 1);
        ContinuousResultSet cancelled = session.executeContinuously(statement.setExecutionProfile(profile));
        cancelled.cancel();

        int rowCount = 0;
        for (Row row : cancelled) {
            Assertions.assertThat(row.getInt("v")).isEqualTo(rowCount);
            rowCount++;
        }
        Assertions.assertThat(rowCount).isEqualTo(10);

        // Resume where the cancelled query stopped, this time with the page rate set to 0.
        SimpleStatement resumed =
            statement
                .setExecutionProfile(
                    profile.withInt(DseDriverOption.CONTINUOUS_PAGING_MAX_PAGES_PER_SECOND, 0))
                .setPagingState(cancelled.getExecutionInfo().getPagingState());
        for (Row row : session.executeContinuously(resumed)) {
            Assertions.assertThat(row.getInt("v")).isEqualTo(rowCount);
            rowCount++;
        }
        Assertions.assertThat(rowCount).isEqualTo(100);
    }

    /**
     * Cancels an asynchronous continuous query after the first page, checks that fetching the next
     * page fails with a {@link CancellationException}, that the first page still holds its 10 rows,
     * and that resuming from the paging state yields the remaining rows up to 100.
     */
    @Test
    public void should_cancel_with_asynchronous_paging() {
        CqlSession session = sessionRule.session();
        SimpleStatement statement = SimpleStatement.newInstance("SELECT v from test where k=?", "k");
        // Pages of 10 rows, throttled to one page per second.
        DriverExecutionProfile profile =
            session
                .getContext()
                .getConfig()
                .getDefaultProfile()
                .withInt(DseDriverOption.CONTINUOUS_PAGING_PAGE_SIZE, 10)
                .withInt(DseDriverOption.CONTINUOUS_PAGING_MAX_PAGES_PER_SECOND, 1);
        ContinuousAsyncResultSet firstPage =
            CompletableFutures.getUninterruptibly(
                session.executeContinuouslyAsync(statement.setExecutionProfile(profile)));
        firstPage.cancel();
        try {
            CompletableFutures.getUninterruptibly(firstPage.fetchNextPage());
            Fail.fail("Expected a cancellation exception since paging was cancelled.");
        } catch (CancellationException e) {
            Assertions.assertThat(e)
                .hasMessageContaining("Can't get more results")
                .hasMessageContaining("query was cancelled");
        }

        // The page received before the cancellation is still readable.
        int rowCount = 0;
        for (Row row : firstPage.currentPage()) {
            Assertions.assertThat(row.getInt("v")).isEqualTo(rowCount);
            rowCount++;
        }
        Assertions.assertThat(rowCount).isEqualTo(10);

        // Resume where the cancelled query stopped, this time with the page rate set to 0.
        CompletionStage<ContinuousAsyncResultSet> pending =
            session.executeContinuouslyAsync(
                statement
                    .setExecutionProfile(
                        profile.withInt(DseDriverOption.CONTINUOUS_PAGING_MAX_PAGES_PER_SECOND, 0))
                    .setPagingState(firstPage.getExecutionInfo().getPagingState()));
        ContinuousAsyncResultSet page;
        do {
            page = CompletableFutures.getUninterruptibly(pending);
            for (Row row : page.currentPage()) {
                Assertions.assertThat(row.getInt("v")).isEqualTo(rowCount);
                rowCount++;
            }
            if (page.hasMorePages()) {
                pending = page.fetchNextPage();
            }
        } while (page.hasMorePages());
        Assertions.assertThat(rowCount).isEqualTo(100);
    }

    /**
     * Requests the second page, then cancels the first result set: the pending next-page future must
     * report itself as cancelled and throw {@link CancellationException}. The first page must still
     * hold its 10 rows, and resuming from the paging state must yield the remaining rows up to 100.
     */
    @Test
    public void should_cancel_future_when_cancelling_previous_result() {
        CqlSession session = sessionRule.session();
        SimpleStatement statement = SimpleStatement.newInstance("SELECT v from test where k=?", "k");
        // Pages of 10 rows, throttled to one page per second.
        DriverExecutionProfile profile =
            session
                .getContext()
                .getConfig()
                .getDefaultProfile()
                .withInt(DseDriverOption.CONTINUOUS_PAGING_PAGE_SIZE, 10)
                .withInt(DseDriverOption.CONTINUOUS_PAGING_MAX_PAGES_PER_SECOND, 1);
        ContinuousAsyncResultSet firstPage =
            CompletableFutures.getUninterruptibly(
                session.executeContinuouslyAsync(statement.setExecutionProfile(profile)));
        CompletionStage<ContinuousAsyncResultSet> nextPage = firstPage.fetchNextPage();
        firstPage.cancel();
        Assertions.assertThat(nextPage.toCompletableFuture().isCancelled()).isTrue();
        try {
            CompletableFutures.getUninterruptibly(nextPage);
            Fail.fail("Expected a cancellation exception since previous result was cancelled.");
        } catch (CancellationException expected) {
            // Expected: the pending future was cancelled together with the result set.
        }

        // The page received before the cancellation is still readable.
        int rowCount = 0;
        for (Row row : firstPage.currentPage()) {
            Assertions.assertThat(row.getInt("v")).isEqualTo(rowCount);
            rowCount++;
        }
        Assertions.assertThat(rowCount).isEqualTo(10);

        // Resume where the cancelled query stopped, this time with the page rate set to 0.
        CompletionStage<ContinuousAsyncResultSet> pending =
            session.executeContinuouslyAsync(
                statement
                    .setExecutionProfile(
                        profile.withInt(DseDriverOption.CONTINUOUS_PAGING_MAX_PAGES_PER_SECOND, 0))
                    .setPagingState(firstPage.getExecutionInfo().getPagingState()));
        ContinuousAsyncResultSet page;
        do {
            page = CompletableFutures.getUninterruptibly(pending);
            for (Row row : page.currentPage()) {
                Assertions.assertThat(row.getInt("v")).isEqualTo(rowCount);
                rowCount++;
            }
            if (page.hasMorePages()) {
                pending = page.fetchNextPage();
            }
        } while (page.hasMorePages());
        Assertions.assertThat(rowCount).isEqualTo(100);
    }

    /**
     * Requests the second page and cancels the returned future directly: the future must report
     * itself as cancelled and throw {@link CancellationException}. The first page must still hold
     * its 10 rows, and resuming from the paging state must yield the remaining rows up to 100.
     */
    @Test
    public void should_cancel_when_future_is_cancelled() {
        CqlSession session = sessionRule.session();
        SimpleStatement statement = SimpleStatement.newInstance("SELECT v from test where k=?", "k");
        // Pages of 10 rows, throttled to one page per second.
        DriverExecutionProfile profile =
            session
                .getContext()
                .getConfig()
                .getDefaultProfile()
                .withInt(DseDriverOption.CONTINUOUS_PAGING_PAGE_SIZE, 10)
                .withInt(DseDriverOption.CONTINUOUS_PAGING_MAX_PAGES_PER_SECOND, 1);
        ContinuousAsyncResultSet firstPage =
            CompletableFutures.getUninterruptibly(
                session.executeContinuouslyAsync(statement.setExecutionProfile(profile)));
        CompletableFuture<ContinuousAsyncResultSet> nextPage =
            firstPage.fetchNextPage().toCompletableFuture();
        nextPage.cancel(false);
        Assertions.assertThat(nextPage.isCancelled()).isTrue();
        try {
            CompletableFutures.getUninterruptibly(nextPage);
            Fail.fail("Expected a cancellation exception since future was cancelled.");
        } catch (CancellationException expected) {
            // Expected: the future was cancelled explicitly just above.
        }

        // The page received before the cancellation is still readable.
        int rowCount = 0;
        for (Row row : firstPage.currentPage()) {
            Assertions.assertThat(row.getInt("v")).isEqualTo(rowCount);
            rowCount++;
        }
        Assertions.assertThat(rowCount).isEqualTo(10);

        // Resume where the cancelled query stopped, this time with the page rate set to 0.
        CompletionStage<ContinuousAsyncResultSet> pending =
            session.executeContinuouslyAsync(
                statement
                    .setExecutionProfile(
                        profile.withInt(DseDriverOption.CONTINUOUS_PAGING_MAX_PAGES_PER_SECOND, 0))
                    .setPagingState(firstPage.getExecutionInfo().getPagingState()));
        ContinuousAsyncResultSet page;
        do {
            page = CompletableFutures.getUninterruptibly(pending);
            for (Row row : page.currentPage()) {
                Assertions.assertThat(row.getInt("v")).isEqualTo(rowCount);
                rowCount++;
            }
            if (page.hasMorePages()) {
                pending = page.fetchNextPage();
            }
        } while (page.hasMorePages());
        Assertions.assertThat(rowCount).isEqualTo(100);
    }

    /**
     * Throttles the server to one 10-row page per second while allowing only 100 ms for pages after
     * the first, then expects the request for page 2 to fail with a {@link DriverTimeoutException}.
     *
     * @throws Exception if waiting on the next-page future fails in an unexpected way
     */
    @Test
    public void should_time_out_when_server_does_not_produce_pages_fast_enough() throws Exception {
        CqlSession session = sessionRule.session();
        SimpleStatement statement = SimpleStatement.newInstance("SELECT v from test where k=?", "k");
        DriverExecutionProfile profile =
            session
                .getContext()
                .getConfig()
                .getDefaultProfile()
                .withInt(DseDriverOption.CONTINUOUS_PAGING_PAGE_SIZE, 10)
                .withInt(DseDriverOption.CONTINUOUS_PAGING_MAX_PAGES_PER_SECOND, 1)
                .withDuration(DseDriverOption.CONTINUOUS_PAGING_TIMEOUT_OTHER_PAGES, Duration.ofMillis(100L));
        try {
            ContinuousAsyncResultSet firstPage =
                CompletableFutures.getUninterruptibly(
                    session.executeContinuouslyAsync(statement.setExecutionProfile(profile)));
            firstPage.fetchNextPage().toCompletableFuture().get();
            Fail.fail("Expected a timeout");
        } catch (ExecutionException e) {
            Assertions.assertThat(e.getCause())
                .isInstanceOf(DriverTimeoutException.class)
                .hasMessageContaining("Timed out waiting for page 2");
        }
    }

    /**
     * Starts an asynchronous continuous query on {@code test_autoread} with 100-row pages, waits one
     * second before consuming anything, then reads every page and expects 20000 rows in 200 pages.
     */
    @Test
    public void should_resume_reading_when_client_catches_up() {
        CqlSession session = sessionRule.session();
        SimpleStatement statement =
            SimpleStatement.newInstance("SELECT * from test_autoread where k=?", "k");
        DriverExecutionProfile profile =
            session
                .getContext()
                .getConfig()
                .getDefaultProfile()
                .withInt(DseDriverOption.CONTINUOUS_PAGING_PAGE_SIZE, 100);
        CompletionStage<ContinuousAsyncResultSet> firstPage =
            session.executeContinuouslyAsync(statement.setExecutionProfile(profile));
        // Presumably gives incoming pages time to pile up before the client starts consuming them,
        // so that reading has to resume afterwards -- confirm against the driver's back-pressure logic.
        Uninterruptibles.sleepUninterruptibly(1L, TimeUnit.SECONDS);
        PageStatistics stats =
            CompletableFutures.getUninterruptibly(
                firstPage.thenCompose(new AsyncContinuousPagingFunction()));
        Assertions.assertThat(stats.rows).isEqualTo(20000);
        Assertions.assertThat(stats.pages).isEqualTo(200);
    }
}
