stevenzwu commented on code in PR #6299: URL: https://github.com/apache/iceberg/pull/6299#discussion_r1036531697
########## flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImpl.java: ########## @@ -507,4 +507,71 @@ public void testIncrementalFromSnapshotTimestamp() throws Exception { lastPosition = verifyOneCycle(splitPlanner, lastPosition); } } + + @Test + public void testMaxPlanningSnapshotCount() throws Exception { + appendTwoSnapshots(); + // append 3 more snapshots + for (int i = 2; i < 5; ++i) { + appendSnapshot(i, 2); + } + + ScanContext scanContext = + ScanContext.builder() + .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT) + // limit to 1 snapshot per discovery + .maxPlanningSnapshotCount(1) + .build(); + ContinuousSplitPlannerImpl splitPlanner = + new ContinuousSplitPlannerImpl(tableResource.table(), scanContext, null); + + ContinuousEnumerationResult initialResult = splitPlanner.planSplits(null); + Assert.assertNull(initialResult.fromPosition()); + // For inclusive behavior, the initial result should point to snapshot1's parent, + // which leads to null snapshotId and snapshotTimestampMs. 
+ Assert.assertNull(initialResult.toPosition().snapshotId()); + Assert.assertNull(initialResult.toPosition().snapshotTimestampMs()); + Assert.assertEquals(0, initialResult.splits().size()); + + ContinuousEnumerationResult secondResult = splitPlanner.planSplits(initialResult.toPosition()); + Assert.assertNull(secondResult.fromPosition().snapshotId()); + Assert.assertNull(secondResult.fromPosition().snapshotTimestampMs()); + Assert.assertEquals(snapshot1.snapshotId(), secondResult.toPosition().snapshotId().longValue()); + Assert.assertEquals( + snapshot1.timestampMillis(), secondResult.toPosition().snapshotTimestampMs().longValue()); + IcebergSourceSplit splitSecond = Iterables.getOnlyElement(secondResult.splits()); + Assert.assertEquals(1, splitSecond.task().files().size()); + Set<String> discoveredFilesSecond = + splitSecond.task().files().stream() + .map(fileScanTask -> fileScanTask.file().path().toString()) + .collect(Collectors.toSet()); + // should discover dataFile1 appended in snapshot1 + Set<String> expectedFilesSecond = ImmutableSet.of(dataFile1.path().toString()); + Assert.assertEquals(expectedFilesSecond, discoveredFilesSecond); + + ContinuousEnumerationResult thirdResult = splitPlanner.planSplits(secondResult.toPosition()); + Assert.assertEquals( + snapshot1.snapshotId(), thirdResult.fromPosition().snapshotId().longValue()); + Assert.assertEquals( + snapshot1.timestampMillis(), thirdResult.fromPosition().snapshotTimestampMs().longValue()); + Assert.assertEquals(snapshot2.snapshotId(), thirdResult.toPosition().snapshotId().longValue()); + Assert.assertEquals( + snapshot2.timestampMillis(), thirdResult.toPosition().snapshotTimestampMs().longValue()); + IcebergSourceSplit splitThird = Iterables.getOnlyElement(thirdResult.splits()); + Assert.assertEquals(1, splitThird.task().files().size()); + Set<String> discoveredFilesThird = + splitThird.task().files().stream() + .map(fileScanTask -> fileScanTask.file().path().toString()) + .collect(Collectors.toSet()); + 
+ // should discover dataFile2 appended in snapshot2 + Set<String> expectedFilesThird = ImmutableSet.of(dataFile2.path().toString()); + Assert.assertEquals(expectedFilesThird, discoveredFilesThird); Review Comment: Agreed. Will refactor. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org For additional commands, e-mail: issues-help@iceberg.apache.org