dekimir commented on code in PR #10691:
URL: https://github.com/apache/iceberg/pull/10691#discussion_r1678458408
##########
core/src/main/java/org/apache/iceberg/util/ParallelIterable.java:
##########
@@ -192,4 +209,65 @@ public synchronized T next() {
       return queue.poll();
     }
   }
+
+  private static class Task<T> implements Callable<Optional<Task<T>>>, Closeable {
+    private final Iterable<T> input;
+    private final ConcurrentLinkedQueue<T> queue;
+    private final AtomicBoolean closed;
+    private final int approximateMaxQueueSize;
+
+    private Iterator<T> iterator;
+
+    Task(
+        Iterable<T> input,
+        ConcurrentLinkedQueue<T> queue,
+        AtomicBoolean closed,
+        int approximateMaxQueueSize) {
+      this.input = Preconditions.checkNotNull(input, "input cannot be null");
+      this.queue = Preconditions.checkNotNull(queue, "queue cannot be null");
+      this.closed = Preconditions.checkNotNull(closed, "closed cannot be null");
+      this.approximateMaxQueueSize = approximateMaxQueueSize;
+    }
+
+    @Override
+    public Optional<Task<T>> call() throws Exception {
+      try {
+        if (iterator == null) {
+          iterator = input.iterator();
+        }
+        while (iterator.hasNext()) {
+          if (queue.size() >= approximateMaxQueueSize) {
+            // yield
+            return Optional.of(this);

Review Comment:
> I'm saying why not setting `taskFutures[i] = continuation.get()` ?

But `continuation.get()` is of type `Optional<Task>`, while `taskFutures[i]` is of type `Future<Optional<Task>>`, so how would this compile?

> this may mean that in checkTasks() we're processing the same item over and over again.

We may be processing the same *task*, but not the same item. Each time a task is submitted, it grabs new items from its iterable and puts them into the queue. Prior to this patch, a task went over its entire iterable uninterrupted, growing the queue aggressively and without any limit. That is where the memory pressure users are complaining about comes from. With this patch, the task stops when the queue size reaches `approximateMaxQueueSize` and remembers in its internal state (the `iterator` field) where it stopped.
When it gets resubmitted later, it resumes filling the queue until the limit is reached again, then stops again, and so on. Because tasks stop voluntarily and yield control back to the executor, they do not hold on to pool threads, so other tasks submitted to the same pool are not blocked.
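A minimal, self-contained sketch of the yield-and-resume pattern described above, assuming a fixed thread pool and a single consumer thread that polls the shared queue. This is not the code from the PR: `YieldingTaskSketch`, `drain`, `getDone` and the polling loop are hypothetical stand-ins for `ParallelIterable`'s iterator and its `checkTasks()`; only `Task`, `approximateMaxQueueSize` and the `Optional<Task<T>>` continuation mirror the diff (`closed`/`Closeable` are left out for brevity). It also shows the type point: the continuation is a `Task`, so it has to go through `executor.submit(...)` to become a `Future` again before it can be stored in `taskFutures[i]`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch, not Iceberg's ParallelIterable.
final class YieldingTaskSketch {

  /** Fills a shared queue from one input until the queue reaches a soft limit, then yields. */
  static final class Task<T> implements Callable<Optional<Task<T>>> {
    private final Iterable<T> input;
    private final ConcurrentLinkedQueue<T> queue;
    private final int approximateMaxQueueSize;
    private Iterator<T> iterator; // survives across submissions, so a resumed task continues where it stopped

    Task(Iterable<T> input, ConcurrentLinkedQueue<T> queue, int approximateMaxQueueSize) {
      this.input = input;
      this.queue = queue;
      this.approximateMaxQueueSize = approximateMaxQueueSize;
    }

    @Override
    public Optional<Task<T>> call() {
      if (iterator == null) {
        iterator = input.iterator();
      }
      while (iterator.hasNext()) {
        if (queue.size() >= approximateMaxQueueSize) {
          return Optional.of(this); // yield: release the pool thread and ask to be resubmitted
        }
        queue.add(iterator.next());
      }
      return Optional.empty(); // input exhausted: no continuation
    }
  }

  /** Consumes every item from all inputs; the limit is approximate because tasks check it independently. */
  static <T> List<T> drain(List<? extends Iterable<T>> inputs, int maxQueueSize, int threads) {
    ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>();
    ExecutorService executor = Executors.newFixedThreadPool(threads);
    List<T> consumed = new ArrayList<>();
    try {
      @SuppressWarnings("unchecked")
      Future<Optional<Task<T>>>[] taskFutures = new Future[inputs.size()];
      for (int i = 0; i < inputs.size(); i++) {
        Task<T> task = new Task<>(inputs.get(i), queue, maxQueueSize);
        taskFutures[i] = executor.submit(task);
      }
      boolean anyRunning = true;
      while (anyRunning || !queue.isEmpty()) {
        // Consume whatever is buffered so that yielded tasks have room when they resume.
        for (T item = queue.poll(); item != null; item = queue.poll()) {
          consumed.add(item);
        }
        anyRunning = false;
        for (int i = 0; i < taskFutures.length; i++) {
          if (taskFutures[i] == null) {
            continue; // this input is finished
          }
          if (!taskFutures[i].isDone()) {
            anyRunning = true;
            continue;
          }
          Optional<Task<T>> continuation = getDone(taskFutures[i]);
          // continuation.get() is a Task, not a Future, so it must be resubmitted
          // to obtain a new Future for this slot.
          taskFutures[i] = continuation.isPresent() ? executor.submit(continuation.get()) : null;
          anyRunning |= taskFutures[i] != null;
        }
      }
    } finally {
      executor.shutdownNow();
    }
    return consumed;
  }

  private static <V> V getDone(Future<V> future) {
    try {
      return future.get(); // the future is already done, so this does not block
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e.getCause());
    }
  }
}
```

For example, `YieldingTaskSketch.drain(List.of(List.of(1, 2, 3, 4, 5), List.of(6, 7, 8)), 2, 2)` returns all eight items, and items from the same input keep their relative order, because each input is read by a single task that is only resubmitted after its previous run has finished. The busy polling loop is a simplification for the sketch; a real consumer would drain the queue on demand from `hasNext()`/`next()`.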