This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new 7805427c81 add_init (#15471)
7805427c81 is described below

commit 7805427c8140f6f0d288a432bda576a1af840b4b
Author: Songqiao Su <andysongq...@gmail.com>
AuthorDate: Mon Apr 7 08:20:49 2025 -0700

    add_init (#15471)
---
 .../apache/pinot/controller/BaseControllerStarter.java |  1 +
 .../controller/helix/core/minion/PinotTaskManager.java | 18 +++++++++++++++---
 2 files changed, 16 insertions(+), 3 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index 532d628c41..bd77104c45 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -816,6 +816,7 @@ public abstract class BaseControllerStarter implements ServiceStartable {
     _taskManagerStatusCache = getTaskManagerStatusCache();
     // Create and add task manager
     _taskManager = createTaskManager();
+    _taskManager.init();
     periodicTasks.add(_taskManager);
     BrokerServiceHelper brokerServiceHelper =
         new BrokerServiceHelper(_helixResourceManager, _config, _executorService, _connectionManager);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
index c62d7a1035..eab658a521 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -110,6 +110,8 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
   private final Map<String, Map<String, String>> _tableTaskTypeToCronExpressionMap = new ConcurrentHashMap<>();
   private final Map<String, TableTaskSchedulerUpdater> _tableTaskSchedulerUpdaterMap = new ConcurrentHashMap<>();
 
+  private final boolean _isPinotTaskManagerSchedulerEnabled;
+
   // For metrics
   private final Map<String, TaskTypeMetricsUpdater> _taskTypeMetricsUpdaterMap = new ConcurrentHashMap<>();
   private final Map<TaskState, Integer> _taskStateToCountMap = new ConcurrentHashMap<>();
@@ -135,9 +137,21 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
     _taskGeneratorRegistry = new TaskGeneratorRegistry(_clusterInfoAccessor);
     _skipLateCronSchedule = controllerConf.isSkipLateCronSchedule();
     _maxCronScheduleDelayInSeconds = controllerConf.getMaxCronScheduleDelayInSeconds();
-    if (controllerConf.isPinotTaskManagerSchedulerEnabled()) {
+    _isPinotTaskManagerSchedulerEnabled = controllerConf.isPinotTaskManagerSchedulerEnabled();
+    if (_isPinotTaskManagerSchedulerEnabled) {
       try {
         _scheduler = new StdSchedulerFactory().getScheduler();
+      } catch (SchedulerException e) {
+        throw new RuntimeException("Caught exception while setting up the scheduler", e);
+      }
+    } else {
+      _scheduler = null;
+    }
+  }
+
+  public void init() {
+    if (_isPinotTaskManagerSchedulerEnabled) {
+      try {
         _scheduler.start();
         synchronized (_zkTableConfigChangeListener) {
           // Subscribe child changes before reading the data to avoid missing changes
@@ -153,8 +167,6 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
       } catch (SchedulerException e) {
         throw new RuntimeException("Caught exception while setting up the scheduler", e);
       }
-    } else {
-      _scheduler = null;
     }
   }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org