This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new be5e4751f7 Avoids scanning all servers when looking for a single server (#4987) be5e4751f7 is described below commit be5e4751f7200a07f4f2d262d81a994e9822e203 Author: Keith Turner <ktur...@apache.org> AuthorDate: Thu Oct 17 17:47:36 2024 -0400 Avoids scanning all servers when looking for a single server (#4987) Modified code that finds a server in zookeeper to avoid scanning all servers when looking for single server. fixes #4961 --- .../accumulo/core/clientImpl/ClientContext.java | 7 +- .../core/clientImpl/InstanceOperationsImpl.java | 27 +++-- .../core/clientImpl/ZookeeperLockChecker.java | 8 +- .../accumulo/core/lock/ServiceLockPaths.java | 81 ++++++++++++--- .../core/metadata/schema/TabletMetadata.java | 5 +- .../accumulo/core/rpc/clients/TServerClient.java | 17 ++-- .../util/compaction/ExternalCompactionUtil.java | 9 +- .../core/clientImpl/ZookeeperLockCheckerTest.java | 6 +- .../accumulo/core/lock/ServiceLockPathsTest.java | 110 +++++++++++++-------- .../miniclusterImpl/MiniAccumuloClusterImpl.java | 7 +- .../accumulo/server/manager/LiveTServerSet.java | 7 +- .../server/manager/state/DeadServerList.java | 3 +- .../accumulo/server/util/AccumuloStatus.java | 3 +- .../org/apache/accumulo/server/util/Admin.java | 6 +- .../accumulo/server/util/ServiceStatusCmd.java | 7 +- .../accumulo/server/util/TabletServerLocks.java | 6 +- .../org/apache/accumulo/server/util/ZooZap.java | 7 +- .../org/apache/accumulo/server/util/AdminTest.java | 1 - .../java/org/apache/accumulo/manager/Manager.java | 3 +- .../java/org/apache/accumulo/test/RecoveryIT.java | 5 +- .../test/ScanServerConcurrentTabletScanIT.java | 3 +- .../test/ScanServerGroupConfigurationIT.java | 13 +-- .../org/apache/accumulo/test/ScanServerIT.java | 3 +- .../accumulo/test/ScanServerMetadataEntriesIT.java | 3 +- .../accumulo/test/ScanServerMultipleScansIT.java | 3 +- .../apache/accumulo/test/ScanServerShutdownIT.java | 5 +- .../CompactionPriorityQueueMetricsIT.java | 3 +- .../accumulo/test/fate/FateOpsCommandsIT.java | 3 +- .../test/functional/MemoryStarvedMajCIT.java | 4 +- .../test/functional/MemoryStarvedScanIT.java | 3 +- .../functional/TabletManagementIteratorIT.java | 3 +- .../functional/TabletResourceGroupBalanceIT.java | 3 +- .../accumulo/test/lock/ServiceLockPathsIT.java | 70 +++++++------ 33 files changed, 278 insertions(+), 166 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java index 65ad0556a0..f5bc596872 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java @@ -86,6 +86,7 @@ import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.lock.ServiceLockData; import org.apache.accumulo.core.lock.ServiceLockData.ThriftService; import org.apache.accumulo.core.lock.ServiceLockPaths; +import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector; import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath; import org.apache.accumulo.core.manager.state.tables.TableState; import org.apache.accumulo.core.metadata.schema.Ample; @@ -196,8 +197,8 @@ public class ClientContext implements AccumuloClient { @Override public Supplier<Collection<ScanServerInfo>> getScanServers() { - return () -> getServerPaths().getScanServer(rg -> true, addr -> true, true).stream() - .map(entry -> new ScanServerInfo() { + return () -> getServerPaths().getScanServer(rg -> true, AddressSelector.all(), true) + .stream().map(entry -> new ScanServerInfo() { @Override public String getAddress() { return entry.getServer(); @@ -414,7 +415,7 @@ public class ClientContext implements AccumuloClient { public Map<String,Pair<UUID,String>> getScanServers() { Map<String,Pair<UUID,String>> liveScanServers = new HashMap<>(); Set<ServiceLockPath> scanServerPaths = - getServerPaths().getScanServer(rg -> true, addr -> true, true); + getServerPaths().getScanServer(rg -> true, AddressSelector.all(), true); for (ServiceLockPath path : scanServerPaths) { try { ZcStat stat = new ZcStat(); diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java index 1ba125ab0f..60ccc9145d 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java @@ -59,7 +59,7 @@ import org.apache.accumulo.core.data.InstanceId; import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.lock.ServiceLockData; import org.apache.accumulo.core.lock.ServiceLockData.ThriftService; -import org.apache.accumulo.core.lock.ServiceLockPaths.AddressPredicate; +import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector; import org.apache.accumulo.core.lock.ServiceLockPaths.ResourceGroupPredicate; import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath; import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; @@ -237,7 +237,7 @@ public class InstanceOperationsImpl implements InstanceOperations { @Deprecated(since = "4.0.0") public Set<String> getCompactors() { Set<String> results = new HashSet<>(); - context.getServerPaths().getCompactor(rg -> true, addr -> true, true) + context.getServerPaths().getCompactor(rg -> true, AddressSelector.all(), true) .forEach(t -> results.add(t.getServer())); return results; } @@ -246,7 +246,7 @@ public class InstanceOperationsImpl implements InstanceOperations { @Deprecated(since = "4.0.0") public Set<String> getScanServers() { Set<String> results = new HashSet<>(); - context.getServerPaths().getScanServer(rg -> true, addr -> true, true) + context.getServerPaths().getScanServer(rg -> true, AddressSelector.all(), true) .forEach(t -> results.add(t.getServer())); return results; } @@ -255,7 +255,7 @@ public class InstanceOperationsImpl implements InstanceOperations { @Deprecated(since = "4.0.0") public List<String> getTabletServers() { List<String> results = new ArrayList<>(); - context.getServerPaths().getTabletServer(rg -> true, addr -> true, true) + context.getServerPaths().getTabletServer(rg -> true, AddressSelector.all(), true) .forEach(t -> results.add(t.getServer())); return results; } @@ -482,7 +482,7 @@ public class InstanceOperationsImpl implements InstanceOperations { final ResourceGroupPredicate rg = resourceGroup == null ? rgt -> true : rgt -> rgt.equals(resourceGroup); - final AddressPredicate hp = AddressPredicate.exact(HostAndPort.fromParts(host, port)); + final AddressSelector hp = AddressSelector.exact(HostAndPort.fromParts(host, port)); switch (type) { case COMPACTOR: @@ -526,8 +526,7 @@ public class InstanceOperationsImpl implements InstanceOperations { @Override public Set<ServerId> getServers(ServerId.Type type) { - AddressPredicate addressPredicate = addr -> true; - return getServers(type, rg -> true, addressPredicate); + return getServers(type, rg -> true, AddressSelector.all()); } @Override @@ -537,22 +536,22 @@ public class InstanceOperationsImpl implements InstanceOperations { Objects.requireNonNull(resourceGroupPredicate, "Resource group predicate was null"); Objects.requireNonNull(hostPortPredicate, "Host port predicate was null"); - AddressPredicate addressPredicate = addr -> { + AddressSelector addressPredicate = AddressSelector.matching(addr -> { var hp = HostAndPort.fromString(addr); return hostPortPredicate.test(hp.getHost(), hp.getPort()); - }; + }); return getServers(type, resourceGroupPredicate, addressPredicate); } private Set<ServerId> getServers(ServerId.Type type, Predicate<String> resourceGroupPredicate, - AddressPredicate addressPredicate) { + AddressSelector addressSelector) { final Set<ServerId> results = new HashSet<>(); switch (type) { case COMPACTOR: - context.getServerPaths().getCompactor(resourceGroupPredicate::test, addressPredicate, true) + context.getServerPaths().getCompactor(resourceGroupPredicate::test, addressSelector, true) .forEach(c -> results.add(createServerId(type, c))); break; case MANAGER: @@ -562,7 +561,7 @@ public class InstanceOperationsImpl implements InstanceOperations { String location = null; if (sld.isPresent()) { location = sld.orElseThrow().getAddressString(ThriftService.MANAGER); - if (addressPredicate.test(location)) { + if (addressSelector.getPredicate().test(location)) { HostAndPort hp = HostAndPort.fromString(location); results.add(new ServerId(type, Constants.DEFAULT_RESOURCE_GROUP_NAME, hp.getHost(), hp.getPort())); @@ -571,12 +570,12 @@ public class InstanceOperationsImpl implements InstanceOperations { } break; case SCAN_SERVER: - context.getServerPaths().getScanServer(resourceGroupPredicate::test, addressPredicate, true) + context.getServerPaths().getScanServer(resourceGroupPredicate::test, addressSelector, true) .forEach(s -> results.add(createServerId(type, s))); break; case TABLET_SERVER: context.getServerPaths() - .getTabletServer(resourceGroupPredicate::test, addressPredicate, true) + .getTabletServer(resourceGroupPredicate::test, addressSelector, true) .forEach(t -> results.add(createServerId(type, t))); break; default: diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ZookeeperLockChecker.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ZookeeperLockChecker.java index 2f0367c102..23e8001deb 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ZookeeperLockChecker.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ZookeeperLockChecker.java @@ -22,7 +22,7 @@ import java.util.Set; import org.apache.accumulo.core.clientImpl.ClientTabletCacheImpl.TabletServerLockChecker; import org.apache.accumulo.core.lock.ServiceLock; -import org.apache.accumulo.core.lock.ServiceLockPaths.AddressPredicate; +import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector; import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath; import com.google.common.net.HostAndPort; @@ -39,7 +39,7 @@ public class ZookeeperLockChecker implements TabletServerLockChecker { // ServiceLockPaths only returns items that have a lock var hostAndPort = HostAndPort.fromString(server); Set<ServiceLockPath> tservers = - ctx.getServerPaths().getTabletServer(rg -> true, AddressPredicate.exact(hostAndPort), true); + ctx.getServerPaths().getTabletServer(rg -> true, AddressSelector.exact(hostAndPort), true); return !tservers.isEmpty(); } @@ -48,7 +48,7 @@ public class ZookeeperLockChecker implements TabletServerLockChecker { // ServiceLockPaths only returns items that have a lock var hostAndPort = HostAndPort.fromString(server); Set<ServiceLockPath> tservers = - ctx.getServerPaths().getTabletServer(rg -> true, AddressPredicate.exact(hostAndPort), true); + ctx.getServerPaths().getTabletServer(rg -> true, AddressSelector.exact(hostAndPort), true); for (ServiceLockPath slp : tservers) { if (ServiceLock.getSessionId(ctx.getZooCache(), slp) == Long.parseLong(session, 16)) { return true; @@ -60,7 +60,7 @@ public class ZookeeperLockChecker implements TabletServerLockChecker { @Override public void invalidateCache(String tserver) { var hostAndPort = HostAndPort.fromString(tserver); - ctx.getServerPaths().getTabletServer(rg -> true, AddressPredicate.exact(hostAndPort), false) + ctx.getServerPaths().getTabletServer(rg -> true, AddressSelector.exact(hostAndPort), false) .forEach(slp -> { ctx.getZooCache().clear(slp.toString()); }); diff --git a/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java index 38cad55da5..fd3d765a3b 100644 --- a/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java +++ b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java @@ -18,6 +18,7 @@ */ package org.apache.accumulo.core.lock; +import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Objects; @@ -285,7 +286,7 @@ public class ServiceLockPaths { } public Set<ServiceLockPath> getCompactor(ResourceGroupPredicate resourceGroupPredicate, - AddressPredicate address, boolean withLock) { + AddressSelector address, boolean withLock) { return get(Constants.ZCOMPACTORS, resourceGroupPredicate, address, withLock); } @@ -295,7 +296,8 @@ public class ServiceLockPaths { * the ZooKeeper path. */ public ServiceLockPath getGarbageCollector(boolean withLock) { - Set<ServiceLockPath> results = get(Constants.ZGC_LOCK, rg -> true, addr -> true, withLock); + Set<ServiceLockPath> results = + get(Constants.ZGC_LOCK, rg -> true, AddressSelector.all(), withLock); if (results.isEmpty()) { return null; } else { @@ -309,7 +311,8 @@ public class ServiceLockPaths { * InstanceOperations.getServers(ServerId.Type.MANAGER) to get the location. */ public ServiceLockPath getManager(boolean withLock) { - Set<ServiceLockPath> results = get(Constants.ZMANAGER_LOCK, rg -> true, addr -> true, withLock); + Set<ServiceLockPath> results = + get(Constants.ZMANAGER_LOCK, rg -> true, AddressSelector.all(), withLock); if (results.isEmpty()) { return null; } else { @@ -318,7 +321,8 @@ public class ServiceLockPaths { } public ServiceLockPath getMonitor(boolean withLock) { - Set<ServiceLockPath> results = get(Constants.ZMONITOR_LOCK, rg -> true, addr -> true, withLock); + Set<ServiceLockPath> results = + get(Constants.ZMONITOR_LOCK, rg -> true, AddressSelector.all(), withLock); if (results.isEmpty()) { return null; } else { @@ -327,17 +331,17 @@ public class ServiceLockPaths { } public Set<ServiceLockPath> getScanServer(ResourceGroupPredicate resourceGroupPredicate, - AddressPredicate address, boolean withLock) { + AddressSelector address, boolean withLock) { return get(Constants.ZSSERVERS, resourceGroupPredicate, address, withLock); } public Set<ServiceLockPath> getTabletServer(ResourceGroupPredicate resourceGroupPredicate, - AddressPredicate address, boolean withLock) { + AddressSelector address, boolean withLock) { return get(Constants.ZTSERVERS, resourceGroupPredicate, address, withLock); } public Set<ServiceLockPath> getDeadTabletServer(ResourceGroupPredicate resourceGroupPredicate, - AddressPredicate address, boolean withLock) { + AddressSelector address, boolean withLock) { return get(Constants.ZDEADTSERVERS, resourceGroupPredicate, address, withLock); } @@ -345,11 +349,41 @@ public class ServiceLockPaths { } - public interface AddressPredicate extends Predicate<String> { + public static class AddressSelector { + private final Predicate<String> predicate; + private final HostAndPort exactAddress; - static AddressPredicate exact(HostAndPort hostAndPort) { - Objects.requireNonNull(hostAndPort); - AddressPredicate predicate = addr -> hostAndPort.equals(HostAndPort.fromString(addr)); + private AddressSelector(Predicate<String> predicate, HostAndPort exactAddress) { + Preconditions.checkArgument((predicate == null && exactAddress != null) + || (predicate != null && exactAddress == null)); + if (predicate == null) { + String hp = exactAddress.toString(); + this.predicate = addr -> addr.equals(hp); + } else { + this.predicate = predicate; + } + this.exactAddress = exactAddress; + } + + public static AddressSelector exact(HostAndPort hostAndPort) { + return new AddressSelector(null, hostAndPort); + } + + public static AddressSelector matching(Predicate<String> predicate) { + return new AddressSelector(predicate, null); + } + + private static AddressSelector ALL = new AddressSelector(s -> true, null); + + public static AddressSelector all() { + return ALL; + } + + public HostAndPort getExactAddress() { + return exactAddress; + } + + public Predicate<String> getPredicate() { return predicate; } } @@ -361,18 +395,18 @@ public class ServiceLockPaths { * @param serverType type of lock, should be something like Constants.ZTSERVERS or * Constants.ZMANAGER_LOCK * @param resourceGroupPredicate only returns servers in resource groups that pass this predicate - * @param addressPredicate only return servers that match this predicate + * @param addressSelector only return servers that meet this criteria * @param withLock supply true if you only want to return servers that have an active lock. Not * applicable for types that don't use a lock (e.g. dead tservers) * @return set of ServiceLockPath objects for the paths found based on the search criteria */ private Set<ServiceLockPath> get(final String serverType, - ResourceGroupPredicate resourceGroupPredicate, AddressPredicate addressPredicate, + ResourceGroupPredicate resourceGroupPredicate, AddressSelector addressSelector, boolean withLock) { Objects.requireNonNull(serverType); Objects.requireNonNull(resourceGroupPredicate); - Objects.requireNonNull(addressPredicate); + Objects.requireNonNull(addressSelector); final Set<ServiceLockPath> results = new HashSet<>(); final String typePath = ctx.getZooKeeperRoot() + serverType; @@ -395,7 +429,24 @@ public class ServiceLockPaths { final List<String> resourceGroups = cache.getChildren(typePath); for (final String group : resourceGroups) { if (resourceGroupPredicate.test(group)) { - final List<String> servers = cache.getChildren(typePath + "/" + group); + final Collection<String> servers; + final Predicate<String> addressPredicate; + + if (addressSelector.getExactAddress() != null) { + var server = addressSelector.getExactAddress().toString(); + if (withLock || cache.get(typePath + "/" + group + "/" + server) != null) { + // When withLock is true the server in the list may not exist in zookeeper, if it does + // not exist then no lock will be found later when looking for a lock in zookeeper. + servers = List.of(server); + } else { + servers = List.of(); + } + addressPredicate = s -> true; + } else { + servers = cache.getChildren(typePath + "/" + group); + addressPredicate = addressSelector.getPredicate(); + } + for (final String server : servers) { if (addressPredicate.test(server)) { final ServiceLockPath slp = diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java index c9f9059b10..795ebfafed 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java @@ -52,6 +52,7 @@ import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.lock.ServiceLockData; +import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector; import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath; import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.core.metadata.StoredTabletFile; @@ -633,8 +634,8 @@ public class TabletMetadata { public static synchronized Set<TServerInstance> getLiveTServers(ClientContext context) { final Set<TServerInstance> liveServers = new HashSet<>(); - for (ServiceLockPath slp : context.getServerPaths().getTabletServer(rg -> true, addr -> true, - true)) { + for (ServiceLockPath slp : context.getServerPaths().getTabletServer(rg -> true, + AddressSelector.all(), true)) { checkTabletServer(context, slp).ifPresent(liveServers::add); } diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java b/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java index 213e8ffec3..e7236a1365 100644 --- a/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java +++ b/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java @@ -39,7 +39,7 @@ import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException; import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.lock.ServiceLockData; import org.apache.accumulo.core.lock.ServiceLockData.ThriftService; -import org.apache.accumulo.core.lock.ServiceLockPaths.AddressPredicate; +import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector; import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath; import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.rpc.clients.ThriftClientTypes.Exec; @@ -88,16 +88,19 @@ public interface TServerClient<C extends TServiceClient> { // correct one. HostAndPort hp = HostAndPort.fromString(debugHost); serverPaths.addAll( - context.getServerPaths().getCompactor(rg -> true, AddressPredicate.exact(hp), true)); + context.getServerPaths().getCompactor(rg -> true, AddressSelector.exact(hp), true)); serverPaths.addAll( - context.getServerPaths().getScanServer(rg -> true, AddressPredicate.exact(hp), true)); + context.getServerPaths().getScanServer(rg -> true, AddressSelector.exact(hp), true)); serverPaths.addAll( - context.getServerPaths().getTabletServer(rg -> true, AddressPredicate.exact(hp), true)); + context.getServerPaths().getTabletServer(rg -> true, AddressSelector.exact(hp), true)); } else { - serverPaths.addAll(context.getServerPaths().getTabletServer(rg -> true, addr -> true, true)); + serverPaths.addAll( + context.getServerPaths().getTabletServer(rg -> true, AddressSelector.all(), true)); if (type == ThriftClientTypes.CLIENT) { - serverPaths.addAll(context.getServerPaths().getCompactor(rg -> true, addr -> true, true)); - serverPaths.addAll(context.getServerPaths().getScanServer(rg -> true, addr -> true, true)); + serverPaths + .addAll(context.getServerPaths().getCompactor(rg -> true, AddressSelector.all(), true)); + serverPaths.addAll( + context.getServerPaths().getScanServer(rg -> true, AddressSelector.all(), true)); } if (serverPaths.isEmpty()) { if (warned.compareAndSet(false, true)) { diff --git a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java index 15c3a92019..3d5fc44576 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java @@ -40,6 +40,7 @@ import org.apache.accumulo.core.compaction.thrift.CompactorService; import org.apache.accumulo.core.fate.zookeeper.ZooCache.ZcStat; import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.lock.ServiceLockData.ThriftService; +import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector; import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath; import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; import org.apache.accumulo.core.rpc.ThriftUtil; @@ -114,7 +115,7 @@ public class ExternalCompactionUtil { */ public static Map<String,Set<HostAndPort>> getCompactorAddrs(ClientContext context) { final Map<String,Set<HostAndPort>> groupsAndAddresses = new HashMap<>(); - context.getServerPaths().getCompactor(rg -> true, addr -> true, true).forEach(slp -> { + context.getServerPaths().getCompactor(rg -> true, AddressSelector.all(), true).forEach(slp -> { groupsAndAddresses.computeIfAbsent(slp.getResourceGroup(), (k) -> new HashSet<>()) .add(HostAndPort.fromString(slp.getServer())); }); @@ -201,7 +202,7 @@ public class ExternalCompactionUtil { final ExecutorService executor = ThreadPools.getServerThreadPools() .getPoolBuilder(COMPACTOR_RUNNING_COMPACTIONS_POOL).numCoreThreads(16).build(); - context.getServerPaths().getCompactor(rg -> true, addr -> true, true).forEach(slp -> { + context.getServerPaths().getCompactor(rg -> true, AddressSelector.all(), true).forEach(slp -> { final HostAndPort hp = HostAndPort.fromString(slp.getServer()); rcFutures.add(new RunningCompactionFuture(slp, executor.submit(() -> getRunningCompaction(hp, context)))); @@ -229,7 +230,7 @@ public class ExternalCompactionUtil { .getPoolBuilder(COMPACTOR_RUNNING_COMPACTION_IDS_POOL).numCoreThreads(16).build(); List<Future<ExternalCompactionId>> futures = new ArrayList<>(); - context.getServerPaths().getCompactor(rg -> true, addr -> true, true).forEach(slp -> { + context.getServerPaths().getCompactor(rg -> true, AddressSelector.all(), true).forEach(slp -> { final HostAndPort hp = HostAndPort.fromString(slp.getServer()); futures.add(executor.submit(() -> getRunningCompactionId(hp, context))); }); @@ -254,7 +255,7 @@ public class ExternalCompactionUtil { public static int countCompactors(String groupName, ClientContext context) { var start = Timer.startNew(); int count = context.getServerPaths() - .getCompactor(rg -> rg.equals(groupName), addr -> true, true).size(); + .getCompactor(rg -> rg.equals(groupName), AddressSelector.all(), true).size(); long elapsed = start.elapsed(MILLISECONDS); if (elapsed > 100) { LOG.debug("Took {} ms to count {} compactors for {}", elapsed, count, groupName); diff --git a/core/src/test/java/org/apache/accumulo/core/clientImpl/ZookeeperLockCheckerTest.java b/core/src/test/java/org/apache/accumulo/core/clientImpl/ZookeeperLockCheckerTest.java index ed2b6aa3e4..e6c8affbd3 100644 --- a/core/src/test/java/org/apache/accumulo/core/clientImpl/ZookeeperLockCheckerTest.java +++ b/core/src/test/java/org/apache/accumulo/core/clientImpl/ZookeeperLockCheckerTest.java @@ -53,10 +53,8 @@ public class ZookeeperLockCheckerTest { .anyTimes(); expect(zc.getChildren(context.getZooKeeperRoot() + Constants.ZTSERVERS)) .andReturn(List.of(Constants.DEFAULT_RESOURCE_GROUP_NAME)).anyTimes(); - expect(zc.getChildren(context.getZooKeeperRoot() + Constants.ZTSERVERS + "/" - + Constants.DEFAULT_RESOURCE_GROUP_NAME)).andReturn(List.of("server")).anyTimes(); - expect(zc.getChildren(context.getZooKeeperRoot() + Constants.ZTSERVERS + "/" - + Constants.DEFAULT_RESOURCE_GROUP_NAME + "/server")).andReturn(List.of()).anyTimes(); + expect(zc.get(context.getZooKeeperRoot() + Constants.ZTSERVERS + "/" + + Constants.DEFAULT_RESOURCE_GROUP_NAME + "/server")).andReturn(new byte[0]).anyTimes(); zc.clear(context.getZooKeeperRoot() + Constants.ZTSERVERS + "/" + Constants.DEFAULT_RESOURCE_GROUP_NAME + "/server"); replay(zc); diff --git a/core/src/test/java/org/apache/accumulo/core/lock/ServiceLockPathsTest.java b/core/src/test/java/org/apache/accumulo/core/lock/ServiceLockPathsTest.java index 27e1e795a2..e35cbd616b 100644 --- a/core/src/test/java/org/apache/accumulo/core/lock/ServiceLockPathsTest.java +++ b/core/src/test/java/org/apache/accumulo/core/lock/ServiceLockPathsTest.java @@ -45,7 +45,7 @@ import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.fate.zookeeper.ZooCache.ZcStat; import org.apache.accumulo.core.lock.ServiceLockData.ThriftService; -import org.apache.accumulo.core.lock.ServiceLockPaths.AddressPredicate; +import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector; import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath; import org.easymock.EasyMock; import org.junit.jupiter.api.Test; @@ -382,11 +382,12 @@ public class ServiceLockPathsTest { () -> ctx.getServerPaths().getCompactor(null, null, true)); assertThrows(NullPointerException.class, () -> ctx.getServerPaths().getCompactor(rg -> rg.equals(TEST_RESOURCE_GROUP), null, true)); - assertTrue(ctx.getServerPaths().getCompactor(rg -> true, addr -> true, true).isEmpty()); + assertTrue( + ctx.getServerPaths().getCompactor(rg -> true, AddressSelector.all(), true).isEmpty()); assertTrue(ctx.getServerPaths() - .getCompactor(rg -> rg.equals(TEST_RESOURCE_GROUP), addr -> true, true).isEmpty()); + .getCompactor(rg -> rg.equals(TEST_RESOURCE_GROUP), AddressSelector.all(), true).isEmpty()); assertTrue(ctx.getServerPaths() - .getCompactor(rg -> rg.equals(TEST_RESOURCE_GROUP), AddressPredicate.exact(hp), true) + .getCompactor(rg -> rg.equals(TEST_RESOURCE_GROUP), AddressSelector.exact(hp), true) .isEmpty()); EasyMock.verify(ctx, zc); @@ -438,11 +439,16 @@ public class ServiceLockPathsTest { + HOSTNAME + "/" + svcLock1), EasyMock.isA(ZcStat.class))) .andReturn(sld2.serialize()).anyTimes(); EasyMock.expect(ctx.getServerPaths()).andReturn(new ServiceLockPaths(ctx)).anyTimes(); + EasyMock + .expect(zc.getChildren(ROOT + ZCOMPACTORS + "/" + TEST_RESOURCE_GROUP + "/localhost:1234")) + .andReturn(null).anyTimes(); + EasyMock.expect(zc.get(ROOT + ZCOMPACTORS + "/" + TEST_RESOURCE_GROUP + "/localhost:1234")) + .andReturn(null).anyTimes(); EasyMock.replay(ctx, zc); // query for all Set<ServiceLockPath> results = - ctx.getServerPaths().getCompactor(rg -> true, addr -> true, false); + ctx.getServerPaths().getCompactor(rg -> true, AddressSelector.all(), false); assertEquals(4, results.size()); for (ServiceLockPath path : results) { assertEquals(ZCOMPACTORS, path.getType()); @@ -459,7 +465,7 @@ public class ServiceLockPathsTest { } // query for all with locks - results = ctx.getServerPaths().getCompactor(rg -> true, addr -> true, true); + results = ctx.getServerPaths().getCompactor(rg -> true, AddressSelector.all(), true); assertEquals(2, results.size()); Iterator<ServiceLockPath> iter = results.iterator(); ServiceLockPath slp1 = iter.next(); @@ -489,12 +495,12 @@ public class ServiceLockPathsTest { // query for all in non-existent resource group results = ctx.getServerPaths().getCompactor(rg -> rg.equals("FAKE_RESOURCE_GROUP"), - addr -> true, true); + AddressSelector.all(), true); assertEquals(0, results.size()); // query for all in test resource group - results = - ctx.getServerPaths().getCompactor(rg -> rg.equals(TEST_RESOURCE_GROUP), addr -> true, true); + results = ctx.getServerPaths().getCompactor(rg -> rg.equals(TEST_RESOURCE_GROUP), + AddressSelector.all(), true); assertEquals(1, results.size()); iter = results.iterator(); slp1 = iter.next(); @@ -505,7 +511,7 @@ public class ServiceLockPathsTest { // query for a specific server results = ctx.getServerPaths().getCompactor(rg -> rg.equals(TEST_RESOURCE_GROUP), - AddressPredicate.exact(hp), true); + AddressSelector.exact(hp), true); assertEquals(1, results.size()); iter = results.iterator(); slp1 = iter.next(); @@ -515,9 +521,14 @@ public class ServiceLockPathsTest { assertEquals(ROOT + ZCOMPACTORS + "/" + TEST_RESOURCE_GROUP + "/" + HOSTNAME, slp1.toString()); // query for a wrong server - results = ctx.getServerPaths().getCompactor(rg -> rg.equals(TEST_RESOURCE_GROUP), - AddressPredicate.exact(HostAndPort.fromString("localhost:1234")), true); - assertEquals(0, results.size()); + for (boolean withLock : new boolean[] {true, false}) { + results = ctx.getServerPaths().getCompactor(rg -> rg.equals(TEST_RESOURCE_GROUP), + AddressSelector.exact(HostAndPort.fromString("localhost:1234")), withLock); + assertEquals(0, results.size()); + results = ctx.getServerPaths().getCompactor(rg -> rg.equals(TEST_RESOURCE_GROUP), + AddressSelector.matching(hp -> hp.equals("localhost:1234")), withLock); + assertEquals(0, results.size()); + } EasyMock.verify(ctx, zc); @@ -539,11 +550,13 @@ public class ServiceLockPathsTest { () -> ctx.getServerPaths().getScanServer(null, null, true)); assertThrows(NullPointerException.class, () -> ctx.getServerPaths().getScanServer(rg -> rg.equals(TEST_RESOURCE_GROUP), null, true)); - assertTrue(ctx.getServerPaths().getScanServer(rg -> true, addr -> true, true).isEmpty()); + assertTrue( + ctx.getServerPaths().getScanServer(rg -> true, AddressSelector.all(), true).isEmpty()); assertTrue(ctx.getServerPaths() - .getScanServer(rg -> rg.equals(TEST_RESOURCE_GROUP), addr -> true, true).isEmpty()); + .getScanServer(rg -> rg.equals(TEST_RESOURCE_GROUP), AddressSelector.all(), true) + .isEmpty()); assertTrue(ctx.getServerPaths() - .getScanServer(rg -> rg.equals(TEST_RESOURCE_GROUP), AddressPredicate.exact(hp), true) + .getScanServer(rg -> rg.equals(TEST_RESOURCE_GROUP), AddressSelector.exact(hp), true) .isEmpty()); EasyMock.verify(ctx, zc); @@ -594,11 +607,14 @@ public class ServiceLockPathsTest { ROOT + ZSSERVERS + "/" + DEFAULT_RESOURCE_GROUP_NAME + "/" + HOSTNAME + "/" + svcLock1), EasyMock.isA(ZcStat.class))).andReturn(sld2.serialize()).anyTimes(); EasyMock.expect(ctx.getServerPaths()).andReturn(new ServiceLockPaths(ctx)).anyTimes(); + EasyMock + .expect(zc.getChildren(ROOT + ZSSERVERS + "/" + TEST_RESOURCE_GROUP + "/localhost:1234")) + .andReturn(null).anyTimes(); EasyMock.replay(ctx, zc); // query for all Set<ServiceLockPath> results = - ctx.getServerPaths().getScanServer(rg -> true, addr -> true, false); + ctx.getServerPaths().getScanServer(rg -> true, AddressSelector.all(), false); assertEquals(4, results.size()); for (ServiceLockPath path : results) { assertEquals(ZSSERVERS, path.getType()); @@ -615,7 +631,7 @@ public class ServiceLockPathsTest { } // query for all with lock - results = ctx.getServerPaths().getScanServer(rg -> true, addr -> true, true); + results = ctx.getServerPaths().getScanServer(rg -> true, AddressSelector.all(), true); assertEquals(2, results.size()); Iterator<ServiceLockPath> iter = results.iterator(); ServiceLockPath slp1 = iter.next(); @@ -643,12 +659,12 @@ public class ServiceLockPathsTest { // query for all in non-existent resource group results = ctx.getServerPaths().getScanServer(rg -> rg.equals("FAKE_RESOURCE_GROUP"), - addr -> true, true); + AddressSelector.all(), true); assertEquals(0, results.size()); // query for all in test resource group - results = ctx.getServerPaths().getScanServer(rg -> rg.equals(TEST_RESOURCE_GROUP), addr -> true, - true); + results = ctx.getServerPaths().getScanServer(rg -> rg.equals(TEST_RESOURCE_GROUP), + AddressSelector.all(), true); assertEquals(1, results.size()); iter = results.iterator(); slp1 = iter.next(); @@ -659,7 +675,7 @@ public class ServiceLockPathsTest { // query for a specific server results = ctx.getServerPaths().getScanServer(rg -> rg.equals(TEST_RESOURCE_GROUP), - AddressPredicate.exact(hp), true); + AddressSelector.exact(hp), true); assertEquals(1, results.size()); iter = results.iterator(); slp1 = iter.next(); @@ -670,7 +686,7 @@ public class ServiceLockPathsTest { // query for a wrong server results = ctx.getServerPaths().getScanServer(rg -> rg.equals(TEST_RESOURCE_GROUP), - AddressPredicate.exact(HostAndPort.fromString("localhost:1234")), true); + AddressSelector.exact(HostAndPort.fromString("localhost:1234")), true); assertEquals(0, results.size()); EasyMock.verify(ctx, zc); @@ -693,11 +709,13 @@ public class ServiceLockPathsTest { () -> ctx.getServerPaths().getTabletServer(null, null, true)); assertThrows(NullPointerException.class, () -> ctx.getServerPaths() .getTabletServer(rg -> rg.equals(TEST_RESOURCE_GROUP), null, true)); - assertTrue(ctx.getServerPaths().getTabletServer(rg -> true, addr -> true, true).isEmpty()); + assertTrue( + ctx.getServerPaths().getTabletServer(rg -> true, AddressSelector.all(), true).isEmpty()); assertTrue(ctx.getServerPaths() - .getTabletServer(rg -> rg.equals(TEST_RESOURCE_GROUP), addr -> true, true).isEmpty()); + .getTabletServer(rg -> rg.equals(TEST_RESOURCE_GROUP), AddressSelector.all(), true) + .isEmpty()); assertTrue(ctx.getServerPaths() - .getTabletServer(rg -> rg.equals(TEST_RESOURCE_GROUP), AddressPredicate.exact(hp), true) + .getTabletServer(rg -> rg.equals(TEST_RESOURCE_GROUP), AddressSelector.exact(hp), true) .isEmpty()); EasyMock.verify(ctx, zc); @@ -748,11 +766,14 @@ public class ServiceLockPathsTest { ROOT + ZTSERVERS + "/" + DEFAULT_RESOURCE_GROUP_NAME + "/" + HOSTNAME + "/" + svcLock1), EasyMock.isA(ZcStat.class))).andReturn(sld2.serialize()).anyTimes(); EasyMock.expect(ctx.getServerPaths()).andReturn(new ServiceLockPaths(ctx)).anyTimes(); + EasyMock + .expect(zc.getChildren(ROOT + ZTSERVERS + "/" + TEST_RESOURCE_GROUP + "/localhost:1234")) + .andReturn(null).anyTimes(); EasyMock.replay(ctx, zc); // query for all Set<ServiceLockPath> results = - ctx.getServerPaths().getTabletServer(rg -> true, addr -> true, false); + ctx.getServerPaths().getTabletServer(rg -> true, AddressSelector.all(), false); assertEquals(4, results.size()); for (ServiceLockPath path : results) { assertEquals(ZTSERVERS, path.getType()); @@ -769,7 +790,7 @@ public class ServiceLockPathsTest { } // query for all with lock - results = ctx.getServerPaths().getTabletServer(rg -> true, addr -> true, true); + results = ctx.getServerPaths().getTabletServer(rg -> true, AddressSelector.all(), true); assertEquals(2, results.size()); Iterator<ServiceLockPath> iter = results.iterator(); ServiceLockPath slp1 = iter.next(); @@ -797,12 +818,12 @@ public class ServiceLockPathsTest { // query for all in non-existent resource group results = ctx.getServerPaths().getTabletServer(rg -> rg.equals("FAKE_RESOURCE_GROUP"), - addr -> true, true); + AddressSelector.all(), true); assertEquals(0, results.size()); // query for all in test resource group results = ctx.getServerPaths().getTabletServer(rg -> rg.equals(TEST_RESOURCE_GROUP), - addr -> true, true); + AddressSelector.all(), true); assertEquals(1, results.size()); iter = results.iterator(); slp1 = iter.next(); @@ -813,7 +834,7 @@ public class ServiceLockPathsTest { // query for a specific server results = ctx.getServerPaths().getTabletServer(rg -> rg.equals(TEST_RESOURCE_GROUP), - AddressPredicate.exact(hp), true); + AddressSelector.exact(hp), true); assertEquals(1, results.size()); iter = results.iterator(); slp1 = iter.next(); @@ -824,7 +845,7 @@ public class ServiceLockPathsTest { // query for a wrong server results = ctx.getServerPaths().getTabletServer(rg -> rg.equals(TEST_RESOURCE_GROUP), - AddressPredicate.exact(HostAndPort.fromString("localhost:1234")), true); + AddressSelector.exact(HostAndPort.fromString("localhost:1234")), true); assertEquals(0, results.size()); EasyMock.verify(ctx, zc); @@ -847,11 +868,14 @@ public class ServiceLockPathsTest { () -> ctx.getServerPaths().getDeadTabletServer(null, null, false)); assertThrows(NullPointerException.class, () -> ctx.getServerPaths() .getDeadTabletServer(rg -> rg.equals(TEST_RESOURCE_GROUP), null, false)); - assertTrue(ctx.getServerPaths().getDeadTabletServer(rg -> true, addr -> true, false).isEmpty()); + assertTrue(ctx.getServerPaths().getDeadTabletServer(rg -> true, AddressSelector.all(), false) + .isEmpty()); assertTrue(ctx.getServerPaths() - .getDeadTabletServer(rg -> rg.equals(TEST_RESOURCE_GROUP), addr -> true, false).isEmpty()); - assertTrue(ctx.getServerPaths().getDeadTabletServer(rg -> rg.equals(TEST_RESOURCE_GROUP), - AddressPredicate.exact(hp), false).isEmpty()); + .getDeadTabletServer(rg -> rg.equals(TEST_RESOURCE_GROUP), AddressSelector.all(), false) + .isEmpty()); + assertTrue(ctx.getServerPaths() + .getDeadTabletServer(rg -> rg.equals(TEST_RESOURCE_GROUP), AddressSelector.exact(hp), false) + .isEmpty()); EasyMock.verify(ctx, zc); @@ -882,6 +906,8 @@ public class ServiceLockPathsTest { EasyMock .expect(zc.getChildren(ROOT + ZDEADTSERVERS + "/" + TEST_RESOURCE_GROUP + "/" + HOSTNAME)) .andReturn(List.of(svcLock1, svcLock2)).anyTimes(); + EasyMock.expect(zc.get(ROOT + ZDEADTSERVERS + "/" + TEST_RESOURCE_GROUP + "/" + HOSTNAME)) + .andReturn(new byte[0]).anyTimes(); EasyMock .expect(zc .getChildren(ROOT + ZDEADTSERVERS + "/" + DEFAULT_RESOURCE_GROUP_NAME + "/" + HOSTNAME)) @@ -895,11 +921,13 @@ public class ServiceLockPathsTest { + HOSTNAME + "/" + svcLock1), EasyMock.isA(ZcStat.class))) .andReturn(sld2.serialize()).anyTimes(); EasyMock.expect(ctx.getServerPaths()).andReturn(new ServiceLockPaths(ctx)).anyTimes(); + EasyMock.expect(zc.get(ROOT + ZDEADTSERVERS + "/" + TEST_RESOURCE_GROUP + "/localhost:1234")) + .andReturn(null).anyTimes(); EasyMock.replay(ctx, zc); // query for all Set<ServiceLockPath> results = - ctx.getServerPaths().getDeadTabletServer(rg -> true, addr -> true, false); + ctx.getServerPaths().getDeadTabletServer(rg -> true, AddressSelector.all(), false); assertEquals(2, results.size()); Iterator<ServiceLockPath> iter = results.iterator(); ServiceLockPath slp1 = iter.next(); @@ -929,12 +957,12 @@ public class ServiceLockPathsTest { // query for all in non-existent resource group results = ctx.getServerPaths().getDeadTabletServer(rg -> rg.equals("FAKE_RESOURCE_GROUP"), - addr -> true, false); + AddressSelector.all(), false); assertEquals(0, results.size()); // query for all in test resource group results = ctx.getServerPaths().getDeadTabletServer(rg -> rg.equals(TEST_RESOURCE_GROUP), - addr -> true, false); + AddressSelector.all(), false); assertEquals(1, results.size()); iter = results.iterator(); slp1 = iter.next(); @@ -946,7 +974,7 @@ public class ServiceLockPathsTest { // query for a specific server results = ctx.getServerPaths().getDeadTabletServer(rg -> rg.equals(TEST_RESOURCE_GROUP), - AddressPredicate.exact(hp), false); + AddressSelector.exact(hp), false); assertEquals(1, results.size()); iter = results.iterator(); slp1 = iter.next(); @@ -958,7 +986,7 @@ public class ServiceLockPathsTest { // query for a wrong server results = ctx.getServerPaths().getDeadTabletServer(rg -> rg.equals(TEST_RESOURCE_GROUP), - AddressPredicate.exact(HostAndPort.fromString("localhost:1234")), false); + AddressSelector.exact(HostAndPort.fromString("localhost:1234")), false); assertEquals(0, results.size()); EasyMock.verify(ctx, zc); diff --git a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java index 5fe5656147..e8a55081c3 100644 --- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java +++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java @@ -87,6 +87,7 @@ import org.apache.accumulo.core.lock.ServiceLock.AccumuloLockWatcher; import org.apache.accumulo.core.lock.ServiceLock.LockLossReason; import org.apache.accumulo.core.lock.ServiceLockData; import org.apache.accumulo.core.lock.ServiceLockData.ThriftService; +import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector; import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath; import org.apache.accumulo.core.manager.thrift.ManagerGoalState; import org.apache.accumulo.core.manager.thrift.ManagerMonitorInfo; @@ -862,7 +863,7 @@ public class MiniAccumuloClusterImpl implements AccumuloCluster { int tsActualCount = 0; while (tsActualCount < tsExpectedCount) { Set<ServiceLockPath> tservers = - context.getServerPaths().getTabletServer(rg -> true, addr -> true, true); + context.getServerPaths().getTabletServer(rg -> true, AddressSelector.all(), true); tsActualCount = tservers.size(); log.info(tsActualCount + " of " + tsExpectedCount + " tablet servers present in ZooKeeper"); Thread.sleep(500); @@ -871,7 +872,7 @@ public class MiniAccumuloClusterImpl implements AccumuloCluster { int ssActualCount = 0; while (ssActualCount < ssExpectedCount) { Set<ServiceLockPath> tservers = - context.getServerPaths().getScanServer(rg -> true, addr -> true, true); + context.getServerPaths().getScanServer(rg -> true, AddressSelector.all(), true); ssActualCount = tservers.size(); log.info(ssActualCount + " of " + ssExpectedCount + " scan servers present in ZooKeeper"); Thread.sleep(500); @@ -880,7 +881,7 @@ public class MiniAccumuloClusterImpl implements AccumuloCluster { int ecActualCount = 0; while (ecActualCount < ecExpectedCount) { Set<ServiceLockPath> compactors = - context.getServerPaths().getCompactor(rg -> true, addr -> true, true); + context.getServerPaths().getCompactor(rg -> true, AddressSelector.all(), true); ecActualCount = compactors.size(); log.info(ecActualCount + " of " + ecExpectedCount + " compactors present in ZooKeeper"); Thread.sleep(500); diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java index 302e1b1161..b6e51b412f 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java @@ -40,7 +40,7 @@ import org.apache.accumulo.core.fate.zookeeper.ZooCache.ZcStat; import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.lock.ServiceLockData; import org.apache.accumulo.core.lock.ServiceLockPaths; -import org.apache.accumulo.core.lock.ServiceLockPaths.AddressPredicate; +import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector; import org.apache.accumulo.core.lock.ServiceLockPaths.ResourceGroupPredicate; import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath; import org.apache.accumulo.core.manager.thrift.TabletServerStatus; @@ -232,7 +232,7 @@ public class LiveTServerSet implements Watcher { final Set<TServerInstance> updates = new HashSet<>(); final Set<TServerInstance> doomed = new HashSet<>(); final Set<ServiceLockPath> tservers = - context.getServerPaths().getTabletServer(rg -> true, addr -> true, false); + context.getServerPaths().getTabletServer(rg -> true, AddressSelector.all(), false); locklessServers.keySet().retainAll(tservers); @@ -465,7 +465,8 @@ public class LiveTServerSet implements Watcher { ResourceGroupPredicate rgp = rg2 -> rg.equals(rg2); return rgp; }).orElse(rg -> true); - AddressPredicate addrPredicate = address.map(AddressPredicate::exact).orElse(addr -> true); + AddressSelector addrPredicate = + address.map(AddressSelector::exact).orElse(AddressSelector.all()); Set<ServiceLockPath> paths = context.getServerPaths().getTabletServer(rgPredicate, addrPredicate, false); if (paths.isEmpty() || paths.size() > 1) { diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/DeadServerList.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/DeadServerList.java index c204c04d05..966167d527 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/DeadServerList.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/DeadServerList.java @@ -28,6 +28,7 @@ import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy; +import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector; import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath; import org.apache.accumulo.core.manager.thrift.DeadServer; import org.apache.accumulo.server.ServerContext; @@ -68,7 +69,7 @@ public class DeadServerList { List<DeadServer> result = new ArrayList<>(); try { Set<ServiceLockPath> deadServers = - ctx.getServerPaths().getDeadTabletServer(rg -> true, addr -> true, false); + ctx.getServerPaths().getDeadTabletServer(rg -> true, AddressSelector.all(), false); for (ServiceLockPath path : deadServers) { Stat stat = new Stat(); byte[] data; diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/AccumuloStatus.java b/server/base/src/main/java/org/apache/accumulo/server/util/AccumuloStatus.java index 9b3229d4d6..f37fea7d0d 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/AccumuloStatus.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/AccumuloStatus.java @@ -21,6 +21,7 @@ package org.apache.accumulo.server.util; import java.util.Set; import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector; import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath; public class AccumuloStatus { @@ -32,7 +33,7 @@ public class AccumuloStatus { */ public static boolean isAccumuloOffline(ClientContext context) { Set<ServiceLockPath> tservers = - context.getServerPaths().getTabletServer(rg -> true, addr -> true, true); + context.getServerPaths().getTabletServer(rg -> true, AddressSelector.all(), true); if (!tservers.isEmpty()) { return false; } diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java index 66b2601b13..ace5a00dba 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java @@ -78,7 +78,7 @@ import org.apache.accumulo.core.fate.zookeeper.MetaFateStore; import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.lock.ServiceLock; -import org.apache.accumulo.core.lock.ServiceLockPaths; +import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector; import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath; import org.apache.accumulo.core.manager.thrift.FateService; import org.apache.accumulo.core.manager.thrift.TFateId; @@ -681,8 +681,8 @@ public class Admin implements KeywordExecutable { static String qualifyWithZooKeeperSessionId(ClientContext context, ZooCache zooCache, String hostAndPort) { var hpObj = HostAndPort.fromString(hostAndPort); - Set<ServiceLockPath> paths = context.getServerPaths().getTabletServer(rg -> true, - ServiceLockPaths.AddressPredicate.exact(hpObj), true); + Set<ServiceLockPath> paths = + context.getServerPaths().getTabletServer(rg -> true, AddressSelector.exact(hpObj), true); if (paths.size() != 1) { return hostAndPort; } diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ServiceStatusCmd.java b/server/base/src/main/java/org/apache/accumulo/server/util/ServiceStatusCmd.java index 70cf41f254..f950975c80 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/ServiceStatusCmd.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/ServiceStatusCmd.java @@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.accumulo.core.fate.zookeeper.ZooReader; import org.apache.accumulo.core.lock.ServiceLockData; +import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector; import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.server.ServerContext; @@ -111,7 +112,7 @@ public class ServiceStatusCmd { final AtomicInteger errors = new AtomicInteger(0); final Map<String,Set<String>> hostsByGroups = new TreeMap<>(); final Set<ServiceLockPath> compactors = - context.getServerPaths().getTabletServer(rg -> true, addr -> true, true); + context.getServerPaths().getTabletServer(rg -> true, AddressSelector.all(), true); compactors.forEach(c -> hostsByGroups .computeIfAbsent(c.getResourceGroup(), (k) -> new TreeSet<>()).add(c.getServer())); return new StatusSummary(ServiceStatusReport.ReportKey.T_SERVER, hostsByGroups.keySet(), @@ -128,7 +129,7 @@ public class ServiceStatusCmd { final AtomicInteger errors = new AtomicInteger(0); final Map<String,Set<String>> hostsByGroups = new TreeMap<>(); final Set<ServiceLockPath> scanServers = - context.getServerPaths().getScanServer(rg -> true, addr -> true, true); + context.getServerPaths().getScanServer(rg -> true, AddressSelector.all(), true); scanServers.forEach(c -> hostsByGroups .computeIfAbsent(c.getResourceGroup(), (k) -> new TreeSet<>()).add(c.getServer())); return new StatusSummary(ServiceStatusReport.ReportKey.S_SERVER, hostsByGroups.keySet(), @@ -155,7 +156,7 @@ public class ServiceStatusCmd { final AtomicInteger errors = new AtomicInteger(0); final Map<String,Set<String>> hostsByGroups = new TreeMap<>(); final Set<ServiceLockPath> compactors = - context.getServerPaths().getCompactor(rg -> true, addr -> true, true); + context.getServerPaths().getCompactor(rg -> true, AddressSelector.all(), true); compactors.forEach(c -> hostsByGroups .computeIfAbsent(c.getResourceGroup(), (k) -> new TreeSet<>()).add(c.getServer())); return new StatusSummary(ServiceStatusReport.ReportKey.COMPACTOR, hostsByGroups.keySet(), diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/TabletServerLocks.java b/server/base/src/main/java/org/apache/accumulo/server/util/TabletServerLocks.java index d46ffdb840..369be4bee3 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/TabletServerLocks.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/TabletServerLocks.java @@ -26,7 +26,7 @@ import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.lock.ServiceLockData; import org.apache.accumulo.core.lock.ServiceLockData.ThriftService; -import org.apache.accumulo.core.lock.ServiceLockPaths; +import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector; import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath; import org.apache.accumulo.server.ServerContext; @@ -43,7 +43,7 @@ public class TabletServerLocks { if (delete == null) { Set<ServiceLockPath> tabletServers = - context.getServerPaths().getTabletServer(rg -> true, addr -> true, false); + context.getServerPaths().getTabletServer(rg -> true, AddressSelector.all(), false); if (tabletServers.isEmpty()) { System.err.println("No tservers found in ZK"); } @@ -65,7 +65,7 @@ public class TabletServerLocks { } else { var hostAndPort = HostAndPort.fromString(lock); Set<ServiceLockPath> paths = context.getServerPaths().getTabletServer(rg -> true, - ServiceLockPaths.AddressPredicate.exact(hostAndPort), true); + AddressSelector.exact(hostAndPort), true); Preconditions.checkArgument(paths.size() == 1, lock + " does not match a single ZooKeeper TabletServer lock. matches=" + paths); ServiceLockPath path = paths.iterator().next(); diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java b/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java index 26d0e322dd..473e8a1171 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java @@ -28,6 +28,7 @@ import org.apache.accumulo.core.conf.SiteConfiguration; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy; import org.apache.accumulo.core.lock.ServiceLock; +import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector; import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath; import org.apache.accumulo.core.singletons.SingletonManager; import org.apache.accumulo.core.singletons.SingletonManager.Mode; @@ -110,7 +111,7 @@ public class ZooZap implements KeywordExecutable { if (opts.zapTservers) { try { Set<ServiceLockPath> tserverLockPaths = - context.getServerPaths().getTabletServer(rg -> true, addr -> true, false); + context.getServerPaths().getTabletServer(rg -> true, AddressSelector.all(), false); for (ServiceLockPath tserverPath : tserverLockPaths) { message("Deleting " + tserverPath + " from zookeeper", opts); @@ -132,7 +133,7 @@ public class ZooZap implements KeywordExecutable { if (opts.zapCompactors) { Set<ServiceLockPath> compactorLockPaths = - context.getServerPaths().getCompactor(rg -> true, addr -> true, false); + context.getServerPaths().getCompactor(rg -> true, AddressSelector.all(), false); Set<String> compactorResourceGroupPaths = new HashSet<>(); compactorLockPaths.forEach(p -> compactorResourceGroupPaths .add(p.toString().substring(0, p.toString().lastIndexOf('/')))); @@ -150,7 +151,7 @@ public class ZooZap implements KeywordExecutable { if (opts.zapScanServers) { try { Set<ServiceLockPath> sserverLockPaths = - context.getServerPaths().getScanServer(rg -> true, addr -> true, false); + context.getServerPaths().getScanServer(rg -> true, AddressSelector.all(), false); for (ServiceLockPath sserverPath : sserverLockPaths) { message("Deleting " + sserverPath + " from zookeeper", opts); if (!zoo.getChildren(sserverPath.toString()).isEmpty()) { diff --git a/server/base/src/test/java/org/apache/accumulo/server/util/AdminTest.java b/server/base/src/test/java/org/apache/accumulo/server/util/AdminTest.java index f8ce2fadca..d861bf3781 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/util/AdminTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/util/AdminTest.java @@ -131,7 +131,6 @@ public class AdminTest { EasyMock.expect(ctx.getZooKeeperRoot()).andReturn("/accumulo/id").anyTimes(); EasyMock.expect(ctx.getZooCache()).andReturn(zc).anyTimes(); EasyMock.expect(zc.getChildren(type)).andReturn(List.of(Constants.DEFAULT_RESOURCE_GROUP_NAME)); - EasyMock.expect(zc.getChildren(group)).andReturn(List.of(server)); EasyMock.expect(zc.getChildren(serverPath)).andReturn(Collections.emptyList()); EasyMock.expect(ctx.getServerPaths()).andReturn(new ServiceLockPaths(ctx)).anyTimes(); EasyMock.replay(ctx, zc); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 8118b2d57c..20e72ca3af 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -88,6 +88,7 @@ import org.apache.accumulo.core.lock.ServiceLockData; import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptor; import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptors; import org.apache.accumulo.core.lock.ServiceLockData.ThriftService; +import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector; import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath; import org.apache.accumulo.core.manager.balancer.AssignmentParamsImpl; import org.apache.accumulo.core.manager.balancer.BalanceParamsImpl; @@ -645,7 +646,7 @@ public class Manager extends AbstractServer while (stillManager()) { try { Set<ServiceLockPath> scanServerPaths = - getContext().getServerPaths().getScanServer(rg -> true, addr -> true, false); + getContext().getServerPaths().getScanServer(rg -> true, AddressSelector.all(), false); for (ServiceLockPath path : scanServerPaths) { ZcStat stat = new ZcStat(); diff --git a/test/src/main/java/org/apache/accumulo/test/RecoveryIT.java b/test/src/main/java/org/apache/accumulo/test/RecoveryIT.java index 2b245a6ce7..a631cf642a 100644 --- a/test/src/main/java/org/apache/accumulo/test/RecoveryIT.java +++ b/test/src/main/java/org/apache/accumulo/test/RecoveryIT.java @@ -38,6 +38,7 @@ import org.apache.accumulo.core.client.admin.TabletAvailability; import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletsMetadata; import org.apache.accumulo.core.spi.balancer.TableLoadBalancer; @@ -143,11 +144,11 @@ public class RecoveryIT extends AccumuloClusterHarness { // Stop any running Compactors and ScanServers control.stopAllServers(ServerType.COMPACTOR); Wait.waitFor(() -> getServerContext().getServerPaths() - .getCompactor(rg -> true, addr -> true, true).isEmpty(), 60_000); + .getCompactor(rg -> true, AddressSelector.all(), true).isEmpty(), 60_000); control.stopAllServers(ServerType.SCAN_SERVER); Wait.waitFor(() -> getServerContext().getServerPaths() - .getScanServer(rg -> true, addr -> true, true).size() == 0, 60_000); + .getScanServer(rg -> true, AddressSelector.all(), true).size() == 0, 60_000); // Kill the TabletServer in resource group that is hosting the table List<Process> procs = control.getTabletServers(RESOURCE_GROUP); diff --git a/test/src/main/java/org/apache/accumulo/test/ScanServerConcurrentTabletScanIT.java b/test/src/main/java/org/apache/accumulo/test/ScanServerConcurrentTabletScanIT.java index f80b6364fd..4f1deb88f7 100644 --- a/test/src/main/java/org/apache/accumulo/test/ScanServerConcurrentTabletScanIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ScanServerConcurrentTabletScanIT.java @@ -39,6 +39,7 @@ import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.harness.MiniClusterConfigurationCallback; import org.apache.accumulo.harness.SharedMiniClusterBase; @@ -92,7 +93,7 @@ public class ScanServerConcurrentTabletScanIT extends SharedMiniClusterBase { 1, null); Wait.waitFor(() -> !getCluster().getServerContext().getServerPaths() - .getScanServer(rg -> true, addr -> true, true).isEmpty()); + .getScanServer(rg -> true, AddressSelector.all(), true).isEmpty()); } diff --git a/test/src/main/java/org/apache/accumulo/test/ScanServerGroupConfigurationIT.java b/test/src/main/java/org/apache/accumulo/test/ScanServerGroupConfigurationIT.java index 91e6868665..fcbc4a2fe6 100644 --- a/test/src/main/java/org/apache/accumulo/test/ScanServerGroupConfigurationIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ScanServerGroupConfigurationIT.java @@ -29,6 +29,7 @@ import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel; import org.apache.accumulo.core.conf.ClientProperty; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.spi.scan.ScanServerSelector; import org.apache.accumulo.harness.MiniClusterConfigurationCallback; @@ -124,7 +125,7 @@ public class ScanServerGroupConfigurationIT extends SharedMiniClusterBase { // Ensure no scan servers running Wait.waitFor(() -> getCluster().getServerContext().getServerPaths() - .getScanServer(rg -> true, addr -> true, true).isEmpty()); + .getScanServer(rg -> true, AddressSelector.all(), true).isEmpty()); try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { final String tableName = getUniqueNames(1)[0]; @@ -146,10 +147,10 @@ public class ScanServerGroupConfigurationIT extends SharedMiniClusterBase { // Start a ScanServer. No group specified, should be in the default group. getCluster().getClusterControl().start(ServerType.SCAN_SERVER, "localhost"); Wait.waitFor(() -> getCluster().getServerContext().getServerPaths() - .getScanServer(rg -> true, addr -> true, true).size() == 1, 30_000); + .getScanServer(rg -> true, AddressSelector.all(), true).size() == 1, 30_000); Wait.waitFor(() -> getCluster().getServerContext().getServerPaths() .getScanServer(rg -> rg.equals(ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME), - addr -> true, true) + AddressSelector.all(), true) .size() > 0); assertEquals(ingestedEntryCount, Iterables.size(scanner), @@ -165,13 +166,13 @@ public class ScanServerGroupConfigurationIT extends SharedMiniClusterBase { .addScanServerResourceGroup("GROUP1", 1); getCluster().getClusterControl().start(ServerType.SCAN_SERVER); Wait.waitFor(() -> getCluster().getServerContext().getServerPaths() - .getScanServer(rg -> true, addr -> true, true).size() == 2, 30_000); + .getScanServer(rg -> true, AddressSelector.all(), true).size() == 2, 30_000); Wait.waitFor(() -> getCluster().getServerContext().getServerPaths() .getScanServer(rg -> rg.equals(ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME), - addr -> true, true) + AddressSelector.all(), true) .size() == 1); Wait.waitFor(() -> getCluster().getServerContext().getServerPaths() - .getScanServer(rg -> rg.equals("GROUP1"), addr -> true, true).size() == 1); + .getScanServer(rg -> rg.equals("GROUP1"), AddressSelector.all(), true).size() == 1); scanner.setExecutionHints(Map.of("scan_type", "use_group1")); assertEquals(ingestedEntryCount + additionalIngest1, Iterables.size(scanner), diff --git a/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java b/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java index 0bfa121a9e..690ee6d15f 100644 --- a/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java @@ -59,6 +59,7 @@ import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateInstanceType; +import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector; import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.Ample.TabletsMutator; @@ -115,7 +116,7 @@ public class ScanServerIT extends SharedMiniClusterBase { "localhost"); Wait.waitFor(() -> !getCluster().getServerContext().getServerPaths() - .getScanServer(rg -> true, addr -> true, true).isEmpty()); + .getScanServer(rg -> true, AddressSelector.all(), true).isEmpty()); } @AfterAll diff --git a/test/src/main/java/org/apache/accumulo/test/ScanServerMetadataEntriesIT.java b/test/src/main/java/org/apache/accumulo/test/ScanServerMetadataEntriesIT.java index 176fc7dd66..81e133ad45 100644 --- a/test/src/main/java/org/apache/accumulo/test/ScanServerMetadataEntriesIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ScanServerMetadataEntriesIT.java @@ -46,6 +46,7 @@ import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.gc.Reference; +import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector; import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.core.metadata.ScanServerRefTabletFile; import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; @@ -91,7 +92,7 @@ public class ScanServerMetadataEntriesIT extends SharedMiniClusterBase { "localhost"); Wait.waitFor(() -> !getCluster().getServerContext().getServerPaths() - .getScanServer(rg -> true, addr -> true, true).isEmpty()); + .getScanServer(rg -> true, AddressSelector.all(), true).isEmpty()); } diff --git a/test/src/main/java/org/apache/accumulo/test/ScanServerMultipleScansIT.java b/test/src/main/java/org/apache/accumulo/test/ScanServerMultipleScansIT.java index 4c2e37a18e..063b9c5529 100644 --- a/test/src/main/java/org/apache/accumulo/test/ScanServerMultipleScansIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ScanServerMultipleScansIT.java @@ -46,6 +46,7 @@ import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.admin.NewTableConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.harness.MiniClusterConfigurationCallback; import org.apache.accumulo.harness.SharedMiniClusterBase; @@ -89,7 +90,7 @@ public class ScanServerMultipleScansIT extends SharedMiniClusterBase { "localhost"); Wait.waitFor(() -> !getCluster().getServerContext().getServerPaths() - .getScanServer(rg -> true, addr -> true, true).isEmpty()); + .getScanServer(rg -> true, AddressSelector.all(), true).isEmpty()); } @AfterAll diff --git a/test/src/main/java/org/apache/accumulo/test/ScanServerShutdownIT.java b/test/src/main/java/org/apache/accumulo/test/ScanServerShutdownIT.java index 52a831a502..59fba1b2ad 100644 --- a/test/src/main/java/org/apache/accumulo/test/ScanServerShutdownIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ScanServerShutdownIT.java @@ -35,6 +35,7 @@ import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.harness.MiniClusterConfigurationCallback; import org.apache.accumulo.harness.SharedMiniClusterBase; @@ -85,8 +86,8 @@ public class ScanServerShutdownIT extends SharedMiniClusterBase { ServerContext ctx = getCluster().getServerContext(); - Wait.waitFor( - () -> !ctx.getServerPaths().getScanServer(rg -> true, addr -> true, true).isEmpty()); + Wait.waitFor(() -> !ctx.getServerPaths().getScanServer(rg -> true, AddressSelector.all(), true) + .isEmpty()); try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { final String tableName = getUniqueNames(1)[0]; diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java index d569c4f511..d7d923cddf 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java @@ -56,6 +56,7 @@ import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.file.FileOperations; import org.apache.accumulo.core.file.FileSKVWriter; import org.apache.accumulo.core.file.rfile.RFile; +import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector; import org.apache.accumulo.core.metadata.UnreferencedTabletFile; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletsMetadata; @@ -119,7 +120,7 @@ public class CompactionPriorityQueueMetricsIT extends SharedMiniClusterBase { public void setupMetricsTest() throws Exception { getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR); Wait.waitFor(() -> getCluster().getServerContext().getServerPaths() - .getCompactor(rg -> true, addr -> true, true).isEmpty(), 60_000); + .getCompactor(rg -> true, AddressSelector.all(), true).isEmpty(), 60_000); try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { tableName = getUniqueNames(1)[0]; diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java b/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java index 7676350790..491fc7a2b4 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java @@ -64,6 +64,7 @@ import org.apache.accumulo.core.fate.zookeeper.MetaFateStore; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.core.iterators.IteratorUtil; +import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector; import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl.ProcessInfo; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; @@ -101,7 +102,7 @@ public abstract class FateOpsCommandsIT extends ConfigurableMacBase // this issue. getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR); Wait.waitFor(() -> getServerContext().getServerPaths() - .getCompactor(rg -> true, addr -> true, true).isEmpty(), 60_000); + .getCompactor(rg -> true, AddressSelector.all(), true).isEmpty(), 60_000); } @Test diff --git a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java index 5464bbcf0e..87cf984f90 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java @@ -37,6 +37,7 @@ import org.apache.accumulo.core.client.admin.TableOperations; import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; import org.apache.accumulo.harness.MiniClusterConfigurationCallback; @@ -139,7 +140,8 @@ public class MemoryStarvedMajCIT extends SharedMiniClusterBase { ClientContext ctx = (ClientContext) client; Wait.waitFor(() -> ctx.getServerPaths() - .getCompactor(rg -> rg.equals(Constants.DEFAULT_RESOURCE_GROUP_NAME), addr -> true, true) + .getCompactor(rg -> rg.equals(Constants.DEFAULT_RESOURCE_GROUP_NAME), + AddressSelector.all(), true) .size() == 1, 60_000); ServerId csi = ctx.instanceOperations().getServers(ServerId.Type.COMPACTOR).iterator().next(); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java index 6f219be23f..1320a6018f 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java @@ -54,6 +54,7 @@ import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.iterators.WrappingIterator; import org.apache.accumulo.core.lock.ServiceLockData; import org.apache.accumulo.core.lock.ServiceLockData.ThriftService; +import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector; import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath; import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; @@ -200,7 +201,7 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase { final ZooCache zc = context.getZooCache(); Set<ServiceLockPath> servers = - context.getServerPaths().getTabletServer(rg -> true, addr -> true, true); + context.getServerPaths().getTabletServer(rg -> true, AddressSelector.all(), true); for (ServiceLockPath server : servers) { Optional<ServiceLockData> data = zc.getLockData(server); if (data != null && data.isPresent()) { diff --git a/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java b/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java index 02f2ede2b3..3d44ede9e0 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java @@ -68,6 +68,7 @@ import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.lock.ServiceLock; +import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector; import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath; import org.apache.accumulo.core.manager.state.TabletManagement; import org.apache.accumulo.core.manager.state.tables.TableState; @@ -576,7 +577,7 @@ public class TabletManagementIteratorIT extends AccumuloClusterHarness { HashSet<TServerInstance> tservers = new HashSet<>(); for (ServiceLockPath tserver : context.getServerPaths().getTabletServer(rg -> true, - addr -> true, true)) { + AddressSelector.all(), true)) { try { long sessionId = ServiceLock.getSessionId(context.getZooCache(), tserver); tservers.add(new TServerInstance(tserver.getServer(), sessionId)); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/TabletResourceGroupBalanceIT.java b/test/src/main/java/org/apache/accumulo/test/functional/TabletResourceGroupBalanceIT.java index ce8d3fee0f..52ee70ea48 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/TabletResourceGroupBalanceIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/TabletResourceGroupBalanceIT.java @@ -44,6 +44,7 @@ import org.apache.accumulo.core.clientImpl.ClientTabletCache.LocationNeed; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector; import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath; import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.core.metadata.schema.Ample; @@ -97,7 +98,7 @@ public class TabletResourceGroupBalanceIT extends SharedMiniClusterBase { Map<String,String> tservers = new HashMap<>(); for (ServiceLockPath tserver : cluster.getServerContext().getServerPaths() - .getTabletServer(rg -> true, addr -> true, true)) { + .getTabletServer(rg -> true, AddressSelector.all(), true)) { tservers.put(tserver.getServer(), tserver.getResourceGroup()); } return tservers; diff --git a/test/src/main/java/org/apache/accumulo/test/lock/ServiceLockPathsIT.java b/test/src/main/java/org/apache/accumulo/test/lock/ServiceLockPathsIT.java index dd2e2d079f..36b5a7af00 100644 --- a/test/src/main/java/org/apache/accumulo/test/lock/ServiceLockPathsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/lock/ServiceLockPathsIT.java @@ -25,6 +25,7 @@ import static org.junit.jupiter.api.Assertions.assertNull; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.lock.ServiceLockPaths; +import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector; import org.apache.accumulo.harness.AccumuloClusterHarness; import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; @@ -51,40 +52,49 @@ public class ServiceLockPathsIT extends AccumuloClusterHarness { assertNotNull(paths.getGarbageCollector(true)); assertNotNull(paths.getManager(true)); assertNull(paths.getMonitor(true)); // monitor not started - assertEquals(2, paths.getTabletServer(rg -> true, addr -> true, true).size()); - assertEquals(1, paths - .getTabletServer(rg -> rg.equals(Constants.DEFAULT_RESOURCE_GROUP_NAME), addr -> true, true) - .size()); - assertEquals(1, paths.getTabletServer(rg -> rg.equals("TTEST"), addr -> true, true).size()); - assertEquals(0, paths.getTabletServer(rg -> rg.equals("FAKE"), addr -> true, true).size()); - assertEquals(0, paths.getTabletServer(rg -> rg.equals("CTEST"), addr -> true, true).size()); - assertEquals(0, paths.getTabletServer(rg -> rg.equals("STEST"), addr -> true, true).size()); - - assertEquals(4, paths.getCompactor(rg -> true, addr -> true, true).size()); - assertEquals(1, paths - .getCompactor(rg -> rg.equals(Constants.DEFAULT_RESOURCE_GROUP_NAME), addr -> true, true) - .size()); - assertEquals(3, paths.getCompactor(rg -> rg.equals("CTEST"), addr -> true, true).size()); - assertEquals(0, paths.getCompactor(rg -> rg.equals("FAKE"), addr -> true, true).size()); - assertEquals(0, paths.getCompactor(rg -> rg.equals("TTEST"), addr -> true, true).size()); - assertEquals(0, paths.getCompactor(rg -> rg.equals("STEST"), addr -> true, true).size()); - - assertEquals(3, paths.getScanServer(rg -> true, addr -> true, true).size()); - assertEquals(1, paths - .getScanServer(rg -> rg.equals(Constants.DEFAULT_RESOURCE_GROUP_NAME), addr -> true, true) - .size()); - assertEquals(2, paths.getScanServer(rg -> rg.equals("STEST"), addr -> true, true).size()); - assertEquals(0, paths.getScanServer(rg -> rg.equals("FAKE"), addr -> true, true).size()); - assertEquals(0, paths.getScanServer(rg -> rg.equals("CTEST"), addr -> true, true).size()); - assertEquals(0, paths.getScanServer(rg -> rg.equals("TTEST"), addr -> true, true).size()); + assertEquals(2, paths.getTabletServer(rg -> true, AddressSelector.all(), true).size()); + assertEquals(1, paths.getTabletServer(rg -> rg.equals(Constants.DEFAULT_RESOURCE_GROUP_NAME), + AddressSelector.all(), true).size()); + assertEquals(1, + paths.getTabletServer(rg -> rg.equals("TTEST"), AddressSelector.all(), true).size()); + assertEquals(0, + paths.getTabletServer(rg -> rg.equals("FAKE"), AddressSelector.all(), true).size()); + assertEquals(0, + paths.getTabletServer(rg -> rg.equals("CTEST"), AddressSelector.all(), true).size()); + assertEquals(0, + paths.getTabletServer(rg -> rg.equals("STEST"), AddressSelector.all(), true).size()); + + assertEquals(4, paths.getCompactor(rg -> true, AddressSelector.all(), true).size()); + assertEquals(1, paths.getCompactor(rg -> rg.equals(Constants.DEFAULT_RESOURCE_GROUP_NAME), + AddressSelector.all(), true).size()); + assertEquals(3, + paths.getCompactor(rg -> rg.equals("CTEST"), AddressSelector.all(), true).size()); + assertEquals(0, + paths.getCompactor(rg -> rg.equals("FAKE"), AddressSelector.all(), true).size()); + assertEquals(0, + paths.getCompactor(rg -> rg.equals("TTEST"), AddressSelector.all(), true).size()); + assertEquals(0, + paths.getCompactor(rg -> rg.equals("STEST"), AddressSelector.all(), true).size()); + + assertEquals(3, paths.getScanServer(rg -> true, AddressSelector.all(), true).size()); + assertEquals(1, paths.getScanServer(rg -> rg.equals(Constants.DEFAULT_RESOURCE_GROUP_NAME), + AddressSelector.all(), true).size()); + assertEquals(2, + paths.getScanServer(rg -> rg.equals("STEST"), AddressSelector.all(), true).size()); + assertEquals(0, + paths.getScanServer(rg -> rg.equals("FAKE"), AddressSelector.all(), true).size()); + assertEquals(0, + paths.getScanServer(rg -> rg.equals("CTEST"), AddressSelector.all(), true).size()); + assertEquals(0, + paths.getScanServer(rg -> rg.equals("TTEST"), AddressSelector.all(), true).size()); getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR); - Wait.waitFor(() -> paths.getCompactor(rg -> true, addr -> true, true).size() == 0); + Wait.waitFor(() -> paths.getCompactor(rg -> true, AddressSelector.all(), true).size() == 0); getCluster().getClusterControl().stopAllServers(ServerType.SCAN_SERVER); - Wait.waitFor(() -> paths.getScanServer(rg -> true, addr -> true, true).size() == 0); + Wait.waitFor(() -> paths.getScanServer(rg -> true, AddressSelector.all(), true).size() == 0); getCluster().getClusterControl().stopAllServers(ServerType.GARBAGE_COLLECTOR); @@ -96,8 +106,8 @@ public class ServiceLockPathsIT extends AccumuloClusterHarness { getCluster().getClusterControl().stopAllServers(ServerType.TABLET_SERVER); - Wait.waitFor(() -> paths.getTabletServer(rg -> true, addr -> true, true).size() == 0); - Wait.waitFor(() -> paths.getTabletServer(rg -> true, addr -> true, false).size() == 2); + Wait.waitFor(() -> paths.getTabletServer(rg -> true, AddressSelector.all(), true).size() == 0); + Wait.waitFor(() -> paths.getTabletServer(rg -> true, AddressSelector.all(), false).size() == 2); }