This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new a6196ba Adding metrics for minion tasks status (#6549) a6196ba is described below commit a6196baa5c8dc09a28c9284645a5a96e69828452 Author: Xiang Fu <fx19880...@gmail.com> AuthorDate: Tue Feb 9 22:19:02 2021 -0800 Adding metrics for minion tasks status (#6549) * Adding metrics for minion tasks status * Address comments --- .../etc/jmx_prometheus_javaagent/configs/pinot.yml | 5 +++ .../pinot/common/metrics/ControllerGauge.java | 5 ++- .../helix/core/minion/PinotTaskManager.java | 42 +++++++++++++++++- .../helix/core/minion/TaskTypeMetricsUpdater.java | 50 ++++++++++++++++++++++ .../tests/SimpleMinionClusterIntegrationTest.java | 49 ++++++++++++++++++--- 5 files changed, 141 insertions(+), 10 deletions(-) diff --git a/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/pinot.yml b/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/pinot.yml index 0221fde..40cd394 100644 --- a/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/pinot.yml +++ b/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/pinot.yml @@ -67,6 +67,11 @@ rules: labels: table: "$1" taskType: "$2" +- pattern: "\"org.apache.pinot.common.metrics\"<type=\"ControllerMetrics\", name=\"pinot.controller.taskStatus.(\\w+)\\.(\\w+)\"><>(\\w+)" + name: "pinot_controller_taskStatus_$3" + labels: + taskType: "$1" + status: "$2" # Pinot Broker - pattern: "\"org.apache.pinot.common.metrics\"<type=\"BrokerMetrics\", name=\"pinot.broker.(\\w+).authorization\"><>(\\w+)" name: "pinot_broker_authorization_$2" diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java index 3f69f45..c4990a5 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java @@ -67,7 +67,10 @@ public enum ControllerGauge implements AbstractMetrics.Gauge { TABLE_STORAGE_EST_MISSING_SEGMENT_PERCENT("TableStorageEstMissingSegmentPercent", false), // Number of scheduled Cron jobs - CRON_SCHEDULER_JOB_SCHEDULED("cronSchedulerJobScheduled", false); + CRON_SCHEDULER_JOB_SCHEDULED("cronSchedulerJobScheduled", false), + + // Number of Tasks Status + TASK_STATUS("taskStatus", false); private final String gaugeName; private final String unit; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java index 0a899c5..4ab65e4 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java @@ -29,6 +29,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import javax.annotation.Nullable; +import org.apache.helix.task.TaskState; import org.apache.pinot.common.metrics.ControllerGauge; import org.apache.pinot.common.metrics.ControllerMeter; import org.apache.pinot.common.metrics.ControllerMetrics; @@ -72,12 +73,15 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> { private static final String TABLE_CONFIG_PARENT_PATH = "/CONFIGS/TABLE"; private static final String TABLE_CONFIG_PATH_PREFIX = "/CONFIGS/TABLE/"; + private static final String TASK_QUEUE_PATH_PATTERN = "/TaskRebalancer/TaskQueue_%s/Context"; private final PinotHelixTaskResourceManager _helixTaskResourceManager; private final ClusterInfoAccessor _clusterInfoAccessor; private final TaskGeneratorRegistry _taskGeneratorRegistry; private final Map<String, Map<String, String>> _tableTaskTypeToCronExpressionMap = new ConcurrentHashMap<>(); private final Map<String, TableTaskSchedulerUpdater> _tableTaskSchedulerUpdaterMap = new ConcurrentHashMap<>(); + private final Map<String, TaskTypeMetricsUpdater> _taskTypeMetricsUpdaterMap = new ConcurrentHashMap<>(); + private final Map<TaskState, Integer> _taskStateToCountMap = new ConcurrentHashMap<>(); private Scheduler _scheduledExecutorService = null; @@ -125,6 +129,10 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> { return TABLE_CONFIG_PATH_PREFIX + tableWithType; } + private String getPropertyStorePathForTaskQueue(String taskType) { + return String.format(TASK_QUEUE_PATH_PATTERN, taskType); + } + public synchronized void cleanUpCronTaskSchedulerForTable(String tableWithType) { LOGGER.info("Cleaning up task in scheduler for table {}", tableWithType); TableTaskSchedulerUpdater tableTaskSchedulerUpdater = _tableTaskSchedulerUpdaterMap.get(tableWithType); @@ -279,8 +287,8 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> { LOGGER.error("Failed to check job existence for job key - table: {}, task: {} ", tableWithType, taskType, e); } if (!exists) { - LOGGER - .info("Trying to schedule a job with cron expression: {} for table {}, task type: {}", cronExprStr, tableWithType, taskType); + LOGGER.info("Trying to schedule a job with cron expression: {} for table {}, task type: {}", cronExprStr, + tableWithType, taskType); Trigger trigger = TriggerBuilder.newTrigger().withIdentity(TriggerKey.triggerKey(tableWithType, taskType)) .withSchedule(CronScheduleBuilder.cronSchedule(cronExprStr)).build(); JobDataMap jobDataMap = new JobDataMap(); @@ -371,6 +379,7 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> { PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType); if (taskGenerator != null) { _helixTaskResourceManager.ensureTaskQueueExists(taskType); + addTaskTypeMetricsUpdaterIfNeeded(taskType); tasksScheduled.put(taskType, scheduleTask(taskGenerator, enabledTableConfigs, isLeader)); } else { List<String> enabledTables = new ArrayList<>(enabledTableConfigs.size()); @@ -439,6 +448,7 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> { } _helixTaskResourceManager.ensureTaskQueueExists(taskType); + addTaskTypeMetricsUpdaterIfNeeded(taskType); return scheduleTask(taskGenerator, enabledTableConfigs, false); } @@ -459,6 +469,7 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> { "Table: %s does not have task type: %s enabled", tableNameWithType, taskType); _helixTaskResourceManager.ensureTaskQueueExists(taskType); + addTaskTypeMetricsUpdaterIfNeeded(taskType); return scheduleTask(taskGenerator, Collections.singletonList(tableConfig), false); } @@ -478,4 +489,31 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> { public Scheduler getScheduler() { return _scheduledExecutorService; } + + public synchronized void reportMetrics(String taskType) { + Map<String, TaskState> taskStates = _helixTaskResourceManager.getTaskStates(taskType); + Map<TaskState, Integer> taskStateToCountMap = new HashMap<>(); + for (TaskState taskState : taskStates.values()) { + taskStateToCountMap.merge(taskState, 1, Integer::sum); + } + // Reset all the status to 0 + for (TaskState taskState : _taskStateToCountMap.keySet()) { + _taskStateToCountMap.put(taskState, 0); + } + _taskStateToCountMap.putAll(taskStateToCountMap); + for (Map.Entry<TaskState, Integer> taskStateEntry : _taskStateToCountMap.entrySet()) { + _controllerMetrics + .setValueOfTableGauge(String.format("%s.%s", taskType, taskStateEntry.getKey()), ControllerGauge.TASK_STATUS, + taskStateEntry.getValue()); + } + } + + private synchronized void addTaskTypeMetricsUpdaterIfNeeded(String taskType) { + if (!_taskTypeMetricsUpdaterMap.containsKey(taskType)) { + TaskTypeMetricsUpdater taskTypeMetricsUpdater = new TaskTypeMetricsUpdater(taskType, this); + _pinotHelixResourceManager.getPropertyStore() + .subscribeDataChanges(getPropertyStorePathForTaskQueue(taskType), taskTypeMetricsUpdater); + _taskTypeMetricsUpdaterMap.put(taskType, taskTypeMetricsUpdater); + } + } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskTypeMetricsUpdater.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskTypeMetricsUpdater.java new file mode 100644 index 0000000..f4ebc8e --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskTypeMetricsUpdater.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.minion; + +import org.I0Itec.zkclient.IZkDataListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class TaskTypeMetricsUpdater implements IZkDataListener { + private static final Logger LOGGER = LoggerFactory.getLogger(TaskTypeMetricsUpdater.class); + private final String _taskType; + private final PinotTaskManager _pinotTaskManager; + + public TaskTypeMetricsUpdater(String taskType, PinotTaskManager pinotTaskManager) { + _taskType = taskType; + _pinotTaskManager = pinotTaskManager; + } + + @Override + public void handleDataChange(String dataPath, Object data) + throws Exception { + try { + _pinotTaskManager.reportMetrics(_taskType); + } catch (Exception e) { + LOGGER.error("Failed to update metrics for task type {}", _taskType, e); + throw e; + } + } + + @Override + public void handleDataDeleted(String dataPath) { + } +} diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java index b2c2674..120ed89 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nullable; import org.apache.helix.task.TaskState; +import org.apache.pinot.common.metrics.ControllerGauge; import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor; import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager; import org.apache.pinot.controller.helix.core.minion.PinotTaskManager; @@ -45,11 +46,16 @@ import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.pinot.util.TestUtils; +import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import static org.testng.Assert.*; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; /** @@ -62,6 +68,7 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest { private static final String TABLE_NAME_2 = "testTable2"; private static final String TABLE_NAME_3 = "testTable3"; private static final long STATE_TRANSITION_TIMEOUT_MS = 60_000L; // 1 minute + private static final int NUM_TASKS = 2; private static final AtomicBoolean HOLD = new AtomicBoolean(); private static final AtomicBoolean TASK_START_NOTIFIED = new AtomicBoolean(); @@ -120,7 +127,7 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest { // Wait at most 60 seconds for all tasks IN_PROGRESS TestUtils.waitForCondition(input -> { Collection<TaskState> taskStates = _helixTaskResourceManager.getTaskStates(TASK_TYPE).values(); - assertEquals(taskStates.size(), 2); + assertEquals(taskStates.size(), NUM_TASKS); for (TaskState taskState : taskStates) { if (taskState != TaskState.IN_PROGRESS) { return false; @@ -133,13 +140,20 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest { return true; }, STATE_TRANSITION_TIMEOUT_MS, "Failed to get all tasks IN_PROGRESS"); + Assert.assertEquals(_controllerStarter.getControllerMetrics() + .getValueOfTableGauge(TASK_TYPE + "." + TaskState.IN_PROGRESS, ControllerGauge.TASK_STATUS), NUM_TASKS); + Assert.assertEquals(_controllerStarter.getControllerMetrics() + .getValueOfTableGauge(TASK_TYPE + "." + TaskState.COMPLETED, ControllerGauge.TASK_STATUS), 0); + Assert.assertEquals(_controllerStarter.getControllerMetrics() + .getValueOfTableGauge(TASK_TYPE + "." + TaskState.STOPPED, ControllerGauge.TASK_STATUS), 0); + // Stop the task queue _helixTaskResourceManager.stopTaskQueue(TASK_TYPE); // Wait at most 60 seconds for all tasks STOPPED TestUtils.waitForCondition(input -> { Collection<TaskState> taskStates = _helixTaskResourceManager.getTaskStates(TASK_TYPE).values(); - assertEquals(taskStates.size(), 2); + assertEquals(taskStates.size(), NUM_TASKS); for (TaskState taskState : taskStates) { if (taskState != TaskState.STOPPED) { return false; @@ -152,6 +166,13 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest { return true; }, STATE_TRANSITION_TIMEOUT_MS, "Failed to get all tasks STOPPED"); + Assert.assertEquals(_controllerStarter.getControllerMetrics() + .getValueOfTableGauge(TASK_TYPE + "." + TaskState.IN_PROGRESS, ControllerGauge.TASK_STATUS), 0); + Assert.assertEquals(_controllerStarter.getControllerMetrics() + .getValueOfTableGauge(TASK_TYPE + "." + TaskState.COMPLETED, ControllerGauge.TASK_STATUS), 0); + Assert.assertEquals(_controllerStarter.getControllerMetrics() + .getValueOfTableGauge(TASK_TYPE + "." + TaskState.STOPPED, ControllerGauge.TASK_STATUS), NUM_TASKS); + // Resume the task queue, and let the task complete _helixTaskResourceManager.resumeTaskQueue(TASK_TYPE); HOLD.set(false); @@ -159,7 +180,7 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest { // Wait at most 60 seconds for all tasks COMPLETED TestUtils.waitForCondition(input -> { Collection<TaskState> taskStates = _helixTaskResourceManager.getTaskStates(TASK_TYPE).values(); - assertEquals(taskStates.size(), 2); + assertEquals(taskStates.size(), NUM_TASKS); for (TaskState taskState : taskStates) { if (taskState != TaskState.COMPLETED) { return false; @@ -172,12 +193,26 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest { return true; }, STATE_TRANSITION_TIMEOUT_MS, "Failed to get all tasks COMPLETED"); + Assert.assertEquals(_controllerStarter.getControllerMetrics() + .getValueOfTableGauge(TASK_TYPE + "." + TaskState.IN_PROGRESS, ControllerGauge.TASK_STATUS), 0); + Assert.assertEquals(_controllerStarter.getControllerMetrics() + .getValueOfTableGauge(TASK_TYPE + "." + TaskState.COMPLETED, ControllerGauge.TASK_STATUS), NUM_TASKS); + Assert.assertEquals(_controllerStarter.getControllerMetrics() + .getValueOfTableGauge(TASK_TYPE + "." + TaskState.STOPPED, ControllerGauge.TASK_STATUS), 0); + // Delete the task queue _helixTaskResourceManager.deleteTaskQueue(TASK_TYPE, false); // Wait at most 60 seconds for task queue to be deleted TestUtils.waitForCondition(input -> !_helixTaskResourceManager.getTaskTypes().contains(TASK_TYPE), STATE_TRANSITION_TIMEOUT_MS, "Failed to delete the task queue"); + + Assert.assertEquals(_controllerStarter.getControllerMetrics() + .getValueOfTableGauge(TASK_TYPE + "." + TaskState.IN_PROGRESS, ControllerGauge.TASK_STATUS), 0); + Assert.assertEquals(_controllerStarter.getControllerMetrics() + .getValueOfTableGauge(TASK_TYPE + "." + TaskState.COMPLETED, ControllerGauge.TASK_STATUS), NUM_TASKS); + Assert.assertEquals(_controllerStarter.getControllerMetrics() + .getValueOfTableGauge(TASK_TYPE + "." + TaskState.STOPPED, ControllerGauge.TASK_STATUS), 0); } @AfterClass @@ -209,10 +244,10 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest { @Override public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) { - assertEquals(tableConfigs.size(), 2); + assertEquals(tableConfigs.size(), NUM_TASKS); // Generate at most 2 tasks - if (_clusterInfoAccessor.getTaskStates(TASK_TYPE).size() >= 2) { + if (_clusterInfoAccessor.getTaskStates(TASK_TYPE).size() >= NUM_TASKS) { return Collections.emptyList(); } @@ -249,7 +284,7 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest { assertEquals(pinotTaskConfig.getTaskType(), TASK_TYPE); Map<String, String> configs = pinotTaskConfig.getConfigs(); - assertEquals(configs.size(), 2); + assertEquals(configs.size(), NUM_TASKS); String offlineTableName = configs.get("tableName"); assertEquals(TableNameBuilder.getTableTypeFromTableName(offlineTableName), TableType.OFFLINE); String rawTableName = TableNameBuilder.extractRawTableName(offlineTableName); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org