This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 3121f8fc56f Add state and table filtering support to task counts API (#16433)
3121f8fc56f is described below

commit 3121f8fc56f28f3e76c3c14d7bc86b29da91b96d
Author: Xiang Fu <[email protected]>
AuthorDate: Mon Jul 28 19:03:45 2025 -0700

    Add state and table filtering support to task counts API (#16433)
    
    * Add state and table filtering support to task counts API
    
    - Enhanced /tasks/{taskType}/taskcounts endpoint with optional query parameters:
      * state: Filter by single or multiple comma-separated task states
        (waiting, running, error, completed, dropped, timedOut, aborted, unknown, total)
      * table: Filter by table name to show only tasks with subtasks for specific tables
    
    - Features:
      * Multiple state filtering: ?state=running,error,waiting
      * Table-specific filtering: ?table=myTable_OFFLINE
      * Combined filtering: ?state=running&table=myTable_OFFLINE
      * Database-aware table name translation
      * Comprehensive input validation with clear error messages
      * Backward compatible - existing API usage unchanged
    
    - Implementation:
      * New overloaded getTaskCounts methods in PinotHelixTaskResourceManager
      * Efficient filtering logic with early validation
      * Robust error handling for edge cases
      * Helper methods for state and table filtering logic
    
    - Tests:
      * Added comprehensive test suite covering all scenarios
      * State filtering tests (single, multiple, invalid states)
      * Table filtering tests (table-only, combined with states)
      * Edge case handling (null values, exceptions, no matches)
      * 6 new test methods with proper mocking
    
    This enhancement enables precise monitoring of Pinot tasks by allowing
    operators to filter task counts by both execution state and target table,
    significantly improving operational visibility.
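
As an illustration of the query parameters described above, the sketch below assembles a request URL for the taskcounts endpoint. The class name, the controller base URL and the task type are hypothetical placeholders, and the state values follow the Helix TaskState names that a later commit in this push adopts. Percent-encoding the comma is a conservative choice of this sketch, not something the commit requires.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Pinot: builds a URL for GET /tasks/{taskType}/taskcounts.
public class TaskCountsUrlSketch {
  /** Null or empty filters are omitted, which corresponds to the unfiltered call. */
  public static String build(String controllerBaseUrl, String taskType, String state, String table) {
    List<String> params = new ArrayList<>();
    if (state != null && !state.isEmpty()) {
      // A comma-separated list such as "IN_PROGRESS,FAILED" is sent as one parameter value.
      params.add("state=" + URLEncoder.encode(state, StandardCharsets.UTF_8));
    }
    if (table != null && !table.isEmpty()) {
      params.add("table=" + URLEncoder.encode(table, StandardCharsets.UTF_8));
    }
    String url = controllerBaseUrl + "/tasks/" + taskType + "/taskcounts";
    return params.isEmpty() ? url : url + "?" + String.join("&", params);
  }
}
```

Calling `TaskCountsUrlSketch.build(base, "TestTask", null, null)` yields the same URL an existing client would already use, which is the backward-compatibility property the message refers to.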
    
    * Update task counts API to use Helix TaskState enum for state filtering
    
    - Changed state parameter to use official Helix TaskState enum values:
      NOT_STARTED, IN_PROGRESS, STOPPED, STOPPING, FAILED, COMPLETED,
      ABORTED, TIMED_OUT, TIMING_OUT, FAILING
    
    - Updated API documentation with correct TaskState values and examples
    - Enhanced state validation using TaskState.valueOf() for robust enum validation
    - Updated filtering logic to work with actual TaskState enum instead of aggregated counts
    - Added validateAndParseTaskState() method for proper state validation with clear error messages
    - Updated all test methods to use correct TaskState enum values
    - Fixed all checkstyle violations and line length issues
    
    This ensures the API uses the standard Helix TaskState semantics for
    consistent task monitoring across the Pinot ecosystem.
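
The state validation described above can be sketched in isolation as follows. To stay self-contained, this sketch uses a local enum as a stand-in for org.apache.helix.task.TaskState (values copied from the list in this message); the class and method names are hypothetical and the code is an approximation of the approach, not the committed implementation.

```java
import java.util.Arrays;
import java.util.EnumSet;
import java.util.Locale;
import java.util.Set;

// Hypothetical sketch of comma-separated state parsing with up-front validation.
public class TaskStateFilterSketch {
  // Local stand-in for org.apache.helix.task.TaskState (assumption: same constant names).
  public enum TaskState {
    NOT_STARTED, IN_PROGRESS, STOPPED, STOPPING, FAILED, COMPLETED, ABORTED, TIMED_OUT, TIMING_OUT, FAILING
  }

  /** Splits on commas, trims and upper-cases each token, and rejects the first unknown name. */
  public static Set<TaskState> parseStates(String csv) {
    Set<TaskState> states = EnumSet.noneOf(TaskState.class);
    for (String token : csv.trim().split(",")) {
      String normalized = token.trim().toUpperCase(Locale.ROOT);
      try {
        states.add(TaskState.valueOf(normalized));
      } catch (IllegalArgumentException e) {
        // Re-throw with a message that lists the accepted values.
        throw new IllegalArgumentException(
            "Invalid state: " + normalized + ". Valid states are: " + Arrays.toString(TaskState.values()));
      }
    }
    return states;
  }
}
```

Because every token is validated before any task is inspected, a request such as "IN_PROGRESS,INVALID_STATE" fails as a whole rather than returning a partially filtered result.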
    
    * Fix PinotHelixTaskResourceManagerTest workflow context mocking
    
    - Fixed failing tests for TaskState enum filtering functionality
    - Updated workflow context mocking to use getJobStates() method instead of individual getJobState() calls
    - Added proper WorkflowConfig and JobDag mocking for getTaskStates() method dependencies
    - Fixed import organization and removed duplicate imports
    - All TaskState filtering tests now pass: testGetTaskCountsWithSingleStateFilter,
      testGetTaskCountsWithMultipleStatesFilter, testGetTaskCountsWithStateAndTableFilter
    
    The tests now properly mock the Helix workflow dependencies required by the
    updated TaskState enum implementation, ensuring correct state filtering behavior.
    
    * Optimize getTaskCounts performance by applying filters before collecting TaskCount
    
    - Move expensive getTaskCount() operation to only happen after state and table filtering
    - Apply state filtering first, then table filtering, then collect TaskCount
    - Significantly improves performance when many tasks are filtered out
    - Addresses review feedback from krishan1390 on PR #16433
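
The ordering described in this change (cheap filters first, the expensive count last) can be sketched generically. The class name and functional parameters below are hypothetical; in the actual commit the filters are the state and table checks and the expensive step is getTaskCount().

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical sketch: run inexpensive predicates before an expensive per-task lookup.
public class FilterBeforeCountSketch {
  public static <C> Map<String, C> collect(List<String> taskNames, Predicate<String> stateMatches,
      Predicate<String> tableMatches, Function<String, C> expensiveCount) {
    Map<String, C> result = new TreeMap<>();
    for (String taskName : taskNames) {
      // State check first, then table check; either one can skip the task.
      if (!stateMatches.test(taskName) || !tableMatches.test(taskName)) {
        continue;
      }
      // The expensive lookup runs only for tasks that passed both filters.
      result.put(taskName, expensiveCount.apply(taskName));
    }
    return result;
  }
}
```

With this shape, the number of expensive lookups equals the number of tasks returned rather than the number of tasks in the queue, which is where the saving comes from when most tasks are filtered out.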
    
    * Address PR review comments: use StringUtils.isNotEmpty() and add @Nullable annotations
    
    - Replace null checks with StringUtils.isNotEmpty() for better string validation in PinotTaskRestletResource
    - Add @Nullable annotations to state and tableNameWithType parameters in PinotHelixTaskResourceManager.getTaskCounts()
    - Update state and table filtering logic to use StringUtils.isNotEmpty() for consistency
    - Improve method signature formatting for better readability
    
    Addresses feedback from Jackie-Jiang on PR #16433
---
 .../api/resources/PinotTaskRestletResource.java    |  38 +-
 .../core/minion/PinotHelixTaskResourceManager.java | 115 ++++++
 .../resources/PinotTaskRestletResourceTest.java    |   1 -
 .../minion/PinotHelixTaskResourceManagerTest.java  | 405 ++++++++++++++++++++-
 4 files changed, 543 insertions(+), 16 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
index d76ea4f3a25..90e6732dc96 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
@@ -130,15 +130,18 @@ import static org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_K
  *   <li>DELETE '/tasks/{taskType}': Delete all tasks (as well as the task queue) for the given task type</li>
  * </ul>
  */
-@Api(tags = Constants.TASK_TAG, authorizations = {@Authorization(value = SWAGGER_AUTHORIZATION_KEY),
-    @Authorization(value = DATABASE)})
+@Api(tags = Constants.TASK_TAG, authorizations = {
+    @Authorization(value = SWAGGER_AUTHORIZATION_KEY),
+    @Authorization(value = DATABASE)
+})
 @SwaggerDefinition(securityDefinition = @SecurityDefinition(apiKeyAuthDefinitions = {
     @ApiKeyAuthDefinition(name = HttpHeaders.AUTHORIZATION, in = ApiKeyAuthDefinition.ApiKeyLocation.HEADER,
         key = SWAGGER_AUTHORIZATION_KEY,
         description = "The format of the key is  ```\"Basic <token>\" or \"Bearer <token>\"```"),
     @ApiKeyAuthDefinition(name = DATABASE, in = ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = DATABASE,
         description = "Database context passed through http header. If no context is provided 'default' database "
-            + "context will be considered.")}))
+            + "context will be considered.")
+}))
 @Path("/")
 public class PinotTaskRestletResource {
   public static final Logger LOGGER = LoggerFactory.getLogger(PinotTaskRestletResource.class);
@@ -267,8 +270,21 @@ public class PinotTaskRestletResource {
   @Produces(MediaType.APPLICATION_JSON)
   @ApiOperation("Fetch count of sub-tasks for each of the tasks for the given task type")
   public Map<String, PinotHelixTaskResourceManager.TaskCount> getTaskCounts(
-      @ApiParam(value = "Task type", required = true) @PathParam("taskType") String taskType) {
-    return _pinotHelixTaskResourceManager.getTaskCounts(taskType);
+      @ApiParam(value = "Task type", required = true) @PathParam("taskType") String taskType,
+      @ApiParam(value = "Task state(s) to filter by. Can be single state or comma-separated multiple states "
+          + "(NOT_STARTED, IN_PROGRESS, STOPPED, STOPPING, FAILED, COMPLETED, ABORTED, TIMED_OUT, TIMING_OUT, "
+          + "FAILING). Example: 'IN_PROGRESS' or 'IN_PROGRESS,FAILED,STOPPING'")
+      @QueryParam("state") @Nullable String state,
+      @ApiParam(value = "Table name with type (e.g., 'myTable_OFFLINE') to filter tasks by table. "
+          + "Only tasks that have subtasks for this table will be returned.")
+      @QueryParam("table") @Nullable String table, @Context HttpHeaders headers) {
+    String tableNameWithType = table != null ? DatabaseUtils.translateTableName(table, headers) : null;
+
+    if (StringUtils.isNotEmpty(state) || StringUtils.isNotEmpty(tableNameWithType)) {
+      return _pinotHelixTaskResourceManager.getTaskCounts(taskType, state, tableNameWithType);
+    } else {
+      return _pinotHelixTaskResourceManager.getTaskCounts(taskType);
+    }
   }
 
   @GET
@@ -293,7 +309,7 @@ public class PinotTaskRestletResource {
   public Map<String, PinotHelixTaskResourceManager.TaskDebugInfo> getTasksDebugInfo(
       @ApiParam(value = "Task type", required = true) @PathParam("taskType") String taskType,
       @ApiParam(value = "Table name with type", required = true) @PathParam("tableNameWithType")
-          String tableNameWithType,
+      String tableNameWithType,
       @ApiParam(value = "verbosity (Prints information for all the tasks for the given task type and table."
           + "By default, only prints subtask details for running and error tasks. "
           + "Value of > 0 prints subtask details for all tasks)")
@@ -310,9 +326,9 @@ public class PinotTaskRestletResource {
   public String getTaskGenerationDebugInto(
       @ApiParam(value = "Task type", required = true) @PathParam("taskType") String taskType,
       @ApiParam(value = "Table name with type", required = true) @PathParam("tableNameWithType")
-          String tableNameWithType,
+      String tableNameWithType,
       @ApiParam(value = "Whether to only lookup local cache for logs", defaultValue = "false") @QueryParam("localOnly")
-          boolean localOnly, @Context HttpHeaders httpHeaders)
+      boolean localOnly, @Context HttpHeaders httpHeaders)
       throws JsonProcessingException {
     tableNameWithType = DatabaseUtils.translateTableName(tableNameWithType, httpHeaders);
     if (localOnly) {
@@ -430,7 +446,7 @@ public class PinotTaskRestletResource {
   public Map<String, PinotTaskConfig> getSubtaskConfigs(
       @ApiParam(value = "Task name", required = true) @PathParam("taskName") String taskName,
       @ApiParam(value = "Sub task names separated by comma") @QueryParam("subtaskNames") @Nullable
-          String subtaskNames) {
+      String subtaskNames) {
     return _pinotHelixTaskResourceManager.getSubtaskConfigs(taskName, subtaskNames);
   }
 
@@ -442,7 +458,7 @@ public class PinotTaskRestletResource {
   public String getSubtaskProgress(@Context HttpHeaders httpHeaders,
       @ApiParam(value = "Task name", required = true) @PathParam("taskName") String taskName,
       @ApiParam(value = "Sub task names separated by comma") @QueryParam("subtaskNames") @Nullable
-          String subtaskNames) {
+      String subtaskNames) {
     // Relying on original schema that was used to query the controller
     String scheme = _uriInfo.getRequestUri().getScheme();
     List<InstanceConfig> workers = _pinotHelixResourceManager.getAllMinionInstanceConfigs();
@@ -482,7 +498,7 @@ public class PinotTaskRestletResource {
       @ApiParam(value = "Subtask state (UNKNOWN,IN_PROGRESS,SUCCEEDED,CANCELLED,ERROR)", required = true)
       @QueryParam("subTaskState") String subTaskState,
       @ApiParam(value = "Minion worker IDs separated by comma") @QueryParam("minionWorkerIds") @Nullable
-          String minionWorkerIds) {
+      String minionWorkerIds) {
     Set<String> selectedMinionWorkers = new HashSet<>();
     if (StringUtils.isNotEmpty(minionWorkerIds)) {
       selectedMinionWorkers.addAll(
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
index 6f1fb4c0a22..5f86db52b0a 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
@@ -776,6 +776,121 @@ public class PinotHelixTaskResourceManager {
     return taskCounts;
   }
 
+  /**
+   * Fetch count of sub-tasks for each of the tasks for the given taskType, filtered by state.
+   *
+   * @param taskType      Pinot taskType / Helix JobQueue
+   * @param state         State(s) to filter by. Can be single state or comma-separated multiple states
+   *                      (waiting, running, error, completed, dropped, timedOut, aborted, unknown, total)
+   * @return Map of Pinot Task Name to TaskCount containing only tasks that have > 0 count for any of the
+   *         specified states
+   */
+  public synchronized Map<String, TaskCount> getTaskCounts(String taskType, String state) {
+    return getTaskCounts(taskType, state, null);
+  }
+
+  /**
+   * Fetch count of sub-tasks for each of the tasks for the given taskType, filtered by state and/or table.
+   *
+   * @param taskType           Pinot taskType / Helix JobQueue
+   * @param state              State(s) to filter by. Can be single state or comma-separated multiple states
+   *                           (NOT_STARTED, IN_PROGRESS, STOPPED, STOPPING, FAILED, COMPLETED, ABORTED, TIMED_OUT,
+   *                           TIMING_OUT, FAILING). Can be null to skip state filtering.
+   * @param tableNameWithType  Table name with type to filter by. Only tasks that have subtasks for this table
+   *                           will be returned. Can be null to skip table filtering.
+   * @return Map of Pinot Task Name to TaskCount containing only tasks that match the specified filters
+   */
+  public synchronized Map<String, TaskCount> getTaskCounts(String taskType, @Nullable String state,
+      @Nullable String tableNameWithType) {
+    Set<String> tasks = getTasks(taskType);
+    if (tasks == null) {
+      return Collections.emptyMap();
+    }
+
+    // Parse and validate comma-separated states if provided
+    Set<TaskState> requestedStates = null;
+    if (StringUtils.isNotEmpty(state)) {
+      String[] stateArray = state.trim().split(",");
+      requestedStates = new HashSet<>();
+      for (String s : stateArray) {
+        String normalizedState = s.trim().toUpperCase();
+        // Validate each state upfront
+        TaskState taskState = validateAndParseTaskState(normalizedState);
+        requestedStates.add(taskState);
+      }
+    }
+
+    // Get all task states if we need to filter by state
+    Map<String, TaskState> taskStates = null;
+    if (requestedStates != null) {
+      taskStates = getTaskStates(taskType);
+    }
+
+    Map<String, TaskCount> taskCounts = new TreeMap<>();
+    for (String taskName : tasks) {
+      // Apply state filtering first (less expensive)
+      if (requestedStates != null) {
+        TaskState currentTaskState = taskStates.get(taskName);
+        if (currentTaskState == null || !requestedStates.contains(currentTaskState)) {
+          continue;
+        }
+      }
+
+      // Apply table filtering next (also less expensive than getting task count)
+      if (StringUtils.isNotEmpty(tableNameWithType) && !hasTasksForTable(taskName, tableNameWithType)) {
+        continue;
+      }
+
+      // Only collect TaskCount after passing all filters (expensive operation)
+      TaskCount taskCount = getTaskCount(taskName);
+      taskCounts.put(taskName, taskCount);
+    }
+    return taskCounts;
+  }
+
+  /**
+   * Validates and parses a task state string into TaskState enum.
+   *
+   * @param state State string to validate (should be uppercase)
+   * @throws IllegalArgumentException if the state is invalid
+   * @return TaskState enum value
+   */
+  private TaskState validateAndParseTaskState(String state) {
+    try {
+      return TaskState.valueOf(state);
+    } catch (IllegalArgumentException e) {
+      throw new IllegalArgumentException("Invalid state: " + state + ". Valid states are: "
+          + Arrays.toString(TaskState.values()));
+    }
+  }
+
+  /**
+   * Helper method to check if a task has any subtasks for the specified table.
+   *
+   * @param taskName          Task name to check
+   * @param tableNameWithType Table name with type to check for
+   * @return true if the task has subtasks for the specified table
+   */
+  private boolean hasTasksForTable(String taskName, String tableNameWithType) {
+    try {
+      // Get all subtask configs for this task
+      List<PinotTaskConfig> subtaskConfigs = getSubtaskConfigs(taskName);
+
+      // Check if any subtask is for the specified table
+      for (PinotTaskConfig taskConfig : subtaskConfigs) {
+        String taskTableName = taskConfig.getTableName();
+        if (taskTableName != null && taskTableName.equals(tableNameWithType)) {
+          return true;
+        }
+      }
+      return false;
+    } catch (Exception e) {
+      // If we can't get the subtask configs, assume no match
+      LOGGER.warn("Failed to get subtask configs for task: {}", taskName, e);
+      return false;
+    }
+  }
+
   /**
    * Given a taskType, helper method to debug all the HelixJobs for the taskType.
    * For each of the HelixJobs, collects status of the (sub)tasks in the taskbatch.
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResourceTest.java
index 764b1d53d16..3e1fac57276 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResourceTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResourceTest.java
@@ -123,7 +123,6 @@ public class PinotTaskRestletResourceTest {
     return instanceConfig;
   }
 
-
   @Test
   public void testGetSubtaskWithGivenStateProgressWithException()
       throws JsonProcessingException {
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManagerTest.java
index ef11b4f3020..cee3c246f46 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManagerTest.java
@@ -21,6 +21,7 @@ package org.apache.pinot.controller.helix.core.minion;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -31,10 +32,12 @@ import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobContext;
+import org.apache.helix.task.JobDag;
 import org.apache.helix.task.TaskConfig;
 import org.apache.helix.task.TaskDriver;
 import org.apache.helix.task.TaskPartitionState;
 import org.apache.helix.task.TaskState;
+import org.apache.helix.task.WorkflowConfig;
 import org.apache.helix.task.WorkflowContext;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.util.CompletionServiceHelper;
@@ -52,10 +55,7 @@ import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertNull;
-import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.*;
 
 
 public class PinotHelixTaskResourceManagerTest {
@@ -664,4 +664,401 @@ public class PinotHelixTaskResourceManagerTest {
     assertEquals(subtaskInfos.size(), 1); // Completed tasks should be included
     assertEquals(subtaskInfos.get(0).getState(), TaskPartitionState.COMPLETED);
   }
+
+  @Test
+  public void testGetTaskCountsWithSingleStateFilter() {
+    String taskType = "TestTask";
+    String taskName1 = "Task_TestTask_12345";
+    String taskName2 = "Task_TestTask_67890";
+
+    TaskDriver taskDriver = mock(TaskDriver.class);
+    PinotHelixResourceManager pinotHelixResourceManager = mock(PinotHelixResourceManager.class);
+    PinotHelixTaskResourceManager mgr = new PinotHelixTaskResourceManager(pinotHelixResourceManager, taskDriver);
+
+    // Mock getTasks to return our test task names
+    Set<String> tasks = new HashSet<>();
+    tasks.add(taskName1);
+    tasks.add(taskName2);
+    PinotHelixTaskResourceManager spyMgr = Mockito.spy(mgr);
+    when(spyMgr.getTasks(taskType)).thenReturn(tasks);
+
+    // Mock workflow-level components for getTaskStates
+    String helixJobQueueName = "TaskQueue_" + taskType;
+    WorkflowConfig workflowConfig = mock(WorkflowConfig.class);
+    when(taskDriver.getWorkflowConfig(helixJobQueueName)).thenReturn(workflowConfig);
+
+    JobDag jobDag = mock(JobDag.class);
+    Set<String> helixJobs = new HashSet<>();
+    helixJobs.add(PinotHelixTaskResourceManager.getHelixJobName(taskName1));
+    helixJobs.add(PinotHelixTaskResourceManager.getHelixJobName(taskName2));
+    when(jobDag.getAllNodes()).thenReturn(helixJobs);
+    when(workflowConfig.getJobDag()).thenReturn(jobDag);
+
+    WorkflowContext workflowContext = mock(WorkflowContext.class);
+    when(taskDriver.getWorkflowContext(helixJobQueueName)).thenReturn(workflowContext);
+    Map<String, TaskState> jobStatesMap = new HashMap<>();
+    jobStatesMap.put(PinotHelixTaskResourceManager.getHelixJobName(taskName1), TaskState.IN_PROGRESS);
+    jobStatesMap.put(PinotHelixTaskResourceManager.getHelixJobName(taskName2), TaskState.COMPLETED);
+    when(workflowContext.getJobStates()).thenReturn(jobStatesMap);
+
+    // Mock JobConfig and JobContext for both tasks
+    mockTaskJobConfigAndContext(taskDriver, taskName1, TaskPartitionState.RUNNING);
+    mockTaskJobConfigAndContext(taskDriver, taskName2, TaskPartitionState.COMPLETED);
+
+    // Test filter by "IN_PROGRESS" - should only return taskName1
+    Map<String, PinotHelixTaskResourceManager.TaskCount> inProgressTasks =
+        spyMgr.getTaskCounts(taskType, "IN_PROGRESS", null);
+    assertEquals(inProgressTasks.size(), 1);
+    assertTrue(inProgressTasks.containsKey(taskName1));
+    assertFalse(inProgressTasks.containsKey(taskName2));
+
+    // Test filter by "COMPLETED" - should only return taskName2
+    Map<String, PinotHelixTaskResourceManager.TaskCount> completedTasks =
+        spyMgr.getTaskCounts(taskType, "COMPLETED", null);
+    assertEquals(completedTasks.size(), 1);
+    assertFalse(completedTasks.containsKey(taskName1));
+    assertTrue(completedTasks.containsKey(taskName2));
+
+    // Test filter by "FAILED" - should return no tasks
+    Map<String, PinotHelixTaskResourceManager.TaskCount> failedTasks =
+        spyMgr.getTaskCounts(taskType, "FAILED", null);
+    assertEquals(failedTasks.size(), 0);
+  }
+
+  @Test
+  public void testGetTaskCountsWithMultipleStatesFilter() {
+    String taskType = "TestTask";
+    String taskName1 = "Task_TestTask_12345";
+    String taskName2 = "Task_TestTask_67890";
+    String taskName3 = "Task_TestTask_11111";
+
+    TaskDriver taskDriver = mock(TaskDriver.class);
+    PinotHelixResourceManager pinotHelixResourceManager = mock(PinotHelixResourceManager.class);
+    PinotHelixTaskResourceManager mgr = new PinotHelixTaskResourceManager(pinotHelixResourceManager, taskDriver);
+
+    // Mock getTasks to return our test task names
+    Set<String> tasks = new HashSet<>();
+    tasks.add(taskName1);
+    tasks.add(taskName2);
+    tasks.add(taskName3);
+    PinotHelixTaskResourceManager spyMgr = Mockito.spy(mgr);
+    when(spyMgr.getTasks(taskType)).thenReturn(tasks);
+
+    // Mock workflow-level components for getTaskStates
+    String helixJobQueueName = "TaskQueue_" + taskType;
+    WorkflowConfig workflowConfig = mock(WorkflowConfig.class);
+    when(taskDriver.getWorkflowConfig(helixJobQueueName)).thenReturn(workflowConfig);
+
+    JobDag jobDag = mock(JobDag.class);
+    Set<String> helixJobs = new HashSet<>();
+    helixJobs.add(PinotHelixTaskResourceManager.getHelixJobName(taskName1));
+    helixJobs.add(PinotHelixTaskResourceManager.getHelixJobName(taskName2));
+    helixJobs.add(PinotHelixTaskResourceManager.getHelixJobName(taskName3));
+    when(jobDag.getAllNodes()).thenReturn(helixJobs);
+    when(workflowConfig.getJobDag()).thenReturn(jobDag);
+
+    WorkflowContext workflowContext = mock(WorkflowContext.class);
+    when(taskDriver.getWorkflowContext(helixJobQueueName)).thenReturn(workflowContext);
+    Map<String, TaskState> jobStatesMap = new HashMap<>();
+    jobStatesMap.put(PinotHelixTaskResourceManager.getHelixJobName(taskName1), TaskState.IN_PROGRESS);
+    jobStatesMap.put(PinotHelixTaskResourceManager.getHelixJobName(taskName2), TaskState.FAILED);
+    jobStatesMap.put(PinotHelixTaskResourceManager.getHelixJobName(taskName3), TaskState.COMPLETED);
+    when(workflowContext.getJobStates()).thenReturn(jobStatesMap);
+
+    // Mock JobConfig and JobContext for all tasks
+    mockTaskJobConfigAndContext(taskDriver, taskName1, TaskPartitionState.RUNNING);
+    mockTaskJobConfigAndContext(taskDriver, taskName2, TaskPartitionState.TASK_ERROR);
+    mockTaskJobConfigAndContext(taskDriver, taskName3, TaskPartitionState.COMPLETED);
+
+    // Test filter by "IN_PROGRESS,FAILED" - should return taskName1 and taskName2
+    Map<String, PinotHelixTaskResourceManager.TaskCount> inProgressOrFailedTasks =
+        spyMgr.getTaskCounts(taskType, "IN_PROGRESS,FAILED", null);
+    assertEquals(inProgressOrFailedTasks.size(), 2);
+    assertTrue(inProgressOrFailedTasks.containsKey(taskName1));
+    assertTrue(inProgressOrFailedTasks.containsKey(taskName2));
+    assertFalse(inProgressOrFailedTasks.containsKey(taskName3));
+
+    // Test filter by "COMPLETED,IN_PROGRESS" - should return taskName1 and taskName3
+    Map<String, PinotHelixTaskResourceManager.TaskCount> completedOrInProgressTasks =
+        spyMgr.getTaskCounts(taskType, "COMPLETED,IN_PROGRESS", null);
+    assertEquals(completedOrInProgressTasks.size(), 2);
+    assertTrue(completedOrInProgressTasks.containsKey(taskName1));
+    assertFalse(completedOrInProgressTasks.containsKey(taskName2));
+    assertTrue(completedOrInProgressTasks.containsKey(taskName3));
+
+    // Test filter by "NOT_STARTED,STOPPED" - should return no tasks
+    Map<String, PinotHelixTaskResourceManager.TaskCount> notStartedOrStoppedTasks =
+        spyMgr.getTaskCounts(taskType, "NOT_STARTED,STOPPED", null);
+    assertEquals(notStartedOrStoppedTasks.size(), 0);
+
+    // Test filter with spaces "IN_PROGRESS, FAILED, COMPLETED" - should return all three
+    Map<String, PinotHelixTaskResourceManager.TaskCount> allTasks =
+        spyMgr.getTaskCounts(taskType, "IN_PROGRESS, FAILED, COMPLETED", null);
+    assertEquals(allTasks.size(), 3);
+    assertTrue(allTasks.containsKey(taskName1));
+    assertTrue(allTasks.containsKey(taskName2));
+    assertTrue(allTasks.containsKey(taskName3));
+  }
+
+  @Test
+  public void testGetTaskCountsWithInvalidState() {
+    String taskType = "TestTask";
+    String taskName1 = "Task_TestTask_12345";
+
+    TaskDriver taskDriver = mock(TaskDriver.class);
+    PinotHelixResourceManager pinotHelixResourceManager = mock(PinotHelixResourceManager.class);
+    PinotHelixTaskResourceManager mgr = new PinotHelixTaskResourceManager(pinotHelixResourceManager, taskDriver);
+
+    // Mock getTasks to return a task
+    Set<String> tasks = new HashSet<>();
+    tasks.add(taskName1);
+    PinotHelixTaskResourceManager spyMgr = Mockito.spy(mgr);
+    when(spyMgr.getTasks(taskType)).thenReturn(tasks);
+
+    // Mock JobConfig and JobContext for the task
+    mockTaskJobConfigAndContext(taskDriver, taskName1, TaskPartitionState.RUNNING);
+
+    // Test with invalid single state
+    try {
+      spyMgr.getTaskCounts(taskType, "INVALID_STATE", null);
+      fail("Expected IllegalArgumentException for invalid state");
+    } catch (IllegalArgumentException e) {
+      assertTrue(e.getMessage().contains("Invalid state: INVALID_STATE"));
+    }
+
+    // Test with mixed valid and invalid states
+    try {
+      spyMgr.getTaskCounts(taskType, "IN_PROGRESS,INVALID_STATE,COMPLETED", null);
+      fail("Expected IllegalArgumentException for invalid state in multiple states");
+    } catch (IllegalArgumentException e) {
+      assertTrue(e.getMessage().contains("Invalid state: INVALID_STATE"));
+    }
+  }
+
+  @Test
+  public void testGetTaskCountsWithTableFilter() {
+    String taskType = "TestTask";
+    String taskName1 = "Task_TestTask_12345";
+    String taskName2 = "Task_TestTask_67890";
+    String table1 = "table1_OFFLINE";
+    String table2 = "table2_OFFLINE";
+
+    TaskDriver taskDriver = mock(TaskDriver.class);
+    PinotHelixResourceManager pinotHelixResourceManager = mock(PinotHelixResourceManager.class);
+    PinotHelixTaskResourceManager mgr = new PinotHelixTaskResourceManager(pinotHelixResourceManager, taskDriver);
+
+    // Mock getTasks to return our test task names
+    Set<String> tasks = new HashSet<>();
+    tasks.add(taskName1);
+    tasks.add(taskName2);
+    PinotHelixTaskResourceManager spyMgr = Mockito.spy(mgr);
+    when(spyMgr.getTasks(taskType)).thenReturn(tasks);
+
+    // Mock JobConfig and JobContext for both tasks
+    mockTaskJobConfigAndContext(taskDriver, taskName1, TaskPartitionState.RUNNING);
+    mockTaskJobConfigAndContext(taskDriver, taskName2, TaskPartitionState.COMPLETED);
+
+    // Mock subtask configs - taskName1 has subtasks for table1, taskName2 has subtasks for table2
+    List<PinotTaskConfig> subtaskConfigs1 = new ArrayList<>();
+    PinotTaskConfig taskConfig1 = mock(PinotTaskConfig.class);
+    when(taskConfig1.getTableName()).thenReturn(table1);
+    subtaskConfigs1.add(taskConfig1);
+    when(spyMgr.getSubtaskConfigs(taskName1)).thenReturn(subtaskConfigs1);
+
+    List<PinotTaskConfig> subtaskConfigs2 = new ArrayList<>();
+    PinotTaskConfig taskConfig2 = mock(PinotTaskConfig.class);
+    when(taskConfig2.getTableName()).thenReturn(table2);
+    subtaskConfigs2.add(taskConfig2);
+    when(spyMgr.getSubtaskConfigs(taskName2)).thenReturn(subtaskConfigs2);
+
+    // Test filter by table1 - should only return taskName1
+    Map<String, PinotHelixTaskResourceManager.TaskCount> table1Tasks =
+        spyMgr.getTaskCounts(taskType, null, table1);
+    assertEquals(table1Tasks.size(), 1);
+    assertTrue(table1Tasks.containsKey(taskName1));
+    assertFalse(table1Tasks.containsKey(taskName2));
+
+    // Test filter by table2 - should only return taskName2
+    Map<String, PinotHelixTaskResourceManager.TaskCount> table2Tasks =
+        spyMgr.getTaskCounts(taskType, null, table2);
+    assertEquals(table2Tasks.size(), 1);
+    assertFalse(table2Tasks.containsKey(taskName1));
+    assertTrue(table2Tasks.containsKey(taskName2));
+
+    // Test filter by non-existent table - should return no tasks
+    Map<String, PinotHelixTaskResourceManager.TaskCount> noTableTasks =
+        spyMgr.getTaskCounts(taskType, null, "nonexistent_OFFLINE");
+    assertEquals(noTableTasks.size(), 0);
+  }
+
+  @Test
+  public void testGetTaskCountsWithStateAndTableFilter() {
+    String taskType = "TestTask";
+    String taskName1 = "Task_TestTask_12345";
+    String taskName2 = "Task_TestTask_67890";
+    String taskName3 = "Task_TestTask_11111";
+    String table1 = "table1_OFFLINE";
+    String table2 = "table2_OFFLINE";
+
+    TaskDriver taskDriver = mock(TaskDriver.class);
+    PinotHelixResourceManager pinotHelixResourceManager = mock(PinotHelixResourceManager.class);
+    PinotHelixTaskResourceManager mgr = new PinotHelixTaskResourceManager(pinotHelixResourceManager, taskDriver);
+
+    // Mock getTasks to return our test task names
+    Set<String> tasks = new HashSet<>();
+    tasks.add(taskName1);
+    tasks.add(taskName2);
+    tasks.add(taskName3);
+    PinotHelixTaskResourceManager spyMgr = Mockito.spy(mgr);
+    when(spyMgr.getTasks(taskType)).thenReturn(tasks);
+
+    // Mock workflow-level components for getTaskStates
+    String helixJobQueueName = "TaskQueue_" + taskType;
+    WorkflowConfig workflowConfig = mock(WorkflowConfig.class);
+    when(taskDriver.getWorkflowConfig(helixJobQueueName)).thenReturn(workflowConfig);
+
+    JobDag jobDag = mock(JobDag.class);
+    Set<String> helixJobs = new HashSet<>();
+    helixJobs.add(PinotHelixTaskResourceManager.getHelixJobName(taskName1));
+    helixJobs.add(PinotHelixTaskResourceManager.getHelixJobName(taskName2));
+    helixJobs.add(PinotHelixTaskResourceManager.getHelixJobName(taskName3));
+    when(jobDag.getAllNodes()).thenReturn(helixJobs);
+    when(workflowConfig.getJobDag()).thenReturn(jobDag);
+
+    WorkflowContext workflowContext = mock(WorkflowContext.class);
+    when(taskDriver.getWorkflowContext(helixJobQueueName)).thenReturn(workflowContext);
+    Map<String, TaskState> jobStatesMap = new HashMap<>();
+    jobStatesMap.put(PinotHelixTaskResourceManager.getHelixJobName(taskName1), TaskState.IN_PROGRESS); // table1
+    jobStatesMap.put(PinotHelixTaskResourceManager.getHelixJobName(taskName2), TaskState.COMPLETED);   // table1
+    jobStatesMap.put(PinotHelixTaskResourceManager.getHelixJobName(taskName3), TaskState.IN_PROGRESS); // table2
+    when(workflowContext.getJobStates()).thenReturn(jobStatesMap);
+
+    // Mock JobConfig and JobContext - different states for each task
+    mockTaskJobConfigAndContext(taskDriver, taskName1, TaskPartitionState.RUNNING);   // table1
+    mockTaskJobConfigAndContext(taskDriver, taskName2, TaskPartitionState.COMPLETED); // table1
+    mockTaskJobConfigAndContext(taskDriver, taskName3, TaskPartitionState.RUNNING);   // table2
+
+    // Mock subtask configs - taskName1 and taskName2 for table1, taskName3 for table2
+    List<PinotTaskConfig> subtaskConfigs1 = new ArrayList<>();
+    PinotTaskConfig taskConfig1 = mock(PinotTaskConfig.class);
+    when(taskConfig1.getTableName()).thenReturn(table1);
+    subtaskConfigs1.add(taskConfig1);
+    when(spyMgr.getSubtaskConfigs(taskName1)).thenReturn(subtaskConfigs1);
+
+    List<PinotTaskConfig> subtaskConfigs2 = new ArrayList<>();
+    PinotTaskConfig taskConfig2 = mock(PinotTaskConfig.class);
+    when(taskConfig2.getTableName()).thenReturn(table1);
+    subtaskConfigs2.add(taskConfig2);
+    when(spyMgr.getSubtaskConfigs(taskName2)).thenReturn(subtaskConfigs2);
+
+    List<PinotTaskConfig> subtaskConfigs3 = new ArrayList<>();
+    PinotTaskConfig taskConfig3 = mock(PinotTaskConfig.class);
+    when(taskConfig3.getTableName()).thenReturn(table2);
+    subtaskConfigs3.add(taskConfig3);
+    when(spyMgr.getSubtaskConfigs(taskName3)).thenReturn(subtaskConfigs3);
+
+    // Test filter by running state and table1 - should only return taskName1
+    Map<String, PinotHelixTaskResourceManager.TaskCount> runningTable1Tasks =
+        spyMgr.getTaskCounts(taskType, "IN_PROGRESS", table1);
+    assertEquals(runningTable1Tasks.size(), 1);
+    assertTrue(runningTable1Tasks.containsKey(taskName1));
+    assertFalse(runningTable1Tasks.containsKey(taskName2));
+    assertFalse(runningTable1Tasks.containsKey(taskName3));
+
+    // Test filter by completed state and table1 - should only return taskName2
+    Map<String, PinotHelixTaskResourceManager.TaskCount> completedTable1Tasks =
+        spyMgr.getTaskCounts(taskType, "COMPLETED", table1);
+    assertEquals(completedTable1Tasks.size(), 1);
+    assertFalse(completedTable1Tasks.containsKey(taskName1));
+    assertTrue(completedTable1Tasks.containsKey(taskName2));
+    assertFalse(completedTable1Tasks.containsKey(taskName3));
+
+    // Test filter by running state and table2 - should only return taskName3
+    Map<String, PinotHelixTaskResourceManager.TaskCount> runningTable2Tasks =
+        spyMgr.getTaskCounts(taskType, "IN_PROGRESS", table2);
+    assertEquals(runningTable2Tasks.size(), 1);
+    assertFalse(runningTable2Tasks.containsKey(taskName1));
+    assertFalse(runningTable2Tasks.containsKey(taskName2));
+    assertTrue(runningTable2Tasks.containsKey(taskName3));
+
+    // Test filter by completed state and table2 - should return no tasks
+    Map<String, PinotHelixTaskResourceManager.TaskCount> completedTable2Tasks =
+        spyMgr.getTaskCounts(taskType, "COMPLETED", table2);
+    assertEquals(completedTable2Tasks.size(), 0);
+
+    // Test filter by multiple states and table1 - should return taskName1 and taskName2
+    Map<String, PinotHelixTaskResourceManager.TaskCount> multiStateTable1Tasks =
+        spyMgr.getTaskCounts(taskType, "IN_PROGRESS,COMPLETED", table1);
+    assertEquals(multiStateTable1Tasks.size(), 2);
+    assertTrue(multiStateTable1Tasks.containsKey(taskName1));
+    assertTrue(multiStateTable1Tasks.containsKey(taskName2));
+    assertFalse(multiStateTable1Tasks.containsKey(taskName3));
+  }
+
+  @Test
+  public void testGetTaskCountsWithTableFilterEdgeCases() {
+    String taskType = "TestTask";
+    String taskName1 = "Task_TestTask_12345";
+
+    TaskDriver taskDriver = mock(TaskDriver.class);
+    PinotHelixResourceManager pinotHelixResourceManager = mock(PinotHelixResourceManager.class);
+    PinotHelixTaskResourceManager mgr = new PinotHelixTaskResourceManager(pinotHelixResourceManager, taskDriver);
+
+    // Mock getTasks to return our test task name
+    Set<String> tasks = new HashSet<>();
+    tasks.add(taskName1);
+    PinotHelixTaskResourceManager spyMgr = Mockito.spy(mgr);
+    when(spyMgr.getTasks(taskType)).thenReturn(tasks);
+
+    // Mock JobConfig and JobContext
+    mockTaskJobConfigAndContext(taskDriver, taskName1, TaskPartitionState.RUNNING);
+
+    // Test with task that has null table name
+    List<PinotTaskConfig> subtaskConfigsNullTable = new ArrayList<>();
+    PinotTaskConfig taskConfigNullTable = mock(PinotTaskConfig.class);
+    when(taskConfigNullTable.getTableName()).thenReturn(null);
+    subtaskConfigsNullTable.add(taskConfigNullTable);
+    when(spyMgr.getSubtaskConfigs(taskName1)).thenReturn(subtaskConfigsNullTable);
+
+    Map<String, PinotHelixTaskResourceManager.TaskCount> nullTableTasks =
+        spyMgr.getTaskCounts(taskType, null, "anyTable_OFFLINE");
+    assertEquals(nullTableTasks.size(), 0);
+
+    // Test with task that throws exception when getting subtask configs
+    when(spyMgr.getSubtaskConfigs(taskName1)).thenThrow(new RuntimeException("Test exception"));
+
+    Map<String, PinotHelixTaskResourceManager.TaskCount> exceptionTasks =
+        spyMgr.getTaskCounts(taskType, null, "anyTable_OFFLINE");
+    assertEquals(exceptionTasks.size(), 0);
+
+    // Reset the mock to avoid exception in subsequent calls
+    Mockito.reset(spyMgr);
+    when(spyMgr.getTasks(taskType)).thenReturn(tasks);
+    mockTaskJobConfigAndContext(taskDriver, taskName1, TaskPartitionState.RUNNING);
+
+    // Test with null state and null table (should return all tasks like original method)
+    when(spyMgr.getSubtaskConfigs(taskName1)).thenReturn(new ArrayList<>());
+    Map<String, PinotHelixTaskResourceManager.TaskCount> allTasks =
+        spyMgr.getTaskCounts(taskType, null, null);
+    assertEquals(allTasks.size(), 1);
+    assertTrue(allTasks.containsKey(taskName1));
+  }
+
+  /**
+   * Helper method to mock JobConfig and JobContext for a task
+   */
+  private void mockTaskJobConfigAndContext(TaskDriver taskDriver, String taskName, TaskPartitionState state) {
+    String helixJobName = PinotHelixTaskResourceManager.getHelixJobName(taskName);
+    JobConfig jobConfig = mock(JobConfig.class);
+    when(taskDriver.getJobConfig(helixJobName)).thenReturn(jobConfig);
+    Map<String, TaskConfig> taskConfigMap = new HashMap<>();
+    taskConfigMap.put("taskId0", new TaskConfig("", new HashMap<>()));
+    when(jobConfig.getTaskConfigMap()).thenReturn(taskConfigMap);
+    JobContext jobContext = mock(JobContext.class);
+    when(taskDriver.getJobContext(helixJobName)).thenReturn(jobContext);
+    Map<String, Integer> taskIdPartitionMap = new HashMap<>();
+    taskIdPartitionMap.put("taskId0", 0);
+    when(jobContext.getTaskIdPartitionMap()).thenReturn(taskIdPartitionMap);
+    when(jobContext.getPartitionState(0)).thenReturn(state);
+  }
 }

