This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new f06db5b626 refactors filtering getServers API call (#4960)
f06db5b626 is described below

commit f06db5b6265025a41584a426a4ede7a7c1d80a29
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Tue Oct 8 19:40:32 2024 -0400

    refactors filtering getServers API call (#4960)
    
    Changes filtering in the getServers API call so that it can prune
    branches while walking the tree of servers in zookeeper.
---
 .../core/client/admin/InstanceOperations.java      |  8 ++-
 .../core/clientImpl/InstanceOperationsImpl.java    | 57 +++++++++++++++-------
 .../accumulo/core/lock/ServiceLockPaths.java       |  7 +--
 3 files changed, 50 insertions(+), 22 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperations.java
 
b/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperations.java
index 935fcb0283..110812390a 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperations.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperations.java
@@ -21,6 +21,7 @@ package org.apache.accumulo.core.client.admin;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.BiPredicate;
 import java.util.function.Consumer;
 import java.util.function.Predicate;
 
@@ -241,10 +242,15 @@ public interface InstanceOperations {
   /**
    * Returns the servers of a given type that match the given criteria
    *
+   * @param resourceGroupPredicate only returns servers where the resource 
group matches this
+   *        predicate. For the manager it does not have a resoruce group and 
this parameters is not
+   *        used.
+   * @param hostPortPredicate only returns servers where its host and port 
match this predicate.
    * @return set of servers of the supplied type matching the supplied test
    * @since 4.0.0
    */
-  Set<ServerId> getServers(ServerId.Type type, Predicate<ServerId> test);
+  Set<ServerId> getServers(ServerId.Type type, Predicate<String> 
resourceGroupPredicate,
+      BiPredicate<String,Integer> hostPortPredicate);
 
   /**
    * List the active scans on a tablet server.
diff --git 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java
 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java
index fa9dd6ba44..b5646ce294 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java
@@ -38,9 +38,9 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import java.util.function.BiPredicate;
 import java.util.function.Consumer;
 import java.util.function.Predicate;
-import java.util.stream.Collectors;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.AccumuloException;
@@ -489,7 +489,7 @@ public class InstanceOperationsImpl implements 
InstanceOperations {
           throw new IllegalStateException("Multiple servers matching provided 
address");
         }
       case MANAGER:
-        Set<ServerId> managers = getServers(type, null);
+        Set<ServerId> managers = getServers(type, rg2 -> true, hp);
         if (managers.isEmpty()) {
           return null;
         } else {
@@ -520,43 +520,64 @@ public class InstanceOperationsImpl implements 
InstanceOperations {
 
   @Override
   public Set<ServerId> getServers(ServerId.Type type) {
-    return getServers(type, null);
+    AddressPredicate addressPredicate = addr -> true;
+    return getServers(type, rg -> true, addressPredicate);
   }
 
   @Override
-  public Set<ServerId> getServers(ServerId.Type type, Predicate<ServerId> 
test) {
+  public Set<ServerId> getServers(ServerId.Type type, Predicate<String> 
resourceGroupPredicate,
+      BiPredicate<String,Integer> hostPortPredicate) {
+    Objects.requireNonNull(type, "Server type was null");
+    Objects.requireNonNull(resourceGroupPredicate, "Resource group predicate 
was null");
+    Objects.requireNonNull(hostPortPredicate, "Host port predicate was null");
+
+    AddressPredicate addressPredicate = addr -> {
+      var hp = HostAndPort.fromString(addr);
+      return hostPortPredicate.test(hp.getHost(), hp.getPort());
+    };
+
+    return getServers(type, resourceGroupPredicate, addressPredicate);
+  }
+
+  private Set<ServerId> getServers(ServerId.Type type, Predicate<String> 
resourceGroupPredicate,
+      AddressPredicate addressPredicate) {
+
     final Set<ServerId> results = new HashSet<>();
+
     switch (type) {
       case COMPACTOR:
-        context.getServerPaths().getCompactor(rg -> true, addr -> true, true)
+        context.getServerPaths().getCompactor(resourceGroupPredicate::test, 
addressPredicate, true)
             .forEach(c -> results.add(createServerId(type, c)));
         break;
       case MANAGER:
         ServiceLockPath m = context.getServerPaths().getManager(true);
-        Optional<ServiceLockData> sld = context.getZooCache().getLockData(m);
-        String location = null;
-        if (sld.isPresent()) {
-          location = sld.orElseThrow().getAddressString(ThriftService.MANAGER);
-          HostAndPort hp = HostAndPort.fromString(location);
-          results.add(new ServerId(type, 
Constants.DEFAULT_RESOURCE_GROUP_NAME, hp.getHost(),
-              hp.getPort()));
+        if (m != null) {
+          Optional<ServiceLockData> sld = context.getZooCache().getLockData(m);
+          String location = null;
+          if (sld.isPresent()) {
+            location = 
sld.orElseThrow().getAddressString(ThriftService.MANAGER);
+            if (addressPredicate.test(location)) {
+              HostAndPort hp = HostAndPort.fromString(location);
+              results.add(new ServerId(type, 
Constants.DEFAULT_RESOURCE_GROUP_NAME, hp.getHost(),
+                  hp.getPort()));
+            }
+          }
         }
         break;
       case SCAN_SERVER:
-        context.getServerPaths().getScanServer(rg -> true, addr -> true, true)
+        context.getServerPaths().getScanServer(resourceGroupPredicate::test, 
addressPredicate, true)
             .forEach(s -> results.add(createServerId(type, s)));
         break;
       case TABLET_SERVER:
-        context.getServerPaths().getTabletServer(rg -> true, addr -> true, 
true)
+        context.getServerPaths()
+            .getTabletServer(resourceGroupPredicate::test, addressPredicate, 
true)
             .forEach(t -> results.add(createServerId(type, t)));
         break;
       default:
         break;
     }
-    if (test == null) {
-      return Collections.unmodifiableSet(results);
-    }
-    return 
results.stream().filter(test).collect(Collectors.toUnmodifiableSet());
+
+    return Collections.unmodifiableSet(results);
   }
 
   private ServerId createServerId(ServerId.Type type, ServiceLockPath slp) {
diff --git 
a/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java 
b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java
index 13752a9ae1..38cad55da5 100644
--- a/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java
+++ b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java
@@ -26,6 +26,7 @@ import java.util.Set;
 import java.util.function.Predicate;
 
 import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.admin.servers.ServerId;
 import org.apache.accumulo.core.clientImpl.ClientContext;
 import org.apache.accumulo.core.fate.zookeeper.ZooCache;
 import org.apache.accumulo.core.fate.zookeeper.ZooCache.ZcStat;
@@ -396,14 +397,14 @@ public class ServiceLockPaths {
         if (resourceGroupPredicate.test(group)) {
           final List<String> servers = cache.getChildren(typePath + "/" + 
group);
           for (final String server : servers) {
-            final ZcStat stat = new ZcStat();
-            final ServiceLockPath slp =
-                parse(Optional.of(serverType), typePath + "/" + group + "/" + 
server);
             if (addressPredicate.test(server)) {
+              final ServiceLockPath slp =
+                  parse(Optional.of(serverType), typePath + "/" + group + "/" 
+ server);
               if (!withLock || slp.getType().equals(Constants.ZDEADTSERVERS)) {
                 // Dead TServers don't have lock data
                 results.add(slp);
               } else {
+                final ZcStat stat = new ZcStat();
                 Optional<ServiceLockData> sld = ServiceLock.getLockData(cache, 
slp, stat);
                 if (!sld.isEmpty()) {
                   results.add(slp);

Reply via email to