pvary commented on code in PR #10526: URL: https://github.com/apache/iceberg/pull/10526#discussion_r1648420475
########## flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java: ########## @@ -887,6 +921,55 @@ public void testCommitTwoCheckpointsInSingleTxn() throws Exception { } } + @TestTemplate + public void testCommitMultipleCheckpointsWithDuplicateData() throws Exception { Review Comment: Nit: maybe a better name, like `testCommitMultipleCheckpointsForV2Table`? Or anything where we highlight that the goal is to create different commits for different checkpoints, even if the WriteResults arrive at the same time. ########## flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java: ########## @@ -887,6 +921,55 @@ public void testCommitTwoCheckpointsInSingleTxn() throws Exception { } } + @TestTemplate + public void testCommitMultipleCheckpointsWithDuplicateData() throws Exception { Review Comment: I'm not satisfied with the name (even the one I suggested), so probably the best would be to add javadoc to the test method describing the scenario we are testing ########## flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java: ########## @@ -887,6 +921,55 @@ public void testCommitTwoCheckpointsInSingleTxn() throws Exception { } } + @TestTemplate + public void testCommitMultipleCheckpointsWithDuplicateData() throws Exception { + assumeThat(formatVersion) + .as("Only support equality-delete in format v2 or later.") + .isGreaterThan(1); + + assumeThat(hasPrimaryKey).as("The test case only for primary table.").isEqualTo(true); + + long timestamp = 0; + long checkpoint = 10; + + JobID jobId = new JobID(); + OperatorID operatorId; + FileAppenderFactory<RowData> appenderFactory = createDeletableAppenderFactory(); + + try (OneInputStreamOperatorTestHarness<FlinkWriteResult, Void> harness = + createStreamSink(jobId)) { + harness.setup(); + harness.open(); + operatorId = harness.getOperator().getOperatorID(); + + assertMaxCommittedCheckpointId(jobId, operatorId, -1L); + + 
RowData insert1 = SimpleDataUtil.createInsert(1, "aaa"); + RowData insert2 = SimpleDataUtil.createInsert(2, "bbb"); + for (int i = 1; i <= 3; i++) { + DataFile dataFile = writeDataFile("data-file-" + i, ImmutableList.of(insert1, insert2)); + DeleteFile deleteFile = Review Comment: Do we want to add a positional delete file too? Just for completeness' sake? Or would it add too much complexity to the test? ########## flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java: ########## @@ -887,6 +921,55 @@ public void testCommitTwoCheckpointsInSingleTxn() throws Exception { } } + @TestTemplate + public void testCommitMultipleCheckpointsWithDuplicateData() throws Exception { + assumeThat(formatVersion) + .as("Only support equality-delete in format v2 or later.") + .isGreaterThan(1); + + assumeThat(hasPrimaryKey).as("The test case only for primary table.").isEqualTo(true); + + long timestamp = 0; + long checkpoint = 10; + + JobID jobId = new JobID(); + OperatorID operatorId; + FileAppenderFactory<RowData> appenderFactory = createDeletableAppenderFactory(); + + try (OneInputStreamOperatorTestHarness<FlinkWriteResult, Void> harness = + createStreamSink(jobId)) { + harness.setup(); + harness.open(); + operatorId = harness.getOperator().getOperatorID(); + + assertMaxCommittedCheckpointId(jobId, operatorId, -1L); + + RowData insert1 = SimpleDataUtil.createInsert(1, "aaa"); + RowData insert2 = SimpleDataUtil.createInsert(2, "bbb"); + for (int i = 1; i <= 3; i++) { + DataFile dataFile = writeDataFile("data-file-" + i, ImmutableList.of(insert1, insert2)); + DeleteFile deleteFile = + writeEqDeleteFile( + appenderFactory, "delete-file-" + i, ImmutableList.of(insert1, insert2)); + harness.processElement( + new FlinkWriteResult( + ++checkpoint, + WriteResult.builder().addDataFiles(dataFile).addDeleteFiles(deleteFile).build()), + ++timestamp); + } + + // The 1th snapshotState. 
+ harness.snapshot(checkpoint, ++timestamp); + + // Notify the 1th snapshot to complete. + harness.notifyOfCompletedCheckpoint(checkpoint); + SimpleDataUtil.assertTableRows(table, ImmutableList.of(insert1, insert2), branch); Review Comment: Can we change something in the rows, so we can be sure that these are the last ones we are seeing here? ########## flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java: ########## @@ -887,6 +921,55 @@ public void testCommitTwoCheckpointsInSingleTxn() throws Exception { } } + @TestTemplate + public void testCommitMultipleCheckpointsWithDuplicateData() throws Exception { + assumeThat(formatVersion) + .as("Only support equality-delete in format v2 or later.") + .isGreaterThan(1); + + assumeThat(hasPrimaryKey).as("The test case only for primary table.").isEqualTo(true); + + long timestamp = 0; + long checkpoint = 10; + + JobID jobId = new JobID(); + OperatorID operatorId; + FileAppenderFactory<RowData> appenderFactory = createDeletableAppenderFactory(); + + try (OneInputStreamOperatorTestHarness<FlinkWriteResult, Void> harness = + createStreamSink(jobId)) { + harness.setup(); + harness.open(); + operatorId = harness.getOperator().getOperatorID(); + + assertMaxCommittedCheckpointId(jobId, operatorId, -1L); + + RowData insert1 = SimpleDataUtil.createInsert(1, "aaa"); + RowData insert2 = SimpleDataUtil.createInsert(2, "bbb"); + for (int i = 1; i <= 3; i++) { + DataFile dataFile = writeDataFile("data-file-" + i, ImmutableList.of(insert1, insert2)); + DeleteFile deleteFile = + writeEqDeleteFile( + appenderFactory, "delete-file-" + i, ImmutableList.of(insert1, insert2)); + harness.processElement( + new FlinkWriteResult( + ++checkpoint, + WriteResult.builder().addDataFiles(dataFile).addDeleteFiles(deleteFile).build()), + ++timestamp); + } + + // The 1th snapshotState. + harness.snapshot(checkpoint, ++timestamp); + + // Notify the 1th snapshot to complete. 
+ harness.notifyOfCompletedCheckpoint(checkpoint); + SimpleDataUtil.assertTableRows(table, ImmutableList.of(insert1, insert2), branch); + assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint); + assertFlinkManifests(0); + assertThat(table.snapshots()).hasSize(3); Review Comment: Why is this 3? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org