zhongqishang commented on code in PR #10526:
URL: https://github.com/apache/iceberg/pull/10526#discussion_r1648452582
##########
flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java:
##########
@@ -887,6 +921,55 @@ public void testCommitTwoCheckpointsInSingleTxn() throws Exception {
     }
   }
 
+  @TestTemplate
+  public void testCommitMultipleCheckpointsWithDuplicateData() throws Exception {
+    assumeThat(formatVersion)
+        .as("Equality deletes are only supported in format v2 or later.")
+        .isGreaterThan(1);
+
+    assumeThat(hasPrimaryKey).as("The test case is only for primary-key tables.").isEqualTo(true);
+
+    long timestamp = 0;
+    long checkpoint = 10;
+
+    JobID jobId = new JobID();
+    OperatorID operatorId;
+    FileAppenderFactory<RowData> appenderFactory = createDeletableAppenderFactory();
+
+    try (OneInputStreamOperatorTestHarness<FlinkWriteResult, Void> harness =
+        createStreamSink(jobId)) {
+      harness.setup();
+      harness.open();
+      operatorId = harness.getOperator().getOperatorID();
+
+      assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
+
+      RowData insert1 = SimpleDataUtil.createInsert(1, "aaa");
+      RowData insert2 = SimpleDataUtil.createInsert(2, "bbb");
+      for (int i = 1; i <= 3; i++) {
+        DataFile dataFile = writeDataFile("data-file-" + i, ImmutableList.of(insert1, insert2));
+        DeleteFile deleteFile =
+            writeEqDeleteFile(
+                appenderFactory, "delete-file-" + i, ImmutableList.of(insert1, insert2));
+        harness.processElement(
+            new FlinkWriteResult(
+                ++checkpoint,
+                WriteResult.builder().addDataFiles(dataFile).addDeleteFiles(deleteFile).build()),
+            ++timestamp);
+      }
+
+      // The first snapshotState.
+      harness.snapshot(checkpoint, ++timestamp);
+
+      // Notify the first snapshot to complete.
+      harness.notifyOfCompletedCheckpoint(checkpoint);
+      SimpleDataUtil.assertTableRows(table, ImmutableList.of(insert1, insert2), branch);
+      assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint);
+      assertFlinkManifests(0);
+      assertThat(table.snapshots()).hasSize(3);

Review Comment:
   Because there are three different checkpoint IDs, each checkpoint ID is committed as its own snapshot, so the table ends up with three snapshots.

##########
flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java:
##########
@@ -887,6 +921,55 @@ public void testCommitTwoCheckpointsInSingleTxn() throws Exception {
     }
   }
 
+  @TestTemplate
+  public void testCommitMultipleCheckpointsWithDuplicateData() throws Exception {
+    assumeThat(formatVersion)
+        .as("Equality deletes are only supported in format v2 or later.")
+        .isGreaterThan(1);
+
+    assumeThat(hasPrimaryKey).as("The test case is only for primary-key tables.").isEqualTo(true);
+
+    long timestamp = 0;
+    long checkpoint = 10;
+
+    JobID jobId = new JobID();
+    OperatorID operatorId;
+    FileAppenderFactory<RowData> appenderFactory = createDeletableAppenderFactory();
+
+    try (OneInputStreamOperatorTestHarness<FlinkWriteResult, Void> harness =
+        createStreamSink(jobId)) {
+      harness.setup();
+      harness.open();
+      operatorId = harness.getOperator().getOperatorID();
+
+      assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
+
+      RowData insert1 = SimpleDataUtil.createInsert(1, "aaa");
+      RowData insert2 = SimpleDataUtil.createInsert(2, "bbb");
+      for (int i = 1; i <= 3; i++) {
+        DataFile dataFile = writeDataFile("data-file-" + i, ImmutableList.of(insert1, insert2));
+        DeleteFile deleteFile =
+            writeEqDeleteFile(
+                appenderFactory, "delete-file-" + i, ImmutableList.of(insert1, insert2));
+        harness.processElement(
+            new FlinkWriteResult(
+                ++checkpoint,
+                WriteResult.builder().addDataFiles(dataFile).addDeleteFiles(deleteFile).build()),
+            ++timestamp);
+      }
+
+      // The first snapshotState.
+      harness.snapshot(checkpoint, ++timestamp);
+
+      // Notify the first snapshot to complete.
+      harness.notifyOfCompletedCheckpoint(checkpoint);
+      SimpleDataUtil.assertTableRows(table, ImmutableList.of(insert1, insert2), branch);

Review Comment:
   Agreed, I would move the data generation into the `for` loop; see the sketch below.
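   A minimal sketch of that change, assuming each iteration should write its own rows (the `"aaa" + i` / `"bbb" + i` values are illustrative, not from the PR):

       for (int i = 1; i <= 3; i++) {
         // Generate the rows inside the loop so every checkpoint carries its own data.
         RowData insert1 = SimpleDataUtil.createInsert(1, "aaa" + i);
         RowData insert2 = SimpleDataUtil.createInsert(2, "bbb" + i);
         DataFile dataFile = writeDataFile("data-file-" + i, ImmutableList.of(insert1, insert2));
         DeleteFile deleteFile =
             writeEqDeleteFile(
                 appenderFactory, "delete-file-" + i, ImmutableList.of(insert1, insert2));
         harness.processElement(
             new FlinkWriteResult(
                 ++checkpoint,
                 WriteResult.builder().addDataFiles(dataFile).addDeleteFiles(deleteFile).build()),
             ++timestamp);
       }

   With per-iteration rows, the final `assertTableRows` call would then expect the rows written by the last iteration, since each checkpoint's equality deletes replace the earlier values for keys 1 and 2.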
##########
flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java:
##########
@@ -887,6 +921,55 @@ public void testCommitTwoCheckpointsInSingleTxn() throws Exception {
     }
   }
 
+  @TestTemplate
+  public void testCommitMultipleCheckpointsWithDuplicateData() throws Exception {
+    assumeThat(formatVersion)
+        .as("Equality deletes are only supported in format v2 or later.")
+        .isGreaterThan(1);
+
+    assumeThat(hasPrimaryKey).as("The test case is only for primary-key tables.").isEqualTo(true);
+
+    long timestamp = 0;
+    long checkpoint = 10;
+
+    JobID jobId = new JobID();
+    OperatorID operatorId;
+    FileAppenderFactory<RowData> appenderFactory = createDeletableAppenderFactory();
+
+    try (OneInputStreamOperatorTestHarness<FlinkWriteResult, Void> harness =
+        createStreamSink(jobId)) {
+      harness.setup();
+      harness.open();
+      operatorId = harness.getOperator().getOperatorID();
+
+      assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
+
+      RowData insert1 = SimpleDataUtil.createInsert(1, "aaa");
+      RowData insert2 = SimpleDataUtil.createInsert(2, "bbb");
+      for (int i = 1; i <= 3; i++) {
+        DataFile dataFile = writeDataFile("data-file-" + i, ImmutableList.of(insert1, insert2));
+        DeleteFile deleteFile =

Review Comment:
   A positional delete file is only associated with the one snapshot it is committed in, so it is not relevant to this scenario. I don't think it needs to be added to this test case.

##########
flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java:
##########
@@ -887,6 +921,55 @@ public void testCommitTwoCheckpointsInSingleTxn() throws Exception {
     }
   }
 
+  @TestTemplate
+  public void testCommitMultipleCheckpointsWithDuplicateData() throws Exception {

Review Comment:
   Yes, the triggering conditions are a bit complicated; I will add Javadoc to describe them clearly.
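   A rough draft of that Javadoc, based on my reading of the test (the wording is a suggestion, not the final text):

       /**
        * Tests that when several completed checkpoints are committed in one cycle, and every
        * checkpoint writes the same rows together with matching equality deletes, the committer
        * still creates one snapshot per checkpoint and the table ends up with exactly one copy
        * of each row.
        */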
-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org