This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new a6196ba  Adding metrics for minion tasks status (#6549)
a6196ba is described below

commit a6196baa5c8dc09a28c9284645a5a96e69828452
Author: Xiang Fu <fx19880...@gmail.com>
AuthorDate: Tue Feb 9 22:19:02 2021 -0800

    Adding metrics for minion tasks status (#6549)
    
    * Adding metrics for minion tasks status
    
    * Address comments
---
 .../etc/jmx_prometheus_javaagent/configs/pinot.yml |  5 +++
 .../pinot/common/metrics/ControllerGauge.java      |  5 ++-
 .../helix/core/minion/PinotTaskManager.java        | 42 +++++++++++++++++-
 .../helix/core/minion/TaskTypeMetricsUpdater.java  | 50 ++++++++++++++++++++++
 .../tests/SimpleMinionClusterIntegrationTest.java  | 49 ++++++++++++++++++---
 5 files changed, 141 insertions(+), 10 deletions(-)

diff --git a/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/pinot.yml 
b/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/pinot.yml
index 0221fde..40cd394 100644
--- a/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/pinot.yml
+++ b/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/pinot.yml
@@ -67,6 +67,11 @@ rules:
   labels:
     table: "$1"
     taskType: "$2"
+- pattern: "\"org.apache.pinot.common.metrics\"<type=\"ControllerMetrics\", 
name=\"pinot.controller.taskStatus.(\\w+)\\.(\\w+)\"><>(\\w+)"
+  name: "pinot_controller_taskStatus_$3"
+  labels:
+    taskType: "$1"
+    status: "$2"
 # Pinot Broker
 - pattern: "\"org.apache.pinot.common.metrics\"<type=\"BrokerMetrics\", 
name=\"pinot.broker.(\\w+).authorization\"><>(\\w+)"
   name: "pinot_broker_authorization_$2"
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
index 3f69f45..c4990a5 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
@@ -67,7 +67,10 @@ public enum ControllerGauge implements AbstractMetrics.Gauge 
{
   
TABLE_STORAGE_EST_MISSING_SEGMENT_PERCENT("TableStorageEstMissingSegmentPercent",
 false),
 
   // Number of scheduled Cron jobs
-  CRON_SCHEDULER_JOB_SCHEDULED("cronSchedulerJobScheduled", false);
+  CRON_SCHEDULER_JOB_SCHEDULED("cronSchedulerJobScheduled", false),
+
+  // Number of Tasks Status
+  TASK_STATUS("taskStatus", false);
 
   private final String gaugeName;
   private final String unit;
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
index 0a899c5..4ab65e4 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -29,6 +29,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import javax.annotation.Nullable;
+import org.apache.helix.task.TaskState;
 import org.apache.pinot.common.metrics.ControllerGauge;
 import org.apache.pinot.common.metrics.ControllerMeter;
 import org.apache.pinot.common.metrics.ControllerMetrics;
@@ -72,12 +73,15 @@ public class PinotTaskManager extends 
ControllerPeriodicTask<Void> {
 
   private static final String TABLE_CONFIG_PARENT_PATH = "/CONFIGS/TABLE";
   private static final String TABLE_CONFIG_PATH_PREFIX = "/CONFIGS/TABLE/";
+  private static final String TASK_QUEUE_PATH_PATTERN = 
"/TaskRebalancer/TaskQueue_%s/Context";
 
   private final PinotHelixTaskResourceManager _helixTaskResourceManager;
   private final ClusterInfoAccessor _clusterInfoAccessor;
   private final TaskGeneratorRegistry _taskGeneratorRegistry;
   private final Map<String, Map<String, String>> 
_tableTaskTypeToCronExpressionMap = new ConcurrentHashMap<>();
   private final Map<String, TableTaskSchedulerUpdater> 
_tableTaskSchedulerUpdaterMap = new ConcurrentHashMap<>();
+  private final Map<String, TaskTypeMetricsUpdater> _taskTypeMetricsUpdaterMap 
= new ConcurrentHashMap<>();
+  private final Map<TaskState, Integer> _taskStateToCountMap = new 
ConcurrentHashMap<>();
 
   private Scheduler _scheduledExecutorService = null;
 
@@ -125,6 +129,10 @@ public class PinotTaskManager extends 
ControllerPeriodicTask<Void> {
     return TABLE_CONFIG_PATH_PREFIX + tableWithType;
   }
 
+  /**
+   * Builds the property store path of the {@code Context} node under the task queue for the given task type.
+   */
+  private String getPropertyStorePathForTaskQueue(String taskType) {
+    String taskQueueContextPath = String.format(TASK_QUEUE_PATH_PATTERN, taskType);
+    return taskQueueContextPath;
+  }
+
   public synchronized void cleanUpCronTaskSchedulerForTable(String 
tableWithType) {
     LOGGER.info("Cleaning up task in scheduler for table {}", tableWithType);
     TableTaskSchedulerUpdater tableTaskSchedulerUpdater = 
_tableTaskSchedulerUpdaterMap.get(tableWithType);
@@ -279,8 +287,8 @@ public class PinotTaskManager extends 
ControllerPeriodicTask<Void> {
       LOGGER.error("Failed to check job existence for job key - table: {}, 
task: {} ", tableWithType, taskType, e);
     }
     if (!exists) {
-      LOGGER
-          .info("Trying to schedule a job with cron expression: {} for table 
{}, task type: {}", cronExprStr, tableWithType, taskType);
+      LOGGER.info("Trying to schedule a job with cron expression: {} for table 
{}, task type: {}", cronExprStr,
+          tableWithType, taskType);
       Trigger trigger = 
TriggerBuilder.newTrigger().withIdentity(TriggerKey.triggerKey(tableWithType, 
taskType))
           .withSchedule(CronScheduleBuilder.cronSchedule(cronExprStr)).build();
       JobDataMap jobDataMap = new JobDataMap();
@@ -371,6 +379,7 @@ public class PinotTaskManager extends 
ControllerPeriodicTask<Void> {
       PinotTaskGenerator taskGenerator = 
_taskGeneratorRegistry.getTaskGenerator(taskType);
       if (taskGenerator != null) {
         _helixTaskResourceManager.ensureTaskQueueExists(taskType);
+        addTaskTypeMetricsUpdaterIfNeeded(taskType);
         tasksScheduled.put(taskType, scheduleTask(taskGenerator, 
enabledTableConfigs, isLeader));
       } else {
         List<String> enabledTables = new 
ArrayList<>(enabledTableConfigs.size());
@@ -439,6 +448,7 @@ public class PinotTaskManager extends 
ControllerPeriodicTask<Void> {
     }
 
     _helixTaskResourceManager.ensureTaskQueueExists(taskType);
+    addTaskTypeMetricsUpdaterIfNeeded(taskType);
     return scheduleTask(taskGenerator, enabledTableConfigs, false);
   }
 
@@ -459,6 +469,7 @@ public class PinotTaskManager extends 
ControllerPeriodicTask<Void> {
             "Table: %s does not have task type: %s enabled", 
tableNameWithType, taskType);
 
     _helixTaskResourceManager.ensureTaskQueueExists(taskType);
+    addTaskTypeMetricsUpdaterIfNeeded(taskType);
     return scheduleTask(taskGenerator, Collections.singletonList(tableConfig), 
false);
   }
 
@@ -478,4 +489,31 @@ public class PinotTaskManager extends 
ControllerPeriodicTask<Void> {
   public Scheduler getScheduler() {
     return _scheduledExecutorService;
   }
+
+  /**
+   * Publishes the {@link ControllerGauge#TASK_STATUS} gauges for one task type: for every {@link TaskState}, a gauge
+   * keyed {@code <taskType>.<taskState>} is set to the number of tasks of that type currently in that state (0 if
+   * none), so a state's gauge falls back to 0 once its tasks have moved on.
+   *
+   * @param taskType the minion task type whose task states are read from the task resource manager
+   */
+  public synchronized void reportMetrics(String taskType) {
+    // Count into a fresh map on every call so the gauges emitted for one task type never depend on what was
+    // previously reported for other task types.
+    Map<TaskState, Integer> taskStateToCountMap = new HashMap<>();
+    for (TaskState taskState : _helixTaskResourceManager.getTaskStates(taskType).values()) {
+      taskStateToCountMap.merge(taskState, 1, Integer::sum);
+    }
+    for (TaskState taskState : TaskState.values()) {
+      _controllerMetrics.setValueOfTableGauge(String.format("%s.%s", taskType, taskState), ControllerGauge.TASK_STATUS,
+          taskStateToCountMap.getOrDefault(taskState, 0));
+    }
+  }
+
+  /**
+   * Registers, at most once per task type, a {@link TaskTypeMetricsUpdater} that listens for data changes on the task
+   * queue's {@code Context} node in the property store, then publishes the current task-status gauges right away so
+   * they are not left unset until that node next changes (e.g. after a controller restart).
+   *
+   * @param taskType the minion task type whose queue should be watched
+   */
+  private synchronized void addTaskTypeMetricsUpdaterIfNeeded(String taskType) {
+    if (_taskTypeMetricsUpdaterMap.containsKey(taskType)) {
+      return;
+    }
+    TaskTypeMetricsUpdater taskTypeMetricsUpdater = new TaskTypeMetricsUpdater(taskType, this);
+    _pinotHelixResourceManager.getPropertyStore()
+        .subscribeDataChanges(getPropertyStorePathForTaskQueue(taskType), taskTypeMetricsUpdater);
+    _taskTypeMetricsUpdaterMap.put(taskType, taskTypeMetricsUpdater);
+    try {
+      // Metrics are best-effort: a failed initial report must not abort the task scheduling that called us.
+      reportMetrics(taskType);
+    } catch (Exception e) {
+      LOGGER.error("Failed to report initial metrics for task type {}", taskType, e);
+    }
+  }
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskTypeMetricsUpdater.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskTypeMetricsUpdater.java
new file mode 100644
index 0000000..f4ebc8e
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskTypeMetricsUpdater.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.minion;
+
+import org.I0Itec.zkclient.IZkDataListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * ZooKeeper data listener that refreshes the per-state task gauges of a single task type whenever the watched node
+ * changes, by delegating to {@link PinotTaskManager#reportMetrics(String)}.
+ */
+public class TaskTypeMetricsUpdater implements IZkDataListener {
+  private static final Logger LOGGER = LoggerFactory.getLogger(TaskTypeMetricsUpdater.class);
+  private final String _taskType;
+  private final PinotTaskManager _pinotTaskManager;
+
+  /**
+   * @param taskType the task type whose gauges this listener refreshes
+   * @param pinotTaskManager the manager that computes and publishes the gauges
+   */
+  public TaskTypeMetricsUpdater(String taskType, PinotTaskManager pinotTaskManager) {
+    _taskType = taskType;
+    _pinotTaskManager = pinotTaskManager;
+  }
+
+  @Override
+  public void handleDataChange(String dataPath, Object data)
+      throws Exception {
+    try {
+      _pinotTaskManager.reportMetrics(_taskType);
+    } catch (Exception e) {
+      // Best-effort metric: log and continue instead of propagating into the ZK event-dispatch thread.
+      LOGGER.error("Failed to update metrics for task type {}", _taskType, e);
+    }
+  }
+
+  /**
+   * Intentionally a no-op: the gauges keep their last reported values after the watched node is deleted.
+   */
+  @Override
+  public void handleDataDeleted(String dataPath) {
+  }
+}
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
index b2c2674..120ed89 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.annotation.Nullable;
 import org.apache.helix.task.TaskState;
+import org.apache.pinot.common.metrics.ControllerGauge;
 import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
 import 
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
 import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
@@ -45,11 +46,16 @@ import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.util.TestUtils;
+import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import static org.testng.Assert.*;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
 
 
 /**
@@ -62,6 +68,7 @@ public class SimpleMinionClusterIntegrationTest extends 
ClusterTest {
   private static final String TABLE_NAME_2 = "testTable2";
   private static final String TABLE_NAME_3 = "testTable3";
   private static final long STATE_TRANSITION_TIMEOUT_MS = 60_000L;  // 1 minute
+  private static final int NUM_TASKS = 2;
 
   private static final AtomicBoolean HOLD = new AtomicBoolean();
   private static final AtomicBoolean TASK_START_NOTIFIED = new AtomicBoolean();
@@ -120,7 +127,7 @@ public class SimpleMinionClusterIntegrationTest extends 
ClusterTest {
     // Wait at most 60 seconds for all tasks IN_PROGRESS
     TestUtils.waitForCondition(input -> {
       Collection<TaskState> taskStates = 
_helixTaskResourceManager.getTaskStates(TASK_TYPE).values();
-      assertEquals(taskStates.size(), 2);
+      assertEquals(taskStates.size(), NUM_TASKS);
       for (TaskState taskState : taskStates) {
         if (taskState != TaskState.IN_PROGRESS) {
           return false;
@@ -133,13 +140,20 @@ public class SimpleMinionClusterIntegrationTest extends 
ClusterTest {
       return true;
     }, STATE_TRANSITION_TIMEOUT_MS, "Failed to get all tasks IN_PROGRESS");
 
+    Assert.assertEquals(_controllerStarter.getControllerMetrics()
+        .getValueOfTableGauge(TASK_TYPE + "." + TaskState.IN_PROGRESS, 
ControllerGauge.TASK_STATUS), NUM_TASKS);
+    Assert.assertEquals(_controllerStarter.getControllerMetrics()
+        .getValueOfTableGauge(TASK_TYPE + "." + TaskState.COMPLETED, 
ControllerGauge.TASK_STATUS), 0);
+    Assert.assertEquals(_controllerStarter.getControllerMetrics()
+        .getValueOfTableGauge(TASK_TYPE + "." + TaskState.STOPPED, 
ControllerGauge.TASK_STATUS), 0);
+
     // Stop the task queue
     _helixTaskResourceManager.stopTaskQueue(TASK_TYPE);
 
     // Wait at most 60 seconds for all tasks STOPPED
     TestUtils.waitForCondition(input -> {
       Collection<TaskState> taskStates = 
_helixTaskResourceManager.getTaskStates(TASK_TYPE).values();
-      assertEquals(taskStates.size(), 2);
+      assertEquals(taskStates.size(), NUM_TASKS);
       for (TaskState taskState : taskStates) {
         if (taskState != TaskState.STOPPED) {
           return false;
@@ -152,6 +166,13 @@ public class SimpleMinionClusterIntegrationTest extends 
ClusterTest {
       return true;
     }, STATE_TRANSITION_TIMEOUT_MS, "Failed to get all tasks STOPPED");
 
+    Assert.assertEquals(_controllerStarter.getControllerMetrics()
+        .getValueOfTableGauge(TASK_TYPE + "." + TaskState.IN_PROGRESS, 
ControllerGauge.TASK_STATUS), 0);
+    Assert.assertEquals(_controllerStarter.getControllerMetrics()
+        .getValueOfTableGauge(TASK_TYPE + "." + TaskState.COMPLETED, 
ControllerGauge.TASK_STATUS), 0);
+    Assert.assertEquals(_controllerStarter.getControllerMetrics()
+        .getValueOfTableGauge(TASK_TYPE + "." + TaskState.STOPPED, 
ControllerGauge.TASK_STATUS), NUM_TASKS);
+
     // Resume the task queue, and let the task complete
     _helixTaskResourceManager.resumeTaskQueue(TASK_TYPE);
     HOLD.set(false);
@@ -159,7 +180,7 @@ public class SimpleMinionClusterIntegrationTest extends 
ClusterTest {
     // Wait at most 60 seconds for all tasks COMPLETED
     TestUtils.waitForCondition(input -> {
       Collection<TaskState> taskStates = 
_helixTaskResourceManager.getTaskStates(TASK_TYPE).values();
-      assertEquals(taskStates.size(), 2);
+      assertEquals(taskStates.size(), NUM_TASKS);
       for (TaskState taskState : taskStates) {
         if (taskState != TaskState.COMPLETED) {
           return false;
@@ -172,12 +193,26 @@ public class SimpleMinionClusterIntegrationTest extends 
ClusterTest {
       return true;
     }, STATE_TRANSITION_TIMEOUT_MS, "Failed to get all tasks COMPLETED");
 
+    Assert.assertEquals(_controllerStarter.getControllerMetrics()
+        .getValueOfTableGauge(TASK_TYPE + "." + TaskState.IN_PROGRESS, 
ControllerGauge.TASK_STATUS), 0);
+    Assert.assertEquals(_controllerStarter.getControllerMetrics()
+        .getValueOfTableGauge(TASK_TYPE + "." + TaskState.COMPLETED, 
ControllerGauge.TASK_STATUS), NUM_TASKS);
+    Assert.assertEquals(_controllerStarter.getControllerMetrics()
+        .getValueOfTableGauge(TASK_TYPE + "." + TaskState.STOPPED, 
ControllerGauge.TASK_STATUS), 0);
+
     // Delete the task queue
     _helixTaskResourceManager.deleteTaskQueue(TASK_TYPE, false);
 
     // Wait at most 60 seconds for task queue to be deleted
     TestUtils.waitForCondition(input -> 
!_helixTaskResourceManager.getTaskTypes().contains(TASK_TYPE),
         STATE_TRANSITION_TIMEOUT_MS, "Failed to delete the task queue");
+
+    Assert.assertEquals(_controllerStarter.getControllerMetrics()
+        .getValueOfTableGauge(TASK_TYPE + "." + TaskState.IN_PROGRESS, 
ControllerGauge.TASK_STATUS), 0);
+    Assert.assertEquals(_controllerStarter.getControllerMetrics()
+        .getValueOfTableGauge(TASK_TYPE + "." + TaskState.COMPLETED, 
ControllerGauge.TASK_STATUS), NUM_TASKS);
+    Assert.assertEquals(_controllerStarter.getControllerMetrics()
+        .getValueOfTableGauge(TASK_TYPE + "." + TaskState.STOPPED, 
ControllerGauge.TASK_STATUS), 0);
   }
 
   @AfterClass
@@ -209,10 +244,10 @@ public class SimpleMinionClusterIntegrationTest extends 
ClusterTest {
 
     @Override
     public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) 
{
-      assertEquals(tableConfigs.size(), 2);
+      assertEquals(tableConfigs.size(), NUM_TASKS);
 
       // Generate at most 2 tasks
-      if (_clusterInfoAccessor.getTaskStates(TASK_TYPE).size() >= 2) {
+      if (_clusterInfoAccessor.getTaskStates(TASK_TYPE).size() >= NUM_TASKS) {
         return Collections.emptyList();
       }
 
@@ -249,7 +284,7 @@ public class SimpleMinionClusterIntegrationTest extends 
ClusterTest {
 
           assertEquals(pinotTaskConfig.getTaskType(), TASK_TYPE);
           Map<String, String> configs = pinotTaskConfig.getConfigs();
-          assertEquals(configs.size(), 2);
+          assertEquals(configs.size(), NUM_TASKS);
           String offlineTableName = configs.get("tableName");
           
assertEquals(TableNameBuilder.getTableTypeFromTableName(offlineTableName), 
TableType.OFFLINE);
           String rawTableName = 
TableNameBuilder.extractRawTableName(offlineTableName);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to