package org.apache.pulsar.packages.management.storage.bookkeeper;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import org.apache.distributedlog.LogRecord;
import org.apache.distributedlog.api.AsyncLogWriter;
import org.apache.distributedlog.api.DistributedLogManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStream.class */
/**
 * Asynchronous writer that copies the content of an {@link InputStream} into a distributed log.
 *
 * <p>The stream is read in fixed-size chunks; every chunk becomes one {@link LogRecord} whose
 * transaction id is the running byte offset <em>after</em> that chunk. Instances are obtained
 * through {@link #openWriterAsync(DistributedLogManager)} and must be released with
 * {@link #closeAsync()}.
 *
 * <p>The running offset is a plain field that is updated from the thread reading the input
 * stream, so overlapping {@code writeAsync} calls on the same instance are not supported.
 */
public class DLOutputStream {
    private static final Logger log = LoggerFactory.getLogger(DLOutputStream.class);

    /** Size, in bytes, of the buffer used for each read from the input stream. */
    private static final int READ_BUFFER_SIZE = 8192;

    private final DistributedLogManager distributedLogManager;
    private final AsyncLogWriter writer;

    /** Total number of bytes turned into log records so far. */
    private long offset = 0;

    private DLOutputStream(DistributedLogManager distributedLogManager, AsyncLogWriter asyncLogWriter) {
        this.distributedLogManager = distributedLogManager;
        this.writer = asyncLogWriter;
    }

    /**
     * Opens an asynchronous log writer on the given manager and wraps it in a new output stream.
     *
     * @param distributedLogManager the log manager to write through; it is closed by
     *                              {@link #closeAsync()}
     * @return a future that yields the ready-to-use stream once the writer has been opened
     */
    public static CompletableFuture<DLOutputStream> openWriterAsync(DistributedLogManager distributedLogManager) {
        log.info("Open a dlog manager");
        return distributedLogManager.openAsyncLogWriter()
                .thenApply(asyncLogWriter -> new DLOutputStream(distributedLogManager, asyncLogWriter));
    }

    /**
     * Drains the input stream on an asynchronous task and converts it into log records.
     *
     * @param inputStream the stream to read until end of stream
     * @return a future holding the records in read order, or failed with whatever the read threw
     */
    private CompletableFuture<List<LogRecord>> getRecords(InputStream inputStream) {
        CompletableFuture<List<LogRecord>> result = new CompletableFuture<>();
        CompletableFuture.runAsync(() -> {
            byte[] chunk = new byte[READ_BUFFER_SIZE];
            List<LogRecord> records = new ArrayList<>();
            try {
                int read;
                while ((read = inputStream.read(chunk)) != -1) {
                    log.info("write something into the ledgers offset: {}, length: {}", this.offset, read);
                    // Copy the bytes: the chunk array is reused by the next read.
                    ByteBuf payload = Unpooled.copiedBuffer(chunk, 0, read);
                    this.offset += payload.readableBytes();
                    records.add(new LogRecord(this.offset, payload));
                }
                result.complete(records);
            } catch (IOException | RuntimeException | Error e) {
                // Unchecked failures must fail the result as well. The future returned by
                // runAsync is discarded, so anything not forwarded here would leave callers
                // waiting forever.
                log.error("Failed to get all records from the input stream", e);
                result.completeExceptionally(e);
            }
        });
        return result;
    }

    /**
     * Reads the whole input stream and writes its content to the log.
     *
     * @param inputStream the data to write; it is read to the end but not closed here
     * @return a future that yields this stream once every record has been acknowledged, or fails
     *         if reading the input or writing any record fails
     */
    public CompletableFuture<DLOutputStream> writeAsync(InputStream inputStream) {
        return getRecords(inputStream).thenCompose(this::writeAsync);
    }

    /**
     * Writes the records in bulk and waits for every individual record write.
     *
     * <p>{@code writeBulk} yields one future per record; they are all awaited so that a failed
     * record write fails the returned future instead of being silently dropped.
     *
     * @param list the records to write, in order
     * @return a future that yields this stream once all record writes have completed
     */
    private CompletableFuture<DLOutputStream> writeAsync(List<LogRecord> list) {
        return this.writer.writeBulk(list).thenCompose(recordWrites ->
                CompletableFuture.allOf(recordWrites.toArray(new CompletableFuture<?>[0]))
                        .thenApply(ignored -> this));
    }

    /**
     * Marks the end of the stream, then closes the writer and the log manager.
     *
     * <p>All three steps are always attempted, in that order, so that a failure in an earlier
     * step does not leak the writer or the manager. The returned future fails with the first
     * failure encountered.
     *
     * @return a future that completes when both the writer and the manager have been closed
     */
    public CompletableFuture<Void> closeAsync() {
        return this.writer.markEndOfStream()
                .<Throwable>handle((lastTxId, markFailure) -> markFailure)
                .thenCompose(markFailure -> this.writer.asyncClose()
                        .<Throwable>handle((ignored, closeFailure) -> firstFailure(markFailure, closeFailure)))
                .thenCompose(failure -> this.distributedLogManager.asyncClose()
                        .<Throwable>handle((ignored, closeFailure) -> firstFailure(failure, closeFailure)))
                .thenAccept(failure -> {
                    if (failure != null) {
                        throw asCompletionException(failure);
                    }
                });
    }

    /** Returns the earlier failure when there is one, otherwise the later one (possibly null). */
    private static Throwable firstFailure(Throwable earlier, Throwable later) {
        return earlier != null ? earlier : later;
    }

    /** Wraps the failure in a {@link CompletionException} unless it already is one. */
    private static CompletionException asCompletionException(Throwable failure) {
        return failure instanceof CompletionException
                ? (CompletionException) failure
                : new CompletionException(failure);
    }
}
