package org.apache.bookkeeper.mledger.offload.jcloud.impl;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.bookkeeper.bookie.BookKeeperServerStats;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock;
import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockBuilder;
import org.apache.bookkeeper.mledger.offload.jcloud.provider.BlobStoreLocation;
import org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.domain.BlobBuilder;
import org.jclouds.blobstore.domain.MultipartUpload;
import org.jclouds.blobstore.options.PutOptions;
import org.jclouds.domain.Location;
import org.jclouds.domain.LocationBuilder;
import org.jclouds.domain.LocationScope;
import org.jclouds.io.Payloads;
import org.jclouds.io.payloads.InputStreamPayload;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.class */
/**
 * {@link LedgerOffloader} that stores ledger data in a jclouds {@link BlobStore}.
 *
 * <p>Each offloaded ledger is written as two objects in the configured bucket: a data object,
 * uploaded as a multipart upload with one part per block, and an index object produced by an
 * {@link OffloadIndexBlockBuilder}. All blob store calls are executed on the
 * {@link OrderedScheduler} thread chosen by ledger id, and every operation reports its outcome
 * through the returned {@link CompletableFuture}.
 */
public class BlobStoreManagedLedgerOffloader implements LedgerOffloader {
    private static final Logger log = LoggerFactory.getLogger(BlobStoreManagedLedgerOffloader.class);

    /** Executor on which all blob store operations are run, keyed by ledger id. */
    private final OrderedScheduler scheduler;
    /** Tiered storage settings (bucket, region, block sizes, blob store factory). */
    private final TieredStorageConfiguration config;
    /** Region-scoped location built from the configured region, or {@code null} if no region is set. */
    private final Location writeLocation;
    /** Metadata copied onto every object written by this offloader. */
    private final Map<String, String> userMetadata;
    /** Blob stores keyed by location; populated with the configured store at construction time. */
    private final ConcurrentMap<BlobStoreLocation, BlobStore> blobStores = new ConcurrentHashMap<>();

    /**
     * Creates an offloader for the given configuration.
     *
     * @param tieredStorageConfiguration tiered storage settings
     * @param map user metadata attached to every written object
     * @param orderedScheduler executor used for all blob store operations
     * @return a new offloader
     * @throws IOException declared for callers; not thrown by the visible code
     */
    public static BlobStoreManagedLedgerOffloader create(TieredStorageConfiguration tieredStorageConfiguration, Map<String, String> map, OrderedScheduler orderedScheduler) throws IOException {
        return new BlobStoreManagedLedgerOffloader(tieredStorageConfiguration, orderedScheduler, map);
    }

    /**
     * Stores the collaborators, derives the write location from the configured region and
     * registers the configured blob store under its location.
     *
     * @param tieredStorageConfiguration tiered storage settings
     * @param orderedScheduler executor used for all blob store operations
     * @param map user metadata attached to every written object
     */
    BlobStoreManagedLedgerOffloader(TieredStorageConfiguration tieredStorageConfiguration, OrderedScheduler orderedScheduler, Map<String, String> map) {
        this.scheduler = orderedScheduler;
        this.userMetadata = map;
        this.config = tieredStorageConfiguration;

        String region = tieredStorageConfiguration.getRegion();
        if (Strings.isNullOrEmpty(region)) {
            this.writeLocation = null;
        } else {
            this.writeLocation = new LocationBuilder()
                    .scope(LocationScope.REGION)
                    .id(region)
                    .description(region)
                    .build();
        }

        log.info("Constructor offload driver: {}, host: {}, container: {}, region: {} ",
                tieredStorageConfiguration.getProvider().getDriver(),
                tieredStorageConfiguration.getServiceEndpoint(),
                tieredStorageConfiguration.getBucket(),
                tieredStorageConfiguration.getRegion());
        this.blobStores.putIfAbsent(tieredStorageConfiguration.getBlobStoreLocation(), tieredStorageConfiguration.getBlobStore());
        log.info("The ledger offloader was created.");
    }

    /** {@inheritDoc} Delegates to the configuration. */
    @Override // org.apache.bookkeeper.mledger.LedgerOffloader
    public String getOffloadDriverName() {
        return this.config.getDriver();
    }

    /** {@inheritDoc} Delegates to the configuration. */
    @Override // org.apache.bookkeeper.mledger.LedgerOffloader
    public Map<String, String> getOffloadDriverMetadata() {
        return this.config.getOffloadDriverMetadata();
    }

    /**
     * Offloads a closed, non-empty ledger to the configured bucket.
     *
     * <p>The work is submitted to the scheduler thread chosen by the ledger id. The returned
     * future is always completed: normally once the index object has been stored, exceptionally
     * on any failure (including an empty or still-open ledger).
     *
     * @param readHandle handle of the ledger to offload
     * @param uuid identifier used to derive the data and index object keys
     * @param map extra metadata added to both objects; may be {@code null}
     * @return future completed when the offload finishes or fails
     */
    @Override // org.apache.bookkeeper.mledger.LedgerOffloader
    public CompletableFuture<Void> offload(ReadHandle readHandle, UUID uuid, Map<String, String> map) {
        BlobStore writeBlobStore = this.blobStores.get(this.config.getBlobStoreLocation());
        CompletableFuture<Void> promise = new CompletableFuture<>();
        this.scheduler.chooseThread(readHandle.getId()).submit(() -> {
            try {
                offloadLedger(readHandle, uuid, map, writeBlobStore, promise);
            } catch (Throwable unexpected) {
                // Safety net so the caller's future can never be left pending; this is a no-op
                // when offloadLedger has already completed the promise.
                promise.completeExceptionally(unexpected);
            }
        });
        return promise;
    }

    /**
     * Uploads the data object (multipart) and then the index object for one ledger, completing
     * {@code promise} with the outcome.
     */
    private void offloadLedger(ReadHandle readHandle, UUID uuid, Map<String, String> extraMetadata,
                               BlobStore writeBlobStore, CompletableFuture<Void> promise) {
        if (readHandle.getLength() == 0 || !readHandle.isClosed() || readHandle.getLastAddConfirmed() < 0) {
            promise.completeExceptionally(new IllegalArgumentException("An empty or open ledger should never be offloaded"));
            return;
        }

        OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create()
                .withLedgerMetadata(readHandle.getLedgerMetadata())
                .withDataBlockHeaderLength(BlockAwareSegmentInputStreamImpl.getHeaderSize());
        String dataBlockKey = DataBlockUtils.dataBlockOffloadKey(readHandle.getId(), uuid);
        String indexBlockKey = DataBlockUtils.indexBlockOffloadKey(readHandle.getId(), uuid);
        log.info("ledger {} dataBlockKey {} indexBlockKey {}", readHandle.getId(), dataBlockKey, indexBlockKey);

        // Step 1: start the multipart upload. Nothing needs cleaning up if this fails.
        MultipartUpload mpu;
        try {
            BlobBuilder dataBlobBuilder = writeBlobStore.blobBuilder(dataBlockKey);
            DataBlockUtils.addVersionInfo(dataBlobBuilder, objectMetadata("data", extraMetadata));
            mpu = writeBlobStore.initiateMultipartUpload(this.config.getBucket(), dataBlobBuilder.build().getMetadata(), new PutOptions());
        } catch (Throwable t) {
            promise.completeExceptionally(t);
            return;
        }

        // Step 2: upload one part per block and complete the upload. Any failure here,
        // including a failed completion, aborts the multipart upload.
        // NOTE(review): raw list kept because its element type (jclouds MultipartPart) is not
        // imported by this file; consider List<MultipartPart> once the import is added.
        ArrayList parts = Lists.newArrayList();
        long dataObjectLength = 0;
        try {
            long startEntry = 0;
            int partId = 1;
            long entryBytesWritten = 0;
            while (startEntry <= readHandle.getLastAddConfirmed()) {
                int blockSize = BlockAwareSegmentInputStreamImpl.calculateBlockSize(
                        this.config.getMaxBlockSizeInBytes().intValue(), readHandle, startEntry, entryBytesWritten);
                // try-with-resources closes the block stream on every exit path, including
                // a failed part upload.
                try (BlockAwareSegmentInputStreamImpl blockStream =
                             new BlockAwareSegmentInputStreamImpl(readHandle, startEntry, blockSize)) {
                    InputStreamPayload partPayload = Payloads.newInputStreamPayload(blockStream);
                    partPayload.getContentMetadata().setContentLength((long) blockSize);
                    partPayload.getContentMetadata().setContentType("application/octet-stream");
                    parts.add(writeBlobStore.uploadMultipartPart(mpu, partId, partPayload));
                    log.debug("UploadMultipartPart. container: {}, blobName: {}, partId: {}, mpu: {}",
                            this.config.getBucket(), dataBlockKey, partId, mpu.id());
                    indexBuilder.addBlock(startEntry, partId, blockSize);

                    if (blockStream.getEndEntryId() == -1) {
                        // The stream reported no end entry; stop uploading. As before, this
                        // block's size is not added to dataObjectLength.
                        break;
                    }
                    startEntry = blockStream.getEndEntryId() + 1;
                    entryBytesWritten += blockStream.getBlockEntryBytesCount();
                    partId++;
                }
                dataObjectLength += blockSize;
            }
            writeBlobStore.completeMultipartUpload(mpu, parts);
        } catch (Throwable t) {
            try {
                writeBlobStore.abortMultipartUpload(mpu);
            } catch (Throwable abortFailure) {
                log.error("Failed abortMultipartUpload in bucket - {} with key - {}, uploadId - {}.",
                        this.config.getBucket(), dataBlockKey, mpu.id(), abortFailure);
            }
            promise.completeExceptionally(t);
            return;
        }

        // Step 3: build and store the index object. On failure the already-stored data object
        // is removed (best effort) so no data object is left without its index.
        try (OffloadIndexBlock index = indexBuilder.withDataObjectLength(dataObjectLength).build();
             OffloadIndexBlock.IndexInputStream indexStream = index.toStream()) {
            BlobBuilder indexBlobBuilder = writeBlobStore.blobBuilder(indexBlockKey);
            // NOTE(review): the role value comes from a bookie stats constant (presumably
            // "index"); consider a constant local to this class.
            DataBlockUtils.addVersionInfo(indexBlobBuilder, objectMetadata(BookKeeperServerStats.LD_INDEX_SCOPE, extraMetadata));
            InputStreamPayload indexPayload = Payloads.newInputStreamPayload(indexStream);
            indexPayload.getContentMetadata().setContentLength((long) indexStream.getStreamSize());
            indexPayload.getContentMetadata().setContentType("application/octet-stream");
            writeBlobStore.putBlob(this.config.getBucket(),
                    indexBlobBuilder.payload(indexPayload).contentLength(indexStream.getStreamSize()).build());
            promise.complete(null);
        } catch (Throwable t) {
            try {
                writeBlobStore.removeBlob(this.config.getBucket(), dataBlockKey);
            } catch (Throwable deleteFailure) {
                log.error("Failed deleteObject in bucket - {} with key - {}.", this.config.getBucket(), dataBlockKey, deleteFailure);
            }
            promise.completeExceptionally(t);
        }
    }

    /**
     * Builds the metadata for one written object: the user metadata, then the {@code role}
     * entry, then any caller-supplied extra entries (which therefore win on key clashes).
     */
    private Map<String, String> objectMetadata(String role, Map<String, String> extraMetadata) {
        Map<String, String> metadata = new HashMap<>(this.userMetadata);
        metadata.put("role", role);
        if (extraMetadata != null) {
            metadata.putAll(extraMetadata);
        }
        return metadata;
    }

    /**
     * Resolves the blob store location described by the given driver metadata, falling back to
     * this offloader's own driver metadata when none is supplied ({@code null} or empty).
     */
    private BlobStoreLocation getBlobStoreLocation(Map<String, String> map) {
        boolean hasMetadata = map != null && !map.isEmpty();
        return new BlobStoreLocation(hasMetadata ? map : getOffloadDriverMetadata());
    }

    /**
     * Opens a read handle backed by the offloaded data and index objects of a ledger.
     *
     * @param j id of the offloaded ledger
     * @param uuid identifier used to derive the data and index object keys
     * @param map driver metadata recorded for the ledger; used to resolve the bucket
     * @return future completed with the read handle, or exceptionally if opening fails
     */
    @Override // org.apache.bookkeeper.mledger.LedgerOffloader
    public CompletableFuture<ReadHandle> readOffloaded(long j, UUID uuid, Map<String, String> map) {
        String readBucket = getBlobStoreLocation(map).getBucket();
        BlobStore readBlobStore = this.blobStores.get(this.config.getBlobStoreLocation());
        CompletableFuture<ReadHandle> promise = new CompletableFuture<>();
        String dataBlockKey = DataBlockUtils.dataBlockOffloadKey(j, uuid);
        String indexBlockKey = DataBlockUtils.indexBlockOffloadKey(j, uuid);
        this.scheduler.chooseThread(j).submit(() -> {
            try {
                promise.complete(BlobStoreBackedReadHandleImpl.open(
                        this.scheduler.chooseThread(j), readBlobStore, readBucket, dataBlockKey, indexBlockKey,
                        DataBlockUtils.VERSION_CHECK, j, this.config.getReadBufferSizeInBytes().intValue()));
            } catch (Throwable t) {
                log.error("Failed readOffloaded: ", t);
                promise.completeExceptionally(t);
            }
        });
        return promise;
    }

    /**
     * Removes the data and index objects of an offloaded ledger.
     *
     * @param j id of the offloaded ledger
     * @param uuid identifier used to derive the data and index object keys
     * @param map driver metadata recorded for the ledger; used to resolve the bucket
     * @return future completed when both objects were removed, or exceptionally on failure
     */
    @Override // org.apache.bookkeeper.mledger.LedgerOffloader
    public CompletableFuture<Void> deleteOffloaded(long j, UUID uuid, Map<String, String> map) {
        // NOTE(review): readOffloaded resolves the bucket with the no-arg getBucket(), while
        // this uses getBucket(map); confirm against BlobStoreLocation that both yield the same
        // bucket, especially when the metadata is empty.
        String readBucket = getBlobStoreLocation(map).getBucket(map);
        BlobStore readBlobStore = this.blobStores.get(this.config.getBlobStoreLocation());
        CompletableFuture<Void> promise = new CompletableFuture<>();
        this.scheduler.chooseThread(j).submit(() -> {
            try {
                readBlobStore.removeBlobs(readBucket, ImmutableList.of(
                        DataBlockUtils.dataBlockOffloadKey(j, uuid),
                        DataBlockUtils.indexBlockOffloadKey(j, uuid)));
                promise.complete(null);
            } catch (Throwable t) {
                log.error("Failed delete Blob", t);
                promise.completeExceptionally(t);
            }
        });
        return promise;
    }

    /**
     * {@inheritDoc}
     *
     * <p>The policies are created from a copy of the configuration's properties.
     */
    @Override // org.apache.bookkeeper.mledger.LedgerOffloader
    public OffloadPolicies getOffloadPolicies() {
        Properties properties = new Properties();
        properties.putAll(this.config.getConfigProperties());
        return OffloadPolicies.create(properties);
    }

    /** Closes the jclouds context of every registered blob store. */
    @Override // org.apache.bookkeeper.mledger.LedgerOffloader
    public void close() {
        // ConcurrentHashMap never holds null values, so no null check is required.
        for (BlobStore blobStore : this.blobStores.values()) {
            blobStore.getContext().close();
        }
    }
}
