stevenzwu commented on code in PR #6299: URL: https://github.com/apache/iceberg/pull/6299#discussion_r1038429027
########## flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java: ########## @@ -147,6 +148,85 @@ public void testRequestingReaderUnavailableWhenSplitDiscovered() throws Exceptio .contains(splits.get(0)); } + @Test + public void testThrottlingDiscovery() throws Exception { + // create 10 splits + List<IcebergSourceSplit> splits = + SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 10, 1); + + TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext = + new TestingSplitEnumeratorContext<>(4); + ScanContext scanContext = + ScanContext.builder() + .streaming(true) + .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT) + // discover one snapshot at a time + .maxPlanningSnapshotCount(1) + .build(); + ManualContinuousSplitPlanner splitPlanner = new ManualContinuousSplitPlanner(scanContext); + ContinuousIcebergEnumerator enumerator = + createEnumerator(enumeratorContext, scanContext, splitPlanner); + + // register reader-2, and let it request a split + enumeratorContext.registerReader(2, "localhost"); + enumerator.addReader(2); + enumerator.handleSourceEvent(2, new SplitRequestEvent()); + + // add splits[0] to the planner for next discovery + splitPlanner.addSplits(Arrays.asList(splits.get(0))); + enumeratorContext.triggerAllActions(); + + // because discovered split was assigned to reader, pending splits should be empty + Assert.assertEquals(0, enumerator.snapshotState(1).pendingSplits().size()); + // split assignment to reader-2 should contain splits[0, 1) + Assert.assertEquals( + splits.subList(0, 1), enumeratorContext.getSplitAssignments().get(2).getAssignedSplits()); + + // add the remaining 9 splits (one for every snapshot) + // run discovery cycles while reader-2 still processing the splits[0] + for (int i = 1; i < 10; ++i) { + splitPlanner.addSplits(Arrays.asList(splits.get(i))); + enumeratorContext.triggerAllActions(); + } + + // can only discover up to 3 snapshots/splits + Assert.assertEquals(3, enumerator.snapshotState(2).pendingSplits().size()); + // split assignment to reader-2 should be splits[0, 1) + Assert.assertEquals( + splits.subList(0, 1), enumeratorContext.getSplitAssignments().get(2).getAssignedSplits()); + + // now reader-2 finished splits[0] + enumerator.handleSourceEvent(2, new SplitRequestEvent(Arrays.asList(splits.get(0).splitId()))); + enumeratorContext.triggerAllActions(); + // still have 3 pending splits. After assigned splits[1] to reader-2, one more split was + // discovered and added. + Assert.assertEquals(3, enumerator.snapshotState(3).pendingSplits().size()); + // split assignment to reader-2 should be splits[0, 2) + Assert.assertEquals( + splits.subList(0, 2), enumeratorContext.getSplitAssignments().get(2).getAssignedSplits()); + + // run 3 more split discovery cycles + for (int i = 0; i < 3; ++i) { + enumeratorContext.triggerAllActions(); + } + + // no more splits are discovered due to throttling + Assert.assertEquals(3, enumerator.snapshotState(4).pendingSplits().size()); + // split assignment to reader-2 should still be splits[0, 2) + Assert.assertEquals( + splits.subList(0, 2), enumeratorContext.getSplitAssignments().get(2).getAssignedSplits()); + + // now reader-2 finished splits[1] + enumerator.handleSourceEvent(2, new SplitRequestEvent(Arrays.asList(splits.get(1).splitId()))); + enumeratorContext.triggerAllActions(); + // still have 3 pending splits. After assigned new splits[2] to reader-2, one more split was + // discovered and added. + Assert.assertEquals(3, enumerator.snapshotState(5).pendingSplits().size()); + // split assignment to reader-2 should be splits[0, 3) + Assert.assertEquals( + splits.subList(0, 3), enumeratorContext.getSplitAssignments().get(2).getAssignedSplits()); Review Comment: I will keep it as it is then for better readability -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org