This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 25ef4806fb make pinot task manager pluggable (#15429) 25ef4806fb is described below commit 25ef4806fbb800cd72d3df9eed648ac7f8e3c84e Author: Songqiao Su <andysongq...@gmail.com> AuthorDate: Wed Apr 2 11:32:52 2025 -0700 make pinot task manager pluggable (#15429) --- .../pinot/controller/BaseControllerStarter.java | 28 ++++++++++++++++++---- .../apache/pinot/spi/utils/CommonConstants.java | 6 +++++ 2 files changed, 30 insertions(+), 4 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java index 106cfdf59e..532d628c41 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java @@ -133,6 +133,7 @@ import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.filesystem.PinotFSFactory; import org.apache.pinot.spi.metrics.PinotMetricUtils; import org.apache.pinot.spi.metrics.PinotMetricsRegistry; +import org.apache.pinot.spi.plugin.PluginManager; import org.apache.pinot.spi.services.ServiceRole; import org.apache.pinot.spi.services.ServiceStartable; import org.apache.pinot.spi.utils.CommonConstants; @@ -813,10 +814,8 @@ public abstract class BaseControllerStarter implements ServiceStartable { LOGGER.info("Setting up periodic tasks"); List<PeriodicTask> periodicTasks = new ArrayList<>(); _taskManagerStatusCache = getTaskManagerStatusCache(); - _taskManager = - new PinotTaskManager(_helixTaskResourceManager, _helixResourceManager, _leadControllerManager, _config, - _controllerMetrics, _taskManagerStatusCache, _executorService, _connectionManager, - _resourceUtilizationManager); + // Create and add task manager + _taskManager = createTaskManager(); periodicTasks.add(_taskManager); BrokerServiceHelper brokerServiceHelper = new BrokerServiceHelper(_helixResourceManager, _config, _executorService, _connectionManager); @@ -866,6 +865,27 @@ public abstract class BaseControllerStarter implements ServiceStartable { return periodicTasks; } + /** + * Creates a TaskManager instance as specified in the configuration. + */ + private PinotTaskManager createTaskManager() { + String taskManagerClass = _config.getProperty(CommonConstants.Controller.CONFIG_OF_TASK_MANAGER_CLASS, + CommonConstants.Controller.DEFAULT_TASK_MANAGER_CLASS); + LOGGER.info("Creating TaskManager with class: {}", taskManagerClass); + try { + return PluginManager.get().createInstance(taskManagerClass, + new Class[]{PinotHelixTaskResourceManager.class, PinotHelixResourceManager.class, LeadControllerManager.class, + ControllerConf.class, ControllerMetrics.class, TaskManagerStatusCache.class, + Executor.class, PoolingHttpClientConnectionManager.class, ResourceUtilizationManager.class}, + new Object[]{_helixTaskResourceManager, _helixResourceManager, _leadControllerManager, + _config, _controllerMetrics, _taskManagerStatusCache, _executorService, + _connectionManager, _resourceUtilizationManager}); + } catch (Exception e) { + LOGGER.error("Failed to create task manager with class: {}", taskManagerClass, e); + throw new RuntimeException("Failed to create task manager with class: " + taskManagerClass, e); + } + } + @Override public void stop() { switch (_controllerMode) { diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 7eb62120f7..a5a4b60bff 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -1116,6 +1116,12 @@ public class CommonConstants { public static final String CONFIG_OF_INSTANCE_ID = "pinot.controller.instance.id"; public static final String CONFIG_OF_CONTROLLER_QUERY_REWRITER_CLASS_NAMES = "pinot.controller.query.rewriter.class.names"; + + // Task Manager configuration + public static final String CONFIG_OF_TASK_MANAGER_CLASS = "pinot.controller.task.manager.class"; + public static final String DEFAULT_TASK_MANAGER_CLASS = + "org.apache.pinot.controller.helix.core.minion.PinotTaskManager"; + //Set to true to load all services tagged and compiled with hk2-metadata-generator. Default to False public static final String CONTROLLER_SERVICE_AUTO_DISCOVERY = "pinot.controller.service.auto.discovery"; public static final String CONFIG_OF_LOGGER_ROOT_DIR = "pinot.controller.logger.root.dir"; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org