stevenzwu commented on code in PR #12839: URL: https://github.com/apache/iceberg/pull/12839#discussion_r2050853542
########## flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/StreamingStartingStrategy.java: ########## @@ -34,6 +34,13 @@ public enum StreamingStartingStrategy { */ INCREMENTAL_FROM_LATEST_SNAPSHOT, + /** + * Start incremental mode from the latest snapshot exclusive. + * + * <p>If it is an empty map, all future append snapshots should be discovered. Review Comment: Maybe this is a typo that also exists in the Javadoc above: `empty map` probably should be `empty table`. ########## flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImpl.java: ########## @@ -210,6 +210,43 @@ public void testIncrementalFromLatestSnapshotWithEmptyTable() throws Exception { } } + @Test + public void testIncrementalFromLatestSnapshotExclusiveWithEmptyTable() throws Exception { + ScanContext scanContext = + ScanContext.builder() + .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT_EXCLUSIVE) + .splitSize(1L) + .build(); + ContinuousSplitPlannerImpl splitPlanner = + new ContinuousSplitPlannerImpl(TABLE_RESOURCE.tableLoader().clone(), scanContext, null); + + ContinuousEnumerationResult emptyTableInitialDiscoveryResult = splitPlanner.planSplits(null); + assertThat(emptyTableInitialDiscoveryResult.splits()).isEmpty(); + assertThat(emptyTableInitialDiscoveryResult.fromPosition()).isNull(); Review Comment: For consistency, maybe add this assertion after this line:
``` assertThat(emptyTableInitialDiscoveryResult.toPosition().snapshotTimestampMs()).isNull(); ``` (Note: this must use `toPosition()`, because `fromPosition()` is asserted to be null on the line above and calling `snapshotTimestampMs()` on it would throw a NullPointerException.) ########## flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImplStartStrategy.java: ########## @@ -88,6 +88,23 @@ public void testTableScanThenIncrementalStrategy() throws IOException { assertThat(startSnapshot.snapshotId()).isEqualTo(snapshot3.snapshotId()); } + @Test + public void testForLatestSnapshotStrategyExclusive() throws IOException { Review Comment: We should add another test method for a table that is non-empty at startup. I realized this is also missing for the `testForLatestSnapshotStrategy` test below. ########## flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImpl.java: ########## @@ -256,6 +293,46 @@ public void testIncrementalFromLatestSnapshotWithNonEmptyTable() throws Exceptio } } + @Test + public void testIncrementalFromLatestSnapshotExclusiveWithNonEmptyTable() throws Exception { + appendTwoSnapshots(); + + ScanContext scanContext = + ScanContext.builder() + .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT_EXCLUSIVE) + .build(); + ContinuousSplitPlannerImpl splitPlanner = + new ContinuousSplitPlannerImpl(TABLE_RESOURCE.tableLoader().clone(), scanContext, null); + + ContinuousEnumerationResult initialResult = splitPlanner.planSplits(null); + assertThat(initialResult.fromPosition()).isNull(); + // For exclusive behavior, the initial result should point to snapshot2 + Review Comment: Nit: remove this empty line. ########## flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImpl.java: ########## @@ -256,6 +293,46 @@ public void testIncrementalFromLatestSnapshotWithNonEmptyTable() throws Exceptio } } + @Test + public void testIncrementalFromLatestSnapshotExclusiveWithNonEmptyTable() throws Exception { + appendTwoSnapshots(); + + ScanContext scanContext = + ScanContext.builder() +
.startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT_EXCLUSIVE) + .build(); + ContinuousSplitPlannerImpl splitPlanner = + new ContinuousSplitPlannerImpl(TABLE_RESOURCE.tableLoader().clone(), scanContext, null); + + ContinuousEnumerationResult initialResult = splitPlanner.planSplits(null); + assertThat(initialResult.fromPosition()).isNull(); + // For exclusive behavior, the initial result should point to snapshot2 + + assertThat(initialResult.toPosition().snapshotId().longValue()) + .isEqualTo(snapshot2.snapshotId()); + assertThat(initialResult.toPosition().snapshotTimestampMs().longValue()) + .isEqualTo(snapshot2.timestampMillis()); + assertThat(initialResult.splits()).isEmpty(); Review Comment: nit: move this assertion to be the first assertion (after line 307) ########## flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImpl.java: ########## @@ -256,6 +293,46 @@ public void testIncrementalFromLatestSnapshotWithNonEmptyTable() throws Exceptio } } + @Test + public void testIncrementalFromLatestSnapshotExclusiveWithNonEmptyTable() throws Exception { + appendTwoSnapshots(); + + ScanContext scanContext = + ScanContext.builder() + .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT_EXCLUSIVE) + .build(); + ContinuousSplitPlannerImpl splitPlanner = + new ContinuousSplitPlannerImpl(TABLE_RESOURCE.tableLoader().clone(), scanContext, null); + + ContinuousEnumerationResult initialResult = splitPlanner.planSplits(null); + assertThat(initialResult.fromPosition()).isNull(); + // For exclusive behavior, the initial result should point to snapshot2 + + assertThat(initialResult.toPosition().snapshotId().longValue()) + .isEqualTo(snapshot2.snapshotId()); + assertThat(initialResult.toPosition().snapshotTimestampMs().longValue()) + .isEqualTo(snapshot2.timestampMillis()); + assertThat(initialResult.splits()).isEmpty(); + + // Then the next incremental scan shall discover no files + 
ContinuousEnumerationResult secondResult = splitPlanner.planSplits(initialResult.toPosition()); + assertThat(secondResult.fromPosition().snapshotId().longValue()) + .isEqualTo(snapshot2.snapshotId()); + assertThat(secondResult.fromPosition().snapshotTimestampMs().longValue()) + .isEqualTo(snapshot2.timestampMillis()); + assertThat(secondResult.toPosition().snapshotId().longValue()) + .isEqualTo(snapshot2.snapshotId()); + assertThat(secondResult.toPosition().snapshotTimestampMs().longValue()) + .isEqualTo(snapshot2.timestampMillis()); + + assertThat(initialResult.splits()).isEmpty(); Review Comment: Similarly, move this assertion after line 318. Also, it should assert on `secondResult.splits()` rather than `initialResult.splits()`. As written, it re-checks the initial result and never verifies that the second incremental scan discovers no files. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org