stevenzwu commented on code in PR #6299:
URL: https://github.com/apache/iceberg/pull/6299#discussion_r1038429027


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousIcebergEnumerator.java:
##########
@@ -147,6 +148,85 @@ public void 
testRequestingReaderUnavailableWhenSplitDiscovered() throws Exceptio
         .contains(splits.get(0));
   }
 
+  @Test
+  public void testThrottlingDiscovery() throws Exception {
+    // create 10 splits
+    List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 
10, 1);
+
+    TestingSplitEnumeratorContext<IcebergSourceSplit> enumeratorContext =
+        new TestingSplitEnumeratorContext<>(4);
+    ScanContext scanContext =
+        ScanContext.builder()
+            .streaming(true)
+            
.startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_EARLIEST_SNAPSHOT)
+            // discover one snapshot at a time
+            .maxPlanningSnapshotCount(1)
+            .build();
+    ManualContinuousSplitPlanner splitPlanner = new 
ManualContinuousSplitPlanner(scanContext);
+    ContinuousIcebergEnumerator enumerator =
+        createEnumerator(enumeratorContext, scanContext, splitPlanner);
+
+    // register reader-2, and let it request a split
+    enumeratorContext.registerReader(2, "localhost");
+    enumerator.addReader(2);
+    enumerator.handleSourceEvent(2, new SplitRequestEvent());
+
+    // add splits[0] to the planner for next discovery
+    splitPlanner.addSplits(Arrays.asList(splits.get(0)));
+    enumeratorContext.triggerAllActions();
+
+    // because discovered split was assigned to reader, pending splits should 
be empty
+    Assert.assertEquals(0, enumerator.snapshotState(1).pendingSplits().size());
+    // split assignment to reader-2 should contain splits[0, 1)
+    Assert.assertEquals(
+        splits.subList(0, 1), 
enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
+
+    // add the remaining 9 splits (one for every snapshot)
+    // run discovery cycles while reader-2 still processing the splits[0]
+    for (int i = 1; i < 10; ++i) {
+      splitPlanner.addSplits(Arrays.asList(splits.get(i)));
+      enumeratorContext.triggerAllActions();
+    }
+
+    // can only discover up to 3 snapshots/splits
+    Assert.assertEquals(3, enumerator.snapshotState(2).pendingSplits().size());
+    // split assignment to reader-2 should be splits[0, 1)
+    Assert.assertEquals(
+        splits.subList(0, 1), 
enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
+
+    // now reader-2 finished splits[0]
+    enumerator.handleSourceEvent(2, new 
SplitRequestEvent(Arrays.asList(splits.get(0).splitId())));
+    enumeratorContext.triggerAllActions();
+    // still have 3 pending splits. After assigned splits[1] to reader-2, one 
more split was
+    // discovered and added.
+    Assert.assertEquals(3, enumerator.snapshotState(3).pendingSplits().size());
+    // split assignment to reader-2 should be splits[0, 2)
+    Assert.assertEquals(
+        splits.subList(0, 2), 
enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
+
+    // run 3 more split discovery cycles
+    for (int i = 0; i < 3; ++i) {
+      enumeratorContext.triggerAllActions();
+    }
+
+    // no more splits are discovered due to throttling
+    Assert.assertEquals(3, enumerator.snapshotState(4).pendingSplits().size());
+    // split assignment to reader-2 should still be splits[0, 2)
+    Assert.assertEquals(
+        splits.subList(0, 2), 
enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());
+
+    // now reader-2 finished splits[1]
+    enumerator.handleSourceEvent(2, new 
SplitRequestEvent(Arrays.asList(splits.get(1).splitId())));
+    enumeratorContext.triggerAllActions();
+    // still have 3 pending splits. After assigned new splits[2] to reader-2, 
one more split was
+    // discovered and added.
+    Assert.assertEquals(3, enumerator.snapshotState(5).pendingSplits().size());
+    // split assignment to reader-2 should be splits[0, 3)
+    Assert.assertEquals(
+        splits.subList(0, 3), 
enumeratorContext.getSplitAssignments().get(2).getAssignedSplits());

Review Comment:
   I will keep it as is, then, for better readability.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to