This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new bf16a1d302 speeds up checking zoo locks for lots of server processes (#5058)
bf16a1d302 is described below

commit bf16a1d302197477a21b643e5e653c8dc6dd34b8
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Thu Nov 14 17:01:40 2024 -0500

    speeds up checking zoo locks for lots of server processes (#5058)
    
    Many operations, such as listcompactions, listscans, and admin
    serviceStatus, check the ZooKeeper locks of server processes.  These lock
    checks are currently done serially, and each requires an RPC to
    ZooKeeper.  This change parallelizes the lock checks.
---
 .../accumulo/core/lock/ServiceLockPaths.java       | 61 +++++++++++++++++-----
 1 file changed, 48 insertions(+), 13 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java
index 6b577f7e06..1df14549de 100644
--- a/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java
+++ b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java
@@ -18,12 +18,17 @@
  */
 package org.apache.accumulo.core.lock;
 
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.function.Predicate;
 
 import org.apache.accumulo.core.Constants;
@@ -412,7 +417,7 @@ public class ServiceLockPaths {
     Objects.requireNonNull(resourceGroupPredicate);
     Objects.requireNonNull(addressSelector);
 
-    final Set<ServiceLockPath> results = new HashSet<>();
+    final Set<ServiceLockPath> results = ConcurrentHashMap.newKeySet();
     final String typePath = ctx.getZooKeeperRoot() + serverType;
     final ZooCache cache = ctx.getZooCache();
 
@@ -451,21 +456,51 @@ public class ServiceLockPaths {
             addressPredicate = addressSelector.getPredicate();
           }
 
-          for (final String server : servers) {
-            if (addressPredicate.test(server)) {
-              final ServiceLockPath slp =
-                  parse(Optional.of(serverType), typePath + "/" + group + "/" 
+ server);
-              if (!withLock || slp.getType().equals(Constants.ZDEADTSERVERS)) {
-                // Dead TServers don't have lock data
-                results.add(slp);
-              } else {
-                final ZcStat stat = new ZcStat();
-                Optional<ServiceLockData> sld = ServiceLock.getLockData(cache, 
slp, stat);
-                if (!sld.isEmpty()) {
+          ExecutorService executor = null;
+          try {
+            if (withLock) {
+              int numThreads = Math.max(1, Math.min(servers.size() / 1000, 
16));
+              executor = Executors.newFixedThreadPool(numThreads);
+            }
+
+            List<Future<?>> futures = new ArrayList<>();
+
+            for (final String server : servers) {
+              if (addressPredicate.test(server)) {
+                final ServiceLockPath slp =
+                    parse(Optional.of(serverType), typePath + "/" + group + 
"/" + server);
+                if (!withLock || 
slp.getType().equals(Constants.ZDEADTSERVERS)) {
+                  // Dead TServers don't have lock data
                   results.add(slp);
+                } else {
+                  // Execute reads to zookeeper to get lock info in parallel. 
The zookeeper client
+                  // has a single shared connection to a server so this will 
not create lots of
+                  // connections, it will place multiple outgoing request on 
that single zookeeper
+                  // connection at the same time though.
+                  futures.add(executor.submit(() -> {
+                    final ZcStat stat = new ZcStat();
+                    Optional<ServiceLockData> sld = 
ServiceLock.getLockData(cache, slp, stat);
+                    if (sld.isPresent()) {
+                      results.add(slp);
+                    }
+                    return null;
+                  }));
                 }
               }
             }
+
+            // wait for futures to complete and check for errors
+            for (var future : futures) {
+              try {
+                future.get();
+              } catch (InterruptedException | ExecutionException e) {
+                throw new IllegalStateException(e);
+              }
+            }
+          } finally {
+            if (executor != null) {
+              executor.shutdownNow();
+            }
           }
         }
       }

Reply via email to