This is an automated email from the ASF dual-hosted git repository.
weichiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new ee7b1dc607 HDDS-12901. Introduce EventExecutorMetrics instead of
setting the metrics props unsafely (#8371)
ee7b1dc607 is described below
commit ee7b1dc6077b0a19727a0136ff1bfff554c8381f
Author: Peter Lee <[email protected]>
AuthorDate: Fri May 2 13:32:39 2025 +0800
HDDS-12901. Introduce EventExecutorMetrics instead of setting the metrics
props unsafely (#8371)
---
.../hdds/server/events/EventExecutorMetrics.java | 143 +++++++++++++++++++++
.../FixedThreadPoolWithAffinityExecutor.java | 60 +++------
.../hdds/server/events/SingleThreadExecutor.java | 39 ++----
.../org/apache/hadoop/hdds/utils/MetricsUtil.java | 100 --------------
4 files changed, 171 insertions(+), 171 deletions(-)
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventExecutorMetrics.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventExecutorMetrics.java
new file mode 100644
index 0000000000..894191798b
--- /dev/null
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventExecutorMetrics.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.server.events;
+
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+
+/**
+ * Metrics source for EventExecutor implementations: task counters registered under a caller-supplied source name.
+ */
+@Metrics(about = "Executor Metrics", context = "EventQueue")
+public class EventExecutorMetrics implements MetricsSource {
+ private final String name; // source name; also used as the registry name and the record name
+ private final String description; // description handed to MetricsSystem.register in init()
+ private final MetricsRegistry registry; // snapshotted into each record by getMetrics
+
+ // NOTE(review): the counter fields below are never assigned in this class; presumably the metrics framework injects them from @Metric at registration - confirm
+ @Metric("Number of tasks queued")
+ private MutableCounterLong queued;
+
+ @Metric("Number of tasks scheduled")
+ private MutableCounterLong scheduled;
+
+ @Metric("Number of tasks completed")
+ private MutableCounterLong done;
+
+ @Metric("Number of tasks failed")
+ private MutableCounterLong failed;
+
+ @Metric("Number of tasks dropped")
+ private MutableCounterLong dropped;
+
+ @Metric("Number of tasks with long execution time")
+ private MutableCounterLong longExecution;
+
+ @Metric("Number of tasks with long wait time in queue")
+ private MutableCounterLong longWaitInQueue;
+
+ public EventExecutorMetrics(String name, String description) { // stores the arguments, creates the registry, then registers via init()
+ this.name = name;
+ this.description = description;
+
+ registry = new MetricsRegistry(name);
+ init(); // NOTE(review): public, overridable method invoked from the constructor
+ }
+
+ public void init() { // registers this source with the default metrics system under 'name'
+ MetricsSystem ms = DefaultMetricsSystem.instance();
+ ms.register(name, description, this);
+ }
+
+ public void unregister() { // asks the default metrics system to unregister the source named 'name'
+ MetricsSystem ms = DefaultMetricsSystem.instance();
+ ms.unregisterSource(name);
+ }
+
+ @Override
+ public void getMetrics(MetricsCollector collector, boolean all) { // adds a record named 'name' and snapshots the registry into it
+ MetricsRecordBuilder builder = collector.addRecord(name);
+ registry.snapshot(builder, all);
+ }
+
+ public void incrementQueued() {
+ queued.incr();
+ }
+
+ public void incrementScheduled() {
+ scheduled.incr();
+ }
+
+ public void incrementDone() {
+ done.incr();
+ }
+
+ public void incrementFailed() {
+ failed.incr();
+ }
+
+ public void incrementDropped() {
+ dropped.incr();
+ }
+
+ public void incrementDropped(int count) { // bulk variant: adds 'count' to the dropped counter in one call
+ dropped.incr(count);
+ }
+
+ public void incrementLongExecution() {
+ longExecution.incr();
+ }
+
+ public void incrementLongWaitInQueue() {
+ longWaitInQueue.incr();
+ }
+
+ public long getQueued() {
+ return queued.value();
+ }
+
+ public long getScheduled() {
+ return scheduled.value();
+ }
+
+ public long getDone() {
+ return done.value();
+ }
+
+ public long getFailed() {
+ return failed.value();
+ }
+
+ public long getDropped() {
+ return dropped.value();
+ }
+
+ public long getLongExecution() {
+ return longExecution.value();
+ }
+
+ public long getLongWaitInQueue() {
+ return longWaitInQueue.value();
+ }
+}
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/FixedThreadPoolWithAffinityExecutor.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/FixedThreadPoolWithAffinityExecutor.java
index f934b20ed6..07804c2f2e 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/FixedThreadPoolWithAffinityExecutor.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/FixedThreadPoolWithAffinityExecutor.java
@@ -30,11 +30,7 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.hadoop.hdds.utils.MetricsUtil;
-import org.apache.hadoop.metrics2.annotation.Metric;
-import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
-import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,7 +41,6 @@
*
* @param <P> the payload type of events
*/
-@Metrics(context = "EventQueue")
public class FixedThreadPoolWithAffinityExecutor<P, Q>
implements EventExecutor<P> {
@@ -66,27 +61,7 @@ public class FixedThreadPoolWithAffinityExecutor<P, Q>
private final List<ThreadPoolExecutor> executors;
- // MutableCounterLong is thread safe.
- @Metric
- private MutableCounterLong queued;
-
- @Metric
- private MutableCounterLong done;
-
- @Metric
- private MutableCounterLong failed;
-
- @Metric
- private MutableCounterLong scheduled;
-
- @Metric
- private MutableCounterLong dropped;
-
- @Metric
- private MutableCounterLong longWaitInQueue;
-
- @Metric
- private MutableCounterLong longTimeExecution;
+ private final EventExecutorMetrics metrics;
private final AtomicBoolean isRunning = new AtomicBoolean(true);
private long queueWaitThreshold
@@ -112,6 +87,7 @@ public FixedThreadPoolWithAffinityExecutor(
this.eventPublisher = eventPublisher;
this.executors = executors;
this.executorMap = executorMap;
+ this.metrics = new EventExecutorMetrics(EVENT_QUEUE + name, "Event Executor metrics");
executorMap.put(clazz.getName(), this);
// Add runnable which will wait for task over another queue
@@ -125,9 +101,6 @@ public FixedThreadPoolWithAffinityExecutor(
}
++i;
}
-
- MetricsUtil.registerDynamic(this, EVENT_QUEUE + name,
- "Event Executor metrics ", "EventQueue");
}
public void setQueueWaitThreshold(long queueWaitThreshold) {
@@ -168,7 +141,7 @@ public static <Q> List<ThreadPoolExecutor>
initializeExecutorPool(
@Override
public void onMessage(EventHandler<P> handler, P message, EventPublisher
publisher) {
- queued.incr();
+ metrics.incrementQueued();
// For messages that need to be routed to the same thread need to
// implement hashCode to match the messages. This should be safe for
// other messages that implement the native hash.
@@ -176,44 +149,44 @@ public void onMessage(EventHandler<P> handler, P message,
EventPublisher
BlockingQueue<Q> queue = workQueues.get(index);
queue.add((Q) message);
if (queue instanceof IQueueMetrics) {
- dropped.incr(((IQueueMetrics) queue).getAndResetDropCount(
+ metrics.incrementDropped(((IQueueMetrics) queue).getAndResetDropCount(
message.getClass().getSimpleName()));
}
}
@Override
public long failedEvents() {
- return failed.value();
+ return metrics.getFailed();
}
@Override
public long successfulEvents() {
- return done.value();
+ return metrics.getDone();
}
@Override
public long queuedEvents() {
- return queued.value();
+ return metrics.getQueued();
}
@Override
public long scheduledEvents() {
- return scheduled.value();
+ return metrics.getScheduled();
}
@Override
public long droppedEvents() {
- return dropped.value();
+ return metrics.getDropped();
}
@Override
public long longWaitInQueueEvents() {
- return longWaitInQueue.value();
+ return metrics.getLongWaitInQueue();
}
@Override
public long longTimeExecutionEvents() {
- return longTimeExecution.value();
+ return metrics.getLongExecution();
}
@Override
@@ -223,6 +196,7 @@ public void close() {
executor.shutdown();
}
executorMap.clear();
+ metrics.unregister();
DefaultMetricsSystem.instance().unregisterSource(EVENT_QUEUE + name);
}
@@ -273,26 +247,26 @@ public void run() {
long curTime = Time.monotonicNow();
if (createTime != 0
&& ((curTime - createTime) > executor.queueWaitThreshold)) {
- executor.longWaitInQueue.incr();
+ executor.metrics.incrementLongWaitInQueue();
LOG.warn("Event remained in queue for long time {} millisec, {}",
(curTime - createTime), eventId);
}
- executor.scheduled.incr();
+ executor.metrics.incrementScheduled();
try {
executor.eventHandler.onMessage(report,
executor.eventPublisher);
- executor.done.incr();
+ executor.metrics.incrementDone();
curTime = Time.monotonicNow();
if (createTime != 0
&& (curTime - createTime) > executor.execWaitThreshold) {
- executor.longTimeExecution.incr();
+ executor.metrics.incrementLongExecution();
LOG.warn("Event taken long execution time {} millisec, {}",
(curTime - createTime), eventId);
}
} catch (Exception ex) {
LOG.error("Error on execution message {}", report, ex);
- executor.failed.incr();
+ executor.metrics.incrementFailed();
}
if (Thread.currentThread().isInterrupted()) {
LOG.warn("Interrupt of execution of Reports");
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java
index 48dd30a8d1..5e80e2dace 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java
@@ -19,10 +19,6 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import org.apache.hadoop.hdds.utils.MetricsUtil;
-import org.apache.hadoop.metrics2.annotation.Metric;
-import org.apache.hadoop.metrics2.annotation.Metrics;
-import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,7 +27,6 @@
*
* @param <P> the payload type of events
*/
-@Metrics(context = "EventQueue")
public class SingleThreadExecutor<P> implements EventExecutor<P> {
private static final String EVENT_QUEUE = "EventQueue";
@@ -40,20 +35,8 @@ public class SingleThreadExecutor<P> implements
EventExecutor<P> {
LoggerFactory.getLogger(SingleThreadExecutor.class);
private final String name;
-
private final ExecutorService executor;
-
- @Metric
- private MutableCounterLong queued;
-
- @Metric
- private MutableCounterLong done;
-
- @Metric
- private MutableCounterLong failed;
-
- @Metric
- private MutableCounterLong scheduled;
+ private final EventExecutorMetrics metrics;
/**
* Create SingleThreadExecutor.
@@ -63,8 +46,7 @@ public class SingleThreadExecutor<P> implements
EventExecutor<P> {
*/
public SingleThreadExecutor(String name, String threadNamePrefix) {
this.name = name;
- MetricsUtil.registerDynamic(this, EVENT_QUEUE + name,
- "Event Executor metrics ", "EventQueue");
+ this.metrics = new EventExecutorMetrics(EVENT_QUEUE + name, "Event Executor metrics");
executor = Executors.newSingleThreadExecutor(
runnable -> {
@@ -77,42 +59,43 @@ public SingleThreadExecutor(String name, String
threadNamePrefix) {
@Override
public void onMessage(EventHandler<P> handler, P message, EventPublisher
publisher) {
- queued.incr();
+ metrics.incrementQueued();
executor.execute(() -> {
- scheduled.incr();
+ metrics.incrementScheduled();
try {
handler.onMessage(message, publisher);
- done.incr();
+ metrics.incrementDone();
} catch (Exception ex) {
LOG.error("Error on execution message {}", message, ex);
- failed.incr();
+ metrics.incrementFailed();
}
});
}
@Override
public long failedEvents() {
- return failed.value();
+ return metrics.getFailed();
}
@Override
public long successfulEvents() {
- return done.value();
+ return metrics.getDone();
}
@Override
public long queuedEvents() {
- return queued.value();
+ return metrics.getQueued();
}
@Override
public long scheduledEvents() {
- return scheduled.value();
+ return metrics.getScheduled();
}
@Override
public void close() {
executor.shutdown();
+ metrics.unregister();
}
@Override
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/MetricsUtil.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/MetricsUtil.java
deleted file mode 100644
index 81fbf1daf9..0000000000
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/MetricsUtil.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdds.utils;
-
-import java.lang.annotation.Annotation;
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
-import java.util.Map;
-import org.apache.hadoop.metrics2.annotation.Metrics;
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Metrics util for metrics.
- */
-public final class MetricsUtil {
- private static final String ANNOTATIONS = "annotations";
- private static final String ANNOTATION_DATA = "annotationData";
- private static final Class<? extends Annotation> ANNOTATION_TO_ALTER
- = Metrics.class;
-
- private static final Logger LOG =
- LoggerFactory.getLogger(MetricsUtil.class);
-
- private MetricsUtil() {
- }
-
- /**
- * register metric with changing class annotation for metrics.
- *
- * @param source source to register
- * @param name name of metric
- * @param desc description of metric
- * @param context context of metric
- * @param <T> source type
- */
- public static <T> void registerDynamic(
- T source, String name, String desc, String context) {
- updateAnnotation(source.getClass(), name, desc, context);
- DefaultMetricsSystem.instance().register(name, desc, source);
- }
-
- private static void updateAnnotation(
- Class clz, String name, String desc, String context) {
- try {
- Annotation annotationValue = new Metrics() {
-
- @Override
- public Class<? extends Annotation> annotationType() {
- return ANNOTATION_TO_ALTER;
- }
-
- @Override
- public String name() {
- return name;
- }
-
- @Override
- public String about() {
- return desc;
- }
-
- @Override
- public String context() {
- return context;
- }
- };
-
- Method method = clz.getClass().getDeclaredMethod(
- ANNOTATION_DATA, null);
- method.setAccessible(true);
- Object annotationData = method.invoke(clz);
- Field annotations = annotationData.getClass()
- .getDeclaredField(ANNOTATIONS);
- annotations.setAccessible(true);
- Map<Class<? extends Annotation>, Annotation> map =
- (Map<Class<? extends Annotation>, Annotation>) annotations
- .get(annotationData);
- map.put(ANNOTATION_TO_ALTER, annotationValue);
- } catch (Exception e) {
- LOG.error("Update Metrics annotation failed. ", e);
- }
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]