This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new e1b0e5357e Refactor PinotTaskManager class (#12964)
e1b0e5357e is described below

commit e1b0e5357ebfcecffcc6cce3997a3edcdac1aa2c
Author: Pratik Tibrewal <tibrewalpra...@uber.com>
AuthorDate: Sat Apr 20 03:35:01 2024 +0530

    Refactor PinotTaskManager class (#12964)
---
 .../api/resources/PinotTaskRestletResource.java    |  14 +-
 .../helix/core/minion/CronJobScheduleJob.java      |   2 +-
 .../helix/core/minion/PinotTaskManager.java        | 162 +++++++++------------
 .../MergeRollupMinionClusterIntegrationTest.java   | 140 +++++++++---------
 .../tests/PurgeMinionClusterIntegrationTest.java   |  40 +++--
 ...fflineSegmentsMinionClusterIntegrationTest.java |  55 ++++---
 .../tests/SimpleMinionClusterIntegrationTest.java  |  45 +++---
 .../integration/tests/TlsIntegrationTest.java      |   2 +-
 .../tests/UpsertTableIntegrationTest.java          |  14 +-
 .../tests/UrlAuthRealtimeIntegrationTest.java      |   2 +-
 10 files changed, 217 insertions(+), 259 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
index e09bde8466..0d9d3a05c1 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
@@ -618,22 +618,20 @@ public class PinotTaskRestletResource {
   @ApiOperation("Schedule tasks and return a map from task type to task name 
scheduled")
   public Map<String, String> scheduleTasks(@ApiParam(value = "Task type") 
@QueryParam("taskType") String taskType,
       @ApiParam(value = "Table name (with type suffix)") 
@QueryParam("tableName") String tableName,
-      @ApiParam(value = "Minion Instance tag to schedule the task explicitly 
on")
-      @QueryParam("minionInstanceTag") @Nullable String minionInstanceTag,
-      @Context HttpHeaders headers) {
+      @ApiParam(value = "Minion Instance tag to schedule the task explicitly 
on") @QueryParam("minionInstanceTag")
+      @Nullable String minionInstanceTag, @Context HttpHeaders headers) {
     String database = headers != null ? headers.getHeaderString(DATABASE) : 
DEFAULT_DATABASE;
     if (taskType != null) {
       // Schedule task for the given task type
-      List<String> taskNames = tableName != null
-          ? _pinotTaskManager.scheduleTask(taskType,
+      List<String> taskNames = tableName != null ? 
_pinotTaskManager.scheduleTaskForTable(taskType,
           DatabaseUtils.translateTableName(tableName, headers), 
minionInstanceTag)
           : _pinotTaskManager.scheduleTaskForDatabase(taskType, database, 
minionInstanceTag);
       return Collections.singletonMap(taskType, taskNames == null ? null : 
StringUtils.join(taskNames, ','));
     } else {
       // Schedule tasks for all task types
-      Map<String, List<String>> allTaskNames = tableName != null
-          ? 
_pinotTaskManager.scheduleTasks(DatabaseUtils.translateTableName(tableName, 
headers), minionInstanceTag)
-          : _pinotTaskManager.scheduleTasksForDatabase(database, 
minionInstanceTag);
+      Map<String, List<String>> allTaskNames = tableName != null ? 
_pinotTaskManager.scheduleAllTasksForTable(
+          DatabaseUtils.translateTableName(tableName, headers), 
minionInstanceTag)
+          : _pinotTaskManager.scheduleAllTasksForDatabase(database, 
minionInstanceTag);
       return allTaskNames.entrySet().stream()
           .collect(Collectors.toMap(Map.Entry::getKey, entry -> 
String.join(",", entry.getValue())));
     }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/CronJobScheduleJob.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/CronJobScheduleJob.java
index 8c0433854f..f9b250b2bc 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/CronJobScheduleJob.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/CronJobScheduleJob.java
@@ -65,7 +65,7 @@ public class CronJobScheduleJob implements Job {
         return;
       }
       long jobStartTime = System.currentTimeMillis();
-      pinotTaskManager.scheduleTask(taskType, table);
+      pinotTaskManager.scheduleTaskForTable(taskType, table, null);
       LOGGER.info("Finished CronJob: table - {}, task - {}, next runtime is 
{}", table, taskType,
           jobExecutionContext.getNextFireTime());
       
pinotTaskManager.getControllerMetrics().addTimedTableValue(PinotTaskManager.getCronJobName(table,
 taskType),
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
index 4029944139..97417d6bea 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -22,7 +22,6 @@ import com.google.common.base.Preconditions;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -480,30 +479,72 @@ public class PinotTaskManager extends 
ControllerPeriodicTask<Void> {
   }
 
   /**
-   * Public API to schedule tasks (all task types) for all tables in all 
databases.
+   * Schedules tasks (all task types) for all tables.
    * It might be called from the non-leader controller.
    * Returns a map from the task type to the list of tasks scheduled.
    */
-  public synchronized Map<String, List<String>> scheduleTasks() {
-    return scheduleTasks(_pinotHelixResourceManager.getAllTables(), false, 
null);
+  public synchronized Map<String, List<String>> 
scheduleAllTasksForAllTables(@Nullable String minionInstanceTag) {
+    return scheduleTasks(_pinotHelixResourceManager.getAllTables(), false, 
minionInstanceTag);
   }
 
   /**
-   * Public API to schedule tasks (all task types) for all tables in given 
database.
+   * Schedules tasks (all task types) for all tables in the given database.
    * It might be called from the non-leader controller.
    * Returns a map from the task type to the list of tasks scheduled.
    */
-  public synchronized Map<String, List<String>> 
scheduleTasksForDatabase(@Nullable String database,
+  public synchronized Map<String, List<String>> 
scheduleAllTasksForDatabase(@Nullable String database,
       @Nullable String minionInstanceTag) {
     return scheduleTasks(_pinotHelixResourceManager.getAllTables(database), 
false, minionInstanceTag);
   }
 
+  /**
+   * Schedules tasks (all task types) for the given table.
+   * It might be called from the non-leader controller.
+   * Returns a map from the task type to the list of tasks scheduled.
+   */
+  public synchronized Map<String, List<String>> 
scheduleAllTasksForTable(String tableNameWithType,
+      @Nullable String minionInstanceTag) {
+    return scheduleTasks(List.of(tableNameWithType), false, minionInstanceTag);
+  }
+
+  /**
+   * Schedules task for the given task type for all tables.
+   * It might be called from the non-leader controller.
+   * Returns a list of tasks scheduled, or {@code null} if no task is 
scheduled.
+   */
+  @Nullable
+  public synchronized List<String> scheduleTaskForAllTables(String taskType, 
@Nullable String minionInstanceTag) {
+    return scheduleTask(taskType, _pinotHelixResourceManager.getAllTables(), 
minionInstanceTag);
+  }
+
+  /**
+   * Schedules task for the given task type for all tables in the given 
database.
+   * It might be called from the non-leader controller.
+   * Returns a list of tasks scheduled, or {@code null} if no task is 
scheduled.
+   */
+  @Nullable
+  public synchronized List<String> scheduleTaskForDatabase(String taskType, 
@Nullable String database,
+      @Nullable String minionInstanceTag) {
+    return scheduleTask(taskType, 
_pinotHelixResourceManager.getAllTables(database), minionInstanceTag);
+  }
+
+  /**
+   * Schedules task for the given task type for the given table.
+   * It might be called from the non-leader controller.
+   * Returns a list of tasks scheduled, or {@code null} if no task is 
scheduled.
+   */
+  @Nullable
+  public synchronized List<String> scheduleTaskForTable(String taskType, 
String tableNameWithType,
+      @Nullable String minionInstanceTag) {
+    return scheduleTask(taskType, List.of(tableNameWithType), 
minionInstanceTag);
+  }
+
   /**
    * Helper method to schedule tasks (all task types) for the given tables 
that have the tasks enabled. Returns a map
    * from the task type to the list of the tasks scheduled.
    */
-  private synchronized Map<String, List<String>> scheduleTasks(List<String> 
tableNamesWithType,
-      boolean isLeader, @Nullable String minionInstanceTag) {
+  private synchronized Map<String, List<String>> scheduleTasks(List<String> 
tableNamesWithType, boolean isLeader,
+      @Nullable String minionInstanceTag) {
     
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_TIMES_SCHEDULE_TASKS_CALLED,
 1L);
 
     // Scan all table configs to get the tables with tasks enabled
@@ -541,6 +582,27 @@ public class PinotTaskManager extends 
ControllerPeriodicTask<Void> {
     return tasksScheduled;
   }
 
+  @Nullable
+  private synchronized List<String> scheduleTask(String taskType, List<String> 
tables,
+      @Nullable String minionInstanceTag) {
+    PinotTaskGenerator taskGenerator = 
_taskGeneratorRegistry.getTaskGenerator(taskType);
+    Preconditions.checkState(taskGenerator != null, "Task type: %s is not 
registered", taskType);
+
+    // Scan all table configs to get the tables with task enabled
+    List<TableConfig> enabledTableConfigs = new ArrayList<>();
+    for (String tableNameWithType : tables) {
+      TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
+      if (tableConfig != null && tableConfig.getTaskConfig() != null && 
tableConfig.getTaskConfig()
+          .isTaskTypeEnabled(taskType)) {
+        enabledTableConfigs.add(tableConfig);
+      }
+    }
+
+    _helixTaskResourceManager.ensureTaskQueueExists(taskType);
+    addTaskTypeMetricsUpdaterIfNeeded(taskType);
+    return scheduleTask(taskGenerator, enabledTableConfigs, false, 
minionInstanceTag);
+  }
+
   /**
    * Helper method to schedule task with the given task generator for the 
given tables that have the task enabled.
    * Returns the list of task names, or {@code null} if no task is scheduled.
@@ -554,8 +616,8 @@ public class PinotTaskManager extends 
ControllerPeriodicTask<Void> {
     for (TableConfig tableConfig : enabledTableConfigs) {
       String tableName = tableConfig.getTableName();
       try {
-        String minionInstanceTag = minionInstanceTagForTask != null
-            ? minionInstanceTagForTask : 
taskGenerator.getMinionInstanceTag(tableConfig);
+        String minionInstanceTag = minionInstanceTagForTask != null ? 
minionInstanceTagForTask
+            : taskGenerator.getMinionInstanceTag(tableConfig);
         List<PinotTaskConfig> presentTaskConfig =
             minionInstanceTagToTaskConfigs.computeIfAbsent(minionInstanceTag, 
k -> new ArrayList<>());
         taskGenerator.generateTasks(List.of(tableConfig), presentTaskConfig);
@@ -622,86 +684,6 @@ public class PinotTaskManager extends 
ControllerPeriodicTask<Void> {
     return submittedTaskNames;
   }
 
-  /**
-   * Public API to schedule tasks (all task types) for the given table. It 
might be called from the non-leader
-   * controller. Returns a map from the task type to the list of tasks 
scheduled.
-   */
-  public synchronized Map<String, List<String>> scheduleTasks(String 
tableNameWithType) {
-    return scheduleTasks(Collections.singletonList(tableNameWithType), false, 
null);
-  }
-
-  /**
-   * Public API to schedule tasks (all task types) for the given table on a 
specific instance tag.
-   * It might be called from the non-leader controller. Returns a map from the 
task type to the list of tasks scheduled.
-   */
-  public synchronized Map<String, List<String>> scheduleTasks(String 
tableNameWithType,
-      @Nullable String minionInstanceTag) {
-    return scheduleTasks(Collections.singletonList(tableNameWithType), false, 
minionInstanceTag);
-  }
-
-  /**
-   * Public API to schedule task for the given task type in all databases.
-   * It might be called from the non-leader controller.
-   * Returns the list of task names, or {@code null} if no task is scheduled.
-   */
-  @Nullable
-  public synchronized List<String> scheduleTask(String taskType, @Nullable 
String minionInstanceTag) {
-    return scheduleTask(taskType, _pinotHelixResourceManager.getAllTables(), 
minionInstanceTag);
-  }
-
-  /**
-   * Public API to schedule task for the given task type in given database.
-   * It might be called from the non-leader controller.
-   * Returns the list of task name, or {@code null} if no task is scheduled.
-   */
-  @Nullable
-  public synchronized List<String> scheduleTaskForDatabase(String taskType, 
@Nullable String database,
-      @Nullable String minionInstanceTag) {
-    return scheduleTask(taskType, 
_pinotHelixResourceManager.getAllTables(database), minionInstanceTag);
-  }
-
-  @Nullable
-  private List<String> scheduleTask(String taskType, List<String> tables, 
@Nullable String minionInstanceTag) {
-    PinotTaskGenerator taskGenerator = 
_taskGeneratorRegistry.getTaskGenerator(taskType);
-    Preconditions.checkState(taskGenerator != null, "Task type: %s is not 
registered", taskType);
-
-    // Scan all table configs to get the tables with task enabled
-    List<TableConfig> enabledTableConfigs = new ArrayList<>();
-    for (String tableNameWithType : tables) {
-      TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
-      if (tableConfig != null && tableConfig.getTaskConfig() != null && 
tableConfig.getTaskConfig()
-          .isTaskTypeEnabled(taskType)) {
-        enabledTableConfigs.add(tableConfig);
-      }
-    }
-
-    _helixTaskResourceManager.ensureTaskQueueExists(taskType);
-    addTaskTypeMetricsUpdaterIfNeeded(taskType);
-    return scheduleTask(taskGenerator, enabledTableConfigs, false, 
minionInstanceTag);
-  }
-
-  /**
-   * Public API to schedule task for the given task type on the given table. 
It might be called from the non-leader
-   * controller. Returns the list of task names, or {@code null} if no task is 
scheduled.
-   */
-  @Nullable
-  public synchronized List<String> scheduleTask(String taskType, String 
tableNameWithType,
-      @Nullable String minionInstanceTag) {
-    PinotTaskGenerator taskGenerator = 
_taskGeneratorRegistry.getTaskGenerator(taskType);
-    Preconditions.checkState(taskGenerator != null, "Task type: %s is not 
registered", taskType);
-
-    TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
-    Preconditions.checkState(tableConfig != null, "Failed to find table config 
for table: %s", tableNameWithType);
-
-    Preconditions.checkState(
-        tableConfig.getTaskConfig() != null && 
tableConfig.getTaskConfig().isTaskTypeEnabled(taskType),
-        "Table: %s does not have task type: %s enabled", tableNameWithType, 
taskType);
-
-    _helixTaskResourceManager.ensureTaskQueueExists(taskType);
-    addTaskTypeMetricsUpdaterIfNeeded(taskType);
-    return scheduleTask(taskGenerator, Collections.singletonList(tableConfig), 
false, minionInstanceTag);
-  }
-
   @Override
   protected void processTables(List<String> tableNamesWithType, Properties 
taskProperties) {
     scheduleTasks(tableNamesWithType, true, null);
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
index b655416c87..c5be600661 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java
@@ -139,14 +139,14 @@ public class MergeRollupMinionClusterIntegrationTest 
extends BaseClusterIntegrat
     List<File> avroFiles = unpackAvroData(_tempDir);
 
     // Create and upload segments
-    ClusterIntegrationTestUtils
-        .buildSegmentsFromAvro(avroFiles, singleLevelConcatTableConfig, 
schema, 0, _segmentDir1, _tarDir1);
+    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, 
singleLevelConcatTableConfig, schema, 0, _segmentDir1,
+        _tarDir1);
     buildSegmentsFromAvroWithPostfix(avroFiles, singleLevelRollupTableConfig, 
schema, 0, _segmentDir2, _tarDir2, "1");
     buildSegmentsFromAvroWithPostfix(avroFiles, singleLevelRollupTableConfig, 
schema, 0, _segmentDir2, _tarDir2, "2");
-    ClusterIntegrationTestUtils
-        .buildSegmentsFromAvro(avroFiles, multiLevelConcatTableConfig, schema, 
0, _segmentDir3, _tarDir3);
-    ClusterIntegrationTestUtils
-        .buildSegmentsFromAvro(avroFiles, 
singleLevelConcatMetadataTableConfig, schema, 0, _segmentDir4, _tarDir4);
+    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, 
multiLevelConcatTableConfig, schema, 0, _segmentDir3,
+        _tarDir3);
+    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, 
singleLevelConcatMetadataTableConfig, schema, 0,
+        _segmentDir4, _tarDir4);
     uploadSegments(SINGLE_LEVEL_CONCAT_TEST_TABLE, _tarDir1);
     uploadSegments(SINGLE_LEVEL_ROLLUP_TEST_TABLE, _tarDir2);
     uploadSegments(MULTI_LEVEL_CONCAT_TEST_TABLE, _tarDir3);
@@ -160,8 +160,8 @@ public class MergeRollupMinionClusterIntegrationTest 
extends BaseClusterIntegrat
     schema.setSchemaName(MULTI_LEVEL_CONCAT_PROCESS_ALL_REALTIME_TABLE);
     addSchema(schema);
     TableConfig singleLevelConcatProcessAllRealtimeTableConfig =
-        createRealtimeTableConfigWithProcessAllMode(avroFiles.get(0),
-            MULTI_LEVEL_CONCAT_PROCESS_ALL_REALTIME_TABLE, 
PROCESS_ALL_MODE_KAFKA_TOPIC);
+        createRealtimeTableConfigWithProcessAllMode(avroFiles.get(0), 
MULTI_LEVEL_CONCAT_PROCESS_ALL_REALTIME_TABLE,
+            PROCESS_ALL_MODE_KAFKA_TOPIC);
     addTableConfig(singleLevelConcatProcessAllRealtimeTableConfig);
 
     // Push data into Kafka
@@ -172,9 +172,8 @@ public class MergeRollupMinionClusterIntegrationTest 
extends BaseClusterIntegrat
     ClusterIntegrationTestUtils.pushAvroIntoKafka(avroFiles.subList(0, 3), 
"localhost:" + getKafkaPort(),
         PROCESS_ALL_MODE_KAFKA_TOPIC, getMaxNumKafkaMessagesPerBatch(), 
getKafkaMessageHeader(), getPartitionColumn(),
         injectTombstones());
-    ClusterIntegrationTestUtils
-        .buildSegmentsFromAvro(avroFiles.subList(3, 9), 
singleLevelConcatProcessAllRealtimeTableConfig, schema, 0,
-            _segmentDir5, _tarDir5);
+    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles.subList(3, 9),
+        singleLevelConcatProcessAllRealtimeTableConfig, schema, 0, 
_segmentDir5, _tarDir5);
     // Wait for all documents loaded
     waitForAllDocsLoaded(600_000L);
 
@@ -216,14 +215,14 @@ public class MergeRollupMinionClusterIntegrationTest 
extends BaseClusterIntegrat
 
   private TableConfig createOfflineTableConfig(String tableName, 
TableTaskConfig taskConfig,
       @Nullable SegmentPartitionConfig partitionConfig) {
-    return new TableConfigBuilder(TableType.OFFLINE).setTableName(tableName)
-        
.setTimeColumnName(getTimeColumnName()).setSortedColumn(getSortedColumn())
-        
.setInvertedIndexColumns(getInvertedIndexColumns()).setNoDictionaryColumns(getNoDictionaryColumns())
-        
.setRangeIndexColumns(getRangeIndexColumns()).setBloomFilterColumns(getBloomFilterColumns())
-        
.setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion())
-        
.setLoadMode(getLoadMode()).setTaskConfig(taskConfig).setBrokerTenant(getBrokerTenant())
-        
.setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig())
-        
.setNullHandlingEnabled(getNullHandlingEnabled()).setSegmentPartitionConfig(partitionConfig).build();
+    return new 
TableConfigBuilder(TableType.OFFLINE).setTableName(tableName).setTimeColumnName(getTimeColumnName())
+        
.setSortedColumn(getSortedColumn()).setInvertedIndexColumns(getInvertedIndexColumns())
+        
.setNoDictionaryColumns(getNoDictionaryColumns()).setRangeIndexColumns(getRangeIndexColumns())
+        
.setBloomFilterColumns(getBloomFilterColumns()).setFieldConfigList(getFieldConfigs())
+        
.setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion()).setLoadMode(getLoadMode())
+        
.setTaskConfig(taskConfig).setBrokerTenant(getBrokerTenant()).setServerTenant(getServerTenant())
+        
.setIngestionConfig(getIngestionConfig()).setNullHandlingEnabled(getNullHandlingEnabled())
+        .setSegmentPartitionConfig(partitionConfig).build();
   }
 
   protected TableConfig createRealtimeTableConfigWithProcessAllMode(File 
sampleAvroFile, String tableName,
@@ -246,12 +245,12 @@ public class MergeRollupMinionClusterIntegrationTest 
extends BaseClusterIntegrat
     tableTaskConfigs.put("ActualElapsedTime.aggregationType", "min");
     tableTaskConfigs.put("WeatherDelay.aggregationType", "sum");
     tableTaskConfigs.put("mode", "processAll");
-    return new TableConfigBuilder(TableType.REALTIME).setTableName(tableName)
-        
.setTimeColumnName(getTimeColumnName()).setSortedColumn(getSortedColumn())
-        
.setInvertedIndexColumns(getInvertedIndexColumns()).setNoDictionaryColumns(getNoDictionaryColumns())
-        
.setRangeIndexColumns(getRangeIndexColumns()).setBloomFilterColumns(getBloomFilterColumns())
-        
.setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion())
-        .setLoadMode(getLoadMode()).setTaskConfig(
+    return new 
TableConfigBuilder(TableType.REALTIME).setTableName(tableName).setTimeColumnName(getTimeColumnName())
+        
.setSortedColumn(getSortedColumn()).setInvertedIndexColumns(getInvertedIndexColumns())
+        
.setNoDictionaryColumns(getNoDictionaryColumns()).setRangeIndexColumns(getRangeIndexColumns())
+        
.setBloomFilterColumns(getBloomFilterColumns()).setFieldConfigList(getFieldConfigs())
+        
.setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion()).setLoadMode(getLoadMode())
+        .setTaskConfig(
             new 
TableTaskConfig(Collections.singletonMap(MinionConstants.MergeRollupTask.TASK_TYPE,
 tableTaskConfigs)))
         
.setBrokerTenant(getBrokerTenant()).setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig())
         .setQueryConfig(getQueryConfig()).setStreamConfigs(streamConfigs)
@@ -411,17 +410,16 @@ public class MergeRollupMinionClusterIntegrationTest 
extends BaseClusterIntegrat
     int numTasks = 0;
     List<String> taskList;
     for (String tasks =
-        
_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE).get(0);
-        tasks != null;
-        taskList = 
_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE),
-        tasks = taskList != null ? taskList.get(0) : null,
-        numTasks++) {
+        _taskManager.scheduleAllTasksForTable(offlineTableName, 
null).get(MinionConstants.MergeRollupTask.TASK_TYPE)
+            .get(0); tasks != null; taskList =
+        _taskManager.scheduleAllTasksForTable(offlineTableName, 
null).get(MinionConstants.MergeRollupTask.TASK_TYPE),
+        tasks = taskList != null ? taskList.get(0) : null, numTasks++) {
       assertEquals(_helixTaskResourceManager.getSubtaskConfigs(tasks).size(), 
expectedNumSubTasks[numTasks]);
       assertTrue(_helixTaskResourceManager.getTaskQueues()
           
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE)));
       // Will not schedule task if there's incomplete task
-      assertNull(
-          
_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
+      assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
+          .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
       waitForTaskToComplete();
 
       // Check watermark
@@ -527,17 +525,16 @@ public class MergeRollupMinionClusterIntegrationTest 
extends BaseClusterIntegrat
     int numTasks = 0;
     List<String> taskList;
     for (String tasks =
-        
_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE).get(0);
-        tasks != null;
-        taskList = 
_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE),
-        tasks = taskList != null ? taskList.get(0) : null,
-        numTasks++) {
+        _taskManager.scheduleAllTasksForTable(offlineTableName, 
null).get(MinionConstants.MergeRollupTask.TASK_TYPE)
+            .get(0); tasks != null; taskList =
+        _taskManager.scheduleAllTasksForTable(offlineTableName, 
null).get(MinionConstants.MergeRollupTask.TASK_TYPE),
+        tasks = taskList != null ? taskList.get(0) : null, numTasks++) {
       assertEquals(_helixTaskResourceManager.getSubtaskConfigs(tasks).size(), 
expectedNumSubTasks[numTasks]);
       assertTrue(_helixTaskResourceManager.getTaskQueues()
           
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE)));
       // Will not schedule task if there's incomplete task
-      assertNull(
-          
_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
+      assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
+          .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
       waitForTaskToComplete();
 
       // Check watermark
@@ -636,17 +633,16 @@ public class MergeRollupMinionClusterIntegrationTest 
extends BaseClusterIntegrat
     int numTasks = 0;
     List<String> taskList;
     for (String tasks =
-        
_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE).get(0);
-        tasks != null;
-        taskList = 
_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE),
-        tasks = taskList != null ? taskList.get(0) : null,
-        numTasks++) {
+        _taskManager.scheduleAllTasksForTable(offlineTableName, 
null).get(MinionConstants.MergeRollupTask.TASK_TYPE)
+            .get(0); tasks != null; taskList =
+        _taskManager.scheduleAllTasksForTable(offlineTableName, 
null).get(MinionConstants.MergeRollupTask.TASK_TYPE),
+        tasks = taskList != null ? taskList.get(0) : null, numTasks++) {
       assertEquals(_helixTaskResourceManager.getSubtaskConfigs(tasks).size(), 
1);
       assertTrue(_helixTaskResourceManager.getTaskQueues()
           
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE)));
       // Will not schedule task if there's incomplete task
-      assertNull(
-          
_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
+      assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
+          .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
       waitForTaskToComplete();
 
       // Check watermark
@@ -788,17 +784,16 @@ public class MergeRollupMinionClusterIntegrationTest 
extends BaseClusterIntegrat
     int numTasks = 0;
     List<String> taskList;
     for (String tasks =
-        
_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE).get(0);
-        tasks != null;
-        taskList = 
_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE),
-        tasks = taskList != null ? taskList.get(0) : null,
-        numTasks++) {
+        _taskManager.scheduleAllTasksForTable(offlineTableName, 
null).get(MinionConstants.MergeRollupTask.TASK_TYPE)
+            .get(0); tasks != null; taskList =
+        _taskManager.scheduleAllTasksForTable(offlineTableName, 
null).get(MinionConstants.MergeRollupTask.TASK_TYPE),
+        tasks = taskList != null ? taskList.get(0) : null, numTasks++) {
       assertEquals(_helixTaskResourceManager.getSubtaskConfigs(tasks).size(), 
expectedNumSubTasks[numTasks]);
       assertTrue(_helixTaskResourceManager.getTaskQueues()
           
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE)));
       // Will not schedule task if there's incomplete task
-      assertNull(
-          
_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
+      assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, null)
+          .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
       waitForTaskToComplete();
 
       // Check watermark
@@ -859,8 +854,8 @@ public class MergeRollupMinionClusterIntegrationTest 
extends BaseClusterIntegrat
         return false;
       }
       // Check if the task metadata is cleaned up
-      if (MinionTaskMetadataUtils
-          .fetchTaskMetadata(_propertyStore, 
MinionConstants.MergeRollupTask.TASK_TYPE, tableNameWithType) != null) {
+      if (MinionTaskMetadataUtils.fetchTaskMetadata(_propertyStore, 
MinionConstants.MergeRollupTask.TASK_TYPE,
+          tableNameWithType) != null) {
         return false;
       }
       return true;
@@ -921,18 +916,17 @@ public class MergeRollupMinionClusterIntegrationTest 
extends BaseClusterIntegrat
     int numTasks = 0;
     List<String> taskList;
     for (String tasks =
-        
taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE).get(0);
-        tasks != null;
-        taskList = 
taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE),
-        tasks = taskList != null ? taskList.get(0) : null,
-        numTasks++) {
+        taskManager.scheduleAllTasksForTable(realtimeTableName, 
null).get(MinionConstants.MergeRollupTask.TASK_TYPE)
+            .get(0); tasks != null; taskList =
+        taskManager.scheduleAllTasksForTable(realtimeTableName, 
null).get(MinionConstants.MergeRollupTask.TASK_TYPE),
+        tasks = taskList != null ? taskList.get(0) : null, numTasks++) {
 //      assertEquals(helixTaskResourceManager.getSubtaskConfigs(tasks).size(), 
expectedNumSubTasks[numTasks]);
       assertTrue(helixTaskResourceManager.getTaskQueues()
           
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE)));
 
       // Will not schedule task if there's incomplete task
-      assertNull(
-          
taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
+      assertNull(taskManager.scheduleAllTasksForTable(realtimeTableName, null)
+          .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
       waitForTaskToComplete();
 
       // Check watermark
@@ -1027,17 +1021,16 @@ public class MergeRollupMinionClusterIntegrationTest 
extends BaseClusterIntegrat
     int numTasks = 0;
     List<String> taskList;
     for (String tasks =
-        
taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE).get(0);
-        tasks != null; taskList =
-        
taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE),
-        tasks = taskList != null ? taskList.get(0) : null,
-        numTasks++) {
+        taskManager.scheduleAllTasksForTable(realtimeTableName, 
null).get(MinionConstants.MergeRollupTask.TASK_TYPE)
+            .get(0); tasks != null; taskList =
+        taskManager.scheduleAllTasksForTable(realtimeTableName, 
null).get(MinionConstants.MergeRollupTask.TASK_TYPE),
+        tasks = taskList != null ? taskList.get(0) : null, numTasks++) {
       assertTrue(helixTaskResourceManager.getTaskQueues()
           
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.MergeRollupTask.TASK_TYPE)));
 
       // Will not schedule task if there's incomplete task
-      assertNull(
-          
taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
+      assertNull(taskManager.scheduleAllTasksForTable(realtimeTableName, null)
+          .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
       waitForTaskToComplete();
 
       // Check not using watermarks
@@ -1069,11 +1062,10 @@ public class MergeRollupMinionClusterIntegrationTest 
extends BaseClusterIntegrat
     waitForAllDocsLoaded(600_000L);
 
     for (String tasks =
-        
taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE).get(0);
-        tasks != null; taskList =
-        
taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE),
-        tasks = taskList != null ? taskList.get(0) : null,
-        numTasks++) {
+        taskManager.scheduleAllTasksForTable(realtimeTableName, 
null).get(MinionConstants.MergeRollupTask.TASK_TYPE)
+            .get(0); tasks != null; taskList =
+        taskManager.scheduleAllTasksForTable(realtimeTableName, 
null).get(MinionConstants.MergeRollupTask.TASK_TYPE),
+        tasks = taskList != null ? taskList.get(0) : null, numTasks++) {
       waitForTaskToComplete();
       // Check metrics
       long numBucketsToProcess = 
MetricValueUtils.getGaugeValue(_controllerStarter.getControllerMetrics(),
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java
index c4ba131f6d..da4e85696c 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PurgeMinionClusterIntegrationTest.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.integration.tests;
 
-import com.google.common.collect.ImmutableList;
 import java.io.File;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -63,7 +62,6 @@ public class PurgeMinionClusterIntegrationTest extends 
BaseClusterIntegrationTes
   private static final String PURGE_DELTA_NOT_PASSED_TABLE = "myTable3";
   private static final String PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE = 
"myTable4";
 
-
   protected PinotHelixTaskResourceManager _helixTaskResourceManager;
   protected PinotTaskManager _taskManager;
   protected PinotHelixResourceManager _pinotHelixResourceManager;
@@ -83,12 +81,8 @@ public class PurgeMinionClusterIntegrationTest extends 
BaseClusterIntegrationTes
     startBrokers(1);
     startServers(1);
 
-    List<String> allTables = ImmutableList.of(
-        PURGE_FIRST_RUN_TABLE,
-        PURGE_DELTA_PASSED_TABLE,
-        PURGE_DELTA_NOT_PASSED_TABLE,
-        PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE
-    );
+    List<String> allTables = List.of(PURGE_FIRST_RUN_TABLE, 
PURGE_DELTA_PASSED_TABLE, PURGE_DELTA_NOT_PASSED_TABLE,
+        PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE);
     Schema schema = null;
     TableConfig tableConfig = null;
     for (String tableName : allTables) {
@@ -152,12 +146,9 @@ public class PurgeMinionClusterIntegrationTest extends 
BaseClusterIntegrationTes
   private void setRecordPurger() {
     MinionContext minionContext = MinionContext.getInstance();
     minionContext.setRecordPurgerFactory(rawTableName -> {
-      List<String> tableNames = Arrays.asList(
-          PURGE_FIRST_RUN_TABLE,
-          PURGE_DELTA_PASSED_TABLE,
-          PURGE_DELTA_NOT_PASSED_TABLE,
-          PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE
-      );
+      List<String> tableNames =
+          Arrays.asList(PURGE_FIRST_RUN_TABLE, PURGE_DELTA_PASSED_TABLE, 
PURGE_DELTA_NOT_PASSED_TABLE,
+              PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE);
       if (tableNames.contains(rawTableName)) {
         return row -> row.getValue("ArrTime").equals(1);
       } else {
@@ -195,11 +186,12 @@ public class PurgeMinionClusterIntegrationTest extends 
BaseClusterIntegrationTes
     // 5. Check the purge process itself by setting an expecting number of rows
 
     String offlineTableName = 
TableNameBuilder.OFFLINE.tableNameWithType(PURGE_FIRST_RUN_TABLE);
-    
assertNotNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
+    assertNotNull(
+        _taskManager.scheduleAllTasksForTable(offlineTableName, 
null).get(MinionConstants.PurgeTask.TASK_TYPE));
     assertTrue(_helixTaskResourceManager.getTaskQueues()
         
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.PurgeTask.TASK_TYPE)));
     // Will not schedule task if there's incomplete task
-    
assertNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
+    assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, 
null).get(MinionConstants.PurgeTask.TASK_TYPE));
     waitForTaskToComplete();
 
     // Check that metadata contains expected values
@@ -209,7 +201,7 @@ public class PurgeMinionClusterIntegrationTest extends 
BaseClusterIntegrationTes
           
metadata.getCustomMap().containsKey(MinionConstants.PurgeTask.TASK_TYPE + 
MinionConstants.TASK_TIME_SUFFIX));
     }
     // Should not generate new purge task as the last time purge is not 
greater than last + 1day (default purge delay)
-    
assertNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
+    assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, 
null).get(MinionConstants.PurgeTask.TASK_TYPE));
 
     // 52 rows with ArrTime = 1
     // 115545 totals rows
@@ -239,11 +231,12 @@ public class PurgeMinionClusterIntegrationTest extends 
BaseClusterIntegrationTes
     // 5. Check the purge process itself by setting an expecting number of rows
 
     String offlineTableName = 
TableNameBuilder.OFFLINE.tableNameWithType(PURGE_DELTA_PASSED_TABLE);
-    
assertNotNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
+    assertNotNull(
+        _taskManager.scheduleAllTasksForTable(offlineTableName, 
null).get(MinionConstants.PurgeTask.TASK_TYPE));
     assertTrue(_helixTaskResourceManager.getTaskQueues()
         
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.PurgeTask.TASK_TYPE)));
     // Will not schedule task if there's incomplete task
-    
assertNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
+    assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, 
null).get(MinionConstants.PurgeTask.TASK_TYPE));
     waitForTaskToComplete();
 
     // Check that metadata contains expected values
@@ -255,7 +248,7 @@ public class PurgeMinionClusterIntegrationTest extends 
BaseClusterIntegrationTes
       assertTrue(System.currentTimeMillis() - Long.parseLong(purgeTime) < 
86400000);
     }
     // Should not generate new purge task as the last time purge is not 
greater than last + 1day (default purge delay)
-    
assertNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
+    assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, 
null).get(MinionConstants.PurgeTask.TASK_TYPE));
 
     // 52 rows with ArrTime = 1
     // 115545 totals rows
@@ -287,7 +280,7 @@ public class PurgeMinionClusterIntegrationTest extends 
BaseClusterIntegrationTes
     String offlineTableName = 
TableNameBuilder.OFFLINE.tableNameWithType(PURGE_DELTA_NOT_PASSED_TABLE);
 
     // No task should be schedule as the delay is not passed
-    
assertNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
+    assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, 
null).get(MinionConstants.PurgeTask.TASK_TYPE));
     for (SegmentZKMetadata metadata : 
_pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) {
       // Check purge time
       String purgeTime =
@@ -338,10 +331,11 @@ public class PurgeMinionClusterIntegrationTest extends 
BaseClusterIntegrationTes
 
     // schedule purge tasks
     String offlineTableName = 
TableNameBuilder.OFFLINE.tableNameWithType(PURGE_OLD_SEGMENTS_WITH_NEW_INDICES_TABLE);
-    
assertNotNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
+    assertNotNull(
+        _taskManager.scheduleAllTasksForTable(offlineTableName, 
null).get(MinionConstants.PurgeTask.TASK_TYPE));
     assertTrue(_helixTaskResourceManager.getTaskQueues()
         
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.PurgeTask.TASK_TYPE)));
-    
assertNull(_taskManager.scheduleTasks(offlineTableName).get(MinionConstants.PurgeTask.TASK_TYPE));
+    assertNull(_taskManager.scheduleAllTasksForTable(offlineTableName, 
null).get(MinionConstants.PurgeTask.TASK_TYPE));
     waitForTaskToComplete();
 
     // Check that metadata contains expected values
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
index 043c654ef7..e6c8ce2700 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
@@ -134,14 +134,14 @@ public class 
RealtimeToOfflineSegmentsMinionClusterIntegrationTest extends BaseC
 
     Map<String, String> taskConfigsWithMetadata = new HashMap<>();
     taskConfigsWithMetadata.put(BatchConfigProperties.OVERWRITE_OUTPUT, 
"true");
-    taskConfigsWithMetadata.put(
-        BatchConfigProperties.PUSH_MODE, 
BatchConfigProperties.SegmentPushType.METADATA.toString());
+    taskConfigsWithMetadata.put(BatchConfigProperties.PUSH_MODE,
+        BatchConfigProperties.SegmentPushType.METADATA.toString());
     String tableWithMetadataPush = "myTable2";
     schema.setSchemaName(tableWithMetadataPush);
     addSchema(schema);
     TableConfig realtimeMetadataTableConfig = 
createRealtimeTableConfig(avroFiles.get(0), tableWithMetadataPush,
-        new TableTaskConfig(Collections.singletonMap(
-            MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, 
taskConfigsWithMetadata)));
+        new 
TableTaskConfig(Collections.singletonMap(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
+            taskConfigsWithMetadata)));
     realtimeMetadataTableConfig.setIngestionConfig(ingestionConfig);
     
realtimeMetadataTableConfig.setFieldConfigList(Collections.singletonList(tsFieldConfig));
     addTableConfig(realtimeMetadataTableConfig);
@@ -151,7 +151,6 @@ public class 
RealtimeToOfflineSegmentsMinionClusterIntegrationTest extends BaseC
     
offlineMetadataTableConfig.setFieldConfigList(Collections.singletonList(tsFieldConfig));
     addTableConfig(offlineMetadataTableConfig);
 
-
     // Push data into Kafka
     pushAvroIntoKafka(avroFiles);
 
@@ -163,7 +162,6 @@ public class 
RealtimeToOfflineSegmentsMinionClusterIntegrationTest extends BaseC
 
     waitForDocsLoaded(600_000L, true, tableWithMetadataPush);
 
-
     _taskResourceManager = _controllerStarter.getHelixTaskResourceManager();
     _taskManager = _controllerStarter.getTaskManager();
     _realtimeTableName = 
TableNameBuilder.REALTIME.tableNameWithType(getTableName());
@@ -181,8 +179,8 @@ public class 
RealtimeToOfflineSegmentsMinionClusterIntegrationTest extends BaseC
     }
     _dataSmallestTimeMs = minSegmentTimeMs;
 
-   segmentsZKMetadata = 
_helixResourceManager.getSegmentsZKMetadata(_realtimeMetadataTableName);
-   minSegmentTimeMs = Long.MAX_VALUE;
+    segmentsZKMetadata = 
_helixResourceManager.getSegmentsZKMetadata(_realtimeMetadataTableName);
+    minSegmentTimeMs = Long.MAX_VALUE;
     for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadata) {
       if (segmentZKMetadata.getStatus() == 
CommonConstants.Segment.Realtime.Status.DONE) {
         minSegmentTimeMs = Math.min(minSegmentTimeMs, 
segmentZKMetadata.getStartTimeMs());
@@ -193,29 +191,28 @@ public class 
RealtimeToOfflineSegmentsMinionClusterIntegrationTest extends BaseC
 
   private TableConfig createOfflineTableConfig(String tableName, @Nullable 
TableTaskConfig taskConfig,
       @Nullable SegmentPartitionConfig partitionConfig) {
-    return new TableConfigBuilder(TableType.OFFLINE).setTableName(tableName)
-        
.setTimeColumnName(getTimeColumnName()).setSortedColumn(getSortedColumn())
-        
.setInvertedIndexColumns(getInvertedIndexColumns()).setNoDictionaryColumns(getNoDictionaryColumns())
-        
.setRangeIndexColumns(getRangeIndexColumns()).setBloomFilterColumns(getBloomFilterColumns())
-        
.setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion())
-        
.setLoadMode(getLoadMode()).setTaskConfig(taskConfig).setBrokerTenant(getBrokerTenant())
-        
.setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig())
-        
.setNullHandlingEnabled(getNullHandlingEnabled()).setSegmentPartitionConfig(partitionConfig).build();
+    return new 
TableConfigBuilder(TableType.OFFLINE).setTableName(tableName).setTimeColumnName(getTimeColumnName())
+        
.setSortedColumn(getSortedColumn()).setInvertedIndexColumns(getInvertedIndexColumns())
+        
.setNoDictionaryColumns(getNoDictionaryColumns()).setRangeIndexColumns(getRangeIndexColumns())
+        
.setBloomFilterColumns(getBloomFilterColumns()).setFieldConfigList(getFieldConfigs())
+        
.setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion()).setLoadMode(getLoadMode())
+        
.setTaskConfig(taskConfig).setBrokerTenant(getBrokerTenant()).setServerTenant(getServerTenant())
+        
.setIngestionConfig(getIngestionConfig()).setNullHandlingEnabled(getNullHandlingEnabled())
+        .setSegmentPartitionConfig(partitionConfig).build();
   }
 
   protected TableConfig createRealtimeTableConfig(File sampleAvroFile, String 
tableName, TableTaskConfig taskConfig) {
     AvroFileSchemaKafkaAvroMessageDecoder._avroFile = sampleAvroFile;
-    return new TableConfigBuilder(TableType.REALTIME).setTableName(tableName)
-        
.setTimeColumnName(getTimeColumnName()).setSortedColumn(getSortedColumn())
-        
.setInvertedIndexColumns(getInvertedIndexColumns()).setNoDictionaryColumns(getNoDictionaryColumns())
-        
.setRangeIndexColumns(getRangeIndexColumns()).setBloomFilterColumns(getBloomFilterColumns())
-        
.setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion())
-        
.setLoadMode(getLoadMode()).setTaskConfig(taskConfig).setBrokerTenant(getBrokerTenant())
-        
.setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig()).setQueryConfig(getQueryConfig())
-        
.setStreamConfigs(getStreamConfigs()).setNullHandlingEnabled(getNullHandlingEnabled()).build();
+    return new 
TableConfigBuilder(TableType.REALTIME).setTableName(tableName).setTimeColumnName(getTimeColumnName())
+        
.setSortedColumn(getSortedColumn()).setInvertedIndexColumns(getInvertedIndexColumns())
+        
.setNoDictionaryColumns(getNoDictionaryColumns()).setRangeIndexColumns(getRangeIndexColumns())
+        
.setBloomFilterColumns(getBloomFilterColumns()).setFieldConfigList(getFieldConfigs())
+        
.setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion()).setLoadMode(getLoadMode())
+        
.setTaskConfig(taskConfig).setBrokerTenant(getBrokerTenant()).setServerTenant(getServerTenant())
+        
.setIngestionConfig(getIngestionConfig()).setQueryConfig(getQueryConfig()).setStreamConfigs(getStreamConfigs())
+        .setNullHandlingEnabled(getNullHandlingEnabled()).build();
   }
 
-
   @Test
   public void testRealtimeToOfflineSegmentsTask()
       throws Exception {
@@ -234,12 +231,12 @@ public class 
RealtimeToOfflineSegmentsMinionClusterIntegrationTest extends BaseC
     long expectedWatermark = _dataSmallestTimeMs + 86400000;
     for (int i = 0; i < 3; i++) {
       // Schedule task
-      assertNotNull(_taskManager.scheduleTasks(_realtimeTableName)
+      assertNotNull(_taskManager.scheduleAllTasksForTable(_realtimeTableName, 
null)
           .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
       assertTrue(_taskResourceManager.getTaskQueues().contains(
           
PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)));
       // Should not generate more tasks
-      assertNull(_taskManager.scheduleTasks(_realtimeTableName)
+      assertNull(_taskManager.scheduleAllTasksForTable(_realtimeTableName, 
null)
           .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
 
       // Wait at most 600 seconds for all tasks COMPLETED
@@ -286,12 +283,12 @@ public class 
RealtimeToOfflineSegmentsMinionClusterIntegrationTest extends BaseC
     _taskManager.cleanUpTask();
     for (int i = 0; i < 3; i++) {
       // Schedule task
-      assertNotNull(_taskManager.scheduleTasks(_realtimeMetadataTableName)
+      
assertNotNull(_taskManager.scheduleAllTasksForTable(_realtimeMetadataTableName, 
null)
           .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
       assertTrue(_taskResourceManager.getTaskQueues().contains(
           
PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)));
       // Should not generate more tasks
-      assertNull(_taskManager.scheduleTasks(_realtimeMetadataTableName)
+      
assertNull(_taskManager.scheduleAllTasksForTable(_realtimeMetadataTableName, 
null)
           .get(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
 
       // Wait at most 600 seconds for all tasks COMPLETED
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
index 241c1c0876..78aa4d1c24 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
@@ -87,8 +87,8 @@ public class SimpleMinionClusterIntegrationTest extends 
ClusterTest {
     properties.put(TASK_TYPE + 
MinionConstants.MAX_ATTEMPTS_PER_TASK_KEY_SUFFIX, "2");
 
     helixResourceManager.getHelixAdmin().setConfig(
-        new 
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER)
-            .forCluster(helixResourceManager.getHelixClusterName()).build(), 
properties);
+        new 
HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(
+            helixResourceManager.getHelixClusterName()).build(), properties);
 
     // Add 3 offline tables, where 2 of them have TestTask enabled
     addDummySchema(TABLE_NAME_1);
@@ -136,7 +136,7 @@ public class SimpleMinionClusterIntegrationTest extends 
ClusterTest {
     
assertEquals(_helixTaskResourceManager.getTasksInProgress(TASK_TYPE).size(), 0);
 
     // Should create the task queues and generate a task in the same minion 
instance
-    List<String> task1 = _taskManager.scheduleTasks().get(TASK_TYPE);
+    List<String> task1 = 
_taskManager.scheduleAllTasksForAllTables(null).get(TASK_TYPE);
     assertNotNull(task1);
     assertEquals(task1.size(), 1);
     assertTrue(_helixTaskResourceManager.getTaskQueues()
@@ -150,7 +150,7 @@ public class SimpleMinionClusterIntegrationTest extends 
ClusterTest {
     verifyTaskCount(task1.get(0), 0, 1, 1, 2);
     // Should generate one more task, with two sub-tasks. Both of these 
sub-tasks will wait
     // since we have one minion instance that is still running one of the 
sub-tasks.
-    List<String> task2 = _taskManager.scheduleTask(TASK_TYPE, null);
+    List<String> task2 = _taskManager.scheduleTaskForAllTables(TASK_TYPE, 
null);
     assertNotNull(task2);
     assertEquals(task2.size(), 1);
     
assertTrue(_helixTaskResourceManager.getTasksInProgress(TASK_TYPE).contains(task2.get(0)));
@@ -159,8 +159,8 @@ public class SimpleMinionClusterIntegrationTest extends 
ClusterTest {
     // Should not generate more tasks since 
SimpleMinionClusterIntegrationTests.NUM_TASKS is 2.
     // Our test task generator does not generate if there are already this 
many sub-tasks in the
     // running+waiting count already.
-    assertNull(_taskManager.scheduleTasks().get(TASK_TYPE));
-    assertNull(_taskManager.scheduleTask(TASK_TYPE, null));
+    assertNull(_taskManager.scheduleAllTasksForAllTables(null).get(TASK_TYPE));
+    assertNull(_taskManager.scheduleTaskForAllTables(TASK_TYPE, null));
 
     // Wait at most 60 seconds for all tasks IN_PROGRESS
     TestUtils.waitForCondition(input -> {
@@ -183,13 +183,12 @@ public class SimpleMinionClusterIntegrationTest extends 
ClusterTest {
     String inProgressGauge = TASK_TYPE + "." + TaskState.IN_PROGRESS;
     String stoppedGauge = TASK_TYPE + "." + TaskState.STOPPED;
     String completedGauge = TASK_TYPE + "." + TaskState.COMPLETED;
-    TestUtils.waitForCondition(
-        input -> MetricValueUtils.getGlobalGaugeValue(controllerMetrics, 
inProgressGauge, ControllerGauge.TASK_STATUS)
+    TestUtils.waitForCondition(input ->
+        MetricValueUtils.getGlobalGaugeValue(controllerMetrics, 
inProgressGauge, ControllerGauge.TASK_STATUS)
             == NUM_TASKS
             && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, 
stoppedGauge, ControllerGauge.TASK_STATUS) == 0
             && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, 
completedGauge, ControllerGauge.TASK_STATUS)
-            == 0,
-        ZK_CALLBACK_TIMEOUT_MS, "Failed to update the controller gauges");
+            == 0, ZK_CALLBACK_TIMEOUT_MS, "Failed to update the controller 
gauges");
 
     // Stop the task queue
     _helixTaskResourceManager.stopTaskQueue(TASK_TYPE);
@@ -211,14 +210,12 @@ public class SimpleMinionClusterIntegrationTest extends 
ClusterTest {
     }, STATE_TRANSITION_TIMEOUT_MS, "Failed to get all tasks STOPPED");
 
     // Wait at most 30 seconds for ZK callback to update the controller gauges
-    TestUtils.waitForCondition(
-        input -> MetricValueUtils.getGlobalGaugeValue(controllerMetrics, 
inProgressGauge, ControllerGauge.TASK_STATUS)
-            == 0
+    TestUtils.waitForCondition(input ->
+        MetricValueUtils.getGlobalGaugeValue(controllerMetrics, 
inProgressGauge, ControllerGauge.TASK_STATUS) == 0
             && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, 
stoppedGauge, ControllerGauge.TASK_STATUS)
             == NUM_TASKS
             && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, 
completedGauge, ControllerGauge.TASK_STATUS)
-            == 0,
-        ZK_CALLBACK_TIMEOUT_MS, "Failed to update the controller gauges");
+            == 0, ZK_CALLBACK_TIMEOUT_MS, "Failed to update the controller 
gauges");
 
     // Task deletion requires the task queue to be stopped,
     // so deleting task1 here before resuming the task queue.
@@ -247,13 +244,11 @@ public class SimpleMinionClusterIntegrationTest extends 
ClusterTest {
     }, STATE_TRANSITION_TIMEOUT_MS, "Failed to get all tasks COMPLETED");
 
     // Wait at most 30 seconds for ZK callback to update the controller gauges
-    TestUtils.waitForCondition(
-        input -> MetricValueUtils.getGlobalGaugeValue(controllerMetrics, 
inProgressGauge, ControllerGauge.TASK_STATUS)
-            == 0
+    TestUtils.waitForCondition(input ->
+        MetricValueUtils.getGlobalGaugeValue(controllerMetrics, 
inProgressGauge, ControllerGauge.TASK_STATUS) == 0
             && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, 
stoppedGauge, ControllerGauge.TASK_STATUS) == 0
-            && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, 
completedGauge, ControllerGauge.TASK_STATUS)
-            == (NUM_TASKS - 1),
-        ZK_CALLBACK_TIMEOUT_MS, "Failed to update the controller gauges");
+            && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, 
completedGauge, ControllerGauge.TASK_STATUS) == (
+            NUM_TASKS - 1), ZK_CALLBACK_TIMEOUT_MS, "Failed to update the 
controller gauges");
 
     // Delete the task queue
     _helixTaskResourceManager.deleteTaskQueue(TASK_TYPE, false);
@@ -263,13 +258,11 @@ public class SimpleMinionClusterIntegrationTest extends 
ClusterTest {
         STATE_TRANSITION_TIMEOUT_MS, "Failed to delete the task queue");
 
     // Wait at most 30 seconds for ZK callback to update the controller gauges
-    TestUtils.waitForCondition(
-        input -> MetricValueUtils.getGlobalGaugeValue(controllerMetrics, 
inProgressGauge, ControllerGauge.TASK_STATUS)
-            == 0
+    TestUtils.waitForCondition(input ->
+        MetricValueUtils.getGlobalGaugeValue(controllerMetrics, 
inProgressGauge, ControllerGauge.TASK_STATUS) == 0
             && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, 
stoppedGauge, ControllerGauge.TASK_STATUS) == 0
             && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, 
completedGauge, ControllerGauge.TASK_STATUS)
-            == 0,
-        ZK_CALLBACK_TIMEOUT_MS, "Failed to update the controller gauges");
+            == 0, ZK_CALLBACK_TIMEOUT_MS, "Failed to update the controller 
gauges");
   }
 
   @AfterClass
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TlsIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TlsIntegrationTest.java
index d292ef4c9b..5058fd4b75 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TlsIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TlsIntegrationTest.java
@@ -489,7 +489,7 @@ public class TlsIntegrationTest extends 
BaseClusterIntegrationTest {
     Assert.assertTrue(resultBeforeOffline.getResultSet(0).getLong(0) > 0);
 
     // schedule offline segment generation
-    Assert.assertNotNull(_controllerStarter.getTaskManager().scheduleTasks());
+    
Assert.assertNotNull(_controllerStarter.getTaskManager().scheduleAllTasksForAllTables(null));
 
     // wait for offline segments
     JsonNode offlineSegments = TestUtils.waitForResult(() -> {
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java
index 238d515b54..19c3ac61ff 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java
@@ -471,8 +471,8 @@ public class UpsertTableIntegrationTest extends 
BaseClusterIntegrationTestSet {
     waitForAllDocsLoaded(tableName, 600_000L, 1000);
     assertEquals(getScore(tableName), 3692);
     waitForNumQueriedSegmentsToConverge(tableName, 10_000L, 3);
-
-    
assertNotNull(_taskManager.scheduleTasks(TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName))
+    String realtimeTableName = 
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName);
+    assertNotNull(_taskManager.scheduleAllTasksForTable(realtimeTableName, 
null)
         .get(MinionConstants.UpsertCompactionTask.TASK_TYPE));
     waitForTaskToComplete();
     waitForAllDocsLoaded(tableName, 600_000L, 3);
@@ -501,8 +501,8 @@ public class UpsertTableIntegrationTest extends 
BaseClusterIntegrationTestSet {
     waitForAllDocsLoaded(tableName, 600_000L, 2000);
     assertEquals(getScore(tableName), 3692);
     waitForNumQueriedSegmentsToConverge(tableName, 10_000L, 5);
-
-    
assertNotNull(_taskManager.scheduleTasks(TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName))
+    String realtimeTableName = 
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName);
+    assertNotNull(_taskManager.scheduleAllTasksForTable(realtimeTableName, 
null)
         .get(MinionConstants.UpsertCompactionTask.TASK_TYPE));
     waitForTaskToComplete();
     waitForAllDocsLoaded(tableName, 600_000L, 3);
@@ -546,7 +546,8 @@ public class UpsertTableIntegrationTest extends 
BaseClusterIntegrationTestSet {
 
     // Run segment compaction. This time, we expect that the deleting rows are 
still there because they are
     // as part of the consuming segment
-    
assertNotNull(_taskManager.scheduleTasks(TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName))
+    String realtimeTableName = 
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName);
+    assertNotNull(_taskManager.scheduleAllTasksForTable(realtimeTableName, 
null)
         .get(MinionConstants.UpsertCompactionTask.TASK_TYPE));
     waitForTaskToComplete();
     waitForAllDocsLoaded(tableName, 600_000L, 3);
@@ -563,7 +564,8 @@ public class UpsertTableIntegrationTest extends 
BaseClusterIntegrationTestSet {
     assertEquals(getNumDeletedRows(tableName), 2);
 
     // Run segment compaction. This time, we expect that the deleting rows are 
cleaned up
-    
assertNotNull(_taskManager.scheduleTasks(TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName))
+    realtimeTableName = 
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName);
+    assertNotNull(_taskManager.scheduleAllTasksForTable(realtimeTableName, 
null)
         .get(MinionConstants.UpsertCompactionTask.TASK_TYPE));
     waitForTaskToComplete();
     waitForAllDocsLoaded(tableName, 600_000L, 3);
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UrlAuthRealtimeIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UrlAuthRealtimeIntegrationTest.java
index e8389b377f..08aa9aee6a 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UrlAuthRealtimeIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UrlAuthRealtimeIntegrationTest.java
@@ -203,7 +203,7 @@ public class UrlAuthRealtimeIntegrationTest extends 
BaseClusterIntegrationTest {
     Assert.assertTrue(resultBeforeOffline.getResultSet(0).getLong(0) > 0);
 
     // schedule offline segment generation
-    Assert.assertNotNull(_controllerStarter.getTaskManager().scheduleTasks());
+    
Assert.assertNotNull(_controllerStarter.getTaskManager().scheduleAllTasksForAllTables(null));
 
     // wait for offline segments
     JsonNode offlineSegments = TestUtils.waitForResult(() -> {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org

Reply via email to