zhtaoxiang commented on code in PR #10083:
URL: https://github.com/apache/pinot/pull/10083#discussion_r1065453776


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskMetricsEmitter.java:
##########
@@ -71,42 +78,105 @@ protected final void runTask(Properties 
periodicTaskProperties) {
         _helixTaskResourceManager.getTaskMetadataLastUpdateTimeMs();
     taskMetadataLastUpdateTime.forEach((tableNameWithType, 
taskTypeLastUpdateTime) ->
         taskTypeLastUpdateTime.forEach((taskType, lastUpdateTimeMs) ->
-            _controllerMetrics.addOrUpdateGauge(
-                
ControllerGauge.TIME_MS_SINCE_LAST_MINION_TASK_METADATA_UPDATE.getGaugeName() + 
"."
-                    + tableNameWithType + "." + taskType, () -> 
System.currentTimeMillis() - lastUpdateTimeMs)));
+            _controllerMetrics.setOrUpdateTableGauge(tableNameWithType, 
taskType,
+                ControllerGauge.TIME_MS_SINCE_LAST_MINION_TASK_METADATA_UPDATE,
+                () -> System.currentTimeMillis() - lastUpdateTimeMs)));
 
     // The call to get task types can take time if there are a lot of tasks.
     // Potential optimization is to call it every (say) 30m if we detect a 
barrage of
     // zk requests.
     Set<String> taskTypes = _helixTaskResourceManager.getTaskTypes();
     for (String taskType : taskTypes) {
-      TaskCount accumulated = new TaskCount();
+      TaskCount taskTypeAccumulatedCount = new TaskCount();
+      Map<String, TaskCount> tableAccumulatedCount = new HashMap<>();
       try {
         Set<String> tasksInProgress = 
_helixTaskResourceManager.getTasksInProgress(taskType);
         final int numRunningTasks = tasksInProgress.size();
         for (String task : tasksInProgress) {
-          TaskCount taskCount = _helixTaskResourceManager.getTaskCount(task);
-          accumulated.accumulate(taskCount);
+          Map<String, TaskCount> tableTaskCount = 
_helixTaskResourceManager.getTableTaskCount(task);
+          tableTaskCount.forEach((tableNameWithType, taskCount) -> {
+            taskTypeAccumulatedCount.accumulate(taskCount);
+            tableAccumulatedCount.compute(tableNameWithType, (name, count) -> {
+              if (count == null) {
+                count = new TaskCount();
+              }
+              count.accumulate(taskCount);
+              return count;
+            });
+          });
         }
         // Emit metrics for taskType.
         
_controllerMetrics.setValueOfGlobalGauge(ControllerGauge.NUM_MINION_TASKS_IN_PROGRESS,
 taskType,
             numRunningTasks);
         
_controllerMetrics.setValueOfGlobalGauge(ControllerGauge.NUM_MINION_SUBTASKS_RUNNING,
 taskType,
-            accumulated.getRunning());
+            taskTypeAccumulatedCount.getRunning());
         
_controllerMetrics.setValueOfGlobalGauge(ControllerGauge.NUM_MINION_SUBTASKS_WAITING,
 taskType,
-            accumulated.getWaiting());
+            taskTypeAccumulatedCount.getWaiting());
         
_controllerMetrics.setValueOfGlobalGauge(ControllerGauge.NUM_MINION_SUBTASKS_ERROR,
 taskType,
-            accumulated.getError());
-        int total = accumulated.getTotal();
-        int percent = total != 0 ? (accumulated.getWaiting() + 
accumulated.getRunning()) * 100 / total : 0;
+            taskTypeAccumulatedCount.getError());
+        int total = taskTypeAccumulatedCount.getTotal();
+        int percent = total != 0
+            ? (taskTypeAccumulatedCount.getWaiting() + 
taskTypeAccumulatedCount.getRunning()) * 100 / total : 0;
         
_controllerMetrics.setValueOfGlobalGauge(ControllerGauge.PERCENT_MINION_SUBTASKS_IN_QUEUE,
 taskType, percent);
-        percent = total != 0 ? accumulated.getError() * 100 / total : 0;
+        percent = total != 0 ? taskTypeAccumulatedCount.getError() * 100 / 
total : 0;
         
_controllerMetrics.setValueOfGlobalGauge(ControllerGauge.PERCENT_MINION_SUBTASKS_IN_ERROR,
 taskType, percent);
+
+        // Emit metrics for table taskType
+        tableAccumulatedCount.forEach((tableNameWithType, taskCount) -> {
+          _controllerMetrics.setOrUpdateTableGauge(tableNameWithType, taskType,
+              ControllerGauge.NUM_MINION_SUBTASKS_RUNNING, () -> (long) 
taskCount.getRunning());
+          _controllerMetrics.setOrUpdateTableGauge(tableNameWithType, taskType,
+              ControllerGauge.NUM_MINION_SUBTASKS_WAITING, 
taskCount.getWaiting());
+          _controllerMetrics.setOrUpdateTableGauge(tableNameWithType, taskType,
+              ControllerGauge.NUM_MINION_SUBTASKS_ERROR, taskCount.getError());
+          int tableTotal = taskCount.getTotal();
+          int tablePercent = tableTotal != 0 ? (taskCount.getWaiting() + 
taskCount.getRunning()) * 100 / tableTotal : 0;
+          _controllerMetrics.setOrUpdateTableGauge(tableNameWithType, taskType,
+              ControllerGauge.PERCENT_MINION_SUBTASKS_IN_QUEUE, tablePercent);
+          tablePercent = tableTotal != 0 ? taskCount.getError() * 100 / 
tableTotal : 0;
+          _controllerMetrics.setOrUpdateTableGauge(tableNameWithType, taskType,
+              ControllerGauge.PERCENT_MINION_SUBTASKS_IN_ERROR, tablePercent);
+        });
+
+
+        if (_preReportedTables.containsKey(taskType)) {
+          Set<String> tableNameWithTypeSet = _preReportedTables.get(taskType);
+          tableNameWithTypeSet.removeAll(tableAccumulatedCount.keySet());
+          removeTableTaskTypeMetrics(tableNameWithTypeSet, taskType);
+        }
+        if (!tableAccumulatedCount.isEmpty()) {
+          // need to make a copy of the set because we may want to change the 
set later
+          Set<String> tableNameWithTypeSet = new 
HashSet<>(tableAccumulatedCount.keySet());
+          _preReportedTables.put(taskType, tableNameWithTypeSet);
+        } else {
+          _preReportedTables.remove(taskType);
+        }
       } catch (Exception e) {
         LOGGER.error("Caught exception while getting metrics for task type 
{}", taskType, e);
       }
     }
 
+    // clean up metrics for task types that have already been removed
+    _preReportedTaskTypes.removeAll(taskTypes);
+    for (String taskType : _preReportedTaskTypes) {

Review Comment:
   The previous code had no logic to clean up metrics; note, though, that it 
only reported task-type-level metrics, not table-level metrics.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to