package io.debezium.connector.mysql.strategy.mysql;

import io.debezium.DebeziumException;
import io.debezium.connector.mysql.MySqlOffsetContext;
import io.debezium.connector.mysql.MySqlPartition;
import io.debezium.connector.mysql.SourceInfo;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.notification.NotificationService;
import io.debezium.pipeline.signal.SignalPayload;
import io.debezium.pipeline.signal.actions.snapshotting.SnapshotConfiguration;
import io.debezium.pipeline.signal.channels.KafkaSignalChannel;
import io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.DataChangeEventListener;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.schema.DatabaseSchema;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Clock;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/debezium-connector-mysql-2.5.4.Final.jar:io/debezium/connector/mysql/strategy/mysql/MySqlReadOnlyIncrementalSnapshotChangeEventSource.class */
public class MySqlReadOnlyIncrementalSnapshotChangeEventSource<T extends DataCollectionId> extends AbstractIncrementalSnapshotChangeEventSource<MySqlPartition, T> {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) MySqlReadOnlyIncrementalSnapshotChangeEventSource.class);
    private static final String SHOW_MASTER_STMT = "SHOW MASTER STATUS";

    public MySqlReadOnlyIncrementalSnapshotChangeEventSource(RelationalDatabaseConnectorConfig relationalDatabaseConnectorConfig, JdbcConnection jdbcConnection, EventDispatcher<MySqlPartition, T> eventDispatcher, DatabaseSchema<?> databaseSchema, Clock clock, SnapshotProgressListener<MySqlPartition> snapshotProgressListener, DataChangeEventListener<MySqlPartition> dataChangeEventListener, NotificationService<MySqlPartition, MySqlOffsetContext> notificationService) {
        super(relationalDatabaseConnectorConfig, jdbcConnection, eventDispatcher, databaseSchema, clock, snapshotProgressListener, dataChangeEventListener, notificationService);
    }

    @Override // io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotChangeEventSource
    public void processMessage(MySqlPartition mySqlPartition, DataCollectionId dataCollectionId, Object obj, OffsetContext offsetContext) throws InterruptedException {
        if (getContext() == null) {
            LOGGER.warn("Context is null, skipping message processing");
            return;
        }
        LOGGER.trace("Checking window for table '{}', key '{}', window contains '{}'", dataCollectionId, obj, this.window);
        if (getContext().updateWindowState(offsetContext)) {
            sendWindowEvents(mySqlPartition, offsetContext);
            readChunk(mySqlPartition, offsetContext);
        } else {
            if (this.window.isEmpty() || !getContext().deduplicationNeeded()) {
                return;
            }
            deduplicateWindow(dataCollectionId, obj);
        }
    }

    @Override // io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotChangeEventSource
    public void processHeartbeat(MySqlPartition mySqlPartition, OffsetContext offsetContext) throws InterruptedException {
        if (getContext() == null) {
            LOGGER.warn("Context is null, skipping message processing");
        } else {
            readUntilGtidChange(mySqlPartition, offsetContext);
        }
    }

    private void readUntilGtidChange(MySqlPartition mySqlPartition, OffsetContext offsetContext) throws InterruptedException {
        String currentGtid = getContext().getCurrentGtid(offsetContext);
        while (getContext().snapshotRunning() && getContext().reachedHighWatermark(currentGtid)) {
            getContext().closeWindow();
            sendWindowEvents(mySqlPartition, offsetContext);
            readChunk(mySqlPartition, offsetContext);
            if (currentGtid == null && getContext().watermarksChanged()) {
                return;
            }
        }
    }

    @Override // io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotChangeEventSource
    public void processFilteredEvent(MySqlPartition mySqlPartition, OffsetContext offsetContext) throws InterruptedException {
        if (getContext() == null) {
            LOGGER.warn("Context is null, skipping message processing");
        } else if (getContext().updateWindowState(offsetContext)) {
            sendWindowEvents(mySqlPartition, offsetContext);
            readChunk(mySqlPartition, offsetContext);
        }
    }

    @Override // io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotChangeEventSource
    public void processTransactionStartedEvent(MySqlPartition mySqlPartition, OffsetContext offsetContext) throws InterruptedException {
        if (getContext() == null) {
            LOGGER.warn("Context is null, skipping message processing");
        } else if (getContext().updateWindowState(offsetContext)) {
            sendWindowEvents(mySqlPartition, offsetContext);
            readChunk(mySqlPartition, offsetContext);
        }
    }

    @Override // io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotChangeEventSource
    public void processTransactionCommittedEvent(MySqlPartition mySqlPartition, OffsetContext offsetContext) throws InterruptedException {
        if (getContext() == null) {
            LOGGER.warn("Context is null, skipping message processing");
        } else {
            readUntilGtidChange(mySqlPartition, offsetContext);
        }
    }

    protected void updateLowWatermark() {
        MySqlReadOnlyIncrementalSnapshotContext<T> context = getContext();
        Objects.requireNonNull(context);
        getExecutedGtidSet(context::setLowWatermark);
    }

    protected void updateHighWatermark() {
        MySqlReadOnlyIncrementalSnapshotContext<T> context = getContext();
        Objects.requireNonNull(context);
        getExecutedGtidSet(context::setHighWatermark);
    }

    private void getExecutedGtidSet(Consumer<MySqlGtidSet> consumer) {
        try {
            this.jdbcConnection.query(SHOW_MASTER_STMT, resultSet -> {
                if (resultSet.next()) {
                    if (resultSet.getMetaData().getColumnCount() <= 4) {
                        throw new UnsupportedOperationException("Need to add support for executed GTIDs for versions prior to 5.6.5");
                    }
                    consumer.accept(new MySqlGtidSet(resultSet.getString(5)));
                }
            });
            this.jdbcConnection.commit();
        } catch (SQLException e) {
            throw new DebeziumException(e);
        }
    }

    @Override // io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource
    protected void emitWindowOpen() {
        updateLowWatermark();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource
    public void emitWindowClose(MySqlPartition mySqlPartition, OffsetContext offsetContext) throws InterruptedException {
        updateHighWatermark();
        if (getContext().serverUuidChanged()) {
            rereadChunk(mySqlPartition, offsetContext);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource
    public void sendEvent(MySqlPartition mySqlPartition, EventDispatcher<MySqlPartition, T> eventDispatcher, OffsetContext offsetContext, Object[] objArr) throws InterruptedException {
        SourceInfo source = ((MySqlOffsetContext) offsetContext).getSource();
        String query = source.getQuery();
        source.setQuery(null);
        super.sendEvent((MySqlReadOnlyIncrementalSnapshotChangeEventSource<T>) mySqlPartition, (EventDispatcher<MySqlReadOnlyIncrementalSnapshotChangeEventSource<T>, T>) eventDispatcher, offsetContext, objArr);
        source.setQuery(query);
    }

    @Override // io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource, io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotChangeEventSource
    public void addDataCollectionNamesToSnapshot(SignalPayload<MySqlPartition> signalPayload, SnapshotConfiguration snapshotConfiguration) throws InterruptedException {
        Map<String, Object> map = signalPayload.additionalData;
        super.addDataCollectionNamesToSnapshot(signalPayload, snapshotConfiguration);
        getContext().setSignalOffset((Long) map.get(KafkaSignalChannel.CHANNEL_OFFSET));
    }

    public void stopSnapshot(MySqlPartition mySqlPartition, OffsetContext offsetContext, Map<String, Object> map, List<String> list) {
        super.stopSnapshot((MySqlReadOnlyIncrementalSnapshotChangeEventSource<T>) mySqlPartition, offsetContext, map, list);
        getContext().setSignalOffset((Long) map.get(KafkaSignalChannel.CHANNEL_OFFSET));
    }

    private MySqlReadOnlyIncrementalSnapshotContext<T> getContext() {
        return (MySqlReadOnlyIncrementalSnapshotContext) this.context;
    }

    @Override // io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource, io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotChangeEventSource
    public /* bridge */ /* synthetic */ void stopSnapshot(Partition partition, OffsetContext offsetContext, Map map, List list) {
        stopSnapshot((MySqlPartition) partition, offsetContext, (Map<String, Object>) map, (List<String>) list);
    }
}
