pvary commented on code in PR #10526:
URL: https://github.com/apache/iceberg/pull/10526#discussion_r1648420475


##########
flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java:
##########
@@ -887,6 +921,55 @@ public void testCommitTwoCheckpointsInSingleTxn() throws Exception {
     }
   }
 
+  @TestTemplate
+  public void testCommitMultipleCheckpointsWithDuplicateData() throws Exception {

Review Comment:
   Nit: maybe a better name, like `testCommitMultipleCheckpointsForV2Table`? Or anything that highlights that the goal is to create different commits for different checkpoints, even if the WriteResults arrive at the same time.



##########
flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java:
##########
@@ -887,6 +921,55 @@ public void testCommitTwoCheckpointsInSingleTxn() throws Exception {
     }
   }
 
+  @TestTemplate
+  public void testCommitMultipleCheckpointsWithDuplicateData() throws Exception {

Review Comment:
   I'm not satisfied with the name (even the one I suggested), so probably the best option would be to add javadoc to the test method describing the scenario we are testing.
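
   One option, picking up the javadoc idea: a comment along these lines could make the scenario explicit regardless of the method name. The wording is only a suggestion, and the rationale in the second sentence is inferred from the test setup rather than stated in the PR:

```java
/**
 * The WriteResults of several checkpoints can arrive at the committer at the same time.
 * For a V2 table with a primary key they must still be committed as separate snapshots:
 * equality deletes only apply to data files with a smaller sequence number, so squashing
 * all checkpoints into a single commit would leave the duplicate rows in place.
 */
```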



##########
flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java:
##########
@@ -887,6 +921,55 @@ public void testCommitTwoCheckpointsInSingleTxn() throws Exception {
     }
   }
 
+  @TestTemplate
+  public void testCommitMultipleCheckpointsWithDuplicateData() throws Exception {
+    assumeThat(formatVersion)
+        .as("Only support equality-delete in format v2 or later.")
+        .isGreaterThan(1);
+
+    assumeThat(hasPrimaryKey).as("The test case only for primary table.").isEqualTo(true);
+
+    long timestamp = 0;
+    long checkpoint = 10;
+
+    JobID jobId = new JobID();
+    OperatorID operatorId;
+    FileAppenderFactory<RowData> appenderFactory = createDeletableAppenderFactory();
+
+    try (OneInputStreamOperatorTestHarness<FlinkWriteResult, Void> harness =
+        createStreamSink(jobId)) {
+      harness.setup();
+      harness.open();
+      operatorId = harness.getOperator().getOperatorID();
+
+      assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
+
+      RowData insert1 = SimpleDataUtil.createInsert(1, "aaa");
+      RowData insert2 = SimpleDataUtil.createInsert(2, "bbb");
+      for (int i = 1; i <= 3; i++) {
+        DataFile dataFile = writeDataFile("data-file-" + i, ImmutableList.of(insert1, insert2));
+        DeleteFile deleteFile =

Review Comment:
   Do we want to add a positional delete file too? Just for completeness' sake? Or does it add too much complexity to the test?
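
   If positional deletes are worth covering, the loop body could be extended roughly like this. Note that `writePosDeleteFile` and its signature are assumptions modeled on the existing `writeEqDeleteFile` helper, not something taken from the PR:

```java
// Hypothetical helper, mirroring writeEqDeleteFile: deletes row 0 of the
// data file written in the same iteration, addressed by (path, position).
DeleteFile posDeleteFile =
    writePosDeleteFile(
        appenderFactory,
        "pos-delete-file-" + i,
        ImmutableList.of(Pair.of(dataFile.path(), 0L)));

harness.processElement(
    new FlinkWriteResult(
        ++checkpoint,
        WriteResult.builder()
            .addDataFiles(dataFile)
            .addDeleteFiles(deleteFile)
            .addDeleteFiles(posDeleteFile)
            .build()),
    ++timestamp);
```

   The expected row assertions would then have to account for the positionally deleted rows as well, which may or may not be worth the extra setup.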



##########
flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java:
##########
@@ -887,6 +921,55 @@ public void testCommitTwoCheckpointsInSingleTxn() throws Exception {
     }
   }
 
+  @TestTemplate
+  public void testCommitMultipleCheckpointsWithDuplicateData() throws Exception {
+    assumeThat(formatVersion)
+        .as("Only support equality-delete in format v2 or later.")
+        .isGreaterThan(1);
+
+    assumeThat(hasPrimaryKey).as("The test case only for primary table.").isEqualTo(true);
+
+    long timestamp = 0;
+    long checkpoint = 10;
+
+    JobID jobId = new JobID();
+    OperatorID operatorId;
+    FileAppenderFactory<RowData> appenderFactory = createDeletableAppenderFactory();
+
+    try (OneInputStreamOperatorTestHarness<FlinkWriteResult, Void> harness =
+        createStreamSink(jobId)) {
+      harness.setup();
+      harness.open();
+      operatorId = harness.getOperator().getOperatorID();
+
+      assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
+
+      RowData insert1 = SimpleDataUtil.createInsert(1, "aaa");
+      RowData insert2 = SimpleDataUtil.createInsert(2, "bbb");
+      for (int i = 1; i <= 3; i++) {
+        DataFile dataFile = writeDataFile("data-file-" + i, ImmutableList.of(insert1, insert2));
+        DeleteFile deleteFile =
+            writeEqDeleteFile(
+                appenderFactory, "delete-file-" + i, ImmutableList.of(insert1, insert2));
+        harness.processElement(
+            new FlinkWriteResult(
+                ++checkpoint,
+                WriteResult.builder().addDataFiles(dataFile).addDeleteFiles(deleteFile).build()),
+            ++timestamp);
+      }
+
+      // The first snapshotState.
+      harness.snapshot(checkpoint, ++timestamp);
+
+      // Notify the first snapshot to complete.
+      harness.notifyOfCompletedCheckpoint(checkpoint);
+      SimpleDataUtil.assertTableRows(table, ImmutableList.of(insert1, insert2), branch);

Review Comment:
   Can we change something in the rows, so we can be sure that the rows we are seeing here are the last ones?
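
   One way to do that: vary the payload per checkpoint and assert on the last one. A sketch against the quoted code, assuming the equality delete is keyed on the id column so the deletes still match across checkpoints:

```java
// Sketch: give every checkpoint its own payload so the final assertion can
// only pass if the LAST checkpoint's rows ended up in the table.
List<RowData> lastWritten = ImmutableList.of();
for (int i = 1; i <= 3; i++) {
  RowData insert1 = SimpleDataUtil.createInsert(1, "aaa-" + i);
  RowData insert2 = SimpleDataUtil.createInsert(2, "bbb-" + i);
  lastWritten = ImmutableList.of(insert1, insert2);
  DataFile dataFile = writeDataFile("data-file-" + i, lastWritten);
  DeleteFile deleteFile = writeEqDeleteFile(appenderFactory, "delete-file-" + i, lastWritten);
  harness.processElement(
      new FlinkWriteResult(
          ++checkpoint,
          WriteResult.builder().addDataFiles(dataFile).addDeleteFiles(deleteFile).build()),
      ++timestamp);
}
// ... snapshot and notifyOfCompletedCheckpoint as before ...
SimpleDataUtil.assertTableRows(table, lastWritten, branch);
```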



##########
flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java:
##########
@@ -887,6 +921,55 @@ public void testCommitTwoCheckpointsInSingleTxn() throws Exception {
     }
   }
 
+  @TestTemplate
+  public void testCommitMultipleCheckpointsWithDuplicateData() throws Exception {
+    assumeThat(formatVersion)
+        .as("Only support equality-delete in format v2 or later.")
+        .isGreaterThan(1);
+
+    assumeThat(hasPrimaryKey).as("The test case only for primary table.").isEqualTo(true);
+
+    long timestamp = 0;
+    long checkpoint = 10;
+
+    JobID jobId = new JobID();
+    OperatorID operatorId;
+    FileAppenderFactory<RowData> appenderFactory = createDeletableAppenderFactory();
+
+    try (OneInputStreamOperatorTestHarness<FlinkWriteResult, Void> harness =
+        createStreamSink(jobId)) {
+      harness.setup();
+      harness.open();
+      operatorId = harness.getOperator().getOperatorID();
+
+      assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
+
+      RowData insert1 = SimpleDataUtil.createInsert(1, "aaa");
+      RowData insert2 = SimpleDataUtil.createInsert(2, "bbb");
+      for (int i = 1; i <= 3; i++) {
+        DataFile dataFile = writeDataFile("data-file-" + i, ImmutableList.of(insert1, insert2));
+        DeleteFile deleteFile =
+            writeEqDeleteFile(
+                appenderFactory, "delete-file-" + i, ImmutableList.of(insert1, insert2));
+        harness.processElement(
+            new FlinkWriteResult(
+                ++checkpoint,
+                WriteResult.builder().addDataFiles(dataFile).addDeleteFiles(deleteFile).build()),
+            ++timestamp);
+      }
+
+      // The first snapshotState.
+      harness.snapshot(checkpoint, ++timestamp);
+
+      // Notify the first snapshot to complete.
+      harness.notifyOfCompletedCheckpoint(checkpoint);
+      SimpleDataUtil.assertTableRows(table, ImmutableList.of(insert1, insert2), branch);
+      assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint);
+      assertFlinkManifests(0);
+      assertThat(table.snapshots()).hasSize(3);

Review Comment:
   Why is this 3?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

