/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.repositories.azure;

import com.azure.core.http.HttpMethod;
import com.azure.core.http.rest.ResponseBase;
import com.azure.core.util.BinaryData;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.storage.blob.BlobAsyncClient;
import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobServiceAsyncClient;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.batch.BlobBatch;
import com.azure.storage.blob.batch.BlobBatchAsyncClient;
import com.azure.storage.blob.batch.BlobBatchClientBuilder;
import com.azure.storage.blob.batch.BlobBatchStorageException;
import com.azure.storage.blob.models.BlobErrorCode;
import com.azure.storage.blob.models.BlobItem;
import com.azure.storage.blob.models.BlobItemProperties;
import com.azure.storage.blob.models.BlobListDetails;
import com.azure.storage.blob.models.BlobRange;
import com.azure.storage.blob.models.BlobRequestConditions;
import com.azure.storage.blob.models.BlobStorageException;
import com.azure.storage.blob.models.DownloadRetryOptions;
import com.azure.storage.blob.models.ListBlobsOptions;
import com.azure.storage.blob.options.BlobParallelUploadOptions;
import com.azure.storage.blob.options.BlockBlobSimpleUploadOptions;
import com.azure.storage.blob.specialized.BlobClientBase;
import com.azure.storage.blob.specialized.BlobLeaseClient;
import com.azure.storage.blob.specialized.BlobLeaseClientBuilder;
import com.azure.storage.blob.specialized.BlockBlobAsyncClient;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.ReferenceCountUtil;
import java.io.Closeable;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.file.FileAlreadyExistsException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Spliterators;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.util.Throwables;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.BlobStoreActionStats;
import org.elasticsearch.common.blobstore.DeleteResult;
import org.elasticsearch.common.blobstore.OperationPurpose;
import org.elasticsearch.common.blobstore.OptionalBytesReference;
import org.elasticsearch.common.blobstore.support.BlobContainerUtils;
import org.elasticsearch.common.blobstore.support.BlobMetadata;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.core.Assertions;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Strings;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.repositories.RepositoriesMetrics;
import org.elasticsearch.repositories.azure.AzureBlobContainer;
import org.elasticsearch.repositories.azure.AzureBlobServiceClient;
import org.elasticsearch.repositories.azure.AzureClientProvider;
import org.elasticsearch.repositories.azure.AzureRepository;
import org.elasticsearch.repositories.azure.AzureStorageService;
import org.elasticsearch.repositories.azure.CancellableRateLimitedFluxIterator;
import org.elasticsearch.repositories.azure.LocationMode;
import org.elasticsearch.repositories.blobstore.ChunkedBlobOutputStream;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class AzureBlobStore
implements BlobStore {
    private static final Logger logger = LogManager.getLogger(AzureBlobStore.class);
    public static final int MAX_ELEMENTS_PER_BATCH = 256;
    // 32 MB; this is the value reported by getReadChunkSize().
    private static final long DEFAULT_READ_CHUNK_SIZE = ByteSizeValue.of((long)32L, (ByteSizeUnit)ByteSizeUnit.MB).getBytes();
    // 64 KB; passed as the chunk size when an InputStream is turned into a Flux of ByteBuffers for upload.
    private static final int DEFAULT_UPLOAD_BUFFERS_SIZE = (int)ByteSizeValue.of((long)64L, (ByteSizeUnit)ByteSizeUnit.KB).getBytes();
    private final AzureStorageService service;
    private final BigArrays bigArrays;
    private final RepositoryMetadata repositoryMetadata;
    // Name of the client configuration and of the Azure container, both read from the repository settings.
    private final String clientName;
    private final String container;
    private final LocationMode locationMode;
    private final ByteSizeValue maxSinglePartUploadSize;
    // Number of blob names grouped into one batch delete request.
    private final int deletionBatchSize;
    // Concurrency limit handed to flatMap when submitting delete batches.
    private final int maxConcurrentBatchDeletes;
    // Concurrency limit handed to flatMapSequential when staging the parts of a multipart upload.
    private final int multipartUploadMaxConcurrency;
    private final RequestMetricsRecorder requestMetricsRecorder;
    // Classifies each completed HTTP request into an Operation and forwards it to requestMetricsRecorder.
    private final AzureClientProvider.RequestMetricsHandler requestMetricsHandler;
    // Used together by makeMultipartBlockId(): a URL-safe base64 UUID is decoded and re-encoded as unpadded standard base64.
    private static final Base64.Encoder base64Encoder = Base64.getEncoder().withoutPadding();
    private static final Base64.Decoder base64UrlDecoder = Base64.getUrlDecoder();

    /**
     * Builds a blob store from the repository settings and wires up the request metrics handler.
     *
     * @param metadata            repository metadata whose settings provide the container, client name,
     *                            location mode, upload size limit and batch-delete tuning
     * @param service             storage service used to obtain clients and retry/concurrency configuration
     * @param bigArrays           buffer allocator used by streaming writes
     * @param repositoriesMetrics sink handed to the {@code RequestMetricsRecorder}
     */
    public AzureBlobStore(RepositoryMetadata metadata, AzureStorageService service, BigArrays bigArrays, RepositoriesMetrics repositoriesMetrics) {
        this.service = service;
        this.bigArrays = bigArrays;
        this.repositoryMetadata = metadata;
        this.container = AzureRepository.Repository.CONTAINER_SETTING.get(metadata.settings());
        this.clientName = AzureRepository.Repository.CLIENT_NAME.get(metadata.settings());
        this.locationMode = AzureRepository.Repository.LOCATION_MODE_SETTING.get(metadata.settings());
        this.maxSinglePartUploadSize = AzureRepository.Repository.MAX_SINGLE_PART_UPLOAD_SIZE_SETTING.get(metadata.settings());
        this.deletionBatchSize = AzureRepository.Repository.DELETION_BATCH_SIZE_SETTING.get(metadata.settings());
        this.maxConcurrentBatchDeletes = AzureRepository.Repository.MAX_CONCURRENT_BATCH_DELETES_SETTING.get(metadata.settings());
        this.multipartUploadMaxConcurrency = service.getMultipartUploadMaxConcurrency();
        this.requestMetricsRecorder = new RequestMetricsRecorder(repositoriesMetrics);

        // Order matters: the handler below reports only the first matcher that accepts a request.
        final List<RequestMatcher> matchers = List.of(
            new RequestMatcher((httpMethod, url) -> httpMethod == HttpMethod.HEAD, Operation.GET_BLOB_PROPERTIES),
            new RequestMatcher(
                (httpMethod, url) -> httpMethod == HttpMethod.GET && AzureBlobStore.isListRequest(httpMethod, url) == false,
                Operation.GET_BLOB
            ),
            new RequestMatcher(AzureBlobStore::isListRequest, Operation.LIST_BLOBS),
            new RequestMatcher(AzureBlobStore::isPutBlockRequest, Operation.PUT_BLOCK),
            new RequestMatcher(AzureBlobStore::isPutBlockListRequest, Operation.PUT_BLOCK_LIST),
            new RequestMatcher(
                (httpMethod, url) -> httpMethod == HttpMethod.PUT
                    && AzureBlobStore.isPutBlockRequest(httpMethod, url) == false
                    && AzureBlobStore.isPutBlockListRequest(httpMethod, url) == false,
                Operation.PUT_BLOB
            ),
            new RequestMatcher(AzureBlobStore::isBlobBatch, Operation.BLOB_BATCH)
        );

        this.requestMetricsHandler = (purpose, method, url, metrics) -> {
            try {
                final URI uri = url.toURI();
                final String path = uri.getPath() == null ? "" : uri.getPath();
                assert path.contains(this.container) : uri.toString();
            } catch (URISyntaxException ignored) {
                // A URL that cannot be converted to a URI is not recorded at all.
                return;
            }
            for (RequestMatcher matcher : matchers) {
                if (matcher.matches(method, url)) {
                    this.requestMetricsRecorder.onRequestComplete(matcher.operation, purpose, metrics);
                    return;
                }
            }
        };
    }

    /** Returns true for a POST whose query string contains {@code comp=batch}. */
    private static boolean isBlobBatch(HttpMethod method, URL url) {
        if (method != HttpMethod.POST) {
            return false;
        }
        final String query = url.getQuery();
        return query != null && query.contains("comp=batch");
    }

    /** Returns true for a GET whose query string contains {@code comp=list}. */
    private static boolean isListRequest(HttpMethod httpMethod, URL url) {
        if (httpMethod != HttpMethod.GET) {
            return false;
        }
        final String query = url.getQuery();
        return query != null && query.contains("comp=list");
    }

    /** Returns true for a PUT whose query string contains both {@code comp=block} and {@code blockid=}. */
    private static boolean isPutBlockRequest(HttpMethod httpMethod, URL url) {
        if (httpMethod != HttpMethod.PUT) {
            return false;
        }
        final String query = url.getQuery();
        if (query == null) {
            return false;
        }
        return query.contains("comp=block") && query.contains("blockid=");
    }

    /** Returns true for a PUT whose query string contains {@code comp=blocklist}. */
    private static boolean isPutBlockListRequest(HttpMethod httpMethod, URL url) {
        if (httpMethod != HttpMethod.PUT) {
            return false;
        }
        final String query = url.getQuery();
        return query != null && query.contains("comp=blocklist");
    }

    /**
     * @return the read chunk size in bytes; always {@code DEFAULT_READ_CHUNK_SIZE}
     */
    public long getReadChunkSize() {
        final long chunkSize = DEFAULT_READ_CHUNK_SIZE;
        return chunkSize;
    }

    /** The string form of this store is simply the container name. */
    @Override
    public String toString() {
        return container;
    }

    /**
     * @return the storage service this store was constructed with
     */
    public AzureStorageService getService() {
        return service;
    }

    /**
     * @return the location mode read from the repository settings at construction time
     */
    public LocationMode getLocationMode() {
        return locationMode;
    }

    /**
     * Creates a container view rooted at {@code path} and backed by this store.
     *
     * @param path the path the returned container represents
     * @return a new {@link AzureBlobContainer}
     */
    public BlobContainer blobContainer(BlobPath path) {
        final AzureBlobContainer blobContainer = new AzureBlobContainer(path, this);
        return blobContainer;
    }

    /** Intentionally a no-op: this method releases nothing. */
    public void close() {
        // nothing to release
    }

    /**
     * Checks whether a blob exists in this store's container.
     *
     * @param purpose the purpose used to select the client
     * @param blob    the full blob name
     * @return {@code true} if the blob exists
     * @throws IOException if the existence check fails for any reason; the original failure is the cause
     */
    public boolean blobExists(OperationPurpose purpose, String blob) throws IOException {
        final BlobServiceClient client = this.client(purpose);
        try {
            final BlobClient azureBlob = client.getBlobContainerClient(this.container).getBlobClient(blob);
            return azureBlob.exists();
        } catch (Exception e) {
            // The container placeholder was previously written as "{{}}", which rendered stray braces.
            logger.trace("can not access [{}] in container [{}]: {}", blob, this.container, e.getMessage());
            throw new IOException("Unable to check if blob " + blob + " exists", e);
        }
    }

    /**
     * Deletes every blob whose name starts with {@code path}.
     * <p>
     * Counts and sizes are accumulated as blob names stream out of the listing, i.e. before the
     * corresponding delete batch has been acknowledged.
     *
     * @param purpose the purpose used to select the client
     * @param path    the name prefix to delete under
     * @return the number of blobs listed for deletion and the sum of their content lengths
     * @throws IOException if listing or batch deletion fails
     */
    public DeleteResult deleteBlobDirectory(OperationPurpose purpose, String path) throws IOException {
        final AtomicInteger blobsDeleted = new AtomicInteger(0);
        final AtomicLong bytesDeleted = new AtomicLong(0L);
        final AzureBlobServiceClient client = this.getAzureBlobServiceClientClient(purpose);
        final BlobContainerAsyncClient blobContainerAsyncClient = client.getAsyncClient().getBlobContainerAsyncClient(this.container);
        final ListBlobsOptions options = new ListBlobsOptions().setPrefix(path)
            .setDetails(new BlobListDetails().setRetrieveMetadata(true));
        final Flux<String> blobsFlux = blobContainerAsyncClient.listBlobs(options)
            // isPrefix() returns a boxed Boolean; treat null as "not a prefix" (as children() does)
            // instead of failing on unboxing.
            .filter(bi -> Boolean.TRUE.equals(bi.isPrefix()) == false)
            .map(bi -> {
                bytesDeleted.addAndGet(bi.getProperties().getContentLength());
                blobsDeleted.incrementAndGet();
                return bi.getName();
            });
        this.deleteListOfBlobs(client, blobsFlux);
        return new DeleteResult(blobsDeleted.get(), bytesDeleted.get());
    }

    /**
     * Deletes the named blobs; returns immediately without obtaining a client when the iterator is empty.
     *
     * @param purpose   the purpose used to select the client
     * @param blobNames names of the blobs to delete
     * @throws IOException if batch deletion fails
     */
    void deleteBlobs(OperationPurpose purpose, Iterator<String> blobNames) throws IOException {
        if (blobNames.hasNext() == false) {
            return;
        }
        final AzureBlobServiceClient serviceClient = this.getAzureBlobServiceClientClient(purpose);
        // 16 is the value of Spliterator.ORDERED
        final Flux<String> names = Flux.fromStream(StreamSupport.stream(Spliterators.spliteratorUnknownSize(blobNames, 16), false));
        this.deleteListOfBlobs(serviceClient, names);
    }

    /**
     * Deletes blobs in batches of {@code deletionBatchSize}, running at most
     * {@code maxConcurrentBatchDeletes} batches concurrently, and blocks until all are done.
     * <p>
     * Batch failures that are ignorable (see {@code isIgnorableBatchDeleteException}) are dropped.
     * Other failures are counted, and the first 10 are attached as suppressed exceptions to the
     * {@link IOException} thrown at the end.
     *
     * @param azureBlobServiceClient client wrapper providing the async client
     * @param blobNames              names of the blobs to delete
     * @throws IOException if the pipeline itself fails or any batch reported a non-ignorable error
     */
    private void deleteListOfBlobs(AzureBlobServiceClient azureBlobServiceClient, Flux<String> blobNames) throws IOException {
        final BlobBatchAsyncClient batchAsyncClient = new BlobBatchClientBuilder(
            azureBlobServiceClient.getAsyncClient().getBlobContainerAsyncClient(this.container)
        ).buildAsyncClient();
        final AtomicInteger errorsCollected = new AtomicInteger(0);
        final List<Throwable> errors;
        try {
            errors = blobNames.buffer(this.deletionBatchSize).flatMap(blobs -> {
                final BlobBatch blobBatch = batchAsyncClient.getBlobBatch();
                for (String blob : blobs) {
                    blobBatch.deleteBlob(this.container, blob);
                }
                // Failures are turned into emitted values so one bad batch does not cancel the others.
                return batchAsyncClient.submitBatch(blobBatch).then(Mono.<Throwable>empty()).onErrorResume(t -> {
                    final boolean report = AzureBlobStore.isIgnorableBatchDeleteException(t) == false
                        && errorsCollected.getAndIncrement() < 10;
                    return report ? Mono.just(t) : Mono.empty();
                });
            }, this.maxConcurrentBatchDeletes).collectList().block();
        } catch (Exception e) {
            throw new IOException("Error deleting batches", e);
        }
        if (errors.isEmpty()) {
            return;
        }
        final int totalErrorCount = errorsCollected.get();
        final String errorMessage;
        if (totalErrorCount > errors.size()) {
            errorMessage = "Some errors occurred deleting batches, the first "
                + errors.size()
                + " are included as suppressed, but the total count was "
                + totalErrorCount;
        } else {
            errorMessage = "Some errors occurred deleting batches, all errors included as suppressed";
        }
        final IOException ex = new IOException(errorMessage);
        for (Throwable error : errors) {
            ex.addSuppressed(error);
        }
        throw ex;
    }

    /**
     * A batch-delete failure is ignorable only when it is a {@link BlobBatchStorageException} and
     * every nested exception carries the {@code BLOB_NOT_FOUND} error code.
     *
     * @param exception the failure reported for a submitted batch
     * @return {@code true} if the failure can be ignored
     */
    private static boolean isIgnorableBatchDeleteException(Throwable exception) {
        if (!(exception instanceof BlobBatchStorageException batchFailure)) {
            return false;
        }
        for (BlobStorageException failure : batchFailure.getBatchExceptions()) {
            final boolean notFound = BlobErrorCode.BLOB_NOT_FOUND.equals(failure.getErrorCode());
            if (notFound == false) {
                return false;
            }
        }
        return true;
    }

    /**
     * Opens a stream over a blob, starting at {@code position}.
     * <p>
     * With a {@code null} length the blob's size is looked up through the sync client and used both
     * as the read length and the total size; otherwise the total size is {@code position + length}.
     *
     * @param purpose  the purpose used to select the client
     * @param blob     the full blob name
     * @param position the offset to start reading from
     * @param length   number of bytes to read, or {@code null}
     * @return a new {@code AzureInputStream}
     * @throws UncheckedIOException if constructing the stream throws an {@link IOException}
     */
    public InputStream getInputStream(OperationPurpose purpose, String blob, long position, @Nullable Long length) {
        logger.trace(() -> Strings.format("reading container [%s], blob [%s]", this.container, blob));
        final AzureBlobServiceClient serviceClient = this.getAzureBlobServiceClientClient(purpose);
        final BlobServiceClient syncClient = serviceClient.getSyncClient();
        final BlobServiceAsyncClient asyncClient = serviceClient.getAsyncClient();
        final BlobClient blobClient = syncClient.getBlobContainerClient(this.container).getBlobClient(blob);

        final long totalSize;
        final long readLength;
        if (length == null) {
            totalSize = blobClient.getProperties().getBlobSize();
            readLength = totalSize;
        } else {
            totalSize = position + length;
            readLength = length;
        }

        final BlobAsyncClient blobAsyncClient = asyncClient.getBlobContainerAsyncClient(this.container).getBlobAsyncClient(blob);
        final int maxReadRetries = this.service.getMaxReadRetries(this.clientName);
        try {
            return new AzureInputStream(blobAsyncClient, position, readLength, totalSize, maxReadRetries, serviceClient.getAllocator());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Lists the blobs directly under {@code keyPath} whose names start with {@code prefix}.
     * Listing is hierarchical with {@code "/"} as delimiter; prefix entries are skipped.
     *
     * @param purpose the purpose used to select the client
     * @param keyPath the path to list under; it is stripped from the returned names
     * @param prefix  additional name prefix, or {@code null} for none
     * @return an immutable map from blob name (relative to {@code keyPath}) to its metadata
     * @throws IOException if the listing fails for any reason; the original failure is the cause
     */
    public Map<String, BlobMetadata> listBlobsByPrefix(OperationPurpose purpose, String keyPath, String prefix) throws IOException {
        final Map<String, BlobMetadata> blobsBuilder = new HashMap<>();
        logger.trace(() -> Strings.format("listing container [%s], keyPath [%s], prefix [%s]", this.container, keyPath, prefix));
        try {
            final BlobServiceClient client = this.client(purpose);
            final BlobContainerClient containerClient = client.getBlobContainerClient(this.container);
            final BlobListDetails details = new BlobListDetails().setRetrieveMetadata(true);
            final ListBlobsOptions listBlobsOptions = new ListBlobsOptions().setPrefix(keyPath + (prefix == null ? "" : prefix))
                .setDetails(details);
            for (BlobItem blobItem : containerClient.listBlobsByHierarchy("/", listBlobsOptions, null)) {
                // isPrefix() returns a boxed Boolean; treat null as "not a prefix" (as children() does)
                // rather than failing on unboxing.
                if (Boolean.TRUE.equals(blobItem.isPrefix())) {
                    continue;
                }
                final BlobItemProperties properties = blobItem.getProperties();
                final String blobName = blobItem.getName().substring(keyPath.length());
                blobsBuilder.put(blobName, new BlobMetadata(blobName, properties.getContentLength()));
            }
        } catch (Exception e) {
            throw new IOException("Unable to list blobs by prefix [" + prefix + "] for path " + keyPath, e);
        }
        return Map.copyOf(blobsBuilder);
    }

    /**
     * Lists the immediate child "directories" of {@code path}.
     * Listing is hierarchical with {@code "/"} as delimiter; only entries flagged as prefixes are kept.
     *
     * @param purpose the purpose used to select the client
     * @param path    the parent path
     * @return an unmodifiable map from child name (no trailing character) to a container for that child
     * @throws IOException if the listing fails for any reason; the original failure is the cause
     */
    public Map<String, BlobContainer> children(OperationPurpose purpose, BlobPath path) throws IOException {
        final Map<String, BlobContainer> result = new HashMap<>();
        final String keyPath = path.buildAsString();
        try {
            final BlobContainerClient containerClient = this.client(purpose).getBlobContainerClient(this.container);
            final ListBlobsOptions listOptions = new ListBlobsOptions();
            listOptions.setPrefix(keyPath).setDetails(new BlobListDetails().setRetrieveMetadata(true));
            for (BlobItem item : containerClient.listBlobsByHierarchy("/", listOptions, null)) {
                final Boolean prefixFlag = item.isPrefix();
                if (prefixFlag != null && prefixFlag) {
                    final String relativeName = item.getName().substring(keyPath.length());
                    if (relativeName.isEmpty() == false) {
                        // Drop the last character of the relative name (the delimiter that ends a prefix entry).
                        final String childName = relativeName.substring(0, relativeName.length() - 1);
                        result.put(childName, new AzureBlobContainer(BlobPath.EMPTY.add(item.getName()), this));
                    }
                }
            }
        } catch (Exception e) {
            throw new IOException("Unable to provide children blob containers for " + path, e);
        }
        return Collections.unmodifiableMap(result);
    }

    /**
     * Uploads an in-memory blob with a single upload request.
     *
     * @param purpose             the purpose used to select the client
     * @param blobName            the full blob name
     * @param bytes               the blob contents
     * @param failIfAlreadyExists whether the upload must fail when the blob already exists
     */
    public void writeBlob(OperationPurpose purpose, String blobName, BytesReference bytes, boolean failIfAlreadyExists) {
        final ByteBuffer[] buffers = BytesReference.toByteBuffers(bytes);
        final Flux<ByteBuffer> content = Flux.fromArray(buffers);
        this.executeSingleUpload(purpose, blobName, content, bytes.length(), failIfAlreadyExists);
    }

    /**
     * Writes a blob whose contents are produced by {@code writer} through a chunked output stream.
     * <p>
     * Each time the stream flushes its buffer the contents are staged as a block. On completion, if
     * nothing was ever flushed the buffered bytes are written with a single upload; otherwise the
     * remaining buffer is staged and the block list is committed.
     *
     * @param purpose             the purpose used to select the client
     * @param blobName            the full blob name
     * @param failIfAlreadyExists whether the write must fail when the blob already exists
     * @param writer              callback that writes the blob contents to the supplied stream
     * @throws IOException if the writer or the stream fails
     */
    public void writeBlob(final OperationPurpose purpose, final String blobName, final boolean failIfAlreadyExists, CheckedConsumer<OutputStream, IOException> writer) throws IOException {
        final BlockBlobAsyncClient blockClient = this.asyncClient(purpose)
            .getBlobContainerAsyncClient(this.container)
            .getBlobAsyncClient(blobName)
            .getBlockBlobAsyncClient();
        try (ChunkedBlobOutputStream<String> chunkedStream = new ChunkedBlobOutputStream<String>(this.bigArrays, this.getUploadBlockSize()) {

            protected void flushBuffer() {
                if (buffer.size() == 0) {
                    return;
                }
                final String blockId = AzureBlobStore.makeMultipartBlockId();
                final Flux<ByteBuffer> content = Flux.fromArray(BytesReference.toByteBuffers(buffer.bytes()));
                blockClient.stageBlock(blockId, content, buffer.size()).block();
                finishPart(blockId);
            }

            protected void onCompletion() {
                if (flushedBytes != 0L) {
                    flushBuffer();
                    // commitBlockList's second argument is "overwrite", hence the negation.
                    blockClient.commitBlockList(parts, failIfAlreadyExists == false).block();
                } else {
                    // Nothing was staged, so a plain single upload of the buffer is enough.
                    AzureBlobStore.this.writeBlob(purpose, blobName, buffer.bytes(), failIfAlreadyExists);
                }
            }

            protected void onFailure() {
                // no cleanup is performed here
            }
        }) {
            writer.accept(chunkedStream);
            chunkedStream.markSuccess();
        }
    }

    /**
     * Writes a blob from a provider that can open a stream over any byte range of the content.
     * <p>
     * Blobs no larger than the large-blob threshold, or that split into a single part, are sent with
     * one upload. Larger blobs are staged part by part (at most {@code multipartUploadMaxConcurrency}
     * at a time) and then committed as a block list.
     *
     * @param purpose             the purpose used to select the client
     * @param blobName            the full blob name
     * @param blobSize            total size of the blob in bytes
     * @param provider            opens a stream for a given offset and length
     * @param failIfAlreadyExists whether the write must fail when the blob already exists
     * @throws FileAlreadyExistsException if {@code failIfAlreadyExists} is set and the service answers 409 / BLOB_ALREADY_EXISTS
     * @throws IOException                on any other failure; the original failure is the cause
     */
    void writeBlobAtomic(OperationPurpose purpose, String blobName, long blobSize, BlobContainer.BlobMultiPartInputStreamProvider provider, boolean failIfAlreadyExists) throws IOException {
        try {
            final List<MultiPart> multiParts = blobSize <= this.getLargeBlobThresholdInBytes()
                ? null
                : AzureBlobStore.computeMultiParts(blobSize, this.getUploadBlockSize());
            if (multiParts == null || multiParts.size() == 1) {
                logger.debug("{}: uploading blob of size [{}] as single upload", blobName, blobSize);
                try (InputStream stream = provider.apply(0L, blobSize)) {
                    final Flux<ByteBuffer> content = this.convertStreamToByteBuffer(stream, blobSize, DEFAULT_UPLOAD_BUFFERS_SIZE);
                    this.executeSingleUpload(purpose, blobName, content, blobSize, failIfAlreadyExists);
                }
            } else {
                logger.debug("{}: uploading blob of size [{}] using [{}] parts", blobName, blobSize, multiParts.size());
                assert blobSize == (long) (multiParts.size() - 1) * this.getUploadBlockSize() + multiParts.getLast().blockSize();
                assert multiParts.size() > 1;
                final BlockBlobAsyncClient blockClient = this.asyncClient(purpose)
                    .getBlobContainerAsyncClient(this.container)
                    .getBlobAsyncClient(blobName)
                    .getBlockBlobAsyncClient();
                Flux.fromIterable(multiParts)
                    .flatMapSequential(part -> AzureBlobStore.stageBlock(blockClient, blobName, part, provider), this.multipartUploadMaxConcurrency)
                    .collect(Collectors.toList())
                    .flatMap(stagedIds -> {
                        logger.debug("{}: all {} parts uploaded, now committing", blobName, multiParts.size());
                        // The committed ids are taken from multiParts, not from the collected list.
                        final List<String> blockIds = multiParts.stream().map(MultiPart::blockId).toList();
                        return blockClient.commitBlockList(blockIds, failIfAlreadyExists == false)
                            .doOnSuccess(unused -> logger.debug("{}: all {} parts committed", blobName, multiParts.size()))
                            .doOnError(
                                e -> logger.error(() -> Strings.format("%s: failed to commit %d parts", blobName, multiParts.size()), e)
                            );
                    })
                    .block();
            }
        } catch (BlobStorageException e) {
            final boolean alreadyExists = failIfAlreadyExists
                && e.getStatusCode() == 409
                && BlobErrorCode.BLOB_ALREADY_EXISTS.equals(e.getErrorCode());
            if (alreadyExists) {
                throw new FileAlreadyExistsException(blobName, null, e.getMessage());
            }
            throw new IOException("Unable to write blob " + blobName, e);
        } catch (Exception e) {
            throw new IOException("Unable to write blob " + blobName, e);
        }
    }

    /**
     * Splits {@code totalSize} bytes into consecutive parts of {@code partSize} bytes; the last part
     * holds the remainder (or a full {@code partSize} when the split is exact). Each part gets a
     * fresh block id.
     *
     * @param totalSize total number of bytes
     * @param partSize  size of every part except possibly the last; must be positive
     * @return an immutable list of parts in offset order; a single part when {@code totalSize <= partSize}
     * @throws IllegalArgumentException if {@code partSize} is not positive
     */
    private static List<MultiPart> computeMultiParts(long totalSize, long partSize) {
        if (partSize <= 0L) {
            throw new IllegalArgumentException("Part size must be greater than zero");
        }
        // partSize is positive here, so this also covers totalSize == 0.
        if (totalSize <= partSize) {
            return List.of(new MultiPart(0, AzureBlobStore.makeMultipartBlockId(), 0L, totalSize, true));
        }
        final long remainder = totalSize % partSize;
        final boolean hasPartialTail = remainder > 0L;
        final int partCount = Math.toIntExact(totalSize / partSize) + (hasPartialTail ? 1 : 0);
        final long tailSize = hasPartialTail ? remainder : partSize;

        final List<MultiPart> result = new ArrayList<>(partCount);
        long offset = 0L;
        for (int index = 0; index < partCount; index++) {
            final boolean last = index == partCount - 1;
            final MultiPart part = new MultiPart(index, AzureBlobStore.makeMultipartBlockId(), offset, last ? tailSize : partSize, last);
            result.add(part);
            offset += part.blockSize();
        }
        return List.copyOf(result);
    }

    /**
     * Builds the {@link Mono} that stages one part of a multipart upload.
     * <p>
     * The part's stream is opened eagerly. It is closed when the staging Mono succeeds, fails or is
     * cancelled, and also right here if assembling that Mono throws. An {@link IOException} raised
     * while opening or closing the stream is logged and returned as an error Mono wrapping an
     * {@link UncheckedIOException}, so this method never throws a checked exception.
     *
     * @param asyncClient client for the target block blob
     * @param blobName    blob name, used for logging
     * @param multiPart   the part to stage (block id, offset, size)
     * @param provider    opens a stream over the part's byte range
     * @return a Mono emitting the part's block id once the block is staged
     */
    private static Mono<String> stageBlock(BlockBlobAsyncClient asyncClient, String blobName, MultiPart multiPart, BlobContainer.BlobMultiPartInputStreamProvider provider) {
        logger.debug(
            "{}: staging part [{}] of size [{}] from offset [{}]",
            blobName,
            multiPart.part(),
            multiPart.blockSize(),
            multiPart.blockOffset()
        );
        try {
            final InputStream stream = provider.apply(multiPart.blockOffset(), multiPart.blockSize());
            assert stream.markSupported() : "provided input stream must support mark and reset";
            boolean success = false;
            try {
                final Mono<Void> stageBlock = asyncClient.stageBlock(
                    multiPart.blockId(),
                    AzureBlobStore.toFlux(
                        AzureBlobStore.wrapInputStream(blobName, stream, multiPart),
                        multiPart.blockSize(),
                        DEFAULT_UPLOAD_BUFFERS_SIZE
                    ),
                    multiPart.blockSize()
                ).doOnSuccess(unused -> {
                    logger.debug(() -> Strings.format("%s: part [%s] of size [%s] uploaded", blobName, multiPart.part(), multiPart.blockSize()));
                    IOUtils.closeWhileHandlingException(stream);
                }).doOnCancel(() -> {
                    logger.warn(() -> Strings.format("%s: part [%s] of size [%s] cancelled", blobName, multiPart.part(), multiPart.blockSize()));
                    IOUtils.closeWhileHandlingException(stream);
                }).doOnError(t -> {
                    logger.error(() -> Strings.format("%s: part [%s] of size [%s] failed", blobName, multiPart.part(), multiPart.blockSize()), t);
                    IOUtils.closeWhileHandlingException(stream);
                });
                logger.debug(
                    "{}: part [{}] of size [{}] from offset [{}] staged",
                    blobName,
                    multiPart.part(),
                    multiPart.blockSize(),
                    multiPart.blockOffset()
                );
                final Mono<String> result = stageBlock.map(unused -> multiPart.blockId());
                // From here on the reactive callbacks above own the stream's lifecycle.
                success = true;
                return result;
            } finally {
                if (success == false) {
                    // Assembling the Mono failed, so none of the callbacks will ever close the stream.
                    IOUtils.close(stream);
                }
            }
        } catch (IOException e) {
            logger.error(() -> Strings.format("%s: failed to stage part [%s] of size [%s]", blobName, multiPart.part(), multiPart.blockSize()), e);
            return FluxUtil.monoError(new ClientLogger(AzureBlobStore.class), new UncheckedIOException(e));
        }
    }

    /**
     * Writes a blob from a mark-supporting stream, choosing a single upload for blobs up to the
     * large-blob threshold and a multipart upload beyond it.
     *
     * @param purpose             the purpose used to select the client
     * @param blobName            the full blob name
     * @param inputStream         source of the blob contents; must support mark/reset
     * @param blobSize            number of bytes to read from the stream
     * @param failIfAlreadyExists whether the write must fail when the blob already exists
     * @throws FileAlreadyExistsException if {@code failIfAlreadyExists} is set and the service answers 409 / BLOB_ALREADY_EXISTS
     * @throws IOException                on any other failure; the original failure is the cause
     */
    public void writeBlob(OperationPurpose purpose, String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
        assert inputStream.markSupported() : "Should not be used with non-mark supporting streams as their retry handling in the SDK is broken";
        logger.trace(() -> Strings.format("writeBlob(%s, stream, %s)", blobName, blobSize));
        try {
            if (blobSize > this.getLargeBlobThresholdInBytes()) {
                this.executeMultipartUpload(purpose, blobName, inputStream, blobSize, failIfAlreadyExists);
            } else {
                final Flux<ByteBuffer> content = this.convertStreamToByteBuffer(inputStream, blobSize, DEFAULT_UPLOAD_BUFFERS_SIZE);
                this.executeSingleUpload(purpose, blobName, content, blobSize, failIfAlreadyExists);
            }
        } catch (BlobStorageException e) {
            final boolean alreadyExists = failIfAlreadyExists
                && e.getStatusCode() == 409
                && BlobErrorCode.BLOB_ALREADY_EXISTS.equals(e.getErrorCode());
            if (alreadyExists) {
                throw new FileAlreadyExistsException(blobName, null, e.getMessage());
            }
            throw new IOException("Unable to write blob " + blobName, e);
        } catch (Exception e) {
            throw new IOException("Unable to write blob " + blobName, e);
        }
        logger.trace(() -> Strings.format("writeBlob(%s, stream, %s) - done", blobName, blobSize));
    }

    /**
     * Uploads the whole blob in one request and blocks until it completes.
     * When {@code failIfAlreadyExists} is set, an {@code If-None-Match: *} condition is attached.
     *
     * @param purpose             the purpose used to select the client
     * @param blobName            the full blob name
     * @param byteBufferFlux      the blob contents
     * @param blobSize            length of the contents in bytes
     * @param failIfAlreadyExists whether the upload must fail when the blob already exists
     */
    private void executeSingleUpload(OperationPurpose purpose, String blobName, Flux<ByteBuffer> byteBufferFlux, long blobSize, boolean failIfAlreadyExists) {
        final BlockBlobAsyncClient blockClient = this.asyncClient(purpose)
            .getBlobContainerAsyncClient(this.container)
            .getBlobAsyncClient(blobName)
            .getBlockBlobAsyncClient();
        final BlobRequestConditions conditions = new BlobRequestConditions();
        if (failIfAlreadyExists) {
            conditions.setIfNoneMatch("*");
        }
        final BlockBlobSimpleUploadOptions uploadOptions = new BlockBlobSimpleUploadOptions(byteBufferFlux, blobSize);
        uploadOptions.setRequestConditions(conditions);
        blockClient.uploadWithResponse(uploadOptions).block();
    }

    /**
     * Uploads a blob as a sequence of staged blocks read one after another from {@code inputStream},
     * then commits the block list. Every stage and the commit block the calling thread.
     *
     * @param purpose             the purpose used to select the client
     * @param blobName            the full blob name
     * @param inputStream         source of the blob contents
     * @param blobSize            total number of bytes to upload
     * @param failIfAlreadyExists whether the commit must fail when the blob already exists
     */
    private void executeMultipartUpload(OperationPurpose purpose, String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) {
        final BlockBlobAsyncClient blockClient = this.asyncClient(purpose)
            .getBlobContainerAsyncClient(this.container)
            .getBlobAsyncClient(blobName)
            .getBlockBlobAsyncClient();
        final long partSize = this.getUploadBlockSize();
        final Tuple<Long, Long> partition = AzureBlobStore.numberOfMultiparts(blobSize, partSize);
        final int nbParts = partition.v1().intValue();
        final long lastPartSize = partition.v2();
        assert blobSize == (long) (nbParts - 1) * partSize + lastPartSize : "blobSize does not match multipart sizes";

        final List<String> blockIds = new ArrayList<>(nbParts);
        for (int part = 0; part < nbParts; part++) {
            final boolean isLastPart = part == nbParts - 1;
            final long length = isLastPart ? lastPartSize : partSize;
            final Flux<ByteBuffer> content = this.convertStreamToByteBuffer(inputStream, length, DEFAULT_UPLOAD_BUFFERS_SIZE);
            final String blockId = AzureBlobStore.makeMultipartBlockId();
            blockClient.stageBlock(blockId, content, length).block();
            blockIds.add(blockId);
        }
        // commitBlockList's second argument is "overwrite", hence the negation.
        blockClient.commitBlockList(blockIds, failIfAlreadyExists == false).block();
    }

    /**
     * Generates a new block id: a URL-safe base64 UUID is decoded to its raw bytes and re-encoded
     * with the standard base64 alphabet, without padding.
     *
     * @return the block id
     */
    private static String makeMultipartBlockId() {
        final byte[] uuidBytes = base64UrlDecoder.decode(UUIDs.base64UUID());
        return base64Encoder.encodeToString(uuidBytes);
    }

    /**
     * Exposes the next {@code length} bytes of {@code delegate} as a {@link Flux} of buffers of at
     * most {@code chunkSize} bytes each.
     * <p>
     * The stream is marked when this method is called and reset on every subscription, so a
     * re-subscription re-reads the same bytes. Reads are serialised through a synchronized wrapper.
     * The flux is subscribed on {@code Schedulers.elastic()}.
     *
     * @param delegate  source stream; must support mark/reset
     * @param length    total number of bytes to emit
     * @param chunkSize maximum size of each emitted buffer
     * @return a flux of buffers covering exactly {@code length} bytes
     */
    private Flux<ByteBuffer> convertStreamToByteBuffer(InputStream delegate, long length, int chunkSize) {
        assert delegate.markSupported() : "An InputStream with mark support was expected";
        final InputStream inputStream = new FilterInputStream(delegate) {

            @Override
            public synchronized int read(byte[] b, int off, int len) throws IOException {
                return super.read(b, off, len);
            }

            @Override
            public synchronized int read() throws IOException {
                return super.read();
            }
        };
        inputStream.mark(Integer.MAX_VALUE);
        return Flux.defer(() -> {
            final AtomicLong currentTotalLength = new AtomicLong(0L);
            try {
                inputStream.reset();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
            // All position arithmetic is done in long: casting length to int before dividing, or
            // multiplying chunk indices as ints, overflows once length reaches 2 GiB.
            final long fullChunks = length / chunkSize;
            final long remaining = length % chunkSize;
            final int chunkCount = Math.toIntExact(remaining == 0L ? fullChunks : fullChunks + 1);
            return Flux.range(0, chunkCount).map(i -> (long) i * chunkSize).concatMap(pos -> Mono.fromCallable(() -> {
                // The last chunk may be shorter than chunkSize.
                final long count = pos + chunkSize > length ? length - pos : chunkSize;
                final byte[] buffer = new byte[(int) count];
                int numOfBytesRead = 0;
                int offset = 0;
                int len = (int) count;
                // A single read may return fewer bytes than asked for; keep reading until the chunk is full or EOF.
                while (numOfBytesRead != -1 && offset < count) {
                    numOfBytesRead = inputStream.read(buffer, offset, len);
                    if (numOfBytesRead == -1) {
                        break;
                    }
                    offset += numOfBytesRead;
                    len -= numOfBytesRead;
                    currentTotalLength.addAndGet(numOfBytesRead);
                }
                if (numOfBytesRead == -1 && currentTotalLength.get() < length) {
                    throw new IllegalStateException("InputStream provided" + String.valueOf(currentTotalLength) + " bytes, less than the expected" + length + " bytes");
                }
                return ByteBuffer.wrap(buffer);
            })).doOnComplete(() -> {
                if (currentTotalLength.get() > length) {
                    throw new IllegalStateException("Read more data than was requested. Size of data read: " + currentTotalLength.get() + ". Size of data requested: " + length);
                }
            });
        }).subscribeOn(Schedulers.elastic());
    }

    /**
     * Wraps {@code delegate} in a stream that adds diagnostics to every read.
     * <p>
     * With assertions enabled, each read asserts that no other thread is reading from the stream at the same time
     * and that the calling thread belongs to the {@code repository_azure} thread pool. When trace logging is enabled
     * (checked once, when the wrapper is created) each read is also logged together with the blob name and part number.
     *
     * @param blobName  the blob name used in trace and assertion messages
     * @param delegate  the stream that actually provides the bytes
     * @param multipart the part being read; only {@code part()} is used, for messages
     * @return the wrapping stream
     */
    private static InputStream wrapInputStream(final String blobName, InputStream delegate, final MultiPart multipart) {
        return new FilterInputStream(delegate) {

            // Only allocated when assertions are enabled; it is accessed exclusively from within assert statements.
            private final AtomicReference<Thread> currentThread = Assertions.ENABLED ? new AtomicReference<>() : null;
            private final boolean isTraceEnabled = logger.isTraceEnabled();

            @Override
            public int read(byte[] b, int off, int len) throws IOException {
                assert assertThread(null, Thread.currentThread());
                assert ThreadPool.assertCurrentThreadPool(new String[] { "repository_azure" });
                try {
                    final int result = super.read(b, off, len);
                    if (isTraceEnabled) {
                        logger.trace("{} reads {} bytes from {} part {}", Thread.currentThread(), result, blobName, multipart.part());
                    }
                    return result;
                } finally {
                    assert assertThread(Thread.currentThread(), null);
                }
            }

            @Override
            public int read() throws IOException {
                assert assertThread(null, Thread.currentThread());
                assert ThreadPool.assertCurrentThreadPool(new String[] { "repository_azure" });
                try {
                    final int result = super.read();
                    if (isTraceEnabled) {
                        logger.trace("{} reads {} byte from {} part {}", Thread.currentThread(), result, blobName, multipart.part());
                    }
                    return result;
                } finally {
                    assert assertThread(Thread.currentThread(), null);
                }
            }

            /**
             * Atomically swaps the recorded reader thread from {@code current} to {@code updated} and asserts that the
             * previously recorded value was indeed {@code current}. Always returns {@code true} so that it can be
             * used as the condition of an {@code assert} statement.
             */
            private boolean assertThread(Thread current, Thread updated) {
                final Thread witness = currentThread.compareAndExchange(current, updated);
                assert witness == current
                    : "Unable to set current thread to ["
                        + updated
                        + "]: expected thread ["
                        + current
                        + "] to be the thread currently accessing the input stream for reading, but thread "
                        + witness
                        + " is already reading "
                        + blobName
                        + " part "
                        + multipart.part();
                return true;
            }
        };
    }

    /**
     * Turns {@code length} bytes of {@code stream} into a lazily-read {@link Flux} of {@link ByteBuffer}s of at most
     * {@code byteBufferSize} bytes each.
     * <p>
     * The stream is marked immediately and reset whenever the flux is subscribed to, so every subscription starts
     * reading from the marked position.
     *
     * @param stream         the stream to read; it must support mark/reset
     * @param length         the number of bytes expected from the stream (asserted to be at most 100mb)
     * @param byteBufferSize the maximum number of bytes per emitted buffer
     * @return a flux of the stream's content, subscribed on {@code Schedulers.elastic()}
     */
    private static Flux<ByteBuffer> toFlux(InputStream stream, long length, int byteBufferSize) {
        assert stream.markSupported() : "input stream must support mark and reset";
        stream.mark(Integer.MAX_VALUE);
        return Flux.defer(() -> {
            try {
                stream.reset();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            assert length <= ByteSizeValue.ofMb(100L).getBytes() : length;
            final AtomicLong bytesRead = new AtomicLong(0L);
            final int fullChunks = Math.toIntExact(length / (long) byteBufferSize);
            final boolean hasTrailingChunk = length % (long) byteBufferSize != 0L;
            final int chunkCount = hasTrailingChunk ? fullChunks + 1 : fullChunks;
            return Flux.range(0, chunkCount).map(index -> index * byteBufferSize).concatMap(position -> Mono.fromCallable(() -> {
                // Only the final chunk can be shorter than byteBufferSize.
                final long chunkLength;
                if ((long) (position + byteBufferSize) > length) {
                    chunkLength = length - (long) position.intValue();
                } else {
                    chunkLength = byteBufferSize;
                }
                final byte[] chunk = new byte[(int) chunkLength];
                int filled = 0;
                int lastRead = 0;
                // Keep reading until the chunk is full or the stream signals end-of-stream.
                while (lastRead != -1 && (long) filled < chunkLength) {
                    lastRead = stream.read(chunk, filled, chunk.length - filled);
                    if (lastRead != -1) {
                        filled += lastRead;
                        bytesRead.addAndGet(lastRead);
                    }
                }
                if (lastRead == -1 && bytesRead.get() < length) {
                    throw new IllegalStateException(
                        Strings.format("Input stream [%s] emitted %d bytes, less than the expected %d bytes.", stream, bytesRead, length)
                    );
                }
                return ByteBuffer.wrap(chunk);
            })).doOnComplete(() -> {
                if (bytesRead.get() > length) {
                    throw new IllegalStateException(
                        Strings.format("Input stream [%s] emitted %d bytes, more than the expected %d bytes.", stream, bytesRead, length)
                    );
                }
            });
        }).subscribeOn(Schedulers.elastic());
    }

    /**
     * Computes how many parts are needed to upload {@code totalSize} bytes in parts of {@code partSize} bytes.
     *
     * @param totalSize the total number of bytes to upload
     * @param partSize  the size of every part except possibly the last one; must be greater than zero
     * @return a tuple whose first value is the number of parts and whose second value is the size of the last part;
     *         a {@code totalSize} that does not exceed {@code partSize} yields a single part of {@code totalSize} bytes
     * @throws IllegalArgumentException if {@code partSize} is zero or negative
     */
    static Tuple<Long, Long> numberOfMultiparts(long totalSize, long partSize) {
        if (partSize <= 0L) {
            throw new IllegalArgumentException("Part size must be greater than zero");
        }
        // partSize is strictly positive at this point, so this check also covers totalSize == 0
        if (totalSize <= partSize) {
            return Tuple.tuple(1L, totalSize);
        }
        final long fullParts = totalSize / partSize;
        final long remaining = totalSize % partSize;
        if (remaining == 0L) {
            // the size is an exact multiple of partSize: the last part is a full one
            return Tuple.tuple(fullParts, partSize);
        }
        return Tuple.tuple(fullParts + 1L, remaining);
    }

    /**
     * @return the value of {@code maxSinglePartUploadSize} in bytes
     */
    long getLargeBlobThresholdInBytes() {
        final long thresholdInBytes = maxSinglePartUploadSize.getBytes();
        return thresholdInBytes;
    }

    /**
     * @return the upload block size reported by the underlying {@code service}
     */
    long getUploadBlockSize() {
        final long blockSize = service.getUploadBlockSize();
        return blockSize;
    }

    /**
     * Returns the synchronous blob service client to use for the given purpose.
     *
     * @param purpose the purpose of the operation the client will be used for
     * @return the synchronous client of the {@link AzureBlobServiceClient} obtained for {@code purpose}
     */
    private BlobServiceClient client(OperationPurpose purpose) {
        final AzureBlobServiceClient serviceClient = getAzureBlobServiceClientClient(purpose);
        return serviceClient.getSyncClient();
    }

    /**
     * Returns the asynchronous blob service client to use for the given purpose.
     *
     * @param purpose the purpose of the operation the client will be used for
     * @return the asynchronous client of the {@link AzureBlobServiceClient} obtained for {@code purpose}
     */
    private BlobServiceAsyncClient asyncClient(OperationPurpose purpose) {
        final AzureBlobServiceClient serviceClient = getAzureBlobServiceClientClient(purpose);
        return serviceClient.getAsyncClient();
    }

    /**
     * Asks the {@code service} for a client configured with this store's client name, location mode and request
     * metrics handler.
     *
     * @param purpose the purpose of the operation the client will be used for
     * @return the client returned by the service
     */
    private AzureBlobServiceClient getAzureBlobServiceClientClient(OperationPurpose purpose) {
        return service.client(
            clientName,
            locationMode,
            purpose,
            requestMetricsHandler
        );
    }

    /**
     * @return the statistics collected by the request metrics recorder, in the form selected by
     *         {@code service.isStateless()}
     */
    public Map<String, BlobStoreActionStats> stats() {
        final RequestMetricsRecorder recorder = requestMetricsRecorder;
        return recorder.statsMap(service.isStateless());
    }

    /**
     * @return the recorder that accumulates this store's request metrics
     */
    RequestMetricsRecorder getMetricsRecorder() {
        return requestMetricsRecorder;
    }

    /**
     * Reads the current value of the register stored in the blob at {@code blobPath}.
     *
     * @param purpose       the purpose used to select the client
     * @param blobPath      the path of the register blob inside the container
     * @param containerPath passed through to the register download
     * @param blobKey       passed through to the register download
     * @return the register content, or {@link OptionalBytesReference#EMPTY} if the root cause of a failure is a
     *         {@link BlobStorageException} with a "not found" status
     * @throws UncheckedIOException if the download fails with an {@link IOException}
     */
    OptionalBytesReference getRegister(OperationPurpose purpose, String blobPath, String containerPath, String blobKey) {
        try {
            final BlobClient blobClient = getAzureBlobServiceClientClient(purpose).getSyncClient()
                .getBlobContainerClient(container)
                .getBlobClient(blobPath);
            return OptionalBytesReference.of(downloadRegisterBlob(containerPath, blobKey, blobClient, null));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (Exception e) {
            if (Throwables.getRootCause(e) instanceof BlobStorageException blobStorageException
                && blobStorageException.getStatusCode() == RestStatus.NOT_FOUND.getStatus()) {
                return OptionalBytesReference.EMPTY;
            }
            throw e;
        }
    }

    /**
     * Replaces the content of the register blob at {@code blobPath} with {@code updated} if its current content
     * equals {@code expected}.
     *
     * @param purpose       the purpose used to select the client
     * @param blobPath      the path of the register blob inside the container
     * @param containerPath passed through to the register download
     * @param blobKey       passed through to the register download
     * @param expected      the value the register must currently hold for the update to happen
     * @param updated       the new value; validated with {@code BlobContainerUtils.ensureValidRegisterContent}
     * @return the value witnessed in the register, or {@link OptionalBytesReference#MISSING} if the root cause of a
     *         failure is a {@link BlobStorageException} with a "precondition failed" or "conflict" status
     * @throws UncheckedIOException if the operation fails with an {@link IOException}
     */
    OptionalBytesReference compareAndExchangeRegister(OperationPurpose purpose, String blobPath, String containerPath, String blobKey, BytesReference expected, BytesReference updated) {
        BlobContainerUtils.ensureValidRegisterContent(updated);
        try {
            final BlobClient blobClient = getAzureBlobServiceClientClient(purpose).getSyncClient()
                .getBlobContainerClient(container)
                .getBlobClient(blobPath);
            return OptionalBytesReference.of(innerCompareAndExchangeRegister(containerPath, blobKey, blobClient, expected, updated));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (Exception e) {
            if (Throwables.getRootCause(e) instanceof BlobStorageException blobStorageException
                && (blobStorageException.getStatusCode() == RestStatus.PRECONDITION_FAILED.getStatus()
                    || blobStorageException.getStatusCode() == RestStatus.CONFLICT.getStatus())) {
                return OptionalBytesReference.MISSING;
            }
            throw e;
        }
    }

    /**
     * Performs the compare-and-exchange on the register blob behind {@code blobClient}.
     * <p>
     * If the blob does not exist, its value is reported as empty and {@code updated} is uploaded (with an
     * {@code If-None-Match: *} condition) only when {@code expected} is empty as well. If the blob exists, a lease is
     * acquired, the current value is downloaded under that lease, and {@code updated} is uploaded under the same
     * lease when the current value equals {@code expected}. The lease is released on a best-effort basis afterwards.
     *
     * @param containerPath passed through to the register download
     * @param blobKey       passed through to the register download
     * @param blobClient    the client for the register blob
     * @param expected      the value the register must hold for the update to happen
     * @param updated       the value to store if the comparison succeeds
     * @return the value found in the register before any update, or {@code BytesArray.EMPTY} if the blob did not exist
     * @throws IOException if downloading or uploading the register content fails
     */
    private static BytesReference innerCompareAndExchangeRegister(String containerPath, String blobKey, BlobClient blobClient, BytesReference expected, BytesReference updated) throws IOException {
        if (blobClient.exists() == false) {
            // No blob yet: only create it when the caller expected the register to be empty.
            if (expected.length() == 0) {
                uploadRegisterBlob(updated, blobClient, new BlobRequestConditions().setIfNoneMatch("*"));
            }
            return BytesArray.EMPTY;
        }
        final BlobLeaseClient leaseClient = new BlobLeaseClientBuilder().blobClient(blobClient).buildClient();
        final String leaseId = leaseClient.acquireLease(60);
        try {
            final BytesReference witnessed = downloadRegisterBlob(
                containerPath,
                blobKey,
                blobClient,
                new BlobRequestConditions().setLeaseId(leaseId)
            );
            if (witnessed.equals(expected)) {
                uploadRegisterBlob(updated, blobClient, new BlobRequestConditions().setLeaseId(leaseId));
            }
            return witnessed;
        } finally {
            bestEffortRelease(leaseClient);
        }
    }

    /**
     * Releases the lease held by {@code leaseClient}, tolerating a conflict response.
     * <p>
     * A {@link BlobStorageException} with a "conflict" status is logged at debug level and swallowed; any other
     * failure is rethrown. This method is called from a {@code finally} block, so rethrowing on a conflict would
     * replace the caller's result (or original exception) with the release failure.
     *
     * @param leaseClient the client holding the lease to release
     * @throws BlobStorageException if the release fails with a status other than "conflict"
     */
    private static void bestEffortRelease(BlobLeaseClient leaseClient) {
        try {
            leaseClient.releaseLease();
        } catch (BlobStorageException blobStorageException) {
            if (blobStorageException.getStatusCode() != RestStatus.CONFLICT.getStatus()) {
                throw blobStorageException;
            }
            // NOTE(review): presumably the lease already expired or was taken over by someone else - nothing left to release.
            logger.debug(
                "Ignored conflict on release: errorCode={}, message={}",
                blobStorageException.getErrorCode(),
                blobStorageException.getMessage()
            );
        }
    }

    /**
     * Downloads the content of the register blob, without download retries, and hands it to
     * {@code BlobContainerUtils.getRegisterUsingConsistentRead}.
     *
     * @param containerPath         passed through to the register read
     * @param blobKey               passed through to the register read
     * @param blobClient            the client for the register blob
     * @param blobRequestConditions the conditions to apply to the download; callers in this file may pass {@code null}
     * @return the register content
     * @throws IOException if reading the register content fails
     */
    private static BytesReference downloadRegisterBlob(String containerPath, String blobKey, BlobClient blobClient, BlobRequestConditions blobRequestConditions) throws IOException {
        final DownloadRetryOptions noRetries = new DownloadRetryOptions().setMaxRetryRequests(0);
        final BinaryData content = blobClient.downloadContentWithResponse(noRetries, blobRequestConditions, null, null).getValue();
        return BlobContainerUtils.getRegisterUsingConsistentRead(content.toStream(), containerPath, blobKey);
    }

    /**
     * Uploads {@code blobContents} as the new content of the register blob.
     *
     * @param blobContents      the bytes to store
     * @param blobClient        the client for the register blob
     * @param requestConditions the conditions the upload must satisfy
     * @throws IOException if the content cannot be opened as a stream
     */
    private static void uploadRegisterBlob(BytesReference blobContents, BlobClient blobClient, BlobRequestConditions requestConditions) throws IOException {
        final BinaryData payload = BinaryData.fromStream(blobContents.streamInput(), Long.valueOf(blobContents.length()));
        final BlobParallelUploadOptions uploadOptions = new BlobParallelUploadOptions(payload).setRequestConditions(requestConditions);
        blobClient.uploadWithResponse(uploadOptions, null, null);
    }

    class RequestMetricsRecorder {
        private final RepositoriesMetrics repositoriesMetrics;
        final Map<StatsKey, StatsCounter> statsCounters = new ConcurrentHashMap<StatsKey, StatsCounter>();
        final Map<StatsKey, Map<String, Object>> opsAttributes = new ConcurrentHashMap<StatsKey, Map<String, Object>>();

        RequestMetricsRecorder(RepositoriesMetrics repositoriesMetrics) {
            this.repositoriesMetrics = repositoriesMetrics;
        }

        Map<String, BlobStoreActionStats> statsMap(boolean stateless) {
            if (stateless) {
                return this.statsCounters.entrySet().stream().collect(Collectors.toUnmodifiableMap(e -> ((StatsKey)e.getKey()).toString(), e -> ((StatsCounter)e.getValue()).getEndpointStats()));
            }
            Map<String, BlobStoreActionStats> normalisedStats = Arrays.stream(Operation.values()).collect(Collectors.toMap(Operation::getKey, o -> BlobStoreActionStats.ZERO));
            this.statsCounters.forEach((key, value) -> normalisedStats.compute(key.operation.getKey(), (k, current) -> value.getEndpointStats().add(Objects.requireNonNull(current))));
            return Map.copyOf(normalisedStats);
        }

        public void onRequestComplete(Operation operation, OperationPurpose purpose, AzureClientProvider.RequestMetrics requestMetrics) {
            StatsKey statsKey = new StatsKey(operation, purpose);
            StatsCounter counter = this.statsCounters.computeIfAbsent(statsKey, k -> new StatsCounter());
            Map attributes = this.opsAttributes.computeIfAbsent(statsKey, k -> RepositoriesMetrics.createAttributesMap((RepositoryMetadata)AzureBlobStore.this.repositoryMetadata, (OperationPurpose)purpose, (String)operation.getKey()));
            counter.operations.increment();
            counter.requests.add(requestMetrics.getRequestCount());
            if (requestMetrics.getStatusCode() == RestStatus.REQUESTED_RANGE_NOT_SATISFIED.getStatus()) {
                this.repositoriesMetrics.requestRangeNotSatisfiedExceptionCounter().incrementBy(1L, attributes);
            }
            this.repositoriesMetrics.operationCounter().incrementBy(1L, attributes);
            if (!RestStatus.isSuccessful((int)requestMetrics.getStatusCode())) {
                this.repositoriesMetrics.unsuccessfulOperationCounter().incrementBy(1L, attributes);
            }
            this.repositoriesMetrics.requestCounter().incrementBy((long)requestMetrics.getRequestCount(), attributes);
            if (requestMetrics.getErrorCount() > 0) {
                this.repositoriesMetrics.exceptionCounter().incrementBy((long)requestMetrics.getErrorCount(), attributes);
                this.repositoriesMetrics.exceptionHistogram().record((long)requestMetrics.getErrorCount(), attributes);
            }
            if (requestMetrics.getThrottleCount() > 0) {
                this.repositoriesMetrics.throttleCounter().incrementBy((long)requestMetrics.getThrottleCount(), attributes);
                this.repositoriesMetrics.throttleHistogram().record((long)requestMetrics.getThrottleCount(), attributes);
            }
            if (requestMetrics.getTotalRequestTimeNanos() > 0L) {
                this.repositoriesMetrics.httpRequestTimeInMillisHistogram().record(TimeUnit.NANOSECONDS.toMillis(requestMetrics.getTotalRequestTimeNanos()), attributes);
            }
        }
    }

    /**
     * Associates a predicate over an HTTP method and URL with the {@link Operation} that matching requests belong to.
     */
    private record RequestMatcher(BiPredicate<HttpMethod, URL> filter, Operation operation) {
        /**
         * @return {@code true} if this matcher's filter accepts the given method and URL
         */
        private boolean matches(HttpMethod httpMethod, URL url) {
            return filter().test(httpMethod, url);
        }
    }

    /**
     * The operations that request statistics are tracked for, each identified by a string key.
     */
    enum Operation {
        GET_BLOB("GetBlob"),
        LIST_BLOBS("ListBlobs"),
        GET_BLOB_PROPERTIES("GetBlobProperties"),
        PUT_BLOB("PutBlob"),
        PUT_BLOCK("PutBlock"),
        PUT_BLOCK_LIST("PutBlockList"),
        BLOB_BATCH("BlobBatch");

        private final String key;

        Operation(String key) {
            this.key = key;
        }

        /**
         * @return the string key of this operation
         */
        public String getKey() {
            return key;
        }

        /**
         * Looks up the operation with the given key.
         *
         * @param key the key to look for
         * @return the operation whose key equals {@code key}
         * @throws IllegalArgumentException if no operation has that key
         */
        public static Operation fromKey(String key) {
            return Arrays.stream(values())
                .filter(candidate -> candidate.key.equals(key))
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException("No matching key: " + key));
        }
    }

    /**
     * An {@link InputStream} over a range of a blob that is downloaded through the asynchronous client.
     * <p>
     * The downloaded {@link ByteBuffer}s are copied into heap {@link ByteBuf}s obtained from the given allocator and
     * consumed through a {@link CancellableRateLimitedFluxIterator}. The buffer currently being read is kept in
     * {@link #byteBuf} and released once it is exhausted or the stream is closed.
     */
    private static class AzureInputStream extends InputStream {
        private final CancellableRateLimitedFluxIterator<ByteBuf> cancellableRateLimitedFluxIterator;
        // the buffer currently being consumed, or null if the next one has not been fetched yet
        private ByteBuf byteBuf;
        private boolean closed;
        private final ByteBufAllocator allocator;

        /**
         * Starts the download and fetches the first buffer.
         *
         * @param client        the client of the blob to download
         * @param rangeOffset   the offset of the first byte to download
         * @param rangeLength   the requested number of bytes; capped at {@code contentLength - rangeOffset}
         * @param contentLength the length of the blob
         * @param maxRetries    the maximum number of download retry requests
         * @param allocator     the allocator used for the buffers handed to readers
         * @throws IOException if fetching the first buffer fails
         */
        private AzureInputStream(BlobAsyncClient client, long rangeOffset, long rangeLength, long contentLength, int maxRetries, ByteBufAllocator allocator) throws IOException {
            rangeLength = Math.min(rangeLength, contentLength - rangeOffset);
            final BlobRange range = new BlobRange(rangeOffset, Long.valueOf(rangeLength));
            final DownloadRetryOptions downloadRetryOptions = new DownloadRetryOptions().setMaxRetryRequests(maxRetries);
            // The chain is lazy: copyBuffer only runs after subscription, by which time the allocator field is assigned.
            final Flux<ByteBuf> byteBufFlux = client.downloadWithResponse(range, downloadRetryOptions, null, false)
                .flux()
                .concatMap(ResponseBase::getValue)
                .filter(Objects::nonNull)
                .map(this::copyBuffer);
            this.allocator = allocator;
            // Elements the iterator has to discard are released through ReferenceCountUtil.safeRelease.
            this.cancellableRateLimitedFluxIterator = new CancellableRateLimitedFluxIterator<ByteBuf>(8, ReferenceCountUtil::safeRelease);
            byteBufFlux.subscribe(this.cancellableRateLimitedFluxIterator);
            this.getNextByteBuf();
        }

        /** Copies the remaining bytes of {@code buffer} into a newly allocated heap buffer of exactly that size. */
        private ByteBuf copyBuffer(ByteBuffer buffer) {
            final ByteBuf byteBuffer = this.allocator.heapBuffer(buffer.remaining(), buffer.remaining());
            byteBuffer.writeBytes(buffer);
            return byteBuffer;
        }

        @Override
        public int read() throws IOException {
            final byte[] b = new byte[1];
            final int bytesRead = this.read(b, 0, 1);
            if (bytesRead > 1) {
                throw new IOException("Stream returned more data than requested");
            }
            if (bytesRead == 1) {
                return b[0] & 0xFF;
            }
            if (bytesRead == 0) {
                throw new IOException("Stream returned unexpected number of bytes");
            }
            return -1;
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            if (off < 0 || len < 0 || len > b.length - off) {
                throw new IndexOutOfBoundsException();
            }
            ByteBuf buffer = this.getNextByteBuf();
            if (buffer == null || buffer.readableBytes() == 0) {
                this.releaseByteBuf(buffer);
                return -1;
            }
            // Drain as many buffers as needed to satisfy the request, or until the download is exhausted.
            int totalBytesRead = 0;
            while (buffer != null && totalBytesRead < len) {
                final int toRead = Math.min(len - totalBytesRead, buffer.readableBytes());
                buffer.readBytes(b, off + totalBytesRead, toRead);
                totalBytesRead += toRead;
                if (buffer.readableBytes() == 0) {
                    this.releaseByteBuf(buffer);
                    buffer = this.getNextByteBuf();
                }
            }
            return totalBytesRead;
        }

        @Override
        public void close() {
            if (!this.closed) {
                this.cancellableRateLimitedFluxIterator.cancel();
                this.closed = true;
                this.releaseByteBuf(this.byteBuf);
            }
        }

        /** Releases {@code buf} and forgets the current buffer. */
        private void releaseByteBuf(ByteBuf buf) {
            ReferenceCountUtil.safeRelease(buf);
            this.byteBuf = null;
        }

        /**
         * Returns the buffer currently being consumed, fetching the next one from the iterator if there is none.
         *
         * @return the current buffer, or {@code null} if the download has no more buffers
         * @throws IOException if the iterator fails
         */
        @Nullable
        private ByteBuf getNextByteBuf() throws IOException {
            try {
                if (this.byteBuf == null && !this.cancellableRateLimitedFluxIterator.hasNext()) {
                    return null;
                }
                if (this.byteBuf != null) {
                    return this.byteBuf;
                }
                this.byteBuf = this.cancellableRateLimitedFluxIterator.next();
                return this.byteBuf;
            } catch (Exception e) {
                // Report the wrapped cause when there is one, but never lose the failure when the exception has no cause.
                final Throwable cause = e.getCause() != null ? e.getCause() : e;
                throw new IOException("Unable to read blob", cause);
            }
        }
    }

    /**
     * Immutable description of a single part of a multipart upload.
     * <p>
     * Within this part of the file only {@code part()} is read, to identify the part in trace and assertion messages.
     * NOTE(review): judging by the component names, {@code blockId} is the id the part is staged under,
     * {@code blockOffset}/{@code blockSize} locate the part within the blob and {@code isLast} flags the final part -
     * confirm against the code that creates these records.
     */
    private record MultiPart(int part, String blockId, long blockOffset, long blockSize, boolean isLast) {
    }

    /**
     * A pair of adders counting operations and requests.
     */
    record StatsCounter(LongAdder operations, LongAdder requests) {
        /** Creates a counter with both adders starting at zero. */
        StatsCounter() {
            this(new LongAdder(), new LongAdder());
        }

        /**
         * @return a {@link BlobStoreActionStats} built from the current sums of the operation and request adders
         */
        BlobStoreActionStats getEndpointStats() {
            final long operationCount = operations().sum();
            final long requestCount = requests().sum();
            return new BlobStoreActionStats(operationCount, requestCount);
        }
    }

    /**
     * Key of the statistics maps: an operation together with the purpose it was executed for.
     */
    record StatsKey(Operation operation, OperationPurpose purpose) {
        /**
         * @return the purpose key and the operation key, joined by an underscore
         */
        @Override
        public String toString() {
            final StringBuilder text = new StringBuilder();
            text.append(purpose().getKey());
            text.append("_");
            text.append(operation().getKey());
            return text.toString();
        }
    }
}

