vigyasharma commented on code in PR #14335:
URL: https://github.com/apache/lucene/pull/14335#discussion_r1998041333


##########
lucene/core/src/java/org/apache/lucene/index/MultiTenantMergeScheduler.java:
##########
@@ -0,0 +1,44 @@
+package org.apache.lucene.index;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.io.IOException;
+
+/**
+ * A multi-tenant merge scheduler that shares a global thread pool across multiple IndexWriters.
+ */
+public class MultiTenantMergeScheduler extends MergeScheduler {
+
+    // Shared global thread pool
+    private static final ExecutorService MERGE_THREAD_POOL = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() / 2);
+
+    @Override
+    public void merge(MergeScheduler.MergeSource mergeSource, MergeTrigger trigger) throws IOException {
+        while (true) {
+            MergePolicy.OneMerge merge = mergeSource.getNextMerge();
+            if (merge == null) {

Review Comment:
   Can we use `hasPendingMerges()` here instead of checking for `null`? I know existing schedulers treat a `null` return from `getNextMerge()` as "no more merges", but that isn't documented API behavior.
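A rough sketch of the loop shape I have in mind. `MergeSourceLike` and `PendingMergeLoop` are hypothetical stand-ins, not Lucene types. They only mirror the `hasPendingMerges()`/`getNextMerge()` pair so the snippet compiles without Lucene. I kept the `null` check as a guard, because the sketch doesn't assume that `hasPendingMerges()` returning `true` guarantees a non-null next merge when other threads may pull from the same source.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in mirroring the two merge-source methods discussed here;
// this is not Lucene's MergeSource interface.
interface MergeSourceLike<M> {
    boolean hasPendingMerges();

    M getNextMerge();
}

final class PendingMergeLoop {
    private PendingMergeLoop() {}

    // Claim merges while the source reports pending work. The null check is kept
    // as a guard in case another thread took the last merge between the two calls.
    static <M> List<M> drain(MergeSourceLike<M> source) {
        List<M> claimed = new ArrayList<>();
        while (source.hasPendingMerges()) {
            M merge = source.getNextMerge();
            if (merge == null) {
                break;
            }
            claimed.add(merge); // a real scheduler would hand this to a merge thread
        }
        return claimed;
    }

    // Queue-backed source used only for demonstration.
    static <M> MergeSourceLike<M> fromQueue(Deque<M> queue) {
        return new MergeSourceLike<M>() {
            @Override
            public boolean hasPendingMerges() {
                return !queue.isEmpty();
            }

            @Override
            public M getNextMerge() {
                return queue.poll();
            }
        };
    }
}
```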



##########
lucene/core/src/java/org/apache/lucene/index/MultiTenantMergeScheduler.java:
##########
@@ -0,0 +1,44 @@
+package org.apache.lucene.index;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.io.IOException;
+
+/**
+ * A multi-tenant merge scheduler that shares a global thread pool across multiple IndexWriters.
+ */
+public class MultiTenantMergeScheduler extends MergeScheduler {
+
+    // Shared global thread pool
+    private static final ExecutorService MERGE_THREAD_POOL = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() / 2);

Review Comment:
   This will create a thread pool even when this scheduler is not used. Let's use lazy static initialization.
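One option is the initialization-on-demand holder idiom. The sketch below uses hypothetical names: `LazySharedMergePool`, `Holder`, and the `CREATIONS` counter exist only to show when the pool gets built. The JVM initializes `Holder` only on the first call to `pool()`, and class initialization is thread-safe, so no explicit locking is needed. The sketch also clamps the size to at least 1, because `availableProcessors() / 2` is 0 on a single-core machine and `Executors.newFixedThreadPool(0)` throws `IllegalArgumentException`.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

final class LazySharedMergePool {
    // Hypothetical counter, only here to show when the pool actually gets built.
    static final AtomicInteger CREATIONS = new AtomicInteger();

    private LazySharedMergePool() {}

    // Holder's static initializer runs on first access to Holder.POOL, not when
    // LazySharedMergePool itself is loaded; JVM class initialization is thread-safe.
    private static final class Holder {
        static final ExecutorService POOL = create();
    }

    private static ExecutorService create() {
        CREATIONS.incrementAndGet();
        // availableProcessors() / 2 is 0 on a single-core machine, and
        // newFixedThreadPool(0) throws IllegalArgumentException, so clamp to 1.
        int threads = Math.max(1, Runtime.getRuntime().availableProcessors() / 2);
        return Executors.newFixedThreadPool(threads);
    }

    static ExecutorService pool() {
        return Holder.POOL;
    }
}
```

On its own, this still gives a pool that can never be shut down and re-created.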



##########
lucene/core/src/test/org/apache/lucene/index/TestMultiTenantMergeScheduler.java:
##########
@@ -0,0 +1,30 @@
+package org.apache.lucene.index;
+
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.RAMDirectory;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.TextField;
+import org.apache.lucene.util.LuceneTestCase;
+
+public class TestMultiTenantMergeScheduler extends LuceneTestCase {
+
+    public void testMultiTenantMergeScheduler() throws Exception {

Review Comment:
   This test is not sufficient. We need to test (and also implement) a lot more 
scenarios, like multiple writers consuming this scheduler, scheduling different 
merges and starting/closing at different times. You can refer to the tests in 
`TestConcurrentMergeScheduler` as a starting point.



##########
lucene/core/src/java/org/apache/lucene/index/MultiTenantMergeScheduler.java:
##########
@@ -0,0 +1,44 @@
+package org.apache.lucene.index;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.io.IOException;
+
+/**
+ * A multi-tenant merge scheduler that shares a global thread pool across multiple IndexWriters.
+ */
+public class MultiTenantMergeScheduler extends MergeScheduler {
+
+    // Shared global thread pool
+    private static final ExecutorService MERGE_THREAD_POOL = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() / 2);
+
+    @Override
+    public void merge(MergeScheduler.MergeSource mergeSource, MergeTrigger trigger) throws IOException {
+        while (true) {
+            MergePolicy.OneMerge merge = mergeSource.getNextMerge();
+            if (merge == null) {
+                break;
+            }
+            // Submit merge task to the shared thread pool
+            Future<?> future = MERGE_THREAD_POOL.submit(() -> {
+                try {
+                    mergeSource.merge(merge);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            });
+
+            try {
+                future.get(); // Ensure the task completes
+            } catch (Exception e) {
+                throw new IOException("Merge operation failed", e);
+            }

Review Comment:
   I don't think this achieves the concurrent background merging we want. This code submits a merge and then blocks on its future until the merge completes, so merges effectively run one at a time, and the thread that called `merge()` stays blocked for the whole duration.
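Roughly what I'd expect instead is to submit each merge, keep its `Future`, and return without calling `get()`, leaving the wait to a `sync()`/`close()`-style path. This is a sketch under assumptions. `PendingMerges` is a hypothetical stand-in for the merge source, and merges are plain `Runnable`s. The `demoOverlappingMerges()` helper runs three merges that each wait at a `CyclicBarrier` for the other two, so they can only all finish if they really run concurrently.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a merge source: returns the next pending merge, or null.
interface PendingMerges {
    Runnable next();
}

final class NonBlockingSubmitter {
    private final ExecutorService pool;
    private final List<Future<?>> running = new ArrayList<>();

    NonBlockingSubmitter(ExecutorService pool) {
        this.pool = pool;
    }

    // Hand every pending merge to the pool and return immediately. Calling
    // future.get() in this loop would make each merge wait for the previous one.
    synchronized int submitAll(PendingMerges source) {
        running.removeIf(Future::isDone); // forget merges that already finished
        int submitted = 0;
        for (Runnable merge = source.next(); merge != null; merge = source.next()) {
            running.add(pool.submit(merge));
            submitted++;
        }
        return submitted;
    }

    // sync()-style wait for every merge submitted so far.
    void awaitAll() throws InterruptedException, ExecutionException {
        List<Future<?>> snapshot;
        synchronized (this) {
            snapshot = new ArrayList<>(running);
        }
        for (Future<?> f : snapshot) {
            f.get();
        }
    }

    // Three merges that each wait at a barrier for the other two: they can only all
    // finish if they run concurrently. Returns the number finished, or -1 on failure.
    static int demoOverlappingMerges() {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        CyclicBarrier barrier = new CyclicBarrier(3);
        AtomicInteger finished = new AtomicInteger();
        Deque<Runnable> pending = new ArrayDeque<>();
        for (int i = 0; i < 3; i++) {
            pending.add(() -> {
                try {
                    barrier.await(5, TimeUnit.SECONDS);
                    finished.incrementAndGet();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
        }
        try {
            NonBlockingSubmitter submitter = new NonBlockingSubmitter(pool);
            submitter.submitAll(pending::poll);
            submitter.awaitAll();
            return finished.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } catch (ExecutionException e) {
            return -1; // a merge failed, e.g. the barrier timed out
        } finally {
            pool.shutdown();
        }
    }
}
```

With the blocking `future.get()` from the diff, the first barrier wait would time out, because the other two merges would never have been submitted.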



##########
lucene/core/src/java/org/apache/lucene/index/MultiTenantMergeScheduler.java:
##########
@@ -0,0 +1,44 @@
+package org.apache.lucene.index;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.io.IOException;
+
+/**
+ * A multi-tenant merge scheduler that shares a global thread pool across multiple IndexWriters.
+ */
+public class MultiTenantMergeScheduler extends MergeScheduler {
+
+    // Shared global thread pool
+    private static final ExecutorService MERGE_THREAD_POOL = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() / 2);
+
+    @Override
+    public void merge(MergeScheduler.MergeSource mergeSource, MergeTrigger trigger) throws IOException {
+        while (true) {
+            MergePolicy.OneMerge merge = mergeSource.getNextMerge();
+            if (merge == null) {
+                break;
+            }
+            // Submit merge task to the shared thread pool
+            Future<?> future = MERGE_THREAD_POOL.submit(() -> {
+                try {
+                    mergeSource.merge(merge);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            });
+
+            try {
+                future.get(); // Ensure the task completes
+            } catch (Exception e) {
+                throw new IOException("Merge operation failed", e);
+            }
+        }
+    }
+
+    @Override
+    public void close() {

Review Comment:
   This shouldn't be a no-op. We should wait for all running merges of the calling IndexWriter to complete, similar to `sync()` in ConcurrentMergeScheduler. This could be done by maintaining a per-writer list of futures for running merges, with a cleanup loop in `merge()` that removes completed futures.

   Also, I get that this thread pool is global and shared, but I don't like the idea of a thread pool that can never be closed. How about we expose some hooks to close this scheduler once all consuming writers have closed?
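Here is a rough sketch of both ideas. The names `SharedMergePool`, `register`, `submit`, `close`, and `isPoolActive` are hypothetical and are not Lucene APIs. Writers are opaque keys here, and a real scheduler would key on the `IndexWriter` or its merge source. `close(writer)` waits only for that writer's merges. The shared pool is shut down once the last registered writer has closed, then created again lazily if a new writer registers.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical helper, not a Lucene API: a shared merge pool that tracks running
// merges per writer and shuts down when the last registered writer closes.
// Failures of merges that finished before a cleanup pass are dropped; reporting
// them to the owning writer is left out of this sketch.
final class SharedMergePool {
    private final int threads;
    private final Map<Object, List<Future<?>>> runningByWriter = new HashMap<>();
    private ExecutorService pool; // created lazily; null while no writer is registered

    SharedMergePool(int threads) {
        this.threads = Math.max(1, threads);
    }

    synchronized void register(Object writer) {
        if (pool == null) {
            pool = Executors.newFixedThreadPool(threads);
        }
        runningByWriter.putIfAbsent(writer, new ArrayList<>());
    }

    synchronized void submit(Object writer, Runnable merge) {
        List<Future<?>> futures = runningByWriter.get(writer);
        if (futures == null) {
            throw new IllegalStateException("writer is not registered");
        }
        futures.removeIf(Future::isDone); // cleanup pass: drop finished merges
        futures.add(pool.submit(merge));
    }

    // Waits only for this writer's merges (like CMS sync()), then unregisters it.
    // Re-checks after each wait in case the writer submitted more merges meanwhile.
    void close(Object writer) {
        while (true) {
            List<Future<?>> snapshot;
            synchronized (this) {
                List<Future<?>> futures = runningByWriter.get(writer);
                if (futures == null) {
                    return; // unknown or already closed
                }
                futures.removeIf(Future::isDone);
                if (futures.isEmpty()) {
                    runningByWriter.remove(writer);
                    if (runningByWriter.isEmpty()) {
                        pool.shutdown(); // last writer gone: release the shared threads
                        pool = null;
                    }
                    return;
                }
                snapshot = new ArrayList<>(futures);
            }
            for (Future<?> f : snapshot) {
                await(f); // wait outside the lock so other writers are not blocked
            }
        }
    }

    synchronized boolean isPoolActive() {
        return pool != null;
    }

    private static void await(Future<?> f) {
        try {
            f.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a merge", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("merge failed", e.getCause());
        }
    }
}
```

Keeping the per-writer map and the pool under one lock makes the "last writer closes, pool shuts down" check atomic with respect to `register()`. Blocking on futures happens outside the lock, so one writer's `close()` does not stall other writers' submissions.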



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

