package io.stargate.it.grpc.streaming;

import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.CqlSession;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.rpc.ErrorInfo;
import com.google.rpc.Status;
import io.grpc.stub.StreamObserver;
import io.stargate.grpc.Values;
import io.stargate.it.driver.CqlSessionExtension;
import io.stargate.it.driver.CqlSessionSpec;
import io.stargate.it.driver.TestKeyspace;
import io.stargate.it.grpc.GrpcIntegrationTest;
import io.stargate.proto.QueryOuterClass;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

/**
 * Integration tests for the streaming {@code executeQueryStream} gRPC call.
 *
 * <p>Each test opens a request stream, pushes one or more CQL queries onto it and checks the
 * responses delivered to a recording observer. Queries that fail are expected to come back as a
 * {@link Status} embedded in a regular streamed response, not through
 * {@link StreamObserver#onError(Throwable)}.
 */
@ExtendWith({CqlSessionExtension.class})
@CqlSessionSpec(initQueries = {"CREATE TABLE IF NOT EXISTS test (k text, v int, PRIMARY KEY(k, v))"})
public class ExecuteQueryStreamingTest extends GrpcIntegrationTest {
    /** Numeric status code the tests expect for a query against a missing table. */
    private static final int INVALID_ARGUMENT_CODE = 3;

    /** Upper bound, in seconds, on how long a test waits for the expected number of responses. */
    private static final long RESPONSE_TIMEOUT_SECONDS = 5L;

    /**
     * Empties the shared table after every test so rows do not leak between tests.
     *
     * @param cqlSession session injected by the test extension
     */
    @AfterEach
    public void cleanup(CqlSession cqlSession) {
        cqlSession.execute("TRUNCATE TABLE test");
    }

    /**
     * Streams two inserts followed by a select and verifies that three responses arrive, the last
     * one carrying a result set with exactly the two inserted rows.
     *
     * @param cqlIdentifier keyspace injected by the test extension
     */
    @Test
    public void simpleStreamingQuery(@TestKeyspace CqlIdentifier cqlIdentifier) {
        RecordingObserver observer = new RecordingObserver();
        StreamObserver<QueryOuterClass.Query> requests = asyncStubWithCallCredentials().executeQueryStream(observer);

        requests.onNext(query("INSERT INTO test (k, v) VALUES ('a', 1)", cqlIdentifier));
        requests.onNext(query("INSERT INTO test (k, v) VALUES (?, ?)", cqlIdentifier, Values.of("b"), Values.of(2L)));
        // Wait for both inserts to be acknowledged before reading the rows back.
        awaitResponseCount(observer, 2);

        requests.onNext(query("SELECT * FROM test", cqlIdentifier));
        requests.onCompleted();
        awaitResponseCount(observer, 3);

        // A stream-level failure must not go unnoticed in the happy path.
        Assertions.assertThat(observer.error.get()).isNull();
        Assertions.assertThat(observer.responses.get(0)).isNotNull();
        Assertions.assertThat(observer.responses.get(1)).isNotNull();

        QueryOuterClass.StreamingResponse selectResponse = observer.responses.get(2);
        Assertions.assertThat(selectResponse.getResponse().hasResultSet()).isTrue();

        // Compare as sets: the assertion does not depend on row order.
        Set<QueryOuterClass.Row> actualRows = new HashSet<>(selectResponse.getResponse().getResultSet().getRowsList());
        Set<QueryOuterClass.Row> expectedRows = new HashSet<>(Arrays.asList(
                rowOf(Values.of("a"), Values.of(1L)),
                rowOf(Values.of("b"), Values.of(2L))));
        Assertions.assertThat(actualRows).isEqualTo(expectedRows);
    }

    /**
     * Streams a single insert into a table that does not exist and verifies that the failure is
     * reported as a status inside the streamed response while {@code onError} is never invoked.
     *
     * @param cqlIdentifier keyspace injected by the test extension
     * @throws InvalidProtocolBufferException if the status detail cannot be parsed as {@link ErrorInfo}
     */
    @Test
    public void streamingQueryWithError(@TestKeyspace CqlIdentifier cqlIdentifier) throws InvalidProtocolBufferException {
        RecordingObserver observer = new RecordingObserver();
        StreamObserver<QueryOuterClass.Query> requests = asyncStubWithCallCredentials().executeQueryStream(observer);

        requests.onNext(query("INSERT INTO not_existing (k, v) VALUES ('a', 1)", cqlIdentifier));
        awaitResponseCount(observer, 1);
        requests.onCompleted();

        Assertions.assertThat(observer.error.get()).isNull();
        assertMissingTableStatus(observer.responses.get(0).getStatus());
    }

    /**
     * Streams one valid insert and one insert into a missing table, then verifies that two responses
     * arrive, that one of them carries the invalid-argument status, and that {@code onError} is
     * never invoked.
     *
     * @param cqlIdentifier keyspace injected by the test extension
     * @throws InvalidProtocolBufferException if the status detail cannot be parsed as {@link ErrorInfo}
     */
    @Test
    public void streamingQueryWithNextAndError(@TestKeyspace CqlIdentifier cqlIdentifier) throws InvalidProtocolBufferException {
        RecordingObserver observer = new RecordingObserver();
        StreamObserver<QueryOuterClass.Query> requests = asyncStubWithCallCredentials().executeQueryStream(observer);

        requests.onNext(query("INSERT INTO test (k, v) VALUES ('a', 1)", cqlIdentifier));
        requests.onNext(query("INSERT INTO not_existing (k, v) VALUES ('a', 1)", cqlIdentifier));
        awaitResponseCount(observer, 2);
        requests.onCompleted();

        Assertions.assertThat(observer.error.get()).isNull();

        // The failing response is located by status code rather than by position in the list.
        Status failure = observer.responses.stream()
                .map(QueryOuterClass.StreamingResponse::getStatus)
                .filter(status -> status.getCode() == INVALID_ARGUMENT_CODE)
                .findFirst()
                .orElseThrow(() -> new AssertionError(
                        "No streamed response carried status code " + INVALID_ARGUMENT_CODE));
        assertMissingTableStatus(failure);
    }

    /** Builds a query for {@code cql} scoped to {@code keyspace}, with optional bound values. */
    private QueryOuterClass.Query query(String cql, CqlIdentifier keyspace, QueryOuterClass.Value... values) {
        return cqlQuery(cql, queryParameters(keyspace), values);
    }

    /** Blocks until the observer has recorded exactly {@code expected} responses, or times out. */
    private static void awaitResponseCount(RecordingObserver observer, int expected) {
        Awaitility.await()
                .atMost(RESPONSE_TIMEOUT_SECONDS, TimeUnit.SECONDS)
                .until(() -> observer.responses.size() == expected);
    }

    /** Asserts that {@code status} describes an invalid-argument failure naming the missing table. */
    private static void assertMissingTableStatus(Status status) throws InvalidProtocolBufferException {
        Assertions.assertThat(status.getCode()).isEqualTo(INVALID_ARGUMENT_CODE);
        Assertions.assertThat(status.getMessage()).contains("INVALID_ARGUMENT").contains("not_existing");
        ErrorInfo errorInfo = ErrorInfo.parseFrom(status.getDetails(0).getValue());
        Assertions.assertThat(errorInfo.getReason()).contains("not_existing");
    }

    /**
     * Response observer that records every streamed response and the last stream-level error.
     *
     * <p>Callbacks run on a different thread than the test body, hence the concurrent containers.
     */
    private static final class RecordingObserver implements StreamObserver<QueryOuterClass.StreamingResponse> {
        private final List<QueryOuterClass.StreamingResponse> responses = new CopyOnWriteArrayList<>();
        private final AtomicReference<Throwable> error = new AtomicReference<>();

        @Override
        public void onNext(QueryOuterClass.StreamingResponse response) {
            responses.add(response);
        }

        @Override
        public void onError(Throwable throwable) {
            error.set(throwable);
        }

        @Override
        public void onCompleted() {
            // Completion carries no data; tests synchronise on the number of recorded responses.
        }
    }
}
