vigyasharma commented on code in PR #14900: URL: https://github.com/apache/lucene/pull/14900#discussion_r2227686675
##########
lucene/core/src/java/org/apache/lucene/index/SharedMergeScheduler.java:
##########
@@ -0,0 +1,84 @@
+package org.apache.lucene.index;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+/**
+ * SharedMergeScheduler is an experimental MergeScheduler that submits merge tasks to a shared
+ * thread pool across IndexWriters.
+ */
+public class SharedMergeScheduler extends MergeScheduler {
+
+  // Tracks submitted merges per writer
+  private final ConcurrentHashMap<IndexWriter, Set<MergeTaskWrapper>> writerToMerges = new ConcurrentHashMap<>();
+
+  /**
+   * Executor service provided externally to handle merge tasks.
+   * Allows sharing a thread pool across IndexWriters if configured that way.
+   */
+  private final ExecutorService executor;
+
+  public SharedMergeScheduler(ExecutorService executor) {
+    this.executor = executor;
+  }
+
+  /**
+   * Retrieves pending merge tasks from the given {@link MergeSource} and submits them to the shared
+   * thread pool for execution.
+   *
+   * @param mergeSource the source of merge tasks (typically an IndexWriter)
+   * @param trigger the event that triggered the merge (e.g., SEGMENT_FLUSH, EXPLICIT)
+   * @throws IOException if merging fails
+   */
+  @Override
+  public void merge(MergeSource mergeSource, MergeTrigger trigger) throws IOException {
+    while (true) {
+      final MergePolicy.OneMerge merge = mergeSource.getNextMerge();
+      if (merge == null) break;
+
+      Runnable mergeRunnable = () -> {
+        try {
+          mergeSource.merge(merge);
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      };
+
+
+      MergeTaskWrapper wrappedTask = new MergeTaskWrapper(mergeRunnable, (IndexWriter) mergeSource, merge.totalBytesSize());
+
+      // Registering this task under the writer
+      writerToMerges.computeIfAbsent((IndexWriter) mergeSource, k -> new CopyOnWriteArraySet<>()).add(wrappedTask);

Review Comment:
   I don't think you can cast `mergeSource` to an `IndexWriter`. However, you might not need to. This CMS is no longer a singleton; only the executor is shared. The CMS is actually owned by an IndexWriter, which means all calls to `merge()` come from the same IW.

   So instead of the `writer -> merges` mapping that you'd need in a singleton, all you need here is the set of pending and running merges submitted to this CMS. They are all from the same writer!

   When the writer closes, instead of shutting down the executor, you could update the merge objects in the set (the `MergeTaskWrapper`s) and set an "aborted" flag on each. Then update your runnable to skip the merge if the "aborted" flag has been set.
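
   A minimal sketch of the per-scheduler tracking described in this comment, written against plain `java.util.concurrent` types rather than Lucene's so that it stands alone. `AbortableMergeTask` and `PerWriterMergeTracker` are hypothetical names used only for illustration; they are not classes from this PR or from Lucene. The assumption is that one tracker belongs to exactly one writer, while the executor is shared.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for MergeTaskWrapper: wraps one merge and carries an abort flag.
final class AbortableMergeTask implements Runnable {
  private final Runnable merge;
  private final Set<AbortableMergeTask> owner;
  private final AtomicBoolean aborted = new AtomicBoolean(false);
  private final AtomicBoolean ran = new AtomicBoolean(false);

  AbortableMergeTask(Runnable merge, Set<AbortableMergeTask> owner) {
    this.merge = merge;
    this.owner = owner;
  }

  void abort() {
    aborted.set(true);
  }

  boolean didRun() {
    return ran.get();
  }

  @Override
  public void run() {
    try {
      if (aborted.get()) {
        return; // the owning writer closed before this merge started: skip it
      }
      ran.set(true);
      merge.run();
    } finally {
      owner.remove(this); // stop tracking once finished or skipped
    }
  }
}

// Hypothetical per-writer scheduler state: the executor is shared, the task set is not.
final class PerWriterMergeTracker {
  private final ExecutorService sharedExecutor;
  private final Set<AbortableMergeTask> tasks = ConcurrentHashMap.newKeySet();

  PerWriterMergeTracker(ExecutorService sharedExecutor) {
    this.sharedExecutor = sharedExecutor;
  }

  AbortableMergeTask submit(Runnable merge) {
    AbortableMergeTask task = new AbortableMergeTask(merge, tasks);
    tasks.add(task);
    try {
      sharedExecutor.execute(task);
    } catch (RejectedExecutionException e) {
      tasks.remove(task); // never queued, so do not keep tracking it
      throw e;
    }
    return task;
  }

  // Called when the owning writer closes. The shared executor is left running.
  void close() {
    for (AbortableMergeTask task : tasks) {
      task.abort();
    }
  }

  int trackedCount() {
    return tasks.size();
  }
}
```

   With this shape, closing one writer only flags that writer's queued tasks, and the shared executor keeps serving other writers. A merge that has already started is not interrupted in this sketch; that case would need separate handling.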
##########
lucene/core/src/java/org/apache/lucene/index/SharedMergeScheduler.java:
##########
@@ -0,0 +1,84 @@
+package org.apache.lucene.index;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+/**
+ * SharedMergeScheduler is an experimental MergeScheduler that submits merge tasks to a shared
+ * thread pool across IndexWriters.
+ */
+public class SharedMergeScheduler extends MergeScheduler {
+
+  // Tracks submitted merges per writer
+  private final ConcurrentHashMap<IndexWriter, Set<MergeTaskWrapper>> writerToMerges = new ConcurrentHashMap<>();
+
+  /**
+   * Executor service provided externally to handle merge tasks.
+   * Allows sharing a thread pool across IndexWriters if configured that way.
+   */
+  private final ExecutorService executor;
+
+  public SharedMergeScheduler(ExecutorService executor) {
+    this.executor = executor;
+  }
+
+  /**
+   * Retrieves pending merge tasks from the given {@link MergeSource} and submits them to the shared
+   * thread pool for execution.
+   *
+   * @param mergeSource the source of merge tasks (typically an IndexWriter)
+   * @param trigger the event that triggered the merge (e.g., SEGMENT_FLUSH, EXPLICIT)
+   * @throws IOException if merging fails
+   */
+  @Override
+  public void merge(MergeSource mergeSource, MergeTrigger trigger) throws IOException {
+    while (true) {

Review Comment:
   Can we use `hasPendingMerges()` instead?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org
For additional commands, e-mail: issues-h...@lucene.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org