slfan1989 commented on code in PR #13831:
URL: https://github.com/apache/iceberg/pull/13831#discussion_r2295439694
##########
flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDeleteFilesProcessor.java:
##########
@@ -74,27 +79,172 @@ void testDeleteMissingFile() throws Exception {
     Path dummyFile =
         FileSystems.getDefault().getPath(table.location().substring(5), DUMMY_FILE_NAME);
-    deleteFile(tableLoader(), dummyFile.toString());
+    deleteFile(tableLoader(), dummyFile.toString(), true);
     assertThat(listFiles(table)).isEqualTo(TABLE_FILES);
   }

   @Test
   void testInvalidURIScheme() throws Exception {
-    deleteFile(tableLoader(), "wrong://");
+    deleteFile(tableLoader(), "wrong://", false);
     assertThat(listFiles(table)).isEqualTo(TABLE_FILES);
   }

-  private void deleteFile(TableLoader tableLoader, String fileName) throws Exception {
-    tableLoader().open();
+  @Test
+  void testDeleteNonExistentFile() throws Exception {
+    String nonexistentFile = "nonexistentFile.txt";
+
+    deleteFile(tableLoader(), nonexistentFile, true);
+
+    assertThat(listFiles(table)).isEqualTo(TABLE_FILES);
+  }
+
+  @Test
+  void testDeleteLargeFile() throws Exception {
+    // Simulate a large file (e.g., a 100 MB file)
+    String largeFileName = "largeFile.txt";
+    Path largeFile = Path.of(tablePath(table).toString(), largeFileName);
+
+    // Write the large file to disk to simulate it in the filesystem
+    byte[] largeData = new byte[1024 * 1024 * 100]; // 100 MB
+    Files.write(largeFile, largeData);
+
+    // Verify that the file was created
+    Set<String> files = listFiles(table);
+    assertThat(files).contains(largeFileName);
+
+    // Use the DeleteFilesProcessor to delete the large file
+    deleteFile(tableLoader(), largeFile.toString(), true);
+
+    // Verify that the large file has been deleted
+    files = listFiles(table);
+    assertThat(files).doesNotContain(largeFileName);
+  }
+
+  private void deleteFile(TableLoader tableLoader, String fileName, boolean expectSuccess)
+      throws Exception {
+    tableLoader.open();
+    DeleteFilesProcessor deleteFilesProcessor =
+        new DeleteFilesProcessor(table, DUMMY_TASK_NAME, 0, 10);
     try (OneInputStreamOperatorTestHarness<String, Void> testHarness =
-        new OneInputStreamOperatorTestHarness<>(
-            new DeleteFilesProcessor(table, DUMMY_TASK_NAME, 0, 10), StringSerializer.INSTANCE)) {
+        new OneInputStreamOperatorTestHarness<>(deleteFilesProcessor, StringSerializer.INSTANCE)) {
       testHarness.open();
       testHarness.processElement(fileName, System.currentTimeMillis());
       testHarness.processWatermark(EVENT_TIME);
       testHarness.endInput();
+
+      // Verify that the metrics meet expectations
+      if (expectSuccess) {
+        assertThat(deleteFilesProcessor.getSucceededCounter().getCount()).isEqualTo(1);
+        assertThat(deleteFilesProcessor.getFailedCounter().getCount()).isEqualTo(0);
+        assertThat(deleteFilesProcessor.getDeleteFileTimeMsHistogram().getStatistics().getMean())
+            .isGreaterThan(0);
+      } else {
+        assertThat(deleteFilesProcessor.getSucceededCounter().getCount()).isEqualTo(0);
+        assertThat(deleteFilesProcessor.getFailedCounter().getCount()).isEqualTo(1);
+        assertThat(deleteFilesProcessor.getDeleteFileTimeMsHistogram().getStatistics().getMean())
+            .isGreaterThan(0);
+      }
+    } finally {
+      deleteFilesProcessor.close();
+    }
+  }
+
+  @Test
+  void testBatchDelete() throws Exception {
+    // Simulate adding multiple files
+    Set<String> filesToDelete = Sets.newHashSet(TABLE_FILES);
+    filesToDelete.add("file1.txt");
+    filesToDelete.add("file2.txt");
+
+    // Use a smaller batch size to trigger the batch deletion logic
+    DeleteFilesProcessor deleteFilesProcessor =
+        new DeleteFilesProcessor(table, DUMMY_TASK_NAME, 0, 2);
+    try (OneInputStreamOperatorTestHarness<String, Void> testHarness =
+        new OneInputStreamOperatorTestHarness<>(deleteFilesProcessor, StringSerializer.INSTANCE)) {
+      testHarness.open();
+
+      for (String file : filesToDelete) {
+        testHarness.processElement(file, System.currentTimeMillis());
+      }
+
+      testHarness.processWatermark(EVENT_TIME);
+      testHarness.endInput();
+
+      // Verify that the files were deleted
+      assertThat(listFiles(table)).isEqualTo(TABLE_FILES);
+      assertThat(deleteFilesProcessor.getSucceededCounter().getCount())
+          .isEqualTo(filesToDelete.size());
+      assertThat(deleteFilesProcessor.getFailedCounter().getCount()).isEqualTo(0);
+      assertThat(deleteFilesProcessor.getDeleteFileTimeMsHistogram().getStatistics().getMean())
+          .isGreaterThan(0);
+    } finally {
+      deleteFilesProcessor.close();
+    }
+  }
+
+  @Test
+  void testConcurrentDelete() throws Exception {
Review Comment:
> How often does this delete actually happen concurrently?
> Like one file deleted by the `DeleteFilesProcessor`, and one file deleted by the `executorService`?
You are right: the way I previously simulated concurrency may not reflect a realistic scenario. What we really want to validate is the case where multiple tasks operate on the same table at the same time, leading to concurrent deletions. To cover this, I have improved the unit test to use two `DeleteFilesProcessor` instances that delete files concurrently.
> If we want to test real concurrency, we likely need to use more files, and maybe some delays.
> If we are ok with checking concurrently deleted files, then `testDeleteNonExistentFile` might be enough
In the latest test, both processors attempt to delete the same files concurrently, which lets us verify that `DeleteFilesProcessor` behaves as expected under concurrent conditions.
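For context, this is roughly the shape of the improved test (a minimal sketch, not the actual PR code: the helper name `deleteSameFilesConcurrently`, the `CyclicBarrier`, and the explicit thread naming are my illustrative additions; the harness wiring follows the existing `deleteFile` helper):

```java
// Sketch only. Assumes the surrounding test class plus these additional imports:
// java.util.ArrayList, java.util.List, java.util.Set,
// java.util.concurrent.{CyclicBarrier, ExecutorService, Executors, Future}
private void deleteSameFilesConcurrently(Set<String> files) throws Exception {
  CyclicBarrier barrier = new CyclicBarrier(2); // release both deleters together
  ExecutorService executor = Executors.newFixedThreadPool(2);
  try {
    List<Future<Void>> futures = new ArrayList<>();
    for (int i = 1; i <= 2; i++) {
      String threadName = "deleter-p" + i; // matches the thread names in the log below
      futures.add(
          executor.submit(
              () -> {
                Thread.currentThread().setName(threadName);
                // Each processor runs in its own harness, same batch size as testBatchDelete
                try (OneInputStreamOperatorTestHarness<String, Void> harness =
                    new OneInputStreamOperatorTestHarness<>(
                        new DeleteFilesProcessor(table, DUMMY_TASK_NAME, 0, 2),
                        StringSerializer.INSTANCE)) {
                  harness.open();
                  barrier.await(); // both processors start deleting at the same moment
                  for (String file : files) {
                    harness.processElement(file, System.currentTimeMillis());
                  }
                  harness.processWatermark(EVENT_TIME);
                  harness.endInput(); // flushes any remaining batch
                }
                return null;
              }));
    }
    for (Future<Void> future : futures) {
      future.get(); // propagate failures from either deleter thread
    }
  } finally {
    executor.shutdownNow();
  }
}
```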
From my local run, I can observe both processors deleting files at the same
time:
```
[deleter-p2] INFO org.apache.iceberg.flink.maintenance.operator.DeleteFilesProcessor - Deleted 2 files from table hadoop.default.t using bulk deletes
[deleter-p1] INFO org.apache.iceberg.flink.maintenance.operator.DeleteFilesProcessor - Deleted 2 files from table hadoop.default.t using bulk deletes
[deleter-p1] INFO org.apache.iceberg.flink.maintenance.operator.DeleteFilesProcessor - Deleted 2 files from table hadoop.default.t using bulk deletes
[deleter-p2] INFO org.apache.iceberg.flink.maintenance.operator.DeleteFilesProcessor - Deleted 2 files from table hadoop.default.t using bulk deletes
[deleter-p2] INFO org.apache.iceberg.flink.maintenance.operator.DeleteFilesProcessor - Deleted 2 files from table hadoop.default.t using bulk deletes
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.