This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 25ef4806fb make pinot task manager pluggable (#15429)
25ef4806fb is described below

commit 25ef4806fbb800cd72d3df9eed648ac7f8e3c84e
Author: Songqiao Su <andysongq...@gmail.com>
AuthorDate: Wed Apr 2 11:32:52 2025 -0700

    make pinot task manager pluggable (#15429)
---
 .../pinot/controller/BaseControllerStarter.java    | 28 ++++++++++++++++++----
 .../apache/pinot/spi/utils/CommonConstants.java    |  6 +++++
 2 files changed, 30 insertions(+), 4 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index 106cfdf59e..532d628c41 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -133,6 +133,7 @@ import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
 import org.apache.pinot.spi.metrics.PinotMetricUtils;
 import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
+import org.apache.pinot.spi.plugin.PluginManager;
 import org.apache.pinot.spi.services.ServiceRole;
 import org.apache.pinot.spi.services.ServiceStartable;
 import org.apache.pinot.spi.utils.CommonConstants;
@@ -813,10 +814,8 @@ public abstract class BaseControllerStarter implements ServiceStartable {
     LOGGER.info("Setting up periodic tasks");
     List<PeriodicTask> periodicTasks = new ArrayList<>();
     _taskManagerStatusCache = getTaskManagerStatusCache();
-    _taskManager =
-        new PinotTaskManager(_helixTaskResourceManager, _helixResourceManager, _leadControllerManager, _config,
-            _controllerMetrics, _taskManagerStatusCache, _executorService, _connectionManager,
-            _resourceUtilizationManager);
+    // Create and add task manager
+    _taskManager = createTaskManager();
     periodicTasks.add(_taskManager);
     BrokerServiceHelper brokerServiceHelper =
         new BrokerServiceHelper(_helixResourceManager, _config, _executorService, _connectionManager);
@@ -866,6 +865,27 @@ public abstract class BaseControllerStarter implements ServiceStartable {
     return periodicTasks;
   }
 
+  /**
+   * Creates a TaskManager instance  as specified in the configuration.
+   */
+  private PinotTaskManager createTaskManager() {
+    String taskManagerClass = _config.getProperty(CommonConstants.Controller.CONFIG_OF_TASK_MANAGER_CLASS,
+        CommonConstants.Controller.DEFAULT_TASK_MANAGER_CLASS);
+    LOGGER.info("Creating TaskManager with class: {}", taskManagerClass);
+    try {
+      return PluginManager.get().createInstance(taskManagerClass,
+          new Class[]{PinotHelixTaskResourceManager.class, PinotHelixResourceManager.class, LeadControllerManager.class,
+              ControllerConf.class, ControllerMetrics.class, TaskManagerStatusCache.class,
+              Executor.class, PoolingHttpClientConnectionManager.class, ResourceUtilizationManager.class},
+          new Object[]{_helixTaskResourceManager, _helixResourceManager, _leadControllerManager,
+              _config, _controllerMetrics, _taskManagerStatusCache, _executorService,
+              _connectionManager, _resourceUtilizationManager});
+    } catch (Exception e) {
+      LOGGER.error("Failed to create task manager with class: {}", taskManagerClass, e);
+      throw new RuntimeException("Failed to create task manager with class: " + taskManagerClass, e);
+    }
+  }
+
   @Override
   public void stop() {
     switch (_controllerMode) {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 7eb62120f7..a5a4b60bff 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -1116,6 +1116,12 @@ public class CommonConstants {
     public static final String CONFIG_OF_INSTANCE_ID = "pinot.controller.instance.id";
     public static final String CONFIG_OF_CONTROLLER_QUERY_REWRITER_CLASS_NAMES =
         "pinot.controller.query.rewriter.class.names";
+
+    // Task Manager configuration
+    public static final String CONFIG_OF_TASK_MANAGER_CLASS = "pinot.controller.task.manager.class";
+    public static final String DEFAULT_TASK_MANAGER_CLASS =
+        "org.apache.pinot.controller.helix.core.minion.PinotTaskManager";
+
     //Set to true to load all services tagged and compiled with hk2-metadata-generator. Default to False
     public static final String CONTROLLER_SERVICE_AUTO_DISCOVERY = "pinot.controller.service.auto.discovery";
     public static final String CONFIG_OF_LOGGER_ROOT_DIR = "pinot.controller.logger.root.dir";


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to