package io.debezium.relational;

import io.debezium.DebeziumException;
import io.debezium.connector.SnapshotRecord;
import io.debezium.jdbc.CancellableResultSet;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.jdbc.MainConnectionProvidingConnectionFactory;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.notification.NotificationService;
import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
import io.debezium.pipeline.source.SnapshottingTask;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.pipeline.spi.SnapshotResult;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.snapshot.SnapshotterService;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.spi.snapshot.Snapshotter;
import io.debezium.util.Clock;
import io.debezium.util.ColumnUtils;
import io.debezium.util.Strings;
import io.debezium.util.Threads;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/debezium-core-2.6.1.Final.jar:io/debezium/relational/RelationalSnapshotChangeEventSource.class */
public abstract class RelationalSnapshotChangeEventSource<P extends Partition, O extends OffsetContext> extends AbstractSnapshotChangeEventSource<P, O> {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) RelationalSnapshotChangeEventSource.class);
    public static final Pattern SELECT_ALL_PATTERN = Pattern.compile("\\*");
    public static final Pattern MATCH_ALL_PATTERN = Pattern.compile(".*");
    private final RelationalDatabaseConnectorConfig connectorConfig;
    private final JdbcConnection jdbcConnection;
    private final MainConnectionProvidingConnectionFactory<? extends JdbcConnection> jdbcConnectionFactory;
    private final RelationalDatabaseSchema schema;
    protected final EventDispatcher<P, TableId> dispatcher;
    protected final Clock clock;
    private final SnapshotProgressListener<P> snapshotProgressListener;
    protected final SnapshotterService snapshotterService;
    protected Queue<JdbcConnection> connectionPool;

    /* loaded from: input_file:META-INF/bundled-dependencies/debezium-core-2.6.1.Final.jar:io/debezium/relational/RelationalSnapshotChangeEventSource$RelationalSnapshotContext.class */
    public static class RelationalSnapshotContext<P extends Partition, O extends OffsetContext> extends AbstractSnapshotChangeEventSource.SnapshotContext<P, O> {
        public final String catalogName;
        public final Tables tables;
        public final boolean onDemand;
        public Set<TableId> capturedTables;
        public Set<TableId> capturedSchemaTables;

        public RelationalSnapshotContext(P p, String str, boolean z) {
            super(p);
            this.catalogName = str;
            this.tables = new Tables();
            this.onDemand = z;
        }
    }

    public RelationalSnapshotChangeEventSource(RelationalDatabaseConnectorConfig relationalDatabaseConnectorConfig, MainConnectionProvidingConnectionFactory<? extends JdbcConnection> mainConnectionProvidingConnectionFactory, RelationalDatabaseSchema relationalDatabaseSchema, EventDispatcher<P, TableId> eventDispatcher, Clock clock, SnapshotProgressListener<P> snapshotProgressListener, NotificationService<P, O> notificationService, SnapshotterService snapshotterService) {
        super(relationalDatabaseConnectorConfig, snapshotProgressListener, notificationService);
        this.connectorConfig = relationalDatabaseConnectorConfig;
        this.jdbcConnection = mainConnectionProvidingConnectionFactory.mainConnection();
        this.jdbcConnectionFactory = mainConnectionProvidingConnectionFactory;
        this.schema = relationalDatabaseSchema;
        this.dispatcher = eventDispatcher;
        this.clock = clock;
        this.snapshotProgressListener = snapshotProgressListener;
        this.snapshotterService = snapshotterService;
    }

    @Override // io.debezium.pipeline.source.AbstractSnapshotChangeEventSource
    public SnapshotResult<O> doExecute(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, O o, AbstractSnapshotChangeEventSource.SnapshotContext<P, O> snapshotContext, SnapshottingTask snapshottingTask) throws Exception {
        RelationalSnapshotContext<P, O> relationalSnapshotContext = (RelationalSnapshotContext) snapshotContext;
        Connection connection = null;
        try {
            try {
                Set<Pattern> dataCollectionPattern = getDataCollectionPattern(snapshottingTask.getDataCollections());
                Map<DataCollectionId, String> map = (Map) snapshottingTask.getFilterQueries().entrySet().stream().collect(Collectors.toMap(entry -> {
                    return TableId.parse((String) entry.getKey());
                }, (v0) -> {
                    return v0.getValue();
                }));
                preSnapshot();
                LOGGER.info("Snapshot step 1 - Preparing");
                if (o != null && o.isSnapshotRunning()) {
                    LOGGER.info("Previous snapshot was cancelled before completion; a new snapshot will be taken.");
                }
                connection = createSnapshotConnection();
                connectionCreated(relationalSnapshotContext);
                LOGGER.info("Snapshot step 2 - Determining captured tables");
                determineCapturedTables(relationalSnapshotContext, dataCollectionPattern, snapshottingTask);
                this.snapshotProgressListener.monitoredDataCollectionsDetermined(snapshotContext.partition, relationalSnapshotContext.capturedTables);
                this.connectionPool = createConnectionPool(relationalSnapshotContext);
                LOGGER.info("Snapshot step 3 - Locking captured tables {}", relationalSnapshotContext.capturedTables);
                if (snapshottingTask.snapshotSchema()) {
                    lockTablesForSchemaSnapshot(changeEventSourceContext, relationalSnapshotContext);
                }
                LOGGER.info("Snapshot step 4 - Determining snapshot offset");
                determineSnapshotOffset(relationalSnapshotContext, o);
                LOGGER.info("Snapshot step 5 - Reading structure of captured tables");
                readTableStructure(changeEventSourceContext, relationalSnapshotContext, o, snapshottingTask);
                if (snapshottingTask.snapshotSchema()) {
                    LOGGER.info("Snapshot step 6 - Persisting schema history");
                    createSchemaChangeEventsForTables(changeEventSourceContext, relationalSnapshotContext, snapshottingTask);
                    releaseSchemaSnapshotLocks(relationalSnapshotContext);
                } else {
                    LOGGER.info("Snapshot step 6 - Skipping persisting of schema history");
                }
                if (snapshottingTask.snapshotData()) {
                    LOGGER.info("Snapshot step 7 - Snapshotting data");
                    createDataEvents(changeEventSourceContext, relationalSnapshotContext, this.connectionPool, map, snapshottingTask);
                } else {
                    LOGGER.info("Snapshot step 7 - Skipping snapshotting of data");
                    releaseDataSnapshotLocks(relationalSnapshotContext);
                    relationalSnapshotContext.offset.preSnapshotCompletion();
                    relationalSnapshotContext.offset.postSnapshotCompletion();
                }
                postSnapshot();
                this.dispatcher.alwaysDispatchHeartbeatEvent(relationalSnapshotContext.partition, relationalSnapshotContext.offset);
                SnapshotResult<O> completed = SnapshotResult.completed(relationalSnapshotContext.offset);
                try {
                    if (this.connectionPool != null) {
                        for (JdbcConnection jdbcConnection : this.connectionPool) {
                            if (!this.jdbcConnection.equals(jdbcConnection)) {
                                jdbcConnection.close();
                            }
                        }
                    }
                    rollbackTransaction(connection);
                    return completed;
                } catch (Exception e) {
                    LOGGER.error("Error in finally block", (Throwable) e);
                    if (0 != 0) {
                        e.addSuppressed(null);
                    }
                    throw e;
                }
            } catch (Throwable th) {
                try {
                    if (this.connectionPool != null) {
                        for (JdbcConnection jdbcConnection2 : this.connectionPool) {
                            if (!this.jdbcConnection.equals(jdbcConnection2)) {
                                jdbcConnection2.close();
                            }
                        }
                    }
                    rollbackTransaction(connection);
                    throw th;
                } catch (Exception e2) {
                    LOGGER.error("Error in finally block", (Throwable) e2);
                    if (0 != 0) {
                        e2.addSuppressed(null);
                    }
                    throw e2;
                }
            }
        } catch (AssertionError | Exception e3) {
            LOGGER.error("Error during snapshot", e3);
            throw e3;
        }
    }

    private Queue<JdbcConnection> createConnectionPool(RelationalSnapshotContext<P, O> relationalSnapshotContext) throws SQLException {
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        concurrentLinkedQueue.add(this.jdbcConnection);
        int max = Math.max(1, Math.min(this.connectorConfig.getSnapshotMaxThreads(), relationalSnapshotContext.capturedTables.size()));
        if (max > 1) {
            Optional<String> snapshotConnectionFirstSelect = getSnapshotConnectionFirstSelect(relationalSnapshotContext, relationalSnapshotContext.capturedTables.iterator().next());
            for (int i = 1; i < max; i++) {
                JdbcConnection autoCommit = this.jdbcConnectionFactory.newConnection().setAutoCommit(false);
                autoCommit.connection().setTransactionIsolation(this.jdbcConnection.connection().getTransactionIsolation());
                connectionPoolConnectionCreated(relationalSnapshotContext, autoCommit);
                concurrentLinkedQueue.add(autoCommit);
                if (snapshotConnectionFirstSelect.isPresent()) {
                    autoCommit.execute(snapshotConnectionFirstSelect.get());
                }
            }
        }
        LOGGER.info("Created connection pool with {} threads", Integer.valueOf(max));
        return concurrentLinkedQueue;
    }

    public Connection createSnapshotConnection() throws SQLException {
        Connection connection = this.jdbcConnection.connection();
        connection.setAutoCommit(false);
        return connection;
    }

    @Override // io.debezium.pipeline.source.spi.SnapshotChangeEventSource
    public SnapshottingTask getSnapshottingTask(P p, O o) {
        Snapshotter snapshotter = this.snapshotterService.getSnapshotter();
        List<String> dataCollectionsToBeSnapshotted = this.connectorConfig.getDataCollectionsToBeSnapshotted();
        Map map = (Map) this.connectorConfig.getSnapshotSelectOverridesByTable().entrySet().stream().collect(Collectors.toMap(entry -> {
            return ((DataCollectionId) entry.getKey()).identifier();
        }, (v0) -> {
            return v0.getValue();
        }));
        boolean z = o != null;
        boolean z2 = false;
        if (z) {
            z2 = o.isSnapshotRunning();
        }
        if (z && !o.isSnapshotRunning()) {
            LOGGER.info("A previous offset indicating a completed snapshot has been found.");
        }
        boolean shouldSnapshotSchema = snapshotter.shouldSnapshotSchema(z, z2);
        boolean shouldSnapshotData = snapshotter.shouldSnapshotData(z, z2);
        if (shouldSnapshotData && shouldSnapshotSchema) {
            LOGGER.info("According to the connector configuration both schema and data will be snapshot.");
        } else if (shouldSnapshotSchema) {
            LOGGER.info("According to the connector configuration only schema will be snapshot.");
        }
        return new SnapshottingTask(shouldSnapshotSchema, shouldSnapshotData, dataCollectionsToBeSnapshotted, map, false);
    }

    protected void connectionCreated(RelationalSnapshotContext<P, O> relationalSnapshotContext) throws Exception {
    }

    protected void connectionPoolConnectionCreated(RelationalSnapshotContext<P, O> relationalSnapshotContext, JdbcConnection jdbcConnection) throws SQLException {
    }

    protected List<Pattern> getSignalDataCollectionPattern(String str) {
        return Strings.listOfRegex(str, 2);
    }

    private Stream<TableId> toTableIds(Set<TableId> set, Pattern pattern) {
        return set.stream().filter(tableId -> {
            return pattern.asMatchPredicate().test(this.connectorConfig.getTableIdMapper().toString(tableId));
        }).sorted();
    }

    private Set<TableId> addSignalingCollectionAndSort(Set<TableId> set) {
        String tableIncludeList = this.connectorConfig.tableIncludeList();
        String signalingDataCollectionId = this.connectorConfig.getSignalingDataCollectionId();
        ArrayList arrayList = new ArrayList();
        if (Strings.isNullOrBlank(tableIncludeList)) {
            arrayList.add(MATCH_ALL_PATTERN);
        } else {
            arrayList.addAll(Strings.listOfRegex(tableIncludeList, 2));
        }
        if (!Strings.isNullOrBlank(signalingDataCollectionId)) {
            arrayList.addAll(getSignalDataCollectionPattern(signalingDataCollectionId));
        }
        return (Set) arrayList.stream().flatMap(pattern -> {
            return toTableIds(set, pattern);
        }).collect(Collectors.toCollection(LinkedHashSet::new));
    }

    private void determineCapturedTables(RelationalSnapshotContext<P, O> relationalSnapshotContext, Set<Pattern> set, SnapshottingTask snapshottingTask) throws Exception {
        Set<TableId> allTableIds = getAllTableIds(relationalSnapshotContext);
        Set<TableId> set2 = (Set) determineDataCollectionsToBeSnapshotted(allTableIds, set).collect(Collectors.toSet());
        HashSet hashSet = new HashSet();
        HashSet hashSet2 = new HashSet();
        for (TableId tableId : allTableIds) {
            if (this.connectorConfig.getTableFilters().eligibleForSchemaDataCollectionFilter().isIncluded(tableId) && !snapshottingTask.isOnDemand()) {
                LOGGER.info("Adding table {} to the list of capture schema tables", tableId);
                hashSet2.add(tableId);
            }
        }
        for (TableId tableId2 : set2) {
            if (this.connectorConfig.getTableFilters().dataCollectionFilter().isIncluded(tableId2)) {
                LOGGER.trace("Adding table {} to the list of captured tables for which the data will be snapshotted", tableId2);
                hashSet.add(tableId2);
            } else {
                LOGGER.trace("Ignoring table {} for data snapshotting as it's not included in the filter configuration", tableId2);
            }
        }
        relationalSnapshotContext.capturedTables = addSignalingCollectionAndSort(hashSet);
        relationalSnapshotContext.capturedSchemaTables = snapshottingTask.isOnDemand() ? relationalSnapshotContext.capturedTables : (Set) hashSet2.stream().sorted().collect(Collectors.toCollection(LinkedHashSet::new));
    }

    protected abstract Set<TableId> getAllTableIds(RelationalSnapshotContext<P, O> relationalSnapshotContext) throws Exception;

    protected abstract void lockTablesForSchemaSnapshot(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, RelationalSnapshotContext<P, O> relationalSnapshotContext) throws Exception;

    protected abstract void determineSnapshotOffset(RelationalSnapshotContext<P, O> relationalSnapshotContext, O o) throws Exception;

    protected abstract void readTableStructure(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, RelationalSnapshotContext<P, O> relationalSnapshotContext, O o, SnapshottingTask snapshottingTask) throws Exception;

    protected abstract void releaseSchemaSnapshotLocks(RelationalSnapshotContext<P, O> relationalSnapshotContext) throws Exception;

    protected void releaseDataSnapshotLocks(RelationalSnapshotContext<P, O> relationalSnapshotContext) throws Exception {
    }

    protected void createSchemaChangeEventsForTables(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, RelationalSnapshotContext<P, O> relationalSnapshotContext, SnapshottingTask snapshottingTask) throws Exception {
        tryStartingSnapshot(relationalSnapshotContext);
        if (this.schema.isHistorized()) {
            Iterator<TableId> it = getTablesForSchemaChange(relationalSnapshotContext).iterator();
            while (it.hasNext()) {
                TableId next = it.next();
                if (!changeEventSourceContext.isRunning()) {
                    throw new InterruptedException("Interrupted while capturing schema of table " + next);
                }
                LOGGER.info("Capturing structure of table {}", next);
                relationalSnapshotContext.offset.event(next, getClock().currentTime());
                if (!snapshottingTask.snapshotData() && !it.hasNext()) {
                    lastSnapshotRecord(relationalSnapshotContext);
                }
                SchemaChangeEvent createTableEvent = getCreateTableEvent(relationalSnapshotContext, relationalSnapshotContext.tables.forTable(next));
                if (!HistorizedRelationalDatabaseSchema.class.isAssignableFrom(this.schema.getClass()) || !((HistorizedRelationalDatabaseSchema) this.schema).skipSchemaChangeEvent(createTableEvent)) {
                    this.dispatcher.dispatchSchemaChangeEvent(relationalSnapshotContext.partition, relationalSnapshotContext.offset, next, receiver -> {
                        try {
                            receiver.schemaChangeEvent(createTableEvent);
                        } catch (Exception e) {
                            throw new DebeziumException(e);
                        }
                    });
                }
            }
        }
    }

    protected Collection<TableId> getTablesForSchemaChange(RelationalSnapshotContext<P, O> relationalSnapshotContext) {
        return relationalSnapshotContext.capturedTables;
    }

    protected abstract SchemaChangeEvent getCreateTableEvent(RelationalSnapshotContext<P, O> relationalSnapshotContext, Table table) throws Exception;

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v100, types: [java.util.Map] */
    private void createDataEvents(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, RelationalSnapshotContext<P, O> relationalSnapshotContext, Queue<JdbcConnection> queue, Map<DataCollectionId, String> map, SnapshottingTask snapshottingTask) throws Exception {
        tryStartingSnapshot(relationalSnapshotContext);
        EventDispatcher.SnapshotReceiver<P> snapshotChangeEventReceiver = this.dispatcher.getSnapshotChangeEventReceiver();
        int size = queue.size();
        LOGGER.info("Creating snapshot worker pool with {} worker thread(s)", Integer.valueOf(size));
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(size);
        ExecutorCompletionService executorCompletionService = new ExecutorCompletionService(newFixedThreadPool);
        HashMap hashMap = new HashMap();
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        for (TableId tableId : relationalSnapshotContext.capturedTables) {
            Optional<String> determineSnapshotSelect = determineSnapshotSelect(relationalSnapshotContext, tableId, map);
            if (determineSnapshotSelect.isPresent()) {
                LOGGER.info("For table '{}' using select statement: '{}'", tableId, determineSnapshotSelect.get());
                hashMap.put(tableId, determineSnapshotSelect.get());
                linkedHashMap.put(tableId, rowCountForTable(tableId));
            } else {
                LOGGER.warn("For table '{}' the select statement was not provided, skipping table", tableId);
                this.snapshotProgressListener.dataCollectionSnapshotCompleted(relationalSnapshotContext.partition, tableId, 0L);
            }
        }
        if (this.connectorConfig.snapshotOrderByRowCount() != RelationalDatabaseConnectorConfig.SnapshotTablesRowCountOrder.DISABLED) {
            LOGGER.info("Sort tables by row count '{}'", this.connectorConfig.snapshotOrderByRowCount());
            int i = this.connectorConfig.snapshotOrderByRowCount() == RelationalDatabaseConnectorConfig.SnapshotTablesRowCountOrder.ASCENDING ? 1 : -1;
            linkedHashMap = (Map) linkedHashMap.entrySet().stream().sorted(Map.Entry.comparingByValue((optionalLong, optionalLong2) -> {
                return i * Long.compare(optionalLong.orElse(0L), optionalLong2.orElse(0L));
            })).collect(Collectors.toMap((v0) -> {
                return v0.getKey();
            }, (v0) -> {
                return v0.getValue();
            }, (optionalLong3, optionalLong4) -> {
                return optionalLong3;
            }, LinkedHashMap::new));
        }
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        concurrentLinkedQueue.add(relationalSnapshotContext.offset);
        for (int i2 = 1; i2 < size; i2++) {
            concurrentLinkedQueue.add(copyOffset(relationalSnapshotContext));
        }
        try {
            int size2 = linkedHashMap.size();
            int i3 = 1;
            for (TableId tableId2 : linkedHashMap.keySet()) {
                boolean z = i3 == 1 && size == 1;
                boolean z2 = i3 == size2 && size == 1;
                String str = (String) hashMap.get(tableId2);
                OptionalLong optionalLong5 = (OptionalLong) linkedHashMap.get(tableId2);
                this.notificationService.initialSnapshotNotificationService().notifyTableInProgress(relationalSnapshotContext.partition, relationalSnapshotContext.offset, tableId2.identifier(), linkedHashMap.keySet());
                int i4 = i3;
                i3++;
                executorCompletionService.submit(createDataEventsForTableCallable(changeEventSourceContext, relationalSnapshotContext, snapshotChangeEventReceiver, relationalSnapshotContext.tables.forTable(tableId2), z, z2, i4, size2, str, optionalLong5, queue, concurrentLinkedQueue));
            }
            for (int i5 = 0; i5 < size2; i5++) {
                executorCompletionService.take().get();
            }
            releaseDataSnapshotLocks(relationalSnapshotContext);
            Iterator<O> it = concurrentLinkedQueue.iterator();
            while (it.hasNext()) {
                it.next().preSnapshotCompletion();
            }
            snapshotChangeEventReceiver.completeSnapshot();
            Iterator<O> it2 = concurrentLinkedQueue.iterator();
            while (it2.hasNext()) {
                it2.next().postSnapshotCompletion();
            }
        } finally {
            newFixedThreadPool.shutdownNow();
        }
    }

    protected abstract O copyOffset(RelationalSnapshotContext<P, O> relationalSnapshotContext);

    /* JADX INFO: Access modifiers changed from: protected */
    public void tryStartingSnapshot(RelationalSnapshotContext<P, O> relationalSnapshotContext) {
        if (relationalSnapshotContext.offset.isSnapshotRunning()) {
            return;
        }
        relationalSnapshotContext.offset.preSnapshotStart();
    }

    protected Instant getSnapshotSourceTimestamp(JdbcConnection jdbcConnection, O o, TableId tableId) {
        try {
            Optional<Instant> currentTimestamp = jdbcConnection.getCurrentTimestamp();
            if (currentTimestamp.isEmpty()) {
                throw new ConnectException("Failed reading CURRENT_TIMESTAMP from source database");
            }
            return currentTimestamp.get();
        } catch (SQLException e) {
            throw new ConnectException("Failed reading CURRENT_TIMESTAMP from source database", e);
        }
    }

    protected Callable<Void> createDataEventsForTableCallable(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, RelationalSnapshotContext<P, O> relationalSnapshotContext, EventDispatcher.SnapshotReceiver<P> snapshotReceiver, Table table, boolean z, boolean z2, int i, int i2, String str, OptionalLong optionalLong, Queue<JdbcConnection> queue, Queue<O> queue2) {
        return () -> {
            JdbcConnection jdbcConnection = (JdbcConnection) queue.poll();
            OffsetContext offsetContext = (OffsetContext) queue2.poll();
            try {
                try {
                    doCreateDataEventsForTable(changeEventSourceContext, relationalSnapshotContext, offsetContext, snapshotReceiver, table, z, z2, i, i2, str, optionalLong, jdbcConnection);
                    queue2.add(offsetContext);
                    queue.add(jdbcConnection);
                    return null;
                } catch (SQLException e) {
                    this.notificationService.initialSnapshotNotificationService().notifyCompletedTableWithError(relationalSnapshotContext.partition, relationalSnapshotContext.offset, table.id().identifier());
                    throw new ConnectException("Snapshotting of table " + table.id() + " failed", e);
                }
            } catch (Throwable th) {
                queue2.add(offsetContext);
                queue.add(jdbcConnection);
                throw th;
            }
        };
    }

    protected void doCreateDataEventsForTable(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, RelationalSnapshotContext<P, O> relationalSnapshotContext, O o, EventDispatcher.SnapshotReceiver<P> snapshotReceiver, Table table, boolean z, boolean z2, int i, int i2, String str, OptionalLong optionalLong, JdbcConnection jdbcConnection) throws InterruptedException, SQLException {
        if (!changeEventSourceContext.isRunning()) {
            throw new InterruptedException("Interrupted while snapshotting table " + table.id());
        }
        long currentTimeInMillis = this.clock.currentTimeInMillis();
        LOGGER.info("Exporting data from table '{}' ({} of {} tables)", table.id(), Integer.valueOf(i), Integer.valueOf(i2));
        Instant snapshotSourceTimestamp = getSnapshotSourceTimestamp(jdbcConnection, o, table.id());
        Statement readTableStatement = readTableStatement(jdbcConnection, optionalLong);
        try {
            ResultSet resultSetForDataEvents = resultSetForDataEvents(str, readTableStatement);
            try {
                ColumnUtils.ColumnArray array = ColumnUtils.toArray(resultSetForDataEvents, table);
                long j = 0;
                Threads.Timer tableScanLogTimer = getTableScanLogTimer();
                boolean next = resultSetForDataEvents.next();
                if (next) {
                    while (next) {
                        if (!changeEventSourceContext.isRunning()) {
                            throw new InterruptedException("Interrupted while snapshotting table " + table.id());
                        }
                        j++;
                        Object[] rowToArray = jdbcConnection.rowToArray(table, resultSetForDataEvents, array);
                        if (tableScanLogTimer.expired()) {
                            long currentTimeInMillis2 = this.clock.currentTimeInMillis();
                            if (optionalLong.isPresent()) {
                                LOGGER.info("\t Exported {} of {} records for table '{}' after {}", Long.valueOf(j), Long.valueOf(optionalLong.getAsLong()), table.id(), Strings.duration(currentTimeInMillis2 - currentTimeInMillis));
                            } else {
                                LOGGER.info("\t Exported {} records for table '{}' after {}", Long.valueOf(j), table.id(), Strings.duration(currentTimeInMillis2 - currentTimeInMillis));
                            }
                            this.snapshotProgressListener.rowsScanned(relationalSnapshotContext.partition, table.id(), j);
                            tableScanLogTimer = getTableScanLogTimer();
                        }
                        next = resultSetForDataEvents.next();
                        setSnapshotMarker(o, z, z2, j == 1, !next);
                        this.dispatcher.dispatchSnapshotEvent(relationalSnapshotContext.partition, table.id(), getChangeRecordEmitter(relationalSnapshotContext.partition, o, table.id(), rowToArray, snapshotSourceTimestamp), snapshotReceiver);
                    }
                } else {
                    setSnapshotMarker(o, z, z2, false, true);
                }
                LOGGER.info("\t Finished exporting {} records for table '{}' ({} of {} tables); total duration '{}'", Long.valueOf(j), table.id(), Integer.valueOf(i), Integer.valueOf(i2), Strings.duration(this.clock.currentTimeInMillis() - currentTimeInMillis));
                this.snapshotProgressListener.dataCollectionSnapshotCompleted(relationalSnapshotContext.partition, table.id(), j);
                this.notificationService.initialSnapshotNotificationService().notifyCompletedTableSuccessfully(relationalSnapshotContext.partition, relationalSnapshotContext.offset, table.id().identifier(), j, relationalSnapshotContext.capturedTables);
                if (resultSetForDataEvents != null) {
                    resultSetForDataEvents.close();
                }
                if (readTableStatement != null) {
                    readTableStatement.close();
                }
            } finally {
            }
        } catch (Throwable th) {
            if (readTableStatement != null) {
                try {
                    readTableStatement.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    protected ResultSet resultSetForDataEvents(String str, Statement statement) throws SQLException {
        return CancellableResultSet.from(statement.executeQuery(str));
    }

    private void setSnapshotMarker(OffsetContext offsetContext, boolean z, boolean z2, boolean z3, boolean z4) {
        if (z4 && z2) {
            offsetContext.markSnapshotRecord(SnapshotRecord.LAST);
            return;
        }
        if (z3 && z) {
            offsetContext.markSnapshotRecord(SnapshotRecord.FIRST);
            return;
        }
        if (z4) {
            offsetContext.markSnapshotRecord(SnapshotRecord.LAST_IN_DATA_COLLECTION);
        } else if (z3) {
            offsetContext.markSnapshotRecord(SnapshotRecord.FIRST_IN_DATA_COLLECTION);
        } else {
            offsetContext.markSnapshotRecord(SnapshotRecord.TRUE);
        }
    }

    protected void lastSnapshotRecord(RelationalSnapshotContext<P, O> relationalSnapshotContext) {
        relationalSnapshotContext.offset.markSnapshotRecord(SnapshotRecord.LAST);
    }

    protected OptionalLong rowCountForTable(TableId tableId) {
        return OptionalLong.empty();
    }

    private Threads.Timer getTableScanLogTimer() {
        return Threads.timer(this.clock, LOG_INTERVAL);
    }

    protected ChangeRecordEmitter<P> getChangeRecordEmitter(P p, O o, TableId tableId, Object[] objArr, Instant instant) {
        o.event(tableId, instant);
        return new SnapshotChangeRecordEmitter(p, o, objArr, getClock(), this.connectorConfig);
    }

    private Optional<String> determineSnapshotSelect(RelationalSnapshotContext<P, O> relationalSnapshotContext, TableId tableId, Map<DataCollectionId, String> map) {
        String snapshotSelectOverridesByTable = getSnapshotSelectOverridesByTable(tableId, map);
        return snapshotSelectOverridesByTable != null ? Optional.of(enhanceOverriddenSelect(relationalSnapshotContext, snapshotSelectOverridesByTable, tableId)) : getSnapshotSelect(relationalSnapshotContext, tableId, getPreparedColumnNames(relationalSnapshotContext.partition, this.schema.tableFor(tableId)));
    }

    protected String getSnapshotSelectOverridesByTable(TableId tableId, Map<DataCollectionId, String> map) {
        String str = map.get(tableId);
        if (str == null) {
            str = map.get(new TableId(null, tableId.schema(), tableId.table()));
        }
        return str;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public List<String> getPreparedColumnNames(P p, Table table) {
        Stream<String> filter = table.retrieveColumnNames().stream().filter(str -> {
            return additionalColumnFilter(p, table.id(), str);
        }).filter(str2 -> {
            return this.connectorConfig.getColumnFilter().matches(table.id().catalog(), table.id().schema(), table.id().table(), str2);
        });
        JdbcConnection jdbcConnection = this.jdbcConnection;
        Objects.requireNonNull(jdbcConnection);
        List<String> list = (List) filter.map(jdbcConnection::quotedColumnIdString).collect(Collectors.toList());
        if (list.isEmpty()) {
            LOGGER.info("\t All columns in table {} were excluded due to include/exclude lists, defaulting to selecting all columns", table.id());
            Stream<String> stream = table.retrieveColumnNames().stream();
            JdbcConnection jdbcConnection2 = this.jdbcConnection;
            Objects.requireNonNull(jdbcConnection2);
            list = (List) stream.map(jdbcConnection2::quotedColumnIdString).collect(Collectors.toList());
        }
        return list;
    }

    protected boolean additionalColumnFilter(P p, TableId tableId, String str) {
        return true;
    }

    protected String enhanceOverriddenSelect(RelationalSnapshotContext<P, O> relationalSnapshotContext, String str, TableId tableId) {
        return str;
    }

    protected abstract Optional<String> getSnapshotSelect(RelationalSnapshotContext<P, O> relationalSnapshotContext, TableId tableId, List<String> list);

    protected Optional<String> getSnapshotConnectionFirstSelect(RelationalSnapshotContext<P, O> relationalSnapshotContext, TableId tableId) {
        return Optional.empty();
    }

    protected Statement readTableStatement(JdbcConnection jdbcConnection, OptionalLong optionalLong) throws SQLException {
        return jdbcConnection.readTableStatement(this.connectorConfig, optionalLong);
    }

    private void rollbackTransaction(Connection connection) {
        if (connection != null) {
            try {
                connection.rollback();
            } catch (SQLException e) {
                throw new RuntimeException(e);
            }
        }
    }

    protected Clock getClock() {
        return this.clock;
    }

    protected void postSnapshot() throws InterruptedException {
    }

    protected void preSnapshot() throws InterruptedException {
    }
}
