javanna commented on code in PR #13472:
URL: https://github.com/apache/lucene/pull/13472#discussion_r1642563757
##########
lucene/core/src/java/org/apache/lucene/search/TaskExecutor.java:
##########
@@ -30,27 +30,21 @@ import java.util.concurrent.FutureTask;
 import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.ThreadInterruptedException;
 /**
  * Executor wrapper responsible for the execution of concurrent tasks. Used to parallelize search
  * across segments as well as query rewrite in some cases. Exposes a single {@link
  * #invokeAll(Collection)} method that takes a collection of {@link Callable}s and executes them
- * concurrently/ Once all tasks are submitted to the executor, it blocks and wait for all tasks to
- * be completed, and then returns a list with the obtained results. Ensures that the underlying
- * executor is only used for top-level {@link #invokeAll(Collection)} calls, and not for potential
- * {@link #invokeAll(Collection)} calls made from one of the tasks. This is to prevent deadlock with
- * certain types of pool based executors (e.g. {@link java.util.concurrent.ThreadPoolExecutor}).
+ * concurrently. Once all but one task have been submitted to the executor, it tries to run as many
+ * tasks as possible on the calling thread, then waits for all tasks that have been executed in
+ * parallel on the executor to be completed and then returns a list with the obtained results.

Review Comment:
nit: remove the last "then" and leave just "and returns a list with the obtained results"?
##########
lucene/core/src/java/org/apache/lucene/search/TaskExecutor.java:
##########
@@ -144,32 +128,45 @@ public boolean cancel(boolean mayInterruptIfRunning) {
     }
     List<T> invokeAll(Executor executor) throws IOException {
-      boolean runOnCallerThread = numberOfRunningTasksInCurrentThread.get() > 0;
-      for (Runnable runnable : futures) {
-        if (runOnCallerThread) {
-          runnable.run();
-        } else {
-          executor.execute(runnable);
+      final int count = futures.size();
+      // taskId provides the first index of an un-executed task in #futures
+      final AtomicInteger taskId = new AtomicInteger(0);
+      // we fork execution count - 1 tasks to execute at least one task on the current thread to
+      // minimize needless forking and blocking of the current thread
+      if (count > 1) {
+        final Runnable work =
+            () -> {
+              int id = taskId.getAndIncrement();
+              if (id < count) {
+                futures.get(id).run();
+              }
+            };
+        for (int j = 0; j < count - 1; j++) {
+          executor.execute(work);
+        }
+      }
+      // try to execute as many tasks as possible on the current thread to minimize context
+      // switching in case of long running concurrent
+      // tasks as well as dead-locking if the current thread is part of #executor for executors that
+      // have limited or no parallelism
+      int id;
+      while ((id = taskId.getAndIncrement()) < count) {
+        futures.get(id).run();
+        if (id >= count - 1) {
+          // save redundant CAS in case this was the last task
+          break;
         }
       }
       Throwable exc = null;
-      List<T> results = new ArrayList<>(futures.size());
-      for (Future<T> future : futures) {
+      List<T> results = new ArrayList<>(count);
+      for (int i = 0; i < count; i++) {

Review Comment:
Can you remind me why order matters here compared to before?
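For reference on the ordering point, here is a minimal standalone sketch (plain `java.util.concurrent`, not Lucene code; `OrderedResultsDemo` and `squares` are hypothetical names) of the claim-the-next-index pattern from the diff. Tasks are handed out through a shared `AtomicInteger` and may complete in any order across the pool threads and the calling thread, but the results are read back by index, so the returned list follows the order in which the callables were supplied.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

class OrderedResultsDemo {
  // Returns the squares of 0..count-1, computed by tasks that may finish out of index order.
  static List<Integer> squares(int count) {
    List<FutureTask<Integer>> futures = new ArrayList<>(count);
    for (int i = 0; i < count; i++) {
      final int n = i;
      // Lower indices sleep longer, so completion order generally differs from index order.
      futures.add(
          new FutureTask<Integer>(
              () -> {
                Thread.sleep((count - n) * 5L);
                return n * n;
              }));
    }
    AtomicInteger taskId = new AtomicInteger(0);
    ExecutorService executor = Executors.newFixedThreadPool(2);
    try {
      // Whichever thread starts a work item first claims the next unclaimed index.
      Runnable work =
          () -> {
            int id = taskId.getAndIncrement();
            if (id < count) {
              futures.get(id).run();
            }
          };
      for (int j = 0; j < count - 1; j++) {
        executor.execute(work);
      }
      // The calling thread runs whatever has not been claimed yet.
      int next;
      while ((next = taskId.getAndIncrement()) < count) {
        futures.get(next).run();
      }
      // Results are read back by index, so results.get(i) always belongs to task i.
      List<Integer> results = new ArrayList<>(count);
      for (int i = 0; i < count; i++) {
        results.add(futures.get(i).get());
      }
      return results;
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      executor.shutdown();
    }
  }
}
```

Iterating `futures` with an enhanced for loop would give the same order as the indexed loop; the index only matters for claiming work, not for collecting results.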
##########
lucene/core/src/java/org/apache/lucene/search/TaskExecutor.java:
##########
@@ -112,15 +102,10 @@ RunnableFuture<T> createTask(Callable<T> callable) {
           () -> {
             if (startedOrCancelled.compareAndSet(false, true)) {
               try {
-                Integer counter = numberOfRunningTasksInCurrentThread.get();

Review Comment:
What's your thinking about this? It may make sense to lock second-level fan-out, yet with this new way of executing we won't, in practice, use as many threads as tasks. Maybe we can keep things simple?

##########
lucene/core/src/test/org/apache/lucene/search/TestTaskExecutor.java:
##########
@@ -308,7 +306,7 @@ public void testInvokeAllCatchesMultipleExceptions() {
   }
   public void testCancelTasksOnException() {
-    TaskExecutor taskExecutor = new TaskExecutor(executorService);
+    TaskExecutor taskExecutor = new TaskExecutor(Runnable::run);

Review Comment:
This way the test does not use any concurrency, which makes it much simpler? Shall we change all tests in this class to reproduce the usage we are envisioning, meaning one of the executor's threads calling invokeAll? I was also wondering whether we want to add tests for the type of execution you implemented, for instance an artificial test that verifies we never deadlock?
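A rough sketch of what the artificial no-deadlock test could look like. It does not use Lucene's `TaskExecutor`; instead it assumes a hypothetical `CallerRunsInvoker` that re-implements the strategy from the diff with plain `java.util.concurrent` types (the class names, the `invokeAll(List<Callable<T>>)` signature and `NestedInvokeAllCheck` are illustrative assumptions). The outer `invokeAll` runs on the only thread of a single-thread executor, and every task fans out again. Because a forked work item claims an index only when it actually starts, the calling thread can run every task itself, so a deadlock would show up as a timeout rather than a hang.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for TaskExecutor: forks count - 1 work items, runs the rest on the caller.
class CallerRunsInvoker {
  private final Executor executor;

  CallerRunsInvoker(Executor executor) {
    this.executor = executor;
  }

  <T> List<T> invokeAll(List<Callable<T>> callables) {
    final int count = callables.size();
    final List<FutureTask<T>> futures = new ArrayList<>(count);
    for (Callable<T> callable : callables) {
      futures.add(new FutureTask<>(callable));
    }
    final AtomicInteger taskId = new AtomicInteger(0);
    // A work item claims an index only when it starts, so a queued item never holds a task.
    final Runnable work =
        () -> {
          int id = taskId.getAndIncrement();
          if (id < count) {
            futures.get(id).run();
          }
        };
    for (int j = 0; j < count - 1; j++) {
      executor.execute(work);
    }
    // The calling thread runs every task that has not been claimed yet.
    int next;
    while ((next = taskId.getAndIncrement()) < count) {
      futures.get(next).run();
    }
    List<T> results = new ArrayList<>(count);
    try {
      for (FutureTask<T> future : futures) {
        results.add(future.get());
      }
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    }
    return results;
  }
}

class NestedInvokeAllCheck {
  // Outer invokeAll runs on the pool's only thread; each task fans out again (second level).
  static List<Integer> run() {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    CallerRunsInvoker invoker = new CallerRunsInvoker(pool);
    try {
      Future<List<Integer>> outer =
          pool.submit(
              () -> {
                List<Callable<Integer>> tasks = new ArrayList<>();
                for (int i = 0; i < 3; i++) {
                  final int n = i;
                  tasks.add(
                      () -> {
                        List<Callable<Integer>> inner = new ArrayList<>();
                        for (int k = 0; k <= n; k++) {
                          final int v = k;
                          inner.add(() -> v);
                        }
                        // Sum of 0..n, computed through a nested invokeAll.
                        return invoker.invokeAll(inner).stream().mapToInt(Integer::intValue).sum();
                      });
                }
                return invoker.invokeAll(tasks);
              });
      // A deadlock would surface here as a TimeoutException instead of a result.
      return outer.get(5, TimeUnit.SECONDS);
    } catch (InterruptedException | ExecutionException | TimeoutException e) {
      throw new IllegalStateException("task failed or invokeAll deadlocked", e);
    } finally {
      pool.shutdownNow();
    }
  }
}
```

Constructing the invoker with `Runnable::run` instead of a pool exercises the no-concurrency path, similar to the change in `testCancelTasksOnException`.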