Copilot commented on code in PR #17128:
URL: https://github.com/apache/pinot/pull/17128#discussion_r2485515234


##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/TaskMetricsEmitterTest.java:
##########
@@ -336,4 +336,141 @@ public void removeOldTaskTypeAddNewTaskType() {
     oneSingleTaskTypeWithTwoTables();
     taskType2WithOneTable();
   }
+
+  /**
+   * Test for previously in-progress tasks that completed between runs:
+   * Tasks that were in-progress in the previous run but completed before the current run
+   * should still have their metrics reported in the current run.
+   *
+   * Scenario:
+   * - Run 1: Task "taskCompletedBetweenRuns" is in-progress with 1 error subtask
+   * - Run 2: Task "taskCompletedBetweenRuns" has completed and is no longer in getTasksInProgress()
+   *
+   * Expected: Metrics for "taskCompletedBetweenRuns" should still be emitted in Run 2 by detecting it via
+   * _previousInProgressTasks tracking. The emitter maintains state of tasks that were in-progress
+   * in the previous execution cycle and includes completed tasks in the current cycle's metrics.
+   */
+  @Test
+  public void testReportsPreviouslyInProgressTasksThatCompletedBetweenRuns() {
+    String taskType = "SegmentGenerationAndPushTask";
+    String taskName = "taskCompletedBetweenRuns";
+    String tableName = "testTable_OFFLINE";
+
+    Mockito.when(_pinotHelixTaskResourceManager.getTaskTypes()).thenReturn(ImmutableSet.of(taskType));
+
+    // Run 1: Task is in-progress with 1 error subtask
+    Mockito.when(_pinotHelixTaskResourceManager.getTasksInProgress(taskType))
+        .thenReturn(ImmutableSet.of(taskName));
+
+    // Ensure getTasksStartedAfter returns empty for this test (not relevant for this scenario)
+    Mockito.when(_pinotHelixTaskResourceManager.getTasksStartedAfter(
+        Mockito.eq(taskType), Mockito.anyLong()))
+        .thenReturn(ImmutableSet.of());
+
+    PinotHelixTaskResourceManager.TaskCount taskCount = new PinotHelixTaskResourceManager.TaskCount();
+    taskCount.addTaskState(TaskPartitionState.TASK_ERROR);
+    Mockito.when(_pinotHelixTaskResourceManager.getTableTaskCount(taskName))
+        .thenReturn(Map.of(tableName, taskCount));
+
+    _taskMetricsEmitter.runTask(null);
+
+    // Verify metrics were emitted in Run 1
+    PinotMetricsRegistry metricsRegistry = _controllerMetrics.getMetricsRegistry();
+    Assert.assertEquals(((YammerSettableGauge<?>) metricsRegistry.allMetrics().get(
+            new YammerMetricName(ControllerMetrics.class,
+                "pinot.controller.numMinionSubtasksError." + taskType))
+        .getMetric()).value(), 1L);

Review Comment:
   This metric retrieval pattern with multiple type casts is repeated 
identically at lines 379-382, 395-398, 438-441, and 471-474. Extract this into 
a test helper method like `getMetricValue(String metricName, String taskType)` 
to improve maintainability and reduce duplication.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskMetricsEmitter.java:
##########
@@ -90,20 +106,71 @@ protected final void runTask(Properties periodicTaskProperties) {
       TaskCount taskTypeAccumulatedCount = new TaskCount();
       Map<String, TaskCount> tableAccumulatedCount = new HashMap<>();
       try {
-        Set<String> tasksInProgress = _helixTaskResourceManager.getTasksInProgress(taskType);
-        final int numRunningTasks = tasksInProgress.size();
-        for (String task : tasksInProgress) {
-          Map<String, TaskCount> tableTaskCount = _helixTaskResourceManager.getTableTaskCount(task);
-          tableTaskCount.forEach((tableNameWithType, taskCount) -> {
-            taskTypeAccumulatedCount.accumulate(taskCount);
-            tableAccumulatedCount.compute(tableNameWithType, (name, count) -> {
-              if (count == null) {
-                count = new TaskCount();
-              }
-              count.accumulate(taskCount);
-              return count;
+        // Capture the current execution timestamp for this task type collection cycle
+        long currentExecutionTimestamp = System.currentTimeMillis();
+        // For task types encountered for the first time, use current time minus one collection cycle
+        // as the initial timestamp to ensure we don't miss tasks that started recently
+        long previousExecutionTimestamp = _previousExecutionTimestamps.computeIfAbsent(taskType,
+            k -> currentExecutionTimestamp - _taskMetricsEmitterFrequencyMs);
+
+        // Get currently in-progress tasks
+        Set<String> currentInProgressTasks = _helixTaskResourceManager.getTasksInProgress(taskType);
+
+        // Get tasks that were in-progress during the previous collection cycle
+        Set<String> previouslyInProgressTasks =
+            _previousInProgressTasks.getOrDefault(taskType, Collections.emptySet());
+
+        // Start with currently in-progress tasks
+        Set<String> tasksToReport = new HashSet<>(currentInProgressTasks);
+
+        // Include tasks that were in-progress previously but are no longer in-progress
+        // These tasks completed between collection cycles and need their final metrics reported
+        for (String taskName : previouslyInProgressTasks) {
+          if (!currentInProgressTasks.contains(taskName)) {
+            LOGGER.debug("Including task {} that completed between collection cycles for taskType: {}",
+                taskName, taskType);
+            tasksToReport.add(taskName);
+          }
+        }
+
+        // Include tasks that started and completed between collection cycles
+        Set<String> tasksStartedAfter = _helixTaskResourceManager.getTasksStartedAfter(
+            taskType, previousExecutionTimestamp);
+
+        // Filter out tasks we already know about to avoid duplicates
+        Set<String> shortLivedTasks = new HashSet<>();
+        for (String taskName : tasksStartedAfter) {
+          if (!currentInProgressTasks.contains(taskName) && !previouslyInProgressTasks.contains(taskName)) {
+            shortLivedTasks.add(taskName);
+          }
+        }
+
+        if (!shortLivedTasks.isEmpty()) {
+          LOGGER.debug("Including {} short-lived tasks that started and completed between cycles for taskType: {}",
+              shortLivedTasks.size(), taskType);
+          tasksToReport.addAll(shortLivedTasks);
+        }
+
+        final int numRunningTasks = currentInProgressTasks.size();
+
+        // Process all tasks that need metrics reported
+        for (String task : tasksToReport) {
+          try {
+            Map<String, TaskCount> tableTaskCount = _helixTaskResourceManager.getTableTaskCount(task);
+            tableTaskCount.forEach((tableNameWithType, taskCount) -> {
+              taskTypeAccumulatedCount.accumulate(taskCount);
+              tableAccumulatedCount.compute(tableNameWithType, (name, count) -> {
+                if (count == null) {
+                  count = new TaskCount();
+                }
+                count.accumulate(taskCount);
+                return count;
+              });
             });
-          });
+          } catch (Exception e) {
+            LOGGER.warn("Failed to get task count for task: {} of type: {} (task may have been purged from DAG)",

Review Comment:
   The error message references 'DAG' (Directed Acyclic Graph) which is 
Helix-specific terminology that may not be immediately clear to all developers 
or operators reading logs. Consider clarifying this as 'Helix workflow DAG' or 
adding a brief explanation of what it means for a task to be purged from the 
DAG.
   ```suggestion
               LOGGER.warn("Failed to get task count for task: {} of type: {} (task may have been purged from the Helix workflow DAG, i.e., removed from the workflow graph)",
   ```
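   To make the selection rules in this hunk easier to review, here is a hypothetical standalone sketch (`TaskSelectionSketch` and `runCycle` are illustrative names, not Pinot APIs). Helix lookups are passed in as arguments, and the sketch assumes `_previousInProgressTasks` and `_previousExecutionTimestamps` are advanced at the end of each cycle, which the hunk does not show. Because `tasksToReport` is a `HashSet`, a plain union of the three sources already removes duplicates, so the explicit membership checks in the hunk appear to matter only for the debug log output. `main` replays the Run 1 / Run 2 scenario from the test Javadoc.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical stand-in for TaskMetricsEmitter's per-task-type bookkeeping.
public class TaskSelectionSketch {
  private final long frequencyMs;
  private final Map<String, Set<String>> previousInProgressTasks = new HashMap<>();
  private final Map<String, Long> previousExecutionTimestamps = new HashMap<>();

  TaskSelectionSketch(long frequencyMs) {
    this.frequencyMs = frequencyMs;
  }

  // Returns the tasks whose metrics should be reported this cycle.
  // tasksStartedAfter stands in for getTasksStartedAfter(taskType, timestamp).
  Set<String> runCycle(String taskType, long now, Set<String> currentInProgress,
      Function<Long, Set<String>> tasksStartedAfter) {
    // First cycle for this task type: look back one emitter period.
    long previousTimestamp = previousExecutionTimestamps.computeIfAbsent(taskType, k -> now - frequencyMs);
    Set<String> previouslyInProgress = previousInProgressTasks.getOrDefault(taskType, Set.of());

    // Running tasks, tasks that finished since the last cycle, and short-lived tasks.
    Set<String> tasksToReport = new HashSet<>(currentInProgress);
    tasksToReport.addAll(previouslyInProgress);
    tasksToReport.addAll(tasksStartedAfter.apply(previousTimestamp));

    // Assumption: state is advanced at the end of every cycle.
    previousInProgressTasks.put(taskType, new HashSet<>(currentInProgress));
    previousExecutionTimestamps.put(taskType, now);
    return tasksToReport;
  }

  public static void main(String[] args) {
    TaskSelectionSketch sketch = new TaskSelectionSketch(60_000L);
    String type = "SegmentGenerationAndPushTask";
    // Run 1: the task is in progress.
    Set<String> run1 = sketch.runCycle(type, 1_000_000L, Set.of("taskCompletedBetweenRuns"), ts -> Set.of());
    // Run 2: the task has completed and is no longer in progress.
    Set<String> run2 = sketch.runCycle(type, 1_060_000L, Set.of(), ts -> Set.of());
    System.out.println(run1 + " " + run2);
  }
}
```

   Running `main` prints `[taskCompletedBetweenRuns] [taskCompletedBetweenRuns]`: the task is still reported in Run 2, while `numRunningTasks` in the hunk would be 0 for that run, since it counts only `currentInProgressTasks`.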



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskMetricsEmitter.java:
##########
@@ -90,20 +106,71 @@ protected final void runTask(Properties periodicTaskProperties) {
       TaskCount taskTypeAccumulatedCount = new TaskCount();
       Map<String, TaskCount> tableAccumulatedCount = new HashMap<>();
       try {
-        Set<String> tasksInProgress = _helixTaskResourceManager.getTasksInProgress(taskType);
-        final int numRunningTasks = tasksInProgress.size();
-        for (String task : tasksInProgress) {
-          Map<String, TaskCount> tableTaskCount = _helixTaskResourceManager.getTableTaskCount(task);
-          tableTaskCount.forEach((tableNameWithType, taskCount) -> {
-            taskTypeAccumulatedCount.accumulate(taskCount);
-            tableAccumulatedCount.compute(tableNameWithType, (name, count) -> {
-              if (count == null) {
-                count = new TaskCount();
-              }
-              count.accumulate(taskCount);
-              return count;
+        // Capture the current execution timestamp for this task type collection cycle
+        long currentExecutionTimestamp = System.currentTimeMillis();
+        // For task types encountered for the first time, use current time minus one collection cycle
+        // as the initial timestamp to ensure we don't miss tasks that started recently
+        long previousExecutionTimestamp = _previousExecutionTimestamps.computeIfAbsent(taskType,
+            k -> currentExecutionTimestamp - _taskMetricsEmitterFrequencyMs);
+
+        // Get currently in-progress tasks
+        Set<String> currentInProgressTasks = _helixTaskResourceManager.getTasksInProgress(taskType);
+
+        // Get tasks that were in-progress during the previous collection cycle
+        Set<String> previouslyInProgressTasks =
+            _previousInProgressTasks.getOrDefault(taskType, Collections.emptySet());
+
+        // Start with currently in-progress tasks
+        Set<String> tasksToReport = new HashSet<>(currentInProgressTasks);
+
+        // Include tasks that were in-progress previously but are no longer in-progress
+        // These tasks completed between collection cycles and need their final metrics reported
+        for (String taskName : previouslyInProgressTasks) {
+          if (!currentInProgressTasks.contains(taskName)) {
+            LOGGER.debug("Including task {} that completed between collection cycles for taskType: {}",
+                taskName, taskType);
+            tasksToReport.add(taskName);
+          }
+        }
+
+        // Include tasks that started and completed between collection cycles
+        Set<String> tasksStartedAfter = _helixTaskResourceManager.getTasksStartedAfter(
+            taskType, previousExecutionTimestamp);
+
+        // Filter out tasks we already know about to avoid duplicates
+        Set<String> shortLivedTasks = new HashSet<>();
+        for (String taskName : tasksStartedAfter) {
+          if (!currentInProgressTasks.contains(taskName) && !previouslyInProgressTasks.contains(taskName)) {
+            shortLivedTasks.add(taskName);
+          }
+        }

Review Comment:
   This filtering logic duplicates the membership checks against
currentInProgressTasks and previouslyInProgressTasks. Consider using Set
operations (e.g., removeAll) for cleaner, more concise code:
`Set<String> shortLivedTasks = new HashSet<>(tasksStartedAfter); shortLivedTasks.removeAll(currentInProgressTasks); shortLivedTasks.removeAll(previouslyInProgressTasks);`
   ```suggestion
           Set<String> shortLivedTasks = new HashSet<>(tasksStartedAfter);
           shortLivedTasks.removeAll(currentInProgressTasks);
           shortLivedTasks.removeAll(previouslyInProgressTasks);
   ```
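   A quick standalone check that the suggested `removeAll` form selects the same tasks as the original loop (the class and method names are hypothetical; the sets mirror the variable names in the hunk):

```java
import java.util.HashSet;
import java.util.Set;

// Compares the loop-based filter from the hunk with the removeAll form suggested in the review.
public class ShortLivedTasksFilter {
  // Original approach: keep tasks that are neither currently nor previously in progress.
  static Set<String> filterWithLoop(Set<String> startedAfter, Set<String> current, Set<String> previous) {
    Set<String> shortLived = new HashSet<>();
    for (String task : startedAfter) {
      if (!current.contains(task) && !previous.contains(task)) {
        shortLived.add(task);
      }
    }
    return shortLived;
  }

  // Suggested approach: copy the candidates, then subtract both known sets.
  static Set<String> filterWithRemoveAll(Set<String> startedAfter, Set<String> current, Set<String> previous) {
    Set<String> shortLived = new HashSet<>(startedAfter);
    shortLived.removeAll(current);
    shortLived.removeAll(previous);
    return shortLived;
  }

  public static void main(String[] args) {
    Set<String> startedAfter = Set.of("t1", "t2", "t3", "t4");
    Set<String> current = Set.of("t1");
    Set<String> previous = Set.of("t2", "t9");
    // Both forms yield {t3, t4}, so this prints true
    System.out.println(filterWithLoop(startedAfter, current, previous)
        .equals(filterWithRemoveAll(startedAfter, current, previous)));
  }
}
```

   On the design choice: `HashSet.removeAll` is inherited from `AbstractSet.removeAll`, which iterates over whichever of the two collections is smaller. With `Set` arguments each lookup is constant time, so both forms are linear and the gain is mainly readability.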



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
