stevenzwu commented on code in PR #6299:
URL: https://github.com/apache/iceberg/pull/6299#discussion_r1036307161


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java:
##########
@@ -92,34 +103,52 @@ public IcebergEnumeratorState snapshotState(long checkpointId) {
 
   /** This method is executed in an IO thread pool. */
   private ContinuousEnumerationResult discoverSplits() {
-    return splitPlanner.planSplits(enumeratorPosition.get());
+    int pendingSplitCountFromAssigner = assigner.pendingSplitCount();
+    if (enumerationHistory.shouldPauseSplitDiscovery(pendingSplitCountFromAssigner)) {
+      // If the assigner already has many pending splits, it is better to pause split discovery.
+      // Otherwise, eagerly discovering more splits will just increase assigner memory footprint
+      // and enumerator checkpoint state size.
+      LOG.info(
+          "Pause split discovery as the assigner already has too many pending splits: {}",
+          pendingSplitCountFromAssigner);
+      return new ContinuousEnumerationResult(
+          Collections.emptyList(), enumeratorPosition.get(), enumeratorPosition.get());
+    } else {
+      return splitPlanner.planSplits(enumeratorPosition.get());
+    }
   }
 
   /** This method is executed in a single coordinator thread. */
   private void processDiscoveredSplits(ContinuousEnumerationResult result, Throwable error) {
     if (error == null) {
       if (!Objects.equals(result.fromPosition(), enumeratorPosition.get())) {
         // Multiple discoverSplits() may be triggered with the same starting snapshot to the I/O
-        // thread pool.
-        // E.g., the splitDiscoveryInterval is very short (like 10 ms in some unit tests) or the
-        // thread
-        // pool is busy and multiple discovery actions are executed concurrently. Discovery result
-        // should
-        // only be accepted if the starting position matches the enumerator position (like
-        // compare-and-swap).
+        // thread pool. E.g., the splitDiscoveryInterval is very short (like 10 ms in some unit
+        // tests) or the thread pool is busy and multiple discovery actions are executed
+        // concurrently. Discovery result should only be accepted if the starting position
+        // matches the enumerator position (like compare-and-swap).
         LOG.info(
             "Skip {} discovered splits because the scan starting position doesn't match "
                 + "the current enumerator position: enumerator position = {}, scan starting position = {}",
             result.splits().size(),
             enumeratorPosition.get(),
             result.fromPosition());
       } else {
-        assigner.onDiscoveredSplits(result.splits());
-        LOG.info(
-            "Added {} splits discovered between ({}, {}] to the assigner",
-            result.splits().size(),
-            result.fromPosition(),
-            result.toPosition());
+        // Sometimes, enumeration may yield no splits for a few reasons.
+        // - upstream paused or delayed streaming writes to the Iceberg table.
+        // - enumeration frequency is higher than the upstream write frequency.
+        if (!result.splits().isEmpty()) {
+          assigner.onDiscoveredSplits(result.splits());
+          // EnumerationHistory makes throttling decision on split discovery
+          // based on the total number of splits discovered in the last a few cycles.
+          // Only update enumeration history when there are some discovered splits.
+          enumerationHistory.add(result.splits().size());

Review Comment:
   That is correct. The `processDiscoveredSplits` handler is executed in the
   single coordinator thread, while the `discoverSplits` callable is executed
   in the I/O thread pool. See the Javadoc of Flink's
   `SplitEnumeratorContext#callAsync`:
   
   ```
       /**
        * Invoke the given callable periodically and handover the return value to the handler which
        * will be executed by the source coordinator. When this method is invoked multiple times, The
        * <code>Callable</code>s may be executed in a thread pool concurrently.
        *
        * <p>It is important to make sure that the callable does not modify any shared state,
        * especially the states that will be a part of the {@link SplitEnumerator#snapshotState(long)}.
        * Otherwise, there might be unexpected behavior.
        *
        * <p>Note that an exception thrown from the handler would result in failing the job.
        *
        * @param callable the callable to call.
        * @param handler a handler that handles the return value of or the exception thrown from the
        *     callable.
        * @param initialDelayMillis the initial delay of calling the callable, in milliseconds.
        * @param periodMillis the period between two invocations of the callable, in milliseconds.
        */
       <T> void callAsync(
               Callable<T> callable,
               BiConsumer<T, Throwable> handler,
               long initialDelayMillis,
               long periodMillis);
   ```
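
To make the threading contract above concrete, here is a minimal, Flink-free sketch. It is not the PR's code or Flink's implementation: `CallAsyncSketch`, `Result`, and `callAsyncOnce` are hypothetical names, and the two executors merely stand in for the I/O thread pool and the single coordinator thread. The callable only reads the shared position, while the handler is the only place that mutates it, and it accepts a result only when the result's starting position still matches (the compare-and-swap-like check from the diff).

```java
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;

/** Illustrative only: mimics the callable/handler split without depending on Flink. */
final class CallAsyncSketch {
  /** Hypothetical stand-in for a discovery result: scan range plus number of splits. */
  static final class Result {
    final Long fromPosition;
    final Long toPosition;
    final int splitCount;

    Result(Long fromPosition, Long toPosition, int splitCount) {
      this.fromPosition = fromPosition;
      this.toPosition = toPosition;
      this.splitCount = splitCount;
    }
  }

  // Stand-ins for the I/O thread pool and the single coordinator thread.
  private final ExecutorService ioPool = Executors.newFixedThreadPool(2);
  private final ExecutorService coordinator = Executors.newSingleThreadExecutor();
  private final AtomicReference<Long> position = new AtomicReference<>(0L);
  // Mutated only on the coordinator thread.
  private int accepted = 0;
  private int skipped = 0;

  /** One-shot analogue of callAsync: callable on the I/O pool, handler on the coordinator. */
  private <T> void callAsyncOnce(Callable<T> callable, BiConsumer<T, Throwable> handler) {
    ioPool.execute(() -> {
      T value = null;
      Throwable error = null;
      try {
        value = callable.call();
      } catch (Throwable t) {
        error = t;
      }
      final T v = value;
      final Throwable e = error;
      coordinator.execute(() -> handler.accept(v, e));
    });
  }

  /** Runs two overlapping discoveries from the same position; returns {accepted, skipped}. */
  static int[] run() {
    CallAsyncSketch s = new CallAsyncSketch();
    CountDownLatch bothStarted = new CountDownLatch(2);
    CountDownLatch bothHandled = new CountDownLatch(2);
    Callable<Result> discover = () -> {
      Long from = s.position.get(); // read-only access to shared state
      bothStarted.countDown();
      bothStarted.await(); // force the two scans to overlap
      return new Result(from, from + 1, 3);
    };
    BiConsumer<Result, Throwable> handler = (result, error) -> {
      if (error == null && Objects.equals(result.fromPosition, s.position.get())) {
        s.position.set(result.toPosition); // accept: advance the position
        s.accepted++;
      } else {
        s.skipped++; // stale starting position (or failure): drop the result
      }
      bothHandled.countDown();
    };
    s.callAsyncOnce(discover, handler);
    s.callAsyncOnce(discover, handler);
    try {
      if (!bothHandled.await(10, TimeUnit.SECONDS)) {
        throw new IllegalStateException("handlers did not finish in time");
      }
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(ex);
    } finally {
      s.ioPool.shutdown();
      s.coordinator.shutdown();
    }
    return new int[] {s.accepted, s.skipped};
  }

  public static void main(String[] args) {
    int[] counts = run();
    System.out.println("accepted=" + counts[0] + ", skipped=" + counts[1]);
  }
}
```

Running `main` prints `accepted=1, skipped=1`: both scans start from position 0, the first handler invocation advances the position to 1, and the second result is dropped because its starting position is stale. Because all mutation happens in the handler, bookkeeping such as `enumerationHistory.add(...)` is safe there without extra locking.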



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

