package io.debezium.connector.postgresql;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.postgresql.connection.LogicalDecodingMessage;
import io.debezium.data.Envelope;
import io.debezium.function.BlockingConsumer;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.schema.SchemaNameAdjuster;
import io.debezium.util.HexConverter;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

/* loaded from: input_file:META-INF/bundled-dependencies/debezium-connector-postgres-2.5.4.Final.jar:io/debezium/connector/postgresql/LogicalDecodingMessageMonitor.class */
public class LogicalDecodingMessageMonitor {
    public static final String LOGICAL_DECODING_MESSAGE_TOPIC_SUFFIX = ".message";
    public static final String DEBEZIUM_LOGICAL_DECODING_MESSAGE_KEY = "message";
    public static final String DEBEZIUM_LOGICAL_DECODING_MESSAGE_PREFIX_KEY = "prefix";
    public static final String DEBEZIUM_LOGICAL_DECODING_MESSAGE_CONTENT_KEY = "content";
    private final SchemaNameAdjuster schemaNameAdjuster;
    private final BlockingConsumer<SourceRecord> sender;
    private final String topicName;
    private final CommonConnectorConfig.BinaryHandlingMode binaryMode;
    private final Base64.Encoder base64Encoder = Base64.getEncoder();
    private final Base64.Encoder base64UrlSafeEncoder = Base64.getUrlEncoder();
    private final Schema keySchema;
    private final Schema blockSchema;
    private final Schema valueSchema;

    public LogicalDecodingMessageMonitor(PostgresConnectorConfig postgresConnectorConfig, BlockingConsumer<SourceRecord> blockingConsumer) {
        this.schemaNameAdjuster = postgresConnectorConfig.schemaNameAdjuster();
        this.sender = blockingConsumer;
        this.topicName = postgresConnectorConfig.getLogicalName() + ".message";
        this.binaryMode = postgresConnectorConfig.binaryHandlingMode();
        this.keySchema = PostgresSchemaFactory.get().logicalDecodingMessageMonitorKeySchema(this.schemaNameAdjuster);
        this.blockSchema = PostgresSchemaFactory.get().logicalDecodingMessageMonitorBlockSchema(this.schemaNameAdjuster, this.binaryMode);
        this.valueSchema = PostgresSchemaFactory.get().logicalDecodingMessageMonitorValueSchema(this.schemaNameAdjuster, postgresConnectorConfig, this.binaryMode);
    }

    public void logicalDecodingMessageEvent(Partition partition, OffsetContext offsetContext, Long l, LogicalDecodingMessage logicalDecodingMessage) throws InterruptedException {
        Struct struct = new Struct(this.blockSchema);
        struct.put(DEBEZIUM_LOGICAL_DECODING_MESSAGE_PREFIX_KEY, logicalDecodingMessage.getPrefix());
        struct.put(DEBEZIUM_LOGICAL_DECODING_MESSAGE_CONTENT_KEY, convertContent(logicalDecodingMessage.getContent()));
        Struct struct2 = new Struct(this.keySchema);
        struct2.put(DEBEZIUM_LOGICAL_DECODING_MESSAGE_PREFIX_KEY, logicalDecodingMessage.getPrefix());
        Struct struct3 = new Struct(this.valueSchema);
        struct3.put(Envelope.FieldName.OPERATION, Envelope.Operation.MESSAGE.code());
        struct3.put("ts_ms", l);
        struct3.put(DEBEZIUM_LOGICAL_DECODING_MESSAGE_KEY, struct);
        struct3.put("source", offsetContext.getSourceInfo());
        this.sender.accept(new SourceRecord(partition.getSourcePartition(), offsetContext.getOffset(), this.topicName, this.keySchema, struct2, struct3.schema(), struct3));
        if (logicalDecodingMessage.isLastEventForLsn()) {
            offsetContext.getTransactionContext().endTransaction();
        }
    }

    private Object convertContent(byte[] bArr) {
        switch (this.binaryMode) {
            case BASE64:
                return new String(this.base64Encoder.encode(bArr), StandardCharsets.UTF_8);
            case BASE64_URL_SAFE:
                return new String(this.base64UrlSafeEncoder.encode(bArr), StandardCharsets.UTF_8);
            case HEX:
                return HexConverter.convertToHexString(bArr);
            case BYTES:
                return ByteBuffer.wrap(bArr);
            default:
                return ByteBuffer.wrap(bArr);
        }
    }
}
