This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 91d2a95  Clean up controller-table related metrics in ControllerPeriodicTask (#7557)
91d2a95 is described below

commit 91d2a958945f0b5d862821a216b0cba065be8461
Author: Jialiang Li <j...@linkedin.com>
AuthorDate: Fri Nov 5 17:56:33 2021 -0700

    Clean up controller-table related metrics in ControllerPeriodicTask (#7557)
    
    Co-authored-by: Jack Li(Analytics Engineering) <j...@jlli-mn1.linkedin.biz>
---
 .../pinot/common/metrics/ValidationMetrics.java    | 107 +++++++++++++++++----
 .../controller/helix/SegmentStatusChecker.java     |  16 +++
 .../core/periodictask/ControllerPeriodicTask.java  |  14 +++
 .../validation/OfflineSegmentIntervalChecker.java  |  16 +++
 .../RealtimeSegmentValidationManager.java          |  10 ++
 5 files changed, 145 insertions(+), 18 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ValidationMetrics.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ValidationMetrics.java
index efe3c82..b47d068 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ValidationMetrics.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ValidationMetrics.java
@@ -136,8 +136,16 @@ public class ValidationMetrics {
    * @param missingSegmentCount The number of missing segments
    */
   public void updateMissingSegmentCountGauge(final String resource, final int 
missingSegmentCount) {
-    final String fullGaugeName = makeGaugeName(resource, 
"missingSegmentCount");
-    makeGauge(fullGaugeName, makeMetricName(fullGaugeName), 
_storedValueGaugeFactory, missingSegmentCount);
+    makeGauge(resource, ValidationMetricName.MISSING_SEGMENT_COUNT, 
_storedValueGaugeFactory, missingSegmentCount);
+  }
+
+  /**
+   * Cleans up the missing segment count gauge.
+   *
+   * @param resource The resource for which the gauge is removed
+   */
+  public void cleanupMissingSegmentCountGauge(final String resource) {
+    removeGauge(resource, ValidationMetricName.MISSING_SEGMENT_COUNT);
   }
 
   /**
@@ -148,12 +156,20 @@ public class ValidationMetrics {
    *                               if there is no such time.
    */
   public void updateOfflineSegmentDelayGauge(final String resource, final long 
lastOfflineSegmentTime) {
-    final String fullGaugeNameHours = makeGaugeName(resource, 
"offlineSegmentDelayHours");
-    makeGauge(fullGaugeNameHours, makeMetricName(fullGaugeNameHours), 
_currentTimeMillisDeltaGaugeHoursFactory,
+    makeGauge(resource, ValidationMetricName.OFFLINE_SEGMENT_DELAY_HOURS, 
_currentTimeMillisDeltaGaugeHoursFactory,
         lastOfflineSegmentTime);
   }
 
   /**
+   * Cleans up offline segment delay gauge.
+   *
+   * @param resource The resource for which the gauge is removed
+   */
+  public void cleanupOfflineSegmentDelayGauge(final String resource) {
+    removeGauge(resource, ValidationMetricName.OFFLINE_SEGMENT_DELAY_HOURS);
+  }
+
+  /**
    * Updates the last push time gauge.
    *
    * @param resource The resource for which the gauge is updated
@@ -161,20 +177,36 @@ public class ValidationMetrics {
    *                           such time.
    */
   public void updateLastPushTimeGauge(final String resource, final long 
lastPushTimeMillis) {
-    final String fullGaugeNameHours = makeGaugeName(resource, 
"lastPushTimeDelayHours");
-    makeGauge(fullGaugeNameHours, makeMetricName(fullGaugeNameHours), 
_currentTimeMillisDeltaGaugeHoursFactory,
+    makeGauge(resource, ValidationMetricName.LAST_PUSH_TIME_DELAY_HOURS, 
_currentTimeMillisDeltaGaugeHoursFactory,
         lastPushTimeMillis);
   }
 
   /**
+   * Cleans up the last push time gauge.
+   *
+   * @param resource The resource for which the gauge is removed
+   */
+  public void cleanupLastPushTimeGauge(final String resource) {
+    removeGauge(resource, ValidationMetricName.LAST_PUSH_TIME_DELAY_HOURS);
+  }
+
+  /**
    * Updates the total document count gauge.
    *
    * @param resource The resource for which the gauge is updated
    * @param documentCount Total document count for the given resource name or 
table name
    */
   public void updateTotalDocumentCountGauge(final String resource, final long 
documentCount) {
-    final String fullGaugeName = makeGaugeName(resource, "TotalDocumentCount");
-    makeGauge(fullGaugeName, makeMetricName(fullGaugeName), 
_storedValueGaugeFactory, documentCount);
+    makeGauge(resource, ValidationMetricName.TOTAL_DOCUMENT_COUNT, 
_storedValueGaugeFactory, documentCount);
+  }
+
+  /**
+   * Cleans up the total document count gauge.
+   *
+   * @param resource The resource for which the gauge is removed
+   */
+  public void cleanupTotalDocumentCountGauge(final String resource) {
+    removeGauge(resource, ValidationMetricName.TOTAL_DOCUMENT_COUNT);
   }
 
   /**
@@ -184,8 +216,7 @@ public class ValidationMetrics {
    * @param partitionCount Number of partitions that do not have any segment 
in CONSUMING state.
    */
   public void updateNonConsumingPartitionCountMetric(final String resource, 
final int partitionCount) {
-    final String fullGaugeName = makeGaugeName(resource, 
"NonConsumingPartitionCount");
-    makeGauge(fullGaugeName, makeMetricName(fullGaugeName), 
_storedValueGaugeFactory, partitionCount);
+    makeGauge(resource, ValidationMetricName.NON_CONSUMING_PARTITION_COUNT, 
_storedValueGaugeFactory, partitionCount);
   }
 
   /**
@@ -195,8 +226,16 @@ public class ValidationMetrics {
    * @param segmentCount Total segment count for the given resource name or 
table name
    */
   public void updateSegmentCountGauge(final String resource, final long 
segmentCount) {
-    final String fullGaugeName = makeGaugeName(resource, "SegmentCount");
-    makeGauge(fullGaugeName, makeMetricName(fullGaugeName), 
_storedValueGaugeFactory, segmentCount);
+    makeGauge(resource, ValidationMetricName.SEGMENT_COUNT, 
_storedValueGaugeFactory, segmentCount);
+  }
+
+  /**
+   * Cleans up the segment count gauge.
+   *
+   * @param resource The resource for which the gauge is removed
+   */
+  public void cleanupSegmentCountGauge(final String resource) {
+    removeGauge(resource, ValidationMetricName.SEGMENT_COUNT);
   }
 
   @VisibleForTesting
@@ -208,17 +247,27 @@ public class ValidationMetrics {
     return PinotMetricUtils.makePinotMetricName(ValidationMetrics.class, 
gaugeName);
   }
 
-  private void makeGauge(final String gaugeName, final PinotMetricName 
metricName, final GaugeFactory<?> gaugeFactory,
-      final long value) {
-    if (!_gaugeValues.containsKey(gaugeName)) {
-      _gaugeValues.put(gaugeName, value);
-      PinotMetricUtils.makeGauge(_metricsRegistry, metricName, 
gaugeFactory.buildGauge(gaugeName));
+  private void makeGauge(final String resource, final ValidationMetricName 
validationMetricName,
+      final GaugeFactory<?> gaugeFactory, final long value) {
+    final String fullGaugeName = makeGaugeName(resource, 
validationMetricName.getMetricName());
+    PinotMetricName metricName = makeMetricName(fullGaugeName);
+    if (!_gaugeValues.containsKey(fullGaugeName)) {
+      _gaugeValues.put(fullGaugeName, value);
+      PinotMetricUtils.makeGauge(_metricsRegistry, metricName, 
gaugeFactory.buildGauge(fullGaugeName));
       _metricNames.add(metricName);
     } else {
-      _gaugeValues.put(gaugeName, value);
+      _gaugeValues.put(fullGaugeName, value);
     }
   }
 
+  private void removeGauge(final String resource, final ValidationMetricName 
validationMetricName) {
+    final String fullGaugeName = makeGaugeName(resource, 
validationMetricName.getMetricName());
+    PinotMetricName pinotMetricName = makeMetricName(fullGaugeName);
+    PinotMetricUtils.removeMetric(_metricsRegistry, pinotMetricName);
+    _metricNames.remove(pinotMetricName);
+    _gaugeValues.remove(fullGaugeName);
+  }
+
   /**
    * Unregisters all validation metrics.
    */
@@ -239,4 +288,26 @@ public class ValidationMetrics {
     }
     return value;
   }
+
+  /**
+   * Names of validation metrics.
+   */
+  public enum ValidationMetricName {
+    MISSING_SEGMENT_COUNT("missingSegmentCount"),
+    OFFLINE_SEGMENT_DELAY_HOURS("offlineSegmentDelayHours"),
+    LAST_PUSH_TIME_DELAY_HOURS("lastPushTimeDelayHours"),
+    TOTAL_DOCUMENT_COUNT("TotalDocumentCount"),
+    NON_CONSUMING_PARTITION_COUNT("NonConsumingPartitionCount"),
+    SEGMENT_COUNT("SegmentCount");
+
+    private final String _metricName;
+
+    ValidationMetricName(String metricName) {
+      _metricName = metricName;
+    }
+
+    public String getMetricName() {
+      return _metricName;
+    }
+  }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
index d7d74f9..93350fe 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
@@ -240,6 +240,22 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
     }
   }
 
+  @Override
+  protected void nonLeaderCleanup(List<String> tableNamesWithType) {
+    for (String tableNameWithType : tableNamesWithType) {
+      _controllerMetrics.removeTableGauge(tableNameWithType, 
ControllerGauge.NUMBER_OF_REPLICAS);
+      _controllerMetrics.removeTableGauge(tableNameWithType, 
ControllerGauge.PERCENT_OF_REPLICAS);
+      _controllerMetrics.removeTableGauge(tableNameWithType, 
ControllerGauge.PERCENT_SEGMENTS_AVAILABLE);
+
+      _controllerMetrics.removeTableGauge(tableNameWithType, 
ControllerGauge.IDEALSTATE_ZNODE_SIZE);
+      _controllerMetrics.removeTableGauge(tableNameWithType, 
ControllerGauge.IDEALSTATE_ZNODE_BYTE_SIZE);
+      _controllerMetrics.removeTableGauge(tableNameWithType, 
ControllerGauge.SEGMENT_COUNT);
+
+      _controllerMetrics.removeTableGauge(tableNameWithType, 
ControllerGauge.SEGMENTS_IN_ERROR_STATE);
+      _controllerMetrics.removeTableGauge(tableNameWithType, 
ControllerGauge.PERCENT_SEGMENTS_AVAILABLE);
+    }
+  }
+
   private void setStatusToDefault() {
     List<String> allTableNames = _pinotHelixResourceManager.getAllTables();
 
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
index 9d7a676..439f8be 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
@@ -65,11 +65,14 @@ public abstract class ControllerPeriodicTask<C> extends BasePeriodicTask {
 
       // Process the tables that are managed by this controller
       List<String> tablesToProcess = new ArrayList<>();
+      List<String> nonLeaderForTables = new ArrayList<>();
       if (propTableNameWithType == null) {
         // Table name is not available, so task should run on all tables for 
which this controller is the lead.
         for (String tableNameWithType : 
_pinotHelixResourceManager.getAllTables()) {
           if (_leadControllerManager.isLeaderForTable(tableNameWithType)) {
             tablesToProcess.add(tableNameWithType);
+          } else {
+            nonLeaderForTables.add(tableNameWithType);
           }
         }
       } else {
@@ -82,6 +85,9 @@ public abstract class ControllerPeriodicTask<C> extends BasePeriodicTask {
       if (!tablesToProcess.isEmpty()) {
         processTables(tablesToProcess);
       }
+      if (!nonLeaderForTables.isEmpty()) {
+        nonLeaderCleanup(nonLeaderForTables);
+      }
     } catch (Exception e) {
       LOGGER.error("Caught exception while running task: {}", _taskName, e);
       _controllerMetrics.addMeteredTableValue(_taskName, 
ControllerMeter.CONTROLLER_PERIODIC_TASK_ERROR, 1L);
@@ -156,4 +162,12 @@ public abstract class ControllerPeriodicTask<C> extends BasePeriodicTask {
    */
   protected void postprocess() {
   }
+
+  /**
+   * Can be overridden to perform cleanups for tables that the current 
controller isn't the leader.
+   *
+   * @param tableNamesWithType the table names that the current controller 
isn't the leader for
+   */
+  protected void nonLeaderCleanup(List<String> tableNamesWithType) {
+  }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
index 1e11d1e..1b08c01 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
@@ -136,6 +136,22 @@ public class OfflineSegmentIntervalChecker extends ControllerPeriodicTask<Void>
     _validationMetrics.updateSegmentCountGauge(offlineTableName, numSegments);
   }
 
+  @Override
+  protected void nonLeaderCleanup(List<String> tableNamesWithType) {
+    for (String tableNameWithType : tableNamesWithType) {
+      TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType == TableType.OFFLINE) {
+        // TODO: we can further split the existing ValidationMetricName enum 
to OFFLINE and REALTIME,
+        //  so that we can simply loop through all the enum values and clean 
up the metrics.
+        _validationMetrics.cleanupMissingSegmentCountGauge(tableNameWithType);
+        _validationMetrics.cleanupOfflineSegmentDelayGauge(tableNameWithType);
+        _validationMetrics.cleanupLastPushTimeGauge(tableNameWithType);
+        _validationMetrics.cleanupTotalDocumentCountGauge(tableNameWithType);
+        _validationMetrics.cleanupSegmentCountGauge(tableNameWithType);
+      }
+    }
+  }
+
   /**
    * Computes the number of missing segments based on the given existing 
segment intervals and the expected frequency
    * of the intervals.
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
index 53291b3..237924a 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -124,6 +124,16 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea
     }
   }
 
+  @Override
+  protected void nonLeaderCleanup(List<String> tableNamesWithType) {
+    for (String tableNameWithType : tableNamesWithType) {
+      TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType == TableType.REALTIME) {
+        _validationMetrics.cleanupTotalDocumentCountGauge(tableNameWithType);
+      }
+    }
+  }
+
   @VisibleForTesting
   static long computeRealtimeTotalDocumentInSegments(List<SegmentZKMetadata> 
segmentsZKMetadata,
       boolean countHLCSegments) {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to