jpountz commented on code in PR #12569: URL: https://github.com/apache/lucene/pull/12569#discussion_r1330338668
##########
lucene/core/src/java/org/apache/lucene/search/TaskExecutor.java:
##########
@@ -64,4 +82,26 @@ final <T> List<T> invokeAll(Collection<RunnableFuture<T>> tasks) throws IOExcept
     }
     return results;
   }
+
+  final <C> Task<C> createTask(Callable<C> callable) {
+    return new Task<>(callable);
+  }
+
+  static class Task<V> extends FutureTask<V> {
+    private Task(Callable<V> callable) {
+      super(callable);
+    }
+
+    @Override
+    public void run() {
+      try {
+        Integer counter = runSameThread.get();
+        runSameThread.set(++counter);
+        super.run();
+      } finally {
+        Integer counter = runSameThread.get();
+        runSameThread.set(--counter);

Review Comment:
   and likewise here

   ```suggestion
           runSameThread.set(counter - 1);
   ```

##########
lucene/core/src/java/org/apache/lucene/search/TaskExecutor.java:
##########
@@ -64,4 +82,26 @@ final <T> List<T> invokeAll(Collection<RunnableFuture<T>> tasks) throws IOExcept
     }
     return results;
   }
+
+  final <C> Task<C> createTask(Callable<C> callable) {
+    return new Task<>(callable);
+  }
+
+  static class Task<V> extends FutureTask<V> {
+    private Task(Callable<V> callable) {
+      super(callable);
+    }
+
+    @Override
+    public void run() {
+      try {
+        Integer counter = runSameThread.get();
+        runSameThread.set(++counter);

Review Comment:
   nit: it feels weird to increment when you don't read the value later on

   ```suggestion
           runSameThread.set(counter + 1);
   ```

##########
lucene/core/src/java/org/apache/lucene/search/TaskExecutor.java:
##########
@@ -22,18 +22,29 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Objects;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Future;
-import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.FutureTask;
 import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.ThreadInterruptedException;

 /**
  * Executor wrapper responsible for the execution of concurrent tasks. Used to parallelize search
- * across segments as well as query rewrite in some cases.
+ * across segments as well as query rewrite in some cases. Exposes a {@link #createTask(Callable)}
+ * method to create tasks given a {@link Callable}, as well as the {@link #invokeAll(Collection)}
+ * method to execute a set of tasks concurrently. Once all tasks are submitted to the executor, it
+ * blocks and wait for all tasks to be completed, and then returns a list with the obtained results.
+ * Ensures that the underlying executor is only used for top-level {@link #invokeAll(Collection)}
+ * calls, and not for potential {@link #invokeAll(Collection)} calls made from one of the tasks.
+ * This is to prevent deadlock with certain types of executors, as well as to limit the level of
+ * parallelism.
  */
 class TaskExecutor {
+  // a static thread local is ok as long as there is a single TaskExecutor ever created

Review Comment:
   But there would usually be one TaskExecutor per IndexSearcher right? So more than one for users that have multiple indexes in the same JVM? Even for a single IndexSearcher, you could have two point-in-time views that are open at the same time and would have different `TaskExecutor` instances.

   IMO `static` is helpful because it also helps cover cases when users search into another `IndexSearcher` from a collector?

##########
lucene/core/src/java/org/apache/lucene/search/TaskExecutor.java:
##########
@@ -22,18 +22,28 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Objects;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Future;
-import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.FutureTask;
 import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.ThreadInterruptedException;

 /**
  * Executor wrapper responsible for the execution of concurrent tasks. Used to parallelize search
- * across segments as well as query rewrite in some cases.
+ * across segments as well as query rewrite in some cases. Exposes a {@link #createTask(Callable)}
+ * method to create tasks given a {@link Callable}, as well as the {@link #invokeAll(Collection)}
+ * method to execute a set of tasks concurrently. Once all tasks are submitted to the executor, it
+ * blocks and wait for all tasks to be completed, and then returns a list with the obtained results.
+ * Ensures that the underlying executor is only used for top-level {@link #invokeAll(Collection)}
+ * calls, and not for potential {@link #invokeAll(Collection)} calls made from one of the tasks.
+ * This is to prevent deadlock with certain types of executors, as well as to limit the level of
+ * parallelism.

Review Comment:
   I would remove the bit about limiting the level of parallelism. I don't think it's a goal, mostly a side effect of the logic to avoid deadlocks. It's true that this might hurt executors that are not subject to deadlocks, but I would be very surprised if there were many users relying on it today since it can only happen when running a rewrite or a search from a rewrite or a search, which is not typical.

##########
lucene/core/src/java/org/apache/lucene/search/TaskExecutor.java:
##########
@@ -22,18 +22,29 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Objects;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Future;
-import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.FutureTask;
 import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.ThreadInterruptedException;

 /**
  * Executor wrapper responsible for the execution of concurrent tasks. Used to parallelize search
- * across segments as well as query rewrite in some cases.
+ * across segments as well as query rewrite in some cases. Exposes a {@link #createTask(Callable)}
+ * method to create tasks given a {@link Callable}, as well as the {@link #invokeAll(Collection)}
+ * method to execute a set of tasks concurrently. Once all tasks are submitted to the executor, it
+ * blocks and wait for all tasks to be completed, and then returns a list with the obtained results.
+ * Ensures that the underlying executor is only used for top-level {@link #invokeAll(Collection)}
+ * calls, and not for potential {@link #invokeAll(Collection)} calls made from one of the tasks.
+ * This is to prevent deadlock with certain types of executors, as well as to limit the level of
+ * parallelism.
  */
 class TaskExecutor {
+  // a static thread local is ok as long as there is a single TaskExecutor ever created
+  private static final ThreadLocal<Integer> runSameThread = ThreadLocal.withInitial(() -> 0);

Review Comment:
   can we make the name more descriptive, e.g. `numberOfRunningTasksInCurrentThread` or something along these lines? Maybe also leave a comment about how tracking counts instead of just booleans is important in case `ThreadPoolExecutor.CallerRunsPolicy` is used?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
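To illustrate the last comment's point about counts versus booleans, here is a minimal standalone sketch. It is not the code from the PR: the class `NestedTaskCounterSketch`, the `CountingTask` wrapper, the boolean `runningTaskInCurrentThread` flag, and the helper `observeAfterNestedTask` are all hypothetical, and only the counter name is borrowed from the reviewer's suggestion. It assumes that a nested task can end up running on the same thread as its parent, which is what happens when a task is executed inline or when `ThreadPoolExecutor.CallerRunsPolicy` hands a rejected task back to the submitting thread.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

public class NestedTaskCounterSketch {
  // Counter variant; the name follows the reviewer's suggestion.
  static final ThreadLocal<Integer> numberOfRunningTasksInCurrentThread =
      ThreadLocal.withInitial(() -> 0);
  // Boolean variant (hypothetical), kept only to show what goes wrong with it.
  static final ThreadLocal<Boolean> runningTaskInCurrentThread =
      ThreadLocal.withInitial(() -> false);

  // Hypothetical wrapper that updates both trackers around the task body.
  static class CountingTask<V> extends FutureTask<V> {
    CountingTask(Callable<V> callable) {
      super(callable);
    }

    @Override
    public void run() {
      try {
        numberOfRunningTasksInCurrentThread.set(numberOfRunningTasksInCurrentThread.get() + 1);
        runningTaskInCurrentThread.set(true);
        super.run();
      } finally {
        numberOfRunningTasksInCurrentThread.set(numberOfRunningTasksInCurrentThread.get() - 1);
        runningTaskInCurrentThread.set(false);
      }
    }
  }

  // Runs an outer task that runs an inner task on the same thread, and returns
  // what the outer task observes in both trackers once the inner task is done.
  static String observeAfterNestedTask() {
    Callable<Integer> innerBody = () -> 42;
    Callable<String> outerBody =
        () -> {
          CountingTask<Integer> inner = new CountingTask<>(innerBody);
          inner.run(); // same thread, as with inline execution or CallerRunsPolicy
          inner.get();
          return "count="
              + numberOfRunningTasksInCurrentThread.get()
              + " flag="
              + runningTaskInCurrentThread.get();
        };
    CountingTask<String> outer = new CountingTask<>(outerBody);
    outer.run();
    try {
      return outer.get();
    } catch (Exception e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(observeAfterNestedTask());
  }
}
```

In this sketch, the outer task still sees a positive count after the inner task finishes, while the boolean has already been reset by the inner task's `finally` block. A check based on the boolean would therefore treat the thread as idle while the outer task is still running on it.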