This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 4973-new-monitor-metrics
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/4973-new-monitor-metrics by 
this push:
     new 7ef6cf5f6f Uses more concurrent data structs in new monitor
7ef6cf5f6f is described below

commit 7ef6cf5f6f1d57adc3f3f7dd6e7f5df986ae4599
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Fri Nov 22 20:47:30 2024 +0000

    Uses more concurrent data structs in new monitor
    
    In the new monitor code, the SystemInformation class has
    ConcurrentHashMaps whose values are non-concurrent data structures.
    This was causing problems when multiple threads accessed them. Changed
    the values to be concurrent. Also changed a few other maps to be
    concurrent, mainly so the class no longer needs the plain HashMap import.
---
 .../accumulo/monitor/next/SystemInformation.java   | 45 ++++++++++++----------
 1 file changed, 25 insertions(+), 20 deletions(-)

diff --git 
a/server/monitor/src/main/java/org/apache/accumulo/monitor/next/SystemInformation.java
 
b/server/monitor/src/main/java/org/apache/accumulo/monitor/next/SystemInformation.java
index a00b53b26f..a6f9f62e66 100644
--- 
a/server/monitor/src/main/java/org/apache/accumulo/monitor/next/SystemInformation.java
+++ 
b/server/monitor/src/main/java/org/apache/accumulo/monitor/next/SystemInformation.java
@@ -22,8 +22,7 @@ import java.nio.ByteBuffer;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -243,13 +242,13 @@ public class SystemInformation {
   }
 
   public static class ProcessSummary {
-    private long configured = 0;
-    private long responded = 0;
-    private Set<String> notResponded = new HashSet<>();
+    private final AtomicLong configured = new AtomicLong(0);
+    private final AtomicLong responded = new AtomicLong();
+    private final Set<String> notResponded = ConcurrentHashMap.newKeySet();
 
     public void addResponded() {
-      configured++;
-      responded++;
+      configured.incrementAndGet();
+      responded.incrementAndGet();
     }
 
     public void addNotResponded(ServerId server) {
@@ -257,11 +256,11 @@ public class SystemInformation {
     }
 
     public long getConfigured() {
-      return this.configured;
+      return this.configured.get();
     }
 
     public long getResponded() {
-      return this.responded;
+      return this.responded.get();
     }
 
     public long getNotResponded() {
@@ -283,8 +282,8 @@ public class SystemInformation {
 
   private final Cache<ServerId,MetricResponse> allMetrics;
 
-  private final Set<String> resourceGroups = new HashSet<>();
-  private final Set<ServerId> problemHosts = new HashSet<>();
+  private final Set<String> resourceGroups = ConcurrentHashMap.newKeySet();
+  private final Set<ServerId> problemHosts = ConcurrentHashMap.newKeySet();
   private final AtomicReference<ServerId> manager = new AtomicReference<>();
   private final AtomicReference<ServerId> gc = new AtomicReference<>();
 
@@ -312,7 +311,7 @@ public class SystemInformation {
       new ConcurrentHashMap<>();
 
   // Compaction Information
-  private final Map<String,List<FMetric>> queueMetrics = new HashMap<>();
+  private final Map<String,List<FMetric>> queueMetrics = new 
ConcurrentHashMap<>();
   private final AtomicReference<Map<String,TExternalCompaction>> 
runningCompactions =
       new AtomicReference<>();
   private final AtomicReference<Map<Long,String>> 
runningCompactionsDurationIndex =
@@ -323,7 +322,7 @@ public class SystemInformation {
   private final Map<String,List<TabletInformation>> tablets = new 
ConcurrentHashMap<>();
 
   // Deployment Overview
-  private final Map<String,Map<String,ProcessSummary>> deployment = new 
HashMap<>();
+  private final Map<String,Map<String,ProcessSummary>> deployment = new 
ConcurrentHashMap<>();
 
   public SystemInformation(Cache<ServerId,MetricResponse> allMetrics) {
     this.allMetrics = allMetrics;
@@ -389,7 +388,9 @@ public class SystemInformation {
         for (int i = 0; i < fm.tagsLength(); i++) {
           FTag t = fm.tags(i);
           if (t.key().equals("queue.id")) {
-            queueMetrics.computeIfAbsent(t.value(), (k) -> new 
ArrayList<>()).add(fm);
+            queueMetrics
+                .computeIfAbsent(t.value(), (k) -> 
Collections.synchronizedList(new ArrayList<>()))
+                .add(fm);
           }
         }
       }
@@ -402,7 +403,8 @@ public class SystemInformation {
     resourceGroups.add(response.getResourceGroup());
     switch (response.serverType) {
       case COMPACTOR:
-        compactors.computeIfAbsent(response.getResourceGroup(), (rg) -> new 
HashSet<>())
+        compactors
+            .computeIfAbsent(response.getResourceGroup(), (rg) -> 
ConcurrentHashMap.newKeySet())
             .add(server);
         updateAggregates(response, totalCompactorMetrics, rgCompactorMetrics);
         break;
@@ -418,11 +420,13 @@ public class SystemInformation {
         createCompactionSummary(response);
         break;
       case SCAN_SERVER:
-        sservers.computeIfAbsent(response.getResourceGroup(), (rg) -> new 
HashSet<>()).add(server);
+        sservers.computeIfAbsent(response.getResourceGroup(), (rg) -> 
ConcurrentHashMap.newKeySet())
+            .add(server);
         updateAggregates(response, totalSServerMetrics, rgSServerMetrics);
         break;
       case TABLET_SERVER:
-        tservers.computeIfAbsent(response.getResourceGroup(), (rg) -> new 
HashSet<>()).add(server);
+        tservers.computeIfAbsent(response.getResourceGroup(), (rg) -> 
ConcurrentHashMap.newKeySet())
+            .add(server);
         updateAggregates(response, totalTServerMetrics, rgTServerMetrics);
         break;
       default:
@@ -451,7 +455,8 @@ public class SystemInformation {
 
   public void processTabletInformation(String tableName, TabletInformation 
info) {
     final SanitizedTabletInformation sti = new 
SanitizedTabletInformation(info);
-    tablets.computeIfAbsent(tableName, (t) -> new ArrayList<>()).add(sti);
+    tablets.computeIfAbsent(tableName, (t) -> Collections.synchronizedList(new 
ArrayList<>()))
+        .add(sti);
     tables.computeIfAbsent(tableName, (t) -> new 
TableSummary()).addTablet(sti);
   }
 
@@ -462,11 +467,11 @@ public class SystemInformation {
   public void finish() {
     // Iterate over the metrics
     allMetrics.asMap().keySet().forEach(serverId -> {
-      deployment.computeIfAbsent(serverId.getResourceGroup(), g -> new 
HashMap<>())
+      deployment.computeIfAbsent(serverId.getResourceGroup(), g -> new 
ConcurrentHashMap<>())
           .computeIfAbsent(serverId.getType().name(), t -> new 
ProcessSummary()).addResponded();
     });
     problemHosts.forEach(serverId -> {
-      deployment.computeIfAbsent(serverId.getResourceGroup(), g -> new 
HashMap<>())
+      deployment.computeIfAbsent(serverId.getResourceGroup(), g -> new 
ConcurrentHashMap<>())
           .computeIfAbsent(serverId.getType().name(), t -> new 
ProcessSummary())
           .addNotResponded(serverId);
     });

Reply via email to