jpountz commented on code in PR #13124: URL: https://github.com/apache/lucene/pull/13124#discussion_r1525217090
##########
lucene/core/src/java/org/apache/lucene/index/SegmentMerger.java:
##########
@@ -130,19 +135,31 @@ MergeState merge() throws IOException {
             IOContext.READ,
             segmentWriteState.segmentSuffix);
 
+    TaskExecutor taskExecutor = new TaskExecutor(mergeState.intraMergeTaskExecutor);
+    List<Callable<Void>> mergingTasks = new ArrayList<>();
+
     if (mergeState.mergeFieldInfos.hasNorms()) {
       mergeWithLogging(this::mergeNorms, segmentWriteState, segmentReadState, "norms", numMerged);
     }
 
     mergeWithLogging(this::mergeTerms, segmentWriteState, segmentReadState, "postings", numMerged);

Review Comment:
   Can we have a parallel task that handles norms + terms so that the order is respected?

##########
lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java:
##########
@@ -902,12 +932,57 @@ private static String getSegmentName(MergePolicy.OneMerge merge) {
   }
 
   static {
-    TestSecrets.setConcurrentMergeSchedulerAccess(
-        new ConcurrentMergeSchedulerAccess() {
-          @Override
-          public void setSuppressExceptions(ConcurrentMergeScheduler cms) {
-            cms.setSuppressExceptions();
+    TestSecrets.setConcurrentMergeSchedulerAccess(ConcurrentMergeScheduler::setSuppressExceptions);
+  }
+
+  private class ScaledExecutor implements Executor {
+
+    private final AtomicInteger activeCount = new AtomicInteger(0);
+    private final ThreadPoolExecutor executor;
+
+    public ScaledExecutor() {
+      this.executor =
+          new ThreadPoolExecutor(0, 1024, 1L, TimeUnit.MINUTES, new SynchronousQueue<>());
+    }
+
+    void shutdown() {
+      executor.shutdown();
+    }
+
+    @Override
+    public void execute(Runnable command) {
+      assert mergeThreads.contains(Thread.currentThread()) : "caller is not a merge thread";

Review Comment:
   I'd expect this assertion to no longer be valid, since SegmentMerger may fork into this executor for vectors, and then the task for vectors may want to further fork into a separate thread. So the caller may be either a MergeThread or a thread from the wrapper thread pool?
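To illustrate the norms + terms suggestion on SegmentMerger above, here is a rough sketch of what I have in mind. It is not the PR's code: it uses a plain `java.util.concurrent` thread pool as a stand-in for Lucene's `TaskExecutor`, and the class name `OrderedMergeTasksSketch`, the `runMerge` method, and the string labels are hypothetical placeholders for the real merge steps. The point is only that norms and postings live in one `Callable`, so their relative order holds while other tasks run next to them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class OrderedMergeTasksSketch {

  /** Runs the placeholder merge steps and returns the order in which they were recorded. */
  public static List<String> runMerge() throws Exception {
    // Thread-safe list so concurrent tasks can record their steps.
    List<String> completed = new CopyOnWriteArrayList<>();
    List<Callable<Void>> mergingTasks = new ArrayList<>();

    // A single task handles norms and then postings, so norms always come first.
    mergingTasks.add(
        () -> {
          completed.add("norms"); // stand-in for mergeNorms
          completed.add("postings"); // stand-in for mergeTerms
          return null;
        });

    // An independent step (assumed to have no ordering constraint) gets its own task.
    mergingTasks.add(
        () -> {
          completed.add("vectors");
          return null;
        });

    // Stand-in for the intra-merge executor; the real code would go through TaskExecutor.
    ExecutorService executor = Executors.newFixedThreadPool(2);
    try {
      for (Future<Void> future : executor.invokeAll(mergingTasks)) {
        future.get(); // rethrows any failure from a task
      }
    } finally {
      executor.shutdown();
    }
    return completed;
  }

  public static void main(String[] args) throws Exception {
    // "vectors" may land anywhere, but "norms" always precedes "postings".
    System.out.println(runMerge());
  }
}
```

With this shape the scheduler is free to interleave the vectors task however it likes, and the only ordering guarantee is the one inside the combined task.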
##########
lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java:
##########
@@ -910,4 +936,55 @@ public void setSuppressExceptions(ConcurrentMergeScheduler cms) {
         }
       });
   }
+
+  private class ScaledExecutor implements Executor {

Review Comment:
   nit: should it be called `CachedExecutor` now?

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org