slfan1989 commented on code in PR #13831:
URL: https://github.com/apache/iceberg/pull/13831#discussion_r2295439694


##########
flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDeleteFilesProcessor.java:
##########
@@ -74,27 +79,172 @@ void testDeleteMissingFile() throws Exception {
     Path dummyFile =
         FileSystems.getDefault().getPath(table.location().substring(5), DUMMY_FILE_NAME);
 
-    deleteFile(tableLoader(), dummyFile.toString());
+    deleteFile(tableLoader(), dummyFile.toString(), true);
 
     assertThat(listFiles(table)).isEqualTo(TABLE_FILES);
   }
 
   @Test
   void testInvalidURIScheme() throws Exception {
-    deleteFile(tableLoader(), "wrong://");
+    deleteFile(tableLoader(), "wrong://", false);
 
     assertThat(listFiles(table)).isEqualTo(TABLE_FILES);
   }
 
-  private void deleteFile(TableLoader tableLoader, String fileName) throws Exception {
-    tableLoader().open();
+  @Test
+  void testDeleteNonExistentFile() throws Exception {
+    String nonexistentFile = "nonexistentFile.txt";
+
+    deleteFile(tableLoader(), nonexistentFile, true);
+
+    assertThat(listFiles(table)).isEqualTo(TABLE_FILES);
+  }
+
+  @Test
+  void testDeleteLargeFile() throws Exception {
+    // Simulate a large file (100 MB)
+    String largeFileName = "largeFile.txt";
+    Path largeFile = Path.of(tablePath(table).toString(), largeFileName);
+
+    // Write a 100 MB file to disk to simulate a large file in the filesystem
+    byte[] largeData = new byte[1024 * 1024 * 100]; // 100 MB
+    Files.write(largeFile, largeData);
+
+    // Verify that the file was created
+    Set<String> files = listFiles(table);
+    assertThat(files).contains(largeFileName);
+
+    // Use the DeleteFilesProcessor to delete the large file
+    deleteFile(tableLoader(), largeFile.toString(), true);
+
+    // Verify that the large file has been deleted
+    files = listFiles(table);
+    assertThat(files).doesNotContain(largeFileName);
+  }
+
+  private void deleteFile(TableLoader tableLoader, String fileName, boolean expectSuccess)
+      throws Exception {
+    tableLoader.open();
+    DeleteFilesProcessor deleteFilesProcessor =
+        new DeleteFilesProcessor(table, DUMMY_TASK_NAME, 0, 10);
     try (OneInputStreamOperatorTestHarness<String, Void> testHarness =
-        new OneInputStreamOperatorTestHarness<>(
-            new DeleteFilesProcessor(table, DUMMY_TASK_NAME, 0, 10), StringSerializer.INSTANCE)) {
+        new OneInputStreamOperatorTestHarness<>(deleteFilesProcessor, StringSerializer.INSTANCE)) {
       testHarness.open();
       testHarness.processElement(fileName, System.currentTimeMillis());
       testHarness.processWatermark(EVENT_TIME);
       testHarness.endInput();
+
+      // Validate that the metrics meet expectations
+      if (expectSuccess) {
+        assertThat(deleteFilesProcessor.getSucceededCounter().getCount()).isEqualTo(1);
+        assertThat(deleteFilesProcessor.getFailedCounter().getCount()).isEqualTo(0);
+        assertThat(deleteFilesProcessor.getDeleteFileTimeMsHistogram().getStatistics().getMean())
+            .isGreaterThan(0);
+      } else {
+        assertThat(deleteFilesProcessor.getSucceededCounter().getCount()).isEqualTo(0);
+        assertThat(deleteFilesProcessor.getFailedCounter().getCount()).isEqualTo(1);
+        assertThat(deleteFilesProcessor.getDeleteFileTimeMsHistogram().getStatistics().getMean())
+            .isGreaterThan(0);
+      }
+
+    } finally {
+      deleteFilesProcessor.close();
+    }
+  }
+
+  @Test
+  void testBatchDelete() throws Exception {
+    // Simulate adding multiple files
+    Set<String> filesToDelete = Sets.newHashSet(TABLE_FILES);
+    filesToDelete.add("file1.txt");
+    filesToDelete.add("file2.txt");
+
+    // Use a smaller batch size to trigger batch deletion logic
+    DeleteFilesProcessor deleteFilesProcessor =
+        new DeleteFilesProcessor(table, DUMMY_TASK_NAME, 0, 2);
+    try (OneInputStreamOperatorTestHarness<String, Void> testHarness =
+        new OneInputStreamOperatorTestHarness<>(deleteFilesProcessor, StringSerializer.INSTANCE)) {
+      testHarness.open();
+
+      for (String file : filesToDelete) {
+        testHarness.processElement(file, System.currentTimeMillis());
+      }
+
+      testHarness.processWatermark(EVENT_TIME);
+      testHarness.endInput();
+
+      // Verify that files are deleted
+      assertThat(listFiles(table)).isEqualTo(TABLE_FILES);
+      assertThat(deleteFilesProcessor.getSucceededCounter().getCount())
+          .isEqualTo(filesToDelete.size());
+      assertThat(deleteFilesProcessor.getFailedCounter().getCount()).isEqualTo(0);
+      assertThat(deleteFilesProcessor.getDeleteFileTimeMsHistogram().getStatistics().getMean())
+          .isGreaterThan(0);
+    } finally {
+      deleteFilesProcessor.close();
+    }
+  }
+
+  @Test
+  void testConcurrentDelete() throws Exception {

Review Comment:
   > How often this delete actually happens concurrently?
   > Like one file deleted by the `DeleteFilesProcessor`, and one file deleted by the `executorService`?
   
   You are right: the way I previously simulated concurrency may not reflect a realistic scenario. What we really want to validate is the case where multiple tasks operate on the same table at the same time, leading to concurrent deletions. To cover this, I have reworked the unit test to use two `DeleteFilesProcessor` instances deleting files concurrently.
   
   > If we want to test real concurrency, we likely need to use more files, and maybe some delays.
   > If we are ok with checking concurrently deleted files, then `testDeleteNonExistentFile` might be enough
   
   In the latest test, I ensured that both processors attempt to delete the same files concurrently, which lets us verify that `DeleteFilesProcessor` behaves as expected under concurrent conditions. A minimal sketch of this approach follows the log output below.
   
   From my local run, I can observe both processors deleting files at the same time:
   
   ```
   [deleter-p2] INFO org.apache.iceberg.flink.maintenance.operator.DeleteFilesProcessor - Deleted 2 files from table hadoop.default.t using bulk deletes
   [deleter-p1] INFO org.apache.iceberg.flink.maintenance.operator.DeleteFilesProcessor - Deleted 2 files from table hadoop.default.t using bulk deletes
   [deleter-p1] INFO org.apache.iceberg.flink.maintenance.operator.DeleteFilesProcessor - Deleted 2 files from table hadoop.default.t using bulk deletes
   [deleter-p2] INFO org.apache.iceberg.flink.maintenance.operator.DeleteFilesProcessor - Deleted 2 files from table hadoop.default.t using bulk deletes
   [deleter-p2] INFO org.apache.iceberg.flink.maintenance.operator.DeleteFilesProcessor - Deleted 2 files from table hadoop.default.t using bulk deletes
   ```
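   
   For reference, here is a minimal sketch of the concurrent test described above. It assumes the helpers and constants of this test class (`tablePath`, `listFiles`, `DUMMY_TASK_NAME`, `EVENT_TIME`, `TABLE_FILES`) and the usual `java.util.concurrent` imports; the executor and latch wiring is illustrative only, not the exact code in the PR.
   
   ```java
   @Test
   void testConcurrentDelete() throws Exception {
     // Two extra files that both processors will race to delete.
     Path file1 = Path.of(tablePath(table).toString(), "concurrent1.txt");
     Path file2 = Path.of(tablePath(table).toString(), "concurrent2.txt");
     Files.write(file1, new byte[] {0});
     Files.write(file2, new byte[] {0});
   
     ExecutorService executor = Executors.newFixedThreadPool(2);
     CountDownLatch start = new CountDownLatch(1);
     // Each task runs its own processor and harness over the same two files.
     Callable<Void> deleter =
         () -> {
           DeleteFilesProcessor processor = new DeleteFilesProcessor(table, DUMMY_TASK_NAME, 0, 2);
           try (OneInputStreamOperatorTestHarness<String, Void> harness =
               new OneInputStreamOperatorTestHarness<>(processor, StringSerializer.INSTANCE)) {
             harness.open();
             start.await(); // line the two threads up so the deletes overlap
             harness.processElement(file1.toString(), System.currentTimeMillis());
             harness.processElement(file2.toString(), System.currentTimeMillis());
             harness.processWatermark(EVENT_TIME);
             harness.endInput();
           }
           return null;
         };
   
     Future<Void> first = executor.submit(deleter);
     Future<Void> second = executor.submit(deleter);
     start.countDown();
     first.get();
     second.get();
     executor.shutdown();
   
     // Whichever processor wins each delete, both files must be gone and the
     // table files untouched.
     assertThat(listFiles(table)).isEqualTo(TABLE_FILES);
   }
   ```
   
   With a batch size of 2 and two files per processor, each flush issues one bulk delete, which matches the `Deleted 2 files ... using bulk deletes` lines in the log above.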
   
   


