findepi commented on code in PR #11781:
URL: https://github.com/apache/iceberg/pull/11781#discussion_r1888440219


##########
core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java:
##########
@@ -171,38 +171,55 @@ public void limitQueueSize() {
   }
 
   @Test
-  public void queueSizeOne() {
-    List<Iterable<Integer>> iterables =
-        ImmutableList.of(
-            () -> IntStream.range(0, 100).iterator(),
-            () -> IntStream.range(0, 100).iterator(),
-            () -> IntStream.range(0, 100).iterator());
+  @Timeout(10)
+  public void noDeadlock() {
+    ExecutorService executor = Executors.newFixedThreadPool(1);
+    Semaphore semaphore = new Semaphore(1);

Review Comment:
   Add a code comment that this simulates a constrained resource, such as an 
S3 connection pool that limits the maximum number of connections.
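
   To illustrate what such a comment might say, here is a minimal sketch, not
   the PR's code: a single-permit `Semaphore` stands in for a pool that allows
   one connection at a time. The class name, the `demo` helper and the
   variable names are hypothetical.

```java
import java.util.concurrent.Semaphore;

public class ConstrainedResourceSketch {
  // Hypothetical helper: returns {secondAcquireSucceeded, acquireAfterReleaseSucceeded}.
  static boolean[] demo() throws InterruptedException {
    // Simulates a constrained resource, such as an S3 connection pool that
    // limits the maximum number of connections. One permit = one connection.
    Semaphore connectionPool = new Semaphore(1);

    connectionPool.acquire(); // the only "connection" is now in use
    boolean secondAcquire = connectionPool.tryAcquire(); // pool is exhausted
    connectionPool.release(); // hand the "connection" back
    boolean afterRelease = connectionPool.tryAcquire(); // available again

    return new boolean[] {secondAcquire, afterRelease};
  }

  public static void main(String[] args) throws InterruptedException {
    boolean[] result = demo();
    System.out.println(result[0] + " " + result[1]); // prints "false true"
  }
}
```

   A blocking `acquire()` in place of `tryAcquire()` is what would make a
   second holder wait indefinitely, which is the situation the test guards
   against.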



##########
core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java:
##########
@@ -171,38 +171,55 @@ public void limitQueueSize() {
   }
 
   @Test
-  public void queueSizeOne() {
-    List<Iterable<Integer>> iterables =
-        ImmutableList.of(
-            () -> IntStream.range(0, 100).iterator(),
-            () -> IntStream.range(0, 100).iterator(),
-            () -> IntStream.range(0, 100).iterator());
+  @Timeout(10)
+  public void noDeadlock() {
+    ExecutorService executor = Executors.newFixedThreadPool(1);
+    Semaphore semaphore = new Semaphore(1);
 
-    Multiset<Integer> expectedValues =
-        IntStream.range(0, 100)
-            .boxed()
-            .flatMap(i -> Stream.of(i, i, i))
-            .collect(ImmutableMultiset.toImmutableMultiset());
+    List<Iterable<Integer>> iterablesA =
+        ImmutableList.of(
+            testIterable(
+                semaphore::acquire, semaphore::release, IntStream.range(0, 100).iterator()));
+    List<Iterable<Integer>> iterablesB =
+        ImmutableList.of(
+            testIterable(
+                semaphore::acquire, semaphore::release, IntStream.range(200, 300).iterator()));
 
-    ExecutorService executor = Executors.newCachedThreadPool();
-    ParallelIterable<Integer> parallelIterable = new ParallelIterable<>(iterables, executor, 1);
-    ParallelIterator<Integer> iterator = (ParallelIterator<Integer>) parallelIterable.iterator();
+    ParallelIterable<Integer> parallelIterableA = new ParallelIterable<>(iterablesA, executor, 1);
+    ParallelIterable<Integer> parallelIterableB = new ParallelIterable<>(iterablesB, executor, 1);

Review Comment:
   I get a code warning on those lines. Do you get one as well?



##########
core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java:
##########
@@ -171,38 +171,55 @@ public void limitQueueSize() {
   }
 
   @Test
-  public void queueSizeOne() {
-    List<Iterable<Integer>> iterables =
-        ImmutableList.of(
-            () -> IntStream.range(0, 100).iterator(),
-            () -> IntStream.range(0, 100).iterator(),
-            () -> IntStream.range(0, 100).iterator());
+  @Timeout(10)
+  public void noDeadlock() {

Review Comment:
   This new test indeed times out without the changes.



##########
core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java:
##########
@@ -148,17 +150,15 @@ public void limitQueueSize() {
             .collect(ImmutableMultiset.toImmutableMultiset());
 
     int maxQueueSize = 20;
-    ExecutorService executor = Executors.newCachedThreadPool();
+    ExecutorService executor = Executors.newSingleThreadExecutor();

Review Comment:
   Is the change to the existing test important?
   Or is it enough for the noDeadlock test to verify that there is no deadlock?



##########
core/src/main/java/org/apache/iceberg/util/ParallelIterable.java:
##########
@@ -153,6 +155,7 @@ private synchronized boolean checkTasks() {
             try {
               Optional<Task<T>> continuation = taskFutures[i].get();
               continuation.ifPresent(yieldedTasks::addLast);
+              taskFutures[i] = null;

Review Comment:
   Why is this assignment needed?



##########
core/src/main/java/org/apache/iceberg/util/ParallelIterable.java:
##########
@@ -257,17 +257,17 @@ private static class Task<T> implements Supplier<Optional<Task<T>>>, Closeable {
     @Override
     public Optional<Task<T>> get() {
       try {
+        if (queue.size() >= approximateMaxQueueSize) {
+          // Yield when queue is over the size limit. Task will be resubmitted later and continue
+          // the work.
+          return Optional.of(this);

Review Comment:
   It's very neat that you solved the problem by just moving 2 lines of code 
around (okay, 5 including comments). Please add a code comment capturing the 
wisdom behind this seemingly cosmetic change.
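
   One plausible reading of why the ordering matters, sketched under
   assumptions that the diff above does not confirm: if starting work on a
   task takes a permit from a limited pool, then checking the queue size
   first means a task that yields is holding nothing. Everything in this
   sketch (the class, the simplified `Task`, the `permitsAfterYield` helper)
   is hypothetical and is not the `ParallelIterable` implementation.

```java
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

public class YieldBeforeAcquireSketch {
  // Hypothetical task: producing a value needs one permit from a limited pool.
  static class Task implements Supplier<Optional<Task>> {
    private final Queue<Integer> queue;
    private final int maxQueueSize;
    private final Semaphore pool;

    Task(Queue<Integer> queue, int maxQueueSize, Semaphore pool) {
      this.queue = queue;
      this.maxQueueSize = maxQueueSize;
      this.pool = pool;
    }

    @Override
    public Optional<Task> get() {
      if (queue.size() >= maxQueueSize) {
        // Yield BEFORE touching the pool: a task that is parked waiting for
        // queue space holds no permit, so it cannot starve other consumers.
        return Optional.of(this);
      }
      if (!pool.tryAcquire()) {
        throw new IllegalStateException("pool exhausted");
      }
      try {
        queue.add(42); // stand-in for reading one element from the input
      } finally {
        pool.release();
      }
      return Optional.empty(); // finished, nothing to resubmit
    }
  }

  // Runs the task against a full queue and reports the permits left over.
  static int permitsAfterYield() {
    Queue<Integer> fullQueue = new ArrayDeque<>();
    fullQueue.add(0);
    Semaphore pool = new Semaphore(1);
    Optional<Task> continuation = new Task(fullQueue, 1, pool).get();
    return continuation.isPresent() ? pool.availablePermits() : -1;
  }

  public static void main(String[] args) {
    System.out.println(permitsAfterYield()); // prints 1: the yielded task holds no permit
  }
}
```

   With the check placed after the acquire instead, the yielded task in this
   sketch would still hold the only permit while it waits to be resubmitted.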



##########
core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java:
##########
@@ -171,38 +171,55 @@ public void limitQueueSize() {
   }
 
   @Test
-  public void queueSizeOne() {
-    List<Iterable<Integer>> iterables =
-        ImmutableList.of(
-            () -> IntStream.range(0, 100).iterator(),
-            () -> IntStream.range(0, 100).iterator(),
-            () -> IntStream.range(0, 100).iterator());
+  @Timeout(10)
+  public void noDeadlock() {
+    ExecutorService executor = Executors.newFixedThreadPool(1);
+    Semaphore semaphore = new Semaphore(1);
 
-    Multiset<Integer> expectedValues =
-        IntStream.range(0, 100)
-            .boxed()
-            .flatMap(i -> Stream.of(i, i, i))
-            .collect(ImmutableMultiset.toImmutableMultiset());
+    List<Iterable<Integer>> iterablesA =
+        ImmutableList.of(
+            testIterable(
+                semaphore::acquire, semaphore::release, IntStream.range(0, 100).iterator()));
+    List<Iterable<Integer>> iterablesB =
+        ImmutableList.of(
+            testIterable(
+                semaphore::acquire, semaphore::release, IntStream.range(200, 300).iterator()));
 
-    ExecutorService executor = Executors.newCachedThreadPool();
-    ParallelIterable<Integer> parallelIterable = new ParallelIterable<>(iterables, executor, 1);
-    ParallelIterator<Integer> iterator = (ParallelIterator<Integer>) parallelIterable.iterator();
+    ParallelIterable<Integer> parallelIterableA = new ParallelIterable<>(iterablesA, executor, 1);
+    ParallelIterable<Integer> parallelIterableB = new ParallelIterable<>(iterablesB, executor, 1);
 
-    Multiset<Integer> actualValues = HashMultiset.create();
+    parallelIterableA.iterator().next();
+    parallelIterableB.iterator().next();
 
-    while (iterator.hasNext()) {
-      assertThat(iterator.queueSize())
-          .as("iterator internal queue size")
-          .isLessThanOrEqualTo(1 + iterables.size());
-      actualValues.add(iterator.next());
-    }
+    executor.shutdownNow();

Review Comment:
   Please use try/finally to manage the executor in tests.
   (I am aware that existing tests don't do this, but that's not a reason not 
to do better in new code.)
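
   A minimal sketch of the pattern being asked for, using only
   `java.util.concurrent`; the class and method names are hypothetical and
   the task body is a placeholder for the real test logic.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ExecutorCleanupSketch {
  // Hypothetical test body: submits one task and always shuts the executor down.
  static int runWithCleanup() throws Exception {
    ExecutorService executor = Executors.newFixedThreadPool(1);
    try {
      Future<Integer> result = executor.submit(() -> 1 + 1);
      return result.get();
    } finally {
      // Runs even if the body above throws or an assertion fails, so the
      // worker thread does not leak into other tests.
      executor.shutdownNow();
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println(runWithCleanup()); // prints 2
  }
}
```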



##########
core/src/main/java/org/apache/iceberg/util/ParallelIterable.java:
##########
@@ -165,7 +168,10 @@ private synchronized boolean checkTasks() {
             }
           }
 
-          taskFutures[i] = submitNextTask();
+          // submit a new task if there is space in the queue
+          if (queue.size() < maxQueueSize) {
+            taskFutures[i] = submitNextTask();

Review Comment:
   Can this be tested?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org