pvary commented on code in PR #13831:
URL: https://github.com/apache/iceberg/pull/13831#discussion_r2298135927


##########
flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDeleteFilesProcessor.java:
##########
@@ -74,27 +79,183 @@ void testDeleteMissingFile() throws Exception {
     Path dummyFile =
         FileSystems.getDefault().getPath(table.location().substring(5), 
DUMMY_FILE_NAME);
 
-    deleteFile(tableLoader(), dummyFile.toString());
+    deleteFile(tableLoader(), dummyFile.toString(), true /* expectSuccess */);
 
     assertThat(listFiles(table)).isEqualTo(TABLE_FILES);
   }
 
   @Test
   void testInvalidURIScheme() throws Exception {
-    deleteFile(tableLoader(), "wrong://");
+    deleteFile(tableLoader(), "wrong://", false /* expectFail */);
 
     assertThat(listFiles(table)).isEqualTo(TABLE_FILES);
   }
 
-  private void deleteFile(TableLoader tableLoader, String fileName) throws 
Exception {
-    tableLoader().open();
+  @Test
+  void testDeleteNonExistentFile() throws Exception {
+    String nonexistentFile = "nonexistentFile.txt";
+
+    deleteFile(tableLoader(), nonexistentFile, true /* expectSuccess */);
+
+    assertThat(listFiles(table)).isEqualTo(TABLE_FILES);
+  }
+
+  @Test
+  void testDelete10MBFile() throws Exception {
+    // Simulate a large file (e.g., 10MB file)
+    String largeFileName = "largeFile.txt";
+    Path largeFile = Path.of(tablePath(table).toString(), largeFileName);
+
+    // Write a large file to disk (this will simulate the large file in the 
filesystem)
+    byte[] largeData = new byte[1024 * 1024 * 10]; // 10 MB
+    Files.write(largeFile, largeData);
+
+    // Verify that the file was created
+    Set<String> files = listFiles(table);
+    assertThat(files).contains(largeFileName);
+
+    // Use the DeleteFilesProcessor to delete the large file
+    deleteFile(tableLoader(), largeFile.toString(), true /* expectSuccess */);
+
+    // Verify that the large file has been deleted
+    files = listFiles(table);
+    assertThat(files).doesNotContain(largeFileName);
+  }
+
+  @Test
+  void testBatchDelete() throws Exception {
+    // Simulate adding multiple files
+    Set<String> filesToDelete = Sets.newHashSet(TABLE_FILES);
+    filesToDelete.add("file1.txt");
+    filesToDelete.add("file2.txt");
+
+    // Use a smaller batch size to trigger batch deletion logic
+    DeleteFilesProcessor deleteFilesProcessor =
+        new DeleteFilesProcessor(table, DUMMY_TASK_NAME, 0, 2);
     try (OneInputStreamOperatorTestHarness<String, Void> testHarness =
-        new OneInputStreamOperatorTestHarness<>(
-            new DeleteFilesProcessor(table, DUMMY_TASK_NAME, 0, 10), 
StringSerializer.INSTANCE)) {
+        new OneInputStreamOperatorTestHarness<>(deleteFilesProcessor, 
StringSerializer.INSTANCE)) {
       testHarness.open();
-      testHarness.processElement(fileName, System.currentTimeMillis());
+
+      for (String file : filesToDelete) {
+        testHarness.processElement(file, System.currentTimeMillis());
+      }
+
+      testHarness.processWatermark(EVENT_TIME);
+      testHarness.endInput();
+
+      // Verify that files are deleted
+      assertThat(listFiles(table)).isEqualTo(TABLE_FILES);
+      assertThat(deleteFilesProcessor.getSucceededCounter().getCount())
+          .isEqualTo(filesToDelete.size());
+      
assertThat(deleteFilesProcessor.getFailedCounter().getCount()).isEqualTo(0);
+      
assertThat(deleteFilesProcessor.getDeleteFileTimeMsHistogram().getStatistics().getMean())
+          .isGreaterThan(0);
+    } finally {
+      deleteFilesProcessor.close();
+    }
+  }
+
+  @Test
+  void testConcurrentDelete() throws Exception {
+    Path root = tablePath(table);
+
+    // Generate 30 test files: delete-0.txt ... delete-29.txt
+    Set<String> targets = Sets.newHashSet();
+    for (int i = 0; i < 30; i++) {
+      targets.add("delete-" + i + ".txt");
+    }
+
+    for (String f : targets) {
+      Files.write(root.resolve(f), f.getBytes(StandardCharsets.UTF_8));
+    }

Review Comment:
   Why not merge the two `for` loops? Each file could be written to disk in the same loop that adds its name to `targets`.
   
   Also, add a blank line after every block. See: https://iceberg.apache.org/contribute/#block-spacing



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to