vigyasharma commented on code in PR #14335:
URL: https://github.com/apache/lucene/pull/14335#discussion_r2000345662


##########
lucene/core/src/test/org/apache/lucene/index/TestMultiTenantMergeScheduler.java:
##########
@@ -0,0 +1,73 @@
+package org.apache.lucene.index;
+
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.RAMDirectory;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.TextField;
+import org.apache.lucene.util.LuceneTestCase;
+
+public class TestMultiTenantMergeScheduler extends LuceneTestCase {
+
+    public void testMultiTenantMergeScheduler() throws Exception {
+        Directory dir = new RAMDirectory();
+        IndexWriterConfig config = new IndexWriterConfig(new 
MockAnalyzer(random()));
+        MultiTenantMergeScheduler scheduler = new MultiTenantMergeScheduler();
+        config.setMergeScheduler(scheduler);
+
+        IndexWriter writer1 = new IndexWriter(dir, config);
+        IndexWriter writer2 = new IndexWriter(dir, config);
+
+        // Add documents and trigger merges
+        for (int i = 0; i < 50; i++) {
+            writer1.addDocument(new Document());
+            writer2.addDocument(new Document());
+            if (i % 10 == 0) {
+                writer1.commit();
+                writer2.commit();
+            }
+        }
+
+        writer1.forceMerge(1);
+        writer2.forceMerge(1);
+
+        // Close writers at different times
+        writer1.close();
+        Thread.sleep(500);
+        writer2.close();
+
+        // Ensure scheduler is properly closed
+        scheduler.close();
+        MultiTenantMergeScheduler.shutdownThreadPool();
+
+        dir.close();
+    }
+
+    public void testConcurrentMerging() throws Exception {
+        Directory dir = new RAMDirectory();
+        IndexWriterConfig config = new IndexWriterConfig(new 
MockAnalyzer(random()));
+        MultiTenantMergeScheduler scheduler = new MultiTenantMergeScheduler();
+        config.setMergeScheduler(scheduler);
+
+        IndexWriter writer = new IndexWriter(dir, config);
+
+        // Add documents
+        for (int i = 0; i < 100; i++) {
+            writer.addDocument(new Document());
+        }
+        writer.commit();
+
+        long startTime = System.currentTimeMillis();
+        writer.forceMerge(1);
+        long endTime = System.currentTimeMillis();
+
+        writer.close();
+        scheduler.close();
+        MultiTenantMergeScheduler.shutdownThreadPool();
+
+        // Check if merging took less time than sequential execution would
+        assertTrue("Merges did not happen concurrently!", (endTime - 
startTime) < 5000);

Review Comment:
   It's not reliable to assert on, or depend on, elapsed time. Lucene is run on many
different machines, and this timing will differ between them.



##########
lucene/core/src/java/org/apache/lucene/index/MultiTenantMergeScheduler.java:
##########
@@ -0,0 +1,72 @@
+package org.apache.lucene.index;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.io.IOException;
+
+/**
+ * A multi-tenant merge scheduler that shares a global thread pool across 
multiple IndexWriters.
+ */
+public class MultiTenantMergeScheduler extends MergeScheduler {
+
+    // Shared global thread pool with lazy initialization
+    private static class LazyHolder {
+        static final ExecutorService MERGE_THREAD_POOL = 
+            
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() / 2);
+    }
+
+    private static ExecutorService getMergeThreadPool() {
+        return LazyHolder.MERGE_THREAD_POOL;
+    }
+
+    // Use getMergeThreadPool() instead of direct access

Review Comment:
   We can skip the `getMergeThreadPool()` function, since we only need to access
the thread pool internally. Something like:
   ```java
     private static class MergeThreadPool {
         private static final ExecutorService INSTANCE =
             Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() / 2);
     }
     // and then just use MergeThreadPool.INSTANCE inside merge() and other functions.
     // ...
   }
   ```



##########
lucene/core/src/java/org/apache/lucene/index/MultiTenantMergeScheduler.java:
##########
@@ -0,0 +1,72 @@
+package org.apache.lucene.index;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.io.IOException;
+
+/**
+ * A multi-tenant merge scheduler that shares a global thread pool across 
multiple IndexWriters.
+ */
+public class MultiTenantMergeScheduler extends MergeScheduler {
+
+    // Shared global thread pool with lazy initialization
+    private static class LazyHolder {
+        static final ExecutorService MERGE_THREAD_POOL = 
+            
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() / 2);
+    }
+
+    private static ExecutorService getMergeThreadPool() {
+        return LazyHolder.MERGE_THREAD_POOL;
+    }
+
+    // Use getMergeThreadPool() instead of direct access
+
+    @Override
+    public void merge(MergeScheduler.MergeSource mergeSource, MergeTrigger 
trigger) throws IOException {
+        while (mergeSource.hasPendingMerges()) { // Use hasPendingMerges() 
instead of relying on null check
+            MergePolicy.OneMerge merge = mergeSource.getNextMerge();
+            if (merge == null) {
+                break; // Explicitly exit if no merge is available
+            }
+            
+            // Submit merge task to the shared thread pool
+            MERGE_THREAD_POOL.submit(() -> {
+                try {
+                    mergeSource.merge(merge);
+                } catch (IOException e) {
+                    throw new RuntimeException("Merge operation failed", e);
+                }
+            });
+
+            // Cleanup completed merges
+            activeMerges.removeIf(Future::isDone);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {

Review Comment:
   +1; my suggestion in the previous review was similar. We need a mapping from each
writer to its merges, so that we only block on the merges belonging to the calling writer.



##########
lucene/core/src/java/org/apache/lucene/index/MultiTenantMergeScheduler.java:
##########
@@ -0,0 +1,72 @@
+package org.apache.lucene.index;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.io.IOException;
+
+/**
+ * A multi-tenant merge scheduler that shares a global thread pool across 
multiple IndexWriters.
+ */
+public class MultiTenantMergeScheduler extends MergeScheduler {
+
+    // Shared global thread pool with lazy initialization
+    private static class LazyHolder {
+        static final ExecutorService MERGE_THREAD_POOL = 
+            
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() / 2);
+    }
+
+    private static ExecutorService getMergeThreadPool() {
+        return LazyHolder.MERGE_THREAD_POOL;
+    }
+
+    // Use getMergeThreadPool() instead of direct access
+
+    @Override
+    public void merge(MergeScheduler.MergeSource mergeSource, MergeTrigger 
trigger) throws IOException {
+        while (mergeSource.hasPendingMerges()) { // Use hasPendingMerges() 
instead of relying on null check
+            MergePolicy.OneMerge merge = mergeSource.getNextMerge();
+            if (merge == null) {
+                break; // Explicitly exit if no merge is available
+            }
+            
+            // Submit merge task to the shared thread pool
+            MERGE_THREAD_POOL.submit(() -> {
+                try {
+                    mergeSource.merge(merge);
+                } catch (IOException e) {
+                    throw new RuntimeException("Merge operation failed", e);
+                }
+            });
+
+            // Cleanup completed merges
+            activeMerges.removeIf(Future::isDone);

Review Comment:
   Where is this declared and initialized? When are merges added to
`activeMerges`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org
For additional commands, e-mail: issues-h...@lucene.apache.org

Reply via email to