package org.apache.pulsar.io.flume.sink;

import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.flume.FlumeConfig;
import org.apache.pulsar.io.flume.FlumeConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/io/flume/sink/AbstractSink.class */
/**
 * Base class for Pulsar IO sinks that hand record values to Flume through an
 * in-memory queue.
 *
 * <p>Each written record is converted with {@link #extractValue(Record)} and
 * placed on a static {@link BlockingQueue} that is exposed via
 * {@link #getQueue()}. NOTE(review): presumably the Flume side started by
 * {@link FlumeConnector} drains that queue — confirm against the connector.
 *
 * <p>The queue is a static field that is replaced on every call to
 * {@link #open(Map, SinkContext)}, so it is shared by all instances of this
 * class loaded by the same class loader.
 *
 * @param <T> type of the value carried by the incoming records
 */
public abstract class AbstractSink<T> implements Sink<T> {
    private static final Logger log = LoggerFactory.getLogger(AbstractSink.class);

    /** Queue of extracted record values; {@code null} until {@code open} has been called. */
    protected static BlockingQueue<Object> records;

    /** Connector started in {@code open} and stopped in {@code close}. */
    protected FlumeConnector flumeConnector;

    /**
     * Converts an incoming record into the value that is placed on the queue.
     *
     * @param record the record passed to {@link #write(Record)}
     * @return the value to enqueue
     */
    public abstract T extractValue(Record<T> record);

    /**
     * Returns the shared queue of extracted values.
     *
     * @return the current queue, or {@code null} if no sink has been opened yet
     */
    public static BlockingQueue<Object> getQueue() {
        return records;
    }

    /**
     * Creates a fresh unbounded queue, loads the Flume configuration from
     * {@code map} and starts the connector with it.
     *
     * @param map         sink configuration handed to {@code FlumeConfig.load}
     * @param sinkContext sink context (not used by this implementation)
     * @throws Exception if loading the configuration or starting the connector fails
     */
    @Override // org.apache.pulsar.io.core.Sink
    public void open(Map<String, Object> map, SinkContext sinkContext) throws Exception {
        records = new LinkedBlockingQueue<>();
        FlumeConfig load = FlumeConfig.load(map);
        this.flumeConnector = new FlumeConnector();
        this.flumeConnector.startConnector(load);
    }

    /**
     * Enqueues the value extracted from {@code record} and acknowledges the
     * record. If the thread is interrupted while enqueueing, the record is
     * failed instead, the error is logged and the interrupt status is restored.
     *
     * @param record the record to forward
     */
    @Override // org.apache.pulsar.io.core.Sink
    public void write(Record<T> record) {
        try {
            records.put(extractValue(record));
            record.ack();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can
            // still observe that this thread was asked to stop.
            Thread.currentThread().interrupt();
            record.fail();
            log.error("error", e);
        }
    }

    /**
     * Stops the Flume connector if one was created by {@code open}.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        if (this.flumeConnector != null) {
            this.flumeConnector.stop();
        }
    }
}
