mcvsubbu commented on a change in pull request #7091:
URL: https://github.com/apache/incubator-pinot/pull/7091#discussion_r660872726



##########
File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
##########
@@ -249,15 +253,72 @@ public synchronized String 
submitTask(List<PinotTaskConfig> pinotTaskConfigs, St
    * @return Map from task name to task state
    */
   public synchronized Map<String, TaskState> getTaskStates(String taskType) {
-    Map<String, TaskState> helixJobStates =
-        
_taskDriver.getWorkflowContext(getHelixJobQueueName(taskType)).getJobStates();
+    Map<String, TaskState> helixJobStates = new HashMap<>();
+    WorkflowContext workflowContext = 
_taskDriver.getWorkflowContext(getHelixJobQueueName(taskType));
+
+    if (workflowContext == null) {
+      return helixJobStates;
+    }
+    helixJobStates = workflowContext.getJobStates();
     Map<String, TaskState> taskStates = new HashMap<>(helixJobStates.size());
     for (Map.Entry<String, TaskState> entry : helixJobStates.entrySet()) {
       taskStates.put(getPinotTaskName(entry.getKey()), entry.getValue());
     }
     return taskStates;
   }
 
+  /**
+   * This method returns a count of sub-tasks in various states, given the 
top-level task name.
+   * @param parentTaskName (e.g. "Task_TestTask_1624403781879")
+   * @return TaskCount object
+   */
+  public synchronized TaskCount getTaskCount(String parentTaskName) {
+    TaskCount taskCount = new TaskCount();
+    JobContext jobContext = 
_taskDriver.getJobContext(getHelixJobName(parentTaskName));
+
+    if (jobContext == null) {
+      return taskCount;
+    }
+    Set<Integer> partitionSet = jobContext.getPartitionSet();
+    taskCount.addToTotal(partitionSet.size());
+    for (int partition : partitionSet) {
+      TaskPartitionState state = jobContext.getPartitionState(partition);
+      // Helix returns state as null if the task is not enqueued anywhere yet
+      if (state == null) {
+        // task is not yet assigned to a participant
+        taskCount.addToWaiting(1);
+      } else if (state.equals(TaskPartitionState.INIT) || 
state.equals(TaskPartitionState.RUNNING)) {
+        taskCount.addToRunning(1);
+      } else if (state.equals(TaskPartitionState.TASK_ERROR)) {
+        taskCount.addToError(1);
+      }
+    }
+    return taskCount;
+  }
+
+  /**
+   * Returns a set of Task names (in the form "Task_TestTask_1624403781879") 
that are in progress or not started yet.
+   *
+   * @param taskType
+   * @return Set of task names
+   */
+  public synchronized Set<String> getTasksInProgress(String taskType) {
+    Set<String> tasksInProgress = new HashSet<>();
+    WorkflowContext workflowContext = 
_taskDriver.getWorkflowContext(getHelixJobQueueName(taskType));
+    if (workflowContext == null) {
+      return tasksInProgress;

Review comment:
       The warning message may appear every 5 minutes. Not a good idea. I will 
leave it as it is.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to