krishan1390 commented on code in PR #17128:
URL: https://github.com/apache/pinot/pull/17128#discussion_r2486271796


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskMetricsEmitter.java:
##########
@@ -90,20 +106,71 @@ protected final void runTask(Properties periodicTaskProperties) {
       TaskCount taskTypeAccumulatedCount = new TaskCount();
       Map<String, TaskCount> tableAccumulatedCount = new HashMap<>();
       try {
-        Set<String> tasksInProgress = _helixTaskResourceManager.getTasksInProgress(taskType);
-        final int numRunningTasks = tasksInProgress.size();
-        for (String task : tasksInProgress) {
-          Map<String, TaskCount> tableTaskCount = _helixTaskResourceManager.getTableTaskCount(task);
-          tableTaskCount.forEach((tableNameWithType, taskCount) -> {
-            taskTypeAccumulatedCount.accumulate(taskCount);
-            tableAccumulatedCount.compute(tableNameWithType, (name, count) -> {
-              if (count == null) {
-                count = new TaskCount();
-              }
-              count.accumulate(taskCount);
-              return count;
+        // Capture the current execution timestamp for this task type collection cycle
+        long currentExecutionTimestamp = System.currentTimeMillis();
+        // For task types encountered for the first time, use current time minus one collection cycle
+        // as the initial timestamp to ensure we don't miss tasks that started recently
+        long previousExecutionTimestamp = _previousExecutionTimestamps.computeIfAbsent(taskType,
+            k -> currentExecutionTimestamp - _taskMetricsEmitterFrequencyMs);
+
+        // Get currently in-progress tasks
+        Set<String> currentInProgressTasks = _helixTaskResourceManager.getTasksInProgress(taskType);
+
+        // Get tasks that were in-progress during the previous collection cycle
+        Set<String> previouslyInProgressTasks =
+            _previousInProgressTasks.getOrDefault(taskType, Collections.emptySet());
+
+        // Start with currently in-progress tasks
+        Set<String> tasksToReport = new HashSet<>(currentInProgressTasks);
+
+        // Include tasks that were in-progress previously but are no longer in-progress
+        // These tasks completed between collection cycles and need their final metrics reported
+        for (String taskName : previouslyInProgressTasks) {
+          if (!currentInProgressTasks.contains(taskName)) {
+            LOGGER.debug("Including task {} that completed between collection cycles for taskType: {}",
+                taskName, taskType);
+            tasksToReport.add(taskName);
+          }
+        }
+
+        // Include tasks that started and completed between collection cycles
+        Set<String> tasksStartedAfter = _helixTaskResourceManager.getTasksStartedAfter(
+            taskType, previousExecutionTimestamp);
+
+        // Filter out tasks we already know about to avoid duplicates
+        Set<String> shortLivedTasks = new HashSet<>();

Review Comment:
   It's not clear why we create shortLivedTasks and then add it to tasksToReport.
   
   Instead of lines 140 to 152, we can just call tasksToReport.addAll(tasksStartedAfter);
   
   Since tasksToReport is a Set, addAll already takes care of deduplication.
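
To illustrate the point about deduplication, here is a minimal standalone sketch comparing the two approaches. It does not reproduce the PR's lines 140 to 152, which are not shown in this hunk; the filtering loop in mergeWithFilter is an assumption about what that code does, and the class and method names (TaskSetMergeSketch, mergeWithFilter, mergeWithAddAll) are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch, not code from the PR.
class TaskSetMergeSketch {

  // Assumed shape of the reviewed code: filter into an intermediate set, then add it.
  static Set<String> mergeWithFilter(Set<String> current, Set<String> previous, Set<String> startedAfter) {
    Set<String> tasksToReport = new HashSet<>(current);
    for (String taskName : previous) {
      if (!current.contains(taskName)) {
        tasksToReport.add(taskName);
      }
    }
    Set<String> shortLivedTasks = new HashSet<>();
    for (String taskName : startedAfter) {
      if (!tasksToReport.contains(taskName)) {
        shortLivedTasks.add(taskName);
      }
    }
    tasksToReport.addAll(shortLivedTasks);
    return tasksToReport;
  }

  // Suggested simplification: a Set ignores elements it already contains,
  // so addAll alone produces the same union.
  static Set<String> mergeWithAddAll(Set<String> current, Set<String> previous, Set<String> startedAfter) {
    Set<String> tasksToReport = new HashSet<>(current);
    tasksToReport.addAll(previous);
    tasksToReport.addAll(startedAfter);
    return tasksToReport;
  }
}
```

Both methods return the same set for any inputs. The intermediate set would only be needed if the short-lived tasks were used separately, for example for logging, which this hunk does not show.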
   



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java:
##########
@@ -461,6 +461,41 @@ public synchronized Set<String> getTasksInProgress(String taskType) {
     }
   }
 
+  /**
+   * Returns tasks that started after the given timestamp.
+   * This is used to detect short-lived tasks that started and completed between metric collection cycles.
+   *
+   * @param taskType Task type
+   * @param afterTimestampMs Only include tasks that started after this timestamp (in milliseconds)
+   * @return Set of task names that started after the timestamp
+   */
+  public synchronized Set<String> getTasksStartedAfter(String taskType, long afterTimestampMs) {

Review Comment:
   Rather than fetching tasks twice from Helix for the same task type (in getTasksInProgress and getTasksStartedAfter), we can update getTasksInProgress() to accept an optional start timestamp.
   
   The method is only used in the metrics emitter anyway, so it's fine to make the change.
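
One possible shape for the merged method is sketched below, under stated assumptions. It deliberately avoids Helix APIs: TaskSnapshot is a hypothetical stand-in for whatever per-task state the real method reads from Helix, and the nullable Long parameter is just one way to express "optional start timestamp". The idea shown is a single pass over the fetched tasks that keeps a task if it is in progress or, when a timestamp is supplied, if it started after that timestamp.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch, not the actual PinotHelixTaskResourceManager code.
class SinglePassTaskFetchSketch {

  // Stand-in for per-task state that the real code would obtain from Helix.
  static final class TaskSnapshot {
    final String _name;
    final boolean _inProgress;
    final long _startTimeMs;

    TaskSnapshot(String name, boolean inProgress, long startTimeMs) {
      _name = name;
      _inProgress = inProgress;
      _startTimeMs = startTimeMs;
    }
  }

  // Returns in-progress tasks; if startedAfterMs is non-null, also returns
  // tasks that started after it, even if they have already finished.
  static Set<String> getTasksInProgress(Collection<TaskSnapshot> snapshots, Long startedAfterMs) {
    Set<String> tasks = new HashSet<>();
    for (TaskSnapshot snapshot : snapshots) {
      boolean startedRecently = startedAfterMs != null && snapshot._startTimeMs > startedAfterMs;
      if (snapshot._inProgress || startedRecently) {
        tasks.add(snapshot._name);
      }
    }
    return tasks;
  }
}
```

With this shape, passing null would keep the existing in-progress-only behavior, while the metrics emitter would pass the previous execution timestamp and get both sets from one fetch.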



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

