package io.debezium.testing.testcontainers;

import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Predicate;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.rnorth.ducttape.unreliables.Unreliables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startable;
import org.testcontainers.lifecycle.Startables;

/* loaded from: input_file:io/debezium/testing/testcontainers/DebeziumContainerTest.class */
/**
 * Integration test that starts Kafka, PostgreSQL and a {@link DebeziumContainer} on a shared
 * Docker network and checks that connectors can be registered and that row changes made in
 * PostgreSQL show up as records on the expected Kafka topic.
 *
 * <p>All containers are shared by every test in the class: they are started once in
 * {@link #startContainers()} and stopped once in {@link #stopContainers()}.
 */
public class DebeziumContainerTest {
    private static final Logger LOGGER = LoggerFactory.getLogger(DebeziumContainerTest.class);

    /** Single HTTP client reused for every status poll instead of building one per request. */
    private static final OkHttpClient HTTP_CLIENT = new OkHttpClient();

    /** Network shared by all containers so they can reach each other. */
    private static final Network network = Network.newNetwork();

    private static final KafkaContainer kafkaContainer = new KafkaContainer().withNetwork(network);

    /** PostgreSQL instance, attached to the shared network under the alias {@code postgres}. */
    public static PostgreSQLContainer<?> postgresContainer = new PostgreSQLContainer<>(ImageNames.POSTGRES_DOCKER_IMAGE_NAME)
            .withNetwork(network)
            .withNetworkAliases("postgres");

    /** Debezium container wired to {@link #kafkaContainer}; its output is forwarded to {@link #LOGGER}. */
    public static DebeziumContainer debeziumContainer = DebeziumContainer.nightly()
            .withNetwork(network)
            .withKafka(kafkaContainer)
            .withLogConsumer(new Slf4jLogConsumer(LOGGER))
            .dependsOn(kafkaContainer);

    /**
     * Starts the three containers and blocks until the combined start-up future completes.
     */
    @BeforeAll
    public static void startContainers() {
        Startables.deepStart(Stream.of(kafkaContainer, postgresContainer, debeziumContainer)).join();
    }

    /**
     * Registers a connector and polls its status endpoint (every 250 ms, for at most 30 s)
     * until the reported name matches and both the connector and its first task report
     * {@code RUNNING}.
     *
     * @throws Exception if registering the connector fails
     */
    @Test
    public void canRegisterConnector() throws Exception {
        final String connectorName = "my-connector-1";
        debeziumContainer.registerConnector(connectorName, getConfiguration(1));

        Awaitility.await()
                .pollInterval(Duration.ofMillis(250L))
                .atMost(Duration.ofSeconds(30L))
                .untilAsserted(() -> {
                    String status = executeHttpRequest(debeziumContainer.getConnectorStatusUri(connectorName));
                    Assertions.assertThat(JsonPath.<String>read(status, "$.name")).isEqualTo(connectorName);
                    Assertions.assertThat(JsonPath.<String>read(status, "$.connector.state")).isEqualTo("RUNNING");
                    Assertions.assertThat(JsonPath.<String>read(status, "$.tasks[0].state")).isEqualTo("RUNNING");
                });
    }

    /**
     * Creates and populates {@code todo.Todo}, registers a connector, and asserts on the records
     * read from topic {@code dbserver2.todo.todo}: first the two rows inserted before the
     * connector was registered (expected with {@code op = "r"}), then a single update of row 2
     * (expected with {@code op = "u"} and both the old and the new title).
     *
     * @throws Exception if a JDBC call, the connector registration or a Kafka call fails
     */
    @Test
    public void shouldRegisterPostgreSQLConnector() throws Exception {
        // try-with-resources closes consumer, statement and connection (in that order) on every
        // path, including when an assertion fails part-way through.
        try (Connection connection = getConnection(postgresContainer);
                Statement statement = connection.createStatement();
                KafkaConsumer<String, String> consumer = getConsumer(kafkaContainer)) {

            statement.execute("create schema todo");
            statement.execute("create table todo.Todo (id int8 not null, title varchar(255), primary key (id))");
            statement.execute("alter table todo.Todo replica identity full");
            statement.execute("insert into todo.Todo values (1, 'Be Awesome')");
            statement.execute("insert into todo.Todo values (2, 'Learn Quarkus')");

            debeziumContainer.registerConnector("my-connector", getConfiguration(2));
            consumer.subscribe(List.of("dbserver2.todo.todo"));

            // Rows that already existed when the connector was registered.
            List<ConsumerRecord<String, String>> initialRecords = drain(consumer, 2);

            ConsumerRecord<String, String> first = initialRecords.get(0);
            Assertions.assertThat(JsonPath.<Integer>read(first.key(), "$.id")).isEqualTo(1);
            Assertions.assertThat(JsonPath.<String>read(first.value(), "$.op")).isEqualTo("r");
            Assertions.assertThat(JsonPath.<String>read(first.value(), "$.after.title")).isEqualTo("Be Awesome");

            ConsumerRecord<String, String> second = initialRecords.get(1);
            Assertions.assertThat(JsonPath.<Integer>read(second.key(), "$.id")).isEqualTo(2);
            Assertions.assertThat(JsonPath.<String>read(second.value(), "$.op")).isEqualTo("r");
            Assertions.assertThat(JsonPath.<String>read(second.value(), "$.after.title")).isEqualTo("Learn Quarkus");

            // A change made after registration must arrive as one additional record.
            statement.execute("update todo.Todo set title = 'Learn Java' where id = 2");

            ConsumerRecord<String, String> update = drain(consumer, 1).get(0);
            Assertions.assertThat(JsonPath.<Integer>read(update.key(), "$.id")).isEqualTo(2);
            Assertions.assertThat(JsonPath.<String>read(update.value(), "$.op")).isEqualTo("u");
            Assertions.assertThat(JsonPath.<String>read(update.value(), "$.before.title")).isEqualTo("Learn Quarkus");
            Assertions.assertThat(JsonPath.<String>read(update.value(), "$.after.title")).isEqualTo("Learn Java");

            consumer.unsubscribe();
        }
    }

    /**
     * Opens a JDBC connection using the URL and credentials reported by the given container.
     *
     * @param postgreSQLContainer container to connect to
     * @return a new connection; the caller is responsible for closing it
     * @throws SQLException if the connection cannot be opened
     */
    private Connection getConnection(PostgreSQLContainer<?> postgreSQLContainer) throws SQLException {
        return DriverManager.getConnection(
                postgreSQLContainer.getJdbcUrl(),
                postgreSQLContainer.getUsername(),
                postgreSQLContainer.getPassword());
    }

    /**
     * Builds a String/String consumer for the given Kafka container.
     *
     * <p>Each call uses a fresh random {@code group.id} and {@code auto.offset.reset=earliest}.
     *
     * @param kafka container whose bootstrap servers the consumer connects to
     * @return a new consumer; the caller is responsible for closing it
     */
    private KafkaConsumer<String, String> getConsumer(KafkaContainer kafka) {
        return new KafkaConsumer<>(
                Map.of(
                        "bootstrap.servers", kafka.getBootstrapServers(),
                        "group.id", "tc-" + UUID.randomUUID(),
                        "auto.offset.reset", "earliest"),
                new StringDeserializer(),
                new StringDeserializer());
    }

    /**
     * Polls the consumer repeatedly (50 ms per poll, retried for up to 10 seconds) and collects
     * every record received until exactly {@code expectedRecordCount} records have accumulated.
     *
     * <p>The stop condition is an equality check, so receiving more records than expected never
     * satisfies it and the retry runs until its time limit.
     *
     * @param consumer            subscribed consumer to poll
     * @param expectedRecordCount exact number of records to wait for
     * @return the collected records, in the order they were returned by the consumer
     */
    private List<ConsumerRecord<String, String>> drain(KafkaConsumer<String, String> consumer, int expectedRecordCount) {
        List<ConsumerRecord<String, String>> records = new ArrayList<>();
        Unreliables.retryUntilTrue(10, TimeUnit.SECONDS, () -> {
            consumer.poll(Duration.ofMillis(50L)).iterator().forEachRemaining(records::add);
            return records.size() == expectedRecordCount;
        });
        return records;
    }

    /**
     * Builds the connector configuration for {@link #postgresContainer}, deriving the topic
     * prefix ({@code dbserver<id>}) and slot name ({@code debezium_<id>}) from the given id so
     * that the connectors registered by different tests do not share either value.
     *
     * @param id numeric suffix used in the topic prefix and slot name
     * @return the connector configuration
     */
    private ConnectorConfiguration getConfiguration(int id) {
        return ConnectorConfiguration.forJdbcContainer(postgresContainer)
                .with("topic.prefix", "dbserver" + id)
                .with("slot.name", "debezium_" + id);
    }

    /**
     * Performs an HTTP request against the given URL using the request builder's defaults and
     * returns the response body as a string. The response is always closed.
     *
     * @param url URL to request
     * @return the response body
     * @throws IOException if the call or reading the body fails
     */
    private String executeHttpRequest(String url) throws IOException {
        Request request = new Request.Builder().url(url).build();
        try (Response response = HTTP_CLIENT.newCall(request).execute()) {
            return response.body().string();
        }
    }

    /**
     * Stops the PostgreSQL, Kafka and Debezium containers, in that order.
     *
     * <p>Shutdown is best-effort: a failure to stop one container is logged and does not prevent
     * the remaining containers from being stopped, and it never fails the test run.
     */
    @AfterAll
    public static void stopContainers() {
        stopQuietly(postgresContainer);
        stopQuietly(kafkaContainer);
        stopQuietly(debeziumContainer);
    }

    /**
     * Stops a single container, logging (rather than propagating) any failure.
     *
     * @param container container to stop; {@code null} is ignored
     */
    private static void stopQuietly(Startable container) {
        if (container == null) {
            return;
        }
        try {
            container.stop();
        }
        catch (Exception e) {
            LOGGER.warn("Failed to stop {}", container.getClass().getSimpleName(), e);
        }
    }
}
