sopel39 commented on code in PR #11781:
URL: https://github.com/apache/iceberg/pull/11781#discussion_r1890160296
########## core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java: ##########
@@ -171,38 +171,55 @@ public void limitQueueSize() {
   }
   @Test
-  public void queueSizeOne() {

Review Comment:
   The test is no longer viable because the queue size is no longer a hard limit. Regardless of `maxQueueSize`, the queue can grow to at least the number of items produced by a single `org.apache.iceberg.util.ParallelIterable.Task`.

########## core/src/main/java/org/apache/iceberg/util/ParallelIterable.java: ##########
@@ -153,6 +155,7 @@ private synchronized boolean checkTasks() {
           try {
             Optional<Task<T>> continuation = taskFutures[i].get();
             continuation.ifPresent(yieldedTasks::addLast);
+            taskFutures[i] = null;

Review Comment:
   This is needed because of the
   ```
   // submit a new task if there is space in the queue
   if (queue.size() < maxQueueSize) {
   ```
   check below. The queue might be full, in which case no new task is submitted. So `taskFutures[i]` should remain empty: its continuation has already been added to `yieldedTasks`.

########## core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java: ##########
@@ -148,17 +150,15 @@ public void limitQueueSize() {
         .collect(ImmutableMultiset.toImmutableMultiset());
     int maxQueueSize = 20;
-    ExecutorService executor = Executors.newCachedThreadPool();
+    ExecutorService executor = Executors.newSingleThreadExecutor();

Review Comment:
   `ParallelIterable` will still spawn `2 * ThreadPools.WORKER_THREAD_POOL_SIZE` tasks. These tasks won't be picked up immediately, but once started, each is guaranteed to finish. This means the queue size will temporarily exceed `maxQueueSize`. To test the limit properly, we need to reduce the executor size (as it will be in practice) to constrain the number of concurrently running tasks.
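The reasoning in these comments can be illustrated with a minimal, hypothetical model. It is not the actual `ParallelIterable` code, and `SoftQueueLimit`, `peakQueueSize` and `concurrentTasks` are illustrative names. The model makes three assumptions:

- The queue limit is consulted only before a task starts.
- A started task enqueues all of its items without re-checking.
- Up to `concurrentTasks` tasks are admitted on the same "there is space" observation.

The consumer is modeled as lazy: it removes items only while the next task would otherwise be blocked.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class SoftQueueLimit {
  /**
   * Hypothetical model of a soft queue limit. Returns the largest queue size
   * observed while all tasks run.
   */
  static int peakQueueSize(int maxQueueSize, int concurrentTasks, int[] itemsPerTask) {
    Deque<Integer> queue = new ArrayDeque<>();
    int peak = 0;
    int next = 0;
    while (next < itemsPerTask.length) {
      // Lazy consumer: drain only until the next task can be admitted.
      while (queue.size() >= maxQueueSize) {
        queue.poll();
      }
      // Every running worker saw the same "there is space" state, so up to
      // concurrentTasks tasks are admitted before any of them produces.
      int admitted = Math.min(concurrentTasks, itemsPerTask.length - next);
      for (int t = 0; t < admitted; t++) {
        // A started task runs to completion without re-checking the limit.
        for (int i = 0; i < itemsPerTask[next + t]; i++) {
          queue.add(i);
          peak = Math.max(peak, queue.size());
        }
      }
      next += admitted;
    }
    return peak;
  }
}
```

In this model, `peakQueueSize(1, 1, new int[] {100, 100, 100})` is 100, so a `queueSizeOne`-style assertion cannot hold. For four 10-item tasks with a limit of 20, raising `concurrentTasks` from 1 to 4 raises the peak from 29 to 40. This is the effect that shrinking the executor in `limitQueueSize` is meant to bound.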
########## core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java: ##########
@@ -171,38 +171,55 @@ public void limitQueueSize() {
   }
   @Test
-  public void queueSizeOne() {
-    List<Iterable<Integer>> iterables =
-        ImmutableList.of(
-            () -> IntStream.range(0, 100).iterator(),
-            () -> IntStream.range(0, 100).iterator(),
-            () -> IntStream.range(0, 100).iterator());
+  @Timeout(10)
+  public void noDeadlock() {
+    ExecutorService executor = Executors.newFixedThreadPool(1);
+    Semaphore semaphore = new Semaphore(1);
-    Multiset<Integer> expectedValues =
-        IntStream.range(0, 100)
-            .boxed()
-            .flatMap(i -> Stream.of(i, i, i))
-            .collect(ImmutableMultiset.toImmutableMultiset());
+    List<Iterable<Integer>> iterablesA =
+        ImmutableList.of(
+            testIterable(
+                semaphore::acquire, semaphore::release, IntStream.range(0, 100).iterator()));
+    List<Iterable<Integer>> iterablesB =
+        ImmutableList.of(
+            testIterable(
+                semaphore::acquire, semaphore::release, IntStream.range(200, 300).iterator()));
-    ExecutorService executor = Executors.newCachedThreadPool();
-    ParallelIterable<Integer> parallelIterable = new ParallelIterable<>(iterables, executor, 1);
-    ParallelIterator<Integer> iterator = (ParallelIterator<Integer>) parallelIterable.iterator();
+    ParallelIterable<Integer> parallelIterableA = new ParallelIterable<>(iterablesA, executor, 1);
+    ParallelIterable<Integer> parallelIterableB = new ParallelIterable<>(iterablesB, executor, 1);

Review Comment:
   No. Language version issue?

########## core/src/main/java/org/apache/iceberg/util/ParallelIterable.java: ##########
@@ -165,7 +168,10 @@ private synchronized boolean checkTasks() {
         }
       }
-      taskFutures[i] = submitNextTask();
+      // submit a new task if there is space in the queue
+      if (queue.size() < maxQueueSize) {
+        taskFutures[i] = submitNextTask();

Review Comment:
   No. It's just an optimization. It avoids an active loop through the executor while the queue is full, which would otherwise increase congestion.
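The interaction between the guarded submit and clearing `taskFutures[i]` can be sketched with a heavily simplified, hypothetical model of one `checkTasks()` pass. It is not the real method: `SlotCheck`, `queueHasSpace`, `clearSlot` and `yieldedAfterTwoFullPasses` are illustrative names, and continuations are plain strings. Futures are pre-completed `CompletableFuture`s so the run is deterministic.

The model assumes the queue stays full across two passes. In that case nothing replaces the harvested future, so a slot that is not cleared would be harvested again.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

final class SlotCheck {
  /** One simplified pass over the task slots (hypothetical model). */
  static void checkTasks(
      Future<Optional<String>>[] taskFutures,
      Deque<String> yieldedTasks,
      boolean queueHasSpace,
      boolean clearSlot)
      throws InterruptedException, ExecutionException {
    for (int i = 0; i < taskFutures.length; i += 1) {
      if (taskFutures[i] == null || taskFutures[i].isDone()) {
        if (taskFutures[i] != null) {
          // Harvest the finished task and keep its continuation, if any.
          taskFutures[i].get().ifPresent(yieldedTasks::addLast);
          if (clearSlot) {
            taskFutures[i] = null; // the continuation now lives only in yieldedTasks
          }
        }
        // Guarded submit: while the queue is full nothing is submitted,
        // instead of cycling tasks through the executor.
        if (queueHasSpace) {
          // Stand-in for submitNextTask(): a fresh task that yields nothing.
          taskFutures[i] = CompletableFuture.completedFuture(Optional.<String>empty());
        }
      }
    }
  }

  /** Runs two passes with a full queue and returns how many continuations were collected. */
  static int yieldedAfterTwoFullPasses(boolean clearSlot) {
    @SuppressWarnings("unchecked")
    Future<Optional<String>>[] slots = new Future[1];
    // A task that has already finished and yielded one continuation.
    slots[0] = CompletableFuture.completedFuture(Optional.of("continuation"));
    Deque<String> yielded = new ArrayDeque<>();
    try {
      checkTasks(slots, yielded, false, clearSlot);
      checkTasks(slots, yielded, false, clearSlot);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e);
    }
    return yielded.size();
  }
}
```

With `clearSlot` set, two passes over a full queue collect the continuation once. Without it, the stale finished future is harvested again on the second pass, and the same continuation ends up in `yieldedTasks` twice.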
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.