package org.apache.pulsar.functions.api.examples;

import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-functions-api-examples-2.10.6.0.jar:org/apache/pulsar/functions/api/examples/MergeTopicFunction.class */
public class MergeTopicFunction implements Function<GenericRecord, byte[]> {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) MergeTopicFunction.class);

    @Override // org.apache.pulsar.functions.api.Function
    public byte[] process(GenericRecord genericRecord, Context context) throws Exception {
        if (!context.getCurrentRecord().getMessage().isPresent()) {
            log.warn("context current record message is not present.");
            return null;
        }
        Message<?> message = context.getCurrentRecord().getMessage().get();
        if (!message.getReaderSchema().isPresent()) {
            log.warn("The reader schema is null.");
            return null;
        }
        log.info("process message with reader schema {}", message.getReaderSchema().get());
        TypedMessageBuilder newOutputMessage = context.newOutputMessage(context.getOutputTopic(), Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get()));
        newOutputMessage.value(message.getData()).property("__original_topic", message.getTopicName()).property("__publish_time", String.valueOf(message.getPublishTime())).property("__sequence_id", String.valueOf(message.getSequenceId())).property("__producer_name", message.getProducerName());
        if (message.getKeyBytes() != null) {
            newOutputMessage.keyBytes(message.getKeyBytes());
        }
        if (message.getEventTime() > 0) {
            newOutputMessage.eventTime(message.getEventTime());
        }
        if (!message.getProperties().isEmpty()) {
            newOutputMessage.properties(message.getProperties());
        }
        newOutputMessage.send();
        log.info("send message successfully");
        return null;
    }
}
