ismailsimsek commented on code in PR #11906:
URL: https://github.com/apache/iceberg/pull/11906#discussion_r1929534587


##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java:
##########
@@ -292,19 +296,77 @@ private Dataset<FileURI> validFileIdentDS() {
 
   private Dataset<FileURI> actualFileIdentDS() {
     StringToFileURI toFileURI = new StringToFileURI(equalSchemes, 
equalAuthorities);
+    Dataset<String> dataList;
     if (compareToFileList == null) {
-      return toFileURI.apply(listedFileDS());
+      dataList =
+          table.io() instanceof SupportsPrefixOperations ? listWithPrefix() : 
listWithoutPrefix();
     } else {
-      return toFileURI.apply(filteredCompareToFileList());
+      dataList = filteredCompareToFileList();
     }
+
+    return toFileURI.apply(dataList);
+  }
+
+  @VisibleForTesting
+  List<String> listLocationWithPrefix(String location, PathFilter pathFilter) {
+    List<String> matchingFiles = Lists.newArrayList();
+    try {
+      Iterator<org.apache.iceberg.io.FileInfo> iterator =
+          ((SupportsPrefixOperations) 
table.io()).listPrefix(location).iterator();
+      while (iterator.hasNext()) {
+        org.apache.iceberg.io.FileInfo fileInfo = iterator.next();
+        // NOTE: To avoid checking un necessary root folders, check the path 
relative to table
+        // location.
+        Path relativeFilePath = new Path(fileInfo.location().replace(location, 
""));
+        if (fileInfo.createdAtMillis() < olderThanTimestamp
+            && pathFilter.accept(relativeFilePath)) {
+          matchingFiles.add(fileInfo.location());
+        }
+      }
+    } catch (Exception e) {
+      if (!(e.getCause() instanceof FileNotFoundException)) {
+        throw e;
+      }
+    }
+    return matchingFiles;
+  }
+
+  @VisibleForTesting
+  Dataset<String> listWithPrefix() {
+    List<String> matchingFiles = Lists.newArrayList();
+    PathFilter pathFilter = 
PartitionAwareHiddenPathFilter.forSpecs(table.specs(), true);
+
+    if (table.locationProvider() instanceof 
LocationProviders.ObjectStoreLocationProvider) {
+      // ObjectStoreLocationProvider generates hierarchical prefixes in a 
binary fashion
+      // (0000/, 0001/, 0010/, 0011/, ...).
+      // This allows us to parallelize listing operations across these 
prefixes.
+      List<String> prefixes =
+          List.of(
+              "/0000", "/0001", "/0010", "/0011", "/0100", "/0101", "/0110", 
"/0111", "/1000",
+              "/1001", "/1010", "/1011", "/1100", "/1101", "/1110", "/1111");
+
+      String tableDataLocationRoot = 
table.locationProvider().dataLocationRoot();
+      for (String prefix : prefixes) {
+        List<String> result = listLocationWithPrefix(tableDataLocationRoot + 
prefix, pathFilter);
+        matchingFiles.addAll(result);
+      }
+
+    } else {
+      matchingFiles.addAll(listLocationWithPrefix(location, pathFilter));
+    }
+
+    JavaRDD<String> matchingFileRDD = 
sparkContext().parallelize(matchingFiles, 1);
+    return spark().createDataset(matchingFileRDD.rdd(), Encoders.STRING());

Review Comment:
   @danielcweeks @RussellSpitzer  on high level, does this reflects the idea of 
multiple listing? do we need to use spark instead of the loop, to parallelize 
it?
   
   few blocking limitations are (please correct if its wrong): 
   - listPrefix requires exact prefix, cannot use regexp
   - listPrefix requires exact folder as prefix, cannot use few characters of a 
folder as listing prefix
     - with above both scenarios and when the prefix not eixsts, 
[HadoopFileIO](https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java)
 throws `java.io.FileNotFoundException`
   - listPrefix cannot be used to list prefixes below >`xxxx/0000` or above 
<`xxxx/11111`. 
     - so only prefixes with these numeric hashes are listed. its not prosible 
to list other folders under data location since we cannot determine folder 
names.
   - had to extend `LocationProvider` to give dataLocationRoot so that we can 
use it to list exact prefix with hashes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to