This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch 4973-new-monitor-metrics in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/4973-new-monitor-metrics by this push: new 7ef6cf5f6f Uses more concurrent data structs in new monitor 7ef6cf5f6f is described below commit 7ef6cf5f6f1d57adc3f3f7dd6e7f5df986ae4599 Author: Keith Turner <ktur...@apache.org> AuthorDate: Fri Nov 22 20:47:30 2024 +0000 Uses more concurrent data structs in new monitor In the new monitor code, the SystemInformation class has ConcurrentHashMaps whose values are non-concurrent data structures. This was causing problems when multiple threads accessed them concurrently. Changed the values to be concurrent. Also changed a few other maps to be concurrent, just to remove use of the normal HashMap import in the class. --- .../accumulo/monitor/next/SystemInformation.java | 45 ++++++++++++---------- 1 file changed, 25 insertions(+), 20 deletions(-) diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/next/SystemInformation.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/next/SystemInformation.java index a00b53b26f..a6f9f62e66 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/next/SystemInformation.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/next/SystemInformation.java @@ -22,8 +22,7 @@ import java.nio.ByteBuffer; import java.time.Duration; import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -243,13 +242,13 @@ public class SystemInformation { } public static class ProcessSummary { - private long configured = 0; - private long responded = 0; - private Set<String> notResponded = new HashSet<>(); + private final AtomicLong configured = new AtomicLong(0); + private final AtomicLong responded = new AtomicLong(); + private final Set<String> notResponded = ConcurrentHashMap.newKeySet(); public void addResponded() { - configured++; - responded++; + 
configured.incrementAndGet(); + responded.incrementAndGet(); } public void addNotResponded(ServerId server) { @@ -257,11 +256,11 @@ public class SystemInformation { } public long getConfigured() { - return this.configured; + return this.configured.get(); } public long getResponded() { - return this.responded; + return this.responded.get(); } public long getNotResponded() { @@ -283,8 +282,8 @@ public class SystemInformation { private final Cache<ServerId,MetricResponse> allMetrics; - private final Set<String> resourceGroups = new HashSet<>(); - private final Set<ServerId> problemHosts = new HashSet<>(); + private final Set<String> resourceGroups = ConcurrentHashMap.newKeySet(); + private final Set<ServerId> problemHosts = ConcurrentHashMap.newKeySet(); private final AtomicReference<ServerId> manager = new AtomicReference<>(); private final AtomicReference<ServerId> gc = new AtomicReference<>(); @@ -312,7 +311,7 @@ public class SystemInformation { new ConcurrentHashMap<>(); // Compaction Information - private final Map<String,List<FMetric>> queueMetrics = new HashMap<>(); + private final Map<String,List<FMetric>> queueMetrics = new ConcurrentHashMap<>(); private final AtomicReference<Map<String,TExternalCompaction>> runningCompactions = new AtomicReference<>(); private final AtomicReference<Map<Long,String>> runningCompactionsDurationIndex = @@ -323,7 +322,7 @@ public class SystemInformation { private final Map<String,List<TabletInformation>> tablets = new ConcurrentHashMap<>(); // Deployment Overview - private final Map<String,Map<String,ProcessSummary>> deployment = new HashMap<>(); + private final Map<String,Map<String,ProcessSummary>> deployment = new ConcurrentHashMap<>(); public SystemInformation(Cache<ServerId,MetricResponse> allMetrics) { this.allMetrics = allMetrics; @@ -389,7 +388,9 @@ public class SystemInformation { for (int i = 0; i < fm.tagsLength(); i++) { FTag t = fm.tags(i); if (t.key().equals("queue.id")) { - queueMetrics.computeIfAbsent(t.value(), 
(k) -> new ArrayList<>()).add(fm); + queueMetrics + .computeIfAbsent(t.value(), (k) -> Collections.synchronizedList(new ArrayList<>())) + .add(fm); } } } @@ -402,7 +403,8 @@ public class SystemInformation { resourceGroups.add(response.getResourceGroup()); switch (response.serverType) { case COMPACTOR: - compactors.computeIfAbsent(response.getResourceGroup(), (rg) -> new HashSet<>()) + compactors + .computeIfAbsent(response.getResourceGroup(), (rg) -> ConcurrentHashMap.newKeySet()) .add(server); updateAggregates(response, totalCompactorMetrics, rgCompactorMetrics); break; @@ -418,11 +420,13 @@ public class SystemInformation { createCompactionSummary(response); break; case SCAN_SERVER: - sservers.computeIfAbsent(response.getResourceGroup(), (rg) -> new HashSet<>()).add(server); + sservers.computeIfAbsent(response.getResourceGroup(), (rg) -> ConcurrentHashMap.newKeySet()) + .add(server); updateAggregates(response, totalSServerMetrics, rgSServerMetrics); break; case TABLET_SERVER: - tservers.computeIfAbsent(response.getResourceGroup(), (rg) -> new HashSet<>()).add(server); + tservers.computeIfAbsent(response.getResourceGroup(), (rg) -> ConcurrentHashMap.newKeySet()) + .add(server); updateAggregates(response, totalTServerMetrics, rgTServerMetrics); break; default: @@ -451,7 +455,8 @@ public class SystemInformation { public void processTabletInformation(String tableName, TabletInformation info) { final SanitizedTabletInformation sti = new SanitizedTabletInformation(info); - tablets.computeIfAbsent(tableName, (t) -> new ArrayList<>()).add(sti); + tablets.computeIfAbsent(tableName, (t) -> Collections.synchronizedList(new ArrayList<>())) + .add(sti); tables.computeIfAbsent(tableName, (t) -> new TableSummary()).addTablet(sti); } @@ -462,11 +467,11 @@ public class SystemInformation { public void finish() { // Iterate over the metrics allMetrics.asMap().keySet().forEach(serverId -> { - deployment.computeIfAbsent(serverId.getResourceGroup(), g -> new HashMap<>()) + 
deployment.computeIfAbsent(serverId.getResourceGroup(), g -> new ConcurrentHashMap<>()) .computeIfAbsent(serverId.getType().name(), t -> new ProcessSummary()).addResponded(); }); problemHosts.forEach(serverId -> { - deployment.computeIfAbsent(serverId.getResourceGroup(), g -> new HashMap<>()) + deployment.computeIfAbsent(serverId.getResourceGroup(), g -> new ConcurrentHashMap<>()) .computeIfAbsent(serverId.getType().name(), t -> new ProcessSummary()) .addNotResponded(serverId); });