package io.debezium.connector.oracle.logminer.processor.memory;

import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.OraclePartition;
import io.debezium.connector.oracle.OracleStreamingChangeEventSourceMetrics;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.logminer.LogMinerQueryBuilder;
import io.debezium.connector.oracle.logminer.SqlUtils;
import io.debezium.connector.oracle.logminer.events.LogMinerEvent;
import io.debezium.connector.oracle.logminer.events.LogMinerEventRow;
import io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.relational.TableId;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/debezium-connector-oracle-1.9.5.Final.jar:io/debezium/connector/oracle/logminer/processor/memory/MemoryLogMinerEventProcessor.class */
/**
 * A LogMiner event processor that keeps all of its bookkeeping on the JVM heap.
 *
 * <p>In-flight transactions, recently processed transaction ids, seen schema-change SCNs and
 * abandoned transaction ids are held in plain {@link HashMap}/{@link HashSet} instances. This
 * class adds no synchronization around those collections.
 */
public class MemoryLogMinerEventProcessor extends AbstractLogMinerEventProcessor<MemoryTransaction> {
    private static final Logger LOGGER = LoggerFactory.getLogger(MemoryLogMinerEventProcessor.class);

    private final OracleConnection jdbcConnection;
    private final EventDispatcher<OraclePartition, TableId> dispatcher;
    private final OraclePartition partition;
    private final OracleOffsetContext offsetContext;
    private final OracleStreamingChangeEventSourceMetrics metrics;

    /** In-flight transactions keyed by transaction id. */
    private final Map<String, MemoryTransaction> transactionCache;
    /** Transaction ids that were already committed or rolled back, mapped to the SCN recorded at that time. */
    private final Map<String, Scn> recentlyProcessedTransactionsCache;
    /** SCNs of schema changes that have already been handled (only populated when LOB support is enabled). */
    private final Set<Scn> schemaChangesCache;
    /** Ids of transactions dropped by {@link #abandonTransactions(Duration)}; later events for them are skipped. */
    private final Set<String> abandonedTransactionsCache;

    /**
     * Creates the processor with empty caches.
     *
     * @param changeEventSourceContext passed through to the parent processor
     * @param oracleConnectorConfig passed through to the parent processor
     * @param oracleConnection connection used to prepare the mining query and to evaluate transaction age
     * @param eventDispatcher dispatcher used to emit heartbeat events
     * @param oraclePartition partition the heartbeat events are dispatched for
     * @param oracleOffsetContext offset context whose SCN is advanced by this processor
     * @param oracleDatabaseSchema passed through to the parent processor
     * @param oracleStreamingChangeEventSourceMetrics metrics sink updated while processing
     */
    public MemoryLogMinerEventProcessor(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, OracleConnectorConfig oracleConnectorConfig, OracleConnection oracleConnection, EventDispatcher<OraclePartition, TableId> eventDispatcher, OraclePartition oraclePartition, OracleOffsetContext oracleOffsetContext, OracleDatabaseSchema oracleDatabaseSchema, OracleStreamingChangeEventSourceMetrics oracleStreamingChangeEventSourceMetrics) {
        super(changeEventSourceContext, oracleConnectorConfig, oracleDatabaseSchema, oraclePartition, oracleOffsetContext, eventDispatcher, oracleStreamingChangeEventSourceMetrics);
        this.transactionCache = new HashMap<>();
        this.recentlyProcessedTransactionsCache = new HashMap<>();
        this.schemaChangesCache = new HashSet<>();
        this.abandonedTransactionsCache = new HashSet<>();
        this.jdbcConnection = oracleConnection;
        this.dispatcher = eventDispatcher;
        this.partition = oraclePartition;
        this.offsetContext = oracleOffsetContext;
        this.metrics = oracleStreamingChangeEventSourceMetrics;
    }

    /**
     * @return the live (mutable) map of in-flight transactions keyed by transaction id
     */
    @Override
    protected Map<String, MemoryTransaction> getTransactionCache() {
        return this.transactionCache;
    }

    /**
     * Builds a new in-memory transaction from the id, SCN, change time and user name of the given row.
     *
     * @param logMinerEventRow the row that starts the transaction
     * @return a new transaction; it is not added to the cache by this method
     */
    @Override
    public MemoryTransaction createTransaction(LogMinerEventRow logMinerEventRow) {
        return new MemoryTransaction(logMinerEventRow.getTransactionId(), logMinerEventRow.getScn(), logMinerEventRow.getChangeTime(), logMinerEventRow.getUserName());
    }

    /**
     * Removes the cached event that matches the row id of the given (undo) row.
     *
     * <p>The transaction is looked up by the row's transaction id first. If it is not cached and
     * the id has no sequence (per {@code isTransactionIdWithNoSequence}), every cached transaction
     * whose id starts with the id's prefix is tried until one of them removes the event. A warning
     * is logged whenever nothing could be removed.
     *
     * @param logMinerEventRow the row describing the change to undo
     */
    @Override
    protected void removeEventWithRowId(LogMinerEventRow logMinerEventRow) {
        final String transactionId = logMinerEventRow.getTransactionId();
        final MemoryTransaction transaction = getTransactionCache().get(transactionId);
        if (transaction != null) {
            if (!transaction.removeEventWithRowId(logMinerEventRow.getRowId())) {
                LOGGER.warn("Cannot undo change '{}' since event with row-id {} was not found.", logMinerEventRow, logMinerEventRow.getRowId());
            }
            return;
        }
        if (!isTransactionIdWithNoSequence(transactionId)) {
            LOGGER.warn("Cannot undo change '{}' since transaction was not found.", logMinerEventRow);
            return;
        }

        final String prefix = getTransactionIdPrefix(transactionId);
        LOGGER.debug("Undo change refers to a transaction that has no explicit sequence, '{}'", transactionId);
        LOGGER.debug("Checking all transactions with prefix '{}'", prefix);
        // Walk the entries directly rather than looking each key up again.
        for (Map.Entry<String, MemoryTransaction> entry : getTransactionCache().entrySet()) {
            final MemoryTransaction candidate = entry.getValue();
            if (entry.getKey().startsWith(prefix) && candidate != null && candidate.removeEventWithRowId(logMinerEventRow.getRowId())) {
                LOGGER.debug("Undo change '{}' applied to transaction '{}'", logMinerEventRow, entry.getKey());
                return;
            }
        }
        LOGGER.warn("Cannot undo change '{}' since event with row-id {} was not found.", logMinerEventRow, logMinerEventRow.getRowId());
    }

    /**
     * No-op: this class does not release anything here (the JDBC connection is not closed by it).
     */
    @Override
    public void close() throws Exception {
        // Intentionally empty.
    }

    /**
     * Drops cached transactions that are considered too old.
     *
     * <p>Does nothing when {@code duration} is {@link Duration#ZERO} or when
     * {@link #getLastScnToAbandon} yields no threshold. Otherwise the threshold is raised to the
     * smallest start SCN in the cache if it lies below it, so at least the oldest cached
     * transaction is abandoned. Every transaction with a start SCN at or below the threshold is
     * removed from the cache and remembered as abandoned. Finally the offset SCN is set to the
     * threshold and a heartbeat is dispatched.
     *
     * @param duration the retention period; {@link Duration#ZERO} disables abandonment
     * @throws InterruptedException if dispatching the heartbeat is interrupted
     */
    @Override
    public void abandonTransactions(Duration duration) throws InterruptedException {
        if (Duration.ZERO.equals(duration)) {
            return;
        }
        final Optional<Scn> lastScnToAbandon = getLastScnToAbandon(this.jdbcConnection, this.offsetContext.getScn(), duration);
        if (!lastScnToAbandon.isPresent()) {
            return;
        }

        Scn thresholdScn = lastScnToAbandon.get();
        LOGGER.warn("All transactions with SCN <= {} will be abandoned.", thresholdScn);

        final Scn smallestScn = getTransactionCacheMinimumScn();
        if (!smallestScn.isNull()) {
            if (thresholdScn.compareTo(smallestScn) < 0) {
                thresholdScn = smallestScn;
            }
            final Iterator<Map.Entry<String, MemoryTransaction>> iterator = this.transactionCache.entrySet().iterator();
            while (iterator.hasNext()) {
                final Map.Entry<String, MemoryTransaction> entry = iterator.next();
                if (entry.getValue().getStartScn().compareTo(thresholdScn) <= 0) {
                    // Capture the key first: a Map.Entry's behaviour is undefined once the
                    // backing map has been modified through the iterator.
                    final String transactionId = entry.getKey();
                    LOGGER.warn("Transaction {} is being abandoned.", transactionId);
                    this.abandonedTransactionsCache.add(transactionId);
                    iterator.remove();
                    this.metrics.addAbandonedTransactionId(transactionId);
                    this.metrics.setActiveTransactions(this.transactionCache.size());
                }
            }
            // Refresh the oldest-SCN metric; -1 is reported when the cache is now empty.
            final Scn remainingSmallestScn = getTransactionCacheMinimumScn();
            this.metrics.setOldestScn(remainingSmallestScn.isNull() ? Scn.valueOf(-1) : remainingSmallestScn);
        }

        this.offsetContext.setScn(thresholdScn);
        this.dispatcher.dispatchHeartbeatEvent(this.partition, this.offsetContext);
    }

    /**
     * @param str the transaction id
     * @return {@code true} if the id is present in the recently-processed cache
     */
    @Override
    protected boolean isRecentlyProcessed(String str) {
        return this.recentlyProcessedTransactionsCache.containsKey(str);
    }

    /**
     * @param logMinerEventRow the schema-change row
     * @return {@code true} if the row's SCN is present in the schema-change cache
     */
    @Override
    protected boolean hasSchemaChangeBeenSeen(LogMinerEventRow logMinerEventRow) {
        return this.schemaChangesCache.contains(logMinerEventRow.getScn());
    }

    /**
     * Removes the transaction with the given id from the cache.
     *
     * @param str the transaction id
     * @return the removed transaction, or {@code null} if none was cached under that id
     */
    @Override
    public MemoryTransaction getAndRemoveTransactionFromCache(String str) {
        return getTransactionCache().remove(str);
    }

    /**
     * Clears the abandoned marker for the given transaction's id.
     *
     * <p>Only the abandoned-transactions set is touched here; the transaction cache itself is not.
     *
     * @param memoryTransaction the transaction being discarded
     */
    @Override
    public void removeTransactionAndEventsFromCache(MemoryTransaction memoryTransaction) {
        this.abandonedTransactionsCache.remove(memoryTransaction.getTransactionId());
    }

    /**
     * @param memoryTransaction the transaction to iterate
     * @return an iterator over the transaction's event collection
     */
    @Override
    public Iterator<LogMinerEvent> getTransactionEventIterator(MemoryTransaction memoryTransaction) {
        return memoryTransaction.getEvents().iterator();
    }

    /**
     * Records a committed transaction as recently processed, but only when LOB support is enabled.
     *
     * @param str the transaction id
     * @param scn the SCN to associate with the transaction id
     */
    @Override
    protected void finalizeTransactionCommit(String str, Scn scn) {
        if (getConfig().isLobEnabled()) {
            this.recentlyProcessedTransactionsCache.put(str, scn);
        }
    }

    /**
     * Forgets a rolled-back transaction (cache entry and abandoned marker) and records it as
     * recently processed.
     *
     * @param str the transaction id
     * @param scn the SCN to associate with the transaction id
     */
    @Override
    protected void finalizeTransactionRollback(String str, Scn scn) {
        this.transactionCache.remove(str);
        this.abandonedTransactionsCache.remove(str);
        this.recentlyProcessedTransactionsCache.put(str, scn);
    }

    /**
     * Delegates to the parent handler, then remembers the row's SCN as a seen schema change when
     * the row names a table and LOB support is enabled.
     *
     * @param logMinerEventRow the schema-change row
     * @throws InterruptedException if the parent handler is interrupted
     */
    @Override
    protected void handleSchemaChange(LogMinerEventRow logMinerEventRow) throws InterruptedException {
        super.handleSchemaChange(logMinerEventRow);
        if (logMinerEventRow.getTableName() != null && getConfig().isLobEnabled()) {
            this.schemaChangesCache.add(logMinerEventRow.getScn());
        }
    }

    /**
     * Appends an event to the transaction with the given id, creating and caching the transaction
     * on first use.
     *
     * <p>Events for abandoned or recently processed transactions are skipped. The supplier is
     * only invoked when the transaction's next event id is not yet covered by its event list.
     *
     * @param str the transaction id
     * @param logMinerEventRow the row the event originates from
     * @param supplier lazily creates the event to store
     */
    @Override
    protected void addToTransaction(String str, LogMinerEventRow logMinerEventRow, Supplier<LogMinerEvent> supplier) {
        if (this.abandonedTransactionsCache.contains(str)) {
            LOGGER.warn("Event for abandoned transaction {}, skipped.", str);
            return;
        }
        if (isRecentlyProcessed(str)) {
            // The warning is only emitted when LOB support is disabled.
            if (!getConfig().isLobEnabled()) {
                LOGGER.warn("Event for transaction {} has already been processed, skipped.", str);
            }
            return;
        }

        MemoryTransaction transaction = getTransactionCache().get(str);
        if (transaction == null) {
            LOGGER.trace("Transaction {} not in cache for DML, creating.", str);
            transaction = createTransaction(logMinerEventRow);
            getTransactionCache().put(str, transaction);
        }

        final int eventId = transaction.getNextEventId();
        if (transaction.getEvents().size() <= eventId) {
            LOGGER.trace("Transaction {}, adding event reference at index {}", str, eventId);
            transaction.getEvents().add(supplier.get());
            this.metrics.calculateLagMetrics(logMinerEventRow.getChangeTime());
        }
        this.metrics.setActiveTransactions(getTransactionCache().size());
    }

    /**
     * @param memoryTransaction the transaction to inspect
     * @return the number of events currently held by the transaction
     */
    @Override
    public int getTransactionEventCount(MemoryTransaction memoryTransaction) {
        return memoryTransaction.getEvents().size();
    }

    /**
     * Prepares the LogMiner query as a forward-only, read-only statement whose cursors are held
     * over commit.
     *
     * @return the prepared statement; the caller is responsible for closing it
     * @throws SQLException if the statement cannot be prepared
     */
    @Override
    protected PreparedStatement createQueryStatement() throws SQLException {
        return this.jdbcConnection.connection().prepareStatement(
                LogMinerQueryBuilder.build(getConfig(), getSchema()),
                ResultSet.TYPE_FORWARD_ONLY,
                ResultSet.CONCUR_READ_ONLY,
                ResultSet.HOLD_CURSORS_OVER_COMMIT);
    }

    /**
     * Advances the offset SCN after a mining pass and returns the SCN to continue from.
     *
     * <p>With LOB support enabled: if no transactions are cached and {@code scn2} is not null,
     * the offset moves to {@code scn2}. Otherwise old transactions are abandoned, the
     * recently-processed and schema-change caches are pruned below the smallest cached start SCN,
     * and the offset moves to that SCN minus one. The resulting offset SCN is returned.
     *
     * <p>With LOB support disabled: {@code scn} is replaced by the last processed SCN when that
     * one is non-null and smaller. The offset moves to that value when the cache is empty, or to
     * the smallest cached start SCN minus one after abandonment otherwise. The (possibly replaced)
     * {@code scn} is returned.
     *
     * @param scn candidate SCN for the non-LOB path (presumably the end of the mined range; verify against the caller)
     * @param scn2 SCN used for the offset on the LOB path when the cache is empty (presumably the highest committed SCN; verify against the caller)
     * @return the SCN to continue from, as described above
     * @throws InterruptedException if dispatching a heartbeat is interrupted
     */
    @Override
    protected Scn calculateNewStartScn(Scn scn, Scn scn2) throws InterruptedException {
        if (getConfig().isLobEnabled()) {
            if (this.transactionCache.isEmpty() && !scn2.isNull()) {
                this.offsetContext.setScn(scn2);
                this.dispatcher.dispatchHeartbeatEvent(this.partition, this.offsetContext);
            } else {
                abandonTransactions(getConfig().getLogMiningTransactionRetention());
                final Scn minStartScn = getTransactionCacheMinimumScn();
                if (!minStartScn.isNull()) {
                    this.recentlyProcessedTransactionsCache.entrySet().removeIf(entry -> entry.getValue().compareTo(minStartScn) < 0);
                    this.schemaChangesCache.removeIf(seenScn -> seenScn.compareTo(minStartScn) < 0);
                    this.offsetContext.setScn(minStartScn.subtract(Scn.valueOf(1)));
                    this.dispatcher.dispatchHeartbeatEvent(this.partition, this.offsetContext);
                }
            }
            return this.offsetContext.getScn();
        }

        Scn nextStartScn = scn;
        final Scn lastProcessedScn = getLastProcessedScn();
        if (!lastProcessedScn.isNull() && lastProcessedScn.compareTo(nextStartScn) < 0) {
            nextStartScn = lastProcessedScn;
        }
        if (this.transactionCache.isEmpty()) {
            this.offsetContext.setScn(nextStartScn);
            this.dispatcher.dispatchHeartbeatEvent(this.partition, this.offsetContext);
        } else {
            abandonTransactions(getConfig().getLogMiningTransactionRetention());
            final Scn minStartScn = getTransactionCacheMinimumScn();
            if (!minStartScn.isNull()) {
                this.offsetContext.setScn(minStartScn.subtract(Scn.valueOf(1)));
                this.dispatcher.dispatchHeartbeatEvent(this.partition, this.offsetContext);
            }
        }
        return nextStartScn;
    }

    /**
     * Decides whether transactions up to the given SCN should be abandoned.
     *
     * <p>Runs {@code SqlUtils.diffInDaysQuery(scn)} and reads the first column as a float. That
     * value is multiplied by 24 and compared with {@code duration.toHours()}, which truncates to
     * whole hours.
     *
     * @param oracleConnection connection used to run the query
     * @param scn the SCN passed to the query and returned as the threshold
     * @param duration the retention period
     * @return {@code Optional.of(scn)} when the computed value exceeds the retention, or when the
     *         query fails; empty when the query yields no value or the value is within retention
     */
    protected Optional<Scn> getLastScnToAbandon(OracleConnection oracleConnection, Scn scn, Duration duration) {
        try {
            final Float diffInDays = oracleConnection.singleOptionalValue(SqlUtils.diffInDaysQuery(scn), resultSet -> resultSet.getFloat(1));
            if (diffInDays == null || diffInDays * 24.0f <= (float) duration.toHours()) {
                return Optional.empty();
            }
            return Optional.of(scn);
        } catch (SQLException e) {
            LOGGER.error("Cannot calculate days difference for transaction abandonment", e);
            this.metrics.incrementErrorCount();
            // NOTE(review): a failed query still yields a threshold, so the caller goes on to
            // abandon transactions. Kept as-is; confirm this is the intended policy.
            return Optional.of(scn);
        }
    }

    /**
     * @return the smallest start SCN among the cached transactions, or {@code Scn.NULL} when the
     *         cache is empty
     */
    @Override
    protected Scn getTransactionCacheMinimumScn() {
        return this.transactionCache.values().stream()
                .map(MemoryTransaction::getStartScn)
                .min(Scn::compareTo)
                .orElse(Scn.NULL);
    }
}
