slfan1989 commented on code in PR #13831:
URL: https://github.com/apache/iceberg/pull/13831#discussion_r2312498153


##########
flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDeleteFilesProcessor.java:
##########
@@ -74,27 +79,183 @@ void testDeleteMissingFile() throws Exception {
     Path dummyFile =
         FileSystems.getDefault().getPath(table.location().substring(5), 
DUMMY_FILE_NAME);
 
-    deleteFile(tableLoader(), dummyFile.toString());
+    deleteFile(tableLoader(), dummyFile.toString(), true /* expectSuccess */);
 
     assertThat(listFiles(table)).isEqualTo(TABLE_FILES);
   }
 
   @Test
   void testInvalidURIScheme() throws Exception {
-    deleteFile(tableLoader(), "wrong://");
+    deleteFile(tableLoader(), "wrong://", false /* expectFail */);
 
     assertThat(listFiles(table)).isEqualTo(TABLE_FILES);
   }
 
-  private void deleteFile(TableLoader tableLoader, String fileName) throws 
Exception {
-    tableLoader().open();
+  @Test
+  void testDeleteNonExistentFile() throws Exception {
+    String nonexistentFile = "nonexistentFile.txt";
+
+    deleteFile(tableLoader(), nonexistentFile, true /* expectSuccess */);
+
+    assertThat(listFiles(table)).isEqualTo(TABLE_FILES);
+  }
+
+  @Test
+  void testDelete10MBFile() throws Exception {
+    // Simulate a large file (e.g., 10MB file)
+    String largeFileName = "largeFile.txt";
+    Path largeFile = Path.of(tablePath(table).toString(), largeFileName);
+
+    // Write a large file to disk (this will simulate the large file in the 
filesystem)
+    byte[] largeData = new byte[1024 * 1024 * 10]; // 10 MB
+    Files.write(largeFile, largeData);
+
+    // Verify that the file was created
+    Set<String> files = listFiles(table);
+    assertThat(files).contains(largeFileName);
+
+    // Use the DeleteFilesProcessor to delete the large file
+    deleteFile(tableLoader(), largeFile.toString(), true /* expectSuccess */);
+
+    // Verify that the large file has been deleted
+    files = listFiles(table);
+    assertThat(files).doesNotContain(largeFileName);
+  }
+
+  @Test
+  void testBatchDelete() throws Exception {
+    // Simulate adding multiple files
+    Set<String> filesToDelete = Sets.newHashSet(TABLE_FILES);
+    filesToDelete.add("file1.txt");
+    filesToDelete.add("file2.txt");
+
+    // Use a smaller batch size to trigger batch deletion logic
+    DeleteFilesProcessor deleteFilesProcessor =
+        new DeleteFilesProcessor(table, DUMMY_TASK_NAME, 0, 2);
     try (OneInputStreamOperatorTestHarness<String, Void> testHarness =
-        new OneInputStreamOperatorTestHarness<>(
-            new DeleteFilesProcessor(table, DUMMY_TASK_NAME, 0, 10), 
StringSerializer.INSTANCE)) {
+        new OneInputStreamOperatorTestHarness<>(deleteFilesProcessor, 
StringSerializer.INSTANCE)) {
       testHarness.open();
-      testHarness.processElement(fileName, System.currentTimeMillis());
+
+      for (String file : filesToDelete) {
+        testHarness.processElement(file, System.currentTimeMillis());
+      }
+
+      testHarness.processWatermark(EVENT_TIME);
+      testHarness.endInput();
+
+      // Verify that files are deleted
+      assertThat(listFiles(table)).isEqualTo(TABLE_FILES);
+      assertThat(deleteFilesProcessor.getSucceededCounter().getCount())
+          .isEqualTo(filesToDelete.size());
+      
assertThat(deleteFilesProcessor.getFailedCounter().getCount()).isEqualTo(0);
+      
assertThat(deleteFilesProcessor.getDeleteFileTimeMsHistogram().getStatistics().getMean())
+          .isGreaterThan(0);
+    } finally {
+      deleteFilesProcessor.close();
+    }
+  }
+
+  @Test
+  void testConcurrentDelete() throws Exception {
+    Path root = tablePath(table);
+
+    // Generate 30 test files: delete-0.txt ... delete-29.txt
+    Set<String> targets = Sets.newHashSet();
+    for (int i = 0; i < 30; i++) {
+      targets.add("delete-" + i + ".txt");
+    }
+
+    for (String f : targets) {
+      Files.write(root.resolve(f), f.getBytes(StandardCharsets.UTF_8));
+    }
+    assertThat(listFiles(table)).containsAll(targets);
+
+    DeleteFilesProcessor p1 = new DeleteFilesProcessor(table, DUMMY_TASK_NAME 
+ "-p1", 0, 2);
+    DeleteFilesProcessor p2 = new DeleteFilesProcessor(table, DUMMY_TASK_NAME 
+ "-p2", 0, 2);
+
+    // Two processors that will try to delete the same files concurrently
+    try (OneInputStreamOperatorTestHarness<String, Void> h1 =
+            new OneInputStreamOperatorTestHarness<>(p1, 
StringSerializer.INSTANCE);
+        OneInputStreamOperatorTestHarness<String, Void> h2 =
+            new OneInputStreamOperatorTestHarness<>(p2, 
StringSerializer.INSTANCE)) {
+      h1.open();
+      h2.open();
+
+      // One barrier per file: ensures p1 and p2 try to delete the same file 
at the same time
+      Map<String, CyclicBarrier> barriers = Maps.newHashMap();
+      targets.forEach(f -> barriers.put(f, new CyclicBarrier(2)));
+
+      long ts = System.currentTimeMillis();
+
+      Thread t1 =
+          new Thread(
+              () -> {
+                try {
+                  for (String f : targets) {
+                    barriers.get(f).await(2, TimeUnit.SECONDS);
+                    h1.processElement(f, ts);
+                  }
+                  h1.processWatermark(EVENT_TIME);
+                  h1.endInput();
+                } catch (Exception ignored) {
+                }
+              },
+              "deleter-p1");

Review Comment:
   Thank you for your question; it's a great one. I designed this unit test to 
simulate a scenario where two Flink tasks concurrently perform snapshot 
expiration cleanup, which leads to concurrent deletion of the same metadata 
files. To model this, I used threads. After carefully considering your 
suggestion, I realized that driving both processors sequentially in a loop, so 
that each one attempts to delete the same files, is also a valid approach. I've 
since updated the code accordingly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to