stevenzwu commented on code in PR #12839:
URL: https://github.com/apache/iceberg/pull/12839#discussion_r2050853542


##########
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/StreamingStartingStrategy.java:
##########
@@ -34,6 +34,13 @@ public enum StreamingStartingStrategy {
    */
   INCREMENTAL_FROM_LATEST_SNAPSHOT,
 
+  /**
+   * Start incremental mode from the latest snapshot exclusive.
+   *
+   * <p>If it is an empty map, all future append snapshots should be 
discovered.

Review Comment:
   This may be a typo in the Javadoc above as well: `empty map` should probably be `empty table`.



##########
flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImpl.java:
##########
@@ -210,6 +210,43 @@ public void 
testIncrementalFromLatestSnapshotWithEmptyTable() throws Exception {
     }
   }
 
+  @Test
+  public void testIncrementalFromLatestSnapshotExclusiveWithEmptyTable() 
throws Exception {
+    ScanContext scanContext =
+        ScanContext.builder()
+            
.startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT_EXCLUSIVE)
+            .splitSize(1L)
+            .build();
+    ContinuousSplitPlannerImpl splitPlanner =
+        new ContinuousSplitPlannerImpl(TABLE_RESOURCE.tableLoader().clone(), 
scanContext, null);
+
+    ContinuousEnumerationResult emptyTableInitialDiscoveryResult = 
splitPlanner.planSplits(null);
+    assertThat(emptyTableInitialDiscoveryResult.splits()).isEmpty();
+    assertThat(emptyTableInitialDiscoveryResult.fromPosition()).isNull();

Review Comment:
   for consistency, maybe add this assertion after this line.
   ```
   
assertThat(emptyTableInitialDiscoveryResult.toPosition().snapshotTimestampMs()).isNull();
   ```



##########
flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImplStartStrategy.java:
##########
@@ -88,6 +88,23 @@ public void testTableScanThenIncrementalStrategy() throws 
IOException {
     assertThat(startSnapshot.snapshotId()).isEqualTo(snapshot3.snapshotId());
   }
 
+  @Test
+  public void testForLatestSnapshotStrategyExclusive() throws IOException {

Review Comment:
   We should add another test method that covers starting with a non-empty table.
   
   I realized this is also missing for the `testForLatestSnapshotStrategy` test 
below.



##########
flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImpl.java:
##########
@@ -256,6 +293,46 @@ public void 
testIncrementalFromLatestSnapshotWithNonEmptyTable() throws Exceptio
     }
   }
 
+  @Test
+  public void testIncrementalFromLatestSnapshotExclusiveWithNonEmptyTable() 
throws Exception {
+    appendTwoSnapshots();
+
+    ScanContext scanContext =
+        ScanContext.builder()
+            
.startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT_EXCLUSIVE)
+            .build();
+    ContinuousSplitPlannerImpl splitPlanner =
+        new ContinuousSplitPlannerImpl(TABLE_RESOURCE.tableLoader().clone(), 
scanContext, null);
+
+    ContinuousEnumerationResult initialResult = splitPlanner.planSplits(null);
+    assertThat(initialResult.fromPosition()).isNull();
+    // For exclusive behavior, the initial result should point to snapshot2
+

Review Comment:
   nit: remove this empty line



##########
flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImpl.java:
##########
@@ -256,6 +293,46 @@ public void 
testIncrementalFromLatestSnapshotWithNonEmptyTable() throws Exceptio
     }
   }
 
+  @Test
+  public void testIncrementalFromLatestSnapshotExclusiveWithNonEmptyTable() 
throws Exception {
+    appendTwoSnapshots();
+
+    ScanContext scanContext =
+        ScanContext.builder()
+            
.startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT_EXCLUSIVE)
+            .build();
+    ContinuousSplitPlannerImpl splitPlanner =
+        new ContinuousSplitPlannerImpl(TABLE_RESOURCE.tableLoader().clone(), 
scanContext, null);
+
+    ContinuousEnumerationResult initialResult = splitPlanner.planSplits(null);
+    assertThat(initialResult.fromPosition()).isNull();
+    // For exclusive behavior, the initial result should point to snapshot2
+
+    assertThat(initialResult.toPosition().snapshotId().longValue())
+        .isEqualTo(snapshot2.snapshotId());
+    assertThat(initialResult.toPosition().snapshotTimestampMs().longValue())
+        .isEqualTo(snapshot2.timestampMillis());
+    assertThat(initialResult.splits()).isEmpty();

Review Comment:
   nit: move this assertion to be the first assertion (after line 307)



##########
flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImpl.java:
##########
@@ -256,6 +293,46 @@ public void 
testIncrementalFromLatestSnapshotWithNonEmptyTable() throws Exceptio
     }
   }
 
+  @Test
+  public void testIncrementalFromLatestSnapshotExclusiveWithNonEmptyTable() 
throws Exception {
+    appendTwoSnapshots();
+
+    ScanContext scanContext =
+        ScanContext.builder()
+            
.startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT_EXCLUSIVE)
+            .build();
+    ContinuousSplitPlannerImpl splitPlanner =
+        new ContinuousSplitPlannerImpl(TABLE_RESOURCE.tableLoader().clone(), 
scanContext, null);
+
+    ContinuousEnumerationResult initialResult = splitPlanner.planSplits(null);
+    assertThat(initialResult.fromPosition()).isNull();
+    // For exclusive behavior, the initial result should point to snapshot2
+
+    assertThat(initialResult.toPosition().snapshotId().longValue())
+        .isEqualTo(snapshot2.snapshotId());
+    assertThat(initialResult.toPosition().snapshotTimestampMs().longValue())
+        .isEqualTo(snapshot2.timestampMillis());
+    assertThat(initialResult.splits()).isEmpty();
+
+    // Then the next incremental scan shall discover no files
+    ContinuousEnumerationResult secondResult = 
splitPlanner.planSplits(initialResult.toPosition());
+    assertThat(secondResult.fromPosition().snapshotId().longValue())
+        .isEqualTo(snapshot2.snapshotId());
+    assertThat(secondResult.fromPosition().snapshotTimestampMs().longValue())
+        .isEqualTo(snapshot2.timestampMillis());
+    assertThat(secondResult.toPosition().snapshotId().longValue())
+        .isEqualTo(snapshot2.snapshotId());
+    assertThat(secondResult.toPosition().snapshotTimestampMs().longValue())
+        .isEqualTo(snapshot2.timestampMillis());
+
+    assertThat(initialResult.splits()).isEmpty();

Review Comment:
   similarly, move this assertion after line 318



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to