package io.debezium.connector.mongodb;

import com.mongodb.client.MongoClient;
import io.debezium.DebeziumException;
import io.debezium.annotation.ThreadSafe;
import io.debezium.bean.StandardBeanNames;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.base.ChangeEventQueueMetrics;
import io.debezium.connector.common.BaseSourceTask;
import io.debezium.connector.mongodb.MongoDbOffsetContext;
import io.debezium.connector.mongodb.metrics.MongoDbChangeEventSourceMetricsFactory;
import io.debezium.document.DocumentReader;
import io.debezium.pipeline.ChangeEventSourceCoordinator;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.notification.NotificationService;
import io.debezium.pipeline.notification.channels.NotificationChannel;
import io.debezium.pipeline.signal.SignalProcessor;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.schema.SchemaFactory;
import io.debezium.schema.SchemaNameAdjuster;
import io.debezium.spi.topic.TopicNamingStrategy;
import io.debezium.util.Clock;
import io.debezium.util.LoggingContext;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Kafka Connect source task of the Debezium MongoDB connector.
 *
 * <p>{@link #start(Configuration)} wires together the change event queue, schema, error handler,
 * signal processor, event dispatcher, notification service and the
 * {@link ChangeEventSourceCoordinator}; {@link #doPoll()} drains the queue into
 * {@link SourceRecord}s; {@link #doStop()} closes the schema.
 *
 * <p>State populated by {@code start} is held in {@code volatile} fields so that it is visible to
 * whichever thread later calls {@code doPoll} or {@code doStop}.
 */
@ThreadSafe
public final class MongoDbConnectorTask extends BaseSourceTask<MongoDbPartition, MongoDbOffsetContext> {
    private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbConnectorTask.class);
    private static final String CONTEXT_NAME = "mongodb-connector-task";

    private volatile ChangeEventQueue<DataChangeEvent> queue;
    private volatile String taskName;
    private volatile MongoDbTaskContext taskContext;
    private volatile ErrorHandler errorHandler;
    private volatile MongoDbSchema schema;

    /**
     * @return the connector version as reported by {@link Module#version()}
     */
    @Override
    public String version() {
        return Module.version();
    }

    /**
     * Builds and starts the change event pipeline for this task.
     *
     * @param configuration the task configuration
     * @return the started coordinator
     * @throws DebeziumException if no replica sets are configured, or if shard specific offsets
     *         from a previous run are found and offset invalidation is not allowed
     */
    @Override
    public ChangeEventSourceCoordinator<MongoDbPartition, MongoDbOffsetContext> start(Configuration configuration) {
        final MongoDbConnectorConfig connectorConfig = new MongoDbConnectorConfig(configuration);
        final SchemaNameAdjuster schemaNameAdjuster = connectorConfig.schemaNameAdjuster();

        this.taskName = "task" + configuration.getInteger(MongoDbConnectorConfig.TASK_ID);
        this.taskContext = new MongoDbTaskContext(configuration);
        this.schema = new MongoDbSchema(
                this.taskContext.filters(),
                this.taskContext.topicNamingStrategy(),
                connectorConfig.getSourceInfoStructMaker().schema(),
                schemaNameAdjuster);

        final ReplicaSets replicaSets = getReplicaSets(connectorConfig);
        final MongoDbOffsetContext previousOffset = getPreviousOffset(connectorConfig, replicaSets);
        final Clock clock = Clock.system();

        final LoggingContext.PreviousContext previousLogContext = this.taskContext.configureLoggingContext(this.taskName);
        try {
            this.queue = new ChangeEventQueue.Builder<DataChangeEvent>()
                    .pollInterval(connectorConfig.getPollInterval())
                    .maxBatchSize(connectorConfig.getMaxBatchSize())
                    .maxQueueSize(connectorConfig.getMaxQueueSize())
                    .maxQueueSizeInBytes(connectorConfig.getMaxQueueSizeInBytes())
                    .loggingContextSupplier(() -> this.taskContext.configureLoggingContext(CONTEXT_NAME))
                    .build();

            // The previous handler (null on first start) is handed to the new one on purpose.
            this.errorHandler = new MongoDbErrorHandler(connectorConfig, this.queue, this.errorHandler);

            final MongoDbEventMetadataProvider metadataProvider = new MongoDbEventMetadataProvider();

            final SignalProcessor<MongoDbPartition, MongoDbOffsetContext> signalProcessor = new SignalProcessor<>(
                    MongoDbConnector.class,
                    connectorConfig,
                    Map.of(),
                    getAvailableSignalChannels(),
                    DocumentReader.defaultReader(),
                    Offsets.of(Collections.singletonMap(new MongoDbPartition(), previousOffset)));

            connectorConfig.getBeanRegistry().add(StandardBeanNames.CONNECTOR_CONFIG, connectorConfig);
            connectorConfig.getBeanRegistry().add(StandardBeanNames.DATABASE_SCHEMA, this.schema);
            registerServiceProviders(connectorConfig.getServiceRegistry());

            final TopicNamingStrategy<CollectionId> topicNamingStrategy = this.taskContext.topicNamingStrategy();
            final Predicate<CollectionId> collectionFilter = this.taskContext.filters().collectionFilter();
            final EventDispatcher<MongoDbPartition, CollectionId> dispatcher = new EventDispatcher<>(
                    connectorConfig,
                    topicNamingStrategy,
                    this.schema,
                    this.queue,
                    collectionFilter::test,
                    DataChangeEvent::new,
                    metadataProvider,
                    schemaNameAdjuster,
                    signalProcessor);

            final List<NotificationChannel> notificationChannels = getNotificationChannels();
            final NotificationService<MongoDbPartition, MongoDbOffsetContext> notificationService = new NotificationService<>(
                    notificationChannels,
                    connectorConfig,
                    SchemaFactory.get(),
                    dispatcher::enqueueNotification);

            final MongoDbChangeEventSourceMetricsFactory metricsFactory = new MongoDbChangeEventSourceMetricsFactory();
            final MongoDbChangeEventSourceFactory sourceFactory = new MongoDbChangeEventSourceFactory(
                    connectorConfig,
                    this.errorHandler,
                    dispatcher,
                    clock,
                    replicaSets,
                    this.taskContext,
                    this.schema,
                    metricsFactory.getStreamingMetrics(this.taskContext, this.queue, metadataProvider));

            final ChangeEventSourceCoordinator<MongoDbPartition, MongoDbOffsetContext> coordinator = new ChangeEventSourceCoordinator<>(
                    Offsets.of(Collections.singletonMap(new MongoDbPartition(), previousOffset)),
                    this.errorHandler,
                    MongoDbConnector.class,
                    connectorConfig,
                    sourceFactory,
                    metricsFactory,
                    dispatcher,
                    this.schema,
                    signalProcessor,
                    notificationService);

            coordinator.start(this.taskContext, this.queue, metadataProvider);
            return coordinator;
        }
        finally {
            // Restore the caller's logging context on both the success and the failure path.
            previousLogContext.restore();
        }
    }

    /**
     * Polls the change event queue and unwraps each event into its source record.
     *
     * @return the records of the polled batch
     * @throws InterruptedException if interrupted while polling the queue
     */
    @Override
    public List<SourceRecord> doPoll() throws InterruptedException {
        return this.queue.poll().stream()
                .map(DataChangeEvent::getRecord)
                .collect(Collectors.toList());
    }

    /**
     * Closes the schema, if one was created, under this task's logging context.
     */
    @Override
    public void doStop() {
        final MongoDbTaskContext currentContext = this.taskContext;
        if (currentContext == null) {
            // start() assigns the task context before the schema, so when the context is missing
            // there is no schema to close either.
            return;
        }
        final LoggingContext.PreviousContext previousLogContext = currentContext.configureLoggingContext(this.taskName);
        try {
            final MongoDbSchema currentSchema = this.schema;
            if (currentSchema != null) {
                currentSchema.close();
            }
        }
        finally {
            previousLogContext.restore();
        }
    }

    /**
     * @return all configuration fields known to the MongoDB connector
     */
    @Override
    protected Iterable<Field> getAllConfigurationFields() {
        return MongoDbConnectorConfig.ALL_FIELDS;
    }

    /**
     * Loads the offsets stored by a previous run for the given replica sets.
     *
     * @return the loaded offset context, or {@code null} when no offset is stored
     * @throws DebeziumException propagated from {@link #checkShardSpecificOffsetsIfNeeded}
     */
    private MongoDbOffsetContext getPreviousOffset(MongoDbConnectorConfig mongoDbConnectorConfig, ReplicaSets replicaSets) {
        final MongoDbOffsetContext.Loader loader = new MongoDbOffsetContext.Loader(mongoDbConnectorConfig, replicaSets);
        final Map<Map<String, String>, Map<String, Object>> offsets = this.context.offsetStorageReader().offsets(loader.getPartitions());
        if (!hasAnyOffset(offsets)) {
            checkShardSpecificOffsetsIfNeeded(mongoDbConnectorConfig, replicaSets);
            return null;
        }
        final MongoDbOffsetContext previousOffset = loader.loadOffsets(offsets);
        LOGGER.info("Found previous offsets {}", previousOffset);
        return previousOffset;
    }

    /**
     * When the single configured replica set is the cluster itself, looks for offsets that a
     * previous run stored per shard. If any exist, either fails the task or, when offset
     * invalidation is allowed, logs that they will be ignored.
     *
     * <p>Failing to read the shard topology is treated as best-effort: it is logged and the check
     * is skipped.
     *
     * @throws DebeziumException if shard specific offsets exist and offset invalidation is not allowed
     */
    private void checkShardSpecificOffsetsIfNeeded(MongoDbConnectorConfig mongoDbConnectorConfig, ReplicaSets replicaSets) {
        if (replicaSets.size() != 1 || !replicaSets.getSnapshotReplicaSet().isClusterRs()) {
            return;
        }
        LOGGER.info("Previous offset not found, checking shard specific offsets from replica_set connection mode.");

        final ReplicaSets shardReplicaSets = discoverShardReplicaSets();
        if (shardReplicaSets == null) {
            return;
        }

        // Deliberately outside any catch-all: the exception thrown below must reach the caller.
        final MongoDbOffsetContext.Loader shardLoader = new MongoDbOffsetContext.Loader(mongoDbConnectorConfig, shardReplicaSets);
        final Map<Map<String, String>, Map<String, Object>> shardOffsets = this.context.offsetStorageReader().offsets(shardLoader.getPartitions());
        if (!hasAnyOffset(shardOffsets)) {
            return;
        }

        LOGGER.warn("Found at least one shard specific offset from previous run");
        if (!mongoDbConnectorConfig.isOffsetInvalidationAllowed()) {
            throw new DebeziumException("Found at least one shard specific offset from previous run.The default connection mode for sharded has changed to 'sharded' and previous offsets would be invalidated.Either explicitly set '" + MongoDbConnectorConfig.CONNECTION_MODE.name() + "=replica_set' to postpone the migration or set '" + MongoDbConnectorConfig.ALLOW_OFFSET_INVALIDATION.name() + "=true' to re-execute snapshot and reset offsets. In next release the 'replica_set' connection mode will be removed.");
        }
        LOGGER.warn("Offset invalidation is allowed, previous shard specific offsets will be ignored and snapshot re-executed");
    }

    /**
     * Connects to the cluster and reads the replica sets of its shards. The client is always
     * closed, including when discovery fails.
     *
     * @return the discovered replica sets, or {@code null} if the topology could not be read
     */
    // Raw HashSet: the element type expected by ReplicaSetDiscovery/ReplicaSets is not imported
    // in this file, so the warning is suppressed on this helper only.
    @SuppressWarnings({ "rawtypes", "unchecked" })
    private ReplicaSets discoverShardReplicaSets() {
        final ReplicaSetDiscovery discovery = new ReplicaSetDiscovery(this.taskContext);
        final HashSet shards = new HashSet();
        try (MongoClient client = this.taskContext.getConnectionContext().connect()) {
            discovery.readReplicaSetsFromShardedCluster(shards, client);
        }
        catch (Exception e) {
            LOGGER.warn("Unable to read shard topology.", e);
            return null;
        }
        return new ReplicaSets(shards);
    }

    /**
     * @return {@code true} if {@code offsets} is non-null and holds at least one non-null value
     */
    private static boolean hasAnyOffset(Map<?, ?> offsets) {
        return offsets != null && offsets.values().stream().anyMatch(Objects::nonNull);
    }

    /**
     * @return the configured replica sets
     * @throws DebeziumException if the configuration yields no replica set
     */
    private ReplicaSets getReplicaSets(MongoDbConnectorConfig mongoDbConnectorConfig) {
        final ReplicaSets replicaSets = mongoDbConnectorConfig.getReplicaSets();
        if (replicaSets.size() == 0) {
            throw new DebeziumException("Unable to start MongoDB connector task since no replica sets were found");
        }
        return replicaSets;
    }

    /**
     * Masks the connection string in addition to the options masked by the base task.
     *
     * @param configuration the configuration to mask
     * @return a view of the configuration with sensitive options masked
     */
    @Override
    public Configuration withMaskedSensitiveOptions(Configuration configuration) {
        return super.withMaskedSensitiveOptions(configuration).withMasked(MongoDbConnectorConfig.CONNECTION_STRING.name());
    }
}
