package io.debezium.testing.testcontainers;

import com.mongodb.ConnectionString;
import com.mongodb.ReadPreference;
import com.mongodb.client.MongoChangeStreamCursor;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.internal.selector.ReadPreferenceServerSelector;
import io.debezium.testing.testcontainers.MongoDbReplicaSet;
import io.debezium.testing.testcontainers.util.DockerUtils;
import io.debezium.testing.testcontainers.util.ParsingPortResolver;
import io.debezium.testing.testcontainers.util.PooledPortResolver;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.bson.BsonDocument;
import org.bson.Document;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledOnOs;
import org.junit.jupiter.api.condition.OS;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/testing/testcontainers/MongoDbReplicaSetTest.class */
public class MongoDbReplicaSetTest {
    private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbReplicaSetTest.class);
    public static final String MONGO_DOCKER_DESKTOP_PORT_PROPERTY = "mongodb.docker.desktop.ports";

    /* loaded from: input_file:io/debezium/testing/testcontainers/MongoDbReplicaSetTest$ResumableCursorException.class */
    public static class ResumableCursorException extends RuntimeException {
        private final BsonDocument resumeToken;

        ResumableCursorException(BsonDocument bsonDocument) {
            this.resumeToken = bsonDocument;
        }

        public BsonDocument resumeToken() {
            return this.resumeToken;
        }
    }

    @BeforeAll
    static void setupAll() {
        DockerUtils.enableFakeDnsIfRequired();
    }

    @AfterAll
    static void tearDownAll() {
        DockerUtils.disableFakeDns();
    }

    @AfterEach
    void tearDown() {
        System.clearProperty(MONGO_DOCKER_DESKTOP_PORT_PROPERTY);
    }

    @Test
    public void testCluster() throws InterruptedException {
        testCluster(MongoDbReplicaSet.replicaSet());
    }

    @Test
    @EnabledOnOs({OS.MAC, OS.WINDOWS})
    public void testClusterWithPropertyPortList() throws InterruptedException {
        System.setProperty(MONGO_DOCKER_DESKTOP_PORT_PROPERTY, "27017,27018,27019");
        testCluster(MongoDbReplicaSet.replicaSet().portResolver(ParsingPortResolver.parseProperty(MONGO_DOCKER_DESKTOP_PORT_PROPERTY)));
    }

    @Test
    @EnabledOnOs({OS.MAC, OS.WINDOWS})
    public void testClusterWithPropertyPorRange() throws InterruptedException {
        System.setProperty(MONGO_DOCKER_DESKTOP_PORT_PROPERTY, "27017:27019");
        testCluster(MongoDbReplicaSet.replicaSet().portResolver(ParsingPortResolver.parseProperty(MONGO_DOCKER_DESKTOP_PORT_PROPERTY)));
    }

    @Test
    @EnabledOnOs({OS.MAC, OS.WINDOWS})
    public void testClusterWithInsufficientNumberOfPorts() throws InterruptedException {
        PooledPortResolver pooledPortResolver = new PooledPortResolver(Set.of(27017, 27018));
        Assertions.assertThatExceptionOfType(IllegalStateException.class).describedAs("Exception is thrown when two ports are available but three ports are required", new Object[0]).isThrownBy(() -> {
            testCluster(MongoDbReplicaSet.replicaSet().portResolver(pooledPortResolver));
        });
    }

    public void testCluster(MongoDbReplicaSet.Builder builder) throws InterruptedException {
        MongoDbReplicaSet build = builder.build();
        try {
            LOGGER.info("Starting {}...", build);
            build.start();
            ConnectionString connectionString = new ConnectionString(build.getConnectionString() + "/?readPreference=" + ReadPreference.primary().getName());
            LOGGER.info("Connecting to cluster: {}", connectionString);
            MongoClient create = MongoClients.create(connectionString);
            try {
                LOGGER.info("Connected to cluster: {}", create.getClusterDescription());
                run(build, create, setup(create));
                if (create != null) {
                    create.close();
                }
                if (build != null) {
                    build.close();
                }
            } finally {
            }
        } catch (Throwable th) {
            if (build != null) {
                try {
                    build.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private void run(MongoDbReplicaSet mongoDbReplicaSet, MongoClient mongoClient, MongoCollection<Document> mongoCollection) {
        try {
            MongoChangeStreamCursor cursor = mongoCollection.watch().batchSize(1).cursor();
            try {
                mongoCollection.insertOne(Document.parse("{username: 'user1', name: 'User 1'}"));
                mongoCollection.insertOne(Document.parse("{username: 'user2', name: 'User 2'}"));
                LOGGER.info("{}", cursor.next());
                LOGGER.info("Demoting primary");
                mongoDbReplicaSet.stepDown();
                Awaitility.await().atMost(30L, TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() -> {
                    return (Boolean) mongoDbReplicaSet.tryPrimary().map(mongoDbContainer -> {
                        return Boolean.valueOf((mongoDbContainer.getNamedAddress().toString().equals(cursor.getServerAddress().toString()) || mongoDbContainer.getClientAddress().toString().equals(cursor.getServerAddress().toString())) ? false : true);
                    }).orElse(false);
                });
                if (!MongoDbContainer.IMAGE_VERSION.equals("4.0")) {
                    Assertions.assertThat(isSelectedReadPreference(mongoClient, mongoCollection, cursor)).isFalse();
                }
                throw new ResumableCursorException(cursor.getResumeToken());
            } catch (Throwable th) {
                if (cursor != null) {
                    try {
                        cursor.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        } catch (ResumableCursorException e) {
            MongoChangeStreamCursor cursor2 = mongoCollection.watch().resumeAfter(e.resumeToken()).batchSize(1).cursor();
            try {
                Assertions.assertThat(isSelectedReadPreference(mongoClient, mongoCollection, cursor2)).isTrue();
                LOGGER.info("{}", cursor2.next());
                if (cursor2 != null) {
                    cursor2.close();
                }
            } catch (Throwable th3) {
                if (cursor2 != null) {
                    try {
                        cursor2.close();
                    } catch (Throwable th4) {
                        th3.addSuppressed(th4);
                    }
                }
                throw th3;
            }
        }
    }

    private static boolean isSelectedReadPreference(MongoClient mongoClient, MongoCollection<Document> mongoCollection, MongoChangeStreamCursor<ChangeStreamDocument<Document>> mongoChangeStreamCursor) {
        return new ReadPreferenceServerSelector(mongoCollection.getReadPreference()).select(mongoClient.getClusterDescription()).stream().map((v0) -> {
            return v0.getAddress();
        }).anyMatch(serverAddress -> {
            return serverAddress.equals(mongoChangeStreamCursor.getServerCursor() == null ? null : mongoChangeStreamCursor.getServerCursor().getAddress());
        });
    }

    private static MongoCollection<Document> setup(MongoClient mongoClient) throws InterruptedException {
        MongoDatabase database = mongoClient.getDatabase("testChangeStreams");
        database.drop();
        Thread.sleep(1000L);
        return database.getCollection("documents");
    }
}
