dekimir commented on code in PR #10691:
URL: https://github.com/apache/iceberg/pull/10691#discussion_r1678458408


##########
core/src/main/java/org/apache/iceberg/util/ParallelIterable.java:
##########
@@ -192,4 +209,65 @@ public synchronized T next() {
       return queue.poll();
     }
   }
+
+  private static class Task<T> implements Callable<Optional<Task<T>>>, Closeable {
+    private final Iterable<T> input;
+    private final ConcurrentLinkedQueue<T> queue;
+    private final AtomicBoolean closed;
+    private final int approximateMaxQueueSize;
+
+    private Iterator<T> iterator;
+
+    Task(
+        Iterable<T> input,
+        ConcurrentLinkedQueue<T> queue,
+        AtomicBoolean closed,
+        int approximateMaxQueueSize) {
+      this.input = Preconditions.checkNotNull(input, "input cannot be null");
+      this.queue = Preconditions.checkNotNull(queue, "queue cannot be null");
+      this.closed = Preconditions.checkNotNull(closed, "closed cannot be null");
+      this.approximateMaxQueueSize = approximateMaxQueueSize;
+    }
+
+    @Override
+    public Optional<Task<T>> call() throws Exception {
+      try {
+        if (iterator == null) {
+          iterator = input.iterator();
+        }
+        while (iterator.hasNext()) {
+          if (queue.size() >= approximateMaxQueueSize) {
+            // yield
+            return Optional.of(this);

Review Comment:
   > I'm saying why not setting `taskFutures[i] = continuation.get()` ?
   
   But `continuation.get()` is of type `Task`, while `taskFutures[i]` is of type `Future<Optional<Task>>`, so how would this compile?
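
To make the type mismatch concrete, here is a minimal, self-contained sketch. `StepTask`, `runToCompletion` and `demo` are hypothetical names, not code from the PR, and the driver simply blocks on `Future.get()` for brevity rather than mirroring `checkTasks()`. What it shows: a continuation returned by a task is a `Callable`, not a `Future`, so the only way to put it back into a futures slot is to resubmit it to the executor.

```java
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class ResubmitSketch {
  // Hypothetical stand-in for the PR's Task: each call does one step of work and
  // returns itself as the continuation until no steps remain.
  static final class StepTask implements Callable<Optional<StepTask>> {
    private int remaining;

    StepTask(int steps) {
      this.remaining = steps;
    }

    @Override
    public Optional<StepTask> call() {
      remaining--;
      return remaining > 0 ? Optional.of(this) : Optional.empty();
    }
  }

  // Simplified single-slot driver: waits on the future and resubmits the
  // continuation until the task reports completion. Returns the number of submissions.
  static int runToCompletion(ExecutorService executor, StepTask task)
      throws InterruptedException, ExecutionException {
    Future<Optional<StepTask>> future = executor.submit(task);
    int submissions = 1;
    while (true) {
      Optional<StepTask> continuation = future.get();
      if (!continuation.isPresent()) {
        return submissions;
      }
      // future = continuation.get();  // does not compile: a StepTask is not a Future
      future = executor.submit(continuation.get()); // resubmitting yields a new Future
      submissions++;
    }
  }

  // Convenience wrapper that owns the executor and unwraps checked exceptions.
  static int demo(int steps) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      return runToCompletion(executor, new StepTask(steps));
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e.getCause());
    } finally {
      executor.shutdown();
    }
  }
}
```

For example, `ResubmitSketch.demo(3)` returns 3: the task yields twice and is submitted three times in total.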
   
   > this may mean that in checkTasks() we're processing the same item over and over again.
   
   We may be processing the same *task* repeatedly, but not the same item. Each time a task is submitted, it takes the next items from its iterable and puts them into the queue. Prior to this patch, a task went through its entire iterable in one uninterrupted run, growing the queue without any limit. That unbounded growth is the source of the memory pressure users have been complaining about.
   
   With this patch, a task stops once the queue size reaches `approximateMaxQueueSize`, and because it keeps its iterator as internal state, it remembers where it stopped. When it is resubmitted later, it resumes filling the queue until the limit is reached again, stops again, and so on. Because tasks stop voluntarily and yield control back to the executor, they do not occupy a pool thread for the whole iterable, so other tasks submitted to the same pool can still make progress.
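
The yield-and-resume behavior described above can be sketched as follows. This is a simplified, single-threaded model under stated assumptions, not the PR's implementation: `FillTask`, `Result` and `simulate` are hypothetical names, the `closed` flag and `Closeable` handling from the diff are left out, and the outer loop stands in for both the executor (resubmitting the continuation) and the consumer (draining the queue between calls). Because the size check happens before each `add`, a single task never pushes the queue past the limit; with several tasks checking concurrently the limit can be overshot, which is presumably why the field is named `approximateMaxQueueSize`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;

final class YieldingFillSketch {
  // Simplified model of the Task in the diff: copies items from `input` into the shared
  // queue, yields once the queue reaches approximateMaxQueueSize, and keeps its iterator
  // so the next call resumes exactly where the previous one stopped.
  static final class FillTask<T> implements Callable<Optional<FillTask<T>>> {
    private final Iterable<T> input;
    private final ConcurrentLinkedQueue<T> queue;
    private final int approximateMaxQueueSize;
    private Iterator<T> iterator; // created lazily, survives across calls

    FillTask(Iterable<T> input, ConcurrentLinkedQueue<T> queue, int approximateMaxQueueSize) {
      this.input = input;
      this.queue = queue;
      this.approximateMaxQueueSize = approximateMaxQueueSize;
    }

    @Override
    public Optional<FillTask<T>> call() {
      if (iterator == null) {
        iterator = input.iterator();
      }
      while (iterator.hasNext()) {
        if (queue.size() >= approximateMaxQueueSize) {
          return Optional.of(this); // yield: give the thread back, resume on resubmission
        }
        queue.add(iterator.next());
      }
      return Optional.empty(); // input exhausted, nothing left to resubmit
    }
  }

  static final class Result {
    final int calls;
    final int maxQueueSize;
    final List<Integer> consumed;

    Result(int calls, int maxQueueSize, List<Integer> consumed) {
      this.calls = calls;
      this.maxQueueSize = maxQueueSize;
      this.consumed = consumed;
    }
  }

  // Single-threaded stand-in for the executor and the consumer: run the task,
  // record the queue size, drain the queue, then "resubmit" the continuation.
  static Result simulate(int items, int limit) {
    List<Integer> input = new ArrayList<>();
    for (int i = 0; i < items; i++) {
      input.add(i);
    }
    ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
    List<Integer> consumed = new ArrayList<>();
    Optional<FillTask<Integer>> next = Optional.of(new FillTask<>(input, queue, limit));
    int calls = 0;
    int maxQueueSize = 0;
    while (next.isPresent()) {
      next = next.get().call();
      calls++;
      maxQueueSize = Math.max(maxQueueSize, queue.size());
      while (!queue.isEmpty()) {
        consumed.add(queue.poll());
      }
    }
    return new Result(calls, maxQueueSize, consumed);
  }
}
```

With 10 items and a limit of 4, `simulate(10, 4)` calls the task three times (4 + 4 + 2 items), the queue never holds more than 4 items, and the consumer still sees every item in its original order. Without the size check, a single call would put all 10 items into the queue at once, which is the unbounded growth described above.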



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
For additional commands, e-mail: issues-h...@iceberg.apache.org
