package io.debezium.pipeline;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.SnapshotRecord;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.data.Envelope;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.heartbeat.HeartbeatFactory;
import io.debezium.pipeline.signal.Signal;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.DataChangeEventListener;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.ChangeEventCreator;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.pipeline.spi.SchemaChangeEventEmitter;
import io.debezium.pipeline.txmetadata.TransactionMonitor;
import io.debezium.relational.history.ConnectTableChangeSerializer;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.schema.DataCollectionFilters;
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.DataCollectionSchema;
import io.debezium.schema.DatabaseSchema;
import io.debezium.schema.HistorizedDatabaseSchema;
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.schema.TopicSelector;
import io.debezium.util.SchemaNameAdjuster;
import java.util.Collection;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/debezium-core-1.9.5.Final.jar:io/debezium/pipeline/EventDispatcher.class */
public class EventDispatcher<P extends Partition, T extends DataCollectionId> implements AutoCloseable {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) EventDispatcher.class);
    // Resolves the topic name for a data collection, and the primary topic used for schema change records.
    private final TopicSelector<T> topicSelector;
    // Looked up per data collection when dispatching snapshot and data change events.
    private final DatabaseSchema<T> schema;
    // The same schema cast to its historized form, or null when the schema reports it is not historized.
    private final HistorizedDatabaseSchema<T> historizedSchema;
    // Destination of every record produced by this dispatcher.
    private final ChangeEventQueue<DataChangeEvent> queue;
    // Decides whether events for a given data collection are dispatched or treated as filtered.
    private final DataCollectionFilters.DataCollectionFilter<T> filter;
    // Wraps each SourceRecord into the DataChangeEvent that is put on the queue.
    private final ChangeEventCreator changeEventCreator;
    // Created from the HeartbeatFactory handed to the constructor.
    private final Heartbeat heartbeat;
    // Starts as the no-op listener; replaceable through setEventListener.
    private DataChangeEventListener<P> eventListener;
    // When true, a DELETE in the streaming receiver is followed by a record with a null value.
    private final boolean emitTombstonesOnDelete;
    // Consulted when no schema is registered for an included data collection; defaults to errorOnMissingSchema.
    private final InconsistentSchemaHandler<P, T> inconsistentSchemaHandler;
    private final TransactionMonitor transactionMonitor;
    private final CommonConnectorConfig connectorConfig;
    // Operations taken from the connector configuration's getSkippedOperations().
    private final EnumSet<Envelope.Operation> skippedOperations;
    // True when the configuration reports supportsOperationFiltering() or skippedOperations is empty.
    private final boolean neverSkip;
    private final Schema schemaChangeKeySchema;
    private final Schema schemaChangeValueSchema;
    private final ConnectTableChangeSerializer tableChangesSerializer;
    private final Signal<P> signal;
    // Null until set through setIncrementalSnapshotChangeEventSource; every use is null-checked.
    private IncrementalSnapshotChangeEventSource<P, T> incrementalSnapshotChangeEventSource;
    private final EventDispatcher<P, T>.StreamingChangeRecordReceiver streamingReceiver;

    /* loaded from: input_file:META-INF/bundled-dependencies/debezium-core-1.9.5.Final.jar:io/debezium/pipeline/EventDispatcher$BufferingSnapshotChangeRecordReceiver.class */
    /**
     * Snapshot receiver that always holds back the most recent change record.
     *
     * <p>Each incoming record first flushes the previously buffered one to the queue and then
     * takes its place. The record still buffered when {@link #completeSnapshot()} is called can
     * therefore be re-marked as the last snapshot record before it is enqueued.
     */
    private final class BufferingSnapshotChangeRecordReceiver implements SnapshotReceiver<P> {
        /** Deferred creation of the held-back event; {@code null} when nothing is buffered. */
        private Supplier<DataChangeEvent> bufferedEvent;

        private BufferingSnapshotChangeRecordReceiver() {
        }

        /**
         * Enqueues the previously buffered event (if any) and buffers the given record instead.
         *
         * @throws NullPointerException if {@code struct} is {@code null}
         * @throws InterruptedException if enqueueing the previous event is interrupted
         */
        @Override // io.debezium.pipeline.spi.ChangeRecordEmitter.Receiver
        public void changeRecord(P p, DataCollectionSchema dataCollectionSchema, Envelope.Operation operation, Object obj, Struct struct, OffsetContext offsetContext, ConnectHeaders connectHeaders) throws InterruptedException {
            Objects.requireNonNull(struct, "value must not be null");
            EventDispatcher.LOGGER.trace("Received change record for {} operation on key {}", operation, obj);
            if (this.bufferedEvent != null) {
                EventDispatcher.this.queue.enqueue(this.bufferedEvent.get());
            }
            final Schema keySchema = dataCollectionSchema.keySchema();
            // The topic selector is typed on T while the schema only exposes a DataCollectionId,
            // so the id has to be narrowed explicitly for the lookup to type-check.
            @SuppressWarnings("unchecked")
            final T collectionId = (T) dataCollectionSchema.id();
            final String topicName = EventDispatcher.this.topicSelector.topicNameFor(collectionId);
            // The record is built only when the supplier is invoked, so the source partition and
            // offset are read at flush time rather than at receive time.
            this.bufferedEvent = () -> EventDispatcher.this.changeEventCreator.createDataChangeEvent(
                    new SourceRecord(p.getSourcePartition(), offsetContext.getOffset(), topicName, null, keySchema, obj, dataCollectionSchema.getEnvelopeSchema().schema(), struct, null, connectHeaders));
        }

        /**
         * Flushes the buffered event, if there is one. When its value has a {@code source} struct
         * currently marked {@link SnapshotRecord#TRUE}, the marker is changed to
         * {@link SnapshotRecord#LAST} first.
         *
         * @throws InterruptedException if enqueueing the event is interrupted
         */
        @Override // io.debezium.pipeline.EventDispatcher.SnapshotReceiver
        public void completeSnapshot() throws InterruptedException {
            if (this.bufferedEvent == null) {
                return;
            }
            final DataChangeEvent lastEvent = this.bufferedEvent.get();
            final Struct envelope = (Struct) lastEvent.getRecord().value();
            if (envelope.schema().field("source") != null) {
                final Struct source = envelope.getStruct("source");
                if (SnapshotRecord.fromSource(source) == SnapshotRecord.TRUE) {
                    SnapshotRecord.LAST.toSource(source);
                }
            }
            EventDispatcher.this.queue.enqueue(lastEvent);
            this.bufferedEvent = null;
        }
    }

    /**
     * Strategy consulted by {@code dispatchDataChangeEvent} when no schema is registered for an
     * included data collection.
     *
     * <p>Returning an empty {@link Optional} makes the dispatcher drop the event; a present value
     * is used as the schema for emitting it. An implementation may also throw, as the default
     * {@code errorOnMissingSchema} does.
     */
    @FunctionalInterface
    /* loaded from: input_file:META-INF/bundled-dependencies/debezium-core-1.9.5.Final.jar:io/debezium/pipeline/EventDispatcher$InconsistentSchemaHandler.class */
    public interface InconsistentSchemaHandler<P extends Partition, T extends DataCollectionId> {
        /**
         * @param p the partition the event belongs to
         * @param t the data collection for which no schema was found
         * @param changeRecordEmitter the emitter of the affected event
         * @return the schema to use, or empty to skip the event
         */
        Optional<DataCollectionSchema> handle(P p, T t, ChangeRecordEmitter changeRecordEmitter);
    }

    /* loaded from: input_file:META-INF/bundled-dependencies/debezium-core-1.9.5.Final.jar:io/debezium/pipeline/EventDispatcher$IncrementalSnapshotChangeRecordReceiver.class */
    /**
     * Snapshot receiver used for incremental snapshots: every record is reported to the supplied
     * listener and enqueued immediately, without buffering.
     */
    private final class IncrementalSnapshotChangeRecordReceiver implements SnapshotReceiver<P> {
        /** Listener notified about each record before it is enqueued. */
        public final DataChangeEventListener<P> dataListener;

        public IncrementalSnapshotChangeRecordReceiver(DataChangeEventListener<P> dataChangeEventListener) {
            this.dataListener = dataChangeEventListener;
        }

        /**
         * Builds the source record, notifies the listener and enqueues the resulting event.
         *
         * @throws NullPointerException if {@code struct} is {@code null}
         * @throws InterruptedException if enqueueing is interrupted
         */
        @Override // io.debezium.pipeline.spi.ChangeRecordEmitter.Receiver
        public void changeRecord(P p, DataCollectionSchema dataCollectionSchema, Envelope.Operation operation, Object obj, Struct struct, OffsetContext offsetContext, ConnectHeaders connectHeaders) throws InterruptedException {
            Objects.requireNonNull(struct, "value must not be null");
            EventDispatcher.LOGGER.trace("Received change record for {} operation on key {}", operation, obj);
            final Schema keySchema = dataCollectionSchema.keySchema();
            // The topic selector is typed on T while the schema only exposes a DataCollectionId,
            // so the id has to be narrowed explicitly for the lookup to type-check.
            @SuppressWarnings("unchecked")
            final T collectionId = (T) dataCollectionSchema.id();
            final SourceRecord sourceRecord = new SourceRecord(p.getSourcePartition(), offsetContext.getOffset(), EventDispatcher.this.topicSelector.topicNameFor(collectionId), null, keySchema, obj, dataCollectionSchema.getEnvelopeSchema().schema(), struct, null, connectHeaders);
            // Hand the listener the record key itself, as the other dispatch paths in this class
            // do; previously the key *schema* was passed in the key position.
            this.dataListener.onEvent(p, dataCollectionSchema.id(), offsetContext, obj, struct, operation);
            EventDispatcher.this.queue.enqueue(EventDispatcher.this.changeEventCreator.createDataChangeEvent(sourceRecord));
        }

        /** Nothing is buffered by this receiver, so there is nothing to flush. */
        @Override // io.debezium.pipeline.EventDispatcher.SnapshotReceiver
        public void completeSnapshot() throws InterruptedException {
        }
    }

    /* loaded from: input_file:META-INF/bundled-dependencies/debezium-core-1.9.5.Final.jar:io/debezium/pipeline/EventDispatcher$SchemaChangeEventReceiver.class */
    /**
     * Receiver for schema change events: applies each change to the historized schema and, when
     * the connector configuration enables it, publishes a schema change record to the primary
     * topic.
     */
    private final class SchemaChangeEventReceiver implements SchemaChangeEventEmitter.Receiver {
        private SchemaChangeEventReceiver() {
        }

        /** Builds the record key, which carries only the database name. */
        private Struct schemaChangeRecordKey(SchemaChangeEvent schemaChangeEvent) {
            final Struct key = new Struct(EventDispatcher.this.schemaChangeKeySchema);
            key.put(HistoryRecord.Fields.DATABASE_NAME, schemaChangeEvent.getDatabase());
            return key;
        }

        /** Builds the record value: source info, database, schema, DDL and serialized table changes. */
        private Struct schemaChangeRecordValue(SchemaChangeEvent schemaChangeEvent) {
            final Struct value = new Struct(EventDispatcher.this.schemaChangeValueSchema);
            value.put("source", schemaChangeEvent.getSource());
            value.put(HistoryRecord.Fields.DATABASE_NAME, schemaChangeEvent.getDatabase());
            value.put(HistoryRecord.Fields.SCHEMA_NAME, schemaChangeEvent.getSchema());
            value.put(HistoryRecord.Fields.DDL_STATEMENTS, schemaChangeEvent.getDdl());
            value.put(HistoryRecord.Fields.TABLE_CHANGES, EventDispatcher.this.tableChangesSerializer.serialize(schemaChangeEvent.getTableChanges()));
            return value;
        }

        /**
         * Applies the change to the historized schema, then enqueues a schema change record if
         * {@code isSchemaChangesHistoryEnabled()} is true.
         *
         * @throws InterruptedException if enqueueing the record is interrupted
         */
        @Override // io.debezium.pipeline.spi.SchemaChangeEventEmitter.Receiver
        public void schemaChangeEvent(SchemaChangeEvent schemaChangeEvent) throws InterruptedException {
            EventDispatcher.this.historizedSchema.applySchemaChange(schemaChangeEvent);
            if (!EventDispatcher.this.connectorConfig.isSchemaChangesHistoryEnabled()) {
                return;
            }
            final SourceRecord schemaChangeRecord = new SourceRecord(
                    schemaChangeEvent.getPartition(),
                    schemaChangeEvent.getOffset(),
                    EventDispatcher.this.topicSelector.getPrimaryTopic(),
                    0,
                    EventDispatcher.this.schemaChangeKeySchema,
                    schemaChangeRecordKey(schemaChangeEvent),
                    EventDispatcher.this.schemaChangeValueSchema,
                    schemaChangeRecordValue(schemaChangeEvent));
            EventDispatcher.this.enqueueSchemaChangeMessage(schemaChangeRecord);
        }
    }

    /* loaded from: input_file:META-INF/bundled-dependencies/debezium-core-1.9.5.Final.jar:io/debezium/pipeline/EventDispatcher$SnapshotReceiver.class */
    /**
     * A change record receiver used while snapshotting, with an extra hook that is invoked once
     * the snapshot is finished.
     */
    public interface SnapshotReceiver<P extends Partition> extends ChangeRecordEmitter.Receiver<P> {
        /**
         * Signals that no further snapshot records will arrive, giving the receiver the chance to
         * flush anything it still holds.
         *
         * @throws InterruptedException if the receiver is interrupted while flushing
         */
        void completeSnapshot() throws InterruptedException;
    }

    /* loaded from: input_file:META-INF/bundled-dependencies/debezium-core-1.9.5.Final.jar:io/debezium/pipeline/EventDispatcher$StreamingChangeRecordReceiver.class */
    /**
     * Receiver used while streaming: each change record is enqueued right away and, when
     * tombstones are enabled, a DELETE is followed by a record with the same key and a null value.
     */
    private final class StreamingChangeRecordReceiver implements ChangeRecordEmitter.Receiver<P> {
        private StreamingChangeRecordReceiver() {
        }

        /**
         * Enqueues the change record and, if applicable, its tombstone.
         *
         * @throws NullPointerException if {@code struct} is {@code null}
         * @throws InterruptedException if enqueueing is interrupted
         */
        @Override // io.debezium.pipeline.spi.ChangeRecordEmitter.Receiver
        public void changeRecord(P p, DataCollectionSchema dataCollectionSchema, Envelope.Operation operation, Object obj, Struct struct, OffsetContext offsetContext, ConnectHeaders connectHeaders) throws InterruptedException {
            Objects.requireNonNull(struct, "value must not be null");
            EventDispatcher.LOGGER.trace("Received change record for {} operation on key {}", operation, obj);
            // The topic selector is typed on T while the schema only exposes a DataCollectionId,
            // so the id has to be narrowed explicitly for the lookup to type-check.
            @SuppressWarnings("unchecked")
            final T collectionId = (T) dataCollectionSchema.id();
            // A TRUNCATE without a key is emitted without a key schema as well.
            final boolean keylessTruncate = obj == null && operation == Envelope.Operation.TRUNCATE;
            final Schema keySchema = keylessTruncate ? null : dataCollectionSchema.keySchema();
            final SourceRecord dataRecord = new SourceRecord(p.getSourcePartition(), offsetContext.getOffset(), EventDispatcher.this.topicSelector.topicNameFor(collectionId), null, keySchema, obj, dataCollectionSchema.getEnvelopeSchema().schema(), struct, null, connectHeaders);
            EventDispatcher.this.queue.enqueue(EventDispatcher.this.changeEventCreator.createDataChangeEvent(dataRecord));
            if (EventDispatcher.this.emitTombstonesOnDelete && operation == Envelope.Operation.DELETE) {
                // Tombstone: same topic, partition, key, timestamp and headers, but null value and value schema.
                final SourceRecord tombstone = dataRecord.newRecord(dataRecord.topic(), dataRecord.kafkaPartition(), dataRecord.keySchema(), dataRecord.key(), (Schema) null, (Object) null, dataRecord.timestamp(), (Iterable<Header>) dataRecord.headers());
                EventDispatcher.this.queue.enqueue(EventDispatcher.this.changeEventCreator.createDataChangeEvent(tombstone));
            }
        }
    }

    /**
     * Creates a dispatcher with no custom inconsistent-schema handler and a heartbeat factory
     * built from the given configuration, topic selector and schema name adjuster.
     */
    public EventDispatcher(CommonConnectorConfig commonConnectorConfig, TopicSelector<T> topicSelector, DatabaseSchema<T> databaseSchema, ChangeEventQueue<DataChangeEvent> changeEventQueue, DataCollectionFilters.DataCollectionFilter<T> dataCollectionFilter, ChangeEventCreator changeEventCreator, EventMetadataProvider eventMetadataProvider, SchemaNameAdjuster schemaNameAdjuster) {
        // Diamond instead of the raw type, so the factory is checked against HeartbeatFactory<T>.
        this(commonConnectorConfig, topicSelector, databaseSchema, changeEventQueue, dataCollectionFilter, changeEventCreator, null, eventMetadataProvider, new HeartbeatFactory<>(commonConnectorConfig, topicSelector, schemaNameAdjuster), schemaNameAdjuster);
    }

    /**
     * Creates a dispatcher with the supplied heartbeat factory and no custom inconsistent-schema
     * handler; the main constructor then falls back to {@code errorOnMissingSchema}.
     */
    public EventDispatcher(CommonConnectorConfig commonConnectorConfig, TopicSelector<T> topicSelector, DatabaseSchema<T> databaseSchema, ChangeEventQueue<DataChangeEvent> changeEventQueue, DataCollectionFilters.DataCollectionFilter<T> dataCollectionFilter, ChangeEventCreator changeEventCreator, EventMetadataProvider eventMetadataProvider, HeartbeatFactory<T> heartbeatFactory, SchemaNameAdjuster schemaNameAdjuster) {
        this(commonConnectorConfig, topicSelector, databaseSchema, changeEventQueue, dataCollectionFilter, changeEventCreator, null, eventMetadataProvider, heartbeatFactory, schemaNameAdjuster);
    }

    public EventDispatcher(CommonConnectorConfig commonConnectorConfig, TopicSelector<T> topicSelector, DatabaseSchema<T> databaseSchema, ChangeEventQueue<DataChangeEvent> changeEventQueue, DataCollectionFilters.DataCollectionFilter<T> dataCollectionFilter, ChangeEventCreator changeEventCreator, InconsistentSchemaHandler<P, T> inconsistentSchemaHandler, EventMetadataProvider eventMetadataProvider, HeartbeatFactory<T> heartbeatFactory, SchemaNameAdjuster schemaNameAdjuster) {
        this.eventListener = DataChangeEventListener.NO_OP();
        this.tableChangesSerializer = new ConnectTableChangeSerializer(schemaNameAdjuster);
        this.connectorConfig = commonConnectorConfig;
        this.topicSelector = topicSelector;
        this.schema = databaseSchema;
        this.historizedSchema = databaseSchema.isHistorized() ? (HistorizedDatabaseSchema) databaseSchema : null;
        this.queue = changeEventQueue;
        this.filter = dataCollectionFilter;
        this.changeEventCreator = changeEventCreator;
        this.streamingReceiver = new StreamingChangeRecordReceiver();
        this.emitTombstonesOnDelete = commonConnectorConfig.isEmitTombstoneOnDelete();
        this.inconsistentSchemaHandler = inconsistentSchemaHandler != null ? inconsistentSchemaHandler : this::errorOnMissingSchema;
        this.skippedOperations = commonConnectorConfig.getSkippedOperations();
        this.neverSkip = commonConnectorConfig.supportsOperationFiltering() || this.skippedOperations.isEmpty();
        this.transactionMonitor = new TransactionMonitor(commonConnectorConfig, eventMetadataProvider, schemaNameAdjuster, this::enqueueTransactionMessage);
        this.signal = new Signal<>(commonConnectorConfig, this);
        this.heartbeat = heartbeatFactory.createHeartbeat();
        this.schemaChangeKeySchema = SchemaBuilder.struct().name(schemaNameAdjuster.adjust("io.debezium.connector." + commonConnectorConfig.getConnectorName() + ".SchemaChangeKey")).field(HistoryRecord.Fields.DATABASE_NAME, Schema.STRING_SCHEMA).build();
        this.schemaChangeValueSchema = SchemaBuilder.struct().name(schemaNameAdjuster.adjust("io.debezium.connector." + commonConnectorConfig.getConnectorName() + ".SchemaChangeValue")).field("source", commonConnectorConfig.getSourceInfoStructMaker().schema()).field(HistoryRecord.Fields.DATABASE_NAME, Schema.OPTIONAL_STRING_SCHEMA).field(HistoryRecord.Fields.SCHEMA_NAME, Schema.OPTIONAL_STRING_SCHEMA).field(HistoryRecord.Fields.DDL_STATEMENTS, Schema.OPTIONAL_STRING_SCHEMA).field(HistoryRecord.Fields.TABLE_CHANGES, SchemaBuilder.array(this.tableChangesSerializer.getChangeSchema()).build()).build();
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Emits the change records of a snapshot event: each record is reported to the event listener
     * and then forwarded to the given snapshot receiver.
     *
     * @param p the partition the event belongs to
     * @param t the data collection being snapshotted
     * @param changeRecordEmitter produces the change records
     * @param snapshotReceiver receives every emitted record
     * @throws IllegalArgumentException if no schema is registered for {@code t}
     * @throws InterruptedException if emitting or receiving is interrupted
     */
    public void dispatchSnapshotEvent(P p, T t, ChangeRecordEmitter<P> changeRecordEmitter, final SnapshotReceiver<P> snapshotReceiver) throws InterruptedException {
        final DataCollectionSchema collectionSchema = schema.schemaFor(t);
        if (collectionSchema == null) {
            // Always throws; the snapshot path does not consult the inconsistent-schema handler.
            errorOnMissingSchema(p, t, changeRecordEmitter);
        }
        final ChangeRecordEmitter.Receiver<P> notifyingReceiver = new ChangeRecordEmitter.Receiver<P>() {
            @Override
            public void changeRecord(P partition, DataCollectionSchema emittedSchema, Envelope.Operation operation, Object key, Struct value, OffsetContext offset, ConnectHeaders headers) throws InterruptedException {
                // The schema looked up above is used, not the one passed in by the emitter.
                EventDispatcher.this.eventListener.onEvent(partition, collectionSchema.id(), offset, key, value, operation);
                snapshotReceiver.changeRecord(partition, collectionSchema, operation, key, value, offset, headers);
            }
        };
        changeRecordEmitter.emitChangeRecords(collectionSchema, notifyingReceiver);
    }

    /**
     * Creates a new buffering snapshot receiver.
     *
     * @return a fresh receiver instance on every call
     */
    public SnapshotReceiver<P> getSnapshotChangeEventReceiver() {
        final SnapshotReceiver<P> bufferingReceiver = new BufferingSnapshotChangeRecordReceiver();
        return bufferingReceiver;
    }

    /**
     * Creates a new incremental snapshot receiver bound to the given listener.
     *
     * @param dataChangeEventListener listener notified for each received record
     * @return a fresh receiver instance on every call
     */
    public SnapshotReceiver<P> getIncrementalSnapshotChangeEventReceiver(DataChangeEventListener<P> dataChangeEventListener) {
        final SnapshotReceiver<P> incrementalReceiver = new IncrementalSnapshotChangeRecordReceiver(dataChangeEventListener);
        return incrementalReceiver;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Dispatches a data change event for the given data collection.
     *
     * <p>Included collections have their change records emitted: signal records are processed,
     * and records whose operation is not skipped are reported to the transaction monitor, the
     * event listener, the incremental snapshot source (if set) and finally the streaming receiver.
     * Excluded collections are reported as filtered. In both cases a heartbeat is attempted
     * afterwards, except when the inconsistent-schema handler returns no schema.
     *
     * <p>Any exception is handled according to the configured event processing failure handling
     * mode: FAIL rethrows it wrapped in a {@link ConnectException}; WARN and SKIP log it (at warn
     * and debug level respectively) and return {@code false}.
     *
     * @param p the partition the event belongs to
     * @param t the data collection the event refers to
     * @param changeRecordEmitter produces the change records
     * @return {@code true} if the change records were emitted, {@code false} if the event was
     *         filtered, had no usable schema, or failed in a non-FAIL mode
     * @throws ConnectException if processing fails and the failure handling mode is FAIL
     * @throws InterruptedException declared by the signature
     */
    public boolean dispatchDataChangeEvent(P p, final T t, ChangeRecordEmitter<P> changeRecordEmitter) throws InterruptedException {
        try {
            boolean handled = false;
            if (this.filter.isIncluded(t)) {
                DataCollectionSchema collectionSchema = this.schema.schemaFor(t);
                if (collectionSchema == null) {
                    Optional<DataCollectionSchema> replacement = this.inconsistentSchemaHandler.handle(p, t, changeRecordEmitter);
                    if (!replacement.isPresent()) {
                        return false;
                    }
                    collectionSchema = replacement.get();
                }
                changeRecordEmitter.emitChangeRecords(collectionSchema, new ChangeRecordEmitter.Receiver<P>() {
                    @Override // io.debezium.pipeline.spi.ChangeRecordEmitter.Receiver
                    public void changeRecord(P p2, DataCollectionSchema dataCollectionSchema, Envelope.Operation operation, Object obj, Struct struct, OffsetContext offsetContext, ConnectHeaders connectHeaders) throws InterruptedException {
                        // Signals are processed even if CREATE is among the skipped operations.
                        if (operation == Envelope.Operation.CREATE && EventDispatcher.this.signal.isSignal(t)) {
                            EventDispatcher.this.signal.process(p2, struct, offsetContext);
                        }
                        if (EventDispatcher.this.neverSkip || !EventDispatcher.this.skippedOperations.contains(operation)) {
                            EventDispatcher.this.transactionMonitor.dataEvent(p2, t, offsetContext, obj, struct);
                            EventDispatcher.this.eventListener.onEvent(p2, t, offsetContext, obj, struct, operation);
                            if (EventDispatcher.this.incrementalSnapshotChangeEventSource != null) {
                                EventDispatcher.this.incrementalSnapshotChangeEventSource.processMessage(p2, t, obj, offsetContext);
                            }
                            EventDispatcher.this.streamingReceiver.changeRecord(p2, dataCollectionSchema, operation, obj, struct, offsetContext, connectHeaders);
                        }
                    }
                });
                handled = true;
            } else {
                LOGGER.trace("Filtered data change event for {}", t);
                this.eventListener.onFilteredEvent(p, "source = " + t, changeRecordEmitter.getOperation());
                dispatchFilteredEvent(changeRecordEmitter.getPartition(), changeRecordEmitter.getOffset());
            }
            this.heartbeat.heartbeat(changeRecordEmitter.getPartition().getSourcePartition(), changeRecordEmitter.getOffset().getOffset(), this::enqueueHeartbeat);
            return handled;
        } catch (Exception e) {
            // NOTE(review): this also catches InterruptedException, so an interrupt is wrapped
            // (FAIL) or swallowed (WARN/SKIP) instead of propagating - confirm this is intended.
            switch (this.connectorConfig.getEventProcessingFailureHandlingMode()) {
                case FAIL:
                    throw new ConnectException("Error while processing event at offset " + changeRecordEmitter.getOffset().getOffset(), e);
                case WARN:
                    // Pass the exception as the trailing argument so the cause is not lost.
                    LOGGER.warn("Error while processing event at offset {}", changeRecordEmitter.getOffset().getOffset(), e);
                    return false;
                case SKIP:
                    LOGGER.debug("Error while processing event at offset {}", changeRecordEmitter.getOffset().getOffset(), e);
                    return false;
                default:
                    return false;
            }
        }
    }

    /**
     * Forwards a filtered event to the incremental snapshot source; does nothing when none is set.
     *
     * @param p the partition the event belongs to
     * @param offsetContext the offset of the filtered event
     * @throws InterruptedException if the incremental snapshot source is interrupted
     */
    public void dispatchFilteredEvent(P p, OffsetContext offsetContext) throws InterruptedException {
        if (incrementalSnapshotChangeEventSource == null) {
            return;
        }
        incrementalSnapshotChangeEventSource.processFilteredEvent(p, offsetContext);
    }

    /**
     * Reports a transaction commit to the transaction monitor and then, if one is set, to the
     * incremental snapshot source.
     *
     * @param p the partition the transaction belongs to
     * @param offsetContext the offset at which the commit was observed
     * @throws InterruptedException if either recipient is interrupted
     */
    public void dispatchTransactionCommittedEvent(P p, OffsetContext offsetContext) throws InterruptedException {
        transactionMonitor.transactionComittedEvent(p, offsetContext);
        if (incrementalSnapshotChangeEventSource == null) {
            return;
        }
        incrementalSnapshotChangeEventSource.processTransactionCommittedEvent(p, offsetContext);
    }

    /**
     * Reports a transaction start to the transaction monitor and then, if one is set, to the
     * incremental snapshot source.
     *
     * @param p the partition the transaction belongs to
     * @param str the value passed to the transaction monitor together with the partition and offset
     * @param offsetContext the offset at which the start was observed
     * @throws InterruptedException if either recipient is interrupted
     */
    public void dispatchTransactionStartedEvent(P p, String str, OffsetContext offsetContext) throws InterruptedException {
        transactionMonitor.transactionStartedEvent(p, str, offsetContext);
        if (incrementalSnapshotChangeEventSource == null) {
            return;
        }
        incrementalSnapshotChangeEventSource.processTransactionStartedEvent(p, offsetContext);
    }

    /**
     * Passes a connector event on to the current event listener.
     *
     * @param p the partition the event belongs to
     * @param connectorEvent the event to report
     */
    public void dispatchConnectorEvent(P p, ConnectorEvent connectorEvent) {
        final DataChangeEventListener<P> listener = eventListener;
        listener.onConnectorEvent(p, connectorEvent);
    }

    /**
     * Missing-schema strategy that reports the event as erroneous and then fails.
     *
     * @param p the partition the event belongs to
     * @param t the data collection without a registered schema
     * @param changeRecordEmitter the emitter of the affected event
     * @return never returns normally
     * @throws IllegalArgumentException always
     */
    public Optional<DataCollectionSchema> errorOnMissingSchema(P p, T t, ChangeRecordEmitter<P> changeRecordEmitter) {
        final String eventDescription = "source = " + t;
        eventListener.onErroneousEvent(p, eventDescription, changeRecordEmitter.getOperation());
        final String failureMessage = "No metadata registered for captured table " + t;
        throw new IllegalArgumentException(failureMessage);
    }

    /**
     * Missing-schema strategy that silently ignores the event.
     *
     * @param t the data collection without a registered schema (unused)
     * @param changeRecordEmitter the emitter of the affected event (unused)
     * @return always an empty {@link Optional}
     */
    public Optional<DataCollectionSchema> ignoreMissingSchema(T t, ChangeRecordEmitter<P> changeRecordEmitter) {
        final Optional<DataCollectionSchema> noSchema = Optional.empty();
        return noSchema;
    }

    /**
     * Dispatches a schema change for a single data collection.
     *
     * <p>The event is dropped when the collection is non-null, excluded by the filter, and the
     * schema is either not historized or stores only captured tables. Otherwise the change is
     * emitted and the incremental snapshot source (if set) is told about it.
     *
     * @param p the partition the change belongs to
     * @param t the affected data collection; may be {@code null}
     * @param schemaChangeEventEmitter produces the schema change events
     * @throws InterruptedException if emitting is interrupted
     */
    public void dispatchSchemaChangeEvent(P p, T t, SchemaChangeEventEmitter schemaChangeEventEmitter) throws InterruptedException {
        final boolean excludedCollection = t != null && !filter.isIncluded(t);
        // The historized schema is consulted only for excluded collections.
        if (excludedCollection && (historizedSchema == null || historizedSchema.storeOnlyCapturedTables())) {
            LOGGER.trace("Filtering schema change event for {}", t);
            return;
        }
        final SchemaChangeEventReceiver receiver = new SchemaChangeEventReceiver();
        schemaChangeEventEmitter.emitSchemaChangeEvent(receiver);
        if (incrementalSnapshotChangeEventSource == null) {
            return;
        }
        incrementalSnapshotChangeEventSource.processSchemaChange(p, t);
    }

    /**
     * Dispatches a schema change that affects several data collections.
     *
     * <p>The change is emitted when the collection list is {@code null} or empty, when at least
     * one listed collection passes the filter, or when the schema is historized and does not
     * store only captured tables. Otherwise the event is only traced.
     *
     * @param collection the affected data collections; may be {@code null} or empty
     * @param schemaChangeEventEmitter produces the schema change events
     * @throws InterruptedException if emitting is interrupted
     */
    public void dispatchSchemaChangeEvent(Collection<T> collection, SchemaChangeEventEmitter schemaChangeEventEmitter) throws InterruptedException {
        // With nothing to check against the filter, the change counts as included.
        boolean anyIncluded = collection == null || collection.isEmpty();
        if (!anyIncluded) {
            for (T candidate : collection) {
                if (filter.isIncluded(candidate)) {
                    anyIncluded = true;
                    break;
                }
            }
        }
        final boolean shouldEmit = anyIncluded || (historizedSchema != null && !historizedSchema.storeOnlyCapturedTables());
        if (!shouldEmit) {
            LOGGER.trace("Filtering schema change event for {}", collection);
            return;
        }
        schemaChangeEventEmitter.emitSchemaChangeEvent(new SchemaChangeEventReceiver());
    }

    /**
     * Requests a forced heartbeat for the given partition and offset; produced records are
     * enqueued by this dispatcher.
     *
     * @param p the partition whose source partition is reported
     * @param offsetContext the offset to report
     * @throws InterruptedException if the heartbeat is interrupted
     */
    public void alwaysDispatchHeartbeatEvent(P p, OffsetContext offsetContext) throws InterruptedException {
        heartbeat.forcedBeat(
                p.getSourcePartition(),
                offsetContext.getOffset(),
                this::enqueueHeartbeat);
    }

    /**
     * Offers the heartbeat a chance to fire for the given partition and offset; produced records
     * are enqueued by this dispatcher.
     *
     * @param p the partition whose source partition is reported
     * @param offsetContext the offset to report
     * @throws InterruptedException if the heartbeat is interrupted
     */
    public void dispatchHeartbeatEvent(P p, OffsetContext offsetContext) throws InterruptedException {
        heartbeat.heartbeat(
                p.getSourcePartition(),
                offsetContext.getOffset(),
                this::enqueueHeartbeat);
    }

    /**
     * @return whether the underlying heartbeat reports itself as enabled
     */
    public boolean heartbeatsEnabled() {
        final boolean enabled = heartbeat.isEnabled();
        return enabled;
    }

    /** Wraps a heartbeat record in a {@link DataChangeEvent} and puts it on the queue. */
    private void enqueueHeartbeat(SourceRecord sourceRecord) throws InterruptedException {
        final DataChangeEvent heartbeatEvent = new DataChangeEvent(sourceRecord);
        queue.enqueue(heartbeatEvent);
    }

    /** Wraps a transaction metadata record in a {@link DataChangeEvent} and puts it on the queue. */
    private void enqueueTransactionMessage(SourceRecord sourceRecord) throws InterruptedException {
        final DataChangeEvent transactionEvent = new DataChangeEvent(sourceRecord);
        queue.enqueue(transactionEvent);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Wraps a schema change record in a {@link DataChangeEvent} and puts it on the queue.
     *
     * @param sourceRecord the schema change record to enqueue
     * @throws InterruptedException if enqueueing is interrupted
     */
    public void enqueueSchemaChangeMessage(SourceRecord sourceRecord) throws InterruptedException {
        final DataChangeEvent schemaChangeEvent = new DataChangeEvent(sourceRecord);
        queue.enqueue(schemaChangeEvent);
    }

    /**
     * Forwards a server heartbeat to the incremental snapshot source; does nothing when none is set.
     *
     * @param p the partition the heartbeat belongs to
     * @param offsetContext the offset at which the heartbeat was observed
     * @throws InterruptedException if the incremental snapshot source is interrupted
     */
    public void dispatchServerHeartbeatEvent(P p, OffsetContext offsetContext) throws InterruptedException {
        if (incrementalSnapshotChangeEventSource == null) {
            return;
        }
        incrementalSnapshotChangeEventSource.processHeartbeat(p, offsetContext);
    }

    /**
     * Replaces the listener that is notified about dispatched, filtered and erroneous events.
     *
     * @param dataChangeEventListener the listener to use from now on
     */
    public void setEventListener(DataChangeEventListener<P> dataChangeEventListener) {
        eventListener = dataChangeEventListener;
    }

    /**
     * Sets or clears the incremental snapshot source that is notified by the dispatch methods.
     *
     * @param optional the source to use; an empty {@link Optional} clears the current one
     */
    // The parameter carries a wildcard collection type while the field is typed on T; the
    // narrowing cannot be checked at runtime, so it is made explicit and suppressed here only.
    @SuppressWarnings("unchecked")
    public void setIncrementalSnapshotChangeEventSource(Optional<IncrementalSnapshotChangeEventSource<P, ? extends DataCollectionId>> optional) {
        this.incrementalSnapshotChangeEventSource = (IncrementalSnapshotChangeEventSource<P, T>) optional.orElse(null);
    }

    /**
     * @return the database schema this dispatcher was constructed with
     */
    public DatabaseSchema<T> getSchema() {
        return schema;
    }

    /**
     * @return the historized view of the schema, or {@code null} if the schema is not historized
     */
    public HistorizedDatabaseSchema<T> getHistorizedSchema() {
        return historizedSchema;
    }

    /**
     * @return the incremental snapshot source currently in use, or {@code null} if none is set
     */
    public IncrementalSnapshotChangeEventSource<P, T> getIncrementalSnapshotChangeEventSource() {
        return incrementalSnapshotChangeEventSource;
    }

    /** Closes the heartbeat, but only when heartbeats are enabled. */
    @Override // java.lang.AutoCloseable
    public void close() {
        if (!heartbeatsEnabled()) {
            return;
        }
        heartbeat.close();
    }
}
