steveloughran commented on code in PR #7127:
URL: https://github.com/apache/iceberg/pull/7127#discussion_r1734723004


########## api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java: ##########
@@ -144,7 +146,25 @@ default DeleteOrphanFiles equalAuthorities(Map<String, String> newEqualAuthoriti
   @Value.Immutable
   interface Result {
     /** Returns locations of orphan files. */
-    Iterable<String> orphanFileLocations();
+    default Iterable<String> orphanFileLocations() {
+      return StreamSupport.stream(statuses().spliterator(), false)
+          .map(DeleteOrphanFiles.OrphanFileStatus::location)
+          .collect(Collectors.toList());
+    }
+
+    Iterable<OrphanFileStatus> statuses();
+  }
+
+  interface OrphanFileStatus {

Review Comment:
   prefer `DeleteOrphanFileOutcome` as `FileStatus` is broadly used elsewhere, same for the implementation
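For illustration only (not part of the original review), a minimal sketch of what the renamed type could look like. The hunk above stops at the interface declaration, so every method except `location()` is an assumption inferred from how the new tests call `assertDeletedStatusAndFailureCause(result.statuses())`:

```java
// Hypothetical sketch of the suggested rename; only location() appears in the PR's diff,
// the other accessors are assumptions, not the PR's actual API.
public interface DeleteOrphanFileOutcome {
  /** Location of the orphan file that the action attempted to delete. */
  String location();

  /** Whether the file was successfully deleted. */
  boolean deleted();

  /** Cause of the failure if the delete threw, or null when it succeeded. */
  Throwable failureCause();
}
```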
########## spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java: ##########
@@ -301,6 +301,69 @@ public void orphanedFileRemovedWithParallelTasks() throws InterruptedException,
             "remove-orphan-0", "remove-orphan-1", "remove-orphan-2", "remove-orphan-3"));
     Assert.assertEquals("Should delete 4 files", 4, deletedFiles.size());
+    assertDeletedStatusAndFailureCause(result.statuses());
+  }
+
+  @Test
+  public void testOrphanFileDeleteThrowsException() {
+    Table table = TABLES.create(SCHEMA, SPEC, Maps.newHashMap(), tableLocation);
+
+    List<ThreeColumnRecord> records1 =
+        Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
+    Dataset<Row> df1 = spark.createDataFrame(records1, ThreeColumnRecord.class).coalesce(1);
+
+    // original append
+    df1.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(tableLocation);
+
+    List<ThreeColumnRecord> records2 =
+        Lists.newArrayList(new ThreeColumnRecord(2, "AAAAAAAAAA", "AAAA"));
+    Dataset<Row> df2 = spark.createDataFrame(records2, ThreeColumnRecord.class).coalesce(1);
+
+    // dynamic partition overwrite
+    df2.select("c1", "c2", "c3").write().format("iceberg").mode("overwrite").save(tableLocation);
+
+    // second append
+    df2.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(tableLocation);
+
+    df2.coalesce(1).write().mode("append").parquet(tableLocation + "/data");
+    df2.coalesce(1).write().mode("append").parquet(tableLocation + "/data/c2_trunc=AA");
+    df2.coalesce(1).write().mode("append").parquet(tableLocation + "/data/c2_trunc=AA/c3=AAAA");
+    df2.coalesce(1).write().mode("append").parquet(tableLocation + "/data/invalid/invalid");
+
+    waitUntilAfter(System.currentTimeMillis());
+
+    String locationSubstringForException = "invalid";
+    DeleteOrphanFiles.Result result =
+        SparkActions.get()
+            .deleteOrphanFiles(table)
+            .olderThan(System.currentTimeMillis())
+            .deleteWith(
+                file -> {
+                  if (file.contains(locationSubstringForException)) {
+                    throw new RuntimeException("simulating failure during file deletion");
+                  }
+                  table.io().deleteFile(file);
+                })
+            .execute();
+
+    Assert.assertEquals("Should delete 4 files", 4, Iterables.size(result.statuses()));

Review Comment:
   use AssertJ assertions on collections here
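As a rough illustration of this suggestion (not code from the PR), and assuming the usual static import of `org.assertj.core.api.Assertions.assertThat`, the size check above might become:

```java
// Sketch only: the AssertJ form reports the iterable's actual contents when it fails.
assertThat(result.statuses())
    .as("Should delete 4 files")
    .hasSize(4);
```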
########## spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java: ##########
@@ -301,6 +301,69 @@ public void orphanedFileRemovedWithParallelTasks() throws InterruptedException,
             "remove-orphan-0", "remove-orphan-1", "remove-orphan-2", "remove-orphan-3"));
     Assert.assertEquals("Should delete 4 files", 4, deletedFiles.size());
+    assertDeletedStatusAndFailureCause(result.statuses());
+  }
+
+  @Test
+  public void testOrphanFileDeleteThrowsException() {
+    Table table = TABLES.create(SCHEMA, SPEC, Maps.newHashMap(), tableLocation);
+
+    List<ThreeColumnRecord> records1 =
+        Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
+    Dataset<Row> df1 = spark.createDataFrame(records1, ThreeColumnRecord.class).coalesce(1);
+
+    // original append
+    df1.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(tableLocation);
+
+    List<ThreeColumnRecord> records2 =
+        Lists.newArrayList(new ThreeColumnRecord(2, "AAAAAAAAAA", "AAAA"));
+    Dataset<Row> df2 = spark.createDataFrame(records2, ThreeColumnRecord.class).coalesce(1);
+
+    // dynamic partition overwrite
+    df2.select("c1", "c2", "c3").write().format("iceberg").mode("overwrite").save(tableLocation);
+
+    // second append
+    df2.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(tableLocation);
+
+    df2.coalesce(1).write().mode("append").parquet(tableLocation + "/data");
+    df2.coalesce(1).write().mode("append").parquet(tableLocation + "/data/c2_trunc=AA");
+    df2.coalesce(1).write().mode("append").parquet(tableLocation + "/data/c2_trunc=AA/c3=AAAA");
+    df2.coalesce(1).write().mode("append").parquet(tableLocation + "/data/invalid/invalid");
+
+    waitUntilAfter(System.currentTimeMillis());
+
+    String locationSubstringForException = "invalid";
+    DeleteOrphanFiles.Result result =
+        SparkActions.get()
+            .deleteOrphanFiles(table)
+            .olderThan(System.currentTimeMillis())

Review Comment:
   I'd propose actually making the olderThan value "now + 1000" to deal with filesystem clock resolution quirks. I know, nobody will be testing on FAT32 with 2s granularity, but it's still good to avoid brittle tests.


########## spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java: ##########
@@ -301,6 +301,69 @@ public void orphanedFileRemovedWithParallelTasks() throws InterruptedException,
             "remove-orphan-0", "remove-orphan-1", "remove-orphan-2", "remove-orphan-3"));
     Assert.assertEquals("Should delete 4 files", 4, deletedFiles.size());
+    assertDeletedStatusAndFailureCause(result.statuses());
+  }
+
+  @Test
+  public void testOrphanFileDeleteThrowsException() {
+    Table table = TABLES.create(SCHEMA, SPEC, Maps.newHashMap(), tableLocation);
+
+    List<ThreeColumnRecord> records1 =
+        Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
+    Dataset<Row> df1 = spark.createDataFrame(records1, ThreeColumnRecord.class).coalesce(1);
+
+    // original append
+    df1.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(tableLocation);
+
+    List<ThreeColumnRecord> records2 =
+        Lists.newArrayList(new ThreeColumnRecord(2, "AAAAAAAAAA", "AAAA"));
+    Dataset<Row> df2 = spark.createDataFrame(records2, ThreeColumnRecord.class).coalesce(1);
+
+    // dynamic partition overwrite
+    df2.select("c1", "c2", "c3").write().format("iceberg").mode("overwrite").save(tableLocation);
+
+    // second append
+    df2.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(tableLocation);
+
+    df2.coalesce(1).write().mode("append").parquet(tableLocation + "/data");
+    df2.coalesce(1).write().mode("append").parquet(tableLocation + "/data/c2_trunc=AA");
+    df2.coalesce(1).write().mode("append").parquet(tableLocation + "/data/c2_trunc=AA/c3=AAAA");
+    df2.coalesce(1).write().mode("append").parquet(tableLocation + "/data/invalid/invalid");
+
+    waitUntilAfter(System.currentTimeMillis());

Review Comment:
   pull out to a variable and reuse on L339
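Putting this together with the previous comment on `.olderThan(...)`, the suggested change might look roughly like the sketch below. It assumes L339 is the `.olderThan(System.currentTimeMillis())` call a few lines further down in the test; the 1000 ms padding comes from the earlier comment:

```java
// Sketch only: capture the timestamp once, reuse it, and pad olderThan by a second
// so coarse filesystem timestamp resolution cannot make the test brittle.
long now = System.currentTimeMillis();
waitUntilAfter(now);

String locationSubstringForException = "invalid";
DeleteOrphanFiles.Result result =
    SparkActions.get()
        .deleteOrphanFiles(table)
        .olderThan(now + 1000) // reuse the captured timestamp instead of calling currentTimeMillis() again
        .deleteWith(
            file -> {
              if (file.contains(locationSubstringForException)) {
                throw new RuntimeException("simulating failure during file deletion");
              }
              table.io().deleteFile(file);
            })
        .execute();
```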
########## spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestDeleteReachableFilesAction.java: ##########
@@ -286,11 +286,11 @@ public void testIgnoreMetadataFilesNotFound() {
     DeleteOrphanFiles.Result result =
         sparkActions().deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
-    Assert.assertEquals("Should delete 1 file", 1, Iterables.size(result.orphanFileLocations()));
+    Assert.assertEquals("Should delete 1 file", 1, Iterables.size(result.statuses()));

Review Comment:
   as Iceberg uses AssertJ in tests, use it for the new/updated assertions for better error reporting
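Similar to the earlier sketch, the updated assertion here could take the AssertJ form below (an illustration, not the PR's code; it assumes the static import of `org.assertj.core.api.Assertions.assertThat`):

```java
// Sketch only: on failure AssertJ prints the statuses it actually saw, not just the counts.
assertThat(result.statuses())
    .as("Should delete 1 file")
    .hasSize(1);
```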