[ https://issues.apache.org/jira/browse/HADOOP-13600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16165694#comment-16165694 ]
ASF GitHub Bot commented on HADOOP-13600:
-----------------------------------------
Github user sahilTakiar commented on a diff in the pull request:
https://github.com/apache/hadoop/pull/157#discussion_r138791756
--- Diff: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java ---
@@ -891,50 +902,123 @@ private boolean innerRename(Path source, Path dest)
       }
       List<DeleteObjectsRequest.KeyVersion> keysToDelete = new ArrayList<>();
+      List<DeleteObjectsRequest.KeyVersion> dirKeysToDelete = new ArrayList<>();
       if (dstStatus != null && dstStatus.isEmptyDirectory() == Tristate.TRUE) {
         // delete unnecessary fake directory.
         keysToDelete.add(new DeleteObjectsRequest.KeyVersion(dstKey));
       }
-      Path parentPath = keyToPath(srcKey);
-      RemoteIterator<LocatedFileStatus> iterator = listFilesAndEmptyDirectories(
-          parentPath, true);
-      while (iterator.hasNext()) {
-        LocatedFileStatus status = iterator.next();
-        long length = status.getLen();
-        String key = pathToKey(status.getPath());
-        if (status.isDirectory() && !key.endsWith("/")) {
-          key += "/";
-        }
-        keysToDelete
-            .add(new DeleteObjectsRequest.KeyVersion(key));
-        String newDstKey =
-            dstKey + key.substring(srcKey.length());
-        copyFile(key, newDstKey, length);
-
-        if (hasMetadataStore()) {
-          // with a metadata store, the object entries need to be updated,
-          // including, potentially, the ancestors
-          Path childSrc = keyToQualifiedPath(key);
-          Path childDst = keyToQualifiedPath(newDstKey);
-          if (objectRepresentsDirectory(key, length)) {
-            S3Guard.addMoveDir(metadataStore, srcPaths, dstMetas, childSrc,
-                childDst, username);
+      // A blocking queue that tracks all objects that need to be deleted
+      BlockingQueue<Optional<DeleteObjectsRequest.KeyVersion>> deleteQueue =
+          new ArrayBlockingQueue<>((int) Math.round(MAX_ENTRIES_TO_DELETE * 1.5));
+
+      // Used to track if the delete thread was gracefully shut down
+      boolean deleteFutureComplete = false;
+      FutureTask<Void> deleteFuture = null;
+
+      try {
+        // Launch a thread that will read from the deleteQueue and batch
+        // delete any files that have already been copied
+        deleteFuture = new FutureTask<>(() -> {
+          while (true) {
+            while (keysToDelete.size() < MAX_ENTRIES_TO_DELETE) {
+              Optional<DeleteObjectsRequest.KeyVersion> key = deleteQueue.take();
+
+              // The thread runs until it is given an EOF message (an Optional#empty())
+              if (key.isPresent()) {
+                keysToDelete.add(key.get());
+              } else {
+
+                // Delete any remaining keys and exit
+                removeKeys(keysToDelete, true, false);
+                return null;
+              }
+            }
+            removeKeys(keysToDelete, true, false);
+          }
+        });
+
+        Thread deleteThread = new Thread(deleteFuture);
+        deleteThread.setName("s3a-rename-delete-thread");
+        deleteThread.start();
+
+        // Used to abort future copy tasks as soon as one copy task fails
+        AtomicBoolean copyFailure = new AtomicBoolean(false);
+        List<CopyContext> copies = new ArrayList<>();
+
+        Path parentPath = keyToPath(srcKey);
+        RemoteIterator<LocatedFileStatus> iterator = listFilesAndEmptyDirectories(
+            parentPath, true);
+        while (iterator.hasNext()) {
+          LocatedFileStatus status = iterator.next();
+          long length = status.getLen();
+          String key = pathToKey(status.getPath());
+          if (status.isDirectory() && !key.endsWith("/")) {
+            key += "/";
+          }
+          if (status.isDirectory()) {
+            dirKeysToDelete.add(new DeleteObjectsRequest.KeyVersion(key));
+          }
+          String newDstKey =
+              dstKey + key.substring(srcKey.length());
+
+          // If no previous file hit a copy failure, copy this file
+          if (!copyFailure.get()) {
+            copies.add(new CopyContext(copyFileAsync(key, newDstKey,
+                new RenameProgressListener(this, srcStatus, status.isDirectory() ? null :
+                    new DeleteObjectsRequest.KeyVersion(key), deleteQueue, copyFailure)),
+                key, newDstKey, length));
           } else {
-            S3Guard.addMoveFile(metadataStore, srcPaths, dstMetas, childSrc,
-                childDst, length, getDefaultBlockSize(childDst), username);
+            // We got a copy failure, so don't bother going through the rest of the files
+            break;
           }
-          // Ancestor directories may not be listed, so we explicitly add them
-          S3Guard.addMoveAncestors(metadataStore, srcPaths, dstMetas,
-              keyToQualifiedPath(srcKey), childSrc, childDst, username);
         }
-        if (keysToDelete.size() == MAX_ENTRIES_TO_DELETE) {
-          removeKeys(keysToDelete, true, false);
+        for (CopyContext copyContext : copies) {
+          try {
+            copyContext.getCopy().waitForCopyResult();
+          } catch (InterruptedException e) {
+            throw new RenameFailedException(copyContext.getSrcKey(),
+                copyContext.getDstKey(), e);
+          }
+
+          if (hasMetadataStore()) {
+            // with a metadata store, the object entries need to be updated,
+            // including, potentially, the ancestors
+            Path childSrc = keyToQualifiedPath(copyContext.getSrcKey());
+            Path childDst = keyToQualifiedPath(copyContext.getDstKey());
+            if (objectRepresentsDirectory(copyContext.getSrcKey(),
+                copyContext.getLength())) {
+              S3Guard.addMoveDir(metadataStore, srcPaths, dstMetas, childSrc,
+                  childDst, username);
+            } else {
+              S3Guard.addMoveFile(metadataStore, srcPaths, dstMetas, childSrc,
+                  childDst, copyContext.getLength(), getDefaultBlockSize(childDst), username);
+            }
+            // Ancestor directories may not be listed, so we explicitly add them
+            S3Guard.addMoveAncestors(metadataStore, srcPaths, dstMetas,
+                keyToQualifiedPath(srcKey), childSrc, childDst, username);
+          }
+        }
+
+        if (copyFailure.get()) {
+          throw new RenameFailedException(srcKey, dstKey,
+              new IllegalStateException("Progress listener indicated a copy failure, but no exception was thrown"));
+        }
+
+        try {
+          for (DeleteObjectsRequest.KeyVersion dirKey : dirKeysToDelete) {
+            deleteQueue.put(Optional.of(dirKey));
+          }
+          deleteQueue.put(Optional.empty());
+          deleteFuture.get();
+        } catch (ExecutionException | InterruptedException e) {
+          throw new RenameFailedException(srcKey, dstKey, e);
+        }
+        deleteFutureComplete = true;
+      } finally {
+        if (!deleteFutureComplete) {
+          if (deleteFuture != null && !deleteFuture.isDone() &&
+              !deleteFuture.isCancelled()) {
+            deleteFuture.cancel(true);
+          }
--- End diff --
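For readers skimming the diff: the deletion half is a bounded producer/consumer. Copies enqueue each source key as they finish, a single worker thread drains the queue into bulk deletes of at most MAX_ENTRIES_TO_DELETE keys, and an Optional.empty() serves as the EOF sentinel. The following is a minimal standalone sketch of that pattern, not code from the patch: removeKeys and MAX_ENTRIES_TO_DELETE are the real S3A names it imitates, while BatchDeleteWorker, flush(), and the plain string keys are illustrative stand-ins.

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.FutureTask;

public class BatchDeleteWorker {
  // Mirrors the S3 multi-object-delete batch limit used by the patch.
  private static final int MAX_ENTRIES_TO_DELETE = 1000;

  public static void main(String[] args) throws Exception {
    // Sized at 1.5x the batch size, as in the diff, so producers have headroom
    // to keep enqueuing while a bulk delete is in flight.
    BlockingQueue<Optional<String>> deleteQueue =
        new ArrayBlockingQueue<>((int) Math.round(MAX_ENTRIES_TO_DELETE * 1.5));

    // Worker: drain the queue into batches and bulk-delete each full batch.
    FutureTask<Void> deleteFuture = new FutureTask<>(() -> {
      List<String> batch = new ArrayList<>();
      while (true) {
        Optional<String> key = deleteQueue.take();
        if (!key.isPresent()) {       // EOF sentinel: drain and exit
          flush(batch);
          return null;
        }
        batch.add(key.get());
        if (batch.size() >= MAX_ENTRIES_TO_DELETE) {
          flush(batch);
        }
      }
    });
    Thread deleteThread = new Thread(deleteFuture, "s3a-rename-delete-thread");
    deleteThread.start();

    // Producer: enqueue keys as copies complete, then send EOF and join.
    for (int i = 0; i < 2500; i++) {
      deleteQueue.put(Optional.of("src/key-" + i));
    }
    deleteQueue.put(Optional.empty());
    deleteFuture.get();               // rethrows any failure from the worker
  }

  // Stand-in for S3AFileSystem#removeKeys(keysToDelete, true, false).
  private static void flush(List<String> batch) {
    if (batch.isEmpty()) {
      return;
    }
    System.out.println("bulk delete of " + batch.size() + " keys");
    batch.clear();
  }
}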
Moved all the parallel rename logic into a dedicated class called
`ParallelDirectoryRenamer`.
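The `ParallelDirectoryRenamer` class itself is not reproduced in this message, so the following is only a hedged sketch of the copy-then-delete orchestration such a helper performs. Apart from the alluded-to class name, every identifier here (copyObject, deleteObjects, the pool size) is hypothetical rather than the committed API.

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

// Hypothetical sketch only: not the ParallelDirectoryRenamer from the PR.
public class ParallelDirectoryRenamerSketch {

  private final ExecutorService pool = Executors.newFixedThreadPool(16);

  /** Copy every source object in parallel, then delete the sources. */
  public void rename(List<String> srcKeys, String srcPrefix, String dstPrefix) {
    // Phase 1: launch one asynchronous copy per object.
    List<CompletableFuture<Void>> copies = srcKeys.stream()
        .map(src -> CompletableFuture.runAsync(
            () -> copyObject(src, dstPrefix + src.substring(srcPrefix.length())),
            pool))
        .collect(Collectors.toList());

    // Phase 2: wait for every copy to finish; join() rethrows the first
    // failure. (The patch goes further: its copyFailure flag also stops new
    // copies from being submitted once one fails.)
    CompletableFuture.allOf(copies.toArray(new CompletableFuture[0])).join();

    // Phase 3: sources are only deleted once all copies are known durable.
    deleteObjects(srcKeys);
    pool.shutdown();
  }

  // Stand-ins for the S3 server-side copy and bulk-delete calls.
  private void copyObject(String src, String dst) {
    System.out.println("COPY " + src + " -> " + dst);
  }

  private void deleteObjects(List<String> keys) {
    System.out.println("DELETE " + keys);
  }

  public static void main(String[] args) {
    new ParallelDirectoryRenamerSketch().rename(
        Arrays.asList("src/a", "src/b", "src/c"), "src/", "dst/");
  }
}

Note that allOf().join() waits for everything and then rethrows, which is simpler but not fail-fast; the patch's copyFailure flag buys the early abort.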
> S3a rename() to copy files in a directory in parallel
> -----------------------------------------------------
>
> Key: HADOOP-13600
> URL: https://issues.apache.org/jira/browse/HADOOP-13600
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/s3
> Affects Versions: 2.7.3
> Reporter: Steve Loughran
> Assignee: Sahil Takiar
> Attachments: HADOOP-13600.001.patch
>
>
> Currently a directory rename does a one-by-one copy, making the request
> O(files * data). If the copy operations were launched in parallel, the
> duration of the copy may be reducible to the duration of the longest copy.
> For a directory with many files, this will be significant.
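To put illustrative numbers on that claim (not figures from the issue): a directory of 200 objects whose server-side COPY calls take about 5 seconds each costs roughly 200 x 5 s = 1000 s when copied one by one, while with enough concurrent copies the wall-clock cost approaches the single slowest copy, about 5 s plus listing and delete overhead.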
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)