This is an automated email from the ASF dual-hosted git repository. edcoleman pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new 6c1e453f53 Merge branch '2.1' 6c1e453f53 is described below commit 6c1e453f5399dc21bf9889bb78e0ff541ad95878 Author: Ed Coleman <edcole...@apache.org> AuthorDate: Wed Apr 24 18:47:12 2024 +0000 Merge branch '2.1' - excludes changes to the grep iterator and grep shell command. --- core/pom.xml | 1 + .../org/apache/accumulo/core/conf/Property.java | 11 +- .../core/metrics/MeterRegistryFactory.java | 5 + .../apache/accumulo/core/metrics/MetricsInfo.java | 114 ++++++++ .../accumulo/core/metrics/MetricsProducer.java | 43 --- .../apache/accumulo/core/metrics/MetricsUtil.java | 132 --------- .../spi/metrics/LoggingMeterRegistryFactory.java | 81 ++++++ .../core/spi/metrics/MeterRegistryFactory.java | 72 +++++ .../accumulo/core/util/threads/ThreadPools.java | 38 ++- .../metrics/LoggingMeterRegistryFactoryTest.java | 52 ++++ .../miniclusterImpl/MiniAccumuloConfigImpl.java | 3 + .../org/apache/accumulo/server/AbstractServer.java | 9 +- .../org/apache/accumulo/server/ServerContext.java | 13 + .../server/compaction/PausedCompactionMetrics.java | 5 +- .../conf/store/impl/PropCacheCaffeineImpl.java | 18 +- .../server/conf/store/impl/PropStoreMetrics.java | 92 ------ .../server/conf/store/impl/ZooPropLoader.java | 19 +- .../server/conf/store/impl/ZooPropStore.java | 15 +- .../server/metrics/MeterRegistryEnvPropImpl.java | 75 +++++ .../accumulo/server/metrics/MetricsInfoImpl.java | 319 +++++++++++++++++++++ .../apache/accumulo/server/rpc/TServerUtils.java | 11 +- .../apache/accumulo/server/rpc/TimedProcessor.java | 6 +- .../conf/store/impl/PropCacheCaffeineImplTest.java | 6 +- .../server/conf/store/impl/PropStoreEventTest.java | 8 +- .../server/conf/store/impl/ZooPropLoaderTest.java | 78 ++--- .../metrics/MeterRegistryEnvPropImplTest.java | 50 ++++ .../server/metrics/MetricsInfoImplTest.java | 84 ++++++ .../accumulo/server/rpc/TServerUtilsTest.java | 19 +- .../coordinator/CompactionCoordinator.java | 15 +- 
.../org/apache/accumulo/compactor/Compactor.java | 18 +- .../apache/accumulo/gc/SimpleGarbageCollector.java | 19 +- .../java/org/apache/accumulo/manager/Manager.java | 20 +- .../accumulo/manager/metrics/ManagerMetrics.java | 24 +- .../accumulo/manager/metrics/fate/FateMetrics.java | 49 ++-- .../java/org/apache/accumulo/monitor/Monitor.java | 6 + .../org/apache/accumulo/tserver/ScanServer.java | 19 +- .../org/apache/accumulo/tserver/TabletServer.java | 34 +-- .../tserver/TabletServerResourceManager.java | 35 +-- .../tserver/metrics/TabletServerMinCMetrics.java | 8 +- .../tserver/metrics/TabletServerScanMetrics.java | 24 +- .../accumulo/tserver/tablet/CompactableImpl.java | 9 + .../test/conf/store/PropCacheCaffeineImplZkIT.java | 10 +- .../accumulo/test/functional/ZombieTServer.java | 2 +- .../apache/accumulo/test/metrics/MetricsIT.java | 16 +- .../test/metrics/TestStatsDRegistryFactory.java | 6 +- .../accumulo/test/performance/NullTserver.java | 2 +- test/src/main/resources/log4j2-test.properties | 23 -- 47 files changed, 1109 insertions(+), 609 deletions(-) diff --git a/core/pom.xml b/core/pom.xml index 05d9a02c69..5d70035fab 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -271,6 +271,7 @@ </includes> <excludes /> <allows> + <allow>io[.]micrometer[.]core[.]instrument[.]MeterRegistry</allow> <allow>io[.]opentelemetry[.]api[.]OpenTelemetry</allow> <allow>org[.]apache[.]hadoop[.]io[.]Text</allow> <allow>org[.]apache[.]accumulo[.]core[.]client[.].*</allow> diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index 7d48c05ae1..6dc56205ba 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -322,8 +322,15 @@ public enum Property { "Enables metrics functionality using Micrometer.", "2.1.0"), GENERAL_MICROMETER_JVM_METRICS_ENABLED("general.micrometer.jvm.metrics.enabled", "false", 
PropertyType.BOOLEAN, "Enables JVM metrics functionality using Micrometer.", "2.1.0"), - GENERAL_MICROMETER_FACTORY("general.micrometer.factory", "", PropertyType.CLASSNAME, - "Name of class that implements MeterRegistryFactory.", "2.1.0"), + GENERAL_MICROMETER_FACTORY("general.micrometer.factory", + "org.apache.accumulo.core.spi.metrics.LoggingMeterRegistryFactory", + PropertyType.CLASSNAMELIST, + "A comma separated list of one or more class names that implements" + + " org.apache.accumulo.core.spi.metrics.MeterRegistryFactory. Prior to" + + " 2.1.3 this was a single value and the default was an empty string. In 2.1.3 the default" + + " was changed and it now can accept multiple class names. The metrics spi was introduced in 2.1.3," + + " the deprecated factory is org.apache.accumulo.core.metrics.MeterRegistryFactory.", + "2.1.0"), GENERAL_PROCESS_BIND_ADDRESS("general.process.bind.addr", "0.0.0.0", PropertyType.STRING, "The local IP address to which this server should bind for sending and receiving network traffic.", "3.0.0"), diff --git a/core/src/main/java/org/apache/accumulo/core/metrics/MeterRegistryFactory.java b/core/src/main/java/org/apache/accumulo/core/metrics/MeterRegistryFactory.java index acb4c4cbd2..e6fe10356c 100644 --- a/core/src/main/java/org/apache/accumulo/core/metrics/MeterRegistryFactory.java +++ b/core/src/main/java/org/apache/accumulo/core/metrics/MeterRegistryFactory.java @@ -20,6 +20,11 @@ package org.apache.accumulo.core.metrics; import io.micrometer.core.instrument.MeterRegistry; +/** + * @deprecated since 2.1.3; use {@link org.apache.accumulo.core.spi.metrics.MeterRegistryFactory} + * instead + */ +@Deprecated() public interface MeterRegistryFactory { MeterRegistry create(); } diff --git a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsInfo.java b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsInfo.java new file mode 100644 index 0000000000..b6ef72b1df --- /dev/null +++ 
b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsInfo.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.metrics; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +import com.google.common.net.HostAndPort; + +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tag; + +public interface MetricsInfo { + + /** + * Convenience method to create tag name / value pair for the instance name + * + * @param instanceName the instance name + */ + static Tag instanceNameTag(final String instanceName) { + Objects.requireNonNull(instanceName, + "cannot create the tag without providing the instance name"); + return Tag.of("instance.name", instanceName); + } + + /** + * Convenience method to create tag name / value pair for the process name + * + * @param processName the process name + */ + static Tag processTag(final String processName) { + Objects.requireNonNull(processName, "cannot create the tag without providing the process name"); + return Tag.of("process.name", processName); + } + + /** + * Convenience method to create 
tag name / value pairs for the host and port from address + * host:port pair. + * + * @param hostAndPort the host:port pair + */ + static List<Tag> addressTags(final HostAndPort hostAndPort) { + Objects.requireNonNull(hostAndPort, "cannot create the tag without providing the hostAndPort"); + List<Tag> tags = new ArrayList<>(2); + tags.add(Tag.of("host", hostAndPort.getHost())); + int port = hostAndPort.getPort(); + if (port != 0) { + tags.add(Tag.of("port", Integer.toString(hostAndPort.getPort()))); + } + return Collections.unmodifiableList(tags); + } + + boolean isMetricsEnabled(); + + /** + * Convenience method to set the common tags for application (process), host and port. + * + * @param applicationName the application (process) name. + * @param hostAndPort the host:port pair + */ + void addServiceTags(final String applicationName, final HostAndPort hostAndPort); + + /** + * Add the list of tag name / value pair to the common tags that will be emitted with all metrics. + * Common tags must be added before initialization of any registries. Tags that are added after a + * registry is initialized may not be emitted by the underlying metrics system. This would cause + * inconsistent grouping and filtering based on tags. + * + * @param updates list of tags (name / value pairs) + */ + void addCommonTags(final List<Tag> updates); + + /** + * Get the current list of common tags. + */ + Collection<Tag> getCommonTags(); + + void addRegistry(MeterRegistry registry); + + void addMetricsProducers(MetricsProducer... producer); + + /** + * Initialize the metrics system. This sets the list of common tags that are emitted with the + * metrics. + */ + void init(); + + MeterRegistry getRegistry(); + + /** + * Close the underlying registry and release resources. The registry will not accept new meters + * and will stop publishing metrics. 
+ */ + void close(); +} diff --git a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java index 7aea9a7a6c..e50d2d9c03 100644 --- a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java +++ b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java @@ -556,42 +556,6 @@ import io.micrometer.core.instrument.MeterRegistry; * <td>Distribution Summary</td> * <td></td> * </tr> - * <!-- ZooKeeper property cache --> - * <tr> - * <td>N/A</td> - * <td>N/A</td> - * <td>{@value #METRICS_PROPSTORE_LOAD_TIMER}</td> - * <td>Timer</td> - * <td></td> - * </tr> - * <tr> - * <td>N/A</td> - * <td>N/A</td> - * <td>{@value #METRICS_PROPSTORE_REFRESH_COUNT}</td> - * <td>Counter</td> - * <td></td> - * </tr> - * <tr> - * <td>N/A</td> - * <td>N/A</td> - * <td>{@value #METRICS_PROPSTORE_REFRESH_LOAD_COUNT}</td> - * <td>Counter</td> - * <td></td> - * </tr> - * <tr> - * <td>N/A</td> - * <td>N/A</td> - * <td>{@value #METRICS_PROPSTORE_EVICTION_COUNT}</td> - * <td>Counter</td> - * <td></td> - * </tr> - * <tr> - * <td>N/A</td> - * <td>N/A</td> - * <td>{@value #METRICS_PROPSTORE_ZK_ERROR_COUNT}</td> - * <td>Counter</td> - * <td></td> - * </tr> * </table> * * @since 2.1.0 @@ -683,13 +647,6 @@ public interface MetricsProducer { String METRICS_UPDATE_WALOG_WRITE = METRICS_UPDATE_PREFIX + "walog.write"; String METRICS_UPDATE_MUTATION_ARRAY_SIZE = METRICS_UPDATE_PREFIX + "mutation.arrays.size"; - String METRICS_PROPSTORE_PREFIX = "accumulo.prop.store."; - String METRICS_PROPSTORE_LOAD_TIMER = METRICS_PROPSTORE_PREFIX + "load"; - String METRICS_PROPSTORE_REFRESH_COUNT = METRICS_PROPSTORE_PREFIX + "refresh"; - String METRICS_PROPSTORE_REFRESH_LOAD_COUNT = METRICS_PROPSTORE_PREFIX + "refresh.load"; - String METRICS_PROPSTORE_EVICTION_COUNT = METRICS_PROPSTORE_PREFIX + "evictions"; - String METRICS_PROPSTORE_ZK_ERROR_COUNT = METRICS_PROPSTORE_PREFIX + "zookeeper.error"; - /** * 
Build Micrometer Meter objects and register them with the registry */ diff --git a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsUtil.java b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsUtil.java deleted file mode 100644 index fe24c93899..0000000000 --- a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsUtil.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.accumulo.core.metrics; - -import java.lang.reflect.InvocationTargetException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.ExecutorService; - -import org.apache.accumulo.core.classloader.ClassLoaderUtil; -import org.apache.accumulo.core.conf.AccumuloConfiguration; -import org.apache.accumulo.core.conf.Property; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.net.HostAndPort; - -import io.micrometer.core.instrument.MeterRegistry; -import io.micrometer.core.instrument.Metrics; -import io.micrometer.core.instrument.Tag; -import io.micrometer.core.instrument.binder.jvm.ClassLoaderMetrics; -import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics; -import io.micrometer.core.instrument.binder.jvm.JvmGcMetrics; -import io.micrometer.core.instrument.binder.jvm.JvmMemoryMetrics; -import io.micrometer.core.instrument.binder.jvm.JvmThreadMetrics; -import io.micrometer.core.instrument.binder.system.ProcessorMetrics; - -public class MetricsUtil { - - private static final Logger LOG = LoggerFactory.getLogger(MetricsUtil.class); - - private static JvmGcMetrics gc; - private static List<Tag> commonTags; - - public static void initializeMetrics(final AccumuloConfiguration conf, final String appName, - final HostAndPort address, final String instanceName) throws ClassNotFoundException, - InstantiationException, IllegalAccessException, IllegalArgumentException, - InvocationTargetException, NoSuchMethodException, SecurityException { - initializeMetrics(conf.getBoolean(Property.GENERAL_MICROMETER_ENABLED), - conf.getBoolean(Property.GENERAL_MICROMETER_JVM_METRICS_ENABLED), - conf.get(Property.GENERAL_MICROMETER_FACTORY), appName, address, instanceName); - } - - private static void initializeMetrics(boolean enabled, boolean jvmMetricsEnabled, - String factoryClass, String appName, HostAndPort address, String instanceName) - throws 
ClassNotFoundException, InstantiationException, IllegalAccessException, - IllegalArgumentException, InvocationTargetException, NoSuchMethodException, - SecurityException { - - LOG.info("initializing metrics, enabled:{}, class:{}", enabled, factoryClass); - - if (enabled && factoryClass != null && !factoryClass.isEmpty()) { - - String processName = appName; - String serviceInstance = System.getProperty("accumulo.metrics.service.instance", ""); - if (!serviceInstance.isBlank()) { - processName += serviceInstance; - } - - List<Tag> tags = new ArrayList<>(); - tags.add(Tag.of("instance.name", instanceName)); - tags.add(Tag.of("process.name", processName)); - - if (address != null) { - if (!address.getHost().isEmpty()) { - tags.add(Tag.of("host", address.getHost())); - } - if (address.getPort() > 0) { - tags.add(Tag.of("port", Integer.toString(address.getPort()))); - } - } - - commonTags = Collections.unmodifiableList(tags); - - Class<? extends MeterRegistryFactory> clazz = - ClassLoaderUtil.loadClass(factoryClass, MeterRegistryFactory.class); - MeterRegistryFactory factory = clazz.getDeclaredConstructor().newInstance(); - - MeterRegistry registry = factory.create(); - registry.config().commonTags(commonTags); - Metrics.addRegistry(registry); - - if (jvmMetricsEnabled) { - new ClassLoaderMetrics(commonTags).bindTo(Metrics.globalRegistry); - new JvmMemoryMetrics(commonTags).bindTo(Metrics.globalRegistry); - gc = new JvmGcMetrics(commonTags); - gc.bindTo(Metrics.globalRegistry); - new ProcessorMetrics(commonTags).bindTo(Metrics.globalRegistry); - new JvmThreadMetrics(commonTags).bindTo(Metrics.globalRegistry); - } - } - } - - public static void initializeProducers(MetricsProducer... 
producer) { - for (MetricsProducer p : producer) { - p.registerMetrics(Metrics.globalRegistry); - LOG.info("Metric producer {} initialize", p.getClass().getSimpleName()); - } - } - - public static void addExecutorServiceMetrics(ExecutorService executor, String name) { - new ExecutorServiceMetrics(executor, name, commonTags).bindTo(Metrics.globalRegistry); - } - - public static List<Tag> getCommonTags() { - return commonTags; - } - - public static void close() { - if (gc != null) { - gc.close(); - } - } - -} diff --git a/core/src/main/java/org/apache/accumulo/core/spi/metrics/LoggingMeterRegistryFactory.java b/core/src/main/java/org/apache/accumulo/core/spi/metrics/LoggingMeterRegistryFactory.java new file mode 100644 index 0000000000..2e24726e86 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/spi/metrics/LoggingMeterRegistryFactory.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.core.spi.metrics; + +import java.util.HashMap; +import java.util.Map; +import java.util.function.Consumer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.logging.LoggingMeterRegistry; +import io.micrometer.core.instrument.logging.LoggingRegistryConfig; + +/** + * Example implementation of enabling a metrics provider by implementing the + * {@link org.apache.accumulo.core.spi.metrics.MeterRegistryFactory} interface. It is enabled through + * properties by enabling {@code Property.GENERAL_MICROMETER_ENABLED} and providing this class for + * the {@code Property.GENERAL_MICROMETER_FACTORY}. + * <p> + * The metrics will appear in the normal service logs with a named logger of + * {@code org.apache.accumulo.METRICS} at the INFO level. The metrics output can be directed to a + * file using standard logging configuration properties by configuring the log4j2-service.properties + * file. + * <p> + * Properties can be passed in the Accumulo properties files using the prefix + * {@code general.custom.metrics.opts} + * <p> + * For example, the default polling rate is 60 sec. To modify the update frequency set + * {@code general.custom.metrics.opts.logging.step} in the Accumulo configuration. + * + * <pre> + * general.custom.metrics.opts.logging.step = 10s + * </pre> + */ +public class LoggingMeterRegistryFactory implements MeterRegistryFactory { + + private static final Logger LOG = LoggerFactory.getLogger(LoggingMeterRegistryFactory.class); + + // named logger that can be configured using standard logging properties. 
+ private static final Logger METRICS = LoggerFactory.getLogger("org.apache.accumulo.METRICS"); + + public LoggingMeterRegistryFactory() { + // needed for classloader + } + + @Override + public MeterRegistry create(final InitParameters params) { + final Consumer<String> metricConsumer = METRICS::info; + final Map<String,String> metricsProps = new HashMap<>(); + + // defines the metrics update period, default is 60 seconds. + final LoggingRegistryConfig lconf = c -> { + if (c.equals("logging.step")) { + return metricsProps.getOrDefault("logging.step", "60s"); + } + return null; + }; + + LOG.info("Creating logging metrics registry with params: {}", params); + metricsProps.putAll(params.getOptions()); + return LoggingMeterRegistry.builder(lconf).loggingSink(metricConsumer).build(); + } +} diff --git a/core/src/main/java/org/apache/accumulo/core/spi/metrics/MeterRegistryFactory.java b/core/src/main/java/org/apache/accumulo/core/spi/metrics/MeterRegistryFactory.java new file mode 100644 index 0000000000..3e57cc200a --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/spi/metrics/MeterRegistryFactory.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.core.spi.metrics; + +import java.util.Map; + +import org.apache.accumulo.core.spi.common.ServiceEnvironment; + +import io.micrometer.core.instrument.MeterRegistry; + +/** + * Micrometer metrics allow for different monitoring systems and can be enabled within + * Accumulo with properties and are initialized by implementing this interface and providing the + * factory implementation class name as a property. Metrics are specified with the following + * properties: + * <p> + * Property.GENERAL_MICROMETER_ENABLED = true + * <p> + * Property.GENERAL_MICROMETER_FACTORY = [implementation].class.getName() + * + * @since 2.1.3 + */ +public interface MeterRegistryFactory { + // full form in property file is "general.custom.metrics.opts" + String METRICS_PROP_SUBSTRING = "metrics.opts."; + + interface InitParameters { + /** + * Get the configured metrics properties passed as {@code general.custom.metrics.opts}. The + * returned map is the stripped names with {@code general.custom.metrics.opts} removed. + * <p> + * For example, if properties {@code general.custom.metrics.opts.prop1=abc} and + * {@code general.custom.metrics.opts.prop9=123} are set, then this map would contain + * {@code prop1=abc} and {@code prop9=123}. + * + * @return a map of property name, value pairs, stripped of a prefix. + */ + Map<String,String> getOptions(); + + /** + * Optional extension point to pass additional information through the ServiceEnvironment. + * + * @return the service environment + */ + ServiceEnvironment getServiceEnv(); + } + + /** + * Called on metrics initialization. Implementations should note the initial parameters set when + * instantiating a MeterRegistry should be considered fixed. Once a MeterRegistry is initialized + * parameters such as common tags may not be updated with later additions or changes. + * + * @return a Micrometer registry that will be added to the metrics configuration. 
+ */ + MeterRegistry create(final InitParameters params); +} diff --git a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java index 334fb46a53..324fafebc8 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java +++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java @@ -31,6 +31,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledFuture; @@ -42,7 +43,6 @@ import java.util.function.IntSupplier; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.metrics.MetricsUtil; import org.apache.accumulo.core.trace.TraceUtil; import org.checkerframework.checker.nullness.qual.NonNull; import org.slf4j.Logger; @@ -51,6 +51,8 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import io.micrometer.core.instrument.Metrics; +import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics; @SuppressFBWarnings(value = "RV_EXCEPTION_NOT_THROWN", justification = "Throwing Error for it to be caught by AccumuloUncaughtExceptionHandler") @@ -439,16 +441,30 @@ public class ThreadPools { } /** - * When set to true will emit metrics and register the metrics in a static registry. After the - * thread pool is deleted, there will still be metrics objects related to it in the static - * registry. There is no way to clean these leftover objects up therefore its recommended that - * this option only be set true for long-lived thread pools. 
Creating lots of short-lived thread - * pools and registering them can lead to out of memory errors over long time periods. + * When set to true will emit metrics and register the metrics in a registry. After the thread + * pool is deleted, there will still be metrics objects related to it in the static registry. + * There is no way to clean these leftover objects up therefore its recommended that this option + * only be set true for long-lived thread pools. Creating lots of short-lived thread pools and + * registering them can lead to out of memory errors over long time periods. * * @return a fluent-style builder instance */ public ThreadPoolExecutorBuilder enableThreadPoolMetrics() { - this.emitThreadPoolMetrics = true; + return enableThreadPoolMetrics(true); + } + + /** + * Optionally set to register pool metrics. When set to true will emit metrics and register the + * metrics in a registry. After the thread pool is deleted, there will still be metrics objects + * related to it in the static registry. There is no way to clean these leftover objects up + * therefore its recommended that this option only be set true for long-lived thread pools. + * Creating lots of short-lived thread pools and registering them can lead to out of memory + * errors over long time periods. 
+ * + * @return a fluent-style builder instance + */ + public ThreadPoolExecutorBuilder enableThreadPoolMetrics(final boolean enable) { + this.emitThreadPoolMetrics = enable; return this; } } @@ -513,7 +529,7 @@ public class ThreadPools { result.allowCoreThreadTimeOut(true); } if (emitThreadPoolMetrics) { - MetricsUtil.addExecutorServiceMetrics(result, name); + ThreadPools.addExecutorServiceMetrics(result, name); } return result; } @@ -618,9 +634,13 @@ public class ThreadPools { }; if (emitThreadPoolMetrics) { - MetricsUtil.addExecutorServiceMetrics(result, name); + ThreadPools.addExecutorServiceMetrics(result, name); } return result; } + private static void addExecutorServiceMetrics(ExecutorService executor, String name) { + new ExecutorServiceMetrics(executor, name, List.of()).bindTo(Metrics.globalRegistry); + } + } diff --git a/core/src/test/java/org/apache/accumulo/core/spi/metrics/LoggingMeterRegistryFactoryTest.java b/core/src/test/java/org/apache/accumulo/core/spi/metrics/LoggingMeterRegistryFactoryTest.java new file mode 100644 index 0000000000..9d628d2775 --- /dev/null +++ b/core/src/test/java/org/apache/accumulo/core/spi/metrics/LoggingMeterRegistryFactoryTest.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.spi.metrics; + +import static org.junit.jupiter.api.Assertions.assertInstanceOf; + +import java.util.Map; + +import org.apache.accumulo.core.spi.common.ServiceEnvironment; +import org.junit.jupiter.api.Test; + +import io.micrometer.core.instrument.logging.LoggingMeterRegistry; + +class LoggingMeterRegistryFactoryTest { + + @Test + public void createTest() { + LoggingMeterRegistryFactory factory = new LoggingMeterRegistryFactory(); + var reg = factory.create(new LoggingMetricsParams()); + assertInstanceOf(LoggingMeterRegistry.class, reg); + } + + private static class LoggingMetricsParams implements MeterRegistryFactory.InitParameters { + + @Override + public Map<String,String> getOptions() { + // note: general.custom.metrics.opts. is expected to be stripped before passing the options. + return Map.of("prop1", "abc", "logging.step", "1s"); + } + + @Override + public ServiceEnvironment getServiceEnv() { + return null; + } + } +} diff --git a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java index efa6343868..b133280b84 100644 --- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java +++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java @@ -147,6 +147,9 @@ public class MiniAccumuloConfigImpl { mergeProp(Property.INSTANCE_SECRET.getKey(), DEFAULT_INSTANCE_SECRET); } + // enable metrics reporting - by default will appear in standard log files. 
+ mergeProp(Property.GENERAL_MICROMETER_ENABLED.getKey(), "true"); + mergeProp(Property.TSERV_PORTSEARCH.getKey(), "true"); mergeProp(Property.TSERV_DATACACHE_SIZE.getKey(), "10M"); mergeProp(Property.TSERV_INDEXCACHE_SIZE.getKey(), "10M"); diff --git a/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java index 8966687983..bd66689d0f 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java +++ b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java @@ -28,7 +28,6 @@ import org.apache.accumulo.core.cli.ConfigOpts; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.metrics.MetricsProducer; -import org.apache.accumulo.core.metrics.MetricsUtil; import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.server.mem.LowMemoryDetector; @@ -116,9 +115,11 @@ public abstract class AbstractServer implements AutoCloseable, MetricsProducer, return getContext().getConfiguration(); } - @Override - public void close() { - MetricsUtil.close(); + public String getApplicationName() { + return applicationName; } + @Override + public void close() {} + } diff --git a/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java b/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java index e58645d00b..bf75821828 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java +++ b/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java @@ -53,6 +53,7 @@ import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.fate.zookeeper.ZooReader; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metrics.MetricsInfo; 
import org.apache.accumulo.core.rpc.SslConnectionParams; import org.apache.accumulo.core.singletons.SingletonReservation; import org.apache.accumulo.core.spi.crypto.CryptoServiceFactory; @@ -67,6 +68,7 @@ import org.apache.accumulo.server.conf.store.impl.ZooPropStore; import org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.server.mem.LowMemoryDetector; import org.apache.accumulo.server.metadata.ServerAmpleImpl; +import org.apache.accumulo.server.metrics.MetricsInfoImpl; import org.apache.accumulo.server.rpc.SaslServerConnectionParams; import org.apache.accumulo.server.rpc.ThriftServerType; import org.apache.accumulo.server.security.AuditedSecurityOperation; @@ -103,6 +105,7 @@ public class ServerContext extends ClientContext { private final Supplier<AuditedSecurityOperation> securityOperation; private final Supplier<CryptoServiceFactory> cryptoFactorySupplier; private final Supplier<LowMemoryDetector> lowMemoryDetector; + private final Supplier<MetricsInfo> metricsInfoSupplier; public ServerContext(SiteConfiguration siteConfig) { this(new ServerInfo(siteConfig)); @@ -129,6 +132,7 @@ public class ServerContext extends ClientContext { memoize(() -> new AuditedSecurityOperation(this, SecurityOperation.getAuthorizor(this), SecurityOperation.getAuthenticator(this), SecurityOperation.getPermHandler(this))); lowMemoryDetector = memoize(() -> new LowMemoryDetector()); + metricsInfoSupplier = memoize(() -> new MetricsInfoImpl(this)); } /** @@ -459,4 +463,13 @@ public class ServerContext extends ClientContext { return lowMemoryDetector.get(); } + public MetricsInfo getMetricsInfo() { + return metricsInfoSupplier.get(); + } + + @Override + public void close() { + getMetricsInfo().close(); + super.close(); + } } diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/PausedCompactionMetrics.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/PausedCompactionMetrics.java index b8731cda08..3d1a07be42 100644 --- 
a/server/base/src/main/java/org/apache/accumulo/server/compaction/PausedCompactionMetrics.java +++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/PausedCompactionMetrics.java @@ -19,7 +19,6 @@ package org.apache.accumulo.server.compaction; import org.apache.accumulo.core.metrics.MetricsProducer; -import org.apache.accumulo.core.metrics.MetricsUtil; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.MeterRegistry; @@ -40,9 +39,9 @@ public class PausedCompactionMetrics implements MetricsProducer { @Override public void registerMetrics(MeterRegistry registry) { majcPauses = Counter.builder(METRICS_MAJC_PAUSED).description("major compaction pause count") - .tags(MetricsUtil.getCommonTags()).register(registry); + .register(registry); mincPauses = Counter.builder(METRICS_MINC_PAUSED).description("minor compactor pause count") - .tags(MetricsUtil.getCommonTags()).register(registry); + .register(registry); } } diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropCacheCaffeineImpl.java b/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropCacheCaffeineImpl.java index 5c42d0c0a3..d55c9465b1 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropCacheCaffeineImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropCacheCaffeineImpl.java @@ -48,13 +48,10 @@ public class PropCacheCaffeineImpl implements PropCache { ThreadPools.getServerThreadPools().getPoolBuilder("caffeine-tasks").numCoreThreads(1) .numMaxThreads(20).withTimeOut(60L, SECONDS).build(); - private final PropStoreMetrics metrics; - private final LoadingCache<PropStoreKey<?>,VersionedProperties> cache; private PropCacheCaffeineImpl(final CacheLoader<PropStoreKey<?>,VersionedProperties> cacheLoader, - final PropStoreMetrics metrics, final Ticker ticker, boolean runTasksInline) { - this.metrics = metrics; + final Ticker ticker, boolean runTasksInline) 
{ var builder = Caffeine.newBuilder().expireAfterAccess(EXPIRE_MIN, BASE_TIME_UNITS) .evictionListener(this::evictionNotifier); if (runTasksInline) { @@ -68,14 +65,9 @@ public class PropCacheCaffeineImpl implements PropCache { cache = builder.build(cacheLoader); } - public PropStoreMetrics getMetrics() { - return metrics; - } - void evictionNotifier(PropStoreKey<?> propStoreKey, VersionedProperties value, RemovalCause cause) { log.trace("Evicted: ID: {} was evicted from cache. Reason: {}", propStoreKey, cause); - metrics.incrEviction(); } @Override @@ -85,7 +77,6 @@ public class PropCacheCaffeineImpl implements PropCache { return cache.get(propStoreKey); } catch (Exception ex) { log.info("Cache failed to retrieve properties for: " + propStoreKey, ex); - metrics.incrZkError(); return null; } } @@ -116,20 +107,17 @@ public class PropCacheCaffeineImpl implements PropCache { } public static class Builder { - - private final PropStoreMetrics metrics; private final ZooPropLoader zooPropLoader; private Ticker ticker = null; private boolean runTasksInline = false; - public Builder(final ZooPropLoader zooPropLoader, final PropStoreMetrics metrics) { + public Builder(final ZooPropLoader zooPropLoader) { Objects.requireNonNull(zooPropLoader, "A PropStoreChangeMonitor must be provided"); this.zooPropLoader = zooPropLoader; - this.metrics = metrics; } public PropCacheCaffeineImpl build() { - return new PropCacheCaffeineImpl(zooPropLoader, metrics, ticker, runTasksInline); + return new PropCacheCaffeineImpl(zooPropLoader, ticker, runTasksInline); } public Builder forTests(final Ticker ticker) { diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropStoreMetrics.java b/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropStoreMetrics.java deleted file mode 100644 index a85c14eec2..0000000000 --- a/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropStoreMetrics.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * 
Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.accumulo.server.conf.store.impl; - -import java.time.Duration; -import java.util.concurrent.TimeUnit; - -import org.apache.accumulo.core.metrics.MetricsProducer; -import org.apache.accumulo.core.metrics.MetricsUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import io.micrometer.core.instrument.Counter; -import io.micrometer.core.instrument.MeterRegistry; -import io.micrometer.core.instrument.Timer; - -public class PropStoreMetrics implements MetricsProducer { - - private static final Logger log = LoggerFactory.getLogger(PropStoreMetrics.class); - - private Timer load; - private Counter refresh; - private Counter refreshLoad; - private Counter eviction; - private Counter zkError; - - @Override - public void registerMetrics(MeterRegistry registry) { - - load = Timer.builder(METRICS_PROPSTORE_LOAD_TIMER).description("prop store load time") - .tags(MetricsUtil.getCommonTags()).register(registry); - - refresh = - Counter.builder(METRICS_PROPSTORE_REFRESH_COUNT).description("prop store refresh count") - .tags(MetricsUtil.getCommonTags()).register(registry); - - refreshLoad = Counter.builder(METRICS_PROPSTORE_REFRESH_LOAD_COUNT) - 
.description("prop store refresh load count").tags(MetricsUtil.getCommonTags()) - .register(registry); - - eviction = - Counter.builder(METRICS_PROPSTORE_EVICTION_COUNT).description("prop store eviction count") - .tags(MetricsUtil.getCommonTags()).register(registry); - - zkError = Counter.builder(METRICS_PROPSTORE_ZK_ERROR_COUNT) - .description("prop store ZooKeeper error count").tags(MetricsUtil.getCommonTags()) - .register(registry); - - } - - public PropStoreMetrics() { - log.debug("Creating PropStore metrics"); - } - - public void addLoadTime(final long value) { - log.trace("Load time: {}", value); - load.record(Duration.ofMillis(value)); - log.trace("Load count: {} time:{}", load.count(), load.totalTime(TimeUnit.MILLISECONDS)); - } - - public void incrRefresh() { - refresh.increment(); - } - - public void incrRefreshLoad() { - refreshLoad.increment(); - } - - public void incrEviction() { - eviction.increment(); - } - - public void incrZkError() { - zkError.increment(); - } -} diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/ZooPropLoader.java b/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/ZooPropLoader.java index 48620f6a92..c5842e5b66 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/ZooPropLoader.java +++ b/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/ZooPropLoader.java @@ -22,7 +22,6 @@ import static java.util.Objects.requireNonNull; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; -import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.server.conf.codec.VersionedPropCodec; @@ -44,14 +43,12 @@ public class ZooPropLoader implements CacheLoader<PropStoreKey<?>,VersionedPrope private final VersionedPropCodec propCodec; // used to set watcher, does not react to events. 
private final PropStoreWatcher propStoreWatcher; - private final PropStoreMetrics metrics; public ZooPropLoader(final ZooReaderWriter zrw, final VersionedPropCodec propCodec, - final PropStoreWatcher propStoreWatcher, final PropStoreMetrics metrics) { + final PropStoreWatcher propStoreWatcher) { this.zrw = zrw; this.propCodec = propCodec; this.propStoreWatcher = propStoreWatcher; - this.metrics = metrics; } @Override @@ -59,26 +56,18 @@ public class ZooPropLoader implements CacheLoader<PropStoreKey<?>,VersionedPrope try { log.trace("load called for {}", propStoreKey); - long startNanos = System.nanoTime(); - Stat stat = new Stat(); byte[] bytes = zrw.getData(propStoreKey.getPath(), propStoreWatcher, stat); if (stat.getDataLength() == 0) { return new VersionedProperties(); } VersionedProperties vProps = propCodec.fromBytes(stat.getVersion(), bytes); - - metrics.addLoadTime( - TimeUnit.MILLISECONDS.convert(System.nanoTime() - startNanos, TimeUnit.NANOSECONDS)); - return vProps; } catch (KeeperException.NoNodeException ex) { - metrics.incrZkError(); log.debug("property node for {} does not exist - it may be being created", propStoreKey); propStoreWatcher.signalZkChangeEvent(propStoreKey); return null; } catch (Exception ex) { - metrics.incrZkError(); log.info("Failed to load properties for: {} from ZooKeeper, returning null", propStoreKey, ex); propStoreWatcher.signalZkChangeEvent(propStoreKey); @@ -97,8 +86,6 @@ public class ZooPropLoader implements CacheLoader<PropStoreKey<?>,VersionedPrope public CompletableFuture<VersionedProperties> asyncReload(PropStoreKey<?> propStoreKey, VersionedProperties oldValue, Executor executor) throws Exception { log.trace("asyncReload called for key: {}", propStoreKey); - metrics.incrRefresh(); - return CompletableFuture.supplyAsync(() -> loadIfDifferentVersion(propStoreKey, oldValue), executor); } @@ -107,7 +94,6 @@ public class ZooPropLoader implements CacheLoader<PropStoreKey<?>,VersionedPrope public @Nullable VersionedProperties 
reload(PropStoreKey<?> propStoreKey, VersionedProperties oldValue) throws Exception { log.trace("reload called for: {}", propStoreKey); - metrics.incrRefresh(); return loadIfDifferentVersion(propStoreKey, oldValue); } @@ -141,8 +127,6 @@ public class ZooPropLoader implements CacheLoader<PropStoreKey<?>,VersionedPrope var updatedValue = load(propCacheId); - metrics.incrRefreshLoad(); - // The cache will be updated - notify external listeners value changed. propStoreWatcher.signalCacheChangeEvent(propCacheId); log.trace("Updated value {}", updatedValue == null ? "null" : updatedValue.print(true)); @@ -150,7 +134,6 @@ public class ZooPropLoader implements CacheLoader<PropStoreKey<?>,VersionedPrope } catch (RuntimeException | KeeperException | InterruptedException ex) { log.warn("async exception occurred reading properties from ZooKeeper for: {} returning null", propCacheId, ex); - metrics.incrZkError(); propStoreWatcher.signalZkChangeEvent(propCacheId); return null; } diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/ZooPropStore.java b/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/ZooPropStore.java index 881f394107..9f733e8495 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/ZooPropStore.java +++ b/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/ZooPropStore.java @@ -31,7 +31,6 @@ import org.apache.accumulo.core.data.InstanceId; import org.apache.accumulo.core.fate.zookeeper.ZooReader; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil; -import org.apache.accumulo.core.metrics.MetricsUtil; import org.apache.accumulo.server.conf.codec.VersionedPropCodec; import org.apache.accumulo.server.conf.codec.VersionedProperties; import org.apache.accumulo.server.conf.store.PropCache; @@ -55,7 +54,6 @@ public class ZooPropStore implements PropStore, PropChangeListener { private final ZooReaderWriter zrw; 
private final PropStoreWatcher propStoreWatcher; private final PropCacheCaffeineImpl cache; - private final PropStoreMetrics cacheMetrics = new PropStoreMetrics(); private final ReadyMonitor zkReadyMon; /** @@ -89,17 +87,14 @@ public class ZooPropStore implements PropStore, PropChangeListener { this.propStoreWatcher = requireNonNullElseGet(watcher, () -> new PropStoreWatcher(zkReadyMon)); - ZooPropLoader propLoader = new ZooPropLoader(zrw, codec, this.propStoreWatcher, cacheMetrics); + ZooPropLoader propLoader = new ZooPropLoader(zrw, codec, this.propStoreWatcher); if (ticker == null) { - this.cache = new PropCacheCaffeineImpl.Builder(propLoader, cacheMetrics).build(); + this.cache = new PropCacheCaffeineImpl.Builder(propLoader).build(); } else { - this.cache = - new PropCacheCaffeineImpl.Builder(propLoader, cacheMetrics).forTests(ticker).build(); + this.cache = new PropCacheCaffeineImpl.Builder(propLoader).forTests(ticker).build(); } - MetricsUtil.initializeProducers(cacheMetrics); - try { var path = ZooUtil.getRoot(instanceId); if (zrw.exists(path, propStoreWatcher)) { @@ -139,10 +134,6 @@ public class ZooPropStore implements PropStore, PropChangeListener { return false; } - public PropStoreMetrics getMetrics() { - return cacheMetrics; - } - @Override public void create(PropStoreKey<?> propStoreKey, Map<String,String> props) { diff --git a/server/base/src/main/java/org/apache/accumulo/server/metrics/MeterRegistryEnvPropImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metrics/MeterRegistryEnvPropImpl.java new file mode 100644 index 0000000000..f3b447defe --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/metrics/MeterRegistryEnvPropImpl.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.server.metrics; + +import static org.apache.accumulo.core.conf.Property.GENERAL_ARBITRARY_PROP_PREFIX; +import static org.apache.accumulo.core.spi.metrics.MeterRegistryFactory.METRICS_PROP_SUBSTRING; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.accumulo.core.spi.common.ServiceEnvironment; +import org.apache.accumulo.core.spi.metrics.MeterRegistryFactory; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.ServiceEnvironmentImpl; + +/** + * Provides a way to pass parameters from an Accumulo configuration to the MeterRegistryFactory. + * Properties need have the form {@code general.custom.metrics.opts.PARAMETER_NAME = VALUE}. The + * prefix {@code general.custom.metrics.opts.} is stripped and the resulting Map returned by + * {@link #getOptions()} will be map of {@code PROPERTY_NAME, VALUE} key value pairs. 
+ * <p> + * Other implementations can extend + * {@link org.apache.accumulo.core.spi.metrics.MeterRegistryFactory.InitParameters} to provide other + * implementations + */ +public class MeterRegistryEnvPropImpl implements MeterRegistryFactory.InitParameters { + + private final ServerContext context; + + public MeterRegistryEnvPropImpl(final ServerContext context) { + this.context = context; + } + + /** + * Properties that match {@code general.custom.metrics.opts.PARAMETER_NAME = VALUE} with be + * filtered and returned with the prefix stripped. + * + * @return a map of the filtered, stripped property, value pairs. + */ + @Override + public Map<String,String> getOptions() { + Map<String,String> filtered = new HashMap<>(); + + Map<String,String> options = context.getConfiguration() + .getAllPropertiesWithPrefixStripped(GENERAL_ARBITRARY_PROP_PREFIX); + options.forEach((k, v) -> { + if (k.startsWith(METRICS_PROP_SUBSTRING)) { + String name = k.substring(METRICS_PROP_SUBSTRING.length()); + filtered.put(name, v); + } + }); + return filtered; + } + + @Override + public ServiceEnvironment getServiceEnv() { + return new ServiceEnvironmentImpl(context); + } +} diff --git a/server/base/src/main/java/org/apache/accumulo/server/metrics/MetricsInfoImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metrics/MetricsInfoImpl.java new file mode 100644 index 0000000000..988bf38f63 --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/metrics/MetricsInfoImpl.java @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.server.metrics; + +import static org.apache.hadoop.util.StringUtils.getTrimmedStrings; + +import java.lang.reflect.InvocationTargetException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.accumulo.core.classloader.ClassLoaderUtil; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.metrics.MetricsInfo; +import org.apache.accumulo.core.metrics.MetricsProducer; +import org.apache.accumulo.server.ServerContext; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.net.HostAndPort; + +import io.micrometer.core.instrument.Meter; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Metrics; +import io.micrometer.core.instrument.Tag; +import io.micrometer.core.instrument.binder.jvm.ClassLoaderMetrics; +import io.micrometer.core.instrument.binder.jvm.JvmGcMetrics; +import io.micrometer.core.instrument.binder.jvm.JvmMemoryMetrics; +import io.micrometer.core.instrument.binder.jvm.JvmThreadMetrics; +import io.micrometer.core.instrument.binder.system.ProcessorMetrics; +import 
io.micrometer.core.instrument.composite.CompositeMeterRegistry; +import io.micrometer.core.instrument.config.MeterFilter; +import io.micrometer.core.instrument.distribution.DistributionStatisticConfig; + +public class MetricsInfoImpl implements MetricsInfo { + + private static final Logger LOG = LoggerFactory.getLogger(MetricsInfoImpl.class); + + private final ServerContext context; + + private final Lock lock = new ReentrantLock(); + + private final Map<String,Tag> commonTags; + + // JvmGcMetrics are declared with AutoCloseable - keep reference to use with close() + private JvmGcMetrics jvmGcMetrics; + + private final boolean metricsEnabled; + + private CompositeMeterRegistry composite = null; + private final List<MeterRegistry> pendingRegistries = new ArrayList<>(); + + private final List<MetricsProducer> producers = new ArrayList<>(); + + public MetricsInfoImpl(final ServerContext context) { + this.context = context; + metricsEnabled = context.getConfiguration().getBoolean(Property.GENERAL_MICROMETER_ENABLED); + printMetricsConfig(); + commonTags = new HashMap<>(); + Tag t = MetricsInfo.instanceNameTag(context.getInstanceName()); + commonTags.put(t.getKey(), t); + } + + private void printMetricsConfig() { + final boolean jvmMetricsEnabled = + context.getConfiguration().getBoolean(Property.GENERAL_MICROMETER_JVM_METRICS_ENABLED); + LOG.info("micrometer metrics enabled: {}", metricsEnabled); + if (jvmMetricsEnabled) { + if (metricsEnabled) { + LOG.info("detailed jvm metrics enabled: {}", jvmMetricsEnabled); + } else { + LOG.info("requested jvm metrics, but micrometer metrics are disabled."); + } + } + if (metricsEnabled) { + LOG.info("metrics registry factories: {}", + context.getConfiguration().get(Property.GENERAL_MICROMETER_FACTORY)); + } + } + + @Override + public boolean isMetricsEnabled() { + return metricsEnabled; + } + + /** + * Common tags for all services. 
+ */ + @Override + public void addServiceTags(final String applicationName, final HostAndPort hostAndPort) { + List<Tag> tags = new ArrayList<>(); + + if (applicationName != null && !applicationName.isEmpty()) { + tags.add(MetricsInfo.processTag(applicationName)); + } + if (hostAndPort != null) { + tags.addAll(MetricsInfo.addressTags(hostAndPort)); + } + addCommonTags(tags); + } + + @Override + public void addCommonTags(List<Tag> updates) { + lock.lock(); + try { + if (composite != null) { + LOG.warn( + "Common tags after registry has been initialized may be ignored. Current common tags: {}", + commonTags); + return; + } + updates.forEach(t -> commonTags.put(t.getKey(), t)); + } finally { + lock.unlock(); + } + } + + @Override + public Collection<Tag> getCommonTags() { + lock.lock(); + try { + return Collections.unmodifiableCollection(commonTags.values()); + } finally { + lock.unlock(); + } + } + + @Override + public void addRegistry(MeterRegistry registry) { + lock.lock(); + try { + if (composite != null) { + composite.add(registry); + } else { + // defer until composite is initialized + pendingRegistries.add(registry); + } + + } finally { + lock.unlock(); + } + } + + @Override + public void addMetricsProducers(MetricsProducer... 
producer) { + if (producer.length == 0) { + LOG.debug( + "called addMetricsProducers() without providing at least one producer - this has no effect"); + return; + } + lock.lock(); + try { + if (composite == null) { + producers.addAll(Arrays.asList(producer)); + } else { + Arrays.stream(producer).forEach(p -> p.registerMetrics(composite)); + } + } finally { + lock.unlock(); + } + } + + @Override + public MeterRegistry getRegistry() { + lock.lock(); + try { + if (composite == null) { + throw new IllegalStateException("metrics have not been initialized, call init() first"); + } + } finally { + lock.unlock(); + } + return composite; + } + + @Override + public void init() { + lock.lock(); + try { + if (composite != null) { + LOG.warn("metrics registry has already been initialized"); + return; + } + composite = new CompositeMeterRegistry(); + composite.config().commonTags(commonTags.values()); + + LOG.info("Metrics initialization. common tags: {}", commonTags); + + boolean jvmMetricsEnabled = + context.getConfiguration().getBoolean(Property.GENERAL_MICROMETER_JVM_METRICS_ENABLED); + + if (jvmMetricsEnabled) { + LOG.info("enabling detailed jvm, classloader, jvm gc and process metrics"); + new ClassLoaderMetrics().bindTo(composite); + new JvmMemoryMetrics().bindTo(composite); + jvmGcMetrics = new JvmGcMetrics(); + jvmGcMetrics.bindTo(composite); + new ProcessorMetrics().bindTo(composite); + new JvmThreadMetrics().bindTo(composite); + } + + MeterFilter replicationFilter = new MeterFilter() { + @Override + public DistributionStatisticConfig configure(Meter.Id id, + @NonNull DistributionStatisticConfig config) { + if (id.getName().equals("replicationQueue")) { + return DistributionStatisticConfig.builder().percentiles(0.5, 0.75, 0.9, 0.95, 0.99) + .expiry(Duration.ofMinutes(10)).build().merge(config); + } + return config; + } + }; + + if (isMetricsEnabled()) { + // user specified registries + String userRegistryFactories = + 
context.getConfiguration().get(Property.GENERAL_MICROMETER_FACTORY); + + for (String factoryName : getTrimmedStrings(userRegistryFactories)) { + try { + MeterRegistry registry = getRegistryFromFactory(factoryName, context); + registry.config().commonTags(commonTags.values()); + registry.config().meterFilter(replicationFilter); + addRegistry(registry); + } catch (ClassNotFoundException | NoSuchMethodException | InvocationTargetException + | InstantiationException | IllegalAccessException ex) { + LOG.warn("Could not load registry {}", factoryName, ex); + } + } + } + + pendingRegistries.forEach(registry -> composite.add(registry)); + + LOG.info("Metrics initialization. Register producers: {}", producers); + producers.forEach(p -> p.registerMetrics(composite)); + + Metrics.globalRegistry.add(composite); + + } finally { + lock.unlock(); + } + } + + @VisibleForTesting + @SuppressWarnings({"deprecation", + "support for org.apache.accumulo.core.metrics.MeterRegistryFactory can be removed in 3.1"}) + static MeterRegistry getRegistryFromFactory(final String factoryName, final ServerContext context) + throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, + InstantiationException, IllegalAccessException { + try { + LOG.info("look for meter spi registry factory {}", factoryName); + Class<? extends org.apache.accumulo.core.spi.metrics.MeterRegistryFactory> clazz = + ClassLoaderUtil.loadClass(factoryName, + org.apache.accumulo.core.spi.metrics.MeterRegistryFactory.class); + org.apache.accumulo.core.spi.metrics.MeterRegistryFactory factory = + clazz.getDeclaredConstructor().newInstance(); + org.apache.accumulo.core.spi.metrics.MeterRegistryFactory.InitParameters initParameters = + new MeterRegistryEnvPropImpl(context); + return factory.create(initParameters); + } catch (ClassCastException ex) { + // empty. On exception try deprecated version + } + try { + LOG.info("find legacy meter registry factory {}", factoryName); + Class<? 
extends org.apache.accumulo.core.metrics.MeterRegistryFactory> clazz = ClassLoaderUtil + .loadClass(factoryName, org.apache.accumulo.core.metrics.MeterRegistryFactory.class); + org.apache.accumulo.core.metrics.MeterRegistryFactory factory = + clazz.getDeclaredConstructor().newInstance(); + return factory.create(); + } catch (ClassCastException ex) { + // empty. No valid metrics factory, fall through and then throw exception. + } + throw new ClassNotFoundException( + "Could not find appropriate class implementing a MetricsFactory for: " + factoryName); + } + + @Override + public synchronized void close() { + LOG.info("Closing metrics registry"); + if (jvmGcMetrics != null) { + jvmGcMetrics.close(); + jvmGcMetrics = null; + } + if (composite != null) { + composite.close(); + composite = null; + } + } + + @Override + public String toString() { + return "MetricsCommonTags{tags=" + getCommonTags() + '}'; + } +} diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java index 0f28ecab8b..249148167d 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java +++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java @@ -42,6 +42,7 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.conf.PropertyType; import org.apache.accumulo.core.conf.PropertyType.PortRange; +import org.apache.accumulo.core.metrics.MetricsInfo; import org.apache.accumulo.core.rpc.SslConnectionParams; import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.rpc.UGIAssumingTransportFactory; @@ -173,7 +174,7 @@ public class TServerUtils { // create the TimedProcessor outside the port search loop so we don't try to // register the same // metrics mbean more than once - TimedProcessor timedProcessor = new TimedProcessor(processor); + 
TimedProcessor timedProcessor = new TimedProcessor(processor, context.getMetricsInfo()); HostAndPort[] addresses = getHostAndPorts(hostname, portHint); try { @@ -568,16 +569,16 @@ public class TServerUtils { ThriftServerType serverType, TProcessor processor, String serverName, String threadName, int numThreads, long threadTimeOut, long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams, SaslServerConnectionParams saslParams, - long serverSocketTimeout, int backlog, HostAndPort... addresses) { + long serverSocketTimeout, int backlog, MetricsInfo metricsInfo, HostAndPort... addresses) { if (serverType == ThriftServerType.SASL) { processor = updateSaslProcessor(serverType, processor); } try { - return startTServer(serverType, new TimedProcessor(processor), serverName, threadName, - numThreads, threadTimeOut, conf, timeBetweenThreadChecks, maxMessageSize, sslParams, - saslParams, serverSocketTimeout, backlog, addresses); + return startTServer(serverType, new TimedProcessor(processor, metricsInfo), serverName, + threadName, numThreads, threadTimeOut, conf, timeBetweenThreadChecks, maxMessageSize, + sslParams, saslParams, serverSocketTimeout, backlog, addresses); } catch (TTransportException e) { throw new IllegalStateException(e); } diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java index 165479a71f..4148cfb0ed 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java +++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TimedProcessor.java @@ -20,7 +20,7 @@ package org.apache.accumulo.server.rpc; import static java.util.concurrent.TimeUnit.NANOSECONDS; -import org.apache.accumulo.core.metrics.MetricsUtil; +import org.apache.accumulo.core.metrics.MetricsInfo; import org.apache.accumulo.server.metrics.ThriftMetrics; import org.apache.thrift.TException; import org.apache.thrift.TProcessor; 
@@ -35,10 +35,10 @@ public class TimedProcessor implements TProcessor { private final ThriftMetrics thriftMetrics; private long idleStart; - public TimedProcessor(TProcessor next) { + public TimedProcessor(final TProcessor next, final MetricsInfo metricsInfo) { this.other = next; thriftMetrics = new ThriftMetrics(); - MetricsUtil.initializeProducers(thriftMetrics); + metricsInfo.addMetricsProducers(thriftMetrics); idleStart = System.nanoTime(); } diff --git a/server/base/src/test/java/org/apache/accumulo/server/conf/store/impl/PropCacheCaffeineImplTest.java b/server/base/src/test/java/org/apache/accumulo/server/conf/store/impl/PropCacheCaffeineImplTest.java index 1121b2b726..3789bb3f41 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/conf/store/impl/PropCacheCaffeineImplTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/conf/store/impl/PropCacheCaffeineImplTest.java @@ -35,7 +35,6 @@ import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.data.InstanceId; import org.apache.accumulo.core.data.TableId; -import org.apache.accumulo.core.metrics.MetricsUtil; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.conf.codec.VersionedProperties; import org.apache.accumulo.server.conf.store.TablePropKey; @@ -67,9 +66,6 @@ public class PropCacheCaffeineImplTest { ticker = new TestTicker(); instanceId = InstanceId.of(UUID.randomUUID()); - PropStoreMetrics cacheMetrics = new PropStoreMetrics(); - MetricsUtil.initializeProducers(cacheMetrics); - tablePropKey = TablePropKey.of(instanceId, TableId.of("t" + ThreadLocalRandom.current().nextInt(1, 1000))); @@ -85,7 +81,7 @@ public class PropCacheCaffeineImplTest { expect(context.getInstanceID()).andReturn(instanceId).anyTimes(); - cache = new PropCacheCaffeineImpl.Builder(zooPropLoader, cacheMetrics).forTests(ticker).build(); + cache = new PropCacheCaffeineImpl.Builder(zooPropLoader).forTests(ticker).build(); } diff --git 
a/server/base/src/test/java/org/apache/accumulo/server/conf/store/impl/PropStoreEventTest.java b/server/base/src/test/java/org/apache/accumulo/server/conf/store/impl/PropStoreEventTest.java index 748013cd95..71247b4d3b 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/conf/store/impl/PropStoreEventTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/conf/store/impl/PropStoreEventTest.java @@ -39,7 +39,6 @@ import java.util.UUID; import org.apache.accumulo.core.data.InstanceId; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; -import org.apache.accumulo.core.metrics.MetricsUtil; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.conf.codec.VersionedPropCodec; import org.apache.accumulo.server.conf.codec.VersionedProperties; @@ -255,12 +254,9 @@ public class PropStoreEventTest { replay(context, zrw, readyMonitor); - PropStoreMetrics metrics = new PropStoreMetrics(); - MetricsUtil.initializeProducers(metrics); + ZooPropLoader loader = new ZooPropLoader(zrw, propCodec, watcher); - ZooPropLoader loader = new ZooPropLoader(zrw, propCodec, watcher, metrics); - - PropCacheCaffeineImpl cache = new PropCacheCaffeineImpl.Builder(loader, metrics).build(); + PropCacheCaffeineImpl cache = new PropCacheCaffeineImpl.Builder(loader).build(); // load cache var read1 = cache.get(tablePropKey); diff --git a/server/base/src/test/java/org/apache/accumulo/server/conf/store/impl/ZooPropLoaderTest.java b/server/base/src/test/java/org/apache/accumulo/server/conf/store/impl/ZooPropLoaderTest.java index d9f0f4020e..c47fc49752 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/conf/store/impl/ZooPropLoaderTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/conf/store/impl/ZooPropLoaderTest.java @@ -23,7 +23,6 @@ import static org.apache.accumulo.core.conf.Property.MANAGER_CLIENTPORT; import static 
org.apache.accumulo.core.conf.Property.TSERV_CLIENTPORT; import static org.apache.accumulo.core.conf.Property.TSERV_NATIVEMAP_ENABLED; import static org.apache.accumulo.core.conf.Property.TSERV_SCAN_MAX_OPENFILES; -import static org.easymock.EasyMock.anyLong; import static org.easymock.EasyMock.anyObject; import static org.easymock.EasyMock.capture; import static org.easymock.EasyMock.createMock; @@ -58,13 +57,9 @@ import org.easymock.Capture; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class ZooPropLoaderTest { - private static final Logger log = LoggerFactory.getLogger(ZooPropLoaderTest.class); - private PropCacheCaffeineImplTest.TestTicker ticker; private InstanceId instanceId; private ServerContext context; @@ -72,7 +67,6 @@ public class ZooPropLoaderTest { private VersionedPropCodec propCodec; // mocks - private PropStoreMetrics cacheMetrics; private PropStoreWatcher propStoreWatcher; private ZooReaderWriter zrw; @@ -92,18 +86,16 @@ public class ZooPropLoaderTest { zrw = createMock(ZooReaderWriter.class); - cacheMetrics = createMock(PropStoreMetrics.class); - propStoreWatcher = createMock(PropStoreWatcher.class); // loader used in tests - loader = new ZooPropLoader(zrw, propCodec, propStoreWatcher, cacheMetrics); + loader = new ZooPropLoader(zrw, propCodec, propStoreWatcher); } @AfterEach public void verifyCommonMocks() { - verify(context, zrw, propStoreWatcher, cacheMetrics); + verify(context, zrw, propStoreWatcher); } @Test @@ -122,10 +114,7 @@ public class ZooPropLoaderTest { return (bytes); }).once(); - cacheMetrics.addLoadTime(anyLong()); - expectLastCall().times(1); - - replay(context, zrw, propStoreWatcher, cacheMetrics); + replay(context, zrw, propStoreWatcher); assertNotNull(loader.load(propStoreKey)); } @@ -158,13 +147,10 @@ public class ZooPropLoaderTest { return (bytes); }).once(); - 
cacheMetrics.addLoadTime(anyLong()); - expectLastCall().times(1); - - replay(context, zrw, propStoreWatcher, cacheMetrics); + replay(context, zrw, propStoreWatcher); PropCacheCaffeineImpl cache = - new PropCacheCaffeineImpl.Builder(loader, cacheMetrics).forTests(ticker).build(); + new PropCacheCaffeineImpl.Builder(loader).forTests(ticker).build(); // load into cache assertNotNull(cache.get(propStoreKey)); @@ -189,17 +175,12 @@ public class ZooPropLoaderTest { propStoreWatcher.signalZkChangeEvent(eq(propStoreKey)); expectLastCall(); - cacheMetrics.incrZkError(); - expectLastCall().once(); - - replay(context, zrw, propStoreWatcher, cacheMetrics); + replay(context, zrw, propStoreWatcher); PropCacheCaffeineImpl cache = - new PropCacheCaffeineImpl.Builder(loader, cacheMetrics).forTests(ticker).build(); + new PropCacheCaffeineImpl.Builder(loader).forTests(ticker).build(); assertNull(cache.get(propStoreKey)); - - log.debug("Metrics: {}", cacheMetrics); } /** @@ -225,16 +206,10 @@ public class ZooPropLoaderTest { return (bytes); }).times(2); - cacheMetrics.addLoadTime(anyLong()); - expectLastCall().times(2); - - cacheMetrics.incrEviction(); - expectLastCall().once(); - - replay(context, zrw, propStoreWatcher, cacheMetrics); + replay(context, zrw, propStoreWatcher); PropCacheCaffeineImpl cache = - new PropCacheCaffeineImpl.Builder(loader, cacheMetrics).forTests(ticker).build(); + new PropCacheCaffeineImpl.Builder(loader).forTests(ticker).build(); // load cache assertNotNull(cache.get(propStoreKey)); @@ -278,17 +253,10 @@ public class ZooPropLoaderTest { propStoreWatcher.signalCacheChangeEvent(anyObject()); expectLastCall().anyTimes(); - cacheMetrics.addLoadTime(anyLong()); - expectLastCall().times(1); - cacheMetrics.incrEviction(); - expectLastCall().times(1); - cacheMetrics.incrZkError(); - expectLastCall().times(1); - - replay(context, zrw, propStoreWatcher, cacheMetrics); + replay(context, zrw, propStoreWatcher); PropCacheCaffeineImpl cache = - new 
PropCacheCaffeineImpl.Builder(loader, cacheMetrics).forTests(ticker).build(); + new PropCacheCaffeineImpl.Builder(loader).forTests(ticker).build(); // prime cache assertNotNull(cache.get(propStoreKey)); @@ -307,10 +275,10 @@ public class ZooPropLoaderTest { @Test public void getIfCachedTest() { - replay(context, zrw, propStoreWatcher, cacheMetrics); + replay(context, zrw, propStoreWatcher); PropCacheCaffeineImpl cache = - new PropCacheCaffeineImpl.Builder(loader, cacheMetrics).forTests(ticker).build(); + new PropCacheCaffeineImpl.Builder(loader).forTests(ticker).build(); assertNull(cache.getIfCached(propStoreKey)); @@ -348,13 +316,10 @@ public class ZooPropLoaderTest { return (bytes); }).once(); - cacheMetrics.addLoadTime(anyLong()); - expectLastCall().times(2); - - replay(context, zrw, propStoreWatcher, cacheMetrics); + replay(context, zrw, propStoreWatcher); PropCacheCaffeineImpl cache = - new PropCacheCaffeineImpl.Builder(loader, cacheMetrics).forTests(ticker).build(); + new PropCacheCaffeineImpl.Builder(loader).forTests(ticker).build(); // load into cache assertNotNull(cache.get(sysPropKey)); @@ -399,13 +364,10 @@ public class ZooPropLoaderTest { return (bytes); }).once(); - cacheMetrics.addLoadTime(anyLong()); - expectLastCall().times(2); - - replay(context, zrw, propStoreWatcher, cacheMetrics); + replay(context, zrw, propStoreWatcher); PropCacheCaffeineImpl cache = - new PropCacheCaffeineImpl.Builder(loader, cacheMetrics).forTests(ticker).build(); + new PropCacheCaffeineImpl.Builder(loader).forTests(ticker).build(); // load into cache assertNotNull(cache.get(sysPropKey)); @@ -420,10 +382,10 @@ public class ZooPropLoaderTest { @Test public void getIfCachedNotPresentTest() { - replay(context, zrw, propStoreWatcher, cacheMetrics); + replay(context, zrw, propStoreWatcher); PropCacheCaffeineImpl cache = - new PropCacheCaffeineImpl.Builder(loader, cacheMetrics).forTests(ticker).build(); + new PropCacheCaffeineImpl.Builder(loader).forTests(ticker).build(); // load 
into cache assertNull(cache.getIfCached(propStoreKey)); @@ -451,7 +413,7 @@ public class ZooPropLoaderTest { return propCodec.toBytes(vProps); }).anyTimes(); - replay(context, zrw, propStoreWatcher, cacheMetrics); + replay(context, zrw, propStoreWatcher); Stat statCheck = new Stat(); statCheck.setVersion(9); diff --git a/server/base/src/test/java/org/apache/accumulo/server/metrics/MeterRegistryEnvPropImplTest.java b/server/base/src/test/java/org/apache/accumulo/server/metrics/MeterRegistryEnvPropImplTest.java new file mode 100644 index 0000000000..ff0f958dd5 --- /dev/null +++ b/server/base/src/test/java/org/apache/accumulo/server/metrics/MeterRegistryEnvPropImplTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.server.metrics; + +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.replay; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import java.util.Map; + +import org.apache.accumulo.core.conf.ConfigurationCopy; +import org.apache.accumulo.server.ServerContext; +import org.easymock.EasyMock; +import org.junit.jupiter.api.Test; + +public class MeterRegistryEnvPropImplTest { + + @Test + public void customParamsTest() { + ServerContext context = EasyMock.createMock(ServerContext.class); + ConfigurationCopy conf = new ConfigurationCopy(); + conf.set("unknown", "none"); + conf.set("general.custom.metrics.opts.sample.p1", "v1"); + conf.set("general.custom.metrics.opts.sample.p2", "v2"); + + expect(context.getConfiguration()).andReturn(conf).anyTimes(); + + replay(context); + MeterRegistryEnvPropImpl env = new MeterRegistryEnvPropImpl(context); + assertEquals(Map.of("sample.p1", "v1", "sample.p2", "v2"), env.getOptions()); + assertNotNull(env.getServiceEnv()); + } +} diff --git a/server/base/src/test/java/org/apache/accumulo/server/metrics/MetricsInfoImplTest.java b/server/base/src/test/java/org/apache/accumulo/server/metrics/MetricsInfoImplTest.java new file mode 100644 index 0000000000..5ffb615685 --- /dev/null +++ b/server/base/src/test/java/org/apache/accumulo/server/metrics/MetricsInfoImplTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.server.metrics; + +import static org.easymock.EasyMock.anyObject; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.mock; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.verify; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.util.Map; + +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.server.ServerContext; +import org.junit.jupiter.api.Test; + +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; + +public class MetricsInfoImplTest { + @Test + public void factoryTest() throws Exception { + + ServerContext context = mock(ServerContext.class); + AccumuloConfiguration conf = mock(AccumuloConfiguration.class); + expect(context.getConfiguration()).andReturn(conf).anyTimes(); + expect(conf.getAllPropertiesWithPrefixStripped(anyObject())).andReturn(Map.of()).anyTimes(); + expect(conf.newDeriver(anyObject())).andReturn(Map::of).anyTimes(); + replay(context, conf); + assertNotNull(MetricsInfoImpl.getRegistryFromFactory(SPIFactory.class.getName(), context)); + + assertNotNull( + MetricsInfoImpl.getRegistryFromFactory(DeprecatedFactory.class.getName(), context)); + + assertThrows(ClassNotFoundException.class, + () -> MetricsInfoImpl.getRegistryFromFactory(String.class.getName(), context)); + + verify(context, conf); + } + + @SuppressWarnings({"deprecation", + 
"support for org.apache.accumulo.core.metrics.MeterRegistryFactory can be removed in 3.1"}) + static final class DeprecatedFactory + implements org.apache.accumulo.core.metrics.MeterRegistryFactory { + DeprecatedFactory() { + + } + + @Override + public MeterRegistry create() { + return new SimpleMeterRegistry(); + } + } + + static class SPIFactory implements org.apache.accumulo.core.spi.metrics.MeterRegistryFactory { + + SPIFactory() { + + } + + @Override + public MeterRegistry create(final InitParameters params) { + return new SimpleMeterRegistry(); + } + } +} diff --git a/server/base/src/test/java/org/apache/accumulo/server/rpc/TServerUtilsTest.java b/server/base/src/test/java/org/apache/accumulo/server/rpc/TServerUtilsTest.java index 04bd9c6d64..80d261f093 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/rpc/TServerUtilsTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/rpc/TServerUtilsTest.java @@ -18,8 +18,10 @@ */ package org.apache.accumulo.server.rpc; +import static org.easymock.EasyMock.anyObject; import static org.easymock.EasyMock.createMock; import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; import static org.easymock.EasyMock.replay; import static org.easymock.EasyMock.verify; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -41,6 +43,8 @@ import org.apache.accumulo.core.conf.ConfigurationCopy; import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.InstanceId; +import org.apache.accumulo.core.metrics.MetricsInfo; +import org.apache.accumulo.core.metrics.MetricsProducer; import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.client.ClientServiceHandler; @@ -54,6 +58,7 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; public class TServerUtilsTest { private ServerContext context; + 
private MetricsInfo metricsInfo; private final ConfigurationCopy conf = new ConfigurationCopy(DefaultConfiguration.getInstance()); @BeforeEach @@ -72,12 +77,16 @@ public class TServerUtilsTest { expect(context.getSaslParams()).andReturn(null).anyTimes(); expect(context.getClientTimeoutInMillis()).andReturn((long) 1000).anyTimes(); expect(context.getSecurityOperation()).andReturn(null).anyTimes(); - replay(context); + metricsInfo = createMock(MetricsInfo.class); + metricsInfo.addMetricsProducers(anyObject(MetricsProducer.class)); + expectLastCall().anyTimes(); + expect(context.getMetricsInfo()).andReturn(metricsInfo).anyTimes(); + replay(context, metricsInfo); } @AfterEach public void verifyMockServerContext() { - verify(context); + verify(context, metricsInfo); } @Test @@ -200,7 +209,7 @@ public class TServerUtilsTest { assertNotNull(server); // Finally ensure that the TServer is using the last port (i.e. port search worked) - assertTrue(address.getAddress().getPort() == tserverFinalPort); + assertEquals(address.getAddress().getPort(), tserverFinalPort); } finally { if (server != null) { server.stop(); @@ -253,9 +262,9 @@ public class TServerUtilsTest { } private int[] findTwoFreeSequentialPorts(int startingAddress) throws UnknownHostException { - boolean sequential = false; + boolean sequential; int low = startingAddress; - int high = 0; + int high; do { low = getFreePort(low); high = getFreePort(low + 1); diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java index c86a93350c..ec8cdc514c 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java @@ -20,7 +20,6 @@ package org.apache.accumulo.coordinator; import static 
com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; -import java.lang.reflect.InvocationTargetException; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.HashSet; @@ -67,7 +66,7 @@ import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; import org.apache.accumulo.core.metadata.schema.TabletMetadata; -import org.apache.accumulo.core.metrics.MetricsUtil; +import org.apache.accumulo.core.metrics.MetricsInfo; import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; import org.apache.accumulo.core.securityImpl.thrift.TCredentials; @@ -263,15 +262,9 @@ public class CompactionCoordinator extends AbstractServer throw new IllegalStateException("Exception getting Coordinator lock", e); } - try { - MetricsUtil.initializeMetrics(getContext().getConfiguration(), this.applicationName, - clientAddress, getContext().getInstanceName()); - MetricsUtil.initializeProducers(this); - } catch (ClassNotFoundException | InstantiationException | IllegalAccessException - | IllegalArgumentException | InvocationTargetException | NoSuchMethodException - | SecurityException e1) { - LOG.error("Error initializing metrics, metrics will not be emitted.", e1); - } + MetricsInfo metricsInfo = getContext().getMetricsInfo(); + metricsInfo.addServiceTags(getApplicationName(), clientAddress); + metricsInfo.init(); // On a re-start of the coordinator it's possible that external compactions are in-progress. 
// Attempt to get the running compactions on the compactors and then resolve which tserver diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index f4affca907..6b0931b3d4 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -24,7 +24,6 @@ import static java.util.concurrent.TimeUnit.MINUTES; import static org.apache.accumulo.core.util.LazySingletons.RANDOM; import java.io.IOException; -import java.lang.reflect.InvocationTargetException; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.List; @@ -78,8 +77,8 @@ import org.apache.accumulo.core.metadata.schema.DataFileValue; import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; +import org.apache.accumulo.core.metrics.MetricsInfo; import org.apache.accumulo.core.metrics.MetricsProducer; -import org.apache.accumulo.core.metrics.MetricsUtil; import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; import org.apache.accumulo.core.securityImpl.thrift.TCredentials; @@ -586,16 +585,11 @@ public class Compactor extends AbstractServer implements MetricsProducer, Compac throw new RuntimeException("Error registering compactor in ZooKeeper", e); } - try { - MetricsUtil.initializeMetrics(getContext().getConfiguration(), this.applicationName, - clientAddress, getContext().getInstanceName()); - pausedMetrics = new PausedCompactionMetrics(); - MetricsUtil.initializeProducers(this, pausedMetrics); - } catch (ClassNotFoundException | InstantiationException | IllegalAccessException - | IllegalArgumentException | InvocationTargetException | NoSuchMethodException - | 
SecurityException e1) { - LOG.error("Error initializing metrics, metrics will not be emitted.", e1); - } + MetricsInfo metricsInfo = getContext().getMetricsInfo(); + metricsInfo.addServiceTags(getApplicationName(), clientAddress); + + metricsInfo.addMetricsProducers(this, pausedMetrics); + metricsInfo.init(); var watcher = new CompactionWatcher(getConfiguration()); var schedExecutor = ThreadPools.getServerThreadPools() diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java index 968e7d2b8b..e9c23b1729 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java @@ -22,7 +22,6 @@ import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterrup import java.io.FileNotFoundException; import java.io.IOException; -import java.lang.reflect.InvocationTargetException; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; @@ -43,7 +42,7 @@ import org.apache.accumulo.core.lock.ServiceLockData; import org.apache.accumulo.core.lock.ServiceLockData.ThriftService; import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; -import org.apache.accumulo.core.metrics.MetricsUtil; +import org.apache.accumulo.core.metrics.MetricsInfo; import org.apache.accumulo.core.securityImpl.thrift.TCredentials; import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.util.Halt; @@ -155,16 +154,11 @@ public class SimpleGarbageCollector extends AbstractServer implements Iface { System.exit(1); } - try { - MetricsUtil.initializeMetrics(getContext().getConfiguration(), this.applicationName, address, - getContext().getInstanceName()); - MetricsUtil.initializeProducers(this, new GcMetrics(this)); - } catch (ClassNotFoundException | 
InstantiationException | IllegalAccessException - | IllegalArgumentException | InvocationTargetException | NoSuchMethodException - | SecurityException e1) { - log.error("Error initializing metrics, metrics will not be emitted.", e1); - } + MetricsInfo metricsInfo = getContext().getMetricsInfo(); + metricsInfo.addServiceTags(getApplicationName(), address); + metricsInfo.addMetricsProducers(this, new GcMetrics(this)); + metricsInfo.init(); try { long delay = getStartDelay(); log.debug("Sleeping for {} milliseconds before beginning garbage collection cycles", delay); @@ -378,7 +372,8 @@ public class SimpleGarbageCollector extends AbstractServer implements Iface { getContext().getThriftServerType(), processor, this.getClass().getSimpleName(), "GC Monitor Service", 2, ThreadPools.DEFAULT_TIMEOUT_MILLISECS, 1000, maxMessageSize, getContext().getServerSslParams(), getContext().getSaslParams(), 0, - getConfiguration().getCount(Property.RPC_BACKLOG), addresses); + getConfiguration().getCount(Property.RPC_BACKLOG), getContext().getMetricsInfo(), + addresses); log.debug("Starting garbage collector listening on " + server.address); return server.address; } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index cbbba0fb7f..9cead258f7 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -28,7 +28,6 @@ import static java.util.concurrent.TimeUnit.NANOSECONDS; import static java.util.concurrent.TimeUnit.SECONDS; import java.io.IOException; -import java.lang.reflect.InvocationTargetException; import java.net.UnknownHostException; import java.time.Duration; import java.util.ArrayList; @@ -98,7 +97,8 @@ import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.TabletLocationState; import 
org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily; -import org.apache.accumulo.core.metrics.MetricsUtil; +import org.apache.accumulo.core.metrics.MetricsInfo; +import org.apache.accumulo.core.metrics.MetricsProducer; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.spi.balancer.BalancerEnvironment; import org.apache.accumulo.core.spi.balancer.SimpleLoadBalancer; @@ -1104,16 +1104,12 @@ public class Manager extends AbstractServer managerUpgrading.set(true); } - try { - MetricsUtil.initializeMetrics(getContext().getConfiguration(), this.applicationName, - sa.getAddress(), getContext().getInstanceName()); - ManagerMetrics mm = new ManagerMetrics(getConfiguration(), this); - MetricsUtil.initializeProducers(this, mm); - } catch (ClassNotFoundException | InstantiationException | IllegalAccessException - | IllegalArgumentException | InvocationTargetException | NoSuchMethodException - | SecurityException e1) { - log.error("Error initializing metrics, metrics will not be emitted.", e1); - } + MetricsInfo metricsInfo = getContext().getMetricsInfo(); + metricsInfo.addServiceTags(getApplicationName(), sa.getAddress()); + + var producers = ManagerMetrics.getProducers(getConfiguration(), this); + metricsInfo.addMetricsProducers(producers.toArray(new MetricsProducer[0])); + metricsInfo.init(); recoveryManager = new RecoveryManager(this, timeToCacheRecoveryWalExistence); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ManagerMetrics.java b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ManagerMetrics.java index 285df23a69..02a30d4d22 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ManagerMetrics.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/ManagerMetrics.java @@ -18,7 +18,8 @@ */ package org.apache.accumulo.manager.metrics; -import 
static java.util.Objects.requireNonNull; +import java.util.ArrayList; +import java.util.List; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; @@ -26,21 +27,12 @@ import org.apache.accumulo.core.metrics.MetricsProducer; import org.apache.accumulo.manager.Manager; import org.apache.accumulo.manager.metrics.fate.FateMetrics; -import io.micrometer.core.instrument.MeterRegistry; +public class ManagerMetrics { -public class ManagerMetrics implements MetricsProducer { - - private final FateMetrics fateMetrics; - - public ManagerMetrics(final AccumuloConfiguration conf, final Manager manager) { - requireNonNull(conf, "AccumuloConfiguration must not be null"); - requireNonNull(conf, "Manager must not be null"); - fateMetrics = new FateMetrics(manager.getContext(), - conf.getTimeInMillis(Property.MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL)); - } - - @Override - public void registerMetrics(MeterRegistry registry) { - fateMetrics.registerMetrics(registry); + public static List<MetricsProducer> getProducers(AccumuloConfiguration conf, Manager manager) { + ArrayList<MetricsProducer> producers = new ArrayList<>(); + producers.add(new FateMetrics(manager.getContext(), + conf.getTimeInMillis(Property.MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL))); + return producers; } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java index 800455fe0e..bed2e53411 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java @@ -18,6 +18,7 @@ */ package org.apache.accumulo.manager.metrics.fate; +import java.util.List; import java.util.Map.Entry; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -28,7 +29,6 @@ import 
org.apache.accumulo.core.Constants; import org.apache.accumulo.core.fate.ReadOnlyTStore; import org.apache.accumulo.core.fate.ZooStore; import org.apache.accumulo.core.metrics.MetricsProducer; -import org.apache.accumulo.core.metrics.MetricsUtil; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.server.ServerContext; import org.apache.zookeeper.KeeperException; @@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Metrics; +import io.micrometer.core.instrument.Tag; import io.micrometer.core.instrument.Tags; public class FateMetrics implements MetricsProducer { @@ -127,27 +128,31 @@ public class FateMetrics implements MetricsProducer { @Override public void registerMetrics(final MeterRegistry registry) { - totalCurrentOpsGauge = registry.gauge(METRICS_FATE_TOTAL_IN_PROGRESS, - MetricsUtil.getCommonTags(), new AtomicLong(0)); - totalOpsGauge = - registry.gauge(METRICS_FATE_OPS_ACTIVITY, MetricsUtil.getCommonTags(), new AtomicLong(0)); - fateErrorsGauge = registry.gauge(METRICS_FATE_ERRORS, - Tags.concat(MetricsUtil.getCommonTags(), "type", "zk.connection"), new AtomicLong(0)); - newTxGauge = registry.gauge(METRICS_FATE_TX, Tags.concat(MetricsUtil.getCommonTags(), "state", - ReadOnlyTStore.TStatus.NEW.name().toLowerCase()), new AtomicLong(0)); - submittedTxGauge = registry.gauge(METRICS_FATE_TX, Tags.concat(MetricsUtil.getCommonTags(), - "state", ReadOnlyTStore.TStatus.SUBMITTED.name().toLowerCase()), new AtomicLong(0)); - inProgressTxGauge = registry.gauge(METRICS_FATE_TX, Tags.concat(MetricsUtil.getCommonTags(), - "state", ReadOnlyTStore.TStatus.IN_PROGRESS.name().toLowerCase()), new AtomicLong(0)); - failedInProgressTxGauge = - registry.gauge(METRICS_FATE_TX, Tags.concat(MetricsUtil.getCommonTags(), "state", - ReadOnlyTStore.TStatus.FAILED_IN_PROGRESS.name().toLowerCase()), new AtomicLong(0)); - failedTxGauge = registry.gauge(METRICS_FATE_TX, 
Tags.concat(MetricsUtil.getCommonTags(), - "state", ReadOnlyTStore.TStatus.FAILED.name().toLowerCase()), new AtomicLong(0)); - successfulTxGauge = registry.gauge(METRICS_FATE_TX, Tags.concat(MetricsUtil.getCommonTags(), - "state", ReadOnlyTStore.TStatus.SUCCESSFUL.name().toLowerCase()), new AtomicLong(0)); - unknownTxGauge = registry.gauge(METRICS_FATE_TX, Tags.concat(MetricsUtil.getCommonTags(), - "state", ReadOnlyTStore.TStatus.UNKNOWN.name().toLowerCase()), new AtomicLong(0)); + totalCurrentOpsGauge = registry.gauge(METRICS_FATE_TOTAL_IN_PROGRESS, new AtomicLong(0)); + totalOpsGauge = registry.gauge(METRICS_FATE_OPS_ACTIVITY, new AtomicLong(0)); + fateErrorsGauge = registry.gauge(METRICS_FATE_ERRORS, List.of(Tag.of("type", "zk.connection")), + new AtomicLong(0)); + newTxGauge = registry.gauge(METRICS_FATE_TX, + List.of(Tag.of("state", ReadOnlyTStore.TStatus.NEW.name().toLowerCase())), + new AtomicLong(0)); + submittedTxGauge = registry.gauge(METRICS_FATE_TX, + List.of(Tag.of("state", ReadOnlyTStore.TStatus.SUBMITTED.name().toLowerCase())), + new AtomicLong(0)); + inProgressTxGauge = registry.gauge(METRICS_FATE_TX, + List.of(Tag.of("state", ReadOnlyTStore.TStatus.IN_PROGRESS.name().toLowerCase())), + new AtomicLong(0)); + failedInProgressTxGauge = registry.gauge(METRICS_FATE_TX, + List.of(Tag.of("state", ReadOnlyTStore.TStatus.FAILED_IN_PROGRESS.name().toLowerCase())), + new AtomicLong(0)); + failedTxGauge = registry.gauge(METRICS_FATE_TX, + List.of(Tag.of("state", ReadOnlyTStore.TStatus.FAILED.name().toLowerCase())), + new AtomicLong(0)); + successfulTxGauge = registry.gauge(METRICS_FATE_TX, + List.of(Tag.of("state", ReadOnlyTStore.TStatus.SUCCESSFUL.name().toLowerCase())), + new AtomicLong(0)); + unknownTxGauge = registry.gauge(METRICS_FATE_TX, + List.of(Tag.of("state", ReadOnlyTStore.TStatus.UNKNOWN.name().toLowerCase())), + new AtomicLong(0)); update(); diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java 
b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java index a93919d61c..fde338f422 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java @@ -65,6 +65,7 @@ import org.apache.accumulo.core.manager.thrift.ManagerClientService; import org.apache.accumulo.core.manager.thrift.ManagerMonitorInfo; import org.apache.accumulo.core.manager.thrift.TableInfo; import org.apache.accumulo.core.manager.thrift.TabletServerStatus; +import org.apache.accumulo.core.metrics.MetricsInfo; import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; import org.apache.accumulo.core.tabletscan.thrift.ActiveScan; @@ -490,6 +491,11 @@ public class Monitor extends AbstractServer implements HighlyAvailableService { } log.debug("Using {} to advertise monitor location in ZooKeeper", advertiseHost); + MetricsInfo metricsInfo = getContext().getMetricsInfo(); + metricsInfo.addServiceTags(getApplicationName(), + HostAndPort.fromParts(advertiseHost, livePort)); + metricsInfo.init(); + try { URL url = new URL(server.isSecure() ? 
"https" : "http", advertiseHost, server.getPort(), "/"); final String path = context.getZooKeeperRoot() + Constants.ZMONITOR_HTTP_ADDR; diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java index 8fcc742ba2..ca4b28d06e 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java @@ -22,7 +22,6 @@ import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterrup import java.io.IOException; import java.io.UncheckedIOException; -import java.lang.reflect.InvocationTargetException; import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -78,7 +77,7 @@ import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletsMetadata; -import org.apache.accumulo.core.metrics.MetricsUtil; +import org.apache.accumulo.core.metrics.MetricsInfo; import org.apache.accumulo.core.securityImpl.thrift.TCredentials; import org.apache.accumulo.core.tabletscan.thrift.ActiveScan; import org.apache.accumulo.core.tabletscan.thrift.TSampleNotPresentException; @@ -374,17 +373,13 @@ public class ScanServer extends AbstractServer throw new RuntimeException("Failed to start the compactor client service", e1); } - try { - MetricsUtil.initializeMetrics(getContext().getConfiguration(), this.applicationName, - clientAddress, getContext().getInstanceName()); - scanMetrics = new TabletServerScanMetrics(); - MetricsUtil.initializeProducers(this, scanMetrics); - } catch (ClassNotFoundException | InstantiationException | IllegalAccessException - | IllegalArgumentException | InvocationTargetException | NoSuchMethodException - | SecurityException e1) { - LOG.error("Error initializing 
metrics, metrics will not be emitted.", e1); - } + MetricsInfo metricsInfo = getContext().getMetricsInfo(); + metricsInfo.addServiceTags(getApplicationName(), clientAddress); + + scanMetrics = new TabletServerScanMetrics(); + metricsInfo.addMetricsProducers(this, scanMetrics); + metricsInfo.init(); // We need to set the compaction manager so that we don't get an NPE in CompactableImpl.close ServiceLock lock = announceExistence(); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 678b1294c5..89d503ef58 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -30,7 +30,6 @@ import static org.apache.accumulo.core.util.threads.ThreadPools.watchNonCritical import java.io.IOException; import java.lang.management.ManagementFactory; -import java.lang.reflect.InvocationTargetException; import java.net.UnknownHostException; import java.time.Duration; import java.time.Instant; @@ -93,7 +92,7 @@ import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletsMetadata; -import org.apache.accumulo.core.metrics.MetricsUtil; +import org.apache.accumulo.core.metrics.MetricsInfo; import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; import org.apache.accumulo.core.spi.fs.VolumeChooserEnvironment; @@ -705,24 +704,19 @@ public class TabletServer extends AbstractServer implements TabletHostingServer throw new RuntimeException("Failed to start the tablet client service", e1); } - try { - MetricsUtil.initializeMetrics(context.getConfiguration(), this.applicationName, clientAddress, - getContext().getInstanceName()); - - metrics = new 
TabletServerMetrics(this); - updateMetrics = new TabletServerUpdateMetrics(); - scanMetrics = new TabletServerScanMetrics(); - mincMetrics = new TabletServerMinCMetrics(); - ceMetrics = new CompactionExecutorsMetrics(); - pausedMetrics = new PausedCompactionMetrics(); - MetricsUtil.initializeProducers(this, metrics, updateMetrics, scanMetrics, mincMetrics, - ceMetrics, pausedMetrics); - - } catch (ClassNotFoundException | InstantiationException | IllegalAccessException - | IllegalArgumentException | InvocationTargetException | NoSuchMethodException - | SecurityException e1) { - log.error("Error initializing metrics, metrics will not be emitted.", e1); - } + MetricsInfo metricsInfo = getContext().getMetricsInfo(); + metricsInfo.addServiceTags(getApplicationName(), clientAddress); + + metrics = new TabletServerMetrics(this); + updateMetrics = new TabletServerUpdateMetrics(); + scanMetrics = new TabletServerScanMetrics(); + mincMetrics = new TabletServerMinCMetrics(); + ceMetrics = new CompactionExecutorsMetrics(); + pausedMetrics = new PausedCompactionMetrics(); + + metricsInfo.addMetricsProducers(metrics, updateMetrics, scanMetrics, mincMetrics, ceMetrics, + pausedMetrics); + metricsInfo.init(); this.compactionManager = new CompactionManager(() -> Iterators .transform(onlineTablets.snapshot().values().iterator(), Tablet::asCompactable), diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java index de47b841fd..2bb80309a1 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java @@ -140,7 +140,7 @@ public class TabletServerResourceManager { } private ThreadPoolExecutor createPriorityExecutor(ScanExecutorConfig sec, - Map<String,Queue<Runnable>> scanExecQueues) { + Map<String,Queue<Runnable>> 
scanExecQueues, final boolean enableMetrics) { BlockingQueue<Runnable> queue; @@ -188,7 +188,7 @@ public class TabletServerResourceManager { ThreadPoolExecutor es = ThreadPools.getServerThreadPools().getPoolBuilder("scan-" + sec.name) .numCoreThreads(sec.getCurrentMaxThreads()).numMaxThreads(sec.getCurrentMaxThreads()) .withTimeOut(0L, MILLISECONDS).withQueue(queue).atPriority(sec.priority) - .enableThreadPoolMetrics().build(); + .enableThreadPoolMetrics(enableMetrics).build(); modifyThreadPoolSizesAtRuntime(sec::getCurrentMaxThreads, "scan-" + sec.name, es); return es; @@ -251,6 +251,7 @@ public class TabletServerResourceManager { public TabletServerResourceManager(ServerContext context, TabletHostingServer tserver) { this.context = context; final AccumuloConfiguration acuConf = context.getConfiguration(); + final boolean enableMetrics = context.getMetricsInfo().isMetricsEnabled(); // acuConf.getBoolean(Property.GENERAL_MICROMETER_ENABLED); long maxMemory = acuConf.getAsBytes(Property.TSERV_MAXMEM); boolean usingNativeMap = acuConf.getBoolean(Property.TSERV_NATIVEMAP_ENABLED); if (usingNativeMap) { @@ -307,20 +308,20 @@ public class TabletServerResourceManager { () -> context.getConfiguration().getCount(Property.TSERV_MINC_MAXCONCURRENT), "minor compactor", minorCompactionThreadPool); - splitThreadPool = - ThreadPools.getServerThreadPools().getPoolBuilder("splitter").numCoreThreads(0) - .numMaxThreads(1).withTimeOut(1, SECONDS).enableThreadPoolMetrics().build(); + splitThreadPool = ThreadPools.getServerThreadPools().getPoolBuilder("splitter") + .numCoreThreads(0).numMaxThreads(1).withTimeOut(1, SECONDS) + .enableThreadPoolMetrics(enableMetrics).build(); - defaultSplitThreadPool = - ThreadPools.getServerThreadPools().getPoolBuilder("md splitter").numCoreThreads(0) - .numMaxThreads(1).withTimeOut(60, SECONDS).enableThreadPoolMetrics().build(); + defaultSplitThreadPool = ThreadPools.getServerThreadPools().getPoolBuilder("md splitter") + 
.numCoreThreads(0).numMaxThreads(1).withTimeOut(60, SECONDS) + .enableThreadPoolMetrics(enableMetrics).build(); defaultMigrationPool = ThreadPools.getServerThreadPools() .getPoolBuilder("metadata tablet migration").numCoreThreads(0).numMaxThreads(1) - .withTimeOut(60, SECONDS).enableThreadPoolMetrics().build(); + .withTimeOut(60, SECONDS).enableThreadPoolMetrics(enableMetrics).build(); migrationPool = ThreadPools.getServerThreadPools().createExecutorService(acuConf, - Property.TSERV_MIGRATE_MAXCONCURRENT, true); + Property.TSERV_MIGRATE_MAXCONCURRENT, enableMetrics); modifyThreadPoolSizesAtRuntime( () -> context.getConfiguration().getCount(Property.TSERV_MIGRATE_MAXCONCURRENT), "tablet migration", migrationPool); @@ -331,31 +332,31 @@ public class TabletServerResourceManager { // individual tablet server run // concurrent assignments would put more load on the metadata table at startup assignmentPool = ThreadPools.getServerThreadPools().createExecutorService(acuConf, - Property.TSERV_ASSIGNMENT_MAXCONCURRENT, true); + Property.TSERV_ASSIGNMENT_MAXCONCURRENT, enableMetrics); modifyThreadPoolSizesAtRuntime( () -> context.getConfiguration().getCount(Property.TSERV_ASSIGNMENT_MAXCONCURRENT), "tablet assignment", assignmentPool); assignMetaDataPool = ThreadPools.getServerThreadPools() .getPoolBuilder("metadata tablet assignment").numCoreThreads(0).numMaxThreads(1) - .withTimeOut(60, SECONDS).enableThreadPoolMetrics().build(); + .withTimeOut(60, SECONDS).enableThreadPoolMetrics(enableMetrics).build(); activeAssignments = new ConcurrentHashMap<>(); summaryRetrievalPool = ThreadPools.getServerThreadPools().createExecutorService(acuConf, - Property.TSERV_SUMMARY_RETRIEVAL_THREADS, true); + Property.TSERV_SUMMARY_RETRIEVAL_THREADS, enableMetrics); modifyThreadPoolSizesAtRuntime( () -> context.getConfiguration().getCount(Property.TSERV_SUMMARY_RETRIEVAL_THREADS), "summary file retriever", summaryRetrievalPool); summaryRemotePool = 
ThreadPools.getServerThreadPools().createExecutorService(acuConf, - Property.TSERV_SUMMARY_REMOTE_THREADS, true); + Property.TSERV_SUMMARY_REMOTE_THREADS, enableMetrics); modifyThreadPoolSizesAtRuntime( () -> context.getConfiguration().getCount(Property.TSERV_SUMMARY_REMOTE_THREADS), "summary remote", summaryRemotePool); summaryPartitionPool = ThreadPools.getServerThreadPools().createExecutorService(acuConf, - Property.TSERV_SUMMARY_PARTITION_THREADS, true); + Property.TSERV_SUMMARY_PARTITION_THREADS, enableMetrics); modifyThreadPoolSizesAtRuntime( () -> context.getConfiguration().getCount(Property.TSERV_SUMMARY_PARTITION_THREADS), "summary partition", summaryPartitionPool); @@ -364,8 +365,8 @@ public class TabletServerResourceManager { Collection<ScanExecutorConfig> scanExecCfg = acuConf.getScanExecutors(isScanServer); Map<String,Queue<Runnable>> scanExecQueues = new HashMap<>(); - scanExecutors = scanExecCfg.stream().collect( - toUnmodifiableMap(cfg -> cfg.name, cfg -> createPriorityExecutor(cfg, scanExecQueues))); + scanExecutors = scanExecCfg.stream().collect(toUnmodifiableMap(cfg -> cfg.name, + cfg -> createPriorityExecutor(cfg, scanExecQueues, enableMetrics))); scanExecutorChoices = scanExecCfg.stream().collect(toUnmodifiableMap(cfg -> cfg.name, cfg -> new ScanExecutorImpl(cfg, scanExecQueues.get(cfg.name)))); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMinCMetrics.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMinCMetrics.java index 3df8aff090..a7f402343c 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMinCMetrics.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMinCMetrics.java @@ -21,7 +21,6 @@ package org.apache.accumulo.tserver.metrics; import java.time.Duration; import org.apache.accumulo.core.metrics.MetricsProducer; -import org.apache.accumulo.core.metrics.MetricsUtil; import 
io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Timer; @@ -42,11 +41,10 @@ public class TabletServerMinCMetrics implements MetricsProducer { @Override public void registerMetrics(MeterRegistry registry) { activeMinc = Timer.builder(METRICS_MINC_RUNNING).description("Minor compactions time active") - .tags(MetricsUtil.getCommonTags()).register(registry); + .register(registry); - queuedMinc = - Timer.builder(METRICS_MINC_QUEUED).description("Queued minor compactions time queued") - .tags(MetricsUtil.getCommonTags()).register(registry); + queuedMinc = Timer.builder(METRICS_MINC_QUEUED) + .description("Queued minor compactions time queued").register(registry); } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java index 2dc18a5cbe..9a1faa6261 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java @@ -23,7 +23,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.LongAdder; import org.apache.accumulo.core.metrics.MetricsProducer; -import org.apache.accumulo.core.metrics.MetricsUtil; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.DistributionSummary; @@ -138,18 +137,14 @@ public class TabletServerScanMetrics implements MetricsProducer { .description("Results per scan").register(registry); yields = DistributionSummary.builder(METRICS_SCAN_YIELDS).description("yields").register(registry); - startScanCalls = - Counter.builder(METRICS_SCAN_START).description("calls to start a scan / multiscan") - .tags(MetricsUtil.getCommonTags()).register(registry); - continueScanCalls = - Counter.builder(METRICS_SCAN_CONTINUE).description("calls to continue a scan / multiscan") - 
.tags(MetricsUtil.getCommonTags()).register(registry); - closeScanCalls = - Counter.builder(METRICS_SCAN_CLOSE).description("calls to close a scan / multiscan") - .tags(MetricsUtil.getCommonTags()).register(registry); + startScanCalls = Counter.builder(METRICS_SCAN_START) + .description("calls to start a scan / multiscan").register(registry); + continueScanCalls = Counter.builder(METRICS_SCAN_CONTINUE) + .description("calls to continue a scan / multiscan").register(registry); + closeScanCalls = Counter.builder(METRICS_SCAN_CLOSE) + .description("calls to close a scan / multiscan").register(registry); busyTimeoutReturned = Counter.builder(METRICS_SCAN_BUSY_TIMEOUT) - .description("times that a scan has timed out in the queue") - .tags(MetricsUtil.getCommonTags()).register(registry); + .description("times that a scan has timed out in the queue").register(registry); Gauge.builder(METRICS_TSERVER_QUERIES, this, TabletServerScanMetrics::getLookupCount) .description("Number of queries").register(registry); Gauge.builder(METRICS_TSERVER_SCAN_RESULTS, this, TabletServerScanMetrics::getQueryResultCount) @@ -161,11 +156,10 @@ public class TabletServerScanMetrics implements MetricsProducer { Gauge.builder(METRICS_TSERVER_SCANNED_ENTRIES, this, TabletServerScanMetrics::getScannedCount) .description("Scanned rate").register(registry); pausedForMemory = Counter.builder(METRICS_SCAN_PAUSED_FOR_MEM) - .description("scan paused due to server being low on memory") - .tags(MetricsUtil.getCommonTags()).register(registry); + .description("scan paused due to server being low on memory").register(registry); earlyReturnForMemory = Counter.builder(METRICS_SCAN_RETURN_FOR_MEM) .description("scan returned results early due to server being low on memory") - .tags(MetricsUtil.getCommonTags()).register(registry); + .register(registry); } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java index 5d8c81ec73..567d8171e1 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java @@ -832,6 +832,7 @@ public class CompactableImpl implements Compactable { synchronized (this) { if (closed) { + log.trace("Selection of files was not initiated {} because closed", getExtent()); return; } @@ -842,6 +843,14 @@ public class CompactableImpl implements Compactable { log.trace("Selected compaction status changed {} {} {} {}", getExtent(), fileMgr.getSelectionStatus(), compactionId, compactionConfig); } else { + if (kind == CompactionKind.USER) { + // Only log for user compaction because this code is only called when one is initiated via + // the API call. For other compaction kinds the tserver will keep periodically attempting + // to initiate which would result in lots of logs. 
+ log.trace( + "Selection of files was not initiated {} compactionId:{} selectStatus:{} selectedFiles:{}", + getExtent(), this.compactionId, fileMgr.selectStatus, fileMgr.selectedFiles.size()); + } return; } } diff --git a/test/src/main/java/org/apache/accumulo/test/conf/store/PropCacheCaffeineImplZkIT.java b/test/src/main/java/org/apache/accumulo/test/conf/store/PropCacheCaffeineImplZkIT.java index 5abb7057d3..22d06b83a1 100644 --- a/test/src/main/java/org/apache/accumulo/test/conf/store/PropCacheCaffeineImplZkIT.java +++ b/test/src/main/java/org/apache/accumulo/test/conf/store/PropCacheCaffeineImplZkIT.java @@ -41,13 +41,11 @@ import org.apache.accumulo.core.data.InstanceId; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil; -import org.apache.accumulo.core.metrics.MetricsUtil; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.conf.codec.VersionedPropCodec; import org.apache.accumulo.server.conf.codec.VersionedProperties; import org.apache.accumulo.server.conf.store.TablePropKey; import org.apache.accumulo.server.conf.store.impl.PropCacheCaffeineImpl; -import org.apache.accumulo.server.conf.store.impl.PropStoreMetrics; import org.apache.accumulo.server.conf.store.impl.PropStoreWatcher; import org.apache.accumulo.server.conf.store.impl.ReadyMonitor; import org.apache.accumulo.server.conf.store.impl.ZooPropLoader; @@ -79,7 +77,6 @@ public class PropCacheCaffeineImplZkIT { private final TableId tIdA = TableId.of("A"); private final TableId tIdB = TableId.of("B"); - private final PropStoreMetrics cacheMetrics = new PropStoreMetrics(); private static ServerContext context; @TempDir @@ -161,12 +158,9 @@ public class PropCacheCaffeineImplZkIT { PropStoreWatcher propStoreWatcher = new PropStoreWatcher(readyMonitor); - MetricsUtil.initializeProducers(cacheMetrics); - ZooPropLoader propLoader = - new ZooPropLoader(zrw, 
VersionedPropCodec.getDefault(), propStoreWatcher, cacheMetrics); - PropCacheCaffeineImpl cache = - new PropCacheCaffeineImpl.Builder(propLoader, cacheMetrics).build(); + new ZooPropLoader(zrw, VersionedPropCodec.getDefault(), propStoreWatcher); + PropCacheCaffeineImpl cache = new PropCacheCaffeineImpl.Builder(propLoader).build(); VersionedProperties readProps = cache.get(propStoreKey); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java index 2a8abcfffb..a56c34fe3f 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java @@ -127,7 +127,7 @@ public class ZombieTServer { ServerAddress serverPort = TServerUtils.startTServer(context.getConfiguration(), ThriftServerType.CUSTOM_HS_HA, muxProcessor, "ZombieTServer", "walking dead", 2, ThreadPools.DEFAULT_TIMEOUT_MILLISECS, 1000, 10 * 1024 * 1024, null, null, -1, - context.getConfiguration().getCount(Property.RPC_BACKLOG), + context.getConfiguration().getCount(Property.RPC_BACKLOG), context.getMetricsInfo(), HostAndPort.fromParts("0.0.0.0", port)); String addressString = serverPort.address.toString(); diff --git a/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java b/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java index 2342a77559..4b92480f19 100644 --- a/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java @@ -43,6 +43,7 @@ import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.metrics.MetricsProducer; +import org.apache.accumulo.core.spi.metrics.LoggingMeterRegistryFactory; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; import 
org.apache.accumulo.test.functional.ConfigurableMacBase; import org.apache.accumulo.test.metrics.TestStatsDSink.Metric; @@ -79,11 +80,14 @@ public class MetricsIT extends ConfigurableMacBase implements MetricsProducer { cfg.setProperty(Property.GC_CYCLE_START, "1s"); cfg.setProperty(Property.GC_CYCLE_DELAY, "1s"); cfg.setProperty(Property.MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL, "1s"); - - // Tell the server processes to use a StatsDMeterRegistry that will be configured - // to push all metrics to the sink we started. + // Tell the server processes to use a StatsDMeterRegistry and the simple logging registry + // that will be configured to push all metrics to the sink we started. cfg.setProperty(Property.GENERAL_MICROMETER_ENABLED, "true"); - cfg.setProperty(Property.GENERAL_MICROMETER_FACTORY, TestStatsDRegistryFactory.class.getName()); + cfg.setProperty(Property.GENERAL_MICROMETER_JVM_METRICS_ENABLED, "true"); + cfg.setProperty("general.custom.metrics.opts.logging.step", "1s"); + String clazzList = LoggingMeterRegistryFactory.class.getName() + "," + + TestStatsDRegistryFactory.class.getName(); + cfg.setProperty(Property.GENERAL_MICROMETER_FACTORY, clazzList); Map<String,String> sysProps = Map.of(TestStatsDRegistryFactory.SERVER_HOST, "127.0.0.1", TestStatsDRegistryFactory.SERVER_PORT, Integer.toString(sink.getPort())); cfg.setSystemProperties(sysProps); @@ -98,9 +102,7 @@ public class MetricsIT extends ConfigurableMacBase implements MetricsProducer { Set<String> unexpectedMetrics = Set.of(METRICS_SCAN_YIELDS, METRICS_UPDATE_ERRORS, METRICS_COMPACTOR_MAJC_STUCK, METRICS_SCAN_BUSY_TIMEOUT, METRICS_SCAN_PAUSED_FOR_MEM, METRICS_SCAN_RETURN_FOR_MEM, METRICS_MINC_PAUSED, METRICS_MAJC_PAUSED); - Set<String> flakyMetrics = Set.of(METRICS_GC_WAL_ERRORS, METRICS_FATE_TYPE_IN_PROGRESS, - METRICS_PROPSTORE_EVICTION_COUNT, METRICS_PROPSTORE_REFRESH_COUNT, - METRICS_PROPSTORE_REFRESH_LOAD_COUNT, METRICS_PROPSTORE_ZK_ERROR_COUNT); + Set<String> flakyMetrics = 
Set.of(METRICS_GC_WAL_ERRORS, METRICS_FATE_TYPE_IN_PROGRESS); Map<String,String> expectedMetricNames = this.getMetricFields(); flakyMetrics.forEach(expectedMetricNames::remove); // might not see these diff --git a/test/src/main/java/org/apache/accumulo/test/metrics/TestStatsDRegistryFactory.java b/test/src/main/java/org/apache/accumulo/test/metrics/TestStatsDRegistryFactory.java index 6c26fabb4d..8715a40c00 100644 --- a/test/src/main/java/org/apache/accumulo/test/metrics/TestStatsDRegistryFactory.java +++ b/test/src/main/java/org/apache/accumulo/test/metrics/TestStatsDRegistryFactory.java @@ -20,7 +20,7 @@ package org.apache.accumulo.test.metrics; import java.time.Duration; -import org.apache.accumulo.core.metrics.MeterRegistryFactory; +import org.apache.accumulo.core.spi.metrics.MeterRegistryFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,8 +38,8 @@ public class TestStatsDRegistryFactory implements MeterRegistryFactory { public static final String SERVER_PORT = "test.meter.registry.port"; @Override - public MeterRegistry create() { - + public MeterRegistry create(final InitParameters params) { + LOG.info("starting metrics registration."); String host = System.getProperty(SERVER_HOST, null); String port = System.getProperty(SERVER_PORT, null); diff --git a/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java b/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java index 512acb5c48..8e5a96c51e 100644 --- a/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java +++ b/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java @@ -339,7 +339,7 @@ public class NullTserver { TServerUtils.startTServer(context.getConfiguration(), ThriftServerType.CUSTOM_HS_HA, muxProcessor, "NullTServer", "null tserver", 2, ThreadPools.DEFAULT_TIMEOUT_MILLISECS, 1000, 10 * 1024 * 1024, null, null, -1, context.getConfiguration().getCount(Property.RPC_BACKLOG), - HostAndPort.fromParts("0.0.0.0", 
opts.port)); + context.getMetricsInfo(), HostAndPort.fromParts("0.0.0.0", opts.port)); HostAndPort addr = HostAndPort.fromParts(InetAddress.getLocalHost().getHostName(), opts.port); diff --git a/test/src/main/resources/log4j2-test.properties b/test/src/main/resources/log4j2-test.properties index f0d7d93212..0c77a3871b 100644 --- a/test/src/main/resources/log4j2-test.properties +++ b/test/src/main/resources/log4j2-test.properties @@ -142,28 +142,5 @@ logger.38.level = debug logger.39.name = org.apache.accumulo.manager.Manager logger.39.level = trace -property.metricsFilename = ./target/test-metrics - -# appender.metrics.type = Console -appender.metrics.type = RollingFile -appender.metrics.name = LoggingMetricsOutput -appender.metrics.fileName = ${metricsFilename}.metrics -appender.metrics.filePattern = ${metricsFilename}-%d{MM-dd-yy-HH}-%i.metrics.gz -appender.metrics.layout.type = PatternLayout -appender.metrics.layout.pattern = METRICS: %d{ISO8601}, %m%n -appender.metrics.policies.type = Policies -appender.metrics.policies.time.type = TimeBasedTriggeringPolicy -appender.metrics.policies.time.interval = 1 -appender.metrics.policies.time.modulate = true -appender.metrics.policies.size.type = SizeBasedTriggeringPolicy -appender.metrics.policies.size.size=100MB -appender.metrics.strategy.type = DefaultRolloverStrategy -appender.metrics.strategy.max = 10 - -logger.metrics.name = org.apache.accumulo.metrics -logger.metrics.level = info -logger.metrics.additivity = false -logger.metrics.appenderRef.metrics.ref = LoggingMetricsOutput - rootLogger.level = debug rootLogger.appenderRef.console.ref = STDOUT