This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new bf16a1d302 speeds up checking zoo locks for lots of server processes (#5058) bf16a1d302 is described below commit bf16a1d302197477a21b643e5e653c8dc6dd34b8 Author: Keith Turner <ktur...@apache.org> AuthorDate: Thu Nov 14 17:01:40 2024 -0500 speeds up checking zoo locks for lots of server processes (#5058) Many operations like listcompactions, listscans, admin serviceStatus check the zookeeper locks of server processes. These lock checks are currently done serially and each require a RPC to zookeeper. This change parallelizes the lock checks. --- .../accumulo/core/lock/ServiceLockPaths.java | 61 +++++++++++++++++----- 1 file changed, 48 insertions(+), 13 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java index 6b577f7e06..1df14549de 100644 --- a/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java +++ b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java @@ -18,12 +18,17 @@ */ package org.apache.accumulo.core.lock; +import java.util.ArrayList; import java.util.Collection; -import java.util.HashSet; import java.util.List; import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.function.Predicate; import org.apache.accumulo.core.Constants; @@ -412,7 +417,7 @@ public class ServiceLockPaths { Objects.requireNonNull(resourceGroupPredicate); Objects.requireNonNull(addressSelector); - final Set<ServiceLockPath> results = new HashSet<>(); + final Set<ServiceLockPath> results = ConcurrentHashMap.newKeySet(); final String typePath = ctx.getZooKeeperRoot() + serverType; final ZooCache cache = ctx.getZooCache(); @@ -451,21 +456,51 @@ public class ServiceLockPaths { addressPredicate = addressSelector.getPredicate(); } - for (final String server : servers) { - if (addressPredicate.test(server)) { - final ServiceLockPath slp = - parse(Optional.of(serverType), typePath + "/" + group + "/" + server); - if (!withLock || slp.getType().equals(Constants.ZDEADTSERVERS)) { - // Dead TServers don't have lock data - results.add(slp); - } else { - final ZcStat stat = new ZcStat(); - Optional<ServiceLockData> sld = ServiceLock.getLockData(cache, slp, stat); - if (!sld.isEmpty()) { + ExecutorService executor = null; + try { + if (withLock) { + int numThreads = Math.max(1, Math.min(servers.size() / 1000, 16)); + executor = Executors.newFixedThreadPool(numThreads); + } + + List<Future<?>> futures = new ArrayList<>(); + + for (final String server : servers) { + if (addressPredicate.test(server)) { + final ServiceLockPath slp = + parse(Optional.of(serverType), typePath + "/" + group + "/" + server); + if (!withLock || slp.getType().equals(Constants.ZDEADTSERVERS)) { + // Dead TServers don't have lock data results.add(slp); + } else { + // Execute reads to zookeeper to get lock info in parallel. The zookeeper client + // has a single shared connection to a server so this will not create lots of + // connections, it will place multiple outgoing request on that single zookeeper + // connection at the same time though. + futures.add(executor.submit(() -> { + final ZcStat stat = new ZcStat(); + Optional<ServiceLockData> sld = ServiceLock.getLockData(cache, slp, stat); + if (sld.isPresent()) { + results.add(slp); + } + return null; + })); } } } + + // wait for futures to complete and check for errors + for (var future : futures) { + try { + future.get(); + } catch (InterruptedException | ExecutionException e) { + throw new IllegalStateException(e); + } + } + } finally { + if (executor != null) { + executor.shutdownNow(); + } } } }