This is an automated email from the ASF dual-hosted git repository. jlli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 91d2a95 Clean up controller-table related metrics in ControllerPeriodicTask (#7557) 91d2a95 is described below commit 91d2a958945f0b5d862821a216b0cba065be8461 Author: Jialiang Li <j...@linkedin.com> AuthorDate: Fri Nov 5 17:56:33 2021 -0700 Clean up controller-table related metrics in ControllerPeriodicTask (#7557) Co-authored-by: Jack Li(Analytics Engineering) <j...@jlli-mn1.linkedin.biz> --- .../pinot/common/metrics/ValidationMetrics.java | 107 +++++++++++++++++---- .../controller/helix/SegmentStatusChecker.java | 16 +++ .../core/periodictask/ControllerPeriodicTask.java | 14 +++ .../validation/OfflineSegmentIntervalChecker.java | 16 +++ .../RealtimeSegmentValidationManager.java | 10 ++ 5 files changed, 145 insertions(+), 18 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ValidationMetrics.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ValidationMetrics.java index efe3c82..b47d068 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ValidationMetrics.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ValidationMetrics.java @@ -136,8 +136,16 @@ public class ValidationMetrics { * @param missingSegmentCount The number of missing segments */ public void updateMissingSegmentCountGauge(final String resource, final int missingSegmentCount) { - final String fullGaugeName = makeGaugeName(resource, "missingSegmentCount"); - makeGauge(fullGaugeName, makeMetricName(fullGaugeName), _storedValueGaugeFactory, missingSegmentCount); + makeGauge(resource, ValidationMetricName.MISSING_SEGMENT_COUNT, _storedValueGaugeFactory, missingSegmentCount); + } + + /** + * Cleans up the missing segment count gauge. + * + * @param resource The resource for which the gauge is removed + */ + public void cleanupMissingSegmentCountGauge(final String resource) { + removeGauge(resource, ValidationMetricName.MISSING_SEGMENT_COUNT); } /** @@ -148,12 +156,20 @@ public class ValidationMetrics { * if there is no such time. */ public void updateOfflineSegmentDelayGauge(final String resource, final long lastOfflineSegmentTime) { - final String fullGaugeNameHours = makeGaugeName(resource, "offlineSegmentDelayHours"); - makeGauge(fullGaugeNameHours, makeMetricName(fullGaugeNameHours), _currentTimeMillisDeltaGaugeHoursFactory, + makeGauge(resource, ValidationMetricName.OFFLINE_SEGMENT_DELAY_HOURS, _currentTimeMillisDeltaGaugeHoursFactory, lastOfflineSegmentTime); } /** + * Cleans up offline segment delay gauge. + * + * @param resource The resource for which the gauge is removed + */ + public void cleanupOfflineSegmentDelayGauge(final String resource) { + removeGauge(resource, ValidationMetricName.OFFLINE_SEGMENT_DELAY_HOURS); + } + + /** * Updates the last push time gauge. * * @param resource The resource for which the gauge is updated @@ -161,20 +177,36 @@ public class ValidationMetrics { * such time. */ public void updateLastPushTimeGauge(final String resource, final long lastPushTimeMillis) { - final String fullGaugeNameHours = makeGaugeName(resource, "lastPushTimeDelayHours"); - makeGauge(fullGaugeNameHours, makeMetricName(fullGaugeNameHours), _currentTimeMillisDeltaGaugeHoursFactory, + makeGauge(resource, ValidationMetricName.LAST_PUSH_TIME_DELAY_HOURS, _currentTimeMillisDeltaGaugeHoursFactory, lastPushTimeMillis); } /** + * Cleans up the last push time gauge. + * + * @param resource The resource for which the gauge is removed + */ + public void cleanupLastPushTimeGauge(final String resource) { + removeGauge(resource, ValidationMetricName.LAST_PUSH_TIME_DELAY_HOURS); + } + + /** * Updates the total document count gauge. * * @param resource The resource for which the gauge is updated * @param documentCount Total document count for the given resource name or table name */ public void updateTotalDocumentCountGauge(final String resource, final long documentCount) { - final String fullGaugeName = makeGaugeName(resource, "TotalDocumentCount"); - makeGauge(fullGaugeName, makeMetricName(fullGaugeName), _storedValueGaugeFactory, documentCount); + makeGauge(resource, ValidationMetricName.TOTAL_DOCUMENT_COUNT, _storedValueGaugeFactory, documentCount); + } + + /** + * Cleans up the total document count gauge. + * + * @param resource The resource for which the gauge is removed + */ + public void cleanupTotalDocumentCountGauge(final String resource) { + removeGauge(resource, ValidationMetricName.TOTAL_DOCUMENT_COUNT); } /** @@ -184,8 +216,7 @@ public class ValidationMetrics { * @param partitionCount Number of partitions that do not have any segment in CONSUMING state. */ public void updateNonConsumingPartitionCountMetric(final String resource, final int partitionCount) { - final String fullGaugeName = makeGaugeName(resource, "NonConsumingPartitionCount"); - makeGauge(fullGaugeName, makeMetricName(fullGaugeName), _storedValueGaugeFactory, partitionCount); + makeGauge(resource, ValidationMetricName.NON_CONSUMING_PARTITION_COUNT, _storedValueGaugeFactory, partitionCount); } /** @@ -195,8 +226,16 @@ public class ValidationMetrics { * @param segmentCount Total segment count for the given resource name or table name */ public void updateSegmentCountGauge(final String resource, final long segmentCount) { - final String fullGaugeName = makeGaugeName(resource, "SegmentCount"); - makeGauge(fullGaugeName, makeMetricName(fullGaugeName), _storedValueGaugeFactory, segmentCount); + makeGauge(resource, ValidationMetricName.SEGMENT_COUNT, _storedValueGaugeFactory, segmentCount); + } + + /** + * Cleans up the segment count gauge. + * + * @param resource The resource for which the gauge is removed + */ + public void cleanupSegmentCountGauge(final String resource) { + removeGauge(resource, ValidationMetricName.SEGMENT_COUNT); } @VisibleForTesting @@ -208,17 +247,27 @@ public class ValidationMetrics { return PinotMetricUtils.makePinotMetricName(ValidationMetrics.class, gaugeName); } - private void makeGauge(final String gaugeName, final PinotMetricName metricName, final GaugeFactory<?> gaugeFactory, - final long value) { - if (!_gaugeValues.containsKey(gaugeName)) { - _gaugeValues.put(gaugeName, value); - PinotMetricUtils.makeGauge(_metricsRegistry, metricName, gaugeFactory.buildGauge(gaugeName)); + private void makeGauge(final String resource, final ValidationMetricName validationMetricName, + final GaugeFactory<?> gaugeFactory, final long value) { + final String fullGaugeName = makeGaugeName(resource, validationMetricName.getMetricName()); + PinotMetricName metricName = makeMetricName(fullGaugeName); + if (!_gaugeValues.containsKey(fullGaugeName)) { + _gaugeValues.put(fullGaugeName, value); + PinotMetricUtils.makeGauge(_metricsRegistry, metricName, gaugeFactory.buildGauge(fullGaugeName)); _metricNames.add(metricName); } else { - _gaugeValues.put(gaugeName, value); + _gaugeValues.put(fullGaugeName, value); } } + private void removeGauge(final String resource, final ValidationMetricName validationMetricName) { + final String fullGaugeName = makeGaugeName(resource, validationMetricName.getMetricName()); + PinotMetricName pinotMetricName = makeMetricName(fullGaugeName); + PinotMetricUtils.removeMetric(_metricsRegistry, pinotMetricName); + _metricNames.remove(pinotMetricName); + _gaugeValues.remove(fullGaugeName); + } + /** * Unregisters all validation metrics. */ @@ -239,4 +288,26 @@ public class ValidationMetrics { } return value; } + + /** + * Names of validation metrics. + */ + public enum ValidationMetricName { + MISSING_SEGMENT_COUNT("missingSegmentCount"), + OFFLINE_SEGMENT_DELAY_HOURS("offlineSegmentDelayHours"), + LAST_PUSH_TIME_DELAY_HOURS("lastPushTimeDelayHours"), + TOTAL_DOCUMENT_COUNT("TotalDocumentCount"), + NON_CONSUMING_PARTITION_COUNT("NonConsumingPartitionCount"), + SEGMENT_COUNT("SegmentCount"); + + private final String _metricName; + + ValidationMetricName(String metricName) { + _metricName = metricName; + } + + public String getMetricName() { + return _metricName; + } + } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java index d7d74f9..93350fe 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java @@ -240,6 +240,22 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh } } + @Override + protected void nonLeaderCleanup(List<String> tableNamesWithType) { + for (String tableNameWithType : tableNamesWithType) { + _controllerMetrics.removeTableGauge(tableNameWithType, ControllerGauge.NUMBER_OF_REPLICAS); + _controllerMetrics.removeTableGauge(tableNameWithType, ControllerGauge.PERCENT_OF_REPLICAS); + _controllerMetrics.removeTableGauge(tableNameWithType, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE); + + _controllerMetrics.removeTableGauge(tableNameWithType, ControllerGauge.IDEALSTATE_ZNODE_SIZE); + _controllerMetrics.removeTableGauge(tableNameWithType, ControllerGauge.IDEALSTATE_ZNODE_BYTE_SIZE); + _controllerMetrics.removeTableGauge(tableNameWithType, ControllerGauge.SEGMENT_COUNT); + + _controllerMetrics.removeTableGauge(tableNameWithType, ControllerGauge.SEGMENTS_IN_ERROR_STATE); + _controllerMetrics.removeTableGauge(tableNameWithType, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE); + } + } + private void setStatusToDefault() { List<String> allTableNames = _pinotHelixResourceManager.getAllTables(); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java index 9d7a676..439f8be 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java @@ -65,11 +65,14 @@ public abstract class ControllerPeriodicTask<C> extends BasePeriodicTask { // Process the tables that are managed by this controller List<String> tablesToProcess = new ArrayList<>(); + List<String> nonLeaderForTables = new ArrayList<>(); if (propTableNameWithType == null) { // Table name is not available, so task should run on all tables for which this controller is the lead. for (String tableNameWithType : _pinotHelixResourceManager.getAllTables()) { if (_leadControllerManager.isLeaderForTable(tableNameWithType)) { tablesToProcess.add(tableNameWithType); + } else { + nonLeaderForTables.add(tableNameWithType); } } } else { @@ -82,6 +85,9 @@ public abstract class ControllerPeriodicTask<C> extends BasePeriodicTask { if (!tablesToProcess.isEmpty()) { processTables(tablesToProcess); } + if (!nonLeaderForTables.isEmpty()) { + nonLeaderCleanup(nonLeaderForTables); + } } catch (Exception e) { LOGGER.error("Caught exception while running task: {}", _taskName, e); _controllerMetrics.addMeteredTableValue(_taskName, ControllerMeter.CONTROLLER_PERIODIC_TASK_ERROR, 1L); @@ -156,4 +162,12 @@ public abstract class ControllerPeriodicTask<C> extends BasePeriodicTask { */ protected void postprocess() { } + + /** + * Can be overridden to perform cleanups for tables that the current controller isn't the leader. + * + * @param tableNamesWithType the table names that the current controller isn't the leader for + */ + protected void nonLeaderCleanup(List<String> tableNamesWithType) { + } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java index 1e11d1e..1b08c01 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java @@ -136,6 +136,22 @@ public class OfflineSegmentIntervalChecker extends ControllerPeriodicTask<Void> _validationMetrics.updateSegmentCountGauge(offlineTableName, numSegments); } + @Override + protected void nonLeaderCleanup(List<String> tableNamesWithType) { + for (String tableNameWithType : tableNamesWithType) { + TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); + if (tableType == TableType.OFFLINE) { + // TODO: we can further split the existing ValidationMetricName enum to OFFLINE and REALTIME, + // so that we can simply loop through all the enum values and clean up the metrics. + _validationMetrics.cleanupMissingSegmentCountGauge(tableNameWithType); + _validationMetrics.cleanupOfflineSegmentDelayGauge(tableNameWithType); + _validationMetrics.cleanupLastPushTimeGauge(tableNameWithType); + _validationMetrics.cleanupTotalDocumentCountGauge(tableNameWithType); + _validationMetrics.cleanupSegmentCountGauge(tableNameWithType); + } + } + } + /** * Computes the number of missing segments based on the given existing segment intervals and the expected frequency * of the intervals. diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java index 53291b3..237924a 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java @@ -124,6 +124,16 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea } } + @Override + protected void nonLeaderCleanup(List<String> tableNamesWithType) { + for (String tableNameWithType : tableNamesWithType) { + TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); + if (tableType == TableType.REALTIME) { + _validationMetrics.cleanupTotalDocumentCountGauge(tableNameWithType); + } + } + } + @VisibleForTesting static long computeRealtimeTotalDocumentInSegments(List<SegmentZKMetadata> segmentsZKMetadata, boolean countHLCSegments) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org