This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new ee7b1dc607 HDDS-12901. Introduce EventExecutorMetrics instead of setting the metrics props unsafely (#8371)
ee7b1dc607 is described below

commit ee7b1dc6077b0a19727a0136ff1bfff554c8381f
Author: Peter Lee <[email protected]>
AuthorDate: Fri May 2 13:32:39 2025 +0800

    HDDS-12901. Introduce EventExecutorMetrics instead of setting the metrics props unsafely (#8371)
---
 .../hdds/server/events/EventExecutorMetrics.java   | 143 +++++++++++++++++++++
 .../FixedThreadPoolWithAffinityExecutor.java       |  60 +++------
 .../hdds/server/events/SingleThreadExecutor.java   |  39 ++----
 .../org/apache/hadoop/hdds/utils/MetricsUtil.java  | 100 --------------
 4 files changed, 171 insertions(+), 171 deletions(-)

diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventExecutorMetrics.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventExecutorMetrics.java
new file mode 100644
index 0000000000..894191798b
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventExecutorMetrics.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.server.events;
+
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+
+/**
+ * Metrics source for EventExecutor implementations.
+ */
+@Metrics(about = "Executor Metrics", context = "EventQueue")
+public class EventExecutorMetrics implements MetricsSource {
+  private final String name;
+  private final String description;
+  private final MetricsRegistry registry;
+
+  @Metric("Number of tasks queued")
+  private MutableCounterLong queued;
+
+  @Metric("Number of tasks scheduled")
+  private MutableCounterLong scheduled;
+
+  @Metric("Number of tasks completed")
+  private MutableCounterLong done;
+
+  @Metric("Number of tasks failed")
+  private MutableCounterLong failed;
+
+  @Metric("Number of tasks dropped")
+  private MutableCounterLong dropped;
+
+  @Metric("Number of tasks with long execution time")
+  private MutableCounterLong longExecution;
+
+  @Metric("Number of tasks with long wait time in queue")
+  private MutableCounterLong longWaitInQueue;
+
+  public EventExecutorMetrics(String name, String description) {
+    this.name = name;
+    this.description = description;
+
+    registry = new MetricsRegistry(name);
+    init();
+  }
+
+  public void init() {
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    ms.register(name, description, this);
+  }
+
+  public void unregister() {
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    ms.unregisterSource(name);
+  }
+
+  @Override
+  public void getMetrics(MetricsCollector collector, boolean all) {
+    MetricsRecordBuilder builder = collector.addRecord(name);
+    registry.snapshot(builder, all);
+  }
+
+  public void incrementQueued() {
+    queued.incr();
+  }
+
+  public void incrementScheduled() {
+    scheduled.incr();
+  }
+
+  public void incrementDone() {
+    done.incr();
+  }
+
+  public void incrementFailed() {
+    failed.incr();
+  }
+
+  public void incrementDropped() {
+    dropped.incr();
+  }
+
+  public void incrementDropped(int count) {
+    dropped.incr(count);
+  }
+
+  public void incrementLongExecution() {
+    longExecution.incr();
+  }
+
+  public void incrementLongWaitInQueue() {
+    longWaitInQueue.incr();
+  }
+
+  public long getQueued() {
+    return queued.value();
+  }
+
+  public long getScheduled() {
+    return scheduled.value();
+  }
+
+  public long getDone() {
+    return done.value();
+  }
+
+  public long getFailed() {
+    return failed.value();
+  }
+
+  public long getDropped() {
+    return dropped.value();
+  }
+
+  public long getLongExecution() {
+    return longExecution.value();
+  }
+
+  public long getLongWaitInQueue() {
+    return longWaitInQueue.value();
+  }
+}
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/FixedThreadPoolWithAffinityExecutor.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/FixedThreadPoolWithAffinityExecutor.java
index f934b20ed6..07804c2f2e 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/FixedThreadPoolWithAffinityExecutor.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/FixedThreadPoolWithAffinityExecutor.java
@@ -30,11 +30,7 @@
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.hadoop.hdds.utils.MetricsUtil;
-import org.apache.hadoop.metrics2.annotation.Metric;
-import org.apache.hadoop.metrics2.annotation.Metrics;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
-import org.apache.hadoop.metrics2.lib.MutableCounterLong;
 import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,7 +41,6 @@
  *
  * @param <P> the payload type of events
  */
-@Metrics(context = "EventQueue")
 public class FixedThreadPoolWithAffinityExecutor<P, Q>
     implements EventExecutor<P> {
 
@@ -66,27 +61,7 @@ public class FixedThreadPoolWithAffinityExecutor<P, Q>
 
   private final List<ThreadPoolExecutor> executors;
 
-  // MutableCounterLong is thread safe.
-  @Metric
-  private MutableCounterLong queued;
-
-  @Metric
-  private MutableCounterLong done;
-
-  @Metric
-  private MutableCounterLong failed;
-
-  @Metric
-  private MutableCounterLong scheduled;
-
-  @Metric
-  private MutableCounterLong dropped;
-  
-  @Metric
-  private MutableCounterLong longWaitInQueue;
-  
-  @Metric
-  private MutableCounterLong longTimeExecution;
+  private final EventExecutorMetrics metrics;
 
   private final AtomicBoolean isRunning = new AtomicBoolean(true);
   private long queueWaitThreshold
@@ -112,6 +87,7 @@ public FixedThreadPoolWithAffinityExecutor(
     this.eventPublisher = eventPublisher;
     this.executors = executors;
     this.executorMap = executorMap;
+    this.metrics = new EventExecutorMetrics(EVENT_QUEUE + name, "Event Executor metrics");
     executorMap.put(clazz.getName(), this);
 
     // Add runnable which will wait for task over another queue
@@ -125,9 +101,6 @@ public FixedThreadPoolWithAffinityExecutor(
       }
       ++i;
     }
-
-    MetricsUtil.registerDynamic(this, EVENT_QUEUE + name,
-        "Event Executor metrics ", "EventQueue");
   }
   
   public void setQueueWaitThreshold(long queueWaitThreshold) {
@@ -168,7 +141,7 @@ public static <Q> List<ThreadPoolExecutor> initializeExecutorPool(
   @Override
   public void onMessage(EventHandler<P> handler, P message, EventPublisher
       publisher) {
-    queued.incr();
+    metrics.incrementQueued();
     // For messages that need to be routed to the same thread need to
     // implement hashCode to match the messages. This should be safe for
     // other messages that implement the native hash.
@@ -176,44 +149,44 @@ public void onMessage(EventHandler<P> handler, P message, EventPublisher
     BlockingQueue<Q> queue = workQueues.get(index);
     queue.add((Q) message);
     if (queue instanceof IQueueMetrics) {
-      dropped.incr(((IQueueMetrics) queue).getAndResetDropCount(
+      metrics.incrementDropped(((IQueueMetrics) queue).getAndResetDropCount(
           message.getClass().getSimpleName()));
     }
   }
 
   @Override
   public long failedEvents() {
-    return failed.value();
+    return metrics.getFailed();
   }
 
   @Override
   public long successfulEvents() {
-    return done.value();
+    return metrics.getDone();
   }
 
   @Override
   public long queuedEvents() {
-    return queued.value();
+    return metrics.getQueued();
   }
 
   @Override
   public long scheduledEvents() {
-    return scheduled.value();
+    return metrics.getScheduled();
   }
 
   @Override
   public long droppedEvents() {
-    return dropped.value();
+    return metrics.getDropped();
   }
 
   @Override
   public long longWaitInQueueEvents() {
-    return longWaitInQueue.value();
+    return metrics.getLongWaitInQueue();
   }
   
   @Override
   public long longTimeExecutionEvents() {
-    return longTimeExecution.value();
+    return metrics.getLongExecution();
   }
 
   @Override
@@ -223,6 +196,7 @@ public void close() {
       executor.shutdown();
     }
     executorMap.clear();
+    metrics.unregister();
     DefaultMetricsSystem.instance().unregisterSource(EVENT_QUEUE + name);
   }
 
@@ -273,26 +247,26 @@ public void run() {
           long curTime = Time.monotonicNow();
           if (createTime != 0
               && ((curTime - createTime) > executor.queueWaitThreshold)) {
-            executor.longWaitInQueue.incr();
+            executor.metrics.incrementLongWaitInQueue();
             LOG.warn("Event remained in queue for long time {} millisec, {}",
                 (curTime - createTime), eventId);
           }
 
-          executor.scheduled.incr();
+          executor.metrics.incrementScheduled();
           try {
             executor.eventHandler.onMessage(report,
                 executor.eventPublisher);
-            executor.done.incr();
+            executor.metrics.incrementDone();
             curTime = Time.monotonicNow();
             if (createTime != 0
                 && (curTime - createTime) > executor.execWaitThreshold) {
-              executor.longTimeExecution.incr();
+              executor.metrics.incrementLongExecution();
               LOG.warn("Event taken long execution time {} millisec, {}",
                   (curTime - createTime), eventId);
             }
           } catch (Exception ex) {
             LOG.error("Error on execution message {}", report, ex);
-            executor.failed.incr();
+            executor.metrics.incrementFailed();
           }
           if (Thread.currentThread().isInterrupted()) {
             LOG.warn("Interrupt of execution of Reports");
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java
index 48dd30a8d1..5e80e2dace 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java
@@ -19,10 +19,6 @@
 
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import org.apache.hadoop.hdds.utils.MetricsUtil;
-import org.apache.hadoop.metrics2.annotation.Metric;
-import org.apache.hadoop.metrics2.annotation.Metrics;
-import org.apache.hadoop.metrics2.lib.MutableCounterLong;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -31,7 +27,6 @@
  *
  * @param <P> the payload type of events
  */
-@Metrics(context = "EventQueue")
 public class SingleThreadExecutor<P> implements EventExecutor<P> {
 
   private static final String EVENT_QUEUE = "EventQueue";
@@ -40,20 +35,8 @@ public class SingleThreadExecutor<P> implements EventExecutor<P> {
       LoggerFactory.getLogger(SingleThreadExecutor.class);
 
   private final String name;
-
   private final ExecutorService executor;
-
-  @Metric
-  private MutableCounterLong queued;
-
-  @Metric
-  private MutableCounterLong done;
-
-  @Metric
-  private MutableCounterLong failed;
-
-  @Metric
-  private MutableCounterLong scheduled;
+  private final EventExecutorMetrics metrics;
 
   /**
    * Create SingleThreadExecutor.
@@ -63,8 +46,7 @@ public class SingleThreadExecutor<P> implements EventExecutor<P> {
    */
   public SingleThreadExecutor(String name, String threadNamePrefix) {
     this.name = name;
-    MetricsUtil.registerDynamic(this, EVENT_QUEUE + name,
-        "Event Executor metrics ", "EventQueue");
+    this.metrics = new EventExecutorMetrics(EVENT_QUEUE + name, "Event Executor metrics");
 
     executor = Executors.newSingleThreadExecutor(
         runnable -> {
@@ -77,42 +59,43 @@ public SingleThreadExecutor(String name, String threadNamePrefix) {
   @Override
   public void onMessage(EventHandler<P> handler, P message, EventPublisher
       publisher) {
-    queued.incr();
+    metrics.incrementQueued();
     executor.execute(() -> {
-      scheduled.incr();
+      metrics.incrementScheduled();
       try {
         handler.onMessage(message, publisher);
-        done.incr();
+        metrics.incrementDone();
       } catch (Exception ex) {
         LOG.error("Error on execution message {}", message, ex);
-        failed.incr();
+        metrics.incrementFailed();
       }
     });
   }
 
   @Override
   public long failedEvents() {
-    return failed.value();
+    return metrics.getFailed();
   }
 
   @Override
   public long successfulEvents() {
-    return done.value();
+    return metrics.getDone();
   }
 
   @Override
   public long queuedEvents() {
-    return queued.value();
+    return metrics.getQueued();
   }
 
   @Override
   public long scheduledEvents() {
-    return scheduled.value();
+    return metrics.getScheduled();
   }
 
   @Override
   public void close() {
     executor.shutdown();
+    metrics.unregister();
   }
 
   @Override
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/MetricsUtil.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/MetricsUtil.java
deleted file mode 100644
index 81fbf1daf9..0000000000
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/MetricsUtil.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdds.utils;
-
-import java.lang.annotation.Annotation;
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
-import java.util.Map;
-import org.apache.hadoop.metrics2.annotation.Metrics;
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Metrics util for metrics.
- */
-public final class MetricsUtil {
-  private static final String ANNOTATIONS = "annotations";
-  private static final String ANNOTATION_DATA = "annotationData";
-  private static final Class<? extends Annotation> ANNOTATION_TO_ALTER
-      = Metrics.class;
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(MetricsUtil.class);
-  
-  private MetricsUtil() {
-  }
-
-  /**
-   * register metric with changing class annotation for metrics.
-   * 
-   * @param source source to register
-   * @param name name of metric
-   * @param desc description of metric
-   * @param context context of metric
-   * @param <T> source type
-   */
-  public static <T> void registerDynamic(
-      T source, String name, String desc, String context) {
-    updateAnnotation(source.getClass(), name, desc, context);
-    DefaultMetricsSystem.instance().register(name, desc, source);
-  }
-  
-  private static void updateAnnotation(
-      Class clz, String name, String desc, String context) {
-    try {
-      Annotation annotationValue = new Metrics() {
-
-        @Override
-        public Class<? extends Annotation> annotationType() {
-          return ANNOTATION_TO_ALTER;
-        }
-
-        @Override
-        public String name() {
-          return name;
-        }
-
-        @Override
-        public String about() {
-          return desc;
-        }
-
-        @Override
-        public String context() {
-          return context;
-        }
-      };
-      
-      Method method = clz.getClass().getDeclaredMethod(
-          ANNOTATION_DATA, null);
-      method.setAccessible(true);
-      Object annotationData = method.invoke(clz);
-      Field annotations = annotationData.getClass()
-          .getDeclaredField(ANNOTATIONS);
-      annotations.setAccessible(true);
-      Map<Class<? extends Annotation>, Annotation> map =
-          (Map<Class<? extends Annotation>, Annotation>) annotations
-              .get(annotationData);
-      map.put(ANNOTATION_TO_ALTER, annotationValue);
-    } catch (Exception e) {
-      LOG.error("Update Metrics annotation failed. ", e);
-    }
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to