findepi commented on code in PR #10691:
URL: https://github.com/apache/iceberg/pull/10691#discussion_r1685430017


##########
core/src/main/java/org/apache/iceberg/util/ParallelIterable.java:
##########
@@ -20,84 +20,115 @@
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayDeque;
+import java.util.Deque;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import org.apache.iceberg.exceptions.RuntimeIOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
 import org.apache.iceberg.io.CloseableGroup;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.io.Closer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ParallelIterable<T> extends CloseableGroup implements 
CloseableIterable<T> {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ParallelIterable.class);
+
+  private static final int DEFAULT_MAX_QUEUE_SIZE = 10_000;
+
   private final Iterable<? extends Iterable<T>> iterables;
   private final ExecutorService workerPool;
 
+  // Bound for number of items in the queue to limit memory consumption
+  // even in the case when input iterables are large.
+  private final int approximateMaxQueueSize;
+
   public ParallelIterable(Iterable<? extends Iterable<T>> iterables, 
ExecutorService workerPool) {
-    this.iterables = iterables;
-    this.workerPool = workerPool;
+    this(iterables, workerPool, DEFAULT_MAX_QUEUE_SIZE);
+  }
+
+  public ParallelIterable(
+      Iterable<? extends Iterable<T>> iterables,
+      ExecutorService workerPool,
+      int approximateMaxQueueSize) {
+    this.iterables = Preconditions.checkNotNull(iterables, "Input iterables 
cannot be null");
+    this.workerPool = Preconditions.checkNotNull(workerPool, "Worker pool 
cannot be null");
+    this.approximateMaxQueueSize = approximateMaxQueueSize;
   }
 
   @Override
   public CloseableIterator<T> iterator() {
-    ParallelIterator<T> iter = new ParallelIterator<>(iterables, workerPool);
+    ParallelIterator<T> iter =
+        new ParallelIterator<>(iterables, workerPool, approximateMaxQueueSize);
     addCloseable(iter);
     return iter;
   }
 
   private static class ParallelIterator<T> implements CloseableIterator<T> {
-    private final Iterator<Runnable> tasks;
+    private final Iterator<Task<T>> tasks;
+    private final Deque<Task<T>> yieldedTasks = new ArrayDeque<>();
     private final ExecutorService workerPool;
-    private final Future<?>[] taskFutures;
+    private final CompletableFuture<Optional<Task<T>>>[] taskFutures;
     private final ConcurrentLinkedQueue<T> queue = new 
ConcurrentLinkedQueue<>();
-    private volatile boolean closed = false;
+    private final int maxQueueSize;
+    private final AtomicBoolean closed = new AtomicBoolean(false);
 
     private ParallelIterator(
-        Iterable<? extends Iterable<T>> iterables, ExecutorService workerPool) 
{
+        Iterable<? extends Iterable<T>> iterables, ExecutorService workerPool, 
int maxQueueSize) {
       this.tasks =
           Iterables.transform(
-                  iterables,
-                  iterable ->
-                      (Runnable)
-                          () -> {
-                            try (Closeable ignored =
-                                (iterable instanceof Closeable) ? (Closeable) 
iterable : () -> {}) {
-                              for (T item : iterable) {
-                                // exit manually because 
`ConcurrentLinkedQueue` can't be
-                                // interrupted
-                                if (closed) {
-                                  return;
-                                }
-
-                                queue.add(item);
-                              }
-                            } catch (IOException e) {
-                              throw new RuntimeIOException(e, "Failed to close 
iterable");
-                            }
-                          })
+                  iterables, iterable -> new Task<>(iterable, queue, closed, 
maxQueueSize))
               .iterator();
       this.workerPool = workerPool;
+      this.maxQueueSize = maxQueueSize;
       // submit 2 tasks per worker at a time
-      this.taskFutures = new Future[2 * ThreadPools.WORKER_THREAD_POOL_SIZE];
+      this.taskFutures = new CompletableFuture[2 * 
ThreadPools.WORKER_THREAD_POOL_SIZE];
     }
 
     @Override
     public void close() {
       // close first, avoid new task submit
-      this.closed = true;
+      this.closed.set(true);
 
-      // cancel background tasks
-      for (Future<?> taskFuture : taskFutures) {
-        if (taskFuture != null && !taskFuture.isDone()) {
-          taskFuture.cancel(true);
+      try (Closer closer = Closer.create()) {
+        synchronized (this) {
+          yieldedTasks.forEach(closer::register);
+          yieldedTasks.clear();
         }
+
+        // cancel background tasks and close continuations if any
+        for (CompletableFuture<Optional<Task<T>>> taskFuture : taskFutures) {
+          if (taskFuture != null) {
+            taskFuture.cancel(true);
+            taskFuture.thenAccept(
+                continuation -> {
+                  if (continuation.isPresent()) {
+                    try {
+                      continuation.get().close();
+                    } catch (IOException e) {
+                      LOG.error("Task close failed", e);
+                    }
+                  }
+                });
+          }
+        }
+
+        // clean queue
+        this.queue.clear();
+      } catch (IOException e) {
+        throw new RuntimeException("Close failed", e);

Review Comment:
   thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to