This is an automated email from the ASF dual-hosted git repository. mcvsubbu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new c950926 Added TaskMetricsEmitted periodic controller job (#7091) c950926 is described below commit c9509261672d0f9c2b35fa834eb8fe998ed23161 Author: Subbu Subramaniam <mcvsu...@users.noreply.github.com> AuthorDate: Mon Jun 28 14:01:14 2021 -0700 Added TaskMetricsEmitted periodic controller job (#7091) * Added TaskMetricsEmitted periodic controller job This job runs every 5 mins by default and emits metrics about Minion tasks scheduled in Pinot. For now, the following metrics are emitted for each task type: - Number of running tasks - Number of running sub-tasks - Number of waiting sub-taks (unassigned to a minion as yet) - Number of error sub-tasks (completed with an error/exception) - Percent of sub-tasks in error - Percent of sub-tasks in waiting or running states. * Addressed review comments and added unit tests * Addressed review comments and resolved conflicts * Cleaned up with spotless:apply * Handle another null case * spotless apply * Fix compile error after merge * Added comments on the minion integration tests --- .../pinot/common/metrics/ControllerGauge.java | 6 ++ .../apache/pinot/controller/ControllerConf.java | 12 +++ .../core/minion/PinotHelixTaskResourceManager.java | 118 ++++++++++++++++++++- .../helix/core/minion/PinotTaskManager.java | 3 +- .../helix/core/minion/TaskMetricsEmitter.java | 90 ++++++++++++++++ .../tests/SimpleMinionClusterIntegrationTest.java | 36 +++++-- 6 files changed, 253 insertions(+), 12 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java index 1ebeac1..2b91ba0 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java @@ -45,6 +45,12 @@ public enum ControllerGauge implements AbstractMetrics.Gauge { OFFLINE_TABLE_COUNT("TableCount", true), DISABLED_TABLE_COUNT("TableCount", true), PERIODIC_TASK_NUM_TABLES_PROCESSED("PeriodicTaskNumTablesProcessed", true), + NUM_MINION_TASKS_IN_PROGRESS("NumMinionTasksInProgress", true), + NUM_MINION_SUBTASKS_WAITING("NumMinionSubtasksWaiting", true), + NUM_MINION_SUBTASKS_RUNNING("NumMinionSubtasksRunning", true), + NUM_MINION_SUBTASKS_ERROR("NumMinionSubtasksError", true), + PERCENT_MINION_SUBTASKS_IN_QUEUE("PercentMinionSubtasksInQueue", true), + PERCENT_MINION_SUBTASKS_IN_ERROR("PercentMinionSubtasksInError", true), // Pinot controller leader PINOT_CONTROLLER_LEADER("PinotControllerLeader", true), diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java index 80bd173..7db3a27 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java @@ -98,6 +98,7 @@ public class ControllerConf extends PinotConfiguration { "controller.minion.instances.cleanup.task.frequencyInSeconds"; public static final String MINION_INSTANCES_CLEANUP_TASK_INITIAL_DELAY_SECONDS = "controller.minion.instances.cleanup.task.initialDelaySeconds"; + public static final String TASK_METRICS_EMITTER_FREQUENCY_IN_SECONDS = "controller.minion.task.metrics.emitter.frequencyInSeconds"; public static final String PINOT_TASK_MANAGER_SCHEDULER_ENABLED = "controller.task.scheduler.enabled"; @Deprecated @@ -140,6 +141,7 @@ public class ControllerConf extends PinotConfiguration { private static final int DEFAULT_REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour. private static final int DEFAULT_BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour. private static final int DEFAULT_STATUS_CONTROLLER_FREQUENCY_IN_SECONDS = 5 * 60; // 5 minutes + private static final int DEFAULT_TASK_METRICS_EMITTER_FREQUENCY_IN_SECONDS = 5 * 60; // 5 minutes private static final int DEFAULT_STATUS_CONTROLLER_WAIT_FOR_PUSH_TIME_IN_SECONDS = 10 * 60; // 10 minutes private static final int DEFAULT_TASK_MANAGER_FREQUENCY_IN_SECONDS = -1; // Disabled private static final int DEFAULT_MINION_INSTANCES_CLEANUP_TASK_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour. @@ -490,6 +492,16 @@ public class ControllerConf extends PinotConfiguration { Integer.toString(statusCheckerFrequencyInSeconds)); } + public int getTaskMetricsEmitterFrequencyInSeconds() { + return getProperty(ControllerPeriodicTasksConf.TASK_METRICS_EMITTER_FREQUENCY_IN_SECONDS, + ControllerPeriodicTasksConf.DEFAULT_TASK_METRICS_EMITTER_FREQUENCY_IN_SECONDS); + } + + public void setTaskMetricsEmitterFrequencyInSeconds(int taskMetricsEmitterFrequencyInSeconds) { + setProperty(ControllerPeriodicTasksConf.TASK_METRICS_EMITTER_FREQUENCY_IN_SECONDS, + Integer.toString(taskMetricsEmitterFrequencyInSeconds)); + } + public int getStatusCheckerWaitForPushTimeInSeconds() { return getProperty(ControllerPeriodicTasksConf.STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS, ControllerPeriodicTasksConf.DEFAULT_STATUS_CONTROLLER_WAIT_FOR_PUSH_TIME_IN_SECONDS); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java index ee08384..265d9f3 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java @@ -29,11 +29,14 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import org.apache.helix.task.JobConfig; +import org.apache.helix.task.JobContext; import org.apache.helix.task.JobQueue; import org.apache.helix.task.TaskConfig; import org.apache.helix.task.TaskDriver; +import org.apache.helix.task.TaskPartitionState; import org.apache.helix.task.TaskState; import org.apache.helix.task.WorkflowConfig; +import org.apache.helix.task.WorkflowContext; import org.apache.pinot.core.minion.PinotTaskConfig; import org.apache.pinot.spi.utils.CommonConstants.Helix; import org.slf4j.Logger; @@ -42,6 +45,7 @@ import org.slf4j.LoggerFactory; /** * The class <code>PinotHelixTaskResourceManager</code> manages all the task resources in Pinot cluster. + * In case you are wondering why methods that access taskDriver are synchronized, see comment in PR #1437 */ public class PinotHelixTaskResourceManager { private static final Logger LOGGER = LoggerFactory.getLogger(PinotHelixTaskResourceManager.class); @@ -60,7 +64,8 @@ public class PinotHelixTaskResourceManager { /** * Get all task types. - * + * @note: It reads all resource config back and check which are workflows and which are jobs, so it can take some time + * if there are a lot of tasks. * @return Set of all task types */ public synchronized Set<String> getTaskTypes() { @@ -226,7 +231,6 @@ public class PinotHelixTaskResourceManager { return parentTaskName; } - /** * Get all tasks for the given task type. * @@ -249,8 +253,13 @@ public class PinotHelixTaskResourceManager { * @return Map from task name to task state */ public synchronized Map<String, TaskState> getTaskStates(String taskType) { - Map<String, TaskState> helixJobStates = - _taskDriver.getWorkflowContext(getHelixJobQueueName(taskType)).getJobStates(); + Map<String, TaskState> helixJobStates = new HashMap<>(); + WorkflowContext workflowContext = _taskDriver.getWorkflowContext(getHelixJobQueueName(taskType)); + + if (workflowContext == null) { + return helixJobStates; + } + helixJobStates = workflowContext.getJobStates(); Map<String, TaskState> taskStates = new HashMap<>(helixJobStates.size()); for (Map.Entry<String, TaskState> entry : helixJobStates.entrySet()) { taskStates.put(getPinotTaskName(entry.getKey()), entry.getValue()); @@ -259,6 +268,58 @@ public class PinotHelixTaskResourceManager { } /** + * This method returns a count of sub-tasks in various states, given the top-level task name. + * @param parentTaskName (e.g. "Task_TestTask_1624403781879") + * @return TaskCount object + */ + public synchronized TaskCount getTaskCount(String parentTaskName) { + TaskCount taskCount = new TaskCount(); + JobContext jobContext = _taskDriver.getJobContext(getHelixJobName(parentTaskName)); + + if (jobContext == null) { + return taskCount; + } + Set<Integer> partitionSet = jobContext.getPartitionSet(); + taskCount.addToTotal(partitionSet.size()); + for (int partition : partitionSet) { + TaskPartitionState state = jobContext.getPartitionState(partition); + // Helix returns state as null if the task is not enqueued anywhere yet + if (state == null) { + // task is not yet assigned to a participant + taskCount.addToWaiting(1); + } else if (state.equals(TaskPartitionState.INIT) || state.equals(TaskPartitionState.RUNNING)) { + taskCount.addToRunning(1); + } else if (state.equals(TaskPartitionState.TASK_ERROR)) { + taskCount.addToError(1); + } + } + return taskCount; + } + + /** + * Returns a set of Task names (in the form "Task_TestTask_1624403781879") that are in progress or not started yet. + * + * @param taskType + * @return Set of task names + */ + public synchronized Set<String> getTasksInProgress(String taskType) { + Set<String> tasksInProgress = new HashSet<>(); + WorkflowContext workflowContext = _taskDriver.getWorkflowContext(getHelixJobQueueName(taskType)); + if (workflowContext == null) { + return tasksInProgress; + } + + Map<String, TaskState> helixJobStates = workflowContext.getJobStates(); + + for (Map.Entry<String, TaskState> entry : helixJobStates.entrySet()) { + if (entry.getValue().equals(TaskState.NOT_STARTED) || entry.getValue().equals(TaskState.IN_PROGRESS)) { + tasksInProgress.add(getPinotTaskName(entry.getKey())); + } + } + return tasksInProgress; + } + + /** * Get the task state for the given task name. * * @param taskName Task name @@ -330,4 +391,53 @@ public class PinotHelixTaskResourceManager { private static String getTaskType(String name) { return name.split(TASK_NAME_SEPARATOR)[1]; } + + public static class TaskCount { + private int _waiting; // Number of tasks waiting to be scheduled on minions + private int _error; // Number of tasks in error + private int _running; // Number of tasks currently running in minions + private int _total; // Total number of tasks in the batch + + public TaskCount() { + } + + public void addToWaiting(int waiting) { + _waiting += waiting; + } + + public void addToRunning(int running) { + _running += running; + } + + public void addToTotal(int total) { + _total += total; + } + + public void addToError(int error) { + _error += error; + } + + public int getWaiting() { + return _waiting; + } + + public int getRunning() { + return _running; + } + + public int getTotal() { + return _total; + } + + public int getError() { + return _error; + } + + public void accumulate(TaskCount other) { + addToWaiting(other.getWaiting()); + addToRunning(other.getRunning()); + addToError(other.getError()); + addToTotal(other.getTotal()); + } + } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java index f7a673a..6ae1449 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java @@ -401,8 +401,7 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> { @Nullable private String scheduleTask(PinotTaskGenerator taskGenerator, List<TableConfig> enabledTableConfigs, boolean isLeader) { - LOGGER.info("Trying to schedule task type: {}, with table config: {}, isLeader: {}", taskGenerator.getTaskType(), - enabledTableConfigs, isLeader); + LOGGER.info("Trying to schedule task type: {}, isLeader: {}", taskGenerator.getTaskType(), isLeader); List<PinotTaskConfig> pinotTaskConfigs = taskGenerator.generateTasks(enabledTableConfigs); if (!isLeader) { taskGenerator.nonLeaderCleanUp(); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskMetricsEmitter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskMetricsEmitter.java new file mode 100644 index 0000000..5874d9e --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskMetricsEmitter.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.minion; + +import java.util.Set; +import org.apache.pinot.common.metrics.ControllerGauge; +import org.apache.pinot.common.metrics.ControllerMetrics; +import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.controller.LeadControllerManager; +import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager.TaskCount; +import org.apache.pinot.core.periodictask.BasePeriodicTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * This class emits task metrics for each type of minion task that is set up in + * a Pinot cluster. It is intended to be scheduled with a fairly high frequency, + * of the order of minutes. + * See ControllerConf class for the default value. + */ +public class TaskMetricsEmitter extends BasePeriodicTask { + private static final Logger LOGGER = LoggerFactory.getLogger(TaskMetricsEmitter.class); + private final static String TASK_NAME = "TaskMetricsEmitter"; + + private final PinotHelixTaskResourceManager _helixTaskResourceManager; + private final ControllerMetrics _controllerMetrics; + private final LeadControllerManager _leadControllerManager; + + public TaskMetricsEmitter(PinotHelixTaskResourceManager helixTaskResourceManager, + LeadControllerManager leadControllerManager, ControllerConf controllerConf, ControllerMetrics controllerMetrics) { + super(TASK_NAME, controllerConf.getTaskMetricsEmitterFrequencyInSeconds(), + controllerConf.getPeriodicTaskInitialDelayInSeconds()); + _helixTaskResourceManager = helixTaskResourceManager; + _controllerMetrics = controllerMetrics; + _leadControllerManager = leadControllerManager; + } + + @Override + protected final void runTask() { + // Make it so that only one controller returns the metric for all the tasks. + if (!_leadControllerManager.isLeaderForTable(TASK_NAME)) { + return; + } + + // The call to get task types can take time if there are a lot of tasks. + // Potential optimization is to call it every (say) 30m if we detect a barrage of + // zk requests. + Set<String> taskTypes = _helixTaskResourceManager.getTaskTypes(); + for (String taskType : taskTypes) { + TaskCount accumulated = new TaskCount(); + try { + Set<String> tasksInProgress = _helixTaskResourceManager.getTasksInProgress(taskType); + final int numRunningTasks = tasksInProgress.size(); + for (String task : tasksInProgress) { + TaskCount taskCount = _helixTaskResourceManager.getTaskCount(task); + accumulated.accumulate(taskCount); + } + // Emit metrics for taskType. + _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.NUM_MINION_TASKS_IN_PROGRESS, taskType, numRunningTasks); + _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.NUM_MINION_SUBTASKS_RUNNING, taskType, accumulated.getRunning()); + _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.NUM_MINION_SUBTASKS_WAITING, taskType, accumulated.getWaiting()); + _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.NUM_MINION_SUBTASKS_ERROR, taskType, accumulated.getError()); + int total = accumulated.getTotal(); + int percent = total != 0 ? (accumulated.getWaiting() + accumulated.getRunning()) * 100 / total : 0; + _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.PERCENT_MINION_SUBTASKS_IN_QUEUE, taskType, percent); + percent = total != 0 ? accumulated.getError() * 100 / total : 0; + _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.PERCENT_MINION_SUBTASKS_IN_ERROR, taskType, percent); + } catch (Exception e) { + LOGGER.error("Caught exception while getting metrics for task type {}", taskType, e); + } + } + } +} diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java index 17e984c..930a41e 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java @@ -85,20 +85,44 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest { startMinion(); } + private void verifyTaskCount(String task, int errors, int waiting, int running, int total) { + PinotHelixTaskResourceManager.TaskCount taskCount = _helixTaskResourceManager.getTaskCount(task); + assertEquals(taskCount.getError(), errors); + assertEquals(taskCount.getWaiting(), waiting); + assertEquals(taskCount.getRunning(), running); + assertEquals(taskCount.getTotal(), total); + } + @Test public void testStopResumeDeleteTaskQueue() { // Hold the task HOLD.set(true); + // No tasks before we start. + assertEquals(_helixTaskResourceManager.getTasksInProgress(TASK_TYPE).size(),0); + verifyTaskCount("Task_" + TASK_TYPE + "_1624403781879", 0, 0, 0, 0); // Should create the task queues and generate a task - assertNotNull(_taskManager.scheduleTasks().get(TASK_TYPE)); + String task1 = _taskManager.scheduleTasks().get(TASK_TYPE); + assertNotNull(task1); assertTrue(_helixTaskResourceManager.getTaskQueues() .contains(PinotHelixTaskResourceManager.getHelixJobQueueName(TASK_TYPE))); - - // Should generate one more task - assertNotNull(_taskManager.scheduleTask(TASK_TYPE)); - - // Should not generate more tasks + assertTrue(_helixTaskResourceManager.getTasksInProgress(TASK_TYPE).contains(task1)); + + // Since we have two tables, two sub-tasks are generated -- one for each table. + // The default concurrent sub-tasks per minion instance is 1, and we have one minion + // instance spun up. So, one sub-tasks gets scheduled in a minion, and the other one + // waits. + verifyTaskCount(task1, 0, 1, 1, 2); + // Should generate one more task, with two sub-tasks. Both of these sub-tasks will wait + // since we have one minion instance that is still running one of the sub-tasks. + String task2 = _taskManager.scheduleTask(TASK_TYPE); + assertNotNull(task2); + assertTrue(_helixTaskResourceManager.getTasksInProgress(TASK_TYPE).contains(task2)); + verifyTaskCount(task2, 0, 2, 0, 2); + + // Should not generate more tasks since SimpleMinionClusterIntegrationTests.NUM_TASKS is 2. + // Our test task generator does not generate if there are already this many sub-tasks in the + // running+waiting count already. assertNull(_taskManager.scheduleTasks().get(TASK_TYPE)); assertNull(_taskManager.scheduleTask(TASK_TYPE)); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org