This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 44eb01feae Use fixed thread pool in Pinot Controller (#14159)
44eb01feae is described below

commit 44eb01feae097d2f017b9c7a4609449d8abfd2d9
Author: Shreyaa Sharma <66686803+cypher...@users.noreply.github.com>
AuthorDate: Fri Oct 25 01:59:58 2024 +0530

    Use fixed thread pool in Pinot Controller (#14159)
---
 .../pinot/controller/BaseControllerStarter.java      | 15 +++++++++++----
 .../org/apache/pinot/controller/ControllerConf.java  | 20 ++++++++++++++++++++
 2 files changed, 31 insertions(+), 4 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index 44c8e96f36..1f7ccc0c60 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -35,6 +35,7 @@ import java.util.Map;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.commons.io.FileUtils;
@@ -252,10 +253,9 @@ public abstract class BaseControllerStarter implements ServiceStartable {
       // ControllerStarter::start()}
       _helixResourceManager = createHelixResourceManager();
       // This executor service is used to do async tasks from multiget util or table rebalancing.
-      _executorService =
-          Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("async-task-thread-%d").build());
-      _tenantRebalanceExecutorService =
-          Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("tenant-rebalance-thread-%d").build());
+      _executorService = createExecutorService(_config.getControllerExecutorNumThreads(), "async-task-thread-%d");
+      _tenantRebalanceExecutorService = createExecutorService(_config.getControllerExecutorRebalanceNumThreads(),
+              "tenant-rebalance-thread-%d");
       _tenantRebalancer = new DefaultTenantRebalancer(_helixResourceManager, _tenantRebalanceExecutorService);
     }
 
@@ -266,6 +266,13 @@ public abstract class BaseControllerStarter implements ServiceStartable {
     TableConfigUtils.setEnforcePoolBasedAssignment(_config.isEnforcePoolBasedAssignmentEnabled());
   }
 
+  // If thread pool size is not configured executor will use cached thread pool
+  private ExecutorService createExecutorService(int numThreadPool, String threadNameFormat) {
+    ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(threadNameFormat).build();
+    return (numThreadPool <= 0) ? Executors.newCachedThreadPool(threadFactory)
+            : Executors.newFixedThreadPool(numThreadPool, threadFactory);
+  }
+
   private void inferHostnameIfNeeded(ControllerConf config) {
     if (config.getControllerHost() == null) {
       if (config.getProperty(CommonConstants.Helix.SET_INSTANCE_ID_TO_HOSTNAME_KEY, false)) {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index e8c6dc8c48..612aa9bafe 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -278,6 +278,9 @@ public class ControllerConf extends PinotConfiguration {
   private static final String SERVER_ADMIN_REQUEST_TIMEOUT_SECONDS = "server.request.timeoutSeconds";
   private static final String MINION_ADMIN_REQUEST_TIMEOUT_SECONDS = "minion.request.timeoutSeconds";
   private static final String SEGMENT_COMMIT_TIMEOUT_SECONDS = "controller.realtime.segment.commit.timeoutSeconds";
+  private static final String CONTROLLER_EXECUTOR_NUM_THREADS = "controller.executor.numThreads";
+  public static final String CONTROLLER_EXECUTOR_REBALANCE_NUM_THREADS = "controller.executor.rebalance.numThreads";
+
   private static final String DELETED_SEGMENTS_RETENTION_IN_DAYS = "controller.deleted.segments.retentionInDays";
   public static final String TABLE_MIN_REPLICAS = "table.minReplicas";
   private static final String JERSEY_ADMIN_API_PORT = "jersey.admin.api.port";
@@ -321,6 +324,7 @@ public class ControllerConf extends PinotConfiguration {
       AutoRebalanceStrategy.class.getName();
   private static final int DEFAULT_LEAD_CONTROLLER_RESOURCE_REBALANCE_DELAY_MS = 300_000; // 5 minutes
   private static final String DEFAULT_DIM_TABLE_MAX_SIZE = "200M";
+  private static final int UNSPECIFIED_THREAD_POOL = -1;
 
   private static final String DEFAULT_PINOT_FS_FACTORY_CLASS_LOCAL = LocalPinotFS.class.getName();
 
@@ -408,6 +412,14 @@ public class ControllerConf extends PinotConfiguration {
     setProperty(SEGMENT_COMMIT_TIMEOUT_SECONDS, Integer.toString(timeoutSec));
   }
 
+  public void setControllerExecutorNumThreads(int numThreads) {
+    setProperty(CONTROLLER_EXECUTOR_NUM_THREADS, Integer.toString(numThreads));
+  }
+
+  public void setControllerExecutorRebalanceNumThreads(int numThreads) {
+    setProperty(CONTROLLER_EXECUTOR_REBALANCE_NUM_THREADS, Integer.toString(numThreads));
+  }
+
   public void setUpdateSegmentStateModel(String updateStateModel) {
     setProperty(UPDATE_SEGMENT_STATE_MODEL, updateStateModel);
   }
@@ -476,6 +488,14 @@ public class ControllerConf extends PinotConfiguration {
         SegmentCompletionProtocol.getDefaultMaxSegmentCommitTimeSeconds());
   }
 
+  public int getControllerExecutorNumThreads() {
+    return getProperty(CONTROLLER_EXECUTOR_NUM_THREADS, UNSPECIFIED_THREAD_POOL);
+  }
+
+  public int getControllerExecutorRebalanceNumThreads() {
+    return getProperty(CONTROLLER_EXECUTOR_REBALANCE_NUM_THREADS, UNSPECIFIED_THREAD_POOL);
+  }
+
   public boolean isUpdateSegmentStateModel() {
     return getProperty(UPDATE_SEGMENT_STATE_MODEL, false);
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to