vigyasharma commented on code in PR #14900:
URL: https://github.com/apache/lucene/pull/14900#discussion_r2227686675


##########
lucene/core/src/java/org/apache/lucene/index/SharedMergeScheduler.java:
##########
@@ -0,0 +1,84 @@
+package org.apache.lucene.index;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+/**
+ * SharedMergeScheduler is an experimental MergeScheduler that submits merge tasks to a shared
+ * thread pool across IndexWriters.
+ */
+public class SharedMergeScheduler extends MergeScheduler {
+ 
+  // Tracks submitted merges per writer
+  private final ConcurrentHashMap<IndexWriter, Set<MergeTaskWrapper>> writerToMerges = new ConcurrentHashMap<>();
+
+ /**
+ * Executor service provided externally to handle merge tasks.
+ * Allows sharing a thread pool across IndexWriters if configured that way.
+ */
+  private final ExecutorService executor;
+
+  public SharedMergeScheduler(ExecutorService executor) {
+      this.executor = executor;
+  }
+
+  /**
+   * Retrieves pending merge tasks from the given {@link MergeSource} and submits them to the shared
+   * thread pool for execution.
+   *
+   * @param mergeSource the source of merge tasks (typically an IndexWriter)
+   * @param trigger the event that triggered the merge (e.g., SEGMENT_FLUSH, EXPLICIT)
+   * @throws IOException if merging fails
+   */
+  @Override
+  public void merge(MergeSource mergeSource, MergeTrigger trigger) throws IOException {
+    while (true) {
+      final MergePolicy.OneMerge merge = mergeSource.getNextMerge();
+      if (merge == null) break;
+
+      Runnable mergeRunnable = () -> {
+            try {
+                mergeSource.merge(merge);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+      };
+
+
+      MergeTaskWrapper wrappedTask = new MergeTaskWrapper(mergeRunnable, (IndexWriter) mergeSource, merge.totalBytesSize());
+
+      // Registering this task under the writer 
+      writerToMerges.computeIfAbsent((IndexWriter) mergeSource, k -> new CopyOnWriteArraySet<>()).add(wrappedTask);

Review Comment:
   I don't think you can safely cast `mergeSource` to an `IndexWriter`. However, you
might not need to. This merge scheduler is no longer a singleton; only the executor is
shared. Each scheduler instance is owned by a single `IndexWriter`, which means all
calls to `merge()` come from the same writer.
   
   So instead of the `writer -> merges` mapping that a singleton would need,
all you need here is the set of pending and running merges submitted to this
scheduler. They are all from the same writer! When the writer closes, instead of
shutting down the executor, you could set an "aborted" flag on each merge object in
the set (the `MergeTaskWrapper`s), and update your runnable to skip the merge if
that flag has been set.
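
   A rough sketch of what I have in mind, using stand-in types rather than the real Lucene classes. `AbortableMergeTask` and `PerWriterMergeTracker` are hypothetical names (the PR's `MergeTaskWrapper` would play the first role), and the merge itself is modeled as a plain `Runnable`. It takes an `Executor` so that any shared `ExecutorService` can be passed in. The flag only skips merges that have not started yet; a merge that is already running would still need to be aborted some other way.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for the PR's MergeTaskWrapper; not a Lucene class. */
final class AbortableMergeTask implements Runnable {
  private final Runnable mergeWork;
  private final Set<AbortableMergeTask> owner;
  private final AtomicBoolean aborted = new AtomicBoolean(false);

  AbortableMergeTask(Runnable mergeWork, Set<AbortableMergeTask> owner) {
    this.mergeWork = mergeWork;
    this.owner = owner;
  }

  void abort() {
    aborted.set(true);
  }

  @Override
  public void run() {
    try {
      // Skip the merge entirely if the owning writer was closed before we started.
      if (!aborted.get()) {
        mergeWork.run();
      }
    } finally {
      // Always deregister, whether the merge ran, was skipped, or threw.
      owner.remove(this);
    }
  }
}

/** Hypothetical per-writer tracker: one instance per scheduler, executor shared. */
final class PerWriterMergeTracker {
  private final Executor sharedExecutor;
  // All tasks come from the same writer, so a flat concurrent set is enough.
  private final Set<AbortableMergeTask> tasks = ConcurrentHashMap.newKeySet();

  PerWriterMergeTracker(Executor sharedExecutor) {
    this.sharedExecutor = sharedExecutor;
  }

  void submit(Runnable mergeWork) {
    AbortableMergeTask task = new AbortableMergeTask(mergeWork, tasks);
    tasks.add(task);
    sharedExecutor.execute(task);
  }

  /** On writer close: flag this writer's tasks, leave the shared executor running. */
  void close() {
    for (AbortableMergeTask task : tasks) {
      task.abort();
    }
  }

  int trackedCount() {
    return tasks.size();
  }
}
```

   With this shape no cast is needed at all, because the scheduler never has to ask which writer a task belongs to.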



##########
lucene/core/src/java/org/apache/lucene/index/SharedMergeScheduler.java:
##########
@@ -0,0 +1,84 @@
+package org.apache.lucene.index;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+/**
+ * SharedMergeScheduler is an experimental MergeScheduler that submits merge tasks to a shared
+ * thread pool across IndexWriters.
+ */
+public class SharedMergeScheduler extends MergeScheduler {
+ 
+  // Tracks submitted merges per writer
+  private final ConcurrentHashMap<IndexWriter, Set<MergeTaskWrapper>> writerToMerges = new ConcurrentHashMap<>();
+
+ /**
+ * Executor service provided externally to handle merge tasks.
+ * Allows sharing a thread pool across IndexWriters if configured that way.
+ */
+  private final ExecutorService executor;
+
+  public SharedMergeScheduler(ExecutorService executor) {
+      this.executor = executor;
+  }
+
+  /**
+   * Retrieves pending merge tasks from the given {@link MergeSource} and submits them to the shared
+   * thread pool for execution.
+   *
+   * @param mergeSource the source of merge tasks (typically an IndexWriter)
+   * @param trigger the event that triggered the merge (e.g., SEGMENT_FLUSH, EXPLICIT)
+   * @throws IOException if merging fails
+   */
+  @Override
+  public void merge(MergeSource mergeSource, MergeTrigger trigger) throws IOException {
+    while (true) {

Review Comment:
   Can we loop on `mergeSource.hasPendingMerges()` instead of `while (true)`?
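
   Roughly the loop shape I mean, sketched against a hypothetical `PendingSource` interface that mirrors only the two `MergeSource` methods involved (it is not the Lucene type). Keeping the null check is my assumption: it guards against the pending merge disappearing between the two calls.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in mirroring two MergeSource methods; not the Lucene interface. */
interface PendingSource<T> {
  boolean hasPendingMerges();

  T getNextMerge();
}

final class DrainLoop {
  private DrainLoop() {}

  /** Collects merges while the source reports pending work. */
  static <T> List<T> drain(PendingSource<T> source) {
    List<T> taken = new ArrayList<>();
    while (source.hasPendingMerges()) {
      T merge = source.getNextMerge();
      if (merge == null) {
        break; // defensive: nothing was handed out despite the pending check
      }
      taken.add(merge);
    }
    return taken;
  }
}
```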



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

