This is an automated email from the ASF dual-hosted git repository.

manishswaminathan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new eb7489c453 Track the actor that triggers the minion task (#14829)
eb7489c453 is described below

commit eb7489c453e2b82b8199670bbea931e52ae609db
Author: Shounak kulkarni <shounakmk...@gmail.com>
AuthorDate: Thu Feb 13 09:55:11 2025 +0530

    Track the actor that triggers the minion task (#14829)
    
    * Track the actor that triggered the task
    
    * add java docs
    
    * simplified TaskSchedulingContext
    
    * Track the actor that triggered the task
    
    * add java docs
    
    * simplified TaskSchedulingContext
    
    * variable renames
    
    * fixes
---
 .../api/resources/PinotTaskRestletResource.java    |  41 +++---
 .../helix/core/minion/CronJobScheduleJob.java      |   8 +-
 .../core/minion/PinotHelixTaskResourceManager.java |  31 +++-
 .../helix/core/minion/PinotTaskManager.java        | 156 ++++++++++++---------
 .../helix/core/minion/TaskSchedulingContext.java   | 135 ++++++++++++++++++
 .../helix/core/minion/TaskSchedulingInfo.java      |  56 ++++++++
 .../src/main/resources/app/pages/SubTaskDetail.tsx |   3 +
 .../src/main/resources/app/pages/TaskDetail.tsx    |   3 +
 .../core/minion/PinotTaskManagerStatelessTest.java |  12 +-
 .../integration/tests/MinionTaskTestUtils.java     |  23 ++-
 .../MergeRollupMinionClusterIntegrationTest.java   |  55 +++++---
 .../tests/PurgeMinionClusterIntegrationTest.java   |  45 ++++--
 ...fflineSegmentsMinionClusterIntegrationTest.java |  19 ++-
 ...RefreshSegmentMinionClusterIntegrationTest.java |  44 ++++--
 .../tests/SimpleMinionClusterIntegrationTest.java  |   8 +-
 .../integration/tests/TlsIntegrationTest.java      |   3 +-
 .../tests/UpsertTableIntegrationTest.java          |  10 +-
 .../tests/UrlAuthRealtimeIntegrationTest.java      |   3 +-
 .../apache/pinot/spi/utils/CommonConstants.java    |   4 +
 19 files changed, 496 insertions(+), 163 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
index 24cd444dc5..d76ea4f3a2 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
@@ -34,6 +34,7 @@ import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -80,12 +81,15 @@ import 
org.apache.pinot.controller.api.exception.UnknownTaskTypeException;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import 
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
 import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
+import org.apache.pinot.controller.helix.core.minion.TaskSchedulingContext;
+import org.apache.pinot.controller.helix.core.minion.TaskSchedulingInfo;
 import org.apache.pinot.controller.util.CompletionServiceHelper;
 import org.apache.pinot.core.auth.Actions;
 import org.apache.pinot.core.auth.Authorize;
 import org.apache.pinot.core.auth.TargetType;
 import org.apache.pinot.core.minion.PinotTaskConfig;
 import org.apache.pinot.spi.config.task.AdhocTaskConfig;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.glassfish.grizzly.http.server.Request;
 import org.glassfish.jersey.server.ManagedAsync;
@@ -646,29 +650,26 @@ public class PinotTaskRestletResource {
     Map<String, String> response = new HashMap<>();
     List<String> generationErrors = new ArrayList<>();
     List<String> schedulingErrors = new ArrayList<>();
+    TaskSchedulingContext context = new TaskSchedulingContext()
+        .setTriggeredBy(CommonConstants.TaskTriggers.MANUAL_TRIGGER.name())
+        .setMinionInstanceTag(minionInstanceTag)
+        .setLeader(false);
     if (taskType != null) {
-      // Schedule task for the given task type
-      PinotTaskManager.TaskSchedulingInfo taskInfos = tableName != null
-          ? _pinotTaskManager.scheduleTaskForTable(taskType, 
DatabaseUtils.translateTableName(tableName, headers),
-              minionInstanceTag)
-          : _pinotTaskManager.scheduleTaskForDatabase(taskType, database, 
minionInstanceTag);
-      response.put(taskType, 
StringUtils.join(taskInfos.getScheduledTaskNames(), ','));
-      generationErrors.addAll(taskInfos.getGenerationErrors());
-      schedulingErrors.addAll(taskInfos.getSchedulingErrors());
+      context.setTasksToSchedule(Collections.singleton(taskType));
+    }
+    if (tableName != null) {
+      
context.setTablesToSchedule(Collections.singleton(DatabaseUtils.translateTableName(tableName,
 headers)));
     } else {
-      // Schedule tasks for all task types
-      Map<String, PinotTaskManager.TaskSchedulingInfo> allTaskInfos = 
tableName != null
-          ? 
_pinotTaskManager.scheduleAllTasksForTable(DatabaseUtils.translateTableName(tableName,
 headers),
-              minionInstanceTag)
-          : _pinotTaskManager.scheduleAllTasksForDatabase(database, 
minionInstanceTag);
-      allTaskInfos.forEach((key, value) -> {
-        if (value.getScheduledTaskNames() != null) {
-          response.put(key, String.join(",", value.getScheduledTaskNames()));
-        }
-        generationErrors.addAll(value.getGenerationErrors());
-        schedulingErrors.addAll(value.getSchedulingErrors());
-      });
+      context.setDatabasesToSchedule(Collections.singleton(database));
     }
+    Map<String, TaskSchedulingInfo> allTaskInfos = 
_pinotTaskManager.scheduleTasks(context);
+    allTaskInfos.forEach((key, value) -> {
+      if (value.getScheduledTaskNames() != null) {
+        response.put(key, String.join(",", value.getScheduledTaskNames()));
+      }
+      generationErrors.addAll(value.getGenerationErrors());
+      schedulingErrors.addAll(value.getSchedulingErrors());
+    });
     response.put(GENERATION_ERRORS_KEY, String.join(",", generationErrors));
     response.put(SCHEDULING_ERRORS_KEY, String.join(",", schedulingErrors));
     return response;
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/CronJobScheduleJob.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/CronJobScheduleJob.java
index f9b250b2bc..8a4751196f 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/CronJobScheduleJob.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/CronJobScheduleJob.java
@@ -18,11 +18,13 @@
  */
 package org.apache.pinot.controller.helix.core.minion;
 
+import java.util.Collections;
 import java.util.Date;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.common.metrics.ControllerMeter;
 import org.apache.pinot.common.metrics.ControllerTimer;
 import org.apache.pinot.controller.LeadControllerManager;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.quartz.Job;
 import org.quartz.JobExecutionContext;
 import org.quartz.JobExecutionException;
@@ -64,8 +66,12 @@ public class CronJobScheduleJob implements Job {
             ControllerMeter.CRON_SCHEDULER_JOB_SKIPPED, 1L);
         return;
       }
+      TaskSchedulingContext context = new TaskSchedulingContext()
+          .setTablesToSchedule(Collections.singleton(table))
+          .setTasksToSchedule(Collections.singleton(taskType))
+          .setTriggeredBy(CommonConstants.TaskTriggers.CRON_TRIGGER.name());
       long jobStartTime = System.currentTimeMillis();
-      pinotTaskManager.scheduleTaskForTable(taskType, table, null);
+      pinotTaskManager.scheduleTasks(context);
       LOGGER.info("Finished CronJob: table - {}, task - {}, next runtime is 
{}", table, taskType,
           jobExecutionContext.getNextFireTime());
       
pinotTaskManager.getControllerMetrics().addTimedTableValue(PinotTaskManager.getCronJobName(table,
 taskType),
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
index 87580c30b0..bbbb3fcee2 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
@@ -876,6 +876,11 @@ public class PinotHelixTaskResourceManager {
       if (jobFinishTimeMs > 0) {
         
taskDebugInfo.setFinishTime(DateTimeUtils.epochToDefaultDateFormat(jobFinishTimeMs));
       }
+      String triggeredBy = 
jobConfig.getTaskConfigMap().values().stream().findFirst()
+          .map(TaskConfig::getConfigMap)
+          .map(taskConfigs -> taskConfigs.get(PinotTaskManager.TRIGGERED_BY))
+          .orElse("");
+      taskDebugInfo.setTriggeredBy(triggeredBy);
       Set<Integer> partitionSet = jobContext.getPartitionSet();
       TaskCount subtaskCount = new TaskCount();
       for (int partition : partitionSet) {
@@ -890,6 +895,7 @@ public class PinotHelixTaskResourceManager {
         String taskIdForPartition = 
jobContext.getTaskIdForPartition(partition);
         subtaskDebugInfo.setTaskId(taskIdForPartition);
         subtaskDebugInfo.setState(partitionState);
+        subtaskDebugInfo.setTriggeredBy(triggeredBy);
         long subtaskStartTimeMs = jobContext.getPartitionStartTime(partition);
         if (subtaskStartTimeMs > 0) {
           
subtaskDebugInfo.setStartTime(DateTimeUtils.epochToDefaultDateFormat(subtaskStartTimeMs));
@@ -987,7 +993,8 @@ public class PinotHelixTaskResourceManager {
     return 
MinionTaskMetadataUtils.getAllTaskMetadataLastUpdateTimeMs(propertyStore);
   }
 
-  @JsonPropertyOrder({"taskState", "subtaskCount", "startTime", 
"executionStartTime", "finishTime", "subtaskInfos"})
+  @JsonPropertyOrder({"taskState", "subtaskCount", "startTime", 
"executionStartTime", "finishTime", "triggeredBy",
+      "subtaskInfos"})
   @JsonInclude(JsonInclude.Include.NON_NULL)
   public static class TaskDebugInfo {
     // Time at which the task (which may have multiple subtasks) got created.
@@ -998,6 +1005,7 @@ public class PinotHelixTaskResourceManager {
     private String _finishTime;
     private TaskState _taskState;
     private TaskCount _subtaskCount;
+    private String _triggeredBy;
     private List<SubtaskDebugInfo> _subtaskInfos;
 
     public TaskDebugInfo() {
@@ -1046,6 +1054,15 @@ public class PinotHelixTaskResourceManager {
       return _taskState;
     }
 
+    public String getTriggeredBy() {
+      return _triggeredBy;
+    }
+
+    public TaskDebugInfo setTriggeredBy(String triggeredBy) {
+      _triggeredBy = triggeredBy;
+      return this;
+    }
+
     public TaskCount getSubtaskCount() {
       return _subtaskCount;
     }
@@ -1055,7 +1072,7 @@ public class PinotHelixTaskResourceManager {
     }
   }
 
-  @JsonPropertyOrder({"taskId", "state", "startTime", "finishTime", 
"participant", "info", "taskConfig"})
+  @JsonPropertyOrder({"taskId", "state", "startTime", "finishTime", 
"participant", "info", "triggeredBy", "taskConfig"})
   @JsonInclude(JsonInclude.Include.NON_NULL)
   public static class SubtaskDebugInfo {
     private String _taskId;
@@ -1064,6 +1081,7 @@ public class PinotHelixTaskResourceManager {
     private String _finishTime;
     private String _participant;
     private String _info;
+    private String _triggeredBy;
     private PinotTaskConfig _taskConfig;
 
     public SubtaskDebugInfo() {
@@ -1121,6 +1139,15 @@ public class PinotHelixTaskResourceManager {
       return _info;
     }
 
+    public String getTriggeredBy() {
+      return _triggeredBy;
+    }
+
+    public SubtaskDebugInfo setTriggeredBy(String triggeredBy) {
+      _triggeredBy = triggeredBy;
+      return this;
+    }
+
     public PinotTaskConfig getTaskConfig() {
       return _taskConfig;
     }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
index 93002f9100..9f1f938187 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -94,6 +95,7 @@ public class PinotTaskManager extends 
ControllerPeriodicTask<Void> {
   private static final String TABLE_CONFIG_PARENT_PATH = "/CONFIGS/TABLE";
   private static final String TABLE_CONFIG_PATH_PREFIX = "/CONFIGS/TABLE/";
   private static final String TASK_QUEUE_PATH_PATTERN = 
"/TaskRebalancer/TaskQueue_%s/Context";
+  public static final String TRIGGERED_BY = "triggeredBy";
 
   private final PinotHelixTaskResourceManager _helixTaskResourceManager;
   private final ClusterInfoAccessor _clusterInfoAccessor;
@@ -208,6 +210,8 @@ public class PinotTaskManager extends 
ControllerPeriodicTask<Void> {
         LOGGER.warn("No ad-hoc task generated for task type: {}", taskType);
         continue;
       }
+      pinotTaskConfigs.forEach(pinotTaskConfig -> pinotTaskConfig.getConfigs()
+          .computeIfAbsent(TRIGGERED_BY, k -> 
CommonConstants.TaskTriggers.ADHOC_TRIGGER.name()));
       LOGGER.info("Submitting ad-hoc task for task type: {} with task configs: 
{}", taskType, pinotTaskConfigs);
       _controllerMetrics.addMeteredTableValue(taskType, 
ControllerMeter.NUMBER_ADHOC_TASKS_SUBMITTED, 1);
       responseMap.put(tableNameWithType,
@@ -488,8 +492,11 @@ public class PinotTaskManager extends 
ControllerPeriodicTask<Void> {
    * It might be called from the non-leader controller.
    * Returns a map from the task type to the {@link TaskSchedulingInfo} of 
tasks scheduled.
    */
+  @Deprecated(forRemoval = true)
   public synchronized Map<String, TaskSchedulingInfo> 
scheduleAllTasksForAllTables(@Nullable String minionInstanceTag) {
-    return scheduleTasks(_pinotHelixResourceManager.getAllTables(), false, 
minionInstanceTag);
+    TaskSchedulingContext context = new TaskSchedulingContext()
+        .setMinionInstanceTag(minionInstanceTag);
+    return scheduleTasks(context);
   }
 
   /**
@@ -497,9 +504,13 @@ public class PinotTaskManager extends 
ControllerPeriodicTask<Void> {
    * It might be called from the non-leader controller.
    * Returns a map from the task type to the {@link TaskSchedulingInfo} of 
tasks scheduled.
    */
+  @Deprecated(forRemoval = true)
   public synchronized Map<String, TaskSchedulingInfo> 
scheduleAllTasksForDatabase(@Nullable String database,
       @Nullable String minionInstanceTag) {
-    return scheduleTasks(_pinotHelixResourceManager.getAllTables(database), 
false, minionInstanceTag);
+    TaskSchedulingContext context = new TaskSchedulingContext()
+        .setDatabasesToSchedule(Collections.singleton(database))
+        .setMinionInstanceTag(minionInstanceTag);
+    return scheduleTasks(context);
   }
 
   /**
@@ -507,9 +518,13 @@ public class PinotTaskManager extends 
ControllerPeriodicTask<Void> {
    * It might be called from the non-leader controller.
    * Returns a map from the task type to the {@link TaskSchedulingInfo} of 
tasks scheduled.
    */
+  @Deprecated(forRemoval = true)
   public synchronized Map<String, TaskSchedulingInfo> 
scheduleAllTasksForTable(String tableNameWithType,
       @Nullable String minionInstanceTag) {
-    return scheduleTasks(List.of(tableNameWithType), false, minionInstanceTag);
+    TaskSchedulingContext context = new TaskSchedulingContext()
+        .setTablesToSchedule(Collections.singleton(tableNameWithType))
+        .setMinionInstanceTag(minionInstanceTag);
+    return scheduleTasks(context);
   }
 
   /**
@@ -521,8 +536,12 @@ public class PinotTaskManager extends 
ControllerPeriodicTask<Void> {
    *  - list of task generation errors if any
    *  - list of task scheduling errors if any
    */
+  @Deprecated(forRemoval = true)
   public synchronized TaskSchedulingInfo scheduleTaskForAllTables(String 
taskType, @Nullable String minionInstanceTag) {
-    return scheduleTask(taskType, _pinotHelixResourceManager.getAllTables(), 
minionInstanceTag);
+    TaskSchedulingContext context = new TaskSchedulingContext()
+        .setTasksToSchedule(Collections.singleton(taskType))
+        .setMinionInstanceTag(minionInstanceTag);
+    return scheduleTasks(context).get(taskType);
   }
 
   /**
@@ -534,9 +553,14 @@ public class PinotTaskManager extends 
ControllerPeriodicTask<Void> {
    *  - list of task generation errors if any
    *  - list of task scheduling errors if any
    */
+  @Deprecated(forRemoval = true)
   public synchronized TaskSchedulingInfo scheduleTaskForDatabase(String 
taskType, @Nullable String database,
       @Nullable String minionInstanceTag) {
-    return scheduleTask(taskType, 
_pinotHelixResourceManager.getAllTables(database), minionInstanceTag);
+    TaskSchedulingContext context = new TaskSchedulingContext()
+        .setTasksToSchedule(Collections.singleton(taskType))
+        .setDatabasesToSchedule(Collections.singleton(database))
+        .setMinionInstanceTag(minionInstanceTag);
+    return scheduleTasks(context).get(taskType);
   }
 
   /**
@@ -548,27 +572,64 @@ public class PinotTaskManager extends 
ControllerPeriodicTask<Void> {
    *  - list of task generation errors if any
    *  - list of task scheduling errors if any
    */
+  @Deprecated(forRemoval = true)
   public synchronized TaskSchedulingInfo scheduleTaskForTable(String taskType, 
String tableNameWithType,
       @Nullable String minionInstanceTag) {
-    return scheduleTask(taskType, List.of(tableNameWithType), 
minionInstanceTag);
+    TaskSchedulingContext context = new TaskSchedulingContext()
+        .setTasksToSchedule(Collections.singleton(taskType))
+        .setTablesToSchedule(Collections.singleton(tableNameWithType))
+        .setMinionInstanceTag(minionInstanceTag);
+    return scheduleTasks(context).get(taskType);
   }
 
   /**
    * Helper method to schedule tasks (all task types) for the given tables 
that have the tasks enabled.
    * Returns a map from the task type to the {@link TaskSchedulingInfo} of the 
tasks scheduled.
    */
+  @Deprecated(forRemoval = true)
   protected synchronized Map<String, TaskSchedulingInfo> 
scheduleTasks(List<String> tableNamesWithType,
       boolean isLeader, @Nullable String minionInstanceTag) {
+    TaskSchedulingContext context = new TaskSchedulingContext()
+        .setTablesToSchedule(new HashSet<>(tableNamesWithType))
+        .setLeader(isLeader)
+        .setMinionInstanceTag(minionInstanceTag);
+    return scheduleTasks(context);
+  }
+
+  /**
+   * Schedules tasks for the tables, databases and task types selected by the 
given {@link TaskSchedulingContext}.
+   * Returns a map from the task type to the {@link TaskSchedulingInfo} of the 
tasks scheduled.
+   */
+  public synchronized Map<String, TaskSchedulingInfo> 
scheduleTasks(TaskSchedulingContext context) {
     
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_TIMES_SCHEDULE_TASKS_CALLED,
 1L);
 
-    // Scan all table configs to get the tables with tasks enabled
     Map<String, List<TableConfig>> enabledTableConfigMap = new HashMap<>();
-    for (String tableNameWithType : tableNamesWithType) {
+    Set<String> targetTables = context.getTablesToSchedule();
+    Set<String> targetDatabases = context.getDatabasesToSchedule();
+    Set<String> tasksToSchedule = context.getTasksToSchedule();
+    Set<String> consolidatedTables = new HashSet<>();
+    if (targetTables != null) {
+      consolidatedTables.addAll(targetTables);
+    }
+    if (targetDatabases != null) {
+      targetDatabases.forEach(database ->
+          
consolidatedTables.addAll(_pinotHelixResourceManager.getAllTables(database)));
+    }
+    for (String tableNameWithType : consolidatedTables.isEmpty()
+        ? _pinotHelixResourceManager.getAllTables() : consolidatedTables) {
       TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
       if (tableConfig != null && tableConfig.getTaskConfig() != null) {
         Set<String> enabledTaskTypes = 
tableConfig.getTaskConfig().getTaskTypeConfigsMap().keySet();
-        for (String enabledTaskType : enabledTaskTypes) {
-          enabledTableConfigMap.computeIfAbsent(enabledTaskType, k -> new 
ArrayList<>()).add(tableConfig);
+        Set<String> validTasks;
+        if (tasksToSchedule == null || tasksToSchedule.isEmpty()) {
+          // if no specific task types are provided schedule for all tasks
+          validTasks = enabledTaskTypes;
+        } else {
+          validTasks = new HashSet<>(tasksToSchedule);
+          validTasks.retainAll(enabledTaskTypes);
+        }
+        for (String taskType : validTasks) {
+          enabledTableConfigMap.computeIfAbsent(taskType, k -> new 
ArrayList<>()).add(tableConfig);
         }
       }
     }
@@ -579,13 +640,14 @@ public class PinotTaskManager extends 
ControllerPeriodicTask<Void> {
       String taskType = entry.getKey();
       List<TableConfig> enabledTableConfigs = entry.getValue();
       PinotTaskGenerator taskGenerator = 
_taskGeneratorRegistry.getTaskGenerator(taskType);
-      List<String> enabledTables =
-          
enabledTableConfigs.stream().map(TableConfig::getTableName).collect(Collectors.toList());
       if (taskGenerator != null) {
         _helixTaskResourceManager.ensureTaskQueueExists(taskType);
         addTaskTypeMetricsUpdaterIfNeeded(taskType);
-        tasksScheduled.put(taskType, scheduleTask(taskGenerator, 
enabledTableConfigs, isLeader, minionInstanceTag));
+        tasksScheduled.put(taskType, scheduleTask(taskGenerator, 
enabledTableConfigs, context.isLeader(),
+            context.getMinionInstanceTag(), context.getTriggeredBy()));
       } else {
+        List<String> enabledTables =
+            
enabledTableConfigs.stream().map(TableConfig::getTableName).collect(Collectors.toList());
         String message = "Task type: " + taskType + " is not registered, 
cannot enable it for tables: " + enabledTables;
         LOGGER.warn(message);
         TaskSchedulingInfo taskSchedulingInfo = new TaskSchedulingInfo();
@@ -597,24 +659,16 @@ public class PinotTaskManager extends 
ControllerPeriodicTask<Void> {
     return tasksScheduled;
   }
 
+  @Deprecated(forRemoval = true)
   protected synchronized TaskSchedulingInfo scheduleTask(String taskType, 
List<String> tables,
       @Nullable String minionInstanceTag) {
-    PinotTaskGenerator taskGenerator = 
_taskGeneratorRegistry.getTaskGenerator(taskType);
-    Preconditions.checkState(taskGenerator != null, "Task type: %s is not 
registered", taskType);
-
-    // Scan all table configs to get the tables with task enabled
-    List<TableConfig> enabledTableConfigs = new ArrayList<>();
-    for (String tableNameWithType : tables) {
-      TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
-      if (tableConfig != null && tableConfig.getTaskConfig() != null && 
tableConfig.getTaskConfig()
-          .isTaskTypeEnabled(taskType)) {
-        enabledTableConfigs.add(tableConfig);
-      }
-    }
-
-    _helixTaskResourceManager.ensureTaskQueueExists(taskType);
-    addTaskTypeMetricsUpdaterIfNeeded(taskType);
-    return scheduleTask(taskGenerator, enabledTableConfigs, false, 
minionInstanceTag);
+    
Preconditions.checkState(_taskGeneratorRegistry.getAllTaskTypes().contains(taskType),
+        "Task type: %s is not registered", taskType);
+    TaskSchedulingContext context = new TaskSchedulingContext()
+        .setTablesToSchedule(new HashSet<>(tables))
+        .setTasksToSchedule(Collections.singleton(taskType))
+        .setMinionInstanceTag(minionInstanceTag);
+    return scheduleTasks(context).get(taskType);
   }
 
   /**
@@ -626,8 +680,8 @@ public class PinotTaskManager extends 
ControllerPeriodicTask<Void> {
    *  - list of task scheduling errors if any
    */
   protected TaskSchedulingInfo scheduleTask(PinotTaskGenerator taskGenerator, 
List<TableConfig> enabledTableConfigs,
-      boolean isLeader, @Nullable String minionInstanceTagForTask) {
-    TaskSchedulingInfo response = new TaskSchedulingInfo();
+      boolean isLeader, @Nullable String minionInstanceTagForTask, String 
triggeredBy) {
+      TaskSchedulingInfo response = new TaskSchedulingInfo();
     String taskType = taskGenerator.getTaskType();
     List<String> enabledTables =
         
enabledTableConfigs.stream().map(TableConfig::getTableName).collect(Collectors.toList());
@@ -693,6 +747,8 @@ public class PinotTaskManager extends 
ControllerPeriodicTask<Void> {
           // This might lead to lot of logs, maybe sum it up and move outside 
the loop
           LOGGER.info("Submitting {} tasks for task type: {} to 
minionInstance: {} with task configs: {}", numTasks,
               taskType, minionInstanceTag, pinotTaskConfigs);
+          pinotTaskConfigs.forEach(pinotTaskConfig ->
+              pinotTaskConfig.getConfigs().computeIfAbsent(TRIGGERED_BY, k -> 
triggeredBy));
           String submittedTaskName = 
_helixTaskResourceManager.submitTask(pinotTaskConfigs, minionInstanceTag,
               taskGenerator.getTaskTimeoutMs(), 
taskGenerator.getNumConcurrentTasksPerInstance(),
               taskGenerator.getMaxAttemptsPerTask());
@@ -718,7 +774,11 @@ public class PinotTaskManager extends 
ControllerPeriodicTask<Void> {
 
   @Override
   protected void processTables(List<String> tableNamesWithType, Properties 
taskProperties) {
-    scheduleTasks(tableNamesWithType, true, null);
+    TaskSchedulingContext context = new TaskSchedulingContext()
+        .setLeader(true)
+        .setTriggeredBy(CommonConstants.TaskTriggers.CRON_TRIGGER.name());
+    // cron schedule
+    scheduleTasks(context);
   }
 
   @Override
@@ -781,36 +841,4 @@ public class PinotTaskManager extends 
ControllerPeriodicTask<Void> {
     }
     return true;
   }
-
-  public static class TaskSchedulingInfo {
-    private List<String> _scheduledTaskNames;
-    private final List<String> _generationErrors = new ArrayList<>();
-    private final List<String> _schedulingErrors = new ArrayList<>();
-
-    @Nullable
-    public List<String> getScheduledTaskNames() {
-      return _scheduledTaskNames;
-    }
-
-    public TaskSchedulingInfo setScheduledTaskNames(List<String> 
scheduledTaskNames) {
-      _scheduledTaskNames = scheduledTaskNames;
-      return this;
-    }
-
-    public List<String> getGenerationErrors() {
-      return _generationErrors;
-    }
-
-    public void addGenerationError(String generationError) {
-      _generationErrors.add(generationError);
-    }
-
-    public List<String> getSchedulingErrors() {
-      return _schedulingErrors;
-    }
-
-    public void addSchedulingError(String schedulingError) {
-      _schedulingErrors.add(schedulingError);
-    }
-  }
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskSchedulingContext.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskSchedulingContext.java
new file mode 100644
index 0000000000..2a685c2ab7
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskSchedulingContext.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.minion;
+
+import java.util.Set;
+
+
+/**
+ * Wrapper class to manage all the inputs passed to schedule a task on minion.
+ * Tasks will be scheduled based on the combination of tables, databases and 
taskTypes passed.
+ * <p>Y -> contains elements
+ * <p>N -> is null or empty
+ * <table>
+ *   <tr>
+ *     <th> tablesToSchedule </th> <th> databasesToSchedule </th> <th> 
tasksToSchedule </th>
+ *     <th> {@link PinotTaskManager} behavior </th>
+ *   </tr>
+ *   <tr>
+ *     <td> N </td> <td> N </td> <td> N </td>
+ *     <td>schedule all the configured tasks on all tables</td>
+ *   </tr>
+ *   <tr>
+ *     <td> Y </td> <td> N </td> <td> N </td>
+ *     <td>schedule all the configured tasks on tables in tablesToSchedule</td>
+ *   </tr>
+ *   <tr>
+ *     <td> N </td> <td> Y </td> <td> N </td>
+ *     <td>schedule all the configured tasks on all tables under the databases 
in databasesToSchedule</td>
+ *   </tr>
+ *   <tr>
+ *     <td> N </td> <td> N </td> <td> Y </td>
+ *     <td>schedule tasksToSchedule on all tables</td>
+ *   </tr>
+ *   <tr>
+ *     <td> N </td> <td> Y </td> <td> Y </td>
+ *     <td>schedule tasksToSchedule on all tables under the databases in 
databasesToSchedule</td>
+ *   </tr>
+ *   <tr>
+ *     <td> Y </td> <td> N </td> <td> Y </td>
+ *     <td>schedule tasksToSchedule on tables in tablesToSchedule</td>
+ *   </tr>
+ *   <tr>
+ *     <td> Y </td> <td> Y </td> <td> N </td>
+ *     <td>schedule all the configured tasks on tables in tablesToSchedule
+ *     and also on all tables under the databases in databasesToSchedule</td>
+ *   </tr>
+ *   <tr>
+ *     <td> Y </td> <td> Y </td> <td> Y </td>
+ *     <td>schedule tasksToSchedule on tables in tablesToSchedule and also
+ *     on all tables under the databases in databasesToSchedule</td>
+ *   </tr>
+ * </table>
+ *
+ * In short, empty tasksToSchedule will schedule tasks for all types, and
+ * empty tablesToSchedule and databasesToSchedule will schedule tasks for all 
tables.
+ *
+ */
+public class TaskSchedulingContext {
+  private Set<String> _tablesToSchedule;
+  private Set<String> _tasksToSchedule;
+  private Set<String> _databasesToSchedule;
+  private String _triggeredBy;
+  private String _minionInstanceTag;
+  private boolean _isLeader;
+
+  public Set<String> getTablesToSchedule() {
+    return _tablesToSchedule;
+  }
+
+  public TaskSchedulingContext setTablesToSchedule(Set<String> 
tablesToSchedule) {
+    _tablesToSchedule = tablesToSchedule;
+    return this;
+  }
+
+  public Set<String> getTasksToSchedule() {
+    return _tasksToSchedule;
+  }
+
+  public TaskSchedulingContext setTasksToSchedule(Set<String> tasksToSchedule) 
{
+    _tasksToSchedule = tasksToSchedule;
+    return this;
+  }
+
+  public Set<String> getDatabasesToSchedule() {
+    return _databasesToSchedule;
+  }
+
+  public TaskSchedulingContext setDatabasesToSchedule(Set<String> 
databasesToSchedule) {
+    _databasesToSchedule = databasesToSchedule;
+    return this;
+  }
+
+  public String getTriggeredBy() {
+    return _triggeredBy;
+  }
+
+  public TaskSchedulingContext setTriggeredBy(String triggeredBy) {
+    _triggeredBy = triggeredBy;
+    return this;
+  }
+
+  public String getMinionInstanceTag() {
+    return _minionInstanceTag;
+  }
+
+  public TaskSchedulingContext setMinionInstanceTag(String minionInstanceTag) {
+    _minionInstanceTag = minionInstanceTag;
+    return this;
+  }
+
+  public boolean isLeader() {
+    return _isLeader;
+  }
+
+  public TaskSchedulingContext setLeader(boolean leader) {
+    _isLeader = leader;
+    return this;
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskSchedulingInfo.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskSchedulingInfo.java
new file mode 100644
index 0000000000..2ffa11676f
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskSchedulingInfo.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.minion;
+
+import java.util.ArrayList;
+import java.util.List;
+import javax.annotation.Nullable;
+
+
+/**
+ * Mutable holder for the outcome of a scheduling attempt: the names of the scheduled tasks plus any error messages
+ * collected during task generation and task scheduling.
+ *
+ * <p>The two error lists are always non-null and start empty; the scheduled task names are {@code null} until
+ * {@link #setScheduledTaskNames(List)} is called. All getters return the internal list references, not copies.
+ */
+public class TaskSchedulingInfo {
+  // Stays null until explicitly set, so callers can tell "never set" apart from an empty list.
+  private List<String> _scheduledTaskNames;
+  private final List<String> _generationErrors = new ArrayList<>();
+  private final List<String> _schedulingErrors = new ArrayList<>();
+
+  /**
+   * Returns the scheduled task names, or {@code null} if none were ever set.
+   * NOTE(review): presumably {@code null} means scheduling did not complete, as opposed to an empty list meaning
+   * nothing needed scheduling - confirm against PinotTaskManager.
+   */
+  @Nullable
+  public List<String> getScheduledTaskNames() {
+    return _scheduledTaskNames;
+  }
+
+  /** Stores the given list reference as-is (no defensive copy) and returns this instance for chaining. */
+  public TaskSchedulingInfo setScheduledTaskNames(List<String> 
scheduledTaskNames) {
+    _scheduledTaskNames = scheduledTaskNames;
+    return this;
+  }
+
+  /** Returns the live, never-null list of generation error messages. */
+  public List<String> getGenerationErrors() {
+    return _generationErrors;
+  }
+
+  /** Appends one message to the generation error list. */
+  public void addGenerationError(String generationError) {
+    _generationErrors.add(generationError);
+  }
+
+  /** Returns the live, never-null list of scheduling error messages. */
+  public List<String> getSchedulingErrors() {
+    return _schedulingErrors;
+  }
+
+  /** Appends one message to the scheduling error list. */
+  public void addSchedulingError(String schedulingError) {
+    _schedulingErrors.add(schedulingError);
+  }
+}
diff --git a/pinot-controller/src/main/resources/app/pages/SubTaskDetail.tsx 
b/pinot-controller/src/main/resources/app/pages/SubTaskDetail.tsx
index f94cd4d820..27ab5dc273 100644
--- a/pinot-controller/src/main/resources/app/pages/SubTaskDetail.tsx
+++ b/pinot-controller/src/main/resources/app/pages/SubTaskDetail.tsx
@@ -113,6 +113,9 @@ const TaskDetail = (props) => {
           <Grid item xs={12}>
             <strong>Finish Time:</strong> {get(taskDebugData, 'finishTime', 
'')}
           </Grid>
+          <Grid item xs={12}>
+            <strong>Triggered By:</strong> {get(taskDebugData, 'triggeredBy', 
'')}
+          </Grid>
           <Grid item xs={12}>
             <strong>Minion Host Name:</strong> {get(taskDebugData, 
'participant', '')}
           </Grid>
diff --git a/pinot-controller/src/main/resources/app/pages/TaskDetail.tsx 
b/pinot-controller/src/main/resources/app/pages/TaskDetail.tsx
index f825cfbd05..eff82fd3fd 100644
--- a/pinot-controller/src/main/resources/app/pages/TaskDetail.tsx
+++ b/pinot-controller/src/main/resources/app/pages/TaskDetail.tsx
@@ -124,6 +124,9 @@ const TaskDetail = (props) => {
           <Grid item xs={12}>
             <strong>Finish Time:</strong> {get(taskDebugData, 'finishTime', 
'')}
           </Grid>
+          <Grid item xs={12}>
+            <strong>Triggered By:</strong> {get(taskDebugData, 'triggeredBy', 
'')}
+          </Grid>
           <Grid item xs={12}>
             <strong>Number of Sub Tasks:</strong> {get(taskDebugData, 
'subtaskCount.total', '')}
           </Grid>
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManagerStatelessTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManagerStatelessTest.java
index 132e109796..15b584eabf 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManagerStatelessTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManagerStatelessTest.java
@@ -65,6 +65,7 @@ import static org.testng.Assert.*;
 @Test(groups = "stateless")
 public class PinotTaskManagerStatelessTest extends ControllerTest {
   private static final String RAW_TABLE_NAME = "myTable";
+  public static final String TABLE_NAME_WITH_TYPE = "myTable_OFFLINE";
   private static final String OFFLINE_TABLE_NAME = 
TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME);
   private static final long TIMEOUT_IN_MS = 10_000L;
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PinotTaskManagerStatelessTest.class);
@@ -192,10 +193,15 @@ public class PinotTaskManagerStatelessTest extends 
ControllerTest {
   public void testPinotTaskManagerScheduleTaskWithStoppedTaskQueue()
       throws Exception {
     testValidateTaskGeneration(taskManager -> {
+      String taskName = "SegmentGenerationAndPushTask";
+      TaskSchedulingContext context = new TaskSchedulingContext()
+          .setTablesToSchedule(Collections.singleton(TABLE_NAME_WITH_TYPE))
+          .setTasksToSchedule(Collections.singleton(taskName));
       // Validate schedule tasks for table when task queue is in stopped state
-      List<String> taskIDs = 
taskManager.scheduleTaskForTable("SegmentGenerationAndPushTask", "myTable", 
null)
-          .getScheduledTaskNames();
-      assertNull(taskIDs);
+      TaskSchedulingInfo info = 
taskManager.scheduleTasks(context).get(taskName);
+      assertNotNull(info);
+      assertNull(info.getScheduledTaskNames());
+      assertFalse(info.getSchedulingErrors().isEmpty());
       return null;
     });
   }
diff --git 
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/MinionTaskTestUtils.java
 
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/MinionTaskTestUtils.java
index 849a8b8bfd..2291bbb858 100644
--- 
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/MinionTaskTestUtils.java
+++ 
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/MinionTaskTestUtils.java
@@ -20,6 +20,8 @@ package org.apache.pinot.integration.tests;
 
 import java.util.Map;
 import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
+import org.apache.pinot.controller.helix.core.minion.TaskSchedulingContext;
+import org.apache.pinot.controller.helix.core.minion.TaskSchedulingInfo;
 
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
@@ -29,25 +31,18 @@ public class MinionTaskTestUtils {
   private MinionTaskTestUtils() {
   }
 
-  public static void assertNoTaskSchedule(String tableNameWithType, String 
taskType, PinotTaskManager taskManager) {
-    PinotTaskManager.TaskSchedulingInfo info =
-        taskManager.scheduleAllTasksForTable(tableNameWithType, 
null).get(taskType);
-    assertNoTaskSchedule(info);
-  }
-
-  public static void assertNoTaskSchedule(String taskType, PinotTaskManager 
taskManager) {
-    PinotTaskManager.TaskSchedulingInfo info = 
taskManager.scheduleTaskForAllTables(taskType, null);
-    assertNoTaskSchedule(info);
-  }
-
-  public static void assertNoTaskSchedule(PinotTaskManager taskManager) {
-    Map<String, PinotTaskManager.TaskSchedulingInfo> infoMap = 
taskManager.scheduleAllTasksForAllTables(null);
+  public static void assertNoTaskSchedule(TaskSchedulingContext context, 
PinotTaskManager taskManager) {
+    Map<String, TaskSchedulingInfo> infoMap = 
taskManager.scheduleTasks(context);
     infoMap.forEach((key, value) -> assertNoTaskSchedule(value));
   }
 
-  public static void assertNoTaskSchedule(PinotTaskManager.TaskSchedulingInfo 
info) {
+  public static void assertNoTaskSchedule(TaskSchedulingInfo info) {
     assertNotNull(info.getScheduledTaskNames());
     assertTrue(info.getScheduledTaskNames().isEmpty());
+    assertNoTaskErrors(info);
+  }
+
+  public static void assertNoTaskErrors(TaskSchedulingInfo info) {
     assertNotNull(info.getGenerationErrors());
     assertTrue(info.getGenerationErrors().isEmpty());
     assertNotNull(info.getSchedulingErrors());
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
index b8833d10b1..71a55da67d 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
@@ -42,6 +42,7 @@ import org.apache.pinot.common.utils.TarCompressionUtils;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import 
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
 import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
+import org.apache.pinot.controller.helix.core.minion.TaskSchedulingContext;
 import org.apache.pinot.core.common.MinionConstants;
 import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
@@ -408,18 +409,20 @@ public class MergeRollupMinionClusterIntegrationTest 
extends BaseClusterIntegrat
     long expectedWatermark = 16000 * 86_400_000L;
     String offlineTableName = 
TableNameBuilder.OFFLINE.tableNameWithType(SINGLE_LEVEL_CONCAT_TEST_TABLE);
     int numTasks = 0;
+    TaskSchedulingContext context = new TaskSchedulingContext()
+        .setTablesToSchedule(Collections.singleton(offlineTableName));
     List<String> taskList;
-    for (String tasks = 
_taskManager.scheduleAllTasksForTable(offlineTableName, null)
+    for (String tasks = _taskManager.scheduleTasks(context)
         
.get(MinionConstants.MergeRollupTask.TASK_TYPE).getScheduledTaskNames().get(0);
         tasks != null;
-        taskList = _taskManager.scheduleAllTasksForTable(offlineTableName, 
null)
+        taskList = _taskManager.scheduleTasks(context)
             
.get(MinionConstants.MergeRollupTask.TASK_TYPE).getScheduledTaskNames(),
             tasks = taskList != null && !taskList.isEmpty() ? taskList.get(0) 
: null, numTasks++) {
       assertEquals(_helixTaskResourceManager.getSubtaskConfigs(tasks).size(), 
expectedNumSubTasks[numTasks]);
       assertTrue(_helixTaskResourceManager.getTaskQueues()
           
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE)));
       // Will not schedule task if there's incomplete task
-      assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
+      assertNull(_taskManager.scheduleTasks(context)
           .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
       waitForTaskToComplete();
 
@@ -524,18 +527,20 @@ public class MergeRollupMinionClusterIntegrationTest 
extends BaseClusterIntegrat
     long expectedWatermark = 16000 * 86_400_000L;
     String offlineTableName = 
TableNameBuilder.OFFLINE.tableNameWithType(SINGLE_LEVEL_CONCAT_METADATA_TEST_TABLE);
     int numTasks = 0;
+    TaskSchedulingContext context = new TaskSchedulingContext()
+        .setTablesToSchedule(Collections.singleton(offlineTableName));
     List<String> taskList;
-    for (String tasks = 
_taskManager.scheduleAllTasksForTable(offlineTableName, null)
+    for (String tasks = _taskManager.scheduleTasks(context)
         
.get(MinionConstants.MergeRollupTask.TASK_TYPE).getScheduledTaskNames().get(0);
         tasks != null;
-        taskList = _taskManager.scheduleAllTasksForTable(offlineTableName, 
null)
+        taskList = _taskManager.scheduleTasks(context)
             
.get(MinionConstants.MergeRollupTask.TASK_TYPE).getScheduledTaskNames(),
             tasks = taskList != null && !taskList.isEmpty() ? taskList.get(0) 
: null, numTasks++) {
       assertEquals(_helixTaskResourceManager.getSubtaskConfigs(tasks).size(), 
expectedNumSubTasks[numTasks]);
       assertTrue(_helixTaskResourceManager.getTaskQueues()
           
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE)));
       // Will not schedule task if there's incomplete task
-      assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
+      assertNull(_taskManager.scheduleTasks(context)
           .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
       waitForTaskToComplete();
 
@@ -633,18 +638,20 @@ public class MergeRollupMinionClusterIntegrationTest 
extends BaseClusterIntegrat
     long expectedWatermark = 16050 * 86_400_000L;
     String offlineTableName = 
TableNameBuilder.OFFLINE.tableNameWithType(SINGLE_LEVEL_ROLLUP_TEST_TABLE);
     int numTasks = 0;
+    TaskSchedulingContext context = new TaskSchedulingContext()
+        .setTablesToSchedule(Collections.singleton(offlineTableName));
     List<String> taskList;
-    for (String tasks = 
_taskManager.scheduleAllTasksForTable(offlineTableName, null)
+    for (String tasks = _taskManager.scheduleTasks(context)
         
.get(MinionConstants.MergeRollupTask.TASK_TYPE).getScheduledTaskNames().get(0);
         tasks != null;
-        taskList = _taskManager.scheduleAllTasksForTable(offlineTableName, 
null)
+        taskList = _taskManager.scheduleTasks(context)
             
.get(MinionConstants.MergeRollupTask.TASK_TYPE).getScheduledTaskNames(),
             tasks = taskList != null && !taskList.isEmpty() ? taskList.get(0) 
: null, numTasks++) {
       assertEquals(_helixTaskResourceManager.getSubtaskConfigs(tasks).size(), 
1);
       assertTrue(_helixTaskResourceManager.getTaskQueues()
           
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE)));
       // Will not schedule task if there's incomplete task
-      assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
+      assertNull(_taskManager.scheduleTasks(context)
           .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
       waitForTaskToComplete();
 
@@ -785,18 +792,20 @@ public class MergeRollupMinionClusterIntegrationTest 
extends BaseClusterIntegrat
 
     String offlineTableName = 
TableNameBuilder.OFFLINE.tableNameWithType(MULTI_LEVEL_CONCAT_TEST_TABLE);
     int numTasks = 0;
+    TaskSchedulingContext context = new TaskSchedulingContext()
+        .setTablesToSchedule(Collections.singleton(offlineTableName));
     List<String> taskList;
-    for (String tasks = 
_taskManager.scheduleAllTasksForTable(offlineTableName, null)
+    for (String tasks = _taskManager.scheduleTasks(context)
         
.get(MinionConstants.MergeRollupTask.TASK_TYPE).getScheduledTaskNames().get(0);
         tasks != null;
-        taskList = _taskManager.scheduleAllTasksForTable(offlineTableName, 
null)
+        taskList = _taskManager.scheduleTasks(context)
             
.get(MinionConstants.MergeRollupTask.TASK_TYPE).getScheduledTaskNames(),
             tasks = taskList != null && !taskList.isEmpty() ? taskList.get(0) 
: null, numTasks++) {
       assertEquals(_helixTaskResourceManager.getSubtaskConfigs(tasks).size(), 
expectedNumSubTasks[numTasks]);
       assertTrue(_helixTaskResourceManager.getTaskQueues()
           
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE)));
       // Will not schedule task if there's incomplete task
-      assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
+      assertNull(_taskManager.scheduleTasks(context)
           .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
       waitForTaskToComplete();
 
@@ -918,11 +927,13 @@ public class MergeRollupMinionClusterIntegrationTest 
extends BaseClusterIntegrat
     long expectedWatermark = 16000 * 86_400_000L;
     String realtimeTableName = 
TableNameBuilder.REALTIME.tableNameWithType(tableName);
     int numTasks = 0;
+    TaskSchedulingContext context = new TaskSchedulingContext()
+        .setTablesToSchedule(Collections.singleton(realtimeTableName));
     List<String> taskList;
-    for (String tasks = 
taskManager.scheduleAllTasksForTable(realtimeTableName, null)
+    for (String tasks = taskManager.scheduleTasks(context)
             
.get(MinionConstants.MergeRollupTask.TASK_TYPE).getScheduledTaskNames().get(0);
         tasks != null;
-        taskList = taskManager.scheduleAllTasksForTable(realtimeTableName, 
null)
+        taskList = taskManager.scheduleTasks(context)
             
.get(MinionConstants.MergeRollupTask.TASK_TYPE).getScheduledTaskNames(),
             tasks = taskList != null && !taskList.isEmpty() ? taskList.get(0) 
: null, numTasks++) {
 //      assertEquals(helixTaskResourceManager.getSubtaskConfigs(tasks).size(), 
expectedNumSubTasks[numTasks]);
@@ -930,7 +941,7 @@ public class MergeRollupMinionClusterIntegrationTest 
extends BaseClusterIntegrat
           
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE)));
 
       // Will not schedule task if there's incomplete task
-      assertNull(taskManager.scheduleAllTasksForTable(realtimeTableName, null)
+      assertNull(taskManager.scheduleTasks(context)
           .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
       waitForTaskToComplete();
 
@@ -1024,17 +1035,19 @@ public class MergeRollupMinionClusterIntegrationTest 
extends BaseClusterIntegrat
     long[] expectedNumBucketsToProcess200Days = {0, 0, 1, 1, 0, 0, 1, 1};
     String realtimeTableName = 
TableNameBuilder.REALTIME.tableNameWithType(tableName);
     int numTasks = 0;
+    TaskSchedulingContext context = new TaskSchedulingContext()
+        .setTablesToSchedule(Collections.singleton(realtimeTableName));
     List<String> taskList;
-    for (String tasks = 
taskManager.scheduleAllTasksForTable(realtimeTableName, null).
-        
get(MinionConstants.MergeRollupTask.TASK_TYPE).getScheduledTaskNames().get(0); 
tasks != null;
-        taskList = taskManager.scheduleAllTasksForTable(realtimeTableName, 
null)
+    for (String tasks = taskManager.scheduleTasks(context)
+        
.get(MinionConstants.MergeRollupTask.TASK_TYPE).getScheduledTaskNames().get(0); 
tasks != null;
+        taskList = taskManager.scheduleTasks(context)
             
.get(MinionConstants.MergeRollupTask.TASK_TYPE).getScheduledTaskNames(),
             tasks = taskList != null && !taskList.isEmpty() ? taskList.get(0) 
: null, numTasks++) {
       assertTrue(helixTaskResourceManager.getTaskQueues()
           
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE)));
 
       // Will not schedule task if there's incomplete task
-      assertNull(taskManager.scheduleAllTasksForTable(realtimeTableName, null)
+      assertNull(taskManager.scheduleTasks(context)
           .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
       waitForTaskToComplete();
 
@@ -1066,10 +1079,10 @@ public class MergeRollupMinionClusterIntegrationTest 
extends BaseClusterIntegrat
     uploadSegments(MULTI_LEVEL_CONCAT_PROCESS_ALL_REALTIME_TABLE, 
TableType.REALTIME, _tarDir5);
     waitForAllDocsLoaded(600_000L);
 
-    for (String tasks = 
taskManager.scheduleAllTasksForTable(realtimeTableName, null)
+    for (String tasks = taskManager.scheduleTasks(context)
         
.get(MinionConstants.MergeRollupTask.TASK_TYPE).getScheduledTaskNames().get(0);
         tasks != null;
-        taskList = taskManager.scheduleAllTasksForTable(realtimeTableName, 
null)
+        taskList = taskManager.scheduleTasks(context)
             
.get(MinionConstants.MergeRollupTask.TASK_TYPE).getScheduledTaskNames(),
             tasks = taskList != null && !taskList.isEmpty() ? taskList.get(0) 
: null, numTasks++) {
       waitForTaskToComplete();
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java
index fed10b9f1b..19459894c7 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java
@@ -33,6 +33,7 @@ import org.apache.pinot.common.minion.MinionTaskMetadataUtils;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import 
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
 import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
+import org.apache.pinot.controller.helix.core.minion.TaskSchedulingContext;
 import org.apache.pinot.core.common.MinionConstants;
 import org.apache.pinot.minion.MinionContext;
 import org.apache.pinot.spi.config.table.IndexingConfig;
@@ -185,12 +186,16 @@ public class PurgeMinionClusterIntegrationTest extends 
BaseClusterIntegrationTes
     // 5. Check the purge process itself by setting an expecting number of rows
 
     String offlineTableName = 
TableNameBuilder.OFFLINE.tableNameWithType(PURGE_FIRST_RUN_TABLE);
-    assertNotNull(
-        _taskManager.scheduleAllTasksForTable(offlineTableName, 
null).get(MinionConstants.PurgeTask.TASK_TYPE));
+    assertNotNull(_taskManager.scheduleTasks(new TaskSchedulingContext()
+            .setTablesToSchedule(Collections.singleton(offlineTableName)))
+        .get(MinionConstants.PurgeTask.TASK_TYPE));
     assertTrue(_helixTaskResourceManager.getTaskQueues()
         
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.PurgeTask.TASK_TYPE)));
     // Will not schedule task if there's incomplete task
-    MinionTaskTestUtils.assertNoTaskSchedule(offlineTableName, 
MinionConstants.PurgeTask.TASK_TYPE, _taskManager);
+    MinionTaskTestUtils.assertNoTaskSchedule(new TaskSchedulingContext()
+            .setTablesToSchedule(Collections.singleton(offlineTableName))
+            
.setTasksToSchedule(Collections.singleton(MinionConstants.PurgeTask.TASK_TYPE)),
+        _taskManager);
     waitForTaskToComplete();
 
     // Check that metadata contains expected values
@@ -200,7 +205,10 @@ public class PurgeMinionClusterIntegrationTest extends 
BaseClusterIntegrationTes
           
metadata.getCustomMap().containsKey(MinionConstants.PurgeTask.TASK_TYPE + 
MinionConstants.TASK_TIME_SUFFIX));
     }
     // Should not generate new purge task as the last time purge is not 
greater than last + 1day (default purge delay)
-    MinionTaskTestUtils.assertNoTaskSchedule(offlineTableName, 
MinionConstants.PurgeTask.TASK_TYPE, _taskManager);
+    MinionTaskTestUtils.assertNoTaskSchedule(new TaskSchedulingContext()
+            .setTablesToSchedule(Collections.singleton(offlineTableName))
+            
.setTasksToSchedule(Collections.singleton(MinionConstants.PurgeTask.TASK_TYPE)),
+        _taskManager);
 
     // 52 rows with ArrTime = 1
     // 115545 totals rows
@@ -231,11 +239,16 @@ public class PurgeMinionClusterIntegrationTest extends 
BaseClusterIntegrationTes
 
     String offlineTableName = 
TableNameBuilder.OFFLINE.tableNameWithType(PURGE_DELTA_PASSED_TABLE);
     assertNotNull(
-        _taskManager.scheduleAllTasksForTable(offlineTableName, 
null).get(MinionConstants.PurgeTask.TASK_TYPE));
+        _taskManager.scheduleTasks(new TaskSchedulingContext()
+                .setTablesToSchedule(Collections.singleton(offlineTableName)))
+            .get(MinionConstants.PurgeTask.TASK_TYPE));
     assertTrue(_helixTaskResourceManager.getTaskQueues()
         
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.PurgeTask.TASK_TYPE)));
     // Will not schedule task if there's incomplete task
-    MinionTaskTestUtils.assertNoTaskSchedule(offlineTableName, 
MinionConstants.PurgeTask.TASK_TYPE, _taskManager);
+    MinionTaskTestUtils.assertNoTaskSchedule(new TaskSchedulingContext()
+            .setTablesToSchedule(Collections.singleton(offlineTableName))
+            
.setTasksToSchedule(Collections.singleton(MinionConstants.PurgeTask.TASK_TYPE)),
+        _taskManager);
     waitForTaskToComplete();
 
     // Check that metadata contains expected values
@@ -247,7 +260,10 @@ public class PurgeMinionClusterIntegrationTest extends 
BaseClusterIntegrationTes
       assertTrue(System.currentTimeMillis() - Long.parseLong(purgeTime) < 
86400000);
     }
     // Should not generate new purge task as the last time purge is not 
greater than last + 1day (default purge delay)
-    MinionTaskTestUtils.assertNoTaskSchedule(offlineTableName, 
MinionConstants.PurgeTask.TASK_TYPE, _taskManager);
+    MinionTaskTestUtils.assertNoTaskSchedule(new TaskSchedulingContext()
+            .setTablesToSchedule(Collections.singleton(offlineTableName))
+            
.setTasksToSchedule(Collections.singleton(MinionConstants.PurgeTask.TASK_TYPE)),
+        _taskManager);
 
     // 52 rows with ArrTime = 1
     // 115545 totals rows
@@ -279,7 +295,10 @@ public class PurgeMinionClusterIntegrationTest extends 
BaseClusterIntegrationTes
     String offlineTableName = 
TableNameBuilder.OFFLINE.tableNameWithType(PURGE_DELTA_NOT_PASSED_TABLE);
 
     // No task should be schedule as the delay is not passed
-    MinionTaskTestUtils.assertNoTaskSchedule(offlineTableName, 
MinionConstants.PurgeTask.TASK_TYPE, _taskManager);
+    MinionTaskTestUtils.assertNoTaskSchedule(new TaskSchedulingContext()
+            .setTablesToSchedule(Collections.singleton(offlineTableName))
+            
.setTasksToSchedule(Collections.singleton(MinionConstants.PurgeTask.TASK_TYPE)),
+        _taskManager);
     for (SegmentZKMetadata metadata : 
_pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) {
       // Check purge time
       String purgeTime =
@@ -330,11 +349,15 @@ public class PurgeMinionClusterIntegrationTest extends 
BaseClusterIntegrationTes
 
     // schedule purge tasks
     String offlineTableName = 
TableNameBuilder.OFFLINE.tableNameWithType(PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE);
-    assertNotNull(
-        _taskManager.scheduleAllTasksForTable(offlineTableName, 
null).get(MinionConstants.PurgeTask.TASK_TYPE));
+    assertNotNull(_taskManager.scheduleTasks(new TaskSchedulingContext()
+            .setTablesToSchedule(Collections.singleton(offlineTableName)))
+        .get(MinionConstants.PurgeTask.TASK_TYPE));
     assertTrue(_helixTaskResourceManager.getTaskQueues()
         
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.PurgeTask.TASK_TYPE)));
-    MinionTaskTestUtils.assertNoTaskSchedule(offlineTableName, 
MinionConstants.PurgeTask.TASK_TYPE, _taskManager);
+    MinionTaskTestUtils.assertNoTaskSchedule(new TaskSchedulingContext()
+            .setTablesToSchedule(Collections.singleton(offlineTableName))
+            
.setTasksToSchedule(Collections.singleton(MinionConstants.PurgeTask.TASK_TYPE)),
+        _taskManager);
     waitForTaskToComplete();
 
     // Check that metadata contains expected values
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
index 296c981c18..15829f57b0 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
@@ -33,6 +33,7 @@ import org.apache.pinot.common.minion.MinionTaskMetadataUtils;
 import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
 import 
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
 import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
+import org.apache.pinot.controller.helix.core.minion.TaskSchedulingContext;
 import org.apache.pinot.core.common.MinionConstants;
 import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
 import org.apache.pinot.spi.config.table.FieldConfig;
@@ -231,13 +232,16 @@ public class 
RealtimeToOfflineSegmentsMinionClusterIntegrationTest extends BaseC
     long expectedWatermark = _dataSmallestTimeMs + 86400000;
     for (int i = 0; i < 3; i++) {
       // Schedule task
-      assertNotNull(_taskManager.scheduleAllTasksForTable(_realtimeTableName, 
null)
+      assertNotNull(_taskManager.scheduleTasks(new TaskSchedulingContext()
+              .setTablesToSchedule(Collections.singleton(_realtimeTableName)))
           .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
       assertTrue(_taskResourceManager.getTaskQueues().contains(
           
PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)));
       // Should not generate more tasks
-      MinionTaskTestUtils.assertNoTaskSchedule(_realtimeTableName,
-          MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, 
_taskManager);
+      MinionTaskTestUtils.assertNoTaskSchedule(new TaskSchedulingContext()
+              .setTablesToSchedule(Collections.singleton(_realtimeTableName))
+              
.setTasksToSchedule(Collections.singleton(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)),
+          _taskManager);
 
       // Wait at most 600 seconds for all tasks COMPLETED
       waitForTaskToComplete(expectedWatermark, _realtimeTableName);
@@ -283,13 +287,16 @@ public class 
RealtimeToOfflineSegmentsMinionClusterIntegrationTest extends BaseC
     _taskManager.cleanUpTask();
     for (int i = 0; i < 3; i++) {
       // Schedule task
-      
assertNotNull(_taskManager.scheduleAllTasksForTable(_realtimeMetadataTableName, 
null)
+      assertNotNull(_taskManager.scheduleTasks(new TaskSchedulingContext()
+              
.setTablesToSchedule(Collections.singleton(_realtimeMetadataTableName)))
           .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
       assertTrue(_taskResourceManager.getTaskQueues().contains(
           
PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)));
       // Should not generate more tasks
-      MinionTaskTestUtils.assertNoTaskSchedule(_realtimeMetadataTableName,
-          MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, 
_taskManager);
+      MinionTaskTestUtils.assertNoTaskSchedule(new TaskSchedulingContext()
+              
.setTablesToSchedule(Collections.singleton(_realtimeMetadataTableName))
+              
.setTasksToSchedule(Collections.singleton(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)),
+          _taskManager);
 
       // Wait at most 600 seconds for all tasks COMPLETED
       waitForTaskToComplete(expectedWatermark, _realtimeMetadataTableName);
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RefreshSegmentMinionClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RefreshSegmentMinionClusterIntegrationTest.java
index c14f278cf6..9a0b23bae4 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RefreshSegmentMinionClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RefreshSegmentMinionClusterIntegrationTest.java
@@ -33,6 +33,7 @@ import 
org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import 
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
 import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
+import org.apache.pinot.controller.helix.core.minion.TaskSchedulingContext;
 import org.apache.pinot.core.common.MinionConstants;
 import org.apache.pinot.segment.spi.index.StandardIndexes;
 import org.apache.pinot.spi.config.table.FieldConfig;
@@ -108,12 +109,15 @@ public class RefreshSegmentMinionClusterIntegrationTest extends BaseClusterInteg
   public void testFirstSegmentRefresh() {
     // This will create the inverted index as we disable inverted index creation during segment push.
     String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(getTableName());
-    assertNotNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
+    assertNotNull(_taskManager.scheduleTasks(new TaskSchedulingContext()
+            .setTablesToSchedule(Collections.singleton(offlineTableName)))
         .get(MinionConstants.RefreshSegmentTask.TASK_TYPE));
     assertTrue(_helixTaskResourceManager.getTaskQueues()
         .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RefreshSegmentTask.TASK_TYPE)));
     // Will not schedule task if there's incomplete task
-    MinionTaskTestUtils.assertNoTaskSchedule(offlineTableName, MinionConstants.RefreshSegmentTask.TASK_TYPE,
+    MinionTaskTestUtils.assertNoTaskSchedule(new TaskSchedulingContext()
+            .setTablesToSchedule(Collections.singleton(offlineTableName))
+            .setTasksToSchedule(Collections.singleton(MinionConstants.RefreshSegmentTask.TASK_TYPE)),
         _taskManager);
     waitForTaskToComplete();
 
@@ -128,7 +132,9 @@ public class RefreshSegmentMinionClusterIntegrationTest extends BaseClusterInteg
     }
 
     // This should be no-op as nothing changes.
-    MinionTaskTestUtils.assertNoTaskSchedule(offlineTableName, MinionConstants.RefreshSegmentTask.TASK_TYPE,
+    MinionTaskTestUtils.assertNoTaskSchedule(new TaskSchedulingContext()
+            .setTablesToSchedule(Collections.singleton(offlineTableName))
+            .setTasksToSchedule(Collections.singleton(MinionConstants.RefreshSegmentTask.TASK_TYPE)),
         _taskManager);
     for (SegmentZKMetadata metadata : _pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) {
       // Get the value in segment metadata
@@ -153,12 +159,15 @@ public class RefreshSegmentMinionClusterIntegrationTest extends BaseClusterInteg
     schema.getFieldSpecFor("DestAirportID").setDataType(FieldSpec.DataType.STRING);
     addSchema(schema);
 
-    assertNotNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
+    assertNotNull(_taskManager.scheduleTasks(new TaskSchedulingContext()
+            .setTablesToSchedule(Collections.singleton(offlineTableName)))
         .get(MinionConstants.RefreshSegmentTask.TASK_TYPE));
     assertTrue(_helixTaskResourceManager.getTaskQueues()
         .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RefreshSegmentTask.TASK_TYPE)));
     // Will not schedule task if there's incomplete task
-    MinionTaskTestUtils.assertNoTaskSchedule(offlineTableName, MinionConstants.RefreshSegmentTask.TASK_TYPE,
+    MinionTaskTestUtils.assertNoTaskSchedule(new TaskSchedulingContext()
+            .setTablesToSchedule(Collections.singleton(offlineTableName))
+            .setTasksToSchedule(Collections.singleton(MinionConstants.RefreshSegmentTask.TASK_TYPE)),
         _taskManager);
     waitForTaskToComplete();
 
@@ -232,12 +241,15 @@ public class RefreshSegmentMinionClusterIntegrationTest extends BaseClusterInteg
     updateTableConfig(tableConfig);
 
     String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(getTableName());
-    assertNotNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
+    assertNotNull(_taskManager.scheduleTasks(new TaskSchedulingContext()
+            .setTablesToSchedule(Collections.singleton(offlineTableName)))
         .get(MinionConstants.RefreshSegmentTask.TASK_TYPE));
     assertTrue(_helixTaskResourceManager.getTaskQueues()
         .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RefreshSegmentTask.TASK_TYPE)));
     // Will not schedule task if there's incomplete task
-    MinionTaskTestUtils.assertNoTaskSchedule(offlineTableName, MinionConstants.RefreshSegmentTask.TASK_TYPE,
+    MinionTaskTestUtils.assertNoTaskSchedule(new TaskSchedulingContext()
+            .setTablesToSchedule(Collections.singleton(offlineTableName))
+            .setTasksToSchedule(Collections.singleton(MinionConstants.RefreshSegmentTask.TASK_TYPE)),
         _taskManager);
     waitForTaskToComplete();
 
@@ -323,12 +335,15 @@ public class RefreshSegmentMinionClusterIntegrationTest extends BaseClusterInteg
 
     String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(getTableName());
 
-    assertNotNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
+    assertNotNull(_taskManager.scheduleTasks(new TaskSchedulingContext()
+            .setTablesToSchedule(Collections.singleton(offlineTableName)))
         .get(MinionConstants.RefreshSegmentTask.TASK_TYPE));
     assertTrue(_helixTaskResourceManager.getTaskQueues()
         .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RefreshSegmentTask.TASK_TYPE)));
     // Will not schedule task if there's incomplete task
-    MinionTaskTestUtils.assertNoTaskSchedule(offlineTableName, MinionConstants.RefreshSegmentTask.TASK_TYPE,
+    MinionTaskTestUtils.assertNoTaskSchedule(new TaskSchedulingContext()
+            .setTablesToSchedule(Collections.singleton(offlineTableName))
+            .setTasksToSchedule(Collections.singleton(MinionConstants.RefreshSegmentTask.TASK_TYPE)),
         _taskManager);
     waitForTaskToComplete();
 
@@ -401,12 +416,15 @@ public class RefreshSegmentMinionClusterIntegrationTest extends BaseClusterInteg
 
     updateTableConfig(tableConfig);
 
-    assertNotNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
+    assertNotNull(_taskManager.scheduleTasks(new TaskSchedulingContext()
+            .setTablesToSchedule(Collections.singleton(offlineTableName)))
         .get(MinionConstants.RefreshSegmentTask.TASK_TYPE));
     assertTrue(_helixTaskResourceManager.getTaskQueues()
         .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RefreshSegmentTask.TASK_TYPE)));
     // Will not schedule task if there's incomplete task
-    MinionTaskTestUtils.assertNoTaskSchedule(offlineTableName, MinionConstants.RefreshSegmentTask.TASK_TYPE,
+    MinionTaskTestUtils.assertNoTaskSchedule(new TaskSchedulingContext()
+            .setTablesToSchedule(Collections.singleton(offlineTableName))
+            .setTasksToSchedule(Collections.singleton(MinionConstants.RefreshSegmentTask.TASK_TYPE)),
         _taskManager);
     waitForTaskToComplete();
 
@@ -423,7 +441,9 @@ public class RefreshSegmentMinionClusterIntegrationTest extends BaseClusterInteg
     }
 
     // This should be no-op as nothing changes.
-    MinionTaskTestUtils.assertNoTaskSchedule(offlineTableName, MinionConstants.RefreshSegmentTask.TASK_TYPE,
+    MinionTaskTestUtils.assertNoTaskSchedule(new TaskSchedulingContext()
+            .setTablesToSchedule(Collections.singleton(offlineTableName))
+            .setTasksToSchedule(Collections.singleton(MinionConstants.RefreshSegmentTask.TASK_TYPE)),
         _taskManager);
     for (SegmentZKMetadata metadata : _pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) {
       // Get the value in segment metadata
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
index 3071d9c7fb..00dce3341c 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
@@ -33,6 +33,7 @@ import org.apache.pinot.common.metrics.MetricValueUtils;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
 import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
+import org.apache.pinot.controller.helix.core.minion.TaskSchedulingContext;
 import org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator;
 import org.apache.pinot.core.common.MinionConstants;
 import org.apache.pinot.minion.executor.PinotTaskExecutor;
@@ -137,7 +138,7 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
 
     // Should create the task queues and generate a task in the same minion instance
     List<String> task1 =
-        _taskManager.scheduleAllTasksForAllTables(null).get(TASK_TYPE).getScheduledTaskNames();
+        _taskManager.scheduleTasks(new TaskSchedulingContext()).get(TASK_TYPE).getScheduledTaskNames();
     assertNotNull(task1);
     assertEquals(task1.size(), 1);
     assertTrue(_helixTaskResourceManager.getTaskQueues()
@@ -151,7 +152,7 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
     verifyTaskCount(task1.get(0), 0, 1, 1, 2);
     // Should generate one more task, with two sub-tasks. Both of these sub-tasks will wait
     // since we have one minion instance that is still running one of the sub-tasks.
-    List<String> task2 = _taskManager.scheduleTaskForAllTables(TASK_TYPE, null).getScheduledTaskNames();
+    List<String> task2 = _taskManager.scheduleTasks(new TaskSchedulingContext()).get(TASK_TYPE).getScheduledTaskNames();
     assertNotNull(task2);
     assertEquals(task2.size(), 1);
     assertTrue(_helixTaskResourceManager.getTasksInProgress(TASK_TYPE).contains(task2.get(0)));
@@ -160,8 +161,7 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
     // Should not generate more tasks since SimpleMinionClusterIntegrationTests.NUM_TASKS is 2.
     // Our test task generator does not generate if there are already this many sub-tasks in the
     // running+waiting count already.
-    MinionTaskTestUtils.assertNoTaskSchedule(_taskManager);
-    MinionTaskTestUtils.assertNoTaskSchedule(TASK_TYPE, _taskManager);
+    MinionTaskTestUtils.assertNoTaskSchedule(new TaskSchedulingContext(), _taskManager);
 
     // Wait at most 60 seconds for all tasks IN_PROGRESS
     TestUtils.waitForCondition(input -> {
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TlsIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TlsIntegrationTest.java
index e5c35f6dbd..eceafc732c 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TlsIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TlsIntegrationTest.java
@@ -58,6 +58,7 @@ import org.apache.pinot.common.utils.SimpleHttpResponse;
 import org.apache.pinot.common.utils.helix.HelixHelper;
 import org.apache.pinot.common.utils.tls.TlsUtils;
 import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.helix.core.minion.TaskSchedulingContext;
 import org.apache.pinot.core.common.MinionConstants;
 import org.apache.pinot.integration.tests.access.CertBasedTlsChannelAccessControlFactory;
 import org.apache.pinot.spi.config.table.TableConfig;
@@ -478,7 +479,7 @@ public class TlsIntegrationTest extends BaseClusterIntegrationTest {
     Assert.assertTrue(resultBeforeOffline.getResultSet(0).getLong(0) > 0);
 
     // schedule offline segment generation
-    Assert.assertNotNull(_controllerStarter.getTaskManager().scheduleAllTasksForAllTables(null));
+    Assert.assertNotNull(_controllerStarter.getTaskManager().scheduleTasks(new TaskSchedulingContext()));
 
     // wait for offline segments
     JsonNode offlineSegments = TestUtils.waitForResult(() -> {
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java
index 6d965ffae0..3aa15e1fb0 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java
@@ -34,6 +34,7 @@ import org.apache.pinot.common.restlet.resources.ValidDocIdsType;
 import org.apache.pinot.common.utils.config.TagNameUtils;
 import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
 import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
+import org.apache.pinot.controller.helix.core.minion.TaskSchedulingContext;
 import org.apache.pinot.core.common.MinionConstants;
 import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
 import org.apache.pinot.core.data.manager.realtime.SegmentBuildTimeLeaseExtender;
@@ -468,7 +469,8 @@ public class UpsertTableIntegrationTest extends BaseClusterIntegrationTest {
     sendPostRequest(_controllerRequestURLBuilder.forResumeConsumption(tableName));
     waitForNumQueriedSegmentsToConverge(tableName, 600_000L, 5, 2);
     String realtimeTableName = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName);
-    assertNotNull(_taskManager.scheduleAllTasksForTable(realtimeTableName, null)
+    assertNotNull(_taskManager.scheduleTasks(new TaskSchedulingContext()
+            .setTablesToSchedule(Collections.singleton(realtimeTableName)))
         .get(MinionConstants.UpsertCompactionTask.TASK_TYPE));
     waitForTaskToComplete();
     // 2 segments should be compacted (351 rows -> 1 row; 500 rows -> 2 rows), 1 segment (149 rows) should be deleted
@@ -501,7 +503,8 @@ public class UpsertTableIntegrationTest extends BaseClusterIntegrationTest {
 
     // NOTE: When in-memory valid doc ids are used, no need to pause/resume consumption to trigger the snapshot.
     String realtimeTableName = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName);
-    assertNotNull(_taskManager.scheduleAllTasksForTable(realtimeTableName, null)
+    assertNotNull(_taskManager.scheduleTasks(new TaskSchedulingContext()
+            .setTablesToSchedule(Collections.singleton(realtimeTableName)))
         .get(MinionConstants.UpsertCompactionTask.TASK_TYPE));
     waitForTaskToComplete();
     // 1 segment should be compacted (500 rows -> 2 rows)
@@ -544,7 +547,8 @@ public class UpsertTableIntegrationTest extends BaseClusterIntegrationTest {
     waitForNumQueriedSegmentsToConverge(tableName, 10_000L, 5, 2);
 
     String realtimeTableName = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName);
-    assertNotNull(_taskManager.scheduleAllTasksForTable(realtimeTableName, null)
+    assertNotNull(_taskManager.scheduleTasks(new TaskSchedulingContext()
+            .setTablesToSchedule(Collections.singleton(realtimeTableName)))
         .get(MinionConstants.UpsertCompactionTask.TASK_TYPE));
     waitForTaskToComplete();
     // 1 segment should be compacted (351 rows -> 1 rows), 2 segments (500 rows, 151 rows) should be deleted
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UrlAuthRealtimeIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UrlAuthRealtimeIntegrationTest.java
index 22d24115d9..ade04ba12e 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UrlAuthRealtimeIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UrlAuthRealtimeIntegrationTest.java
@@ -32,6 +32,7 @@ import org.apache.pinot.client.JsonAsyncHttpPinotClientTransportFactory;
 import org.apache.pinot.client.ResultSetGroup;
 import org.apache.pinot.common.auth.UrlAuthProvider;
 import org.apache.pinot.controller.helix.ControllerRequestClient;
+import org.apache.pinot.controller.helix.core.minion.TaskSchedulingContext;
 import org.apache.pinot.core.common.MinionConstants;
 import org.apache.pinot.spi.config.table.TableTaskConfig;
 import org.apache.pinot.spi.config.table.TableType;
@@ -176,7 +177,7 @@ public class UrlAuthRealtimeIntegrationTest extends BaseClusterIntegrationTest {
     Assert.assertTrue(resultBeforeOffline.getResultSet(0).getLong(0) > 0);
 
     // schedule offline segment generation
-    Assert.assertNotNull(_controllerStarter.getTaskManager().scheduleAllTasksForAllTables(null));
+    Assert.assertNotNull(_controllerStarter.getTaskManager().scheduleTasks(new TaskSchedulingContext()));
 
     // wait for offline segments
     List<String> offlineSegments = TestUtils.waitForResult(() -> {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 19fbb75216..f1f197bfcd 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -79,6 +79,10 @@ public class CommonConstants {
     CONSUMING, NOT_CONSUMING // In error state
   }
 
+  public enum TaskTriggers {
+    CRON_TRIGGER, MANUAL_TRIGGER, ADHOC_TRIGGER, UNKNOWN
+  }
+
   public static class Table {
     public static final String PUSH_FREQUENCY_HOURLY = "hourly";
     public static final String PUSH_FREQUENCY_DAILY = "daily";


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org

Reply via email to