jpountz commented on code in PR #12569:
URL: https://github.com/apache/lucene/pull/12569#discussion_r1330338668


##########
lucene/core/src/java/org/apache/lucene/search/TaskExecutor.java:
##########
@@ -64,4 +82,26 @@ final <T> List<T> invokeAll(Collection<RunnableFuture<T>> tasks) throws IOExcept
     }
     return results;
   }
+
+  final <C> Task<C> createTask(Callable<C> callable) {
+    return new Task<>(callable);
+  }
+
+  static class Task<V> extends FutureTask<V> {
+    private Task(Callable<V> callable) {
+      super(callable);
+    }
+
+    @Override
+    public void run() {
+      try {
+        Integer counter = runSameThread.get();
+        runSameThread.set(++counter);
+        super.run();
+      } finally {
+        Integer counter = runSameThread.get();
+        runSameThread.set(--counter);

Review Comment:
   and likewise here
   
   ```suggestion
           runSameThread.set(counter - 1);
   ```



##########
lucene/core/src/java/org/apache/lucene/search/TaskExecutor.java:
##########
@@ -64,4 +82,26 @@ final <T> List<T> invokeAll(Collection<RunnableFuture<T>> tasks) throws IOExcept
     }
     return results;
   }
+
+  final <C> Task<C> createTask(Callable<C> callable) {
+    return new Task<>(callable);
+  }
+
+  static class Task<V> extends FutureTask<V> {
+    private Task(Callable<V> callable) {
+      super(callable);
+    }
+
+    @Override
+    public void run() {
+      try {
+        Integer counter = runSameThread.get();
+        runSameThread.set(++counter);

Review Comment:
   nit: it feels weird to increment when you don't read the value later on
   ```suggestion
           runSameThread.set(counter + 1);
   ```



##########
lucene/core/src/java/org/apache/lucene/search/TaskExecutor.java:
##########
@@ -22,18 +22,29 @@
 import java.util.Collection;
 import java.util.List;
 import java.util.Objects;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Future;
-import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.FutureTask;
 import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.ThreadInterruptedException;
 
 /**
  * Executor wrapper responsible for the execution of concurrent tasks. Used to parallelize search
- * across segments as well as query rewrite in some cases.
+ * across segments as well as query rewrite in some cases. Exposes a {@link #createTask(Callable)}
+ * method to create tasks given a {@link Callable}, as well as the {@link #invokeAll(Collection)}
+ * method to execute a set of tasks concurrently. Once all tasks are submitted to the executor, it
+ * blocks and wait for all tasks to be completed, and then returns a list with the obtained results.
+ * Ensures that the underlying executor is only used for top-level {@link #invokeAll(Collection)}
+ * calls, and not for potential {@link #invokeAll(Collection)} calls made from one of the tasks.
+ * This is to prevent deadlock with certain types of executors, as well as to limit the level of
+ * parallelism.
  */
 class TaskExecutor {
+  // a static thread local is ok as long as there is a single TaskExecutor ever created

Review Comment:
   But there would usually be one `TaskExecutor` per `IndexSearcher`, right? So there would be more than one for users that have multiple indexes in the same JVM. Even for a single IndexSearcher, you could have two point-in-time views that are open at the same time and would have different `TaskExecutor` instances.
   
   IMO `static` is helpful because it also covers cases where users search into another `IndexSearcher` from a collector.



##########
lucene/core/src/java/org/apache/lucene/search/TaskExecutor.java:
##########
@@ -22,18 +22,28 @@
 import java.util.Collection;
 import java.util.List;
 import java.util.Objects;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Future;
-import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.FutureTask;
 import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.ThreadInterruptedException;
 
 /**
  * Executor wrapper responsible for the execution of concurrent tasks. Used to parallelize search
- * across segments as well as query rewrite in some cases.
+ * across segments as well as query rewrite in some cases. Exposes a {@link #createTask(Callable)}
+ * method to create tasks given a {@link Callable}, as well as the {@link #invokeAll(Collection)}
+ * method to execute a set of tasks concurrently. Once all tasks are submitted to the executor, it
+ * blocks and wait for all tasks to be completed, and then returns a list with the obtained results.
+ * Ensures that the underlying executor is only used for top-level {@link #invokeAll(Collection)}
+ * calls, and not for potential {@link #invokeAll(Collection)} calls made from one of the tasks.
+ * This is to prevent deadlock with certain types of executors, as well as to limit the level of
+ * parallelism.

Review Comment:
   I would remove the bit about limiting the level of parallelism. I don't think it's a goal; it is mostly a side effect of the logic to avoid deadlocks.
   
   It's true that this might hurt executors that are not subject to deadlocks, but I would be very surprised if many users relied on it today, since it can only happen when running a rewrite or a search from within a rewrite or a search, which is not typical.
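
To make the deadlock concern concrete, here is a minimal sketch of the guard being discussed. It is not the code from this PR: the class `SameThreadGuardSketch`, the field `runningTasks`, and the static `invokeAll` helper are hypothetical names chosen for illustration. The assumption is that a nested call made from inside a running task bypasses the executor and runs on the calling thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

public class SameThreadGuardSketch {
  // Hypothetical counter: number of tasks currently running on this thread.
  static final ThreadLocal<Integer> runningTasks = ThreadLocal.withInitial(() -> 0);

  static <T> List<T> invokeAll(Executor executor, List<Callable<T>> callables) throws Exception {
    // A nested call (counter > 0) runs its tasks inline instead of queueing them.
    Executor target = runningTasks.get() > 0 ? Runnable::run : executor;
    List<FutureTask<T>> futures = new ArrayList<>();
    for (Callable<T> callable : callables) {
      FutureTask<T> future =
          new FutureTask<>(
              () -> {
                runningTasks.set(runningTasks.get() + 1);
                try {
                  return callable.call();
                } finally {
                  runningTasks.set(runningTasks.get() - 1);
                }
              });
      futures.add(future);
      target.execute(future);
    }
    List<T> results = new ArrayList<>();
    for (FutureTask<T> future : futures) {
      results.add(future.get());
    }
    return results;
  }

  /** Runs a task that itself calls invokeAll on the same single-threaded executor. */
  static int nestedSum() throws Exception {
    ExecutorService single = Executors.newSingleThreadExecutor();
    try {
      Callable<Integer> inner = () -> 21;
      Callable<Integer> outer =
          () -> {
            int sum = 0;
            for (int value : invokeAll(single, List.of(inner, inner))) {
              sum += value;
            }
            return sum;
          };
      return invokeAll(single, List.of(outer)).get(0);
    } finally {
      single.shutdown();
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println(nestedSum());
  }
}
```

Without the guard, the outer task would occupy the only worker thread while waiting for inner tasks that sit in the queue behind it. With the guard, the inner tasks run on the worker thread itself, which also means they are not parallelized; that is the side effect mentioned above.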



##########
lucene/core/src/java/org/apache/lucene/search/TaskExecutor.java:
##########
@@ -22,18 +22,29 @@
 import java.util.Collection;
 import java.util.List;
 import java.util.Objects;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Future;
-import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.FutureTask;
 import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.ThreadInterruptedException;
 
 /**
  * Executor wrapper responsible for the execution of concurrent tasks. Used to parallelize search
- * across segments as well as query rewrite in some cases.
+ * across segments as well as query rewrite in some cases. Exposes a {@link #createTask(Callable)}
+ * method to create tasks given a {@link Callable}, as well as the {@link #invokeAll(Collection)}
+ * method to execute a set of tasks concurrently. Once all tasks are submitted to the executor, it
+ * blocks and wait for all tasks to be completed, and then returns a list with the obtained results.
+ * Ensures that the underlying executor is only used for top-level {@link #invokeAll(Collection)}
+ * calls, and not for potential {@link #invokeAll(Collection)} calls made from one of the tasks.
+ * This is to prevent deadlock with certain types of executors, as well as to limit the level of
+ * parallelism.
  */
 class TaskExecutor {
+  // a static thread local is ok as long as there is a single TaskExecutor ever created
+  private static final ThreadLocal<Integer> runSameThread = ThreadLocal.withInitial(() -> 0);

Review Comment:
   Can we make the name more descriptive, e.g. `numberOfRunningTasksInCurrentThread` or something along these lines?
   
   Maybe also leave a comment explaining why tracking a count rather than just a boolean is important in case `ThreadPoolExecutor.CallerRunsPolicy` is used?
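
The following sketch illustrates why a count matters under `ThreadPoolExecutor.CallerRunsPolicy`. It is an illustration only, not the PR's code: `NestedTaskCounterSketch`, `TrackedTask`, `runningTasks`, and `inTask` are hypothetical names. The pool has one thread and a `SynchronousQueue`, so a task submitted from inside a running task is rejected and executed on the submitting thread, nested inside the outer task's `run()`.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class NestedTaskCounterSketch {
  // Hypothetical stand-ins: a counter and a boolean flag tracking the same thing.
  static final ThreadLocal<Integer> runningTasks = ThreadLocal.withInitial(() -> 0);
  static final ThreadLocal<Boolean> inTask = ThreadLocal.withInitial(() -> false);

  /** Updates both the counter and the flag so the two approaches can be compared. */
  static class TrackedTask<V> extends FutureTask<V> {
    TrackedTask(Callable<V> callable) {
      super(callable);
    }

    @Override
    public void run() {
      runningTasks.set(runningTasks.get() + 1);
      inTask.set(true);
      try {
        super.run();
      } finally {
        runningTasks.set(runningTasks.get() - 1);
        inTask.set(false); // also clears the flag for any enclosing task on this thread
      }
    }
  }

  /** Returns {counter, flag} as seen by the outer task after its nested task completed. */
  static Object[] observeAfterNestedTask() throws Exception {
    ThreadPoolExecutor pool =
        new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.SECONDS,
            new SynchronousQueue<>(),
            new ThreadPoolExecutor.CallerRunsPolicy());
    try {
      TrackedTask<Object[]> outer =
          new TrackedTask<>(
              () -> {
                TrackedTask<String> nested =
                    new TrackedTask<>(() -> Thread.currentThread().getName());
                // The only worker is busy with the outer task, so this submission is
                // rejected and CallerRunsPolicy runs it on the current thread.
                pool.execute(nested);
                nested.get();
                return new Object[] {runningTasks.get(), inTask.get()};
              });
      pool.execute(outer);
      return outer.get();
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) throws Exception {
    Object[] seen = observeAfterNestedTask();
    System.out.println("counter=" + seen[0] + ", flag=" + seen[1]);
  }
}
```

After the nested task returns, the outer task is still running. The counter still reads 1, whereas the boolean has already been reset to `false`, so a flag-based check would wrongly conclude that no task is running on the thread.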



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

