zhongqishang commented on code in PR #10526:
URL: https://github.com/apache/iceberg/pull/10526#discussion_r1648452582


##########
flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java:
##########
@@ -887,6 +921,55 @@ public void testCommitTwoCheckpointsInSingleTxn() throws Exception {
     }
   }
 
+  @TestTemplate
+  public void testCommitMultipleCheckpointsWithDuplicateData() throws Exception {
+    assumeThat(formatVersion)
+        .as("Only support equality-delete in format v2 or later.")
+        .isGreaterThan(1);
+
+    assumeThat(hasPrimaryKey).as("The test case only applies to primary-key tables.").isEqualTo(true);
+
+    long timestamp = 0;
+    long checkpoint = 10;
+
+    JobID jobId = new JobID();
+    OperatorID operatorId;
+    FileAppenderFactory<RowData> appenderFactory = createDeletableAppenderFactory();
+
+    try (OneInputStreamOperatorTestHarness<FlinkWriteResult, Void> harness =
+        createStreamSink(jobId)) {
+      harness.setup();
+      harness.open();
+      operatorId = harness.getOperator().getOperatorID();
+
+      assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
+
+      RowData insert1 = SimpleDataUtil.createInsert(1, "aaa");
+      RowData insert2 = SimpleDataUtil.createInsert(2, "bbb");
+      for (int i = 1; i <= 3; i++) {
+        DataFile dataFile = writeDataFile("data-file-" + i, ImmutableList.of(insert1, insert2));
+        DeleteFile deleteFile =
+            writeEqDeleteFile(
+                appenderFactory, "delete-file-" + i, ImmutableList.of(insert1, insert2));
+        harness.processElement(
+            new FlinkWriteResult(
+                ++checkpoint,
+                WriteResult.builder().addDataFiles(dataFile).addDeleteFiles(deleteFile).build()),
+            ++timestamp);
+      }
+
+      // The 1st snapshotState.
+      harness.snapshot(checkpoint, ++timestamp);
+
+      // Notify the 1st snapshot to complete.
+      harness.notifyOfCompletedCheckpoint(checkpoint);
+      SimpleDataUtil.assertTableRows(table, ImmutableList.of(insert1, insert2), branch);
+      assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint);
+      assertFlinkManifests(0);
+      assertThat(table.snapshots()).hasSize(3);

Review Comment:
   Because there are three different `checkpointId`s, each one is committed once, so three separate snapshots are created.
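   For reference, a minimal sketch of how that could be asserted more explicitly; the summary key name is my assumption about what the committer records, not something introduced by this PR (assumes `java.util.List`, `org.apache.iceberg.Snapshot`, and the relocated Guava `Lists` imports):

   ```java
   // Each FlinkWriteResult carried a distinct checkpointId, so the committer
   // creates one snapshot per checkpoint instead of a single combined commit.
   List<Snapshot> committed = Lists.newArrayList(table.snapshots());
   assertThat(committed).hasSize(3);
   for (Snapshot snapshot : committed) {
     // Assumes the committer records the checkpoint id in the snapshot summary
     // under "flink.max-committed-checkpoint-id".
     assertThat(snapshot.summary()).containsKey("flink.max-committed-checkpoint-id");
   }
   ```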



##########
flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java:
##########
@@ -887,6 +921,55 @@ public void testCommitTwoCheckpointsInSingleTxn() throws Exception {
     }
   }
 
+  @TestTemplate
+  public void testCommitMultipleCheckpointsWithDuplicateData() throws Exception {
+    assumeThat(formatVersion)
+        .as("Only support equality-delete in format v2 or later.")
+        .isGreaterThan(1);
+
+    assumeThat(hasPrimaryKey).as("The test case only applies to primary-key tables.").isEqualTo(true);
+
+    long timestamp = 0;
+    long checkpoint = 10;
+
+    JobID jobId = new JobID();
+    OperatorID operatorId;
+    FileAppenderFactory<RowData> appenderFactory = createDeletableAppenderFactory();
+
+    try (OneInputStreamOperatorTestHarness<FlinkWriteResult, Void> harness =
+        createStreamSink(jobId)) {
+      harness.setup();
+      harness.open();
+      operatorId = harness.getOperator().getOperatorID();
+
+      assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
+
+      RowData insert1 = SimpleDataUtil.createInsert(1, "aaa");
+      RowData insert2 = SimpleDataUtil.createInsert(2, "bbb");
+      for (int i = 1; i <= 3; i++) {
+        DataFile dataFile = writeDataFile("data-file-" + i, ImmutableList.of(insert1, insert2));
+        DeleteFile deleteFile =
+            writeEqDeleteFile(
+                appenderFactory, "delete-file-" + i, ImmutableList.of(insert1, insert2));
+        harness.processElement(
+            new FlinkWriteResult(
+                ++checkpoint,
+                WriteResult.builder().addDataFiles(dataFile).addDeleteFiles(deleteFile).build()),
+            ++timestamp);
+      }
+
+      // The 1st snapshotState.
+      harness.snapshot(checkpoint, ++timestamp);
+
+      // Notify the 1st snapshot to complete.
+      harness.notifyOfCompletedCheckpoint(checkpoint);
+      SimpleDataUtil.assertTableRows(table, ImmutableList.of(insert1, insert2), branch);

Review Comment:
   Agreed, I would put the data generation inside the `for` loop, roughly as sketched below.
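   A rough sketch of what I mean; the rows keep identical values across iterations, since the whole point of the test is duplicate data:

   ```java
   for (int i = 1; i <= 3; i++) {
     // Create the rows inside the loop; the values are deliberately identical in
     // every iteration so each checkpoint still writes duplicate data.
     RowData insert1 = SimpleDataUtil.createInsert(1, "aaa");
     RowData insert2 = SimpleDataUtil.createInsert(2, "bbb");
     DataFile dataFile = writeDataFile("data-file-" + i, ImmutableList.of(insert1, insert2));
     DeleteFile deleteFile =
         writeEqDeleteFile(appenderFactory, "delete-file-" + i, ImmutableList.of(insert1, insert2));
     harness.processElement(
         new FlinkWriteResult(
             ++checkpoint,
             WriteResult.builder().addDataFiles(dataFile).addDeleteFiles(deleteFile).build()),
         ++timestamp);
   }
   ```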



##########
flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java:
##########
@@ -887,6 +921,55 @@ public void testCommitTwoCheckpointsInSingleTxn() throws Exception {
     }
   }
 
+  @TestTemplate
+  public void testCommitMultipleCheckpointsWithDuplicateData() throws Exception {
+    assumeThat(formatVersion)
+        .as("Only support equality-delete in format v2 or later.")
+        .isGreaterThan(1);
+
+    assumeThat(hasPrimaryKey).as("The test case only applies to primary-key tables.").isEqualTo(true);
+
+    long timestamp = 0;
+    long checkpoint = 10;
+
+    JobID jobId = new JobID();
+    OperatorID operatorId;
+    FileAppenderFactory<RowData> appenderFactory = createDeletableAppenderFactory();
+
+    try (OneInputStreamOperatorTestHarness<FlinkWriteResult, Void> harness =
+        createStreamSink(jobId)) {
+      harness.setup();
+      harness.open();
+      operatorId = harness.getOperator().getOperatorID();
+
+      assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
+
+      RowData insert1 = SimpleDataUtil.createInsert(1, "aaa");
+      RowData insert2 = SimpleDataUtil.createInsert(2, "bbb");
+      for (int i = 1; i <= 3; i++) {
+        DataFile dataFile = writeDataFile("data-file-" + i, ImmutableList.of(insert1, insert2));
+        DeleteFile deleteFile =

Review Comment:
   A positional delete file is only associated with a single snapshot, so it is not necessary to include one in this scenario.
   I don't think it needs to be added to the current test case.
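   For context, a positional delete pins an exact data file path and row offset, which is why it cannot span commits the way an equality delete can. A hypothetical sketch; the `writePosDeleteFile` helper and its signature are my assumption, modeled on `writeEqDeleteFile`:

   ```java
   // Hypothetical: a positional delete names a concrete (file path, row position)
   // pair, so it only applies to data files committed in the same snapshot.
   DeleteFile posDeleteFile =
       writePosDeleteFile(
           appenderFactory,
           "pos-delete-file-" + i,
           ImmutableList.of(Pair.of(dataFile.path(), 0L)));
   ```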



##########
flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java:
##########
@@ -887,6 +921,55 @@ public void testCommitTwoCheckpointsInSingleTxn() throws Exception {
     }
   }
 
+  @TestTemplate
+  public void testCommitMultipleCheckpointsWithDuplicateData() throws Exception {

Review Comment:
   Yes, the triggering conditions are a bit complicated, and it is necessary to add Javadoc describing them clearly; a possible sketch is below.
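   A possible sketch, assuming the behavior described in this thread (the wording is only a suggestion, not from the PR):

   ```java
   /**
    * Verifies that write results from several checkpoint ids, each containing the
    * same rows plus an equality delete for those rows, are committed as one
    * snapshot per checkpoint id, so the deletes apply in checkpoint order and the
    * rows written by the latest checkpoint survive in the table.
    */
   @TestTemplate
   public void testCommitMultipleCheckpointsWithDuplicateData() throws Exception {
     // ... test body as in this PR ...
   }
   ```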



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

