steveloughran commented on code in PR #7127:
URL: https://github.com/apache/iceberg/pull/7127#discussion_r1734723004


##########
api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java:
##########
@@ -144,7 +146,25 @@ default DeleteOrphanFiles equalAuthorities(Map<String, 
String> newEqualAuthoriti
   @Value.Immutable
   interface Result {
     /** Returns locations of orphan files. */
-    Iterable<String> orphanFileLocations();
+    default Iterable<String> orphanFileLocations() {
+      return StreamSupport.stream(statuses().spliterator(), false)
+          .map(DeleteOrphanFiles.OrphanFileStatus::location)
+          .collect(Collectors.toList());
+    }
+
+    Iterable<OrphanFileStatus> statuses();
+  }
+
+  interface OrphanFileStatus {

Review Comment:
   prefer `DeleteOrphanFileOutcome` as `FileStatus` is broadly used elsewhere, 
same for implementation



##########
spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java:
##########
@@ -301,6 +301,69 @@ public void orphanedFileRemovedWithParallelTasks() throws 
InterruptedException,
             "remove-orphan-0", "remove-orphan-1", "remove-orphan-2", 
"remove-orphan-3"));
 
     Assert.assertEquals("Should delete 4 files", 4, deletedFiles.size());
+    assertDeletedStatusAndFailureCause(result.statuses());
+  }
+
+  @Test
+  public void testOrphanFileDeleteThrowsException() {
+    Table table = TABLES.create(SCHEMA, SPEC, Maps.newHashMap(), 
tableLocation);
+
+    List<ThreeColumnRecord> records1 =
+        Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
+    Dataset<Row> df1 = spark.createDataFrame(records1, 
ThreeColumnRecord.class).coalesce(1);
+
+    // original append
+    df1.select("c1", "c2", 
"c3").write().format("iceberg").mode("append").save(tableLocation);
+
+    List<ThreeColumnRecord> records2 =
+        Lists.newArrayList(new ThreeColumnRecord(2, "AAAAAAAAAA", "AAAA"));
+    Dataset<Row> df2 = spark.createDataFrame(records2, 
ThreeColumnRecord.class).coalesce(1);
+
+    // dynamic partition overwrite
+    df2.select("c1", "c2", 
"c3").write().format("iceberg").mode("overwrite").save(tableLocation);
+
+    // second append
+    df2.select("c1", "c2", 
"c3").write().format("iceberg").mode("append").save(tableLocation);
+
+    df2.coalesce(1).write().mode("append").parquet(tableLocation + "/data");
+    df2.coalesce(1).write().mode("append").parquet(tableLocation + 
"/data/c2_trunc=AA");
+    df2.coalesce(1).write().mode("append").parquet(tableLocation + 
"/data/c2_trunc=AA/c3=AAAA");
+    df2.coalesce(1).write().mode("append").parquet(tableLocation + 
"/data/invalid/invalid");
+
+    waitUntilAfter(System.currentTimeMillis());
+
+    String locationSubstringForException = "invalid";
+    DeleteOrphanFiles.Result result =
+        SparkActions.get()
+            .deleteOrphanFiles(table)
+            .olderThan(System.currentTimeMillis())
+            .deleteWith(
+                file -> {
+                  if (file.contains(locationSubstringForException)) {
+                    throw new RuntimeException("simulating failure during file 
deletion");
+                  }
+                  table.io().deleteFile(file);
+                })
+            .execute();
+
+    Assert.assertEquals("Should delete 4 files", 4, 
Iterables.size(result.statuses()));

Review Comment:
   use assertj assertions on collections here



##########
spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java:
##########
@@ -301,6 +301,69 @@ public void orphanedFileRemovedWithParallelTasks() throws 
InterruptedException,
             "remove-orphan-0", "remove-orphan-1", "remove-orphan-2", 
"remove-orphan-3"));
 
     Assert.assertEquals("Should delete 4 files", 4, deletedFiles.size());
+    assertDeletedStatusAndFailureCause(result.statuses());
+  }
+
+  @Test
+  public void testOrphanFileDeleteThrowsException() {
+    Table table = TABLES.create(SCHEMA, SPEC, Maps.newHashMap(), 
tableLocation);
+
+    List<ThreeColumnRecord> records1 =
+        Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
+    Dataset<Row> df1 = spark.createDataFrame(records1, 
ThreeColumnRecord.class).coalesce(1);
+
+    // original append
+    df1.select("c1", "c2", 
"c3").write().format("iceberg").mode("append").save(tableLocation);
+
+    List<ThreeColumnRecord> records2 =
+        Lists.newArrayList(new ThreeColumnRecord(2, "AAAAAAAAAA", "AAAA"));
+    Dataset<Row> df2 = spark.createDataFrame(records2, 
ThreeColumnRecord.class).coalesce(1);
+
+    // dynamic partition overwrite
+    df2.select("c1", "c2", 
"c3").write().format("iceberg").mode("overwrite").save(tableLocation);
+
+    // second append
+    df2.select("c1", "c2", 
"c3").write().format("iceberg").mode("append").save(tableLocation);
+
+    df2.coalesce(1).write().mode("append").parquet(tableLocation + "/data");
+    df2.coalesce(1).write().mode("append").parquet(tableLocation + 
"/data/c2_trunc=AA");
+    df2.coalesce(1).write().mode("append").parquet(tableLocation + 
"/data/c2_trunc=AA/c3=AAAA");
+    df2.coalesce(1).write().mode("append").parquet(tableLocation + 
"/data/invalid/invalid");
+
+    waitUntilAfter(System.currentTimeMillis());
+
+    String locationSubstringForException = "invalid";
+    DeleteOrphanFiles.Result result =
+        SparkActions.get()
+            .deleteOrphanFiles(table)
+            .olderThan(System.currentTimeMillis())

Review Comment:
   I'd propose actually makin the older than value "now + 1000" to deal with 
filesystem clock resolution quirks. I know, nobody will be testing on FAT32 
with 2s granularity, but its still good to avoid brittle tests



##########
spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java:
##########
@@ -301,6 +301,69 @@ public void orphanedFileRemovedWithParallelTasks() throws 
InterruptedException,
             "remove-orphan-0", "remove-orphan-1", "remove-orphan-2", 
"remove-orphan-3"));
 
     Assert.assertEquals("Should delete 4 files", 4, deletedFiles.size());
+    assertDeletedStatusAndFailureCause(result.statuses());
+  }
+
+  @Test
+  public void testOrphanFileDeleteThrowsException() {
+    Table table = TABLES.create(SCHEMA, SPEC, Maps.newHashMap(), 
tableLocation);
+
+    List<ThreeColumnRecord> records1 =
+        Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
+    Dataset<Row> df1 = spark.createDataFrame(records1, 
ThreeColumnRecord.class).coalesce(1);
+
+    // original append
+    df1.select("c1", "c2", 
"c3").write().format("iceberg").mode("append").save(tableLocation);
+
+    List<ThreeColumnRecord> records2 =
+        Lists.newArrayList(new ThreeColumnRecord(2, "AAAAAAAAAA", "AAAA"));
+    Dataset<Row> df2 = spark.createDataFrame(records2, 
ThreeColumnRecord.class).coalesce(1);
+
+    // dynamic partition overwrite
+    df2.select("c1", "c2", 
"c3").write().format("iceberg").mode("overwrite").save(tableLocation);
+
+    // second append
+    df2.select("c1", "c2", 
"c3").write().format("iceberg").mode("append").save(tableLocation);
+
+    df2.coalesce(1).write().mode("append").parquet(tableLocation + "/data");
+    df2.coalesce(1).write().mode("append").parquet(tableLocation + 
"/data/c2_trunc=AA");
+    df2.coalesce(1).write().mode("append").parquet(tableLocation + 
"/data/c2_trunc=AA/c3=AAAA");
+    df2.coalesce(1).write().mode("append").parquet(tableLocation + 
"/data/invalid/invalid");
+
+    waitUntilAfter(System.currentTimeMillis());

Review Comment:
   pull out to a variable and reuse on L339



##########
spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestDeleteReachableFilesAction.java:
##########
@@ -286,11 +286,11 @@ public void testIgnoreMetadataFilesNotFound() {
     DeleteOrphanFiles.Result result =
         
sparkActions().deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
 
-    Assert.assertEquals("Should delete 1 file", 1, 
Iterables.size(result.orphanFileLocations()));
+    Assert.assertEquals("Should delete 1 file", 1, 
Iterables.size(result.statuses()));

Review Comment:
   as iceberg uses assertj in tests, use it for the new/updated assertions for 
better error reporting



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to