package io.debezium.pipeline.source.snapshot.incremental;

import io.debezium.annotation.NotThreadSafe;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.notification.NotificationService;
import io.debezium.pipeline.signal.actions.snapshotting.OpenIncrementalSnapshotWindow;
import io.debezium.pipeline.source.spi.DataChangeEventListener;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.schema.DatabaseSchema;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Clock;
import java.sql.SQLException;
import java.time.Instant;
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Incremental snapshot change event source that marks the opening of each chunk window by
 * inserting a row into the signaling table over JDBC, and delegates the closing of the window
 * to a {@link WatermarkWindowCloser} chosen from the configured watermarking strategy.
 *
 * @param <P> the partition type
 * @param <T> the data collection identifier type
 */
@NotThreadSafe
public class SignalBasedIncrementalSnapshotChangeEventSource<P extends Partition, T extends DataCollectionId> extends AbstractIncrementalSnapshotChangeEventSource<P, T> {
    private static final Logger LOGGER = LoggerFactory.getLogger(SignalBasedIncrementalSnapshotChangeEventSource.class);

    /** Parameterized three-column INSERT used to write the open-window row into the signaling table. */
    private final String signalWindowStatement;

    /** Metadata of the most recently opened window; stays {@code null} until {@link #emitWindowOpen()} runs. */
    private SignalMetadata signalMetadata;

    /**
     * Creates the event source and prepares the INSERT statement targeting the configured
     * signaling data collection. All arguments are forwarded unchanged to the superclass
     * constructor.
     */
    public SignalBasedIncrementalSnapshotChangeEventSource(RelationalDatabaseConnectorConfig relationalDatabaseConnectorConfig, JdbcConnection jdbcConnection, EventDispatcher<P, T> eventDispatcher, DatabaseSchema<?> databaseSchema, Clock clock, SnapshotProgressListener<P> snapshotProgressListener, DataChangeEventListener<P> dataChangeEventListener, NotificationService<P, ? extends OffsetContext> notificationService) {
        super(relationalDatabaseConnectorConfig, jdbcConnection, eventDispatcher, databaseSchema, clock, snapshotProgressListener, dataChangeEventListener, notificationService);
        // The table name comes from connector configuration; identifiers cannot be bound as JDBC parameters.
        this.signalWindowStatement = "INSERT INTO " + getSignalTableName(relationalDatabaseConnectorConfig.getSignalingDataCollectionId()) + " VALUES (?, ?, ?)";
    }

    /**
     * Refreshes the incremental snapshot context from the given offset context and, when the
     * current window is non-empty and the context reports that deduplication is needed,
     * deduplicates the window against the supplied key.
     *
     * <p>If the offset context carries no incremental snapshot context, a warning is logged and
     * the message is skipped.
     *
     * @param partition the partition the message belongs to (not used by this implementation)
     * @param dataCollectionId the data collection the message belongs to
     * @param obj the key of the message
     * @param offsetContext the offset context supplying the incremental snapshot context
     */
    @Override // io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotChangeEventSource
    @SuppressWarnings("unchecked") // the offset context does not expose the context's type parameter
    public void processMessage(Partition partition, DataCollectionId dataCollectionId, Object obj, OffsetContext offsetContext) {
        this.context = (IncrementalSnapshotContext<T>) offsetContext.getIncrementalSnapshotContext();
        if (this.context == null) {
            LOGGER.warn("Context is null, skipping message processing");
            return;
        }
        LOGGER.trace("Checking window for table '{}', key '{}', window contains '{}'", dataCollectionId, obj, this.window);
        if (!this.window.isEmpty() && this.context.deduplicationNeeded()) {
            deduplicateWindow(dataCollectionId, obj);
        }
    }

    /**
     * Records the metadata of the new window and inserts the open-window row into the signaling
     * table, then commits the JDBC connection.
     *
     * <p>The inserted row carries the current chunk id suffixed with {@code "-open"}, the
     * {@link OpenIncrementalSnapshotWindow#NAME} signal type and the metadata string.
     *
     * @throws SQLException if the insert or the commit fails
     */
    @Override // io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource
    protected void emitWindowOpen() throws SQLException {
        // The second argument is left null here; presumably it is the close timestamp, which is
        // only supplied when the window is closed.
        this.signalMetadata = new SignalMetadata(Instant.now(), null);
        this.jdbcConnection.prepareUpdate(this.signalWindowStatement, preparedStatement -> {
            LOGGER.trace("Emitting open window for chunk = '{}'", this.context.currentChunkId());
            preparedStatement.setString(1, this.context.currentChunkId() + "-open");
            preparedStatement.setString(2, OpenIncrementalSnapshotWindow.NAME);
            preparedStatement.setString(3, this.signalMetadata.metadataString());
        });
        this.jdbcConnection.commit();
    }

    /**
     * Closes the window of the current chunk using the closer that matches the configured
     * watermarking strategy.
     *
     * @param partition the partition being snapshotted
     * @param offsetContext the current offset context
     * @throws Exception if the selected closer fails
     */
    @Override // io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource
    protected void emitWindowClose(Partition partition, OffsetContext offsetContext) throws Exception {
        final String signalTableName = getSignalTableName(this.connectorConfig.getSignalingDataCollectionId());
        final WatermarkWindowCloser windowCloser = getWatermarkWindowCloser(this.connectorConfig, this.jdbcConnection, signalTableName);
        windowCloser.closeWindow(partition, offsetContext, this.context.currentChunkId());
    }

    /**
     * Selects the window closer for the configured watermarking strategy: a
     * {@link DeleteWindowCloser} for {@code INSERT_DELETE}, otherwise an
     * {@link InsertWindowCloser} carrying the open timestamp of the current window and the
     * current instant.
     *
     * @throws NullPointerException if no strategy is configured, or if an insert-based close is
     *         requested before any window has been opened
     */
    private WatermarkWindowCloser getWatermarkWindowCloser(CommonConnectorConfig config, JdbcConnection connection, String signalTableName) {
        if (Objects.requireNonNull(config.getIncrementalSnapshotWatermarkingStrategy(),
                "Incremental snapshot watermarking strategy must not be null") == CommonConnectorConfig.WatermarkStrategy.INSERT_DELETE) {
            // NOTE(review): raw construction kept as in the original; if DeleteWindowCloser is
            // declared generic, switch to the diamond operator.
            return new DeleteWindowCloser(connection, signalTableName, this);
        }
        // signalMetadata is only assigned in emitWindowOpen(); fail with an explicit message
        // instead of an anonymous NullPointerException when close is attempted first.
        final SignalMetadata openWindowMetadata = Objects.requireNonNull(this.signalMetadata,
                "Cannot close an incremental snapshot window before it has been opened");
        return new InsertWindowCloser(connection, signalTableName, new SignalMetadata(openWindowMetadata.getOpenWindowTimestamp(), Instant.now()));
    }
}
