This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 44eb01feae Use fixed thread pool in Pinot Controller (#14159) 44eb01feae is described below commit 44eb01feae097d2f017b9c7a4609449d8abfd2d9 Author: Shreyaa Sharma <66686803+cypher...@users.noreply.github.com> AuthorDate: Fri Oct 25 01:59:58 2024 +0530 Use fixed thread pool in Pinot Controller (#14159) --- .../pinot/controller/BaseControllerStarter.java | 15 +++++++++++---- .../org/apache/pinot/controller/ControllerConf.java | 20 ++++++++++++++++++++ 2 files changed, 31 insertions(+), 4 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java index 44c8e96f36..1f7ccc0c60 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java @@ -35,6 +35,7 @@ import java.util.Map; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.io.FileUtils; @@ -252,10 +253,9 @@ public abstract class BaseControllerStarter implements ServiceStartable { // ControllerStarter::start()} _helixResourceManager = createHelixResourceManager(); // This executor service is used to do async tasks from multiget util or table rebalancing. - _executorService = - Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("async-task-thread-%d").build()); - _tenantRebalanceExecutorService = - Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("tenant-rebalance-thread-%d").build()); + _executorService = createExecutorService(_config.getControllerExecutorNumThreads(), "async-task-thread-%d"); + _tenantRebalanceExecutorService = createExecutorService(_config.getControllerExecutorRebalanceNumThreads(), + "tenant-rebalance-thread-%d"); _tenantRebalancer = new DefaultTenantRebalancer(_helixResourceManager, _tenantRebalanceExecutorService); } @@ -266,6 +266,13 @@ public abstract class BaseControllerStarter implements ServiceStartable { TableConfigUtils.setEnforcePoolBasedAssignment(_config.isEnforcePoolBasedAssignmentEnabled()); } + // If thread pool size is not configured executor will use cached thread pool + private ExecutorService createExecutorService(int numThreadPool, String threadNameFormat) { + ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(threadNameFormat).build(); + return (numThreadPool <= 0) ? Executors.newCachedThreadPool(threadFactory) + : Executors.newFixedThreadPool(numThreadPool, threadFactory); + } + private void inferHostnameIfNeeded(ControllerConf config) { if (config.getControllerHost() == null) { if (config.getProperty(CommonConstants.Helix.SET_INSTANCE_ID_TO_HOSTNAME_KEY, false)) { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java index e8c6dc8c48..612aa9bafe 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java @@ -278,6 +278,9 @@ public class ControllerConf extends PinotConfiguration { private static final String SERVER_ADMIN_REQUEST_TIMEOUT_SECONDS = "server.request.timeoutSeconds"; private static final String MINION_ADMIN_REQUEST_TIMEOUT_SECONDS = "minion.request.timeoutSeconds"; private static final String SEGMENT_COMMIT_TIMEOUT_SECONDS = "controller.realtime.segment.commit.timeoutSeconds"; + private static final String CONTROLLER_EXECUTOR_NUM_THREADS = "controller.executor.numThreads"; + public static final String CONTROLLER_EXECUTOR_REBALANCE_NUM_THREADS = "controller.executor.rebalance.numThreads"; + private static final String DELETED_SEGMENTS_RETENTION_IN_DAYS = "controller.deleted.segments.retentionInDays"; public static final String TABLE_MIN_REPLICAS = "table.minReplicas"; private static final String JERSEY_ADMIN_API_PORT = "jersey.admin.api.port"; @@ -321,6 +324,7 @@ public class ControllerConf extends PinotConfiguration { AutoRebalanceStrategy.class.getName(); private static final int DEFAULT_LEAD_CONTROLLER_RESOURCE_REBALANCE_DELAY_MS = 300_000; // 5 minutes private static final String DEFAULT_DIM_TABLE_MAX_SIZE = "200M"; + private static final int UNSPECIFIED_THREAD_POOL = -1; private static final String DEFAULT_PINOT_FS_FACTORY_CLASS_LOCAL = LocalPinotFS.class.getName(); @@ -408,6 +412,14 @@ public class ControllerConf extends PinotConfiguration { setProperty(SEGMENT_COMMIT_TIMEOUT_SECONDS, Integer.toString(timeoutSec)); } + public void setControllerExecutorNumThreads(int numThreads) { + setProperty(CONTROLLER_EXECUTOR_NUM_THREADS, Integer.toString(numThreads)); + } + + public void setControllerExecutorRebalanceNumThreads(int numThreads) { + setProperty(CONTROLLER_EXECUTOR_REBALANCE_NUM_THREADS, Integer.toString(numThreads)); + } + public void setUpdateSegmentStateModel(String updateStateModel) { setProperty(UPDATE_SEGMENT_STATE_MODEL, updateStateModel); } @@ -476,6 +488,14 @@ public class ControllerConf extends PinotConfiguration { SegmentCompletionProtocol.getDefaultMaxSegmentCommitTimeSeconds()); } + public int getControllerExecutorNumThreads() { + return getProperty(CONTROLLER_EXECUTOR_NUM_THREADS, UNSPECIFIED_THREAD_POOL); + } + + public int getControllerExecutorRebalanceNumThreads() { + return getProperty(CONTROLLER_EXECUTOR_REBALANCE_NUM_THREADS, UNSPECIFIED_THREAD_POOL); + } + public boolean isUpdateSegmentStateModel() { return getProperty(UPDATE_SEGMENT_STATE_MODEL, false); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org