package io.debezium.pipeline.signal.channels;

import io.debezium.annotation.NotThreadSafe;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.pipeline.signal.SignalRecord;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.kafka.connect.data.Struct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
/* loaded from: input_file:META-INF/bundled-dependencies/debezium-core-2.6.1.Final.jar:io/debezium/pipeline/signal/channels/SourceSignalChannel.class */
public class SourceSignalChannel implements SignalChannelReader {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) SourceSignalChannel.class);
    public static final String CHANNEL_NAME = "source";
    public final Queue<SignalRecord> signals = new ConcurrentLinkedQueue();
    public CommonConnectorConfig connectorConfig;

    @Override // io.debezium.pipeline.signal.channels.SignalChannelReader
    public String name() {
        return "source";
    }

    @Override // io.debezium.pipeline.signal.channels.SignalChannelReader
    public void init(CommonConnectorConfig commonConnectorConfig) {
        this.connectorConfig = commonConnectorConfig;
    }

    @Override // io.debezium.pipeline.signal.channels.SignalChannelReader
    public List<SignalRecord> read() {
        LOGGER.trace("Reading signaling events from queue");
        SignalRecord poll = this.signals.poll();
        return poll == null ? List.of() : List.of(poll);
    }

    @Override // io.debezium.pipeline.signal.channels.SignalChannelReader
    public void close() {
        this.signals.clear();
    }

    public boolean process(Struct struct) throws InterruptedException {
        LOGGER.trace("Received event from signaling table. Enqueue for process");
        try {
            Optional<SignalRecord> buildSignalRecordFromChangeEventSource = SignalRecord.buildSignalRecordFromChangeEventSource(struct, this.connectorConfig);
            if (buildSignalRecordFromChangeEventSource.isEmpty()) {
                return false;
            }
            this.signals.add(buildSignalRecordFromChangeEventSource.get());
            return true;
        } catch (Exception e) {
            LOGGER.warn("Exception while preparing to process the signal '{}'", struct, e);
            return false;
        }
    }
}
