vigyasharma commented on code in PR #14335: URL: https://github.com/apache/lucene/pull/14335#discussion_r2000345662
########## lucene/core/src/test/org/apache/lucene/index/TestMultiTenantMergeScheduler.java: ########## @@ -0,0 +1,73 @@ +package org.apache.lucene.index; + +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.RAMDirectory; +import org.apache.lucene.analysis.standard.StandardAnalyzer; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.TextField; +import org.apache.lucene.util.LuceneTestCase; + +public class TestMultiTenantMergeScheduler extends LuceneTestCase { + + public void testMultiTenantMergeScheduler() throws Exception { + Directory dir = new RAMDirectory(); + IndexWriterConfig config = new IndexWriterConfig(new MockAnalyzer(random())); + MultiTenantMergeScheduler scheduler = new MultiTenantMergeScheduler(); + config.setMergeScheduler(scheduler); + + IndexWriter writer1 = new IndexWriter(dir, config); + IndexWriter writer2 = new IndexWriter(dir, config); + + // Add documents and trigger merges + for (int i = 0; i < 50; i++) { + writer1.addDocument(new Document()); + writer2.addDocument(new Document()); + if (i % 10 == 0) { + writer1.commit(); + writer2.commit(); + } + } + + writer1.forceMerge(1); + writer2.forceMerge(1); + + // Close writers at different times + writer1.close(); + Thread.sleep(500); + writer2.close(); + + // Ensure scheduler is properly closed + scheduler.close(); + MultiTenantMergeScheduler.shutdownThreadPool(); + + dir.close(); + } + + public void testConcurrentMerging() throws Exception { + Directory dir = new RAMDirectory(); + IndexWriterConfig config = new IndexWriterConfig(new MockAnalyzer(random())); + MultiTenantMergeScheduler scheduler = new MultiTenantMergeScheduler(); + config.setMergeScheduler(scheduler); + + IndexWriter writer = new IndexWriter(dir, config); + + // Add documents + for (int i = 0; i < 100; i++) { + writer.addDocument(new Document()); + } + writer.commit(); + + long startTime = System.currentTimeMillis(); + writer.forceMerge(1); + long endTime = 
System.currentTimeMillis(); + + writer.close(); + scheduler.close(); + MultiTenantMergeScheduler.shutdownThreadPool(); + + // Check if merging took less time than sequential execution would + assertTrue("Merges did not happen concurrently!", (endTime - startTime) < 5000); Review Comment: It's not reliable to assert or depend on elapsed time. Lucene is run on many different machines, so this timing will differ. ########## lucene/core/src/java/org/apache/lucene/index/MultiTenantMergeScheduler.java: ########## @@ -0,0 +1,72 @@ +package org.apache.lucene.index; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.io.IOException; + +/** + * A multi-tenant merge scheduler that shares a global thread pool across multiple IndexWriters. + */ +public class MultiTenantMergeScheduler extends MergeScheduler { + + // Shared global thread pool with lazy initialization + private static class LazyHolder { + static final ExecutorService MERGE_THREAD_POOL = + Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() / 2); + } + + private static ExecutorService getMergeThreadPool() { + return LazyHolder.MERGE_THREAD_POOL; + } + + // Use getMergeThreadPool() instead of direct access Review Comment: We can skip the `getMergeThreadPool()` function since we only need to access the thread pool internally. Something like: ```java private static class MergeThreadPool { private static final ExecutorService INSTANCE = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() / 2); } // and then just use MergeThreadPool.INSTANCE inside merge() and other functions. // ... 
} ``` ########## lucene/core/src/java/org/apache/lucene/index/MultiTenantMergeScheduler.java: ########## @@ -0,0 +1,72 @@ +package org.apache.lucene.index; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.io.IOException; + +/** + * A multi-tenant merge scheduler that shares a global thread pool across multiple IndexWriters. + */ +public class MultiTenantMergeScheduler extends MergeScheduler { + + // Shared global thread pool with lazy initialization + private static class LazyHolder { + static final ExecutorService MERGE_THREAD_POOL = + Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() / 2); + } + + private static ExecutorService getMergeThreadPool() { + return LazyHolder.MERGE_THREAD_POOL; + } + + // Use getMergeThreadPool() instead of direct access + + @Override + public void merge(MergeScheduler.MergeSource mergeSource, MergeTrigger trigger) throws IOException { + while (mergeSource.hasPendingMerges()) { // Use hasPendingMerges() instead of relying on null check + MergePolicy.OneMerge merge = mergeSource.getNextMerge(); + if (merge == null) { + break; // Explicitly exit if no merge is available + } + + // Submit merge task to the shared thread pool + MERGE_THREAD_POOL.submit(() -> { + try { + mergeSource.merge(merge); + } catch (IOException e) { + throw new RuntimeException("Merge operation failed", e); + } + }); + + // Cleanup completed merges + activeMerges.removeIf(Future::isDone); + } + } + + @Override + public void close() throws IOException { Review Comment: +1; my suggestion in the previous review was similar. We need a mapping of writer to merges and only block on merges specific to the calling writer. 
########## lucene/core/src/java/org/apache/lucene/index/MultiTenantMergeScheduler.java: ########## @@ -0,0 +1,72 @@ +package org.apache.lucene.index; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.io.IOException; + +/** + * A multi-tenant merge scheduler that shares a global thread pool across multiple IndexWriters. + */ +public class MultiTenantMergeScheduler extends MergeScheduler { + + // Shared global thread pool with lazy initialization + private static class LazyHolder { + static final ExecutorService MERGE_THREAD_POOL = + Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() / 2); + } + + private static ExecutorService getMergeThreadPool() { + return LazyHolder.MERGE_THREAD_POOL; + } + + // Use getMergeThreadPool() instead of direct access + + @Override + public void merge(MergeScheduler.MergeSource mergeSource, MergeTrigger trigger) throws IOException { + while (mergeSource.hasPendingMerges()) { // Use hasPendingMerges() instead of relying on null check + MergePolicy.OneMerge merge = mergeSource.getNextMerge(); + if (merge == null) { + break; // Explicitly exit if no merge is available + } + + // Submit merge task to the shared thread pool + MERGE_THREAD_POOL.submit(() -> { + try { + mergeSource.merge(merge); + } catch (IOException e) { + throw new RuntimeException("Merge operation failed", e); + } + }); + + // Cleanup completed merges + activeMerges.removeIf(Future::isDone); Review Comment: Where is this declared and initialized? When are merges added to `activeMerges`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For additional commands, e-mail: issues-h...@lucene.apache.org