vigyasharma commented on code in PR #14953: URL: https://github.com/apache/lucene/pull/14953#discussion_r2218224417
########## lucene/core/src/java/org/apache/lucene/index/MultiTenantCMSManager.java: ########## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.lucene.index; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** + * Simple demonstration of a MultiTenantCMSManager that divides a fixed thread budget equally among + * all registered ConcurrentMergeSchedulers. + */ +public class MultiTenantCMSManager { + + private static final MultiTenantCMSManager INSTANCE = new MultiTenantCMSManager(); + private static final int coreCount = Runtime.getRuntime().availableProcessors(); + private static final int maxThreadCount = Math.max(1, coreCount / 2); + + private final Set<ConcurrentMergeScheduler> schedulers = + Collections.synchronizedSet(new HashSet<>()); + + private MultiTenantCMSManager() {} + + /** Returns the singleton instance of the MultiTenantCMSManager. */ + public static MultiTenantCMSManager getInstance() { + return INSTANCE; + } + + /** + * Registers a ConcurrentMergeScheduler with the global manager. 
+ * + * @param cms the merge scheduler to register + */ + public void register(ConcurrentMergeScheduler cms) { + schedulers.add(cms); + updateBudgets(); + } + + /** + * Unregisters a ConcurrentMergeScheduler from the global manager. + * + * @param cms the merge scheduler to unregister + */ + public void unregister(ConcurrentMergeScheduler cms) { Review Comment: nit: should we call it "deregister"? ########## lucene/core/src/java/org/apache/lucene/index/MultiTenantCMSManager.java: ########## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.lucene.index; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** + * Simple demonstration of a MultiTenantCMSManager that divides a fixed thread budget equally among + * all registered ConcurrentMergeSchedulers. 
+ */ +public class MultiTenantCMSManager { + + private static final MultiTenantCMSManager INSTANCE = new MultiTenantCMSManager(); + private static final int coreCount = Runtime.getRuntime().availableProcessors(); + private static final int maxThreadCount = Math.max(1, coreCount / 2); + + private final Set<ConcurrentMergeScheduler> schedulers = + Collections.synchronizedSet(new HashSet<>()); + + private MultiTenantCMSManager() {} + + /** Returns the singleton instance of the MultiTenantCMSManager. */ + public static MultiTenantCMSManager getInstance() { + return INSTANCE; + } + + /** + * Registers a ConcurrentMergeScheduler with the global manager. + * + * @param cms the merge scheduler to register + */ + public void register(ConcurrentMergeScheduler cms) { + schedulers.add(cms); + updateBudgets(); + } + + /** + * Unregisters a ConcurrentMergeScheduler from the global manager. + * + * @param cms the merge scheduler to unregister + */ + public void unregister(ConcurrentMergeScheduler cms) { + schedulers.remove(cms); + updateBudgets(); + } + + private void updateBudgets() { + int count = schedulers.size(); + if (count == 0) return; + + int share = Math.max(1, maxThreadCount / count); Review Comment: This should be `Math.min()`, otherwise, we'll set some schedulers to 0 maxMergeThreads. ########## lucene/core/src/test/org/apache/lucene/index/TestMultiTenantCMSManager.java: ########## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.lucene.index; + +import java.io.IOException; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.TextField; +import org.apache.lucene.store.Directory; +import org.apache.lucene.tests.analysis.MockAnalyzer; +import org.apache.lucene.tests.util.LuceneTestCase; + +/** + * Unit tests for MultiTenantCMSManager: budgeting, dynamic rebalancing, live IndexWriter + * integration, and auto-unregister behavior. + */ +public class TestMultiTenantCMSManager extends LuceneTestCase { + + /** Subclass to count actual merges. */ + static class TrackingCMS extends ConcurrentMergeScheduler { + private final AtomicInteger mergeCount = new AtomicInteger(); + + @Override + protected void doMerge(MergeScheduler.MergeSource mergeSource, MergePolicy.OneMerge merge) + throws IOException { + mergeCount.incrementAndGet(); + super.doMerge(mergeSource, merge); + } + + int getMergeCount() { + return mergeCount.get(); + } + } + + /** Test basic registration and budgeting across multiple schedulers. 
*/ + public void testRegistrationAndThreadBudgeting() { + MultiTenantCMSManager manager = MultiTenantCMSManager.getInstance(); + manager.unregisterAllForTest(); + + final int numSchedulers = 4; + ConcurrentMergeScheduler[] cmsArr = new ConcurrentMergeScheduler[numSchedulers]; + for (int i = 0; i < numSchedulers; i++) { + cmsArr[i] = new ConcurrentMergeScheduler(); + manager.register(cmsArr[i]); Review Comment: We already register in the CMS ctor. Do we need to register again? ########## lucene/core/src/java/org/apache/lucene/index/MultiTenantCMSManager.java: ########## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.lucene.index; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** + * Simple demonstration of a MultiTenantCMSManager that divides a fixed thread budget equally among + * all registered ConcurrentMergeSchedulers. 
+ */ +public class MultiTenantCMSManager { + + private static final MultiTenantCMSManager INSTANCE = new MultiTenantCMSManager(); + private static final int coreCount = Runtime.getRuntime().availableProcessors(); + private static final int maxThreadCount = Math.max(1, coreCount / 2); + + private final Set<ConcurrentMergeScheduler> schedulers = + Collections.synchronizedSet(new HashSet<>()); + + private MultiTenantCMSManager() {} + + /** Returns the singleton instance of the MultiTenantCMSManager. */ + public static MultiTenantCMSManager getInstance() { + return INSTANCE; + } + + /** + * Registers a ConcurrentMergeScheduler with the global manager. + * + * @param cms the merge scheduler to register + */ + public void register(ConcurrentMergeScheduler cms) { + schedulers.add(cms); + updateBudgets(); + } + + /** + * Unregisters a ConcurrentMergeScheduler from the global manager. + * + * @param cms the merge scheduler to unregister + */ + public void unregister(ConcurrentMergeScheduler cms) { + schedulers.remove(cms); + updateBudgets(); + } + + private void updateBudgets() { + int count = schedulers.size(); + if (count == 0) return; + + int share = Math.max(1, maxThreadCount / count); + for (ConcurrentMergeScheduler cms : schedulers) { + cms.setMaxMergesAndThreads(share + 5, share); // +5 to allow merge queuing + } + } + + // ----------------------------------------- + // 🧪 TESTING HOOKS + // ----------------------------------------- + + /** Used in tests to read the current registered CMS set */ + synchronized Set<ConcurrentMergeScheduler> getRegisteredSchedulersForTest() { + return new HashSet<>(schedulers); Review Comment: Maybe just return a `Collections.unmodifiableSet()`? That would avoid allocating a new set and copying all the entries into it. ########## lucene/core/src/java/org/apache/lucene/index/MultiTenantCMSManager.java: ########## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.lucene.index; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** + * Simple demonstration of a MultiTenantCMSManager that divides a fixed thread budget equally among + * all registered ConcurrentMergeSchedulers. + */ +public class MultiTenantCMSManager { + + private static final MultiTenantCMSManager INSTANCE = new MultiTenantCMSManager(); + private static final int coreCount = Runtime.getRuntime().availableProcessors(); + private static final int maxThreadCount = Math.max(1, coreCount / 2); + + private final Set<ConcurrentMergeScheduler> schedulers = + Collections.synchronizedSet(new HashSet<>()); + + private MultiTenantCMSManager() {} + + /** Returns the singleton instance of the MultiTenantCMSManager. */ + public static MultiTenantCMSManager getInstance() { + return INSTANCE; + } + + /** + * Registers a ConcurrentMergeScheduler with the global manager. + * + * @param cms the merge scheduler to register + */ + public void register(ConcurrentMergeScheduler cms) { + schedulers.add(cms); + updateBudgets(); + } + + /** + * Unregisters a ConcurrentMergeScheduler from the global manager. 
+ * + * @param cms the merge scheduler to unregister + */ + public void unregister(ConcurrentMergeScheduler cms) { + schedulers.remove(cms); + updateBudgets(); + } + + private void updateBudgets() { + int count = schedulers.size(); + if (count == 0) return; + + int share = Math.max(1, maxThreadCount / count); + for (ConcurrentMergeScheduler cms : schedulers) { + cms.setMaxMergesAndThreads(share + 5, share); // +5 to allow merge queuing + } + } + + // ----------------------------------------- + // 🧪 TESTING HOOKS + // ----------------------------------------- + + /** Used in tests to read the current registered CMS set */ + synchronized Set<ConcurrentMergeScheduler> getRegisteredSchedulersForTest() { + return new HashSet<>(schedulers); + } + + /** Used in tests to clear all registered CMS instances */ + synchronized void unregisterAllForTest() { Review Comment: Why does this method need to be `synchronized` ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org For additional commands, e-mail: issues-h...@lucene.apache.org