This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new f06db5b626 refactors filtering getServers API call (#4960) f06db5b626 is described below commit f06db5b6265025a41584a426a4ede7a7c1d80a29 Author: Keith Turner <ktur...@apache.org> AuthorDate: Tue Oct 8 19:40:32 2024 -0400 refactors filtering getServers API call (#4960) Changes filtering in the getServers API call so that it can prune branches while walking the tree of servers in zookeeper. --- .../core/client/admin/InstanceOperations.java | 8 ++- .../core/clientImpl/InstanceOperationsImpl.java | 57 +++++++++++++++------- .../accumulo/core/lock/ServiceLockPaths.java | 7 +-- 3 files changed, 50 insertions(+), 22 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperations.java b/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperations.java index 935fcb0283..110812390a 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperations.java +++ b/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperations.java @@ -21,6 +21,7 @@ package org.apache.accumulo.core.client.admin; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.BiPredicate; import java.util.function.Consumer; import java.util.function.Predicate; @@ -241,10 +242,15 @@ public interface InstanceOperations { /** * Returns the servers of a given type that match the given criteria * + * @param resourceGroupPredicate only returns servers where the resource group matches this + * predicate. For the manager it does not have a resource group and this parameter is not + * used. + * @param hostPortPredicate only returns servers where its host and port match this predicate. 
* @return set of servers of the supplied type matching the supplied test * @since 4.0.0 */ - Set<ServerId> getServers(ServerId.Type type, Predicate<ServerId> test); + Set<ServerId> getServers(ServerId.Type type, Predicate<String> resourceGroupPredicate, + BiPredicate<String,Integer> hostPortPredicate); /** * List the active scans on a tablet server. diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java index fa9dd6ba44..b5646ce294 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java @@ -38,9 +38,9 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.function.BiPredicate; import java.util.function.Consumer; import java.util.function.Predicate; -import java.util.stream.Collectors; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.AccumuloException; @@ -489,7 +489,7 @@ public class InstanceOperationsImpl implements InstanceOperations { throw new IllegalStateException("Multiple servers matching provided address"); } case MANAGER: - Set<ServerId> managers = getServers(type, null); + Set<ServerId> managers = getServers(type, rg2 -> true, hp); if (managers.isEmpty()) { return null; } else { @@ -520,43 +520,64 @@ public class InstanceOperationsImpl implements InstanceOperations { @Override public Set<ServerId> getServers(ServerId.Type type) { - return getServers(type, null); + AddressPredicate addressPredicate = addr -> true; + return getServers(type, rg -> true, addressPredicate); } @Override - public Set<ServerId> getServers(ServerId.Type type, Predicate<ServerId> test) { + public Set<ServerId> getServers(ServerId.Type type, Predicate<String> resourceGroupPredicate, + 
BiPredicate<String,Integer> hostPortPredicate) { + Objects.requireNonNull(type, "Server type was null"); + Objects.requireNonNull(resourceGroupPredicate, "Resource group predicate was null"); + Objects.requireNonNull(hostPortPredicate, "Host port predicate was null"); + + AddressPredicate addressPredicate = addr -> { + var hp = HostAndPort.fromString(addr); + return hostPortPredicate.test(hp.getHost(), hp.getPort()); + }; + + return getServers(type, resourceGroupPredicate, addressPredicate); + } + + private Set<ServerId> getServers(ServerId.Type type, Predicate<String> resourceGroupPredicate, + AddressPredicate addressPredicate) { + final Set<ServerId> results = new HashSet<>(); + switch (type) { case COMPACTOR: - context.getServerPaths().getCompactor(rg -> true, addr -> true, true) + context.getServerPaths().getCompactor(resourceGroupPredicate::test, addressPredicate, true) .forEach(c -> results.add(createServerId(type, c))); break; case MANAGER: ServiceLockPath m = context.getServerPaths().getManager(true); - Optional<ServiceLockData> sld = context.getZooCache().getLockData(m); - String location = null; - if (sld.isPresent()) { - location = sld.orElseThrow().getAddressString(ThriftService.MANAGER); - HostAndPort hp = HostAndPort.fromString(location); - results.add(new ServerId(type, Constants.DEFAULT_RESOURCE_GROUP_NAME, hp.getHost(), - hp.getPort())); + if (m != null) { + Optional<ServiceLockData> sld = context.getZooCache().getLockData(m); + String location = null; + if (sld.isPresent()) { + location = sld.orElseThrow().getAddressString(ThriftService.MANAGER); + if (addressPredicate.test(location)) { + HostAndPort hp = HostAndPort.fromString(location); + results.add(new ServerId(type, Constants.DEFAULT_RESOURCE_GROUP_NAME, hp.getHost(), + hp.getPort())); + } + } } break; case SCAN_SERVER: - context.getServerPaths().getScanServer(rg -> true, addr -> true, true) + context.getServerPaths().getScanServer(resourceGroupPredicate::test, addressPredicate, true) 
.forEach(s -> results.add(createServerId(type, s))); break; case TABLET_SERVER: - context.getServerPaths().getTabletServer(rg -> true, addr -> true, true) + context.getServerPaths() + .getTabletServer(resourceGroupPredicate::test, addressPredicate, true) .forEach(t -> results.add(createServerId(type, t))); break; default: break; } - if (test == null) { - return Collections.unmodifiableSet(results); - } - return results.stream().filter(test).collect(Collectors.toUnmodifiableSet()); + + return Collections.unmodifiableSet(results); } private ServerId createServerId(ServerId.Type type, ServiceLockPath slp) { diff --git a/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java index 13752a9ae1..38cad55da5 100644 --- a/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java +++ b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java @@ -26,6 +26,7 @@ import java.util.Set; import java.util.function.Predicate; import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.fate.zookeeper.ZooCache.ZcStat; @@ -396,14 +397,14 @@ public class ServiceLockPaths { if (resourceGroupPredicate.test(group)) { final List<String> servers = cache.getChildren(typePath + "/" + group); for (final String server : servers) { - final ZcStat stat = new ZcStat(); - final ServiceLockPath slp = - parse(Optional.of(serverType), typePath + "/" + group + "/" + server); if (addressPredicate.test(server)) { + final ServiceLockPath slp = + parse(Optional.of(serverType), typePath + "/" + group + "/" + server); if (!withLock || slp.getType().equals(Constants.ZDEADTSERVERS)) { // Dead TServers don't have lock data results.add(slp); } else { + final ZcStat stat = new ZcStat(); Optional<ServiceLockData> 
sld = ServiceLock.getLockData(cache, slp, stat); if (!sld.isEmpty()) { results.add(slp);