slfan1989 commented on code in PR #13831:
URL: https://github.com/apache/iceberg/pull/13831#discussion_r2312498153
##########
flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDeleteFilesProcessor.java:
##########
@@ -74,27 +79,183 @@ void testDeleteMissingFile() throws Exception {
Path dummyFile =
FileSystems.getDefault().getPath(table.location().substring(5),
DUMMY_FILE_NAME);
- deleteFile(tableLoader(), dummyFile.toString());
+ deleteFile(tableLoader(), dummyFile.toString(), true /* expectSuccess */);
assertThat(listFiles(table)).isEqualTo(TABLE_FILES);
}
@Test
void testInvalidURIScheme() throws Exception {
- deleteFile(tableLoader(), "wrong://");
+ deleteFile(tableLoader(), "wrong://", false /* expectFail */);
assertThat(listFiles(table)).isEqualTo(TABLE_FILES);
}
- private void deleteFile(TableLoader tableLoader, String fileName) throws
Exception {
- tableLoader().open();
+ @Test
+ void testDeleteNonExistentFile() throws Exception {
+ String nonexistentFile = "nonexistentFile.txt";
+
+ deleteFile(tableLoader(), nonexistentFile, true /* expectSuccess */);
+
+ assertThat(listFiles(table)).isEqualTo(TABLE_FILES);
+ }
+
+ @Test
+ void testDelete10MBFile() throws Exception {
+ // Simulate a large file (e.g., 10MB file)
+ String largeFileName = "largeFile.txt";
+ Path largeFile = Path.of(tablePath(table).toString(), largeFileName);
+
+ // Write a large file to disk (this will simulate the large file in the
filesystem)
+ byte[] largeData = new byte[1024 * 1024 * 10]; // 10 MB
+ Files.write(largeFile, largeData);
+
+ // Verify that the file was created
+ Set<String> files = listFiles(table);
+ assertThat(files).contains(largeFileName);
+
+ // Use the DeleteFilesProcessor to delete the large file
+ deleteFile(tableLoader(), largeFile.toString(), true /* expectSuccess */);
+
+ // Verify that the large file has been deleted
+ files = listFiles(table);
+ assertThat(files).doesNotContain(largeFileName);
+ }
+
+ @Test
+ void testBatchDelete() throws Exception {
+ // Simulate adding multiple files
+ Set<String> filesToDelete = Sets.newHashSet(TABLE_FILES);
+ filesToDelete.add("file1.txt");
+ filesToDelete.add("file2.txt");
+
+ // Use a smaller batch size to trigger batch deletion logic
+ DeleteFilesProcessor deleteFilesProcessor =
+ new DeleteFilesProcessor(table, DUMMY_TASK_NAME, 0, 2);
try (OneInputStreamOperatorTestHarness<String, Void> testHarness =
- new OneInputStreamOperatorTestHarness<>(
- new DeleteFilesProcessor(table, DUMMY_TASK_NAME, 0, 10),
StringSerializer.INSTANCE)) {
+ new OneInputStreamOperatorTestHarness<>(deleteFilesProcessor,
StringSerializer.INSTANCE)) {
testHarness.open();
- testHarness.processElement(fileName, System.currentTimeMillis());
+
+ for (String file : filesToDelete) {
+ testHarness.processElement(file, System.currentTimeMillis());
+ }
+
+ testHarness.processWatermark(EVENT_TIME);
+ testHarness.endInput();
+
+ // Verify that files are deleted
+ assertThat(listFiles(table)).isEqualTo(TABLE_FILES);
+ assertThat(deleteFilesProcessor.getSucceededCounter().getCount())
+ .isEqualTo(filesToDelete.size());
+
assertThat(deleteFilesProcessor.getFailedCounter().getCount()).isEqualTo(0);
+
assertThat(deleteFilesProcessor.getDeleteFileTimeMsHistogram().getStatistics().getMean())
+ .isGreaterThan(0);
+ } finally {
+ deleteFilesProcessor.close();
+ }
+ }
+
+ @Test
+ void testConcurrentDelete() throws Exception {
+ Path root = tablePath(table);
+
+ // Generate 30 test files: delete-0.txt ... delete-29.txt
+ Set<String> targets = Sets.newHashSet();
+ for (int i = 0; i < 30; i++) {
+ targets.add("delete-" + i + ".txt");
+ }
+
+ for (String f : targets) {
+ Files.write(root.resolve(f), f.getBytes(StandardCharsets.UTF_8));
+ }
+ assertThat(listFiles(table)).containsAll(targets);
+
+ DeleteFilesProcessor p1 = new DeleteFilesProcessor(table, DUMMY_TASK_NAME
+ "-p1", 0, 2);
+ DeleteFilesProcessor p2 = new DeleteFilesProcessor(table, DUMMY_TASK_NAME
+ "-p2", 0, 2);
+
+ // Two processors that will try to delete the same files concurrently
+ try (OneInputStreamOperatorTestHarness<String, Void> h1 =
+ new OneInputStreamOperatorTestHarness<>(p1,
StringSerializer.INSTANCE);
+ OneInputStreamOperatorTestHarness<String, Void> h2 =
+ new OneInputStreamOperatorTestHarness<>(p2,
StringSerializer.INSTANCE)) {
+ h1.open();
+ h2.open();
+
+ // One barrier per file: ensures p1 and p2 try to delete the same file
at the same time
+ Map<String, CyclicBarrier> barriers = Maps.newHashMap();
+ targets.forEach(f -> barriers.put(f, new CyclicBarrier(2)));
+
+ long ts = System.currentTimeMillis();
+
+ Thread t1 =
+ new Thread(
+ () -> {
+ try {
+ for (String f : targets) {
+ barriers.get(f).await(2, TimeUnit.SECONDS);
+ h1.processElement(f, ts);
+ }
+ h1.processWatermark(EVENT_TIME);
+ h1.endInput();
+ } catch (Exception ignored) {
+ }
+ },
+ "deleter-p1");
Review Comment:
Thank you for your question; it's a great one. I designed this unit test to
simulate a scenario where two Flink tasks perform snapshot-expiration cleanup
concurrently, which leads to the concurrent deletion of metadata files. To
model this, I used threads. After carefully considering your suggestion, I
realized that driving both processors sequentially in a loop is also a valid
approach. I've since updated the code accordingly.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]