This is an automated email from the ASF dual-hosted git repository. jlli pushed a commit to branch cleanup-realtime-segment-metric in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit d136011b7dec98ae789c68e923d6839934ad2d87 Author: Jack Li(Analytics Engineering) <j...@jlli-mn1.linkedin.biz> AuthorDate: Thu May 6 15:00:09 2021 -0700 Remove realtime metrics if it's destroyed --- .../pinot/common/metrics/AbstractMetrics.java | 59 +++++++++++++++++----- .../realtime/LLRealtimeSegmentDataManager.java | 12 +++++ 2 files changed, 58 insertions(+), 13 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java index 44a53c0..1ed8e85 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java @@ -151,11 +151,12 @@ public abstract class AbstractMetrics<QP extends AbstractMetrics.QueryPhase, M e */ private void addValueToTimer(String fullTimerName, final long duration, final TimeUnit timeUnit) { final PinotMetricName metricName = PinotMetricUtils.makePinotMetricName(_clazz, fullTimerName); - PinotTimer timer = PinotMetricUtils.makePinotTimer(_metricsRegistry, metricName, TimeUnit.MILLISECONDS, TimeUnit.SECONDS); + PinotTimer timer = + PinotMetricUtils.makePinotTimer(_metricsRegistry, metricName, TimeUnit.MILLISECONDS, TimeUnit.SECONDS); if (timer != null) { timer.update(duration, timeUnit); - } - } + } + } /** * Logs a value to a meter. @@ -462,16 +463,48 @@ public abstract class AbstractMetrics<QP extends AbstractMetrics.QueryPhase, M e * @param valueCallback The callback function used to retrieve the value of the gauge */ public void addCallbackGauge(final String metricName, final Callable<Long> valueCallback) { - PinotMetricUtils.makeGauge(_metricsRegistry, PinotMetricUtils.makePinotMetricName(_clazz, _metricPrefix + metricName), - PinotMetricUtils.makePinotGauge(avoid -> { - try { - return valueCallback.call(); - } catch (Exception e) { - LOGGER.error("Caught exception", e); - Utils.rethrowException(e); - throw new AssertionError("Should not reach this"); - } - })); + PinotMetricUtils + .makeGauge(_metricsRegistry, PinotMetricUtils.makePinotMetricName(_clazz, _metricPrefix + metricName), + PinotMetricUtils.makePinotGauge(avoid -> { + try { + return valueCallback.call(); + } catch (Exception e) { + LOGGER.error("Caught exception", e); + Utils.rethrowException(e); + throw new AssertionError("Should not reach this"); + } + })); + } + + /** + * Removes a table gauge given the table name and the gauge. + * @param tableName table name + * @param gauge the gauge to be removed + */ + public void removeTableGauge(final String tableName, final G gauge) { + final String fullGaugeName; + String gaugeName = gauge.getGaugeName(); + fullGaugeName = gaugeName + "." + getTableName(tableName); + removeGauge(fullGaugeName); + } + + /** + * Remove gauge from Pinot metrics. + * @param gaugeName gauge name + */ + private void removeGauge(final String gaugeName) { + if (_gaugeValues.remove(gaugeName) != null) { + removeCallbackGauge(gaugeName); + } + } + + /** + * Remove callback gauge. + * @param metricName metric name + */ + private void removeCallbackGauge(String metricName) { + PinotMetricUtils + .removeMetric(_metricsRegistry, PinotMetricUtils.makePinotMetricName(_clazz, _metricPrefix + metricName)); } protected abstract QP[] getQueryPhases(); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java index b182f45..81fa9cb 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java @@ -923,6 +923,17 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { } } + /** + * Cleans up the metrics that reflects the state of the realtime segment. + * This step is essential as the instance may not be the target location for some of the partitions. + * E.g. if the number of partitions increases, or a host swap is needed, the target location for some partitions may change, + * and the current host remains to run. In this case, the current server would still keep the state of the old partitions, + * which no longer resides in this host any more, thus causes false positive information to the metric system. + */ + private void cleanupMetrics() { + _serverMetrics.removeTableGauge(_metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING); + } + protected void hold() { try { Thread.sleep(SegmentCompletionProtocol.MAX_HOLD_TIME_MS); @@ -1083,6 +1094,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { } _realtimeSegment.destroy(); closeStreamConsumers(); + cleanupMetrics(); } protected void start() { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org