jackjlli commented on a change in pull request #7091:
URL: https://github.com/apache/incubator-pinot/pull/7091#discussion_r659952371



##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
##########
@@ -249,15 +253,72 @@ public synchronized String 
submitTask(List<PinotTaskConfig> pinotTaskConfigs, St
    * @return Map from task name to task state
    */
   public synchronized Map<String, TaskState> getTaskStates(String taskType) {
-    Map<String, TaskState> helixJobStates =
-        
_taskDriver.getWorkflowContext(getHelixJobQueueName(taskType)).getJobStates();
+    Map<String, TaskState> helixJobStates = new HashMap<>();
+    WorkflowContext workflowContext = 
_taskDriver.getWorkflowContext(getHelixJobQueueName(taskType));
+
+    if (workflowContext == null) {
+      return helixJobStates;
+    }
+    helixJobStates = workflowContext.getJobStates();
     Map<String, TaskState> taskStates = new HashMap<>(helixJobStates.size());
     for (Map.Entry<String, TaskState> entry : helixJobStates.entrySet()) {
       taskStates.put(getPinotTaskName(entry.getKey()), entry.getValue());
     }
     return taskStates;
   }
 
+  /**
+   * This method returns a count of sub-tasks in various states, given the 
top-level task name.
+   * @param parentTaskName (e.g. "Task_TestTask_1624403781879")
+   * @return TaskCount object
+   */
+  public synchronized TaskCount getTaskCount(String parentTaskName) {
+    TaskCount taskCount = new TaskCount();
+    JobContext jobContext = 
_taskDriver.getJobContext(getHelixJobName(parentTaskName));
+
+    if (jobContext == null) {
+      return taskCount;
+    }
+    Set<Integer> partitionSet = jobContext.getPartitionSet();
+    taskCount.addToTotal(partitionSet.size());
+    for (int partition : partitionSet) {
+      TaskPartitionState state = jobContext.getPartitionState(partition);
+      // Helix returns state as null if the task is not enqueued anywhere yet
+      if (state == null) {
+        // task is not yet assigned to a participant
+        taskCount.addToWaiting(1);
+      } else if (state.equals(TaskPartitionState.INIT) || 
state.equals(TaskPartitionState.RUNNING)) {
+        taskCount.addToRunning(1);
+      } else if (state.equals(TaskPartitionState.TASK_ERROR)) {
+        taskCount.addToError(1);
+      }
+    }
+    return taskCount;
+  }
+
+  /**
+   * Returns a set of Task names (in the form "Task_TestTask_1624403781879") 
that are in progress or not started yet.
+   *
+   * @param taskType
+   * @return Set of task names
+   */
+  public synchronized Set<String> getTasksInProgress(String taskType) {
+    Set<String> tasksInProgress = new HashSet<>();
+    WorkflowContext workflowContext = 
_taskDriver.getWorkflowContext(getHelixJobQueueName(taskType));
+    if (workflowContext == null) {
+      return tasksInProgress;

Review comment:
       It'd be good to log a warning message here if the workflow context is null.

##########
File path: 
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
##########
@@ -85,20 +85,44 @@ public void setUp()
     startMinion();
   }
 
+  private void verifyTaskCount(String task, int errors, int waiting, int 
running, int total) {
+    PinotHelixTaskResourceManager.TaskCount taskCount = 
_helixTaskResourceManager.getTaskCount(task);
+    assertEquals(taskCount.getError(), errors);
+    assertEquals(taskCount.getWaiting(), waiting);
+    assertEquals(taskCount.getRunning(), running);
+    assertEquals(taskCount.getTotal(), total);
+  }
+
   @Test
   public void testStopResumeDeleteTaskQueue() {
     // Hold the task
     HOLD.set(true);
+    // No tasks before we start.
+    
assertEquals(_helixTaskResourceManager.getTasksInProgress(TASK_TYPE).size(),0);
+    verifyTaskCount("Task_" + TASK_TYPE + "_1624403781879", 0, 0, 0, 0);
 
     // Should create the task queues and generate a task
-    assertNotNull(_taskManager.scheduleTasks().get(TASK_TYPE));
+    String task1 = _taskManager.scheduleTasks().get(TASK_TYPE);
+    assertNotNull(task1);
     assertTrue(_helixTaskResourceManager.getTaskQueues()
         
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(TASK_TYPE)));
-
-    // Should generate one more task
-    assertNotNull(_taskManager.scheduleTask(TASK_TYPE));
-
-    // Should not generate more tasks
+    
assertTrue(_helixTaskResourceManager.getTasksInProgress(TASK_TYPE).contains(task1));
+
+    // Since we have two tables, two sub-tasks are generated -- one for each 
table.
+    // The default concurrent sub-tasks per minion instance is 1, and we have 
one minion
+    // instance spun up. So, one sub-tasks gets scheduled in a minion, and the 
other one
+    // waits.
+    verifyTaskCount(task1, 0, 1, 1, 2);
+    // Should generate one more task, with two sub-tasks. Both of these 
sub-tasks will wait
+    // since we have one minion instance that is still running one of the 
sub-tasks.

Review comment:
       If the only minion instance is still running one of the sub-tasks, 
why does the `runningCount` below show 0?

##########
File path: 
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
##########
@@ -85,18 +85,34 @@ public void setUp()
     startMinion();
   }
 
+  private void verifyTaskCount(String task, int errors, int waiting, int 
running, int total) {
+    PinotHelixTaskResourceManager.TaskCount taskCount = 
_helixTaskResourceManager.getTaskCount(task);
+    assertEquals(taskCount.getError(), errors);
+    assertEquals(taskCount.getWaiting(), waiting);
+    assertEquals(taskCount.getRunning(), running);
+    assertEquals(taskCount.getTotal(), total);
+  }
+
   @Test
   public void testStopResumeDeleteTaskQueue() {
     // Hold the task
     HOLD.set(true);
+    
assertEquals(_helixTaskResourceManager.getTasksInProgress(TASK_TYPE).size(),0);
+    verifyTaskCount("Task_" + TASK_TYPE + "_1624403781879", 0, 0, 0, 0);
 
     // Should create the task queues and generate a task
-    assertNotNull(_taskManager.scheduleTasks().get(TASK_TYPE));
+    String task1 = _taskManager.scheduleTasks().get(TASK_TYPE);
+    assertNotNull(task1);
     assertTrue(_helixTaskResourceManager.getTaskQueues()
         
.contains(PinotHelixTaskResourceManager.getHelixJobQueueName(TASK_TYPE)));
+    
assertTrue(_helixTaskResourceManager.getTasksInProgress(TASK_TYPE).contains(task1));
 
+    verifyTaskCount(task1, 0, 1, 1, 2);
     // Should generate one more task
-    assertNotNull(_taskManager.scheduleTask(TASK_TYPE));
+    String task2 = _taskManager.scheduleTask(TASK_TYPE);
+    assertNotNull(task2);
+    
assertTrue(_helixTaskResourceManager.getTasksInProgress(TASK_TYPE).contains(task2));
+    verifyTaskCount(task2, 0, 2, 0, 2);

Review comment:
       Is it true that the `totalCount` here is the total number of sub-tasks?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to