javanna commented on code in PR #13472:
URL: https://github.com/apache/lucene/pull/13472#discussion_r1642563757


##########
lucene/core/src/java/org/apache/lucene/search/TaskExecutor.java:
##########
@@ -30,27 +30,21 @@
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.ThreadInterruptedException;
 
 /**
  * Executor wrapper responsible for the execution of concurrent tasks. Used to 
parallelize search
  * across segments as well as query rewrite in some cases. Exposes a single 
{@link
  * #invokeAll(Collection)} method that takes a collection of {@link Callable}s 
and executes them
- * concurrently/ Once all tasks are submitted to the executor, it blocks and 
wait for all tasks to
- * be completed, and then returns a list with the obtained results. Ensures 
that the underlying
- * executor is only used for top-level {@link #invokeAll(Collection)} calls, 
and not for potential
- * {@link #invokeAll(Collection)} calls made from one of the tasks. This is to 
prevent deadlock with
- * certain types of pool based executors (e.g. {@link 
java.util.concurrent.ThreadPoolExecutor}).
+ * concurrently. Once all but one task have been submitted to the executor, it 
tries to run as many
+ * tasks as possible on the calling thread, then waits for all tasks that have 
been executed in
+ * parallel on the executor to be completed and then returns a list with the 
obtained results.

Review Comment:
   nit: remove the last "then", leave just "and returns a list with the 
obtained results" ?



##########
lucene/core/src/java/org/apache/lucene/search/TaskExecutor.java:
##########
@@ -144,32 +128,45 @@ public boolean cancel(boolean mayInterruptIfRunning) {
     }
 
     List<T> invokeAll(Executor executor) throws IOException {
-      boolean runOnCallerThread = numberOfRunningTasksInCurrentThread.get() > 
0;
-      for (Runnable runnable : futures) {
-        if (runOnCallerThread) {
-          runnable.run();
-        } else {
-          executor.execute(runnable);
+      final int count = futures.size();
+      // taskId provides the first index of an un-executed task in #futures
+      final AtomicInteger taskId = new AtomicInteger(0);
+      // we fork execution count - 1 tasks to execute at least one task on the 
current thread to
+      // minimize needless forking and blocking of the current thread
+      if (count > 1) {
+        final Runnable work =
+            () -> {
+              int id = taskId.getAndIncrement();
+              if (id < count) {
+                futures.get(id).run();
+              }
+            };
+        for (int j = 0; j < count - 1; j++) {
+          executor.execute(work);
+        }
+      }
+      // try to execute as many tasks as possible on the current thread to 
minimize context
+      // switching in case of long running concurrent
+      // tasks as well as dead-locking if the current thread is part of 
#executor for executors that
+      // have limited or no parallelism
+      int id;
+      while ((id = taskId.getAndIncrement()) < count) {
+        futures.get(id).run();
+        if (id >= count - 1) {
+          // save redundant CAS in case this was the last task
+          break;
         }
       }
       Throwable exc = null;
-      List<T> results = new ArrayList<>(futures.size());
-      for (Future<T> future : futures) {
+      List<T> results = new ArrayList<>(count);
+      for (int i = 0; i < count; i++) {

Review Comment:
   can you remind me, why does order matter here compared to before?



##########
lucene/core/src/java/org/apache/lucene/search/TaskExecutor.java:
##########
@@ -112,15 +102,10 @@ RunnableFuture<T> createTask(Callable<T> callable) {
           () -> {
             if (startedOrCancelled.compareAndSet(false, true)) {
               try {
-                Integer counter = numberOfRunningTasksInCurrentThread.get();

Review Comment:
   What's your thinking about this? It may make sense to lock 2nd level fan 
out, yet with this new way of executing, in practice we won't use as many 
threads as tasks. Maybe we can keep things simple?



##########
lucene/core/src/test/org/apache/lucene/search/TestTaskExecutor.java:
##########
@@ -308,7 +306,7 @@ public void testInvokeAllCatchesMultipleExceptions() {
   }
 
   public void testCancelTasksOnException() {
-    TaskExecutor taskExecutor = new TaskExecutor(executorService);
+    TaskExecutor taskExecutor = new TaskExecutor(Runnable::run);

Review Comment:
   This way the test does not use any concurrency, hence it makes the test much 
simpler? Shall we change all tests in this class to reproduce the usage that we 
are envisioning, meaning one of the threads of the executor to call invokeAll? 
   
   I was also wondering if we want to add additional tests for the type of 
execution you implemented. For instance an artificial test the verifies that we 
never deadlock? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org
For additional commands, e-mail: issues-h...@lucene.apache.org

Reply via email to