mnpoonia commented on code in PR #6483:
URL: https://github.com/apache/hbase/pull/6483#discussion_r1851367892


##########
hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java:
##########
@@ -437,33 +440,53 @@ private static List<File> resolveAndArchive(FileSystem fs, Path baseArchiveDir,
       LOG.trace("Created archive directory {}", baseArchiveDir);
     }
 
-    List<File> failures = new ArrayList<>();
+    List<File> failures = Collections.synchronizedList(new ArrayList<>());
     String startTime = Long.toString(start);
+    List<File> filesOnly = new ArrayList<>();
     for (File file : toArchive) {
-      // if its a file archive it
       try {
-        LOG.trace("Archiving {}", file);
-        if (file.isFile()) {
-          // attempt to archive the file
-          if (!resolveAndArchiveFile(baseArchiveDir, file, startTime)) {
-            LOG.warn("Couldn't archive " + file + " into backup directory: " + baseArchiveDir);
-            failures.add(file);
-          }
-        } else {
-          // otherwise its a directory and we need to archive all files
+        if (!file.isFile()) {
+          // if it's a directory, we need to archive all of its files
           LOG.trace("{} is a directory, archiving children files", file);
           // so we add the directory name to the one base archive
           Path parentArchiveDir = new Path(baseArchiveDir, file.getName());
           // and then get all the files from that directory and attempt to
           // archive those too
           Collection<File> children = file.getChildren();
-          failures.addAll(resolveAndArchive(fs, parentArchiveDir, children, start));
+          failures.addAll(resolveAndArchive(conf, fs, parentArchiveDir, children, start));
+        } else {
+          filesOnly.add(file);
         }
       } catch (IOException e) {
         LOG.warn("Failed to archive {}", file, e);
         failures.add(file);
       }
     }
+    Map<File, Future<Boolean>> futures = new HashMap<>();
+    // All files in the current baseDir are processed concurrently
+    for (File file : filesOnly) {
+      LOG.trace("Archiving {}", file);
+      Future<Boolean> archiveTask = getArchiveExecutor(conf)
+        .submit(() -> resolveAndArchiveFile(baseArchiveDir, file, startTime));
+      futures.put(file, archiveTask);
+    }
+
+    for (Map.Entry<File, Future<Boolean>> fileFutureEntry : futures.entrySet()) {
+      try {
+        boolean fileCleaned = fileFutureEntry.getValue().get();
+        if (!fileCleaned) {
+          LOG.warn("Couldn't archive %s into backup directory: %s"
+            .formatted(fileFutureEntry.getKey(), baseArchiveDir));
+          failures.add(fileFutureEntry.getKey());
+        }
+      } catch (InterruptedException e) {
+        LOG.warn("HFileArchive Cleanup thread was interrupted");

Review Comment:
   Thanks for pointing that out. Added it now.
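   
   For reference, a minimal self-contained sketch of the submit-then-join pattern the hunk above introduces (not the actual HBase code: `archiveOne`, `archiveAll`, and the fixed thread pool are hypothetical stand-ins for `resolveAndArchiveFile`, the per-file submission loop, and `getArchiveExecutor(conf)`). The `Thread.currentThread().interrupt()` call in the catch block illustrates the usual way to preserve the interrupt status:
   
   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.Future;
   
   public class ConcurrentArchiveSketch {
   
     // Hypothetical stand-in for resolveAndArchiveFile: returns true on success.
     static boolean archiveOne(String file) {
       return !file.isEmpty();
     }
   
     static List<String> archiveAll(List<String> files) {
       // Thread-safe list, mirroring Collections.synchronizedList in the hunk:
       // recursive calls running on other threads may append concurrently.
       List<String> failures = Collections.synchronizedList(new ArrayList<>());
       // Fixed pool as a stand-in for getArchiveExecutor(conf).
       ExecutorService pool = Executors.newFixedThreadPool(4);
   
       // Submit one archive task per file, remembering which future belongs
       // to which file so failures can be reported per file.
       Map<String, Future<Boolean>> futures = new HashMap<>();
       for (String file : files) {
         futures.put(file, pool.submit(() -> archiveOne(file)));
       }
   
       // Join the futures and record every file that failed to archive.
       for (Map.Entry<String, Future<Boolean>> entry : futures.entrySet()) {
         try {
           if (!entry.getValue().get()) {
             failures.add(entry.getKey());
           }
         } catch (InterruptedException e) {
           // Restore the interrupt flag so callers still see the interruption.
           Thread.currentThread().interrupt();
           failures.add(entry.getKey());
         } catch (ExecutionException e) {
           failures.add(entry.getKey());
         }
       }
       pool.shutdown();
       return failures;
     }
   
     public static void main(String[] args) {
       // The empty string simulates a file that fails to archive.
       System.out.println(archiveAll(List.of("a.hfile", "", "b.hfile")));
     }
   }
   ```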



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@hbase.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
