package io.debezium.connector.oracle.olr;

import io.debezium.DebeziumException;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.OraclePartition;
import io.debezium.connector.oracle.OracleSchemaChangeEventEmitter;
import io.debezium.connector.oracle.OracleValueConverters;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.olr.client.OlrNetworkClient;
import io.debezium.connector.oracle.olr.client.PayloadEvent;
import io.debezium.connector.oracle.olr.client.StreamingEvent;
import io.debezium.connector.oracle.olr.client.payloads.AbstractMutationEvent;
import io.debezium.connector.oracle.olr.client.payloads.SchemaChangeEvent;
import io.debezium.connector.oracle.olr.client.payloads.Values;
import io.debezium.data.Envelope;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotContext;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.snapshot.SnapshotterService;
import io.debezium.util.Clock;
import java.time.Instant;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/debezium-connector-oracle-2.6.1.Final.jar:io/debezium/connector/oracle/olr/OpenLogReplicatorStreamingChangeEventSource.class */
public class OpenLogReplicatorStreamingChangeEventSource implements StreamingChangeEventSource<OraclePartition, OracleOffsetContext> {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) OpenLogReplicatorStreamingChangeEventSource.class);
    private final OracleConnectorConfig connectorConfig;
    private final OracleConnection jdbcConnection;
    private final EventDispatcher<OraclePartition, TableId> dispatcher;
    private final ErrorHandler errorHandler;
    private final Clock clock;
    private final OracleDatabaseSchema schema;
    private final OpenLogReplicatorStreamingChangeEventSourceMetrics streamingMetrics;
    private final SnapshotterService snapshotterService;
    private OlrNetworkClient client;
    private OraclePartition partition;
    private OracleOffsetContext offsetContext;
    private boolean transactionEvents = false;
    private Scn lastCheckpointScn = Scn.NULL;
    private long lastCheckpointIndex;

    public OpenLogReplicatorStreamingChangeEventSource(OracleConnectorConfig oracleConnectorConfig, OracleConnection oracleConnection, EventDispatcher<OraclePartition, TableId> eventDispatcher, ErrorHandler errorHandler, Clock clock, OracleDatabaseSchema oracleDatabaseSchema, OpenLogReplicatorStreamingChangeEventSourceMetrics openLogReplicatorStreamingChangeEventSourceMetrics, SnapshotterService snapshotterService) {
        this.connectorConfig = oracleConnectorConfig;
        this.dispatcher = eventDispatcher;
        this.jdbcConnection = oracleConnection;
        this.errorHandler = errorHandler;
        this.clock = clock;
        this.schema = oracleDatabaseSchema;
        this.streamingMetrics = openLogReplicatorStreamingChangeEventSourceMetrics;
        this.snapshotterService = snapshotterService;
    }

    @Override // io.debezium.pipeline.source.spi.StreamingChangeEventSource
    public void init(OracleOffsetContext oracleOffsetContext) throws InterruptedException {
        this.offsetContext = oracleOffsetContext == null ? emptyContext() : oracleOffsetContext;
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // io.debezium.pipeline.source.spi.StreamingChangeEventSource
    public OracleOffsetContext getOffsetContext() {
        return this.offsetContext;
    }

    private OracleOffsetContext emptyContext() {
        return OracleOffsetContext.create().logicalName(this.connectorConfig).snapshotPendingTransactions(Collections.emptyMap()).transactionContext(new TransactionContext()).incrementalSnapshotContext(new SignalBasedIncrementalSnapshotContext()).build();
    }

    @Override // io.debezium.pipeline.source.spi.StreamingChangeEventSource
    public void execute(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, OraclePartition oraclePartition, OracleOffsetContext oracleOffsetContext) throws InterruptedException {
        try {
            try {
                this.partition = oraclePartition;
                this.offsetContext = oracleOffsetContext;
                this.jdbcConnection.setAutoCommit(false);
                Scn offsetScn = this.connectorConfig.getAdapter().getOffsetScn(oracleOffsetContext);
                Long scnIndex = oracleOffsetContext.getScnIndex();
                this.client = new OlrNetworkClient(this.connectorConfig);
                if (this.client.connect(offsetScn, scnIndex)) {
                    while (this.client.isConnected() && changeEventSourceContext.isRunning()) {
                        StreamingEvent readEvent = this.client.readEvent();
                        if (readEvent != null) {
                            onEvent(readEvent);
                        }
                        if (changeEventSourceContext.isPaused()) {
                            LOGGER.info("Streaming will now pause");
                            changeEventSourceContext.streamingPaused();
                            changeEventSourceContext.waitSnapshotCompletion();
                            LOGGER.info("Streaming resumed");
                        }
                    }
                    this.client.disconnect();
                    LOGGER.info("Client disconnected.");
                } else {
                    LOGGER.warn("Failed to connect to OpenLogReplicator server.");
                }
                LOGGER.info("Streaming metrics dump: {}", this.streamingMetrics.toString());
                LOGGER.info("Offsets: {}", oracleOffsetContext);
            } catch (Exception e) {
                LOGGER.error("Failed: {}", e.getMessage(), e);
                this.errorHandler.setProducerThrowable(e);
                LOGGER.info("Streaming metrics dump: {}", this.streamingMetrics.toString());
                LOGGER.info("Offsets: {}", oracleOffsetContext);
            }
        } catch (Throwable th) {
            LOGGER.info("Streaming metrics dump: {}", this.streamingMetrics.toString());
            LOGGER.info("Offsets: {}", oracleOffsetContext);
            throw th;
        }
    }

    @Override // io.debezium.pipeline.source.spi.StreamingChangeEventSource
    public void commitOffset(Map<String, ?> map, Map<String, ?> map2) {
        confirmLastCheckpointScn();
    }

    private void confirmLastCheckpointScn() {
        if (!this.lastCheckpointScn.isNull() && this.lastCheckpointIndex > 0 && this.client != null && this.client.isConnected()) {
            this.client.confirm(this.lastCheckpointScn, Long.valueOf(this.lastCheckpointIndex));
        } else if (this.lastCheckpointScn.isNull()) {
            LOGGER.warn("Cannot flush latest offset SCN as no checkpoint event was received.");
        }
    }

    private void onEvent(StreamingEvent streamingEvent) throws Exception {
        for (PayloadEvent payloadEvent : streamingEvent.getPayload()) {
            switch (payloadEvent.getType()) {
                case BEGIN:
                    onBeginEvent(streamingEvent);
                    break;
                case COMMIT:
                    onCommitEvent(streamingEvent);
                    break;
                case CHECKPOINT:
                    onCheckpointEvent(streamingEvent);
                    break;
                case DDL:
                    onSchemaChangeEvent(streamingEvent, (SchemaChangeEvent) payloadEvent);
                    break;
                case INSERT:
                case UPDATE:
                case DELETE:
                    onMutationEvent(streamingEvent, (AbstractMutationEvent) payloadEvent);
                    break;
                default:
                    throw new DebeziumException("Unexpected event type detected: " + payloadEvent.getType());
            }
        }
        this.streamingMetrics.incrementProcessedEventsCount();
        this.streamingMetrics.setCheckpointDetails(streamingEvent.getCheckpointScn(), streamingEvent.getCheckpointIndex());
    }

    private void onBeginEvent(StreamingEvent streamingEvent) {
        this.offsetContext.setScn(streamingEvent.getCheckpointScn());
        this.offsetContext.setScnIndex(streamingEvent.getCheckpointIndex());
        this.offsetContext.setEventScn(streamingEvent.getCheckpointScn());
        this.offsetContext.setTransactionId(streamingEvent.getXid());
        this.offsetContext.setSourceTime(streamingEvent.getTimestamp());
        this.transactionEvents = false;
    }

    private void onCommitEvent(StreamingEvent streamingEvent) throws InterruptedException {
        this.offsetContext.setScn(streamingEvent.getCheckpointScn());
        this.offsetContext.setScnIndex(streamingEvent.getCheckpointIndex());
        this.offsetContext.setEventScn(streamingEvent.getCheckpointScn());
        this.offsetContext.setTransactionId(streamingEvent.getXid());
        this.offsetContext.setSourceTime(streamingEvent.getTimestamp());
        this.streamingMetrics.incrementCommittedTransactionCount();
        if (this.transactionEvents) {
            this.dispatcher.dispatchTransactionCommittedEvent(this.partition, this.offsetContext, streamingEvent.getTimestamp());
        }
        updateCheckpoint(streamingEvent);
        this.dispatcher.alwaysDispatchHeartbeatEvent(this.partition, this.offsetContext);
    }

    private void onCheckpointEvent(StreamingEvent streamingEvent) throws InterruptedException {
        this.offsetContext.setScn(streamingEvent.getCheckpointScn());
        this.offsetContext.setScnIndex(streamingEvent.getCheckpointIndex());
        this.offsetContext.setEventScn(streamingEvent.getCheckpointScn());
        this.offsetContext.setTransactionId(streamingEvent.getXid());
        this.offsetContext.setSourceTime(streamingEvent.getTimestamp());
        updateCheckpoint(streamingEvent);
        this.dispatcher.alwaysDispatchHeartbeatEvent(this.partition, this.offsetContext);
    }

    private void onMutationEvent(StreamingEvent streamingEvent, AbstractMutationEvent abstractMutationEvent) throws Exception {
        Envelope.Operation operation;
        PayloadEvent.Type type = abstractMutationEvent.getType();
        TableId tableId = abstractMutationEvent.getSchema().getTableId(streamingEvent.getDatabaseName());
        if (this.connectorConfig.getTableFilters().dataCollectionFilter().isIncluded(tableId)) {
            Table tableFor = this.schema.tableFor(tableId);
            if (tableFor == null) {
                Optional<Table> potentiallyEmitSchemaChangeForUnknownTable = potentiallyEmitSchemaChangeForUnknownTable(type, tableId);
                if (potentiallyEmitSchemaChangeForUnknownTable.isEmpty()) {
                    return;
                } else {
                    tableFor = potentiallyEmitSchemaChangeForUnknownTable.get();
                }
            }
            switch (type) {
                case INSERT:
                    operation = Envelope.Operation.CREATE;
                    break;
                case UPDATE:
                    operation = Envelope.Operation.UPDATE;
                    break;
                case DELETE:
                    operation = Envelope.Operation.DELETE;
                    break;
                default:
                    throw new DebeziumException("Unexpected DML event type: " + type);
            }
            this.offsetContext.setScn(streamingEvent.getCheckpointScn());
            this.offsetContext.setScnIndex(streamingEvent.getCheckpointIndex());
            this.offsetContext.setEventScn(streamingEvent.getCheckpointScn());
            this.offsetContext.setTransactionId(streamingEvent.getXid());
            this.offsetContext.tableEvent(tableId, streamingEvent.getTimestamp());
            this.streamingMetrics.setLastCapturedDmlCount(1);
            updateCheckpoint(streamingEvent);
            if (!this.transactionEvents) {
                this.dispatcher.dispatchTransactionStartedEvent(this.partition, streamingEvent.getXid(), this.offsetContext, streamingEvent.getTimestamp());
                this.transactionEvents = true;
            }
            Object[] columnValuesArray = toColumnValuesArray(tableFor, abstractMutationEvent.getBefore());
            Object[] columnValuesArray2 = toColumnValuesArray(tableFor, abstractMutationEvent.getAfter());
            LOGGER.trace("Dispatching {} (SCN {}) for table {}", type, streamingEvent.getScn(), tableId);
            this.dispatcher.dispatchDataChangeEvent(this.partition, tableId, new OpenLogReplicatorChangeRecordEmitter(this.connectorConfig, this.partition, this.offsetContext, operation, columnValuesArray, columnValuesArray2, tableFor, this.schema, this.clock));
        }
    }

    private void onSchemaChangeEvent(StreamingEvent streamingEvent, SchemaChangeEvent schemaChangeEvent) throws Exception {
        TableId tableId = schemaChangeEvent.getSchema().getTableId(streamingEvent.getDatabaseName());
        if (tableId.schema() == null || tableId.table().startsWith("OBJ_")) {
            LOGGER.trace("Cannot process DDL due to missing schema: {}", schemaChangeEvent.getSql());
            return;
        }
        if (tableId.table().startsWith("BIN$") && tableId.table().endsWith("==$0")) {
            LOGGER.trace("Skipping DDL for recycling bin table: {}", schemaChangeEvent.getSql());
            return;
        }
        this.offsetContext.setScn(streamingEvent.getCheckpointScn());
        this.offsetContext.setScnIndex(streamingEvent.getCheckpointIndex());
        this.offsetContext.setEventScn(streamingEvent.getCheckpointScn());
        this.offsetContext.setTransactionId(streamingEvent.getXid());
        this.offsetContext.tableEvent(tableId, streamingEvent.getTimestamp());
        String trim = schemaChangeEvent.getSql().toLowerCase().trim();
        if (!isTableSqlStatement(trim)) {
            LOGGER.trace("Skipping internal DDL: {}", schemaChangeEvent.getSql());
            return;
        }
        if (trim.contains("rename constraint ")) {
            LOGGER.trace("Ignoring constraint rename: {}", schemaChangeEvent.getSql());
        } else {
            if (trim.contains("rename to \"bin$")) {
                LOGGER.trace("Ignoring table rename to recycling object: {}", schemaChangeEvent.getSql());
                return;
            }
            updateCheckpoint(streamingEvent);
            LOGGER.trace("Dispatching DDL (SCN {}): [{}]", streamingEvent.getScn(), schemaChangeEvent.getSql());
            this.dispatcher.dispatchSchemaChangeEvent(this.partition, this.offsetContext, tableId, new OracleSchemaChangeEventEmitter(this.connectorConfig, this.partition, this.offsetContext, tableId, tableId.catalog(), tableId.schema(), schemaChangeEvent.getSql(), this.schema, streamingEvent.getTimestamp(), this.streamingMetrics, () -> {
                processTruncateEvent(streamingEvent, schemaChangeEvent);
            }));
        }
    }

    private boolean isTableSqlStatement(String str) {
        return str.startsWith("create table ") || str.startsWith("alter table ") || str.startsWith("drop table ") || str.startsWith("truncate table ");
    }

    private Object[] toColumnValuesArray(Table table, Values values) {
        Object[] objArr = new Object[table.columns().size()];
        if (values != null) {
            try {
                TableId id = table.id();
                for (Column column : table.columns()) {
                    int position = column.position() - 1;
                    Object resolveColumnValue = resolveColumnValue(id, column, values);
                    Logger logger = LOGGER;
                    Object[] objArr2 = new Object[8];
                    objArr2[0] = Integer.valueOf(position);
                    objArr2[1] = column.name();
                    objArr2[2] = Integer.valueOf(column.jdbcType());
                    objArr2[3] = column.typeName();
                    objArr2[4] = Integer.valueOf(column.length());
                    objArr2[5] = column.scale().orElse(0);
                    objArr2[6] = resolveColumnValue;
                    objArr2[7] = resolveColumnValue != null ? resolveColumnValue.getClass() : "<null>";
                    logger.trace("Processing column at {} with name {} [jdbcType={}, type={},length={},scale={}] and value {} ({}).", objArr2);
                    objArr[position] = resolveColumnValue;
                }
            } catch (Exception e) {
                throw new DebeziumException("Failed to create column array values", e);
            }
        }
        return objArr;
    }

    private Optional<Table> potentiallyEmitSchemaChangeForUnknownTable(PayloadEvent.Type type, TableId tableId) throws Exception {
        if (!this.connectorConfig.getTableFilters().dataCollectionFilter().isIncluded(tableId)) {
            LOGGER.trace("{} for non-captured table {} detected.", type, tableId);
            return Optional.empty();
        }
        LOGGER.warn("Fetching schema for table {}, which should already be loaded. This may indicate a potential error in your configuration.", tableId);
        try {
            this.dispatcher.dispatchSchemaChangeEvent(this.partition, this.offsetContext, tableId, new OracleSchemaChangeEventEmitter(this.connectorConfig, this.partition, this.offsetContext, tableId, tableId.catalog(), tableId.schema(), this.jdbcConnection.getTableMetadataDdl(tableId), this.schema, Instant.now(), this.streamingMetrics, null));
            return Optional.ofNullable(this.schema.tableFor(tableId));
        } catch (OracleConnection.NonRelationalTableException e) {
            LOGGER.warn("Table {} is not a relational table, the {} will be skipped.", tableId, type);
            this.streamingMetrics.incrementWarningCount();
            return Optional.empty();
        }
    }

    private void processTruncateEvent(StreamingEvent streamingEvent, SchemaChangeEvent schemaChangeEvent) throws InterruptedException {
        if (schemaChangeEvent.getSchema() == null) {
            LOGGER.warn("Truncate event ignored, no schema found.");
            return;
        }
        TableId tableId = schemaChangeEvent.getSchema().getTableId(streamingEvent.getDatabaseName());
        if (!this.connectorConfig.getTableFilters().dataCollectionFilter().isIncluded(tableId)) {
            LOGGER.warn("Truncate event ignored, table is no included.");
            return;
        }
        Table tableFor = this.schema.tableFor(tableId);
        if (tableFor == null) {
            try {
                Optional<Table> potentiallyEmitSchemaChangeForUnknownTable = potentiallyEmitSchemaChangeForUnknownTable(schemaChangeEvent.getType(), tableId);
                if (potentiallyEmitSchemaChangeForUnknownTable.isEmpty()) {
                    LOGGER.warn("Truncate ignored, cannot find table relational model");
                    return;
                }
                tableFor = potentiallyEmitSchemaChangeForUnknownTable.get();
            } catch (Exception e) {
                LOGGER.warn("Truncate ignored, failed to emit schema change", (Throwable) e);
                return;
            }
        }
        this.offsetContext.setScn(streamingEvent.getScn());
        this.offsetContext.setEventScn(streamingEvent.getScn());
        this.offsetContext.setTransactionId(streamingEvent.getXid());
        this.offsetContext.tableEvent(tableId, streamingEvent.getTimestamp());
        updateCheckpoint(streamingEvent);
        LOGGER.trace("Dispatching {} (SCN {}) for table {}", Envelope.Operation.TRUNCATE, streamingEvent.getScn(), tableId);
        this.dispatcher.dispatchDataChangeEvent(this.partition, tableId, new OpenLogReplicatorChangeRecordEmitter(this.connectorConfig, this.partition, this.offsetContext, Envelope.Operation.TRUNCATE, new Object[tableFor.columns().size()], new Object[tableFor.columns().size()], tableFor, this.schema, this.clock));
    }

    private Object resolveColumnValue(TableId tableId, Column column, Values values) {
        Object orDefault = values.getValues().getOrDefault(column.name(), OracleValueConverters.UNAVAILABLE_VALUE);
        if (orDefault == OracleValueConverters.UNAVAILABLE_VALUE) {
            Iterator<Column> it = this.schema.getLobColumnsForTable(tableId).iterator();
            while (it.hasNext()) {
                if (it.next().equals(column)) {
                    return orDefault;
                }
            }
            orDefault = null;
        }
        return orDefault;
    }

    private void updateCheckpoint(StreamingEvent streamingEvent) {
        this.lastCheckpointScn = streamingEvent.getCheckpointScn();
        this.lastCheckpointIndex = streamingEvent.getCheckpointIndex().longValue();
    }
}
