package org.apache.pulsar.functions.instance;

import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.core.Appender;
import org.apache.logging.log4j.core.ErrorHandler;
import org.apache.logging.log4j.core.Layout;
import org.apache.logging.log4j.core.LifeCycle;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;

/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-functions-instance-2.10.0.2.jar:org/apache/pulsar/functions/instance/LogAppender.class */
/**
 * Log4j 2 {@link Appender} that publishes each log event's formatted message to a Pulsar topic.
 *
 * <p>Every message carries three properties: {@code loglevel} (the event's level name),
 * {@code instance} and {@code fqn} (the values given to the constructor). The underlying
 * producer is created in {@link #start()} and released in {@link #stop()}; events appended
 * while no producer is attached are dropped (and reported to the {@link ErrorHandler}, if one
 * is set) instead of failing the caller's logging statement.
 *
 * <p>{@code state} and {@code producer} are {@code volatile} so that a thread calling
 * {@link #append(LogEvent)} observes lifecycle changes made by another thread. No further
 * synchronisation is performed; lifecycle methods are assumed not to race with each other.
 */
public class LogAppender implements Appender {
    private final PulsarClient pulsarClient;
    private final String logTopic;
    private final String fqn;
    private final String instance;
    // Starts as INITIALIZING rather than null so getState() never hands out a null state.
    private volatile LifeCycle.State state = LifeCycle.State.INITIALIZING;
    private ErrorHandler errorHandler;
    // Null until start() succeeds and again after stop().
    private volatile Producer<byte[]> producer;

    /**
     * Creates an appender; no Pulsar resources are allocated until {@link #start()}.
     *
     * @param pulsarClient client used to create the log producer
     * @param str topic the log messages are published to
     * @param str2 value used as the appender name, as the {@code fqn} message property and as
     *     the producer's {@code function} property
     * @param str3 value attached to every message as the {@code instance} property
     */
    public LogAppender(PulsarClient pulsarClient, String str, String str2, String str3) {
        this.pulsarClient = pulsarClient;
        this.logTopic = str;
        this.fqn = str2;
        this.instance = str3;
    }

    /**
     * Asynchronously publishes the event's formatted message, encoded as UTF-8.
     *
     * <p>The future returned by the send is not observed, so delivery is best-effort. If no
     * producer is attached (the appender was never started, failed to start, or has been
     * stopped) the event is dropped.
     *
     * @param logEvent the event to publish
     */
    @Override // org.apache.logging.log4j.core.Appender
    public void append(LogEvent logEvent) {
        // Read the field once: stop() may null it out between a check and the use.
        final Producer<byte[]> current = this.producer;
        if (current == null) {
            final ErrorHandler handler = this.errorHandler;
            if (handler != null) {
                handler.error("LogTopic Producer is not available for function " + this.fqn
                        + "; dropping log event");
            }
            return;
        }
        final byte[] payload =
                logEvent.getMessage().getFormattedMessage().getBytes(StandardCharsets.UTF_8);
        current.newMessage()
                .value(payload)
                .property("loglevel", logEvent.getLevel().name())
                .property("instance", this.instance)
                .property("fqn", this.fqn)
                .sendAsync();
    }

    /** Returns the {@code fqn} value supplied at construction, which serves as the appender name. */
    @Override // org.apache.logging.log4j.core.Appender
    public String getName() {
        return this.fqn;
    }

    /** Returns {@code null}: this appender publishes the formatted message and uses no layout. */
    @Override // org.apache.logging.log4j.core.Appender
    public Layout<? extends Serializable> getLayout() {
        return null;
    }

    /** Returns {@code false}, i.e. exceptions raised while appending are not suppressed here. */
    @Override // org.apache.logging.log4j.core.Appender
    public boolean ignoreExceptions() {
        return false;
    }

    /** Returns the handler previously set via {@link #setHandler(ErrorHandler)}, or {@code null}. */
    @Override // org.apache.logging.log4j.core.Appender
    public ErrorHandler getHandler() {
        return this.errorHandler;
    }

    /** Stores the error handler; it is notified when an event is dropped for lack of a producer. */
    @Override // org.apache.logging.log4j.core.Appender
    public void setHandler(ErrorHandler errorHandler) {
        this.errorHandler = errorHandler;
    }

    /** Returns the current lifecycle state; never {@code null}. */
    @Override // org.apache.logging.log4j.core.LifeCycle
    public LifeCycle.State getState() {
        return this.state;
    }

    /** Marks the appender as {@code INITIALIZED}; no resources are allocated here. */
    @Override // org.apache.logging.log4j.core.LifeCycle
    public void initialize() {
        this.state = LifeCycle.State.INITIALIZED;
    }

    /**
     * Creates the Pulsar producer for the log topic and moves the appender to {@code STARTED}.
     *
     * <p>The producer is configured as non-blocking when its queue is full, with batching
     * enabled (100 ms maximum publish delay) and LZ4 compression.
     *
     * @throws RuntimeException if the producer cannot be created; the state is then left at
     *     {@code STARTING}
     */
    @Override // org.apache.logging.log4j.core.LifeCycle
    public void start() {
        this.state = LifeCycle.State.STARTING;
        try {
            this.producer = this.pulsarClient.newProducer()
                    .topic(this.logTopic)
                    .blockIfQueueFull(false)
                    .enableBatching(true)
                    .compressionType(CompressionType.LZ4)
                    .batchingMaxPublishDelay(100L, TimeUnit.MILLISECONDS)
                    .property("function", this.fqn)
                    .create();
            this.state = LifeCycle.State.STARTED;
        } catch (Exception e) {
            throw new RuntimeException("Error starting LogTopic Producer for function " + this.fqn, e);
        }
    }

    /**
     * Detaches the producer, closes it asynchronously, and moves the appender to
     * {@code STOPPED}. Safe to call when the appender was never started.
     */
    @Override // org.apache.logging.log4j.core.LifeCycle
    public void stop() {
        this.state = LifeCycle.State.STOPPING;
        // Detach before closing so append() stops handing new messages to a closing producer.
        final Producer<byte[]> closing = this.producer;
        this.producer = null;
        if (closing != null) {
            closing.closeAsync();
        }
        this.state = LifeCycle.State.STOPPED;
    }

    /** Returns {@code true} only while the state is {@code STARTED}. */
    @Override // org.apache.logging.log4j.core.LifeCycle
    public boolean isStarted() {
        return this.state == LifeCycle.State.STARTED;
    }

    /** Returns {@code true} only while the state is {@code STOPPED}. */
    @Override // org.apache.logging.log4j.core.LifeCycle
    public boolean isStopped() {
        return this.state == LifeCycle.State.STOPPED;
    }
}
