swaminathanmanish commented on code in PR #12459:
URL: https://github.com/apache/pinot/pull/12459#discussion_r1529417960


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java:
##########
@@ -483,15 +484,15 @@ public void registerTaskGenerator(PinotTaskGenerator taskGenerator) {
    * Public API to schedule tasks (all task types) for all tables. It might be called from the non-leader controller.
    * Returns a map from the task type to the task scheduled.

Review Comment:
   Can you update the doc as well?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java:
##########
@@ -534,66 +535,85 @@ private synchronized Map<String, String> scheduleTasks(List<String> tableNamesWi
    * Returns the task name, or {@code null} if no task is scheduled.
    */
   @Nullable
-  private String scheduleTask(PinotTaskGenerator taskGenerator, List<TableConfig> enabledTableConfigs,
+  private List<String> scheduleTask(PinotTaskGenerator taskGenerator, List<TableConfig> enabledTableConfigs,
       boolean isLeader) {
     LOGGER.info("Trying to schedule task type: {}, isLeader: {}", taskGenerator.getTaskType(), isLeader);
-    List<PinotTaskConfig> pinotTaskConfigs;
-    try {
-      /* TODO taskGenerator may skip generating tasks for some of the tables being passed to it.
-        In that case, we should not be storing success timestamps for those table. Same with exceptions that should
-        only be associated with the table for which it was raised and not every eligible table. We can have the
-        generateTasks() return a list of TaskGeneratorMostRecentRunInfo for each table
-       */
-      pinotTaskConfigs = taskGenerator.generateTasks(enabledTableConfigs);
-      long successRunTimestamp = System.currentTimeMillis();
-      for (TableConfig tableConfig : enabledTableConfigs) {
-        _taskManagerStatusCache.saveTaskGeneratorInfo(tableConfig.getTableName(), taskGenerator.getTaskType(),
+    Map<String, List<PinotTaskConfig>> pinotMinionInstanceToTaskConfigs = new HashMap<>();
+    String taskType = taskGenerator.getTaskType();
+    for (TableConfig tableConfig : enabledTableConfigs) {
+      String tableName = tableConfig.getTableName();
+      try {
+        String minionInstanceTag = taskGenerator.getMinionInstanceTag(tableConfig);
+        List<PinotTaskConfig> presentTaskConfig =
+            pinotMinionInstanceToTaskConfigs.computeIfAbsent(minionInstanceTag, k -> new ArrayList<>());
+        taskGenerator.generateTasks(List.of(tableConfig), presentTaskConfig);
+        pinotMinionInstanceToTaskConfigs.put(minionInstanceTag, presentTaskConfig);
+        long successRunTimestamp = System.currentTimeMillis();
+        _taskManagerStatusCache.saveTaskGeneratorInfo(tableName, taskType,
             taskGeneratorMostRecentRunInfo -> taskGeneratorMostRecentRunInfo.addSuccessRunTs(successRunTimestamp));
         // before the first task schedule, the follow two gauge metrics will be empty
         // TODO: find a better way to report task generation information
-        _controllerMetrics.setOrUpdateTableGauge(tableConfig.getTableName(), taskGenerator.getTaskType(),
+        _controllerMetrics.setOrUpdateTableGauge(tableName, taskType,
             ControllerGauge.TIME_MS_SINCE_LAST_SUCCESSFUL_MINION_TASK_GENERATION,
             () -> System.currentTimeMillis() - successRunTimestamp);
-        _controllerMetrics.setOrUpdateTableGauge(tableConfig.getTableName(), taskGenerator.getTaskType(),
+        _controllerMetrics.setOrUpdateTableGauge(tableName, taskType,
             ControllerGauge.LAST_MINION_TASK_GENERATION_ENCOUNTERS_ERROR, 0L);
-      }
-    } catch (Exception e) {
-      StringWriter errors = new StringWriter();
-      try (PrintWriter pw = new PrintWriter(errors)) {
-        e.printStackTrace(pw);
-      }
-      long successRunTimestamp = System.currentTimeMillis();
-      for (TableConfig tableConfig : enabledTableConfigs) {
-        _taskManagerStatusCache.saveTaskGeneratorInfo(tableConfig.getTableName(), taskGenerator.getTaskType(),
-            taskGeneratorMostRecentRunInfo -> taskGeneratorMostRecentRunInfo.addErrorRunMessage(successRunTimestamp,
+      } catch (Exception e) {
+        StringWriter errors = new StringWriter();
+        try (PrintWriter pw = new PrintWriter(errors)) {
+          e.printStackTrace(pw);
+        }
+        long failureRunTimestamp = System.currentTimeMillis();
+        _taskManagerStatusCache.saveTaskGeneratorInfo(tableName, taskType,
+            taskGeneratorMostRecentRunInfo -> taskGeneratorMostRecentRunInfo.addErrorRunMessage(failureRunTimestamp,
                 errors.toString()));
         // before the first task schedule, the follow gauge metric will be empty
         // TODO: find a better way to report task generation information
-        _controllerMetrics.setOrUpdateTableGauge(tableConfig.getTableName(), taskGenerator.getTaskType(),
+        _controllerMetrics.setOrUpdateTableGauge(tableName, taskType,
             ControllerGauge.LAST_MINION_TASK_GENERATION_ENCOUNTERS_ERROR, 1L);
+        LOGGER.error("Failed to generate tasks for task type {} for table {}", taskType, tableName, e);
       }
-      throw e;
     }
     if (!isLeader) {
       taskGenerator.nonLeaderCleanUp();
     }
-    String taskType = taskGenerator.getTaskType();
-    int numTasks = pinotTaskConfigs.size();
-    if (numTasks > 0) {
-      LOGGER.info("Submitting {} tasks for task type: {} with task configs: {}", numTasks, taskType, pinotTaskConfigs);
-      _controllerMetrics.addMeteredTableValue(taskType, ControllerMeter.NUMBER_TASKS_SUBMITTED, numTasks);
-      return _helixTaskResourceManager.submitTask(pinotTaskConfigs, taskGenerator.getTaskTimeoutMs(),
-          taskGenerator.getNumConcurrentTasksPerInstance(), taskGenerator.getMaxAttemptsPerTask());
-    }
-    LOGGER.info("No task to schedule for task type: {}", taskType);
-    return null;
+    int numErrorTasksScheduled = 0;
+    List<String> submittedTaskNames = new ArrayList<>();
+    for (String minionInstanceTag : pinotMinionInstanceToTaskConfigs.keySet()) {
+      List<PinotTaskConfig> pinotTaskConfigs = pinotMinionInstanceToTaskConfigs.get(minionInstanceTag);
+      int numTasks = pinotTaskConfigs.size();
+      try {
+        if (numTasks > 0) {
+          // This might lead to lot of logs, maybe sum it up and move outside the loop
+          LOGGER.info("Submitting {} tasks for task type: {} to minionInstance: {} with task configs: {}", numTasks,
+              taskType, minionInstanceTag, pinotTaskConfigs);
+          String submittedTaskName = _helixTaskResourceManager.submitTask(pinotTaskConfigs, minionInstanceTag,
+              taskGenerator.getTaskTimeoutMs(), taskGenerator.getNumConcurrentTasksPerInstance(),
+              taskGenerator.getMaxAttemptsPerTask());
+          submittedTaskNames.add(submittedTaskName);
+          _controllerMetrics.addMeteredTableValue(taskType, 
ControllerMeter.NUMBER_TASKS_SUBMITTED, numTasks);
+        }
+      } catch (Exception e) {
+        numErrorTasksScheduled++;
+        LOGGER.error("Failed to schedule task type {} with task configs: {}", taskType, pinotTaskConfigs, e);

Review Comment:
   Can you add minionInstanceTag as well?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/BaseTaskGenerator.java:
##########
@@ -120,4 +123,23 @@ public List<PinotTaskConfig> generateTasks(TableConfig tableConfig, Map<String,
       throws Exception {
     throw new UnknownTaskTypeException("Adhoc task generation is not supported for task type - " + this.getTaskType());
   }
+
+  @Override
+  public void generateTasks(List<TableConfig> tableConfigs, List<PinotTaskConfig> pinotTaskConfigs)
+      throws Exception {
+    pinotTaskConfigs.addAll(generateTasks(tableConfigs));
+  }
+
+  @Override
+  public String getMinionInstanceTag(TableConfig tableConfig) {
+    TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
+    if (tableTaskConfig != null) {
+      Map<String, String> configs = tableTaskConfig.getConfigsForTaskType(getTaskType());
+      if (!configs.isEmpty()) {

Review Comment:
   Do we need a null check here, in case there are no tasks configured for a type?
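
   A minimal sketch of the kind of null-safe lookup this comment is asking about. It is not the PR's code: plain maps stand in for `TableTaskConfig`, the class and method names are hypothetical, and the two string constants are assumed values used only for illustration.

```java
import java.util.Map;

// Hypothetical stand-in for the tag lookup in BaseTaskGenerator; not Pinot code.
class MinionTagLookup {
  // Assumed values for illustration; the real constants live in the Pinot codebase.
  static final String MINION_INSTANCE_TAG_CONFIG = "minionInstanceTag";
  static final String UNTAGGED_MINION_INSTANCE = "minion_untagged";

  /**
   * Resolves the minion tag for a task type.
   * configsByTaskType maps task type -> that task type's configs and may be null,
   * may lack an entry for the task type, or may hold an empty config map.
   */
  static String resolveMinionInstanceTag(Map<String, Map<String, String>> configsByTaskType, String taskType) {
    if (configsByTaskType == null) {
      return UNTAGGED_MINION_INSTANCE;
    }
    Map<String, String> configs = configsByTaskType.get(taskType);
    // Guard against a missing entry before calling isEmpty(), which would otherwise throw.
    if (configs == null || configs.isEmpty()) {
      return UNTAGGED_MINION_INSTANCE;
    }
    return configs.getOrDefault(MINION_INSTANCE_TAG_CONFIG, UNTAGGED_MINION_INSTANCE);
  }
}
```

   With this shape, a task type that has no configs at all falls back to the default tag instead of failing inside the scheduling loop.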



##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java:
##########
@@ -1005,9 +1025,13 @@ public void testRealtimeTableProcessAllModeMultiLevelConcat()
     long[] expectedNumBucketsToProcess200Days = {0, 0, 1, 1, 0, 0, 1, 1};
     String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName);
     int numTasks = 0;
-    for (String tasks = taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE);
-        tasks != null; tasks =
-        taskManager.scheduleTasks(realtimeTableName).get(MinionConstants.MergeRollupTask.TASK_TYPE), numTasks++) {
+    List<String> taskList;

Review Comment:
   Can we make sure that we test the default path (untagged minion) as well?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/BaseTaskGenerator.java:
##########
@@ -120,4 +123,23 @@ public List<PinotTaskConfig> generateTasks(TableConfig tableConfig, Map<String,
       throws Exception {
     throw new UnknownTaskTypeException("Adhoc task generation is not supported for task type - " + this.getTaskType());
   }
+
+  @Override
+  public void generateTasks(List<TableConfig> tableConfigs, List<PinotTaskConfig> pinotTaskConfigs)
+      throws Exception {
+    pinotTaskConfigs.addAll(generateTasks(tableConfigs));
+  }
+
+  @Override
+  public String getMinionInstanceTag(TableConfig tableConfig) {
+    TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
+    if (tableTaskConfig != null) {
+      Map<String, String> configs = tableTaskConfig.getConfigsForTaskType(getTaskType());
+      if (!configs.isEmpty()) {
+        return configs.getOrDefault(PinotTaskManager.MINION_INSTANCE_TAG_CONFIG,

Review Comment:
   Since the user has to fill this in manually, can we add table config
validation to ensure that the minion tag configured here has actually been
created?
   
   If the tag contains a typo and is not validated at config time, the
resulting failure will take some work to troubleshoot.
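
   A rough sketch of what such a config-time check could look like. The class and method names are hypothetical, and the set of existing minion tags is assumed to be fetched elsewhere (for example from the cluster's minion instance configs). How it is obtained is not shown and is not part of this PR.

```java
import java.util.Set;

// Hypothetical validator; not an existing Pinot class.
class MinionTagValidator {
  /**
   * Rejects a table config whose minion instance tag does not match any existing tag.
   * A null configuredTag means no tag was set, so the default (untagged) path applies.
   */
  static void validateMinionInstanceTag(String tableName, String taskType, String configuredTag,
      Set<String> existingMinionTags) {
    if (configuredTag == null) {
      return;
    }
    if (!existingMinionTags.contains(configuredTag)) {
      // Fail at config time so a typo surfaces immediately rather than as tasks that never run.
      throw new IllegalStateException(String.format(
          "Table %s, task type %s: minion instance tag '%s' does not match any existing minion tag %s",
          tableName, taskType, configuredTag, existingMinionTags));
    }
  }
}
```

   Naming the offending tag and the known tags in the message should make a typo easy to spot when the table config is rejected.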



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

