findepi commented on code in PR #10691: URL: https://github.com/apache/iceberg/pull/10691#discussion_r1677652693
########## core/src/main/java/org/apache/iceberg/util/ParallelIterable.java: ########## @@ -20,65 +20,69 @@ import java.io.Closeable; import java.io.IOException; +import java.util.ArrayDeque; +import java.util.Deque; import java.util.Iterator; import java.util.NoSuchElementException; +import java.util.Optional; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import org.apache.iceberg.exceptions.RuntimeIOException; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.iceberg.io.CloseableGroup; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.io.Closer; public class ParallelIterable<T> extends CloseableGroup implements CloseableIterable<T> { + + private static final int DEFAULT_MAX_QUEUE_SIZE = 10_000; + private final Iterable<? extends Iterable<T>> iterables; private final ExecutorService workerPool; + // Bound for number of items in the queue to limit memory consumption + // even in the case when input iterables are large. + private final int approximateMaxQueueSize; + public ParallelIterable(Iterable<? extends Iterable<T>> iterables, ExecutorService workerPool) { - this.iterables = iterables; - this.workerPool = workerPool; + this(iterables, workerPool, DEFAULT_MAX_QUEUE_SIZE); + } + + public ParallelIterable( + Iterable<? 
extends Iterable<T>> iterables, + ExecutorService workerPool, + int approximateMaxQueueSize) { + this.iterables = Preconditions.checkNotNull(iterables, "Input iterables cannot be null"); + this.workerPool = Preconditions.checkNotNull(workerPool, "Worker pool cannot be null"); + this.approximateMaxQueueSize = approximateMaxQueueSize; } @Override public CloseableIterator<T> iterator() { - ParallelIterator<T> iter = new ParallelIterator<>(iterables, workerPool); + ParallelIterator<T> iter = + new ParallelIterator<>(iterables, workerPool, approximateMaxQueueSize); addCloseable(iter); return iter; } private static class ParallelIterator<T> implements CloseableIterator<T> { - private final Iterator<Runnable> tasks; + private final Iterator<Task<T>> tasks; + private final Deque<Task<T>> yieldedTasks = new ArrayDeque<>(); Review Comment: An `Iterator` cannot reasonably be consumed by multiple consumers, so the only real concurrency is in filling the queue. However, `ParallelIterator` still has safeguards: `next` and `hasNext` are synchronized. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org