findepi commented on code in PR #10691:
URL: https://github.com/apache/iceberg/pull/10691#discussion_r1686401382


##########
core/src/main/java/org/apache/iceberg/util/ParallelIterable.java:
##########
@@ -136,30 +169,33 @@ private boolean checkTasks() {
         }
       }
 
-      return !closed && (tasks.hasNext() || hasRunningTask);
+      return !closed.get() && (tasks.hasNext() || hasRunningTask);
     }
 
-    private Future<?> submitNextTask() {
-      if (!closed && tasks.hasNext()) {
-        return workerPool.submit(tasks.next());
+    private CompletableFuture<Optional<Task<T>>> submitNextTask() {
+      if (!closed.get()) {
+        if (!yieldedTasks.isEmpty()) {
+          return CompletableFuture.supplyAsync(yieldedTasks.removeFirst(), workerPool);
+        } else if (tasks.hasNext()) {
+          return CompletableFuture.supplyAsync(tasks.next(), workerPool);
+        }
       }
       return null;
     }
 
     @Override
     public synchronized boolean hasNext() {
-      Preconditions.checkState(!closed, "Already closed");
-
-      // if the consumer is processing records more slowly than the producers, then this check will
-      // prevent tasks from being submitted. while the producers are running, this will always
-      // return here before running checkTasks. when enough of the tasks are finished that the
-      // consumer catches up, then lots of new tasks will be submitted at once. this behavior is
-      // okay because it ensures that records are not stacking up waiting to be consumed and taking
-      // up memory.
-      //
-      // consumers that process results quickly will periodically exhaust the queue and submit new
-      // tasks when checkTasks runs. fast consumers should not be delayed.
-      if (!queue.isEmpty()) {
+      Preconditions.checkState(!closed.get(), "Already closed");
+
+      // If the consumer is processing records more slowly than the producers, the producers will
+      // eventually fill the queue and yield, returning continuations. Continuations and new tasks
+      // are started by checkTasks(). The check here prevents us from restarting continuations or
+      // starting new tasks too early (when queue is almost full) or too late (when queue is already
+      // emptied). Restarting too early would lead to tasks yielding very quickly (CPU waste on
+      // scheduling). Restarting too late would mean the consumer may need to wait for the tasks
+      // to produce new items. A consumer slower than producers shouldn't need to wait.
+      int queueLowWaterMark = maxQueueSize / 2;

Review Comment:
   Re making this a static class field: it is currently an instance parameter, and the constant serves only as its default.
   Re making this a non-static (instance) field: I replied to this in https://github.com/apache/iceberg/pull/10691#discussion_r1684130747. Let me know what you think.
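
To make the point concrete, here is a minimal sketch of why the low-water mark is derived from an instance value and not from a class constant. It is an illustration only, not the code in this PR: the class name `LowWaterMarkSketch`, the methods `queueLowWaterMark()` and `shouldResumeProducers()`, the `<=` comparison, and the default value of 10,000 are all assumptions made for the example.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical illustration; this is NOT the ParallelIterable implementation.
final class LowWaterMarkSketch<T> {
  // Class-level constant. It only supplies the default for the instance parameter.
  // The value 10_000 is an assumption chosen for this example.
  static final int DEFAULT_MAX_QUEUE_SIZE = 10_000;

  // Per-instance setting, so two instances may use different queue sizes.
  private final int maxQueueSize;
  private final Deque<T> queue = new ArrayDeque<>();

  LowWaterMarkSketch() {
    this(DEFAULT_MAX_QUEUE_SIZE);
  }

  LowWaterMarkSketch(int maxQueueSize) {
    if (maxQueueSize < 1) {
      throw new IllegalArgumentException("maxQueueSize must be positive");
    }
    this.maxQueueSize = maxQueueSize;
  }

  void offer(T item) {
    queue.addLast(item);
  }

  T poll() {
    return queue.pollFirst();
  }

  // Depends on the instance's maxQueueSize, so it cannot be a static constant.
  int queueLowWaterMark() {
    return maxQueueSize / 2;
  }

  // True once the consumer has drained the queue down to the low-water mark:
  // not so early that resumed tasks would yield again almost immediately,
  // and not so late that the consumer has to wait on an empty queue.
  boolean shouldResumeProducers() {
    return queue.size() <= queueLowWaterMark();
  }
}
```

With `maxQueueSize = 4`, the sketch reports that producers should resume once the queue has drained from 4 items to 2, and no sooner.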



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


