findepi commented on code in PR #11781: URL: https://github.com/apache/iceberg/pull/11781#discussion_r1888440219
########## core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java: ########## @@ -171,38 +171,55 @@ public void limitQueueSize() { } @Test - public void queueSizeOne() { - List<Iterable<Integer>> iterables = - ImmutableList.of( - () -> IntStream.range(0, 100).iterator(), - () -> IntStream.range(0, 100).iterator(), - () -> IntStream.range(0, 100).iterator()); + @Timeout(10) + public void noDeadlock() { + ExecutorService executor = Executors.newFixedThreadPool(1); + Semaphore semaphore = new Semaphore(1); Review Comment: Add a code comment that this simulates a constrained resource, such as S3 connection pool that has limit on a max number of connections ########## core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java: ########## @@ -171,38 +171,55 @@ public void limitQueueSize() { } @Test - public void queueSizeOne() { - List<Iterable<Integer>> iterables = - ImmutableList.of( - () -> IntStream.range(0, 100).iterator(), - () -> IntStream.range(0, 100).iterator(), - () -> IntStream.range(0, 100).iterator()); + @Timeout(10) + public void noDeadlock() { + ExecutorService executor = Executors.newFixedThreadPool(1); + Semaphore semaphore = new Semaphore(1); - Multiset<Integer> expectedValues = - IntStream.range(0, 100) - .boxed() - .flatMap(i -> Stream.of(i, i, i)) - .collect(ImmutableMultiset.toImmutableMultiset()); + List<Iterable<Integer>> iterablesA = + ImmutableList.of( + testIterable( + semaphore::acquire, semaphore::release, IntStream.range(0, 100).iterator())); + List<Iterable<Integer>> iterablesB = + ImmutableList.of( + testIterable( + semaphore::acquire, semaphore::release, IntStream.range(200, 300).iterator())); - ExecutorService executor = Executors.newCachedThreadPool(); - ParallelIterable<Integer> parallelIterable = new ParallelIterable<>(iterables, executor, 1); - ParallelIterator<Integer> iterator = (ParallelIterator<Integer>) parallelIterable.iterator(); + ParallelIterable<Integer> parallelIterableA = new 
ParallelIterable<>(iterablesA, executor, 1); + ParallelIterable<Integer> parallelIterableB = new ParallelIterable<>(iterablesB, executor, 1); Review Comment: I have a code warning in those lines. Do you get one as well? ########## core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java: ########## @@ -171,38 +171,55 @@ public void limitQueueSize() { } @Test - public void queueSizeOne() { - List<Iterable<Integer>> iterables = - ImmutableList.of( - () -> IntStream.range(0, 100).iterator(), - () -> IntStream.range(0, 100).iterator(), - () -> IntStream.range(0, 100).iterator()); + @Timeout(10) + public void noDeadlock() { Review Comment: this new test indeed times out before the changes ########## core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java: ########## @@ -148,17 +150,15 @@ public void limitQueueSize() { .collect(ImmutableMultiset.toImmutableMultiset()); int maxQueueSize = 20; - ExecutorService executor = Executors.newCachedThreadPool(); + ExecutorService executor = Executors.newSingleThreadExecutor(); Review Comment: is the change to existing test important? or is it enough for the noDeadlock test to test there is no deadlock? ########## core/src/main/java/org/apache/iceberg/util/ParallelIterable.java: ########## @@ -153,6 +155,7 @@ private synchronized boolean checkTasks() { try { Optional<Task<T>> continuation = taskFutures[i].get(); continuation.ifPresent(yieldedTasks::addLast); + taskFutures[i] = null; Review Comment: Why? ########## core/src/main/java/org/apache/iceberg/util/ParallelIterable.java: ########## @@ -257,17 +257,17 @@ private static class Task<T> implements Supplier<Optional<Task<T>>>, Closeable { @Override public Optional<Task<T>> get() { try { + if (queue.size() >= approximateMaxQueueSize) { + // Yield when queue is over the size limit. Task will be resubmitted later and continue + // the work.
+ return Optional.of(this); Review Comment: it's very neat you solved the problem by just moving 2 lines of code around (okay, 5 including comments). Please add a code comment capturing the wisdom behind this seemingly cosmetic change. ########## core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java: ########## @@ -171,38 +171,55 @@ public void limitQueueSize() { } @Test - public void queueSizeOne() { - List<Iterable<Integer>> iterables = - ImmutableList.of( - () -> IntStream.range(0, 100).iterator(), - () -> IntStream.range(0, 100).iterator(), - () -> IntStream.range(0, 100).iterator()); + @Timeout(10) + public void noDeadlock() { + ExecutorService executor = Executors.newFixedThreadPool(1); + Semaphore semaphore = new Semaphore(1); - Multiset<Integer> expectedValues = - IntStream.range(0, 100) - .boxed() - .flatMap(i -> Stream.of(i, i, i)) - .collect(ImmutableMultiset.toImmutableMultiset()); + List<Iterable<Integer>> iterablesA = + ImmutableList.of( + testIterable( + semaphore::acquire, semaphore::release, IntStream.range(0, 100).iterator())); + List<Iterable<Integer>> iterablesB = + ImmutableList.of( + testIterable( + semaphore::acquire, semaphore::release, IntStream.range(200, 300).iterator())); - ExecutorService executor = Executors.newCachedThreadPool(); - ParallelIterable<Integer> parallelIterable = new ParallelIterable<>(iterables, executor, 1); - ParallelIterator<Integer> iterator = (ParallelIterator<Integer>) parallelIterable.iterator(); + ParallelIterable<Integer> parallelIterableA = new ParallelIterable<>(iterablesA, executor, 1); + ParallelIterable<Integer> parallelIterableB = new ParallelIterable<>(iterablesB, executor, 1); - Multiset<Integer> actualValues = HashMultiset.create(); + parallelIterableA.iterator().next(); + parallelIterableB.iterator().next(); - while (iterator.hasNext()) { - assertThat(iterator.queueSize()) - .as("iterator internal queue size") - .isLessThanOrEqualTo(1 + iterables.size()); - 
actualValues.add(iterator.next()); - } + executor.shutdownNow(); Review Comment: please use try finally to manage an executor in tests (i am aware existing tests don't do this, but that's not a reason not to do better in new code) ########## core/src/main/java/org/apache/iceberg/util/ParallelIterable.java: ########## @@ -165,7 +168,10 @@ private synchronized boolean checkTasks() { } } - taskFutures[i] = submitNextTask(); + // submit a new task if there is space in the queue + if (queue.size() < maxQueueSize) { + taskFutures[i] = submitNextTask(); Review Comment: Can this be tested? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org