package io.stargate.it.grpc.streaming;

import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.CqlSession;
import com.google.rpc.Status;
import io.grpc.StatusRuntimeException;
import io.stargate.grpc.Values;
import io.stargate.it.driver.CqlSessionExtension;
import io.stargate.it.driver.CqlSessionSpec;
import io.stargate.it.driver.TestKeyspace;
import io.stargate.it.grpc.GrpcIntegrationTest;
import io.stargate.proto.QueryOuterClass;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Objects;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

@ExtendWith({CqlSessionExtension.class})
@CqlSessionSpec(initQueries = {"CREATE TABLE IF NOT EXISTS test (k text, v int, PRIMARY KEY(k, v))"})
/* loaded from: input_file:io/stargate/it/grpc/streaming/ReactiveBatchTest.class */
public class ReactiveBatchTest extends GrpcIntegrationTest {
    @AfterEach
    public void cleanup(CqlSession cqlSession) {
        cqlSession.execute("TRUNCATE TABLE test");
    }

    @Test
    public void simpleReactiveBiDirectionalQueries(@TestKeyspace CqlIdentifier cqlIdentifier) {
        StepVerifier.create(reactiveStubWithCallCredentials().executeBatchStream(Flux.create(fluxSink -> {
            fluxSink.next(QueryOuterClass.Batch.newBuilder().addQueries(cqlBatchQuery("INSERT INTO test (k, v) VALUES ('a', 1)", new QueryOuterClass.Value[0])).addQueries(cqlBatchQuery("INSERT INTO test (k, v) VALUES (?, ?)", Values.of("b"), Values.of(2L))).setParameters(batchParameters(cqlIdentifier)).build());
            fluxSink.next(QueryOuterClass.Batch.newBuilder().addQueries(cqlBatchQuery("INSERT INTO test (k, v) VALUES (?, ?)", Values.of("c"), Values.of(3L))).setParameters(batchParameters(cqlIdentifier)).build());
            fluxSink.complete();
        }))).expectNextMatches((v0) -> {
            return Objects.nonNull(v0);
        }).expectNextMatches((v0) -> {
            return Objects.nonNull(v0);
        }).expectComplete().verify();
        QueryOuterClass.Response executeQuery = stubWithCallCredentials().executeQuery(cqlQuery("SELECT * FROM test", queryParameters(cqlIdentifier), new QueryOuterClass.Value[0]));
        Assertions.assertThat(executeQuery.hasResultSet()).isTrue();
        Assertions.assertThat(new HashSet(executeQuery.getResultSet().getRowsList())).isEqualTo(new HashSet(Arrays.asList(rowOf(Values.of("a"), Values.of(1L)), rowOf(Values.of("b"), Values.of(2L)), rowOf(Values.of("c"), Values.of(3L)))));
    }

    @Test
    public void reactiveServerSideErrorPropagationCompleteProcessing(@TestKeyspace CqlIdentifier cqlIdentifier) {
        StepVerifier.create(reactiveStubWithCallCredentials().executeBatchStream(Flux.create(fluxSink -> {
            fluxSink.next(QueryOuterClass.Batch.newBuilder().addQueries(cqlBatchQuery("INSERT INTO not_existing (k, v) VALUES ('a', 1)", new QueryOuterClass.Value[0])).setParameters(batchParameters(cqlIdentifier)).build());
            fluxSink.complete();
        }))).expectNextMatches(streamingResponse -> {
            Status status = streamingResponse.getStatus();
            return status.getCode() == 3 && status.getMessage().contains("not_existing");
        }).expectComplete().verify();
    }

    @Test
    public void reactiveClientSideErrorPropagation() {
        StepVerifier.create(reactiveStubWithCallCredentials().executeBatchStream(Flux.create(fluxSink -> {
            fluxSink.error(new IllegalArgumentException("some client side processing error"));
            fluxSink.complete();
        }))).expectErrorMatches(th -> {
            return (th instanceof StatusRuntimeException) && th.getMessage().contains("CANCELLED: Cancelled by client");
        }).verify();
    }

    @Test
    public void completeEmptyStream() {
        StepVerifier.create(reactiveStubWithCallCredentials().executeBatchStream(Flux.create((v0) -> {
            v0.complete();
        }))).expectComplete().verify();
    }
}
