javanna commented on code in PR #12183:
URL: https://github.com/apache/lucene/pull/12183#discussion_r1328932010
########## lucene/core/src/java/org/apache/lucene/index/TermStates.java: ##########
@@ -86,19 +93,40 @@ public TermStates(
    * @param needsStats if {@code true} then all leaf contexts will be visited up-front to collect
    *     term statistics. Otherwise, the {@link TermState} objects will be built only when requested
    */
-  public static TermStates build(IndexReaderContext context, Term term, boolean needsStats)
+  public static TermStates build(IndexSearcher indexSearcher, Term term, boolean needsStats)
       throws IOException {
-    assert context != null && context.isTopLevel;
+    IndexReaderContext context = indexSearcher.getTopReaderContext();
+    assert context != null;
     final TermStates perReaderTermState = new TermStates(needsStats ? null : term, context);
     if (needsStats) {
-      for (final LeafReaderContext ctx : context.leaves()) {
-        // if (DEBUG) System.out.println(" r=" + leaves[i].reader);
-        TermsEnum termsEnum = loadTermsEnum(ctx, term);
-        if (termsEnum != null) {
-          final TermState termState = termsEnum.termState();
-          // if (DEBUG) System.out.println(" found");
+      Executor executor = indexSearcher.getExecutor();
+      if (executor == null) {
+        executor = Runnable::run;
+      }
+      List<FutureTask<TermStateInfo>> tasks =
+          context.leaves().stream()
+              .map(
+                  ctx ->
+                      new FutureTask<>(
+                          () -> {
+                            TermsEnum termsEnum = loadTermsEnum(ctx, term);
+                            if (termsEnum != null) {
+                              return new TermStateInfo(
+                                  termsEnum.termState(),
+                                  ctx.ord,
+                                  termsEnum.docFreq(),
+                                  termsEnum.totalTermFreq());
+                            }
+                            return null;
+                          }))
+              .toList();
+      TaskExecutor taskExecutor = new TaskExecutor(executor);

Review Comment:
You can retrieve the existing task executor from the IndexSearcher.
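For readers less familiar with the pattern in this hunk, the per-leaf fan-out can be reduced to plain JDK code: one `FutureTask` per leaf, an `Executor` that falls back to `Runnable::run` (i.e. the caller thread) when none is configured, and results gathered in leaf order. This is a hedged sketch, not Lucene code: `PerLeafStats`, `LeafStats`, `loadStats`, and the `int[]` of per-leaf document frequencies are hypothetical stand-ins for `TermStateInfo`, `loadTermsEnum`, and the leaf contexts.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

public class PerLeafStats {

  // Hypothetical stand-in for TermStateInfo: a leaf ordinal plus its doc freq.
  public record LeafStats(int ord, int docFreq) {}

  // Hypothetical stand-in for loadTermsEnum: null when the term does not occur in the leaf.
  static LeafStats loadStats(int ord, int[] docFreqs) {
    int df = docFreqs[ord];
    return df == 0 ? null : new LeafStats(ord, df);
  }

  // One FutureTask per leaf; runs on `executor`, or on the caller thread if it is null.
  public static List<LeafStats> collect(Executor executor, int[] docFreqs)
      throws InterruptedException, ExecutionException {
    if (executor == null) {
      executor = Runnable::run; // same fallback as in the PR hunk
    }
    List<FutureTask<LeafStats>> tasks = new ArrayList<>();
    for (int ord = 0; ord < docFreqs.length; ord++) {
      final int leafOrd = ord;
      tasks.add(new FutureTask<LeafStats>(() -> loadStats(leafOrd, docFreqs)));
    }
    for (FutureTask<LeafStats> task : tasks) {
      executor.execute(task);
    }
    // Gather results in leaf order, skipping leaves where the term is absent.
    List<LeafStats> results = new ArrayList<>();
    for (FutureTask<LeafStats> task : tasks) {
      LeafStats stats = task.get(); // blocks until this leaf's task has completed
      if (stats != null) {
        results.add(stats);
      }
    }
    return results;
  }

  public static void main(String[] args) throws Exception {
    int[] docFreqs = {3, 0, 5};
    System.out.println(collect(null, docFreqs).size()); // prints 2: leaf 1 has no match
    ExecutorService pool = Executors.newFixedThreadPool(2);
    try {
      System.out.println(collect(pool, docFreqs).size()); // prints 2
    } finally {
      pool.shutdown();
    }
  }
}
```

Following the review comment, the PR itself would reuse the task executor that the IndexSearcher already holds instead of wrapping the raw executor in a new TaskExecutor at this call site.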
########## lucene/core/src/java/org/apache/lucene/search/TaskExecutor.java: ##########
@@ -26,17 +26,21 @@
 import java.util.concurrent.Executor;
 import java.util.concurrent.Future;
 import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ThreadPoolExecutor;
 import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.ThreadInterruptedException;
 
 /**
  * Executor wrapper responsible for the execution of concurrent tasks. Used to parallelize search
  * across segments as well as query rewrite in some cases.
  */
-class TaskExecutor {
+public class TaskExecutor {
   private final Executor executor;
+  private static final String THREAD_POOL_EXECUTOR_WORKER_CLASS =
+      "java.util.concurrent.ThreadPoolExecutor$Worker";
+  private static final String RUN_METHOD = "run";
 
-  TaskExecutor(Executor executor) {
+  public TaskExecutor(Executor executor) {

Review Comment:
I think I understand why the class needs to become public, but I think the constructor can stay package private.

########## lucene/core/src/java/org/apache/lucene/search/TaskExecutor.java: ##########
@@ -64,4 +68,46 @@ final <T> List<T> invokeAll(Collection<RunnableFuture<T>> tasks) throws IOExcept
     }
     return results;
   }
+
+  /**
+   * Execute all the tasks provided as an argument concurrently only if it is a {@link
+   * ThreadPoolExecutor} and the current thread invoking this is not a {@link ThreadPoolExecutor}
+   * thread, else run all the tasks sequentially, wait for them to complete and return the obtained
+   * results.
+   *
+   * @param tasks the tasks to execute
+   * @return a list containing the results from the tasks execution
+   * @param <T> the return type of the task execution
+   */
+  public final <T> List<T> invokeAllWithThreadPoolExecutor(Collection<RunnableFuture<T>> tasks)
+      throws IOException {
+    boolean executeOnCallerThread =
+        StackWalker.getInstance(StackWalker.Option.SHOW_HIDDEN_FRAMES)
+            .walk(
+                (stream) ->
+                    stream.anyMatch(
+                        frame ->
+                            frame.getClassName().contains(THREAD_POOL_EXECUTOR_WORKER_CLASS)
+                                && frame.getMethodName().contains(RUN_METHOD)));
+    if (executor instanceof ThreadPoolExecutor && executeOnCallerThread == false) {
+      for (Runnable task : tasks) {
+        executor.execute(task);
+      }
+    } else {
+      for (Runnable task : tasks) {
+        task.run();
+      }
+    }
+    final List<T> results = new ArrayList<>();
+    for (Future<T> future : tasks) {
+      try {
+        results.add(future.get());
+      } catch (InterruptedException e) {
+        throw new ThreadInterruptedException(e);
+      } catch (ExecutionException e) {
+        throw IOUtils.rethrowAlways(e.getCause());
+      }
+    }
+    return results;

Review Comment:
This method still looks for specific usages of ThreadPoolExecutor. Could we instead assume that whenever we are already parallelizing (e.g. searching a leaf slice), we no longer want to parallelize further within that work? I would try to detect this more generally. I would also incorporate it directly into the existing invokeAll method; I don't see a reason why that should not be the default and only behaviour.

I am wondering if there are other ways than using StackWalker. For instance, we could decorate the tasks to track which threads they run on, and disable parallelization if invokeAll gets called from one of the threads that a task's run() has been called on. I opened #12569 to showcase what I have in mind. It would be great to get feedback on it.

-- 
This is an automated message from the Apache Git Service.
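The thread-tracking alternative suggested in the last review comment can be sketched roughly as follows. This is a minimal, self-contained illustration under stated assumptions, not the code in #12569: the class name `NestingAwareTaskExecutor`, the `ThreadLocal` counter, and the `Callable`-based `invokeAll` signature are all hypothetical. Each task is wrapped so the thread running it is marked for the duration of the call; when `invokeAll` is invoked from a marked thread, the tasks run sequentially on the caller instead of being submitted again, regardless of the `Executor` implementation and without inspecting the stack.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

public class NestingAwareTaskExecutor {
  // Number of tracked tasks currently running on each thread (hypothetical mechanism).
  private static final ThreadLocal<Integer> RUNNING_TASKS = ThreadLocal.withInitial(() -> 0);

  private final Executor executor;

  public NestingAwareTaskExecutor(Executor executor) {
    this.executor = executor;
  }

  // True if the current thread is already running a task submitted through this class.
  public static boolean insideTask() {
    return RUNNING_TASKS.get() > 0;
  }

  // Decorates a task so that the thread running it is marked for the duration of the call.
  private static <T> Callable<T> tracked(Callable<T> callable) {
    return () -> {
      RUNNING_TASKS.set(RUNNING_TASKS.get() + 1);
      try {
        return callable.call();
      } finally {
        RUNNING_TASKS.set(RUNNING_TASKS.get() - 1);
      }
    };
  }

  // Forks tasks onto the executor unless called from inside a task, in which case the
  // tasks run sequentially on the caller thread. No instanceof check on the executor.
  public <T> List<T> invokeAll(Collection<Callable<T>> callables)
      throws InterruptedException, ExecutionException {
    boolean runOnCaller = insideTask();
    List<FutureTask<T>> tasks = new ArrayList<>();
    for (Callable<T> callable : callables) {
      tasks.add(new FutureTask<T>(tracked(callable)));
    }
    for (FutureTask<T> task : tasks) {
      if (runOnCaller) {
        task.run();
      } else {
        executor.execute(task);
      }
    }
    List<T> results = new ArrayList<>();
    for (FutureTask<T> task : tasks) {
      results.add(task.get());
    }
    return results;
  }

  public static void main(String[] args) throws Exception {
    // A single worker thread: resubmitting nested tasks to it would deadlock.
    ExecutorService pool = Executors.newFixedThreadPool(1);
    try {
      NestingAwareTaskExecutor taskExecutor = new NestingAwareTaskExecutor(pool);
      List<Callable<Integer>> inner = new ArrayList<>();
      inner.add(() -> 1);
      inner.add(() -> 2);
      List<Callable<Integer>> outer = new ArrayList<>();
      outer.add(
          () -> {
            int sum = 0;
            for (int v : taskExecutor.invokeAll(inner)) {
              sum += v;
            }
            return sum;
          });
      System.out.println(taskExecutor.invokeAll(outer)); // prints [3]
    } finally {
      pool.shutdown();
    }
  }
}
```

With a single-thread pool, the nested `invokeAll` in `main` would deadlock if it resubmitted its tasks: the only worker would block in `get()` on tasks queued behind it. Running them on the caller thread avoids that, which is the general property the comment asks for rather than a check tied to `ThreadPoolExecutor`.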