package io.stargate.it.bridge;

import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.CqlSession;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import io.stargate.grpc.StargateBearerToken;
import io.stargate.it.BaseIntegrationTest;
import io.stargate.it.driver.CqlSessionExtension;
import io.stargate.it.driver.TestKeyspace;
import io.stargate.it.storage.StargateConnectionInfo;
import io.stargate.proto.QueryOuterClass;
import io.stargate.proto.Schema;
import io.stargate.proto.StargateBridgeGrpc;
import java.io.IOException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith({CqlSessionExtension.class})
/* loaded from: input_file:io/stargate/it/bridge/SchemaNotificationsTest.class */
public class SchemaNotificationsTest extends BaseIntegrationTest {
    private StargateBridgeGrpc.StargateBridgeStub asyncStub;
    private ManagedChannel channel;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/stargate/it/bridge/SchemaNotificationsTest$SchemaNotificationObserver.class */
    public static class SchemaNotificationObserver implements StreamObserver<Schema.SchemaNotification> {
        private final ConcurrentLinkedQueue<Schema.SchemaNotification> changes = new ConcurrentLinkedQueue<>();
        private volatile AssertionError error;

        SchemaNotificationObserver() {
        }

        boolean hasNext() {
            return !this.changes.isEmpty();
        }

        Schema.SchemaNotification next() {
            return this.changes.poll();
        }

        public void onNext(Schema.SchemaNotification schemaNotification) {
            this.changes.offer(schemaNotification);
        }

        public void onError(Throwable th) {
            this.error = new AssertionError("Unexpected onError", th);
        }

        public void onCompleted() {
            this.error = new AssertionError("Unexpected onCompleted");
        }
    }

    @BeforeEach
    public void setup(StargateConnectionInfo stargateConnectionInfo) throws IOException {
        this.channel = ManagedChannelBuilder.forAddress(stargateConnectionInfo.seedAddress(), 8091).usePlaintext().build();
        this.asyncStub = StargateBridgeGrpc.newStub(this.channel).withCallCredentials(new StargateBearerToken("mockAdminToken"));
    }

    @AfterEach
    public void teardown() throws InterruptedException {
        this.channel.shutdownNow();
        Assertions.assertThat(this.channel.awaitTermination(10L, TimeUnit.SECONDS)).isTrue();
    }

    @DisplayName("Should receive table changes")
    @Test
    public void tableChangesTest(CqlSession cqlSession, @TestKeyspace CqlIdentifier cqlIdentifier) {
        SchemaNotificationObserver schemaNotificationObserver = new SchemaNotificationObserver();
        this.asyncStub.getSchemaNotifications(Schema.GetSchemaNotificationsParams.newBuilder().build(), schemaNotificationObserver);
        assertReady(schemaNotificationObserver);
        cqlSession.execute("CREATE TABLE foo(k int PRIMARY KEY)");
        assertNextChange(schemaNotificationObserver, QueryOuterClass.SchemaChange.Type.CREATED, QueryOuterClass.SchemaChange.Target.TABLE, cqlIdentifier, "foo");
        cqlSession.execute("ALTER TABLE foo ADD v int");
        assertNextChange(schemaNotificationObserver, QueryOuterClass.SchemaChange.Type.UPDATED, QueryOuterClass.SchemaChange.Target.TABLE, cqlIdentifier, "foo");
        cqlSession.execute("DROP TABLE foo");
        assertNextChange(schemaNotificationObserver, QueryOuterClass.SchemaChange.Type.DROPPED, QueryOuterClass.SchemaChange.Target.TABLE, cqlIdentifier, "foo");
    }

    private void assertReady(SchemaNotificationObserver schemaNotificationObserver) {
        Awaitility.await().until(() -> {
            return Boolean.valueOf(schemaNotificationObserver.hasNext() || schemaNotificationObserver.error != null);
        });
        if (schemaNotificationObserver.error != null) {
            throw schemaNotificationObserver.error;
        }
        Assertions.assertThat(schemaNotificationObserver.next().getInnerCase()).isEqualTo(Schema.SchemaNotification.InnerCase.READY);
    }

    private void assertNextChange(SchemaNotificationObserver schemaNotificationObserver, QueryOuterClass.SchemaChange.Type type, QueryOuterClass.SchemaChange.Target target, CqlIdentifier cqlIdentifier, String str) {
        Awaitility.await().until(() -> {
            return Boolean.valueOf(schemaNotificationObserver.hasNext() || schemaNotificationObserver.error != null);
        });
        if (schemaNotificationObserver.error != null) {
            throw schemaNotificationObserver.error;
        }
        QueryOuterClass.SchemaChange change = schemaNotificationObserver.next().getChange();
        Assertions.assertThat(change.getChangeType()).isEqualTo(type);
        Assertions.assertThat(change.getTarget()).isEqualTo(target);
        Assertions.assertThat(change.getKeyspace()).isEqualTo(cqlIdentifier.asInternal());
        Assertions.assertThat(change.getName().getValue()).isEqualTo(str);
        Assertions.assertThat(change.getArgumentTypesList()).isEmpty();
    }
}
