This is an automated email from the ASF dual-hosted git repository. snlee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new d4356b47ed clean up the logic of reading guage metrics (#10106) d4356b47ed is described below commit d4356b47ede99351625426a08117e18c00facadb Author: Haitao Zhang <hai...@startree.ai> AuthorDate: Thu Jan 19 10:00:06 2023 -0800 clean up the logic of reading guage metrics (#10106) * clean up the logic of reading guage metrics * fix format * clean up metric registry in some unit tests * minor fix * address comments * add a missing test case * fix test cases * address comment --- .../ConnectionFailureDetectorTest.java | 8 +- .../pinot/common/metrics/AbstractMetrics.java | 38 +---- .../pinot/common/metrics/AbstractMetricsTest.java | 18 +-- .../pinot/common/metrics/MetricValueUtils.java | 123 ++++++++++++++ .../pinot/common/metrics/PinotMetricUtilsTest.java | 13 ++ .../pinot/controller/api/TableSizeReaderTest.java | 70 ++++---- .../helix/RealtimeConsumerMonitorTest.java | 11 +- .../controller/helix/SegmentStatusCheckerTest.java | 179 +++++++++++---------- .../StaleInstancesCleanupTaskStatelessTest.java | 65 ++++---- .../periodictask/ControllerPeriodicTaskTest.java | 26 +-- .../validation/StorageQuotaCheckerTest.java | 25 +-- .../ControllerPeriodicTasksIntegrationTest.java | 36 ++--- .../tests/SimpleMinionClusterIntegrationTest.java | 34 ++-- .../apache/pinot/spi/metrics/PinotMetricUtils.java | 11 ++ 14 files changed, 398 insertions(+), 259 deletions(-) diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/failuredetector/ConnectionFailureDetectorTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/failuredetector/ConnectionFailureDetectorTest.java index 52c00884d5..859b834f0c 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/failuredetector/ConnectionFailureDetectorTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/failuredetector/ConnectionFailureDetectorTest.java @@ -23,6 +23,7 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import org.apache.pinot.common.metrics.BrokerGauge; import org.apache.pinot.common.metrics.BrokerMetrics; +import org.apache.pinot.common.metrics.MetricValueUtils; import org.apache.pinot.core.transport.QueryResponse; import org.apache.pinot.core.transport.ServerRoutingInstance; import org.apache.pinot.spi.config.table.TableType; @@ -96,14 +97,14 @@ public class ConnectionFailureDetectorTest { int numRetries = _listener._retryUnhealthyServerCalled.get(); if (numRetries < Broker.FailureDetector.DEFAULT_MAX_RETIRES) { assertEquals(_failureDetector.getUnhealthyServers(), Collections.singleton(INSTANCE_ID)); - assertEquals(_brokerMetrics.getValueOfGlobalGauge(BrokerGauge.UNHEALTHY_SERVERS), 1); + assertEquals(MetricValueUtils.getGlobalGaugeValue(_brokerMetrics, BrokerGauge.UNHEALTHY_SERVERS), 1); return false; } assertEquals(numRetries, Broker.FailureDetector.DEFAULT_MAX_RETIRES); // There might be a small delay between the last retry and removing failed server from the unhealthy servers. // Perform a check instead of an assertion. return _failureDetector.getUnhealthyServers().isEmpty() - && _brokerMetrics.getValueOfGlobalGauge(BrokerGauge.UNHEALTHY_SERVERS) == 0 + && MetricValueUtils.getGaugeValue(_brokerMetrics, BrokerGauge.UNHEALTHY_SERVERS.getGaugeName()) == 0 && _listener._notifyUnhealthyServerCalled.get() == 1 && _listener._notifyHealthyServerCalled.get() == 1; }, 5_000L, "Failed to get 10 retires"); @@ -113,7 +114,8 @@ public class ConnectionFailureDetectorTest { private void verify(Set<String> expectedUnhealthyServers, int expectedNotifyUnhealthyServerCalled, int expectedNotifyHealthyServerCalled) { assertEquals(_failureDetector.getUnhealthyServers(), expectedUnhealthyServers); - assertEquals(_brokerMetrics.getValueOfGlobalGauge(BrokerGauge.UNHEALTHY_SERVERS), expectedUnhealthyServers.size()); + assertEquals(MetricValueUtils.getGlobalGaugeValue(_brokerMetrics, BrokerGauge.UNHEALTHY_SERVERS), + expectedUnhealthyServers.size()); assertEquals(_listener._notifyUnhealthyServerCalled.get(), expectedNotifyUnhealthyServerCalled); assertEquals(_listener._notifyHealthyServerCalled.get(), expectedNotifyHealthyServerCalled); } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java index 0ea497d28c..f1044abd77 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java @@ -414,7 +414,7 @@ public abstract class AbstractMetrics<QP extends AbstractMetrics.QueryPhase, M e synchronized (_gaugeValues) { if (!_gaugeValues.containsKey(gaugeName)) { _gaugeValues.put(gaugeName, new AtomicLong(value)); - addCallbackGauge(gaugeName, () -> _gaugeValues.get(gaugeName).get()); + setOrUpdateGauge(gaugeName, () -> _gaugeValues.get(gaugeName).get()); } else { _gaugeValues.get(gaugeName).set(value); } @@ -438,13 +438,7 @@ public abstract class AbstractMetrics<QP extends AbstractMetrics.QueryPhase, M e synchronized (_gaugeValues) { if (!_gaugeValues.containsKey(gaugeName)) { _gaugeValues.put(gaugeName, new AtomicLong(unitCount)); - addCallbackGauge(gaugeName, new Callable<Long>() { - @Override - public Long call() - throws Exception { - return _gaugeValues.get(gaugeName).get(); - } - }); + setOrUpdateGauge(gaugeName, () -> _gaugeValues.get(gaugeName).get()); } else { _gaugeValues.get(gaugeName).addAndGet(unitCount); } @@ -454,20 +448,6 @@ public abstract class AbstractMetrics<QP extends AbstractMetrics.QueryPhase, M e } } - @VisibleForTesting - public long getValueOfGlobalGauge(final G gauge) { - String gaugeName = gauge.getGaugeName(); - AtomicLong gaugeValue = _gaugeValues.get(gaugeName); - return gaugeValue == null ? 0 : gaugeValue.get(); - } - - @VisibleForTesting - public long getValueOfGlobalGauge(final G gauge, String suffix) { - String fullGaugeName = composeGlobalGaugeName(suffix, gauge); - AtomicLong gaugeValue = _gaugeValues.get(fullGaugeName); - return gaugeValue == null ? 0 : gaugeValue.get(); - } - /** * Gets the value of a table gauge. * @@ -481,20 +461,6 @@ public abstract class AbstractMetrics<QP extends AbstractMetrics.QueryPhase, M e return gaugeValue == null ? 0 : gaugeValue.get(); } - /** - * Gets the value of a table partition gauge. - * - * @param tableName The table name - * @param partitionId The partition name - * @param gauge The gauge to use - */ - public long getValueOfPartitionGauge(final String tableName, final int partitionId, final G gauge) { - final String fullGaugeName = composeTableGaugeName(tableName, String.valueOf(partitionId), gauge); - - AtomicLong gaugeValue = _gaugeValues.get(fullGaugeName); - return gaugeValue == null ? -1 : gaugeValue.get(); - } - /** * Initializes all global meters (such as exceptions count) to zero. */ diff --git a/pinot-common/src/test/java/org/apache/pinot/common/metrics/AbstractMetricsTest.java b/pinot-common/src/test/java/org/apache/pinot/common/metrics/AbstractMetricsTest.java index 7a014448e1..1ff1d1a58f 100644 --- a/pinot-common/src/test/java/org/apache/pinot/common/metrics/AbstractMetricsTest.java +++ b/pinot-common/src/test/java/org/apache/pinot/common/metrics/AbstractMetricsTest.java @@ -18,13 +18,8 @@ */ package org.apache.pinot.common.metrics; -import com.yammer.metrics.core.MetricName; -import org.apache.pinot.plugin.metrics.yammer.YammerMetric; -import org.apache.pinot.plugin.metrics.yammer.YammerMetricName; import org.apache.pinot.plugin.metrics.yammer.YammerMetricsRegistry; -import org.apache.pinot.plugin.metrics.yammer.YammerSettableGauge; import org.apache.pinot.spi.env.PinotConfiguration; -import org.apache.pinot.spi.metrics.PinotMetric; import org.apache.pinot.spi.metrics.PinotMetricUtils; import org.testng.Assert; import org.testng.annotations.Test; @@ -43,23 +38,14 @@ public class AbstractMetricsTest { String metricName = "test"; // add gauge controllerMetrics.setOrUpdateGauge(metricName, () -> 1L); - checkGauge(controllerMetrics, metricName, 1); + Assert.assertEquals(MetricValueUtils.getGaugeValue(controllerMetrics, metricName), 1); // update gauge controllerMetrics.setOrUpdateGauge(metricName, () -> 2L); - checkGauge(controllerMetrics, metricName, 2); + Assert.assertEquals(MetricValueUtils.getGaugeValue(controllerMetrics, metricName), 2); // remove gauge controllerMetrics.removeGauge(metricName); Assert.assertTrue(controllerMetrics.getMetricsRegistry().allMetrics().isEmpty()); } - - private void checkGauge(ControllerMetrics controllerMetrics, String metricName, long value) { - Assert.assertEquals(controllerMetrics.getMetricsRegistry().allMetrics().size(), 1); - PinotMetric pinotMetric = controllerMetrics.getMetricsRegistry().allMetrics() - .get(new YammerMetricName(new MetricName(ControllerMetrics.class, "pinot.controller." + metricName))); - Assert.assertTrue(pinotMetric instanceof YammerMetric); - Assert.assertTrue(pinotMetric.getMetric() instanceof YammerSettableGauge); - Assert.assertEquals(((YammerSettableGauge<Long>) pinotMetric.getMetric()).value(), Long.valueOf(value)); - } } diff --git a/pinot-common/src/test/java/org/apache/pinot/common/metrics/MetricValueUtils.java b/pinot-common/src/test/java/org/apache/pinot/common/metrics/MetricValueUtils.java new file mode 100644 index 0000000000..9008f8971c --- /dev/null +++ b/pinot-common/src/test/java/org/apache/pinot/common/metrics/MetricValueUtils.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.metrics; + +import com.yammer.metrics.core.MetricName; +import org.apache.pinot.plugin.metrics.yammer.YammerMetricName; +import org.apache.pinot.plugin.metrics.yammer.YammerSettableGauge; +import org.apache.pinot.spi.metrics.PinotMetric; + + +public class MetricValueUtils { + private MetricValueUtils() { + } + + public static boolean gaugeExists(AbstractMetrics metrics, String metricName) { + return extractMetric(metrics, metricName) != null; + } + + public static long getGaugeValue(AbstractMetrics metrics, String metricName) { + PinotMetric pinotMetric = extractMetric(metrics, metricName); + if (pinotMetric == null) { + return 0; + } + return ((YammerSettableGauge<Long>) pinotMetric.getMetric()).value(); + } + + public static boolean globalGaugeExists(AbstractMetrics metrics, AbstractMetrics.Gauge gauge) { + return extractMetric(metrics, gauge.getGaugeName()) != null; + } + + public static long getGlobalGaugeValue(AbstractMetrics metrics, AbstractMetrics.Gauge gauge) { + PinotMetric pinotMetric = extractMetric(metrics, gauge.getGaugeName()); + if (pinotMetric == null) { + return 0; + } + return ((YammerSettableGauge<Long>) pinotMetric.getMetric()).value(); + } + + public static boolean globalGaugeExists(AbstractMetrics metrics, String key, AbstractMetrics.Gauge gauge) { + return extractMetric(metrics, gauge.getGaugeName() + "." + key) != null; + } + + public static long getGlobalGaugeValue(AbstractMetrics metrics, String key, AbstractMetrics.Gauge gauge) { + PinotMetric pinotMetric = extractMetric(metrics, gauge.getGaugeName() + "." + key); + if (pinotMetric == null) { + return 0; + } + return ((YammerSettableGauge<Long>) pinotMetric.getMetric()).value(); + } + + public static boolean tableGaugeExists(AbstractMetrics metrics, String tableName, AbstractMetrics.Gauge gauge) { + return extractMetric(metrics, gauge.getGaugeName() + "." + tableName) != null; + } + + public static long getTableGaugeValue(AbstractMetrics metrics, String tableName, AbstractMetrics.Gauge gauge) { + PinotMetric pinotMetric = extractMetric(metrics, gauge.getGaugeName() + "." + tableName); + if (pinotMetric == null) { + return 0; + } + return ((YammerSettableGauge<Long>) pinotMetric.getMetric()).value(); + } + + public static boolean tableGaugeExists(AbstractMetrics metrics, String tableName, String key, + AbstractMetrics.Gauge gauge) { + return extractMetric(metrics, gauge.getGaugeName() + "." + tableName + "." + key) != null; + } + + public static long getTableGaugeValue(AbstractMetrics metrics, String tableName, String key, + AbstractMetrics.Gauge gauge) { + PinotMetric pinotMetric = extractMetric(metrics, gauge.getGaugeName() + "." + tableName + "." + key); + if (pinotMetric == null) { + return 0; + } + return ((YammerSettableGauge<Long>) pinotMetric.getMetric()).value(); + } + + public static boolean partitionGaugeExists(AbstractMetrics metrics, String tableName, int partitionId, + AbstractMetrics.Gauge gauge) { + return extractMetric(metrics, gauge.getGaugeName() + "." + tableName + "." + partitionId) != null; + } + + public static long getPartitionGaugeValue(AbstractMetrics metrics, String tableName, int partitionId, + AbstractMetrics.Gauge gauge) { + PinotMetric pinotMetric = extractMetric(metrics, gauge.getGaugeName() + "." + tableName + "." + partitionId); + if (pinotMetric == null) { + return 0; + } + return ((YammerSettableGauge<Long>) pinotMetric.getMetric()).value(); + } + + private static PinotMetric extractMetric(AbstractMetrics metrics, String metricName) { + String metricPrefix; + if (metrics instanceof ControllerMetrics) { + metricPrefix = "pinot.controller."; + } else if (metrics instanceof BrokerMetrics) { + metricPrefix = "pinot.broker."; + } else if (metrics instanceof ServerMetrics) { + metricPrefix = "pinot.server."; + } else if (metrics instanceof MinionMetrics) { + metricPrefix = "pinot.minion."; + } else { + throw new RuntimeException(String.format("unsupported AbstractMetrics type: %s", metrics.getClass().toString())); + } + return metrics.getMetricsRegistry().allMetrics() + .get(new YammerMetricName(new MetricName(metrics.getClass(), metricPrefix + metricName))); + } +} diff --git a/pinot-common/src/test/java/org/apache/pinot/common/metrics/PinotMetricUtilsTest.java b/pinot-common/src/test/java/org/apache/pinot/common/metrics/PinotMetricUtilsTest.java index fc65509f36..b8429e88ac 100644 --- a/pinot-common/src/test/java/org/apache/pinot/common/metrics/PinotMetricUtilsTest.java +++ b/pinot-common/src/test/java/org/apache/pinot/common/metrics/PinotMetricUtilsTest.java @@ -111,6 +111,7 @@ public class PinotMetricUtilsTest { @Test public void testMetricRegistryFailure() { + PinotMetricUtils.cleanUp(); try { Map<String, Object> properties = new HashMap<>(); properties.put("factory.className", "NonExistentClass"); @@ -121,4 +122,16 @@ public class PinotMetricUtilsTest { // Expected } } + + @Test + public void testCleanUp() { + PinotMetricUtils.cleanUp(); + PinotMetricsRegistry registry = PinotMetricUtils.getPinotMetricsRegistry(); + PinotMetricsRegistry registry1 = PinotMetricUtils.getPinotMetricsRegistry(); + Assert.assertEquals(registry, registry1); + PinotMetricUtils.cleanUp(); + // after cleaning up, a new one will be created + PinotMetricsRegistry registry2 = PinotMetricUtils.getPinotMetricsRegistry(); + Assert.assertNotEquals(registry, registry2); + } } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableSizeReaderTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableSizeReaderTest.java index a46fd912c5..7b460161db 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableSizeReaderTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableSizeReaderTest.java @@ -38,6 +38,7 @@ import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.pinot.common.exception.InvalidConfigException; import org.apache.pinot.common.metrics.ControllerGauge; import org.apache.pinot.common.metrics.ControllerMetrics; +import org.apache.pinot.common.metrics.MetricValueUtils; import org.apache.pinot.common.restlet.resources.SegmentSizeInfo; import org.apache.pinot.common.restlet.resources.TableSizeInfo; import org.apache.pinot.common.utils.config.TableConfigUtils; @@ -62,10 +63,7 @@ import org.testng.annotations.Test; import static org.mockito.Mockito.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNotNull; -import static org.testng.Assert.assertNull; -import static org.testng.Assert.assertTrue; +import static org.testng.Assert.*; public class TableSizeReaderTest { @@ -323,16 +321,17 @@ public class TableSizeReaderTest { assertEquals(tableSizeDetails._reportedSizeInBytes, offlineSizes._reportedSizeInBytes); assertEquals(tableSizeDetails._estimatedSizeInBytes, offlineSizes._estimatedSizeInBytes); String tableNameWithType = TableNameBuilder.OFFLINE.tableNameWithType(table); - assertEquals(_controllerMetrics.getValueOfTableGauge(tableNameWithType, + assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableNameWithType, ControllerGauge.TABLE_STORAGE_EST_MISSING_SEGMENT_PERCENT), 0); assertEquals( - _controllerMetrics.getValueOfTableGauge(tableNameWithType, ControllerGauge.TABLE_SIZE_PER_REPLICA_ON_SERVER), - offlineSizes._estimatedSizeInBytes / NUM_REPLICAS); - assertEquals(_controllerMetrics.getValueOfTableGauge(tableNameWithType, ControllerGauge.TABLE_TOTAL_SIZE_ON_SERVER), - offlineSizes._estimatedSizeInBytes); + MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableNameWithType, + ControllerGauge.TABLE_SIZE_PER_REPLICA_ON_SERVER), offlineSizes._estimatedSizeInBytes / NUM_REPLICAS); assertEquals( - _controllerMetrics.getValueOfTableGauge(tableNameWithType, ControllerGauge.LARGEST_SEGMENT_SIZE_ON_SERVER), - 160); + MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableNameWithType, + ControllerGauge.TABLE_TOTAL_SIZE_ON_SERVER), offlineSizes._estimatedSizeInBytes); + assertEquals( + MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableNameWithType, + ControllerGauge.LARGEST_SEGMENT_SIZE_ON_SERVER), 160); } @Test @@ -348,16 +347,18 @@ public class TableSizeReaderTest { assertEquals(offlineSizes._reportedSizeInBytes, TableSizeReader.DEFAULT_SIZE_WHEN_MISSING_OR_ERROR); assertEquals(tableSizeDetails._estimatedSizeInBytes, TableSizeReader.DEFAULT_SIZE_WHEN_MISSING_OR_ERROR); String tableNameWithType = TableNameBuilder.OFFLINE.tableNameWithType(table); - assertEquals(_controllerMetrics.getValueOfTableGauge(tableNameWithType, - ControllerGauge.TABLE_STORAGE_EST_MISSING_SEGMENT_PERCENT), 100); assertEquals( - _controllerMetrics.getValueOfTableGauge(tableNameWithType, ControllerGauge.TABLE_SIZE_PER_REPLICA_ON_SERVER), - offlineSizes._estimatedSizeInBytes / NUM_REPLICAS); - assertEquals(_controllerMetrics.getValueOfTableGauge(tableNameWithType, ControllerGauge.TABLE_TOTAL_SIZE_ON_SERVER), - offlineSizes._estimatedSizeInBytes); - // 0 means not found for the gauge + MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableNameWithType, + ControllerGauge.TABLE_STORAGE_EST_MISSING_SEGMENT_PERCENT), 100); + assertEquals( + MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableNameWithType, + ControllerGauge.TABLE_SIZE_PER_REPLICA_ON_SERVER), offlineSizes._estimatedSizeInBytes / NUM_REPLICAS); assertEquals( - _controllerMetrics.getValueOfTableGauge(tableNameWithType, ControllerGauge.LARGEST_SEGMENT_SIZE_ON_SERVER), 0); + MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableNameWithType, + ControllerGauge.TABLE_TOTAL_SIZE_ON_SERVER), offlineSizes._estimatedSizeInBytes); + assertFalse( + MetricValueUtils.tableGaugeExists(_controllerMetrics, tableNameWithType, + ControllerGauge.LARGEST_SEGMENT_SIZE_ON_SERVER)); } @Test @@ -373,16 +374,18 @@ public class TableSizeReaderTest { validateTableSubTypeSize(servers, offlineSizes); assertNull(tableSizeDetails._realtimeSegments); String tableNameWithType = TableNameBuilder.OFFLINE.tableNameWithType(table); - assertEquals(_controllerMetrics.getValueOfTableGauge(tableNameWithType, - ControllerGauge.TABLE_STORAGE_EST_MISSING_SEGMENT_PERCENT), 20); assertEquals( - _controllerMetrics.getValueOfTableGauge(tableNameWithType, ControllerGauge.TABLE_SIZE_PER_REPLICA_ON_SERVER), - offlineSizes._estimatedSizeInBytes / NUM_REPLICAS); - assertEquals(_controllerMetrics.getValueOfTableGauge(tableNameWithType, ControllerGauge.TABLE_TOTAL_SIZE_ON_SERVER), - offlineSizes._estimatedSizeInBytes); + MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableNameWithType, + ControllerGauge.TABLE_STORAGE_EST_MISSING_SEGMENT_PERCENT), 20); + assertEquals( + MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableNameWithType, + ControllerGauge.TABLE_SIZE_PER_REPLICA_ON_SERVER), offlineSizes._estimatedSizeInBytes / NUM_REPLICAS); assertEquals( - _controllerMetrics.getValueOfTableGauge(tableNameWithType, ControllerGauge.LARGEST_SEGMENT_SIZE_ON_SERVER), - 160); + MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableNameWithType, + ControllerGauge.TABLE_TOTAL_SIZE_ON_SERVER), offlineSizes._estimatedSizeInBytes); + assertEquals( + MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableNameWithType, + ControllerGauge.LARGEST_SEGMENT_SIZE_ON_SERVER), 160); } @Test @@ -398,12 +401,13 @@ public class TableSizeReaderTest { validateTableSubTypeSize(servers, realtimeSegments); String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(table); assertEquals( - _controllerMetrics.getValueOfTableGauge(tableNameWithType, ControllerGauge.TABLE_SIZE_PER_REPLICA_ON_SERVER), - realtimeSegments._estimatedSizeInBytes / NUM_REPLICAS); - assertEquals(_controllerMetrics.getValueOfTableGauge(tableNameWithType, ControllerGauge.TABLE_TOTAL_SIZE_ON_SERVER), - realtimeSegments._estimatedSizeInBytes); + MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableNameWithType, + ControllerGauge.TABLE_SIZE_PER_REPLICA_ON_SERVER), realtimeSegments._estimatedSizeInBytes / NUM_REPLICAS); + assertEquals( + MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableNameWithType, + ControllerGauge.TABLE_TOTAL_SIZE_ON_SERVER), realtimeSegments._estimatedSizeInBytes); assertEquals( - _controllerMetrics.getValueOfTableGauge(tableNameWithType, ControllerGauge.LARGEST_SEGMENT_SIZE_ON_SERVER), - 120); + MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableNameWithType, + ControllerGauge.LARGEST_SEGMENT_SIZE_ON_SERVER), 120); } } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitorTest.java index 1450c02a35..31682d911e 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitorTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitorTest.java @@ -30,6 +30,7 @@ import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.pinot.common.metrics.ControllerGauge; import org.apache.pinot.common.metrics.ControllerMetrics; +import org.apache.pinot.common.metrics.MetricValueUtils; import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.controller.ControllerConf; import org.apache.pinot.controller.LeadControllerManager; @@ -142,13 +143,13 @@ public class RealtimeConsumerMonitorTest { controllerMetrics, consumingSegmentReader); realtimeConsumerMonitor.start(); realtimeConsumerMonitor.run(); - Assert.assertEquals(controllerMetrics.getValueOfPartitionGauge(tableName, 1, + Assert.assertEquals(MetricValueUtils.getPartitionGaugeValue(controllerMetrics, tableName, 1, ControllerGauge.MAX_RECORDS_LAG), 0); - Assert.assertEquals(controllerMetrics.getValueOfPartitionGauge(tableName, 2, + Assert.assertEquals(MetricValueUtils.getPartitionGaugeValue(controllerMetrics, tableName, 2, ControllerGauge.MAX_RECORDS_LAG), 40); - Assert.assertEquals(controllerMetrics.getValueOfPartitionGauge(tableName, 1, - ControllerGauge.MAX_RECORD_AVAILABILITY_LAG_MS), 0); - Assert.assertEquals(controllerMetrics.getValueOfPartitionGauge(tableName, 2, + Assert.assertEquals(MetricValueUtils.getPartitionGaugeValue(controllerMetrics, tableName, 1, + ControllerGauge.MAX_RECORD_AVAILABILITY_LAG_MS), 0); + Assert.assertEquals(MetricValueUtils.getPartitionGaugeValue(controllerMetrics, tableName, 2, ControllerGauge.MAX_RECORD_AVAILABILITY_LAG_MS), 60000); } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java index 83147e677c..a2ae5aaa96 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java @@ -39,6 +39,7 @@ import org.apache.pinot.common.lineage.SegmentLineageUtils; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.metrics.ControllerGauge; import org.apache.pinot.common.metrics.ControllerMetrics; +import org.apache.pinot.common.metrics.MetricValueUtils; import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.controller.ControllerConf; import org.apache.pinot.controller.LeadControllerManager; @@ -142,6 +143,7 @@ public class SegmentStatusCheckerTest { _tableSizeReader = mock(TableSizeReader.class); when(_tableSizeReader.getTableSizeDetails(anyString(), anyInt())).thenReturn(null); } + PinotMetricUtils.cleanUp(); _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry(); _controllerMetrics = new ControllerMetrics(_metricsRegistry); _segmentStatusChecker = @@ -150,25 +152,22 @@ public class SegmentStatusCheckerTest { _segmentStatusChecker.setTableSizeReader(_tableSizeReader); _segmentStatusChecker.start(); _segmentStatusChecker.run(); - Assert.assertEquals(_controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.REPLICATION_FROM_CONFIG), 2); - Assert - .assertEquals(_controllerMetrics.getValueOfTableGauge(externalView.getId(), ControllerGauge.SEGMENT_COUNT), 3); - Assert.assertEquals( - _controllerMetrics.getValueOfTableGauge(externalView.getId(), ControllerGauge.SEGMENT_COUNT_INCLUDING_REPLACED), - 5); - Assert.assertEquals( - _controllerMetrics.getValueOfTableGauge(externalView.getId(), ControllerGauge.SEGMENTS_IN_ERROR_STATE), 1); - Assert - .assertEquals(_controllerMetrics.getValueOfTableGauge(externalView.getId(), ControllerGauge.NUMBER_OF_REPLICAS), - 2); - Assert - .assertEquals( - _controllerMetrics.getValueOfTableGauge(externalView.getId(), ControllerGauge.PERCENT_OF_REPLICAS), - 66); - Assert.assertEquals( - _controllerMetrics.getValueOfTableGauge(externalView.getId(), ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100); - Assert.assertEquals( - _controllerMetrics.getValueOfTableGauge(externalView.getId(), ControllerGauge.TABLE_COMPRESSED_SIZE), 0); + Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableName, + ControllerGauge.REPLICATION_FROM_CONFIG), 2); + Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.SEGMENT_COUNT), 3); + Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.SEGMENT_COUNT_INCLUDING_REPLACED), 5); + Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.SEGMENTS_IN_ERROR_STATE), 1); + Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.NUMBER_OF_REPLICAS), 2); + Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.PERCENT_OF_REPLICAS), 66); + Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100); + Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.TABLE_COMPRESSED_SIZE), 0); } @Test @@ -233,6 +232,7 @@ public class SegmentStatusCheckerTest { _tableSizeReader = mock(TableSizeReader.class); when(_tableSizeReader.getTableSizeDetails(anyString(), anyInt())).thenReturn(null); } + PinotMetricUtils.cleanUp(); _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry(); _controllerMetrics = new ControllerMetrics(_metricsRegistry); _segmentStatusChecker = @@ -241,20 +241,18 @@ public class SegmentStatusCheckerTest { _segmentStatusChecker.setTableSizeReader(_tableSizeReader); _segmentStatusChecker.start(); _segmentStatusChecker.run(); - Assert.assertEquals(_controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.REPLICATION_FROM_CONFIG), 3); - Assert.assertEquals( - _controllerMetrics.getValueOfTableGauge(externalView.getId(), ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0); - Assert - .assertEquals(_controllerMetrics.getValueOfTableGauge(externalView.getId(), ControllerGauge.NUMBER_OF_REPLICAS), - 3); - Assert - .assertEquals( - _controllerMetrics.getValueOfTableGauge(externalView.getId(), ControllerGauge.PERCENT_OF_REPLICAS), - 100); - Assert.assertEquals( - _controllerMetrics.getValueOfTableGauge(externalView.getId(), ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100); - Assert.assertEquals(_controllerMetrics - .getValueOfTableGauge(externalView.getId(), ControllerGauge.MISSING_CONSUMING_SEGMENT_TOTAL_COUNT), 2); + Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableName, + ControllerGauge.REPLICATION_FROM_CONFIG), 3); + Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0); + Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.NUMBER_OF_REPLICAS), 3); + Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.PERCENT_OF_REPLICAS), 100); + Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100); + Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.MISSING_CONSUMING_SEGMENT_TOTAL_COUNT), 2); } Map<String, String> getStreamConfigMap() { @@ -334,6 +332,7 @@ public class SegmentStatusCheckerTest { _tableSizeReader = mock(TableSizeReader.class); when(_tableSizeReader.getTableSizeDetails(anyString(), anyInt())).thenReturn(null); } + PinotMetricUtils.cleanUp(); _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry(); _controllerMetrics = new ControllerMetrics(_metricsRegistry); _segmentStatusChecker = @@ -342,15 +341,14 @@ public class SegmentStatusCheckerTest { _segmentStatusChecker.setTableSizeReader(_tableSizeReader); _segmentStatusChecker.start(); _segmentStatusChecker.run(); - Assert.assertEquals( - _controllerMetrics.getValueOfTableGauge(externalView.getId(), ControllerGauge.SEGMENTS_IN_ERROR_STATE), 1); - Assert - .assertEquals(_controllerMetrics.getValueOfTableGauge(externalView.getId(), ControllerGauge.NUMBER_OF_REPLICAS), - 0); - Assert.assertEquals( - _controllerMetrics.getValueOfTableGauge(externalView.getId(), ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 75); - Assert.assertEquals( - _controllerMetrics.getValueOfTableGauge(externalView.getId(), ControllerGauge.TABLE_COMPRESSED_SIZE), 1111); + Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.SEGMENTS_IN_ERROR_STATE), 1); + Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.NUMBER_OF_REPLICAS), 0); + Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 75); + Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.TABLE_COMPRESSED_SIZE), 1111); } @Test @@ -391,6 +389,7 @@ public class SegmentStatusCheckerTest { _tableSizeReader = mock(TableSizeReader.class); when(_tableSizeReader.getTableSizeDetails(anyString(), anyInt())).thenReturn(null); } + PinotMetricUtils.cleanUp(); _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry(); _controllerMetrics = new ControllerMetrics(_metricsRegistry); _segmentStatusChecker = @@ -399,10 +398,12 @@ public class SegmentStatusCheckerTest { _segmentStatusChecker.setTableSizeReader(_tableSizeReader); _segmentStatusChecker.start(); _segmentStatusChecker.run(); - Assert.assertEquals(_controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0); - Assert.assertEquals(_controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.NUMBER_OF_REPLICAS), 0); - Assert.assertEquals( - _controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.TABLE_COMPRESSED_SIZE), 0); + Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableName, + ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0); + Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableName, + ControllerGauge.NUMBER_OF_REPLICAS), 0); + Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableName, + ControllerGauge.TABLE_COMPRESSED_SIZE), 0); } @Test @@ -431,6 +432,7 @@ public class SegmentStatusCheckerTest { _tableSizeReader = mock(TableSizeReader.class); when(_tableSizeReader.getTableSizeDetails(anyString(), anyInt())).thenReturn(null); } + PinotMetricUtils.cleanUp(); _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry(); _controllerMetrics = new ControllerMetrics(_metricsRegistry); _segmentStatusChecker = @@ -439,14 +441,14 @@ public class SegmentStatusCheckerTest { _segmentStatusChecker.setTableSizeReader(_tableSizeReader); _segmentStatusChecker.start(); _segmentStatusChecker.run(); - Assert.assertEquals(_controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.SEGMENTS_IN_ERROR_STATE), - Long.MIN_VALUE); - Assert.assertEquals(_controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.NUMBER_OF_REPLICAS), - Long.MIN_VALUE); - Assert.assertEquals(_controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.PERCENT_OF_REPLICAS), - Long.MIN_VALUE); - Assert.assertEquals( - _controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.TABLE_COMPRESSED_SIZE), 0); + Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableName, + ControllerGauge.SEGMENTS_IN_ERROR_STATE), Long.MIN_VALUE); + Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableName, + ControllerGauge.NUMBER_OF_REPLICAS), Long.MIN_VALUE); + Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableName, + ControllerGauge.PERCENT_OF_REPLICAS), Long.MIN_VALUE); + Assert.assertFalse(MetricValueUtils.tableGaugeExists(_controllerMetrics, tableName, + ControllerGauge.TABLE_COMPRESSED_SIZE)); } @Test @@ -522,6 +524,7 @@ public class SegmentStatusCheckerTest { _tableSizeReader = mock(TableSizeReader.class); when(_tableSizeReader.getTableSizeDetails(anyString(), anyInt())).thenReturn(null); } + PinotMetricUtils.cleanUp(); _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry(); _controllerMetrics = new ControllerMetrics(_metricsRegistry); _segmentStatusChecker = @@ -530,19 +533,16 @@ public class SegmentStatusCheckerTest { _segmentStatusChecker.setTableSizeReader(_tableSizeReader); _segmentStatusChecker.start(); _segmentStatusChecker.run(); - Assert.assertEquals( - _controllerMetrics.getValueOfTableGauge(externalView.getId(), ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0); - Assert - .assertEquals(_controllerMetrics.getValueOfTableGauge(externalView.getId(), ControllerGauge.NUMBER_OF_REPLICAS), - 2); - Assert - .assertEquals( - _controllerMetrics.getValueOfTableGauge(externalView.getId(), ControllerGauge.PERCENT_OF_REPLICAS), - 100); - Assert.assertEquals( - _controllerMetrics.getValueOfTableGauge(externalView.getId(), ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100); - Assert.assertEquals( - _controllerMetrics.getValueOfTableGauge(externalView.getId(), ControllerGauge.TABLE_COMPRESSED_SIZE), 0); + Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0); + Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.NUMBER_OF_REPLICAS), 2); + Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.PERCENT_OF_REPLICAS), 100); + Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100); + Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, externalView.getId(), + ControllerGauge.TABLE_COMPRESSED_SIZE), 0); } @Test @@ -579,6 +579,7 @@ public class SegmentStatusCheckerTest { _tableSizeReader = mock(TableSizeReader.class); when(_tableSizeReader.getTableSizeDetails(anyString(), anyInt())).thenReturn(null); } + PinotMetricUtils.cleanUp(); _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry(); _controllerMetrics = new ControllerMetrics(_metricsRegistry); _segmentStatusChecker = @@ -587,11 +588,14 @@ public class SegmentStatusCheckerTest { _segmentStatusChecker.setTableSizeReader(_tableSizeReader); _segmentStatusChecker.start(); _segmentStatusChecker.run(); - Assert.assertEquals(_controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0); - Assert.assertEquals(_controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.NUMBER_OF_REPLICAS), 1); - Assert.assertEquals(_controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.PERCENT_OF_REPLICAS), 100); - Assert.assertEquals(_controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), - 100); + Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableName, + ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0); + Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableName, + ControllerGauge.NUMBER_OF_REPLICAS), 1); + Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableName, + ControllerGauge.PERCENT_OF_REPLICAS), 100); + Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableName, + ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100); } @Test @@ -625,17 +629,20 @@ public class SegmentStatusCheckerTest { _leadControllerManager = mock(LeadControllerManager.class); when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true); } + PinotMetricUtils.cleanUp(); _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry(); _controllerMetrics = new ControllerMetrics(_metricsRegistry); _segmentStatusChecker = new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics, _executorService); // verify state before test - Assert.assertEquals(_controllerMetrics.getValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT), 0); + Assert.assertEquals( + MetricValueUtils.getGlobalGaugeValue(_controllerMetrics, ControllerGauge.DISABLED_TABLE_COUNT), 0); // update metrics _segmentStatusChecker.start(); _segmentStatusChecker.run(); - Assert.assertEquals(_controllerMetrics.getValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT), 1); + Assert.assertEquals( + MetricValueUtils.getGlobalGaugeValue(_controllerMetrics, ControllerGauge.DISABLED_TABLE_COUNT), 1); } @Test @@ -665,17 +672,20 @@ public class SegmentStatusCheckerTest { _leadControllerManager = mock(LeadControllerManager.class); when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true); } + PinotMetricUtils.cleanUp(); _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry(); _controllerMetrics = new ControllerMetrics(_metricsRegistry); _segmentStatusChecker = new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics, _executorService); // verify state before test - Assert.assertEquals(_controllerMetrics.getValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT), 0); + Assert.assertFalse( + MetricValueUtils.globalGaugeExists(_controllerMetrics, ControllerGauge.DISABLED_TABLE_COUNT)); // update metrics _segmentStatusChecker.start(); _segmentStatusChecker.run(); - Assert.assertEquals(_controllerMetrics.getValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT), 1); + Assert.assertEquals( + MetricValueUtils.getGlobalGaugeValue(_controllerMetrics, ControllerGauge.DISABLED_TABLE_COUNT), 1); } @Test @@ -720,6 +730,7 @@ public class SegmentStatusCheckerTest { _tableSizeReader = mock(TableSizeReader.class); when(_tableSizeReader.getTableSizeDetails(anyString(), anyInt())).thenReturn(null); } + PinotMetricUtils.cleanUp(); _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry(); _controllerMetrics = new ControllerMetrics(_metricsRegistry); _segmentStatusChecker = @@ -728,12 +739,14 @@ public class SegmentStatusCheckerTest { _segmentStatusChecker.setTableSizeReader(_tableSizeReader); _segmentStatusChecker.start(); _segmentStatusChecker.run(); - Assert.assertEquals(_controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.SEGMENTS_IN_ERROR_STATE), - Long.MIN_VALUE); - Assert.assertEquals(_controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.NUMBER_OF_REPLICAS), - nReplicasExpectedValue); - Assert.assertEquals(_controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.PERCENT_OF_REPLICAS), 100); - Assert.assertEquals(_controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), - 100); + + Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableName, + ControllerGauge.SEGMENTS_IN_ERROR_STATE), Long.MIN_VALUE); + Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableName, + ControllerGauge.NUMBER_OF_REPLICAS), nReplicasExpectedValue); + Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableName, + ControllerGauge.PERCENT_OF_REPLICAS), 100); + Assert.assertEquals(MetricValueUtils.getTableGaugeValue(_controllerMetrics, tableName, + ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100); } } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/cleanup/StaleInstancesCleanupTaskStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/cleanup/StaleInstancesCleanupTaskStatelessTest.java index 4bc0b4cd10..86335e8f80 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/cleanup/StaleInstancesCleanupTaskStatelessTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/cleanup/StaleInstancesCleanupTaskStatelessTest.java @@ -21,8 +21,10 @@ package org.apache.pinot.controller.helix.core.cleanup; import java.util.Map; import java.util.Properties; import org.apache.pinot.common.metrics.ControllerGauge; +import org.apache.pinot.common.metrics.MetricValueUtils; import org.apache.pinot.controller.ControllerConf; import org.apache.pinot.controller.helix.ControllerTest; +import org.apache.pinot.spi.metrics.PinotMetricUtils; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -41,82 +43,85 @@ public class StaleInstancesCleanupTaskStatelessTest extends ControllerTest { @Test public void testStaleInstancesCleanupTaskForBrokers() throws Exception { + PinotMetricUtils.cleanUp(); StaleInstancesCleanupTask staleInstancesCleanupTask = _controllerStarter.getStaleInstancesCleanupTask(); staleInstancesCleanupTask.runTask(new Properties()); - Assert.assertEquals( - _controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_BROKER_INSTANCES), 0); + Assert.assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), + ControllerGauge.DROPPED_BROKER_INSTANCES), 0); addFakeBrokerInstancesToAutoJoinHelixCluster(3, true); - Assert.assertEquals( - _controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_BROKER_INSTANCES), 0); + Assert.assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), + ControllerGauge.DROPPED_BROKER_INSTANCES), 0); stopFakeInstance("Broker_localhost_0"); Thread.sleep(1000); staleInstancesCleanupTask.runTask(new Properties()); - Assert.assertEquals( - _controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_BROKER_INSTANCES), 1); + Assert.assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), + ControllerGauge.DROPPED_BROKER_INSTANCES), 1); stopFakeInstance("Broker_localhost_1"); Thread.sleep(1000); staleInstancesCleanupTask.runTask(new Properties()); - Assert.assertEquals( - _controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_BROKER_INSTANCES), 2); + Assert.assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), + ControllerGauge.DROPPED_BROKER_INSTANCES), 2); stopFakeInstance("Broker_localhost_2"); Thread.sleep(1000); staleInstancesCleanupTask.runTask(new Properties()); - Assert.assertEquals( - _controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_BROKER_INSTANCES), 3); + Assert.assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), + ControllerGauge.DROPPED_BROKER_INSTANCES), 3); } @Test public void testStaleInstancesCleanupTaskForServers() throws Exception { + PinotMetricUtils.cleanUp(); StaleInstancesCleanupTask staleInstancesCleanupTask = _controllerStarter.getStaleInstancesCleanupTask(); staleInstancesCleanupTask.runTask(new Properties()); - Assert.assertEquals( - _controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_SERVER_INSTANCES), 0); + Assert.assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), + ControllerGauge.DROPPED_SERVER_INSTANCES), 0); addFakeServerInstancesToAutoJoinHelixCluster(3, true); - Assert.assertEquals( - _controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_SERVER_INSTANCES), 0); + Assert.assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), + ControllerGauge.DROPPED_SERVER_INSTANCES), 0); stopFakeInstance("Server_localhost_0"); Thread.sleep(1000); staleInstancesCleanupTask.runTask(new Properties()); - Assert.assertEquals( - _controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_SERVER_INSTANCES), 1); + Assert.assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), + ControllerGauge.DROPPED_SERVER_INSTANCES), 1); stopFakeInstance("Server_localhost_1"); Thread.sleep(1000); staleInstancesCleanupTask.runTask(new Properties()); - Assert.assertEquals( - _controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_SERVER_INSTANCES), 2); + Assert.assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), + ControllerGauge.DROPPED_SERVER_INSTANCES), 2); stopFakeInstance("Server_localhost_2"); Thread.sleep(1000); staleInstancesCleanupTask.runTask(new Properties()); - Assert.assertEquals( - _controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_SERVER_INSTANCES), 3); + Assert.assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), + ControllerGauge.DROPPED_SERVER_INSTANCES), 3); } @Test public void testStaleInstancesCleanupTaskForMinions() throws Exception { + PinotMetricUtils.cleanUp(); StaleInstancesCleanupTask staleInstancesCleanupTask = _controllerStarter.getStaleInstancesCleanupTask(); staleInstancesCleanupTask.runTask(new Properties()); - Assert.assertEquals( - _controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES), 0); + Assert.assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), + ControllerGauge.DROPPED_MINION_INSTANCES), 0); addFakeMinionInstancesToAutoJoinHelixCluster(3); - Assert.assertEquals( - _controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES), 0); + Assert.assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), + ControllerGauge.DROPPED_MINION_INSTANCES), 0); stopFakeInstance("Minion_localhost_0"); Thread.sleep(1000); staleInstancesCleanupTask.runTask(new Properties()); - Assert.assertEquals( - _controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES), 1); + Assert.assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), + ControllerGauge.DROPPED_MINION_INSTANCES), 1); stopFakeInstance("Minion_localhost_1"); Thread.sleep(1000); staleInstancesCleanupTask.runTask(new Properties()); - Assert.assertEquals( - _controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES), 2); + Assert.assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), + ControllerGauge.DROPPED_MINION_INSTANCES), 2); stopFakeInstance("Minion_localhost_2"); Thread.sleep(1000); staleInstancesCleanupTask.runTask(new Properties()); - Assert.assertEquals( - _controllerStarter.getControllerMetrics().getValueOfGlobalGauge(ControllerGauge.DROPPED_MINION_INSTANCES), 3); + Assert.assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerStarter.getControllerMetrics(), + ControllerGauge.DROPPED_MINION_INSTANCES), 3); } @Override diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java index a012d6a275..7cac8ef101 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java @@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.IntStream; import org.apache.pinot.common.metrics.ControllerGauge; import org.apache.pinot.common.metrics.ControllerMetrics; +import org.apache.pinot.common.metrics.MetricValueUtils; import org.apache.pinot.controller.ControllerConf; import org.apache.pinot.controller.LeadControllerManager; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; @@ -118,8 +119,8 @@ public class ControllerPeriodicTaskTest { assertEquals(_tablesProcessed.get(), 0); assertFalse(_stopTaskCalled.get()); assertTrue(_task.isStarted()); - assertEquals( - _controllerMetrics.getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED, TASK_NAME), 0); + assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerMetrics, TASK_NAME, + ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED), 0); // Run periodic task with leadership resetState(); @@ -127,8 +128,8 @@ public class ControllerPeriodicTaskTest { assertFalse(_startTaskCalled.get()); assertTrue(_processTablesCalled.get()); assertEquals(_tablesProcessed.get(), _numTables); - assertEquals( - _controllerMetrics.getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED, TASK_NAME), + assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerMetrics, TASK_NAME, + ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED), _numTables); assertFalse(_stopTaskCalled.get()); assertTrue(_task.isStarted()); @@ -139,8 +140,8 @@ public class ControllerPeriodicTaskTest { assertFalse(_startTaskCalled.get()); assertFalse(_processTablesCalled.get()); assertEquals(_tablesProcessed.get(), 0); - assertEquals( - _controllerMetrics.getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED, TASK_NAME), 0); + assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerMetrics, TASK_NAME, + ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED), 0); assertTrue(_stopTaskCalled.get()); assertFalse(_task.isStarted()); @@ -150,8 +151,8 @@ public class ControllerPeriodicTaskTest { assertFalse(_startTaskCalled.get()); assertFalse(_processTablesCalled.get()); assertEquals(_tablesProcessed.get(), 0); - assertEquals( - _controllerMetrics.getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED, TASK_NAME), 0); + assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerMetrics, TASK_NAME, + ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED), 0); assertFalse(_stopTaskCalled.get()); assertFalse(_task.isStarted()); @@ -163,8 +164,8 @@ public class ControllerPeriodicTaskTest { assertEquals(_tablesProcessed.get(), 0); assertFalse(_stopTaskCalled.get()); assertTrue(_task.isStarted()); - assertEquals( - _controllerMetrics.getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED, TASK_NAME), 0); + assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerMetrics, TASK_NAME, + ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED), 0); // Run periodic task with leadership resetState(); @@ -172,9 +173,8 @@ public class ControllerPeriodicTaskTest { assertFalse(_startTaskCalled.get()); assertTrue(_processTablesCalled.get()); assertEquals(_tablesProcessed.get(), _numTables); - assertEquals( - _controllerMetrics.getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED, TASK_NAME), - _numTables); + assertEquals(MetricValueUtils.getGlobalGaugeValue(_controllerMetrics, TASK_NAME, + ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED), _numTables); assertFalse(_stopTaskCalled.get()); assertTrue(_task.isStarted()); } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/StorageQuotaCheckerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/StorageQuotaCheckerTest.java index e00d2470b7..888db0e4cc 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/StorageQuotaCheckerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/StorageQuotaCheckerTest.java @@ -22,6 +22,7 @@ import java.util.Collections; import org.apache.pinot.common.exception.InvalidConfigException; import org.apache.pinot.common.metrics.ControllerGauge; import org.apache.pinot.common.metrics.ControllerMetrics; +import org.apache.pinot.common.metrics.MetricValueUtils; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.util.TableSizeReader; import org.apache.pinot.spi.config.table.QuotaConfig; @@ -104,34 +105,38 @@ public class StorageQuotaCheckerTest { // No response from server, should pass without updating metrics mockTableSizeResult(-1, 0); assertTrue(isSegmentWithinQuota()); - assertEquals( - controllerMetrics.getValueOfTableGauge(OFFLINE_TABLE_NAME, ControllerGauge.OFFLINE_TABLE_ESTIMATED_SIZE), 0); + assertFalse( + MetricValueUtils.tableGaugeExists(controllerMetrics, OFFLINE_TABLE_NAME, + ControllerGauge.OFFLINE_TABLE_ESTIMATED_SIZE)); // Within quota but with missing segments, should pass without updating metrics mockTableSizeResult(4 * 1024, 1); assertTrue(isSegmentWithinQuota()); - assertEquals( - controllerMetrics.getValueOfTableGauge(OFFLINE_TABLE_NAME, ControllerGauge.OFFLINE_TABLE_ESTIMATED_SIZE), 0); + assertFalse( + MetricValueUtils.tableGaugeExists(controllerMetrics, OFFLINE_TABLE_NAME, + ControllerGauge.OFFLINE_TABLE_ESTIMATED_SIZE)); + // Exceed quota and with missing segments, should fail without updating metrics mockTableSizeResult(8 * 1024, 1); assertFalse(isSegmentWithinQuota()); - assertEquals( - controllerMetrics.getValueOfTableGauge(OFFLINE_TABLE_NAME, ControllerGauge.OFFLINE_TABLE_ESTIMATED_SIZE), 0); + assertFalse( + MetricValueUtils.tableGaugeExists(controllerMetrics, OFFLINE_TABLE_NAME, + ControllerGauge.OFFLINE_TABLE_ESTIMATED_SIZE)); // Within quota without missing segments, should pass and update metrics mockTableSizeResult(3 * 1024, 0); assertTrue(isSegmentWithinQuota()); assertEquals( - controllerMetrics.getValueOfTableGauge(OFFLINE_TABLE_NAME, ControllerGauge.OFFLINE_TABLE_ESTIMATED_SIZE), - 3 * 1024); + MetricValueUtils.getTableGaugeValue(controllerMetrics, OFFLINE_TABLE_NAME, + ControllerGauge.OFFLINE_TABLE_ESTIMATED_SIZE), 3 * 1024); // Exceed quota without missing segments, should fail and update metrics mockTableSizeResult(4 * 1024, 0); assertFalse(isSegmentWithinQuota()); assertEquals( - controllerMetrics.getValueOfTableGauge(OFFLINE_TABLE_NAME, ControllerGauge.OFFLINE_TABLE_ESTIMATED_SIZE), - 4 * 1024); + MetricValueUtils.getTableGaugeValue(controllerMetrics, OFFLINE_TABLE_NAME, + ControllerGauge.OFFLINE_TABLE_ESTIMATED_SIZE), 4 * 1024); } private boolean isSegmentWithinQuota() diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java index 46d486becc..967ef11bb5 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java @@ -30,6 +30,7 @@ import org.apache.helix.model.IdealState; import org.apache.helix.model.InstanceConfig; import org.apache.pinot.common.metrics.ControllerGauge; import org.apache.pinot.common.metrics.ControllerMetrics; +import org.apache.pinot.common.metrics.MetricValueUtils; import org.apache.pinot.common.metrics.ValidationMetrics; import org.apache.pinot.common.utils.config.TagNameUtils; import org.apache.pinot.common.utils.helix.HelixHelper; @@ -206,9 +207,8 @@ public class ControllerPeriodicTasksIntegrationTest extends BaseClusterIntegrati int numTables = 5; ControllerMetrics controllerMetrics = _controllerStarter.getControllerMetrics(); TestUtils.waitForCondition(aVoid -> { - if (controllerMetrics - .getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED, "SegmentStatusChecker") - != numTables) { + if (MetricValueUtils.getGlobalGaugeValue(controllerMetrics, "SegmentStatusChecker", + ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED) != numTables) { return false; } if (!checkSegmentStatusCheckerMetrics(controllerMetrics, TableNameBuilder.OFFLINE.tableNameWithType(emptyTable), @@ -239,9 +239,9 @@ public class ControllerPeriodicTasksIntegrationTest extends BaseClusterIntegrati 100)) { return false; } - return controllerMetrics.getValueOfGlobalGauge(ControllerGauge.OFFLINE_TABLE_COUNT) == 4 - && controllerMetrics.getValueOfGlobalGauge(ControllerGauge.REALTIME_TABLE_COUNT) == 1 - && controllerMetrics.getValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT) == 1; + return MetricValueUtils.getGlobalGaugeValue(controllerMetrics, ControllerGauge.OFFLINE_TABLE_COUNT) == 4 + && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, ControllerGauge.REALTIME_TABLE_COUNT) == 1 + && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, ControllerGauge.DISABLED_TABLE_COUNT) == 1; }, 60_000, "Timed out waiting for SegmentStatusChecker"); dropOfflineTable(emptyTable); @@ -253,23 +253,23 @@ public class ControllerPeriodicTasksIntegrationTest extends BaseClusterIntegrati IdealState idealState, long expectedNumReplicas, long expectedPercentReplicas, long expectedSegmentsInErrorState, long expectedPercentSegmentsAvailable) { if (idealState != null) { - if (controllerMetrics.getValueOfTableGauge(tableNameWithType, ControllerGauge.IDEALSTATE_ZNODE_SIZE) != idealState - .toString().length()) { + if (MetricValueUtils.getTableGaugeValue(controllerMetrics, tableNameWithType, + ControllerGauge.IDEALSTATE_ZNODE_SIZE) != idealState.toString().length()) { return false; } - if (controllerMetrics.getValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENT_COUNT) != idealState - .getPartitionSet().size()) { + if (MetricValueUtils.getTableGaugeValue(controllerMetrics, tableNameWithType, ControllerGauge.SEGMENT_COUNT) + != idealState.getPartitionSet().size()) { return false; } } - return controllerMetrics.getValueOfTableGauge(tableNameWithType, ControllerGauge.NUMBER_OF_REPLICAS) - == expectedNumReplicas - && controllerMetrics.getValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_OF_REPLICAS) - == expectedPercentReplicas - && controllerMetrics.getValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENTS_IN_ERROR_STATE) - == expectedSegmentsInErrorState - && controllerMetrics.getValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE) - == expectedPercentSegmentsAvailable; + return MetricValueUtils.getTableGaugeValue(controllerMetrics, tableNameWithType, + ControllerGauge.NUMBER_OF_REPLICAS) == expectedNumReplicas + && MetricValueUtils.getTableGaugeValue(controllerMetrics, tableNameWithType, + ControllerGauge.PERCENT_OF_REPLICAS) == expectedPercentReplicas + && MetricValueUtils.getTableGaugeValue(controllerMetrics, tableNameWithType, + ControllerGauge.SEGMENTS_IN_ERROR_STATE) == expectedSegmentsInErrorState + && MetricValueUtils.getTableGaugeValue(controllerMetrics, tableNameWithType, + ControllerGauge.PERCENT_SEGMENTS_AVAILABLE) == expectedPercentSegmentsAvailable; } @Test diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java index 2519a791a1..afb92bce3a 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java @@ -26,6 +26,7 @@ import org.apache.helix.model.builder.HelixConfigScopeBuilder; import org.apache.helix.task.TaskState; import org.apache.pinot.common.metrics.ControllerGauge; import org.apache.pinot.common.metrics.ControllerMetrics; +import org.apache.pinot.common.metrics.MetricValueUtils; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager; import org.apache.pinot.controller.helix.core.minion.PinotTaskManager; @@ -166,9 +167,11 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest { String stoppedGauge = TASK_TYPE + "." + TaskState.STOPPED; String completedGauge = TASK_TYPE + "." + TaskState.COMPLETED; TestUtils.waitForCondition( - input -> controllerMetrics.getValueOfTableGauge(inProgressGauge, ControllerGauge.TASK_STATUS) == NUM_TASKS - && controllerMetrics.getValueOfTableGauge(stoppedGauge, ControllerGauge.TASK_STATUS) == 0 - && controllerMetrics.getValueOfTableGauge(completedGauge, ControllerGauge.TASK_STATUS) == 0, + input -> MetricValueUtils.getGlobalGaugeValue(controllerMetrics, inProgressGauge, ControllerGauge.TASK_STATUS) + == NUM_TASKS + && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, stoppedGauge, ControllerGauge.TASK_STATUS) == 0 + && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, completedGauge, ControllerGauge.TASK_STATUS) + == 0, ZK_CALLBACK_TIMEOUT_MS, "Failed to update the controller gauges"); // Stop the task queue @@ -192,9 +195,12 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest { // Wait at most 30 seconds for ZK callback to update the controller gauges TestUtils.waitForCondition( - input -> controllerMetrics.getValueOfTableGauge(inProgressGauge, ControllerGauge.TASK_STATUS) == 0 - && controllerMetrics.getValueOfTableGauge(stoppedGauge, ControllerGauge.TASK_STATUS) == NUM_TASKS - && controllerMetrics.getValueOfTableGauge(completedGauge, ControllerGauge.TASK_STATUS) == 0, + input -> MetricValueUtils.getGlobalGaugeValue(controllerMetrics, inProgressGauge, ControllerGauge.TASK_STATUS) + == 0 + && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, stoppedGauge, ControllerGauge.TASK_STATUS) + == NUM_TASKS + && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, completedGauge, ControllerGauge.TASK_STATUS) + == 0, ZK_CALLBACK_TIMEOUT_MS, "Failed to update the controller gauges"); // Task deletion requires the task queue to be stopped, @@ -225,9 +231,11 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest { // Wait at most 30 seconds for ZK callback to update the controller gauges TestUtils.waitForCondition( - input -> controllerMetrics.getValueOfTableGauge(inProgressGauge, ControllerGauge.TASK_STATUS) == 0 - && controllerMetrics.getValueOfTableGauge(stoppedGauge, ControllerGauge.TASK_STATUS) == 0 - && controllerMetrics.getValueOfTableGauge(completedGauge, ControllerGauge.TASK_STATUS) == (NUM_TASKS - 1), + input -> MetricValueUtils.getGlobalGaugeValue(controllerMetrics, inProgressGauge, ControllerGauge.TASK_STATUS) + == 0 + && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, stoppedGauge, ControllerGauge.TASK_STATUS) == 0 + && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, completedGauge, ControllerGauge.TASK_STATUS) + == (NUM_TASKS - 1), ZK_CALLBACK_TIMEOUT_MS, "Failed to update the controller gauges"); // Delete the task queue @@ -239,9 +247,11 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest { // Wait at most 30 seconds for ZK callback to update the controller gauges TestUtils.waitForCondition( - input -> controllerMetrics.getValueOfTableGauge(inProgressGauge, ControllerGauge.TASK_STATUS) == 0 - && controllerMetrics.getValueOfTableGauge(stoppedGauge, ControllerGauge.TASK_STATUS) == 0 - && controllerMetrics.getValueOfTableGauge(completedGauge, ControllerGauge.TASK_STATUS) == 0, + input -> MetricValueUtils.getGlobalGaugeValue(controllerMetrics, inProgressGauge, ControllerGauge.TASK_STATUS) + == 0 + && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, stoppedGauge, ControllerGauge.TASK_STATUS) == 0 + && MetricValueUtils.getGlobalGaugeValue(controllerMetrics, completedGauge, ControllerGauge.TASK_STATUS) + == 0, ZK_CALLBACK_TIMEOUT_MS, "Failed to update the controller gauges"); } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/metrics/PinotMetricUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/metrics/PinotMetricUtils.java index ef379e04da..f098820056 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/metrics/PinotMetricUtils.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/metrics/PinotMetricUtils.java @@ -189,6 +189,17 @@ public class PinotMetricUtils { return getPinotMetricsRegistry(new PinotConfiguration(Collections.emptyMap())); } + /** + * Cleans up previous emitted metrics + */ + @VisibleForTesting + public static void cleanUp() { + if (_pinotMetricsFactory != null) { + _pinotMetricsFactory.getPinotMetricsRegistry().shutdown(); + _pinotMetricsFactory = null; + } + } + /** * Returns the metricsRegistry from the initialised metricsFactory. * If the metricsFactory is null, first creates and initializes the metricsFactory and registers the metrics registry. --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org