package io.stargate.it.grpc.streaming;

import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.CqlSession;
import com.google.rpc.Status;
import io.grpc.StatusRuntimeException;
import io.stargate.grpc.Values;
import io.stargate.it.driver.CqlSessionExtension;
import io.stargate.it.driver.CqlSessionSpec;
import io.stargate.it.driver.TestKeyspace;
import io.stargate.it.grpc.GrpcIntegrationTest;
import io.stargate.proto.QueryOuterClass;
import io.stargate.proto.ReactorStargateGrpc;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Objects;
import org.assertj.core.api.AssertionsForClassTypes;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

@ExtendWith({CqlSessionExtension.class})
@CqlSessionSpec(initQueries = {"CREATE TABLE IF NOT EXISTS test (k text, v int, PRIMARY KEY(k, v))"})
/* loaded from: input_file:io/stargate/it/grpc/streaming/ReactiveQueryTest.class */
public class ReactiveQueryTest extends GrpcIntegrationTest {
    @AfterEach
    public void cleanup(CqlSession cqlSession) {
        cqlSession.execute("TRUNCATE TABLE test");
    }

    @Test
    public void simpleReactiveBiDirectionalQueries(@TestKeyspace CqlIdentifier cqlIdentifier) {
        ReactorStargateGrpc.ReactorStargateStub reactiveStubWithCallCredentials = reactiveStubWithCallCredentials();
        Flux create = Flux.create(fluxSink -> {
            fluxSink.next(cqlQuery("INSERT INTO test (k, v) VALUES ('a', 1)", queryParameters(cqlIdentifier), new QueryOuterClass.Value[0]));
            fluxSink.next(cqlQuery("INSERT INTO test (k, v) VALUES (?, ?)", queryParameters(cqlIdentifier), Values.of("b"), Values.of(2L)));
            fluxSink.complete();
        });
        Flux create2 = Flux.create(fluxSink2 -> {
            fluxSink2.next(cqlQuery("SELECT * FROM test", queryParameters(cqlIdentifier), new QueryOuterClass.Value[0]));
            fluxSink2.complete();
        });
        StepVerifier.create(reactiveStubWithCallCredentials.executeQueryStream(create)).expectNextMatches((v0) -> {
            return Objects.nonNull(v0);
        }).expectNextMatches((v0) -> {
            return Objects.nonNull(v0);
        }).expectComplete().verify();
        StepVerifier.create(reactiveStubWithCallCredentials.executeQueryStream(create2)).expectNextMatches(streamingResponse -> {
            AssertionsForClassTypes.assertThat(streamingResponse.getResponse().hasResultSet()).isTrue();
            QueryOuterClass.ResultSet resultSet = streamingResponse.getResponse().getResultSet();
            AssertionsForClassTypes.assertThat(new HashSet(resultSet.getRowsList())).isEqualTo(new HashSet(Arrays.asList(rowOf(Values.of("a"), Values.of(1L)), rowOf(Values.of("b"), Values.of(2L)))));
            return resultSet.getRowsList().size() == 2;
        }).expectComplete().verify();
    }

    @Test
    public void reactiveServerSideErrorPropagationCompleteProcessing(@TestKeyspace CqlIdentifier cqlIdentifier) {
        StepVerifier.create(reactiveStubWithCallCredentials().executeQueryStream(Flux.create(fluxSink -> {
            fluxSink.next(cqlQuery("INSERT INTO not_existing (k, v) VALUES ('a', 1)", queryParameters(cqlIdentifier), new QueryOuterClass.Value[0]));
            fluxSink.complete();
        }))).expectNextMatches(streamingResponse -> {
            Status status = streamingResponse.getStatus();
            return status.getCode() == 3 && status.getMessage().contains("not_existing");
        }).expectComplete().verify();
    }

    @Test
    public void reactiveClientSideErrorPropagation() {
        StepVerifier.create(reactiveStubWithCallCredentials().executeQueryStream(Flux.create(fluxSink -> {
            fluxSink.error(new IllegalArgumentException("some client side processing error"));
            fluxSink.complete();
        }))).expectErrorMatches(th -> {
            return (th instanceof StatusRuntimeException) && th.getMessage().contains("CANCELLED: Cancelled by client");
        }).verify();
    }

    @Test
    public void completeEmptyStream() {
        StepVerifier.create(reactiveStubWithCallCredentials().executeQueryStream(Flux.create((v0) -> {
            v0.complete();
        }))).expectComplete().verify();
    }
}
