zhtaoxiang commented on code in PR #10083: URL: https://github.com/apache/pinot/pull/10083#discussion_r1065453776
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskMetricsEmitter.java: ########## @@ -71,42 +78,105 @@ protected final void runTask(Properties periodicTaskProperties) { _helixTaskResourceManager.getTaskMetadataLastUpdateTimeMs(); taskMetadataLastUpdateTime.forEach((tableNameWithType, taskTypeLastUpdateTime) -> taskTypeLastUpdateTime.forEach((taskType, lastUpdateTimeMs) -> - _controllerMetrics.addOrUpdateGauge( - ControllerGauge.TIME_MS_SINCE_LAST_MINION_TASK_METADATA_UPDATE.getGaugeName() + "." - + tableNameWithType + "." + taskType, () -> System.currentTimeMillis() - lastUpdateTimeMs))); + _controllerMetrics.setOrUpdateTableGauge(tableNameWithType, taskType, + ControllerGauge.TIME_MS_SINCE_LAST_MINION_TASK_METADATA_UPDATE, + () -> System.currentTimeMillis() - lastUpdateTimeMs))); // The call to get task types can take time if there are a lot of tasks. // Potential optimization is to call it every (say) 30m if we detect a barrage of // zk requests. Set<String> taskTypes = _helixTaskResourceManager.getTaskTypes(); for (String taskType : taskTypes) { - TaskCount accumulated = new TaskCount(); + TaskCount taskTypeAccumulatedCount = new TaskCount(); + Map<String, TaskCount> tableAccumulatedCount = new HashMap<>(); try { Set<String> tasksInProgress = _helixTaskResourceManager.getTasksInProgress(taskType); final int numRunningTasks = tasksInProgress.size(); for (String task : tasksInProgress) { - TaskCount taskCount = _helixTaskResourceManager.getTaskCount(task); - accumulated.accumulate(taskCount); + Map<String, TaskCount> tableTaskCount = _helixTaskResourceManager.getTableTaskCount(task); + tableTaskCount.forEach((tableNameWithType, taskCount) -> { + taskTypeAccumulatedCount.accumulate(taskCount); + tableAccumulatedCount.compute(tableNameWithType, (name, count) -> { + if (count == null) { + count = new TaskCount(); + } + count.accumulate(taskCount); + return count; + }); + }); } // Emit metrics for taskType. 
_controllerMetrics.setValueOfGlobalGauge(ControllerGauge.NUM_MINION_TASKS_IN_PROGRESS, taskType, numRunningTasks); _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.NUM_MINION_SUBTASKS_RUNNING, taskType, - accumulated.getRunning()); + taskTypeAccumulatedCount.getRunning()); _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.NUM_MINION_SUBTASKS_WAITING, taskType, - accumulated.getWaiting()); + taskTypeAccumulatedCount.getWaiting()); _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.NUM_MINION_SUBTASKS_ERROR, taskType, - accumulated.getError()); - int total = accumulated.getTotal(); - int percent = total != 0 ? (accumulated.getWaiting() + accumulated.getRunning()) * 100 / total : 0; + taskTypeAccumulatedCount.getError()); + int total = taskTypeAccumulatedCount.getTotal(); + int percent = total != 0 + ? (taskTypeAccumulatedCount.getWaiting() + taskTypeAccumulatedCount.getRunning()) * 100 / total : 0; _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.PERCENT_MINION_SUBTASKS_IN_QUEUE, taskType, percent); - percent = total != 0 ? accumulated.getError() * 100 / total : 0; + percent = total != 0 ? taskTypeAccumulatedCount.getError() * 100 / total : 0; _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.PERCENT_MINION_SUBTASKS_IN_ERROR, taskType, percent); + + // Emit metrics for table taskType + tableAccumulatedCount.forEach((tableNameWithType, taskCount) -> { + _controllerMetrics.setOrUpdateTableGauge(tableNameWithType, taskType, + ControllerGauge.NUM_MINION_SUBTASKS_RUNNING, () -> (long) taskCount.getRunning()); + _controllerMetrics.setOrUpdateTableGauge(tableNameWithType, taskType, + ControllerGauge.NUM_MINION_SUBTASKS_WAITING, taskCount.getWaiting()); + _controllerMetrics.setOrUpdateTableGauge(tableNameWithType, taskType, + ControllerGauge.NUM_MINION_SUBTASKS_ERROR, taskCount.getError()); + int tableTotal = taskCount.getTotal(); + int tablePercent = tableTotal != 0 ? 
(taskCount.getWaiting() + taskCount.getRunning()) * 100 / tableTotal : 0; + _controllerMetrics.setOrUpdateTableGauge(tableNameWithType, taskType, + ControllerGauge.PERCENT_MINION_SUBTASKS_IN_QUEUE, tablePercent); + tablePercent = tableTotal != 0 ? taskCount.getError() * 100 / tableTotal : 0; + _controllerMetrics.setOrUpdateTableGauge(tableNameWithType, taskType, + ControllerGauge.PERCENT_MINION_SUBTASKS_IN_ERROR, tablePercent); + }); + + + if (_preReportedTables.containsKey(taskType)) { + Set<String> tableNameWithTypeSet = _preReportedTables.get(taskType); + tableNameWithTypeSet.removeAll(tableAccumulatedCount.keySet()); + removeTableTaskTypeMetrics(tableNameWithTypeSet, taskType); + } + if (!tableAccumulatedCount.isEmpty()) { + // need to make a copy of the set because we may want to chagne the set later + Set<String> tableNameWithTypeSet = new HashSet<>(tableAccumulatedCount.keySet()); + _preReportedTables.put(taskType, tableNameWithTypeSet); + } else { + _preReportedTables.remove(taskType); + } } catch (Exception e) { LOGGER.error("Caught exception while getting metrics for task type {}", taskType, e); } } + // clean up metrics for task types that have already been removed + _preReportedTaskTypes.removeAll(taskTypes); + for (String taskType : _preReportedTaskTypes) { Review Comment: The previous code did not have any logic to clean up metrics either, although it only reported task-type-level metrics (not table-level metrics). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org