jackjlli commented on a change in pull request #7091: URL: https://github.com/apache/incubator-pinot/pull/7091#discussion_r659952371
########## File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java ########## @@ -249,15 +253,72 @@ public synchronized String submitTask(List<PinotTaskConfig> pinotTaskConfigs, St * @return Map from task name to task state */ public synchronized Map<String, TaskState> getTaskStates(String taskType) { - Map<String, TaskState> helixJobStates = - _taskDriver.getWorkflowContext(getHelixJobQueueName(taskType)).getJobStates(); + Map<String, TaskState> helixJobStates = new HashMap<>(); + WorkflowContext workflowContext = _taskDriver.getWorkflowContext(getHelixJobQueueName(taskType)); + + if (workflowContext == null) { + return helixJobStates; + } + helixJobStates = workflowContext.getJobStates(); Map<String, TaskState> taskStates = new HashMap<>(helixJobStates.size()); for (Map.Entry<String, TaskState> entry : helixJobStates.entrySet()) { taskStates.put(getPinotTaskName(entry.getKey()), entry.getValue()); } return taskStates; } + /** + * This method returns a count of sub-tasks in various states, given the top-level task name. + * @param parentTaskName (e.g. 
"Task_TestTask_1624403781879") + * @return TaskCount object + */ + public synchronized TaskCount getTaskCount(String parentTaskName) { + TaskCount taskCount = new TaskCount(); + JobContext jobContext = _taskDriver.getJobContext(getHelixJobName(parentTaskName)); + + if (jobContext == null) { + return taskCount; + } + Set<Integer> partitionSet = jobContext.getPartitionSet(); + taskCount.addToTotal(partitionSet.size()); + for (int partition : partitionSet) { + TaskPartitionState state = jobContext.getPartitionState(partition); + // Helix returns state as null if the task is not enqueued anywhere yet + if (state == null) { + // task is not yet assigned to a participant + taskCount.addToWaiting(1); + } else if (state.equals(TaskPartitionState.INIT) || state.equals(TaskPartitionState.RUNNING)) { + taskCount.addToRunning(1); + } else if (state.equals(TaskPartitionState.TASK_ERROR)) { + taskCount.addToError(1); + } + } + return taskCount; + } + + /** + * Returns a set of Task names (in the form "Task_TestTask_1624403781879") that are in progress or not started yet. + * + * @param taskType + * @return Set of task names + */ + public synchronized Set<String> getTasksInProgress(String taskType) { + Set<String> tasksInProgress = new HashSet<>(); + WorkflowContext workflowContext = _taskDriver.getWorkflowContext(getHelixJobQueueName(taskType)); + if (workflowContext == null) { + return tasksInProgress; Review comment: It'd be good to log a warning message here if the context is null. 
########## File path: pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java ########## @@ -85,20 +85,44 @@ public void setUp() startMinion(); } + private void verifyTaskCount(String task, int errors, int waiting, int running, int total) { + PinotHelixTaskResourceManager.TaskCount taskCount = _helixTaskResourceManager.getTaskCount(task); + assertEquals(taskCount.getError(), errors); + assertEquals(taskCount.getWaiting(), waiting); + assertEquals(taskCount.getRunning(), running); + assertEquals(taskCount.getTotal(), total); + } + @Test public void testStopResumeDeleteTaskQueue() { // Hold the task HOLD.set(true); + // No tasks before we start. + assertEquals(_helixTaskResourceManager.getTasksInProgress(TASK_TYPE).size(),0); + verifyTaskCount("Task_" + TASK_TYPE + "_1624403781879", 0, 0, 0, 0); // Should create the task queues and generate a task - assertNotNull(_taskManager.scheduleTasks().get(TASK_TYPE)); + String task1 = _taskManager.scheduleTasks().get(TASK_TYPE); + assertNotNull(task1); assertTrue(_helixTaskResourceManager.getTaskQueues() .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(TASK_TYPE))); - - // Should generate one more task - assertNotNull(_taskManager.scheduleTask(TASK_TYPE)); - - // Should not generate more tasks + assertTrue(_helixTaskResourceManager.getTasksInProgress(TASK_TYPE).contains(task1)); + + // Since we have two tables, two sub-tasks are generated -- one for each table. + // The default concurrent sub-tasks per minion instance is 1, and we have one minion + // instance spun up. So, one sub-tasks gets scheduled in a minion, and the other one + // waits. + verifyTaskCount(task1, 0, 1, 1, 2); + // Should generate one more task, with two sub-tasks. Both of these sub-tasks will wait + // since we have one minion instance that is still running one of the sub-tasks. 
Review comment: If the only minion instance is still running one of the sub-tasks, why does the `runningCount` below show 0? ########## File path: pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java ########## @@ -85,18 +85,34 @@ public void setUp() startMinion(); } + private void verifyTaskCount(String task, int errors, int waiting, int running, int total) { + PinotHelixTaskResourceManager.TaskCount taskCount = _helixTaskResourceManager.getTaskCount(task); + assertEquals(taskCount.getError(), errors); + assertEquals(taskCount.getWaiting(), waiting); + assertEquals(taskCount.getRunning(), running); + assertEquals(taskCount.getTotal(), total); + } + @Test public void testStopResumeDeleteTaskQueue() { // Hold the task HOLD.set(true); + assertEquals(_helixTaskResourceManager.getTasksInProgress(TASK_TYPE).size(),0); + verifyTaskCount("Task_" + TASK_TYPE + "_1624403781879", 0, 0, 0, 0); // Should create the task queues and generate a task - assertNotNull(_taskManager.scheduleTasks().get(TASK_TYPE)); + String task1 = _taskManager.scheduleTasks().get(TASK_TYPE); + assertNotNull(task1); assertTrue(_helixTaskResourceManager.getTaskQueues() .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(TASK_TYPE))); + assertTrue(_helixTaskResourceManager.getTasksInProgress(TASK_TYPE).contains(task1)); + verifyTaskCount(task1, 0, 1, 1, 2); // Should generate one more task - assertNotNull(_taskManager.scheduleTask(TASK_TYPE)); + String task2 = _taskManager.scheduleTask(TASK_TYPE); + assertNotNull(task2); + assertTrue(_helixTaskResourceManager.getTasksInProgress(TASK_TYPE).contains(task2)); + verifyTaskCount(task2, 0, 2, 0, 2); Review comment: Is it true that the `totalCount` here is the total number of sub-tasks? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org