>From Ritik Raj <[email protected]>: Ritik Raj has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20556?usp=email )
Change subject: [ASTERIXDB-3669][CLOUD] Retry downloadDirectories in parallel downloader ...................................................................... [ASTERIXDB-3669][CLOUD] Retry downloadDirectories in parallel downloader - user model changes: no - storage format changes: no - interface changes: no Details: Added retries using ExponentialRetryPolicy around downloadDirectories via a new AbstractParallelDownloader.downloadDirectoriesWithRetry, which all parallel downloaders now extend. Ext-ref: MB-69226 Change-Id: Ica47227218a323ededced75691d3f64070c97729 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20556 Reviewed-by: Murtadha Hubail <[email protected]> Integration-Tests: Jenkins <[email protected]> Tested-by: Ritik Raj <[email protected]> --- M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java A asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/AbstractParallelDownloader.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/IParallelDownloader.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3SyncDownloader.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java M hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java 10 files changed, 108 insertions(+), 37 deletions(-) Approvals: Anon. E. 
Moose #1000171: Murtadha Hubail: Looks good to me, approved Ritik Raj: Verified Jenkins: Verified diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java index 6fc271e..23df349 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java @@ -192,7 +192,7 @@ public void downloadLibrary(Collection<FileReference> libPath) throws HyracksDataException { try (IParallelDownloader downloader = cloudClient.createParallelDownloader(bucket, localIoManager)) { LOGGER.info("Downloading all files located in {}", libPath); - downloader.downloadDirectories(libPath); + downloader.downloadDirectoriesWithRetry(libPath); LOGGER.info("Finished downloading {}", libPath); } } @@ -202,7 +202,7 @@ FileReference appDir = resolveAbsolutePath( localIoManager.getWorkspacePath(0).getPath() + File.separator + APPLICATION_ROOT_DIR_NAME); LOGGER.info("Downloading all libraries in + {}", appDir); - downloader.downloadDirectories(Collections.singletonList(appDir)); + downloader.downloadDirectoriesWithRetry(Collections.singletonList(appDir)); LOGGER.info("Finished downloading all libraries"); } } diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java index 1cb6077..59e21ac 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java @@ -62,7 +62,7 @@ protected void downloadPartitions(boolean metadataNode, int metadataPartition) throws HyracksDataException { IParallelDownloader downloader = cloudClient.createParallelDownloader(bucket, localIoManager); 
LOGGER.info("Downloading all files located in {}", partitionPaths); - downloader.downloadDirectories(partitionPaths); + downloader.downloadDirectoriesWithRetry(partitionPaths); LOGGER.info("Finished downloading {}", partitionPaths); } diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java index 1c5efd9..999a6b6 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java @@ -156,7 +156,7 @@ LOGGER.info("Downloading metadata partition {}, Current uncached files: {}", metadataPartition, uncachedFiles); FileReference metadataDir = resolve(STORAGE_ROOT_DIR_NAME + File.separator + partitionDir); - downloader.downloadDirectories(Collections.singleton(metadataDir)); + downloader.downloadDirectoriesWithRetry(Collections.singleton(metadataDir)); uncachedFiles.removeIf(f -> f.getRelativePath().contains(partitionDir)); LOGGER.info("Finished downloading metadata partition. Current uncached files: {}", uncachedFiles); } diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/AbstractParallelDownloader.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/AbstractParallelDownloader.java new file mode 100644 index 0000000..29a50a6 --- /dev/null +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/AbstractParallelDownloader.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.cloud.clients; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ExecutionException; + +import org.apache.hyracks.api.exceptions.ErrorCode; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.io.FileReference; +import org.apache.hyracks.api.util.ExceptionUtils; +import org.apache.hyracks.cloud.util.CloudRetryableRequestUtil; +import org.apache.hyracks.util.ExponentialRetryPolicy; +import org.apache.hyracks.util.IRetryPolicy; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public abstract class AbstractParallelDownloader implements IParallelDownloader { + private static final Logger LOGGER = LogManager.getLogger(); + + public void downloadDirectoriesWithRetry(Collection<FileReference> toDownload) throws HyracksDataException { + Set<FileReference> failedFiles = new HashSet<>(toDownload); + IRetryPolicy retryPolicy = new ExponentialRetryPolicy(CloudRetryableRequestUtil.NUMBER_OF_RETRIES, + CloudRetryableRequestUtil.MAX_DELAY_BETWEEN_RETRIES); + int attempt = 1; + while (true) { + try { + failedFiles = downloadDirectories(toDownload); + + if (failedFiles.isEmpty()) { + return; + } + + if (!retryPolicy.retry(null)) { + LOGGER.error("Exhausted retries ({}) — failed to download {} directories: {}", + CloudRetryableRequestUtil.NUMBER_OF_RETRIES, failedFiles.size(), failedFiles); + throw HyracksDataException.create(ErrorCode.FAILED_IO_OPERATION); + } + + 
LOGGER.warn("Failed to download directories (attempt {}/{}), retrying. Remaining: {}", attempt, + CloudRetryableRequestUtil.NUMBER_OF_RETRIES, failedFiles.size()); + } catch (IOException | ExecutionException | InterruptedException e) { + if (ExceptionUtils.causedByInterrupt(e) && !Thread.currentThread().isInterrupted()) { + LOGGER.warn("Lost suppressed interrupt during downloadDirectory retry", e); + throw HyracksDataException.create(e); + } + try { + if (!retryPolicy.retry(e)) { + LOGGER.error("Exhausted retries ({}) — failed to download {} directories: {}", + CloudRetryableRequestUtil.NUMBER_OF_RETRIES, failedFiles.size(), failedFiles); + throw HyracksDataException.create(e); + } + } catch (InterruptedException e1) { + throw HyracksDataException.create(e1); + } + LOGGER.warn("Failed to downloadDirectories, performing {}/{}", attempt, + CloudRetryableRequestUtil.NUMBER_OF_RETRIES, e); + } + attempt++; + } + } + + protected abstract Set<FileReference> downloadDirectories(Collection<FileReference> toDownload) + throws ExecutionException, InterruptedException, IOException; +} diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/IParallelDownloader.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/IParallelDownloader.java index 6f1c453..5f7ff0b 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/IParallelDownloader.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/IParallelDownloader.java @@ -39,7 +39,7 @@ * @param toDownload all files to be downloaded * @return file that failed to download */ - Collection<FileReference> downloadDirectories(Collection<FileReference> toDownload) throws HyracksDataException; + void downloadDirectoriesWithRetry(Collection<FileReference> toDownload) throws HyracksDataException; /** * Close the downloader and release all of its resources diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java index 0321a35..bfb52c9 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java @@ -30,7 +30,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import org.apache.asterix.cloud.clients.IParallelDownloader; +import org.apache.asterix.cloud.clients.AbstractParallelDownloader; import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter; import org.apache.commons.io.FileUtils; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -57,7 +57,7 @@ import software.amazon.awssdk.utils.AttributeMap; @ThreadSafe -class S3ParallelDownloader implements IParallelDownloader { +class S3ParallelDownloader extends AbstractParallelDownloader { private final String bucket; private final IOManager ioManager; private final S3AsyncClient s3AsyncClient; @@ -84,17 +84,10 @@ } @Override - public Collection<FileReference> downloadDirectories(Collection<FileReference> toDownload) - throws HyracksDataException { - Set<FileReference> failedFiles; + public Set<FileReference> downloadDirectories(Collection<FileReference> toDownload) + throws HyracksDataException, ExecutionException, InterruptedException { List<CompletableFuture<CompletedDirectoryDownload>> downloads = startDownloadingDirectories(toDownload); - try { - failedFiles = waitForDirectoryDownloads(downloads); - } catch (ExecutionException | InterruptedException e) { - throw HyracksDataException.create(e); - } - - return failedFiles; + return waitForDirectoryDownloads(downloads); } @Override diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3SyncDownloader.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3SyncDownloader.java index 53e3dad..498c34b 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3SyncDownloader.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3SyncDownloader.java @@ -33,7 +33,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; -import org.apache.asterix.cloud.clients.IParallelDownloader; +import org.apache.asterix.cloud.clients.AbstractParallelDownloader; import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter; import org.apache.commons.io.FileUtils; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -50,7 +50,7 @@ import software.amazon.awssdk.services.s3.model.S3Object; @ThreadSafe -public class S3SyncDownloader implements IParallelDownloader { +public class S3SyncDownloader extends AbstractParallelDownloader { private static final Logger LOGGER = LogManager.getLogger(); private final String bucket; @@ -126,14 +126,10 @@ } @Override - public Collection<FileReference> downloadDirectories(Collection<FileReference> toDownload) - throws HyracksDataException { + public Set<FileReference> downloadDirectories(Collection<FileReference> toDownload) + throws IOException, ExecutionException, InterruptedException { Set<FileReference> failedFiles; - try { - failedFiles = downloadDirectoriesAndWait(toDownload); - } catch (IOException | InterruptedException | ExecutionException e) { - throw HyracksDataException.create(e); - } + failedFiles = downloadDirectoriesAndWait(toDownload); return failedFiles; } diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java index 364fb2a..12d8a19 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzureParallelDownloader.java @@ -29,7 +29,7 @@ import java.util.List; import java.util.Set; -import org.apache.asterix.cloud.clients.IParallelDownloader; +import org.apache.asterix.cloud.clients.AbstractParallelDownloader; import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileReference; @@ -44,7 +44,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -public class AzureParallelDownloader implements IParallelDownloader { +public class AzureParallelDownloader extends AbstractParallelDownloader { private final IOManager ioManager; private final BlobContainerAsyncClient blobContainerAsyncClient; private final IRequestProfilerLimiter profiler; @@ -101,8 +101,7 @@ } @Override - public Collection<FileReference> downloadDirectories(Collection<FileReference> directories) - throws HyracksDataException { + public Set<FileReference> downloadDirectories(Collection<FileReference> directories) throws HyracksDataException { Set<FileReference> failedFiles = new HashSet<>(); List<Mono<Void>> directoryDownloads = new ArrayList<>(); @@ -196,4 +195,4 @@ throw HyracksDataException.create(e); } } -} \ No newline at end of file +} diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java index b9e7eee..574a9b6 100644 --- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java @@ -30,7 +30,7 @@ import java.util.Map; import java.util.Set; -import org.apache.asterix.cloud.clients.IParallelDownloader; +import org.apache.asterix.cloud.clients.AbstractParallelDownloader; import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter; import org.apache.commons.io.FileUtils; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -51,7 +51,7 @@ import com.google.cloud.storage.transfermanager.TransferManagerConfig; import com.google.cloud.storage.transfermanager.TransferStatus; -public class GCSParallelDownloader implements IParallelDownloader { +public class GCSParallelDownloader extends AbstractParallelDownloader { private final String bucket; private final IOManager ioManager; @@ -102,8 +102,7 @@ } @Override - public Collection<FileReference> downloadDirectories(Collection<FileReference> toDownload) - throws HyracksDataException { + public Set<FileReference> downloadDirectories(Collection<FileReference> toDownload) { Set<FileReference> failedFiles = new HashSet<>(); ParallelDownloadConfig.Builder config = ParallelDownloadConfig.newBuilder().setBucketName(bucket).setStripPrefix(this.config.getPrefix()); diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java index e011900..5461ed5 100644 --- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java +++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java @@ -53,8 +53,8 @@ private static final int UNSTABLE_NUMBER_OF_RETRIES = 100; private static 
final int UNSTABLE_MAX_DELAY_BETWEEN_RETRIES_IN_MILLIS = 0; private static final Logger LOGGER = LogManager.getLogger(); - private static final int NUMBER_OF_RETRIES = getNumberOfRetries(); - private static final long MAX_DELAY_BETWEEN_RETRIES = getMaxDelayBetweenRetries(); + public static final int NUMBER_OF_RETRIES = getNumberOfRetries(); + public static final long MAX_DELAY_BETWEEN_RETRIES = getMaxDelayBetweenRetries(); private static final ICloudRetryPredicate RETRY_ALWAYS_PREDICATE = e -> true; private static final ICloudBeforeRetryRequest NO_OP_BEFORE_RETRY = () -> { -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20556?usp=email To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings?usp=email Gerrit-MessageType: merged Gerrit-Project: asterixdb Gerrit-Branch: phoenix Gerrit-Change-Id: Ica47227218a323ededced75691d3f64070c97729 Gerrit-Change-Number: 20556 Gerrit-PatchSet: 3 Gerrit-Owner: Ritik Raj <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Ritik Raj <[email protected]>
