package org.apache.pulsar.io.flume.source;

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.lang.Thread;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.PushSource;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.flume.FlumeConfig;
import org.apache.pulsar.io.flume.FlumeConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/io/flume/source/AbstractSource.class */
public abstract class AbstractSource<V> extends PushSource<V> {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) AbstractSource.class);
    protected Thread thread = null;
    protected volatile boolean running = false;
    protected final Thread.UncaughtExceptionHandler handler = (thread, th) -> {
        log.error("[{}] parse events has an error", thread.getName(), th);
    };

    /* loaded from: input_file:org/apache/pulsar/io/flume/source/AbstractSource$FlumeRecord.class */
    private static class FlumeRecord<V> implements Record<V> {
        private V record;
        private Long id;

        private FlumeRecord() {
        }

        @Override // org.apache.pulsar.functions.api.Record
        public Optional<String> getKey() {
            return Optional.of(Long.toString(this.id.longValue()));
        }

        @Override // org.apache.pulsar.functions.api.Record
        public V getValue() {
            return this.record;
        }

        public V getRecord() {
            return this.record;
        }

        public Long getId() {
            return this.id;
        }

        public void setRecord(V v) {
            this.record = v;
        }

        public void setId(Long l) {
            this.id = l;
        }
    }

    @Override // org.apache.pulsar.io.core.PushSource, org.apache.pulsar.io.core.Source
    public void open(Map<String, Object> map, SourceContext sourceContext) throws Exception {
        new FlumeConnector().StartConnector(FlumeConfig.load(map));
        start();
    }

    public abstract V extractValue(String str);

    protected void start() {
        this.thread = new Thread(this::process);
        this.thread.setName("flume source thread");
        this.thread.setUncaughtExceptionHandler(this.handler);
        this.running = true;
        this.thread.start();
    }

    @Override // java.lang.AutoCloseable
    public void close() throws InterruptedException {
        log.info("close flume source");
        if (this.running) {
            this.running = false;
            if (this.thread != null) {
                this.thread.interrupt();
                this.thread.join();
            }
        }
    }

    protected void process() {
        while (this.running) {
            try {
                log.info("start flume receive from sink process");
                while (this.running) {
                    BlockingQueue queue = SinkOfFlume.getQueue();
                    while (queue != null && !queue.isEmpty()) {
                        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
                        ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream);
                        objectOutputStream.writeObject(((Map) queue.take()).get("body"));
                        objectOutputStream.flush();
                        String str = new String(byteArrayOutputStream.toByteArray());
                        byteArrayOutputStream.close();
                        FlumeRecord flumeRecord = new FlumeRecord();
                        flumeRecord.setRecord(extractValue(str));
                        consume(flumeRecord);
                    }
                }
            } catch (Exception e) {
                log.error("process error!", (Throwable) e);
            }
        }
    }
}
