sopel39 commented on code in PR #11781:
URL: https://github.com/apache/iceberg/pull/11781#discussion_r1890160296


##########
core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java:
##########
@@ -171,38 +171,55 @@ public void limitQueueSize() {
   }
 
   @Test
-  public void queueSizeOne() {

Review Comment:
   The test is no longer viable because the queue size is no longer a hard limit. The queue size will be at least the number of items produced by a single `org.apache.iceberg.util.ParallelIterable.Task`.
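A minimal, hypothetical model of why the bound is soft. This is not Iceberg's `ParallelIterable`. It assumes that a task checks the queue bound only before it starts and, once started, enqueues all of its items. The class and method names are illustrative.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, simplified model; not Iceberg's ParallelIterable.
// Assumption: a task checks the queue bound only before it starts and,
// once started, enqueues all of its items before finishing.
final class SoftQueueBound {
  private SoftQueueBound() {}

  /** Runs up to {@code tasks} producer tasks sequentially and returns the peak queue size. */
  static int peakQueueSize(int maxQueueSize, int itemsPerTask, int tasks) {
    Deque<Integer> queue = new ArrayDeque<>(); // nothing consumes from the queue in this model
    int peak = 0;
    for (int t = 0; t < tasks; t++) {
      if (queue.size() >= maxQueueSize) {
        break; // the bound only prevents *starting* new tasks
      }
      for (int i = 0; i < itemsPerTask; i++) {
        queue.addLast(i); // a started task runs to completion
        peak = Math.max(peak, queue.size());
      }
    }
    return peak;
  }

  public static void main(String[] args) {
    System.out.println(peakQueueSize(1, 100, 3)); // 100: far above maxQueueSize = 1
    System.out.println(peakQueueSize(20, 7, 10)); // 21: overshoots by less than one task's output
  }
}
```

The old `queueSizeOne` test used `maxQueueSize = 1` with iterables of 100 items. In this model the queue still peaks at 100 in that case, so an assertion on a hard bound cannot hold.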



##########
core/src/main/java/org/apache/iceberg/util/ParallelIterable.java:
##########
@@ -153,6 +155,7 @@ private synchronized boolean checkTasks() {
             try {
               Optional<Task<T>> continuation = taskFutures[i].get();
               continuation.ifPresent(yieldedTasks::addLast);
+              taskFutures[i] = null;

Review Comment:
   This is because of the
   ```
             // submit a new task if there is space in the queue
             if (queue.size() < maxQueueSize) {
   ```
   below. The queue might be full, in which case no new task is submitted, so `taskFutures[i]` must stay empty (`null`): its continuation was already added to `yieldedTasks`.
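A hypothetical model of the bookkeeping described here. It is not the actual `checkTasks()` code. A completed future is stood in by `CompletableFuture.completedFuture`, and `submitNextTask()` is replaced by a future that finishes without a continuation. The model shows what happens to a slot that is not cleared while the queue is full.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified model of the checkTasks() bookkeeping; not Iceberg's actual code.
// Each slot holds the future of a task that returns an optional continuation.
final class SlotClearing {
  private SlotClearing() {}

  /** Returns how many continuations end up in yieldedTasks after {@code rounds} checks. */
  static int continuationsQueued(boolean clearSlot, int rounds, int queueSize, int maxQueueSize) {
    @SuppressWarnings("unchecked")
    CompletableFuture<Optional<String>>[] taskFutures = new CompletableFuture[1];
    // Slot 0 holds a task that already stopped early and yielded a continuation.
    taskFutures[0] = CompletableFuture.completedFuture(Optional.of("continuation"));
    Deque<String> yieldedTasks = new ArrayDeque<>();

    for (int round = 0; round < rounds; round++) { // repeated checkTasks() calls
      for (int i = 0; i < taskFutures.length; i++) {
        if (taskFutures[i] != null && taskFutures[i].isDone()) {
          taskFutures[i].join().ifPresent(yieldedTasks::addLast);
          if (clearSlot) {
            taskFutures[i] = null; // the continuation now lives in yieldedTasks
          }
        }
        // submit a new task only if there is space in the queue
        if (queueSize < maxQueueSize) {
          // stand-in for submitNextTask(): a task that finishes without a continuation
          taskFutures[i] = CompletableFuture.completedFuture(Optional.empty());
        }
      }
    }
    return yieldedTasks.size();
  }
}
```

With a full queue and no clearing, the same completed future is read on every check and its continuation is queued three times over three checks. With clearing, it is queued once. When the queue has space, the slot is overwritten by a new submission anyway, so the problem only appears together with the queue-size check.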



##########
core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java:
##########
@@ -148,17 +150,15 @@ public void limitQueueSize() {
             .collect(ImmutableMultiset.toImmutableMultiset());
 
     int maxQueueSize = 20;
-    ExecutorService executor = Executors.newCachedThreadPool();
+    ExecutorService executor = Executors.newSingleThreadExecutor();

Review Comment:
   `ParallelIterable` will still spawn `2 * ThreadPools.WORKER_THREAD_POOL_SIZE` tasks. These tasks won't be picked up immediately, but once started they are guaranteed to finish. This means the queue size will temporarily exceed `maxQueueSize`. To test the limits properly, we need to reduce the executor size (as it will be in practice) to constrain the number of concurrently running tasks.
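A small probe, independent of Iceberg, illustrating the effect of a smaller executor. However many tasks are submitted, a single-thread executor runs at most one at a time, so at most one task can be pushing items into the queue at any moment. The task count of 8 is an arbitrary stand-in for `2 * ThreadPools.WORKER_THREAD_POOL_SIZE`, and the class name is illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative probe, independent of Iceberg: measures how many submitted
// tasks an executor actually runs at the same time.
final class ConcurrencyProbe {
  private ConcurrencyProbe() {}

  static int peakConcurrency(ExecutorService executor, int tasks) {
    AtomicInteger running = new AtomicInteger();
    AtomicInteger peak = new AtomicInteger();
    List<Future<?>> futures = new ArrayList<>();
    for (int t = 0; t < tasks; t++) {
      futures.add(
          executor.submit(
              () -> {
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);
                try {
                  Thread.sleep(5); // stand-in for a task producing its items
                } catch (InterruptedException e) {
                  Thread.currentThread().interrupt();
                } finally {
                  running.decrementAndGet();
                }
              }));
    }
    for (Future<?> future : futures) {
      try {
        future.get();
      } catch (InterruptedException | ExecutionException e) {
        throw new IllegalStateException(e);
      }
    }
    return peak.get();
  }

  public static void main(String[] args) {
    ExecutorService single = Executors.newSingleThreadExecutor();
    try {
      System.out.println(peakConcurrency(single, 8)); // always 1
    } finally {
      single.shutdown();
    }
  }
}
```

A cached thread pool, as used before the change, may run several of these tasks at once, so the observed overshoot of `maxQueueSize` would depend on scheduling.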



##########
core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java:
##########
@@ -171,38 +171,55 @@ public void limitQueueSize() {
   }
 
   @Test
-  public void queueSizeOne() {
-    List<Iterable<Integer>> iterables =
-        ImmutableList.of(
-            () -> IntStream.range(0, 100).iterator(),
-            () -> IntStream.range(0, 100).iterator(),
-            () -> IntStream.range(0, 100).iterator());
+  @Timeout(10)
+  public void noDeadlock() {
+    ExecutorService executor = Executors.newFixedThreadPool(1);
+    Semaphore semaphore = new Semaphore(1);
 
-    Multiset<Integer> expectedValues =
-        IntStream.range(0, 100)
-            .boxed()
-            .flatMap(i -> Stream.of(i, i, i))
-            .collect(ImmutableMultiset.toImmutableMultiset());
+    List<Iterable<Integer>> iterablesA =
+        ImmutableList.of(
+            testIterable(
+                semaphore::acquire, semaphore::release, IntStream.range(0, 100).iterator()));
+    List<Iterable<Integer>> iterablesB =
+        ImmutableList.of(
+            testIterable(
+                semaphore::acquire, semaphore::release, IntStream.range(200, 300).iterator()));
 
-    ExecutorService executor = Executors.newCachedThreadPool();
-    ParallelIterable<Integer> parallelIterable = new ParallelIterable<>(iterables, executor, 1);
-    ParallelIterator<Integer> iterator = (ParallelIterator<Integer>) parallelIterable.iterator();
+    ParallelIterable<Integer> parallelIterableA = new ParallelIterable<>(iterablesA, executor, 1);
+    ParallelIterable<Integer> parallelIterableB = new ParallelIterable<>(iterablesB, executor, 1);

Review Comment:
   No. Could this be a language version issue?



##########
core/src/main/java/org/apache/iceberg/util/ParallelIterable.java:
##########
@@ -165,7 +168,10 @@ private synchronized boolean checkTasks() {
             }
           }
 
-          taskFutures[i] = submitNextTask();
+          // submit a new task if there is space in the queue
+          if (queue.size() < maxQueueSize) {
+            taskFutures[i] = submitNextTask();

Review Comment:
   No. It's just an optimization to avoid busy-looping through the executor while the queue is full, which increases congestion.
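A hypothetical model of the busy loop being avoided. It is not Iceberg's code. It assumes that a task which finds the queue full yields immediately by returning itself as a continuation. It also runs tasks inline rather than through an executor, to keep the count deterministic. `BusyLoopModel`, `Task` and `taskRuns` are illustrative names.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical model, not Iceberg's code. Assumption: a task that finds the
// queue full yields immediately by returning itself as a continuation.
final class BusyLoopModel {
  private BusyLoopModel() {}

  static final class Task implements Supplier<Optional<Task>> {
    private final Deque<Integer> queue;
    private final int maxQueueSize;
    private final AtomicInteger runs;

    Task(Deque<Integer> queue, int maxQueueSize, AtomicInteger runs) {
      this.queue = queue;
      this.maxQueueSize = maxQueueSize;
      this.runs = runs;
    }

    @Override
    public Optional<Task> get() {
      runs.incrementAndGet();
      if (queue.size() >= maxQueueSize) {
        return Optional.of(this); // no progress: yield and wait to be resubmitted
      }
      queue.addLast(42); // produce one item
      return Optional.empty();
    }
  }

  /** Counts task executions over {@code rounds} checks while nobody drains a full queue. */
  static int taskRuns(boolean guardOnQueueSize, int rounds, int maxQueueSize) {
    Deque<Integer> queue = new ArrayDeque<>();
    for (int i = 0; i < maxQueueSize; i++) {
      queue.addLast(i); // start with a full queue
    }
    AtomicInteger runs = new AtomicInteger();
    Deque<Task> yieldedTasks = new ArrayDeque<>();
    yieldedTasks.addLast(new Task(queue, maxQueueSize, runs));

    for (int round = 0; round < rounds; round++) {
      if (guardOnQueueSize && queue.size() >= maxQueueSize) {
        continue; // submit a new task only if there is space in the queue
      }
      Task next = yieldedTasks.pollFirst();
      if (next != null) {
        // Runs inline to stay deterministic; in practice this would go through the executor.
        next.get().ifPresent(yieldedTasks::addLast);
      }
    }
    return runs.get();
  }
}
```

Without the guard, each of the 10 checks while the queue is full re-runs a task that makes no progress. With the guard, nothing is submitted until the consumer drains the queue.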



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

