This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new a3c9bf7800 updated ServiceLockPaths to use Predicates (#4943) a3c9bf7800 is described below commit a3c9bf78003259f7363f2392a94d84668a2c0f80 Author: Keith Turner <ktur...@apache.org> AuthorDate: Fri Oct 4 09:37:09 2024 -0400 updated ServiceLockPaths to use Predicates (#4943) Updated ServiceLockPaths to use Predicates for resource group and hostname matching. Hoping this can lead to more advanced and efficient server filtering in the shell and monitor. --- .../accumulo/core/clientImpl/ClientContext.java | 2 +- .../core/clientImpl/InstanceOperationsImpl.java | 3 +- .../core/clientImpl/ZookeeperLockChecker.java | 15 +-- .../accumulo/core/lock/ServiceLockPaths.java | 55 +++++----- .../core/metadata/schema/TabletMetadata.java | 4 +- .../accumulo/core/rpc/clients/TServerClient.java | 9 +- .../util/compaction/ExternalCompactionUtil.java | 4 +- .../accumulo/core/lock/ServiceLockPathsTest.java | 116 ++++++++++----------- .../miniclusterImpl/MiniAccumuloClusterImpl.java | 7 +- .../accumulo/server/manager/LiveTServerSet.java | 14 ++- .../server/manager/state/DeadServerList.java | 3 +- .../accumulo/server/util/AccumuloStatus.java | 3 +- .../org/apache/accumulo/server/util/Admin.java | 5 +- .../accumulo/server/util/ServiceStatusCmd.java | 7 +- .../accumulo/server/util/TabletServerLocks.java | 7 +- .../org/apache/accumulo/server/util/ZooZap.java | 7 +- .../java/org/apache/accumulo/manager/Manager.java | 4 +- .../test/ScanServerConcurrentTabletScanIT.java | 3 +- .../test/ScanServerGroupConfigurationIT.java | 7 +- .../org/apache/accumulo/test/ScanServerIT.java | 3 +- .../accumulo/test/ScanServerMetadataEntriesIT.java | 3 +- .../accumulo/test/ScanServerMultipleScansIT.java | 3 +- .../apache/accumulo/test/ScanServerShutdownIT.java | 5 +- .../test/functional/MemoryStarvedScanIT.java | 2 +- .../functional/TabletManagementIteratorIT.java | 5 +- .../functional/TabletResourceGroupBalanceIT.java | 3 +- .../accumulo/test/lock/ServiceLockPathsIT.java | 47 ++++----- 27 files changed, 171 insertions(+), 175 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java index 83fdd4a0c9..1369094328 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java @@ -406,7 +406,7 @@ public class ClientContext implements AccumuloClient { public Map<String,Pair<UUID,String>> getScanServers() { Map<String,Pair<UUID,String>> liveScanServers = new HashMap<>(); Set<ServiceLockPath> scanServerPaths = - getServerPaths().getScanServer(Optional.empty(), Optional.empty(), true); + getServerPaths().getScanServer(rg -> true, addr -> true, true); for (ServiceLockPath path : scanServerPaths) { try { ZcStat stat = new ZcStat(); diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java index a6e7d25fbc..8ea74e90bb 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java @@ -34,7 +34,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Optional; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; @@ -236,7 +235,7 @@ public class InstanceOperationsImpl implements InstanceOperations { @Override public List<String> getTabletServers() { Set<ServiceLockPath> paths = - context.getServerPaths().getTabletServer(Optional.empty(), Optional.empty(), true); + context.getServerPaths().getTabletServer(rg -> true, addr -> true, true); List<String> results = new ArrayList<>(); paths.forEach(p -> { if (!p.getServer().equals("manager")) { diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ZookeeperLockChecker.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ZookeeperLockChecker.java index e91b64ea4b..72e4e15d77 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ZookeeperLockChecker.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ZookeeperLockChecker.java @@ -18,7 +18,6 @@ */ package org.apache.accumulo.core.clientImpl; -import java.util.Optional; import java.util.Set; import org.apache.accumulo.core.clientImpl.ClientTabletCacheImpl.TabletServerLockChecker; @@ -37,16 +36,18 @@ public class ZookeeperLockChecker implements TabletServerLockChecker { public boolean doesTabletServerLockExist(String server) { // ServiceLockPaths only returns items that have a lock - Set<ServiceLockPath> tservers = ctx.getServerPaths().getTabletServer(Optional.empty(), - Optional.of(HostAndPort.fromString(server)), true); + var hostAndPort = HostAndPort.fromString(server); + Set<ServiceLockPath> tservers = + ctx.getServerPaths().getTabletServer(rg -> true, addr -> addr.equals(hostAndPort), true); return !tservers.isEmpty(); } @Override public boolean isLockHeld(String server, String session) { // ServiceLockPaths only returns items that have a lock - Set<ServiceLockPath> tservers = ctx.getServerPaths().getTabletServer(Optional.empty(), - Optional.of(HostAndPort.fromString(server)), true); + var hostAndPort = HostAndPort.fromString(server); + Set<ServiceLockPath> tservers = + ctx.getServerPaths().getTabletServer(rg -> true, addr -> addr.equals(hostAndPort), true); for (ServiceLockPath slp : tservers) { if (ServiceLock.getSessionId(ctx.getZooCache(), slp) == Long.parseLong(session, 16)) { return true; @@ -57,8 +58,8 @@ public class ZookeeperLockChecker implements TabletServerLockChecker { @Override public void invalidateCache(String tserver) { - ctx.getServerPaths() - .getTabletServer(Optional.empty(), Optional.of(HostAndPort.fromString(tserver)), false) + var hostAndPort = HostAndPort.fromString(tserver); + ctx.getServerPaths().getTabletServer(rg -> true, addr -> addr.equals(hostAndPort), false) .forEach(slp -> { ctx.getZooCache().clear(slp.toString()); }); diff --git a/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java index 62da673356..30c94e9ab4 100644 --- a/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java +++ b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.function.Predicate; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.clientImpl.ClientContext; @@ -282,14 +283,13 @@ public class ServiceLockPaths { serverAddress.toString()); } - public Set<ServiceLockPath> getCompactor(Optional<String> resourceGroup, - Optional<HostAndPort> address, boolean withLock) { - return get(Constants.ZCOMPACTORS, resourceGroup, address, withLock); + public Set<ServiceLockPath> getCompactor(ResourceGroupPredicate resourceGroupPredicate, + Predicate<HostAndPort> address, boolean withLock) { + return get(Constants.ZCOMPACTORS, resourceGroupPredicate, address, withLock); } public ServiceLockPath getGarbageCollector(boolean withLock) { - Set<ServiceLockPath> results = - get(Constants.ZGC_LOCK, Optional.empty(), Optional.empty(), withLock); + Set<ServiceLockPath> results = get(Constants.ZGC_LOCK, rg -> true, addr -> true, withLock); if (results.isEmpty()) { return null; } else { @@ -298,8 +298,7 @@ public class ServiceLockPaths { } public ServiceLockPath getManager(boolean withLock) { - Set<ServiceLockPath> results = - get(Constants.ZMANAGER_LOCK, Optional.empty(), Optional.empty(), withLock); + Set<ServiceLockPath> results = get(Constants.ZMANAGER_LOCK, rg -> true, addr -> true, withLock); if (results.isEmpty()) { return null; } else { @@ -308,8 +307,7 @@ public class ServiceLockPaths { } public ServiceLockPath getMonitor(boolean withLock) { - Set<ServiceLockPath> results = - get(Constants.ZMONITOR_LOCK, Optional.empty(), Optional.empty(), withLock); + Set<ServiceLockPath> results = get(Constants.ZMONITOR_LOCK, rg -> true, addr -> true, withLock); if (results.isEmpty()) { return null; } else { @@ -317,19 +315,23 @@ public class ServiceLockPaths { } } - public Set<ServiceLockPath> getScanServer(Optional<String> resourceGroup, - Optional<HostAndPort> address, boolean withLock) { - return get(Constants.ZSSERVERS, resourceGroup, address, withLock); + public Set<ServiceLockPath> getScanServer(ResourceGroupPredicate resourceGroupPredicate, + Predicate<HostAndPort> address, boolean withLock) { + return get(Constants.ZSSERVERS, resourceGroupPredicate, address, withLock); } - public Set<ServiceLockPath> getTabletServer(Optional<String> resourceGroup, - Optional<HostAndPort> address, boolean withLock) { - return get(Constants.ZTSERVERS, resourceGroup, address, withLock); + public Set<ServiceLockPath> getTabletServer(ResourceGroupPredicate resourceGroupPredicate, + Predicate<HostAndPort> address, boolean withLock) { + return get(Constants.ZTSERVERS, resourceGroupPredicate, address, withLock); } - public Set<ServiceLockPath> getDeadTabletServer(Optional<String> resourceGroup, - Optional<HostAndPort> address, boolean withLock) { - return get(Constants.ZDEADTSERVERS, resourceGroup, address, withLock); + public Set<ServiceLockPath> getDeadTabletServer(ResourceGroupPredicate resourceGroupPredicate, + Predicate<HostAndPort> address, boolean withLock) { + return get(Constants.ZDEADTSERVERS, resourceGroupPredicate, address, withLock); + } + + public interface ResourceGroupPredicate extends Predicate<String> { + } /** @@ -338,18 +340,19 @@ public class ServiceLockPaths { * * @param serverType type of lock, should be something like Constants.ZTSERVERS or * Constants.ZMANAGER_LOCK - * @param resourceGroup name of resource group, if empty will return all resource groups - * @param address address of server (host:port), if empty will return all addresses + * @param resourceGroupPredicate only returns servers in resource groups that pass this predicate + * @param addressPredicate only return servers that match this predicate * @param withLock supply true if you only want to return servers that have an active lock. Not * applicable for types that don't use a lock (e.g. dead tservers) * @return set of ServiceLockPath objects for the paths found based on the search criteria */ - private Set<ServiceLockPath> get(final String serverType, Optional<String> resourceGroup, - Optional<HostAndPort> address, boolean withLock) { + private Set<ServiceLockPath> get(final String serverType, + ResourceGroupPredicate resourceGroupPredicate, Predicate<HostAndPort> addressPredicate, + boolean withLock) { Objects.requireNonNull(serverType); - Objects.requireNonNull(resourceGroup); - Objects.requireNonNull(address); + Objects.requireNonNull(resourceGroupPredicate); + Objects.requireNonNull(addressPredicate); final Set<ServiceLockPath> results = new HashSet<>(); final String typePath = ctx.getZooKeeperRoot() + serverType; @@ -371,13 +374,13 @@ public class ServiceLockPaths { || serverType.equals(Constants.ZTSERVERS) || serverType.equals(Constants.ZDEADTSERVERS)) { final List<String> resourceGroups = cache.getChildren(typePath); for (final String group : resourceGroups) { - if (resourceGroup.isEmpty() || resourceGroup.orElseThrow().equals(group)) { + if (resourceGroupPredicate.test(group)) { final List<String> servers = cache.getChildren(typePath + "/" + group); for (final String server : servers) { final ZcStat stat = new ZcStat(); final ServiceLockPath slp = parse(Optional.of(serverType), typePath + "/" + group + "/" + server); - if (address.isEmpty() || address.orElseThrow().toString().equals(server)) { + if (addressPredicate.test(HostAndPort.fromString(server))) { if (!withLock || slp.getType().equals(Constants.ZDEADTSERVERS)) { // Dead TServers don't have lock data results.add(slp); diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java index f3e61aadd1..c9f9059b10 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java @@ -633,8 +633,8 @@ public class TabletMetadata { public static synchronized Set<TServerInstance> getLiveTServers(ClientContext context) { final Set<TServerInstance> liveServers = new HashSet<>(); - for (ServiceLockPath slp : context.getServerPaths().getTabletServer(Optional.empty(), - Optional.empty(), true)) { + for (ServiceLockPath slp : context.getServerPaths().getTabletServer(rg -> true, addr -> true, + true)) { checkTabletServer(context, slp).ifPresent(liveServers::add); } diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java b/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java index 2c70796095..20436898fe 100644 --- a/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java +++ b/core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java @@ -74,12 +74,9 @@ public interface TServerClient<C extends TServiceClient> { final long rpcTimeout = context.getClientTimeoutInMillis(); final ZooCache zc = context.getZooCache(); final List<ServiceLockPath> serverPaths = new ArrayList<>(); - serverPaths - .addAll(context.getServerPaths().getCompactor(Optional.empty(), Optional.empty(), true)); - serverPaths - .addAll(context.getServerPaths().getScanServer(Optional.empty(), Optional.empty(), true)); - serverPaths - .addAll(context.getServerPaths().getTabletServer(Optional.empty(), Optional.empty(), true)); + serverPaths.addAll(context.getServerPaths().getCompactor(rg -> true, addr -> true, true)); + serverPaths.addAll(context.getServerPaths().getScanServer(rg -> true, addr -> true, true)); + serverPaths.addAll(context.getServerPaths().getTabletServer(rg -> true, addr -> true, true)); if (serverPaths.isEmpty()) { if (warned.compareAndSet(false, true)) { LOG.warn( diff --git a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java index 72a70ce106..7dafb38929 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java @@ -115,7 +115,7 @@ public class ExternalCompactionUtil { */ public static Map<String,Set<HostAndPort>> getCompactorAddrs(ClientContext context) { final Map<String,Set<HostAndPort>> groupsAndAddresses = new HashMap<>(); - context.getServerPaths().getCompactor(Optional.empty(), Optional.empty(), true).forEach(slp -> { + context.getServerPaths().getCompactor(rg -> true, addr -> true, true).forEach(slp -> { groupsAndAddresses.computeIfAbsent(slp.getResourceGroup(), (k) -> new HashSet<>()) .add(HostAndPort.fromString(slp.getServer())); }); @@ -257,7 +257,7 @@ public class ExternalCompactionUtil { public static int countCompactors(String groupName, ClientContext context) { var start = Timer.startNew(); int count = context.getServerPaths() - .getCompactor(Optional.of(groupName), Optional.empty(), true).size(); + .getCompactor(rg -> rg.equals(groupName), addr -> true, true).size(); long elapsed = start.elapsed(MILLISECONDS); if (elapsed > 100) { LOG.debug("Took {} ms to count {} compactors for {}", elapsed, count, groupName); diff --git a/core/src/test/java/org/apache/accumulo/core/lock/ServiceLockPathsTest.java b/core/src/test/java/org/apache/accumulo/core/lock/ServiceLockPathsTest.java index 51da0571d1..3f91c756b7 100644 --- a/core/src/test/java/org/apache/accumulo/core/lock/ServiceLockPathsTest.java +++ b/core/src/test/java/org/apache/accumulo/core/lock/ServiceLockPathsTest.java @@ -380,13 +380,13 @@ public class ServiceLockPathsTest { assertThrows(NullPointerException.class, () -> ctx.getServerPaths().getCompactor(null, null, true)); assertThrows(NullPointerException.class, - () -> ctx.getServerPaths().getCompactor(Optional.of(TEST_RESOURCE_GROUP), null, true)); - assertTrue( - ctx.getServerPaths().getCompactor(Optional.empty(), Optional.empty(), true).isEmpty()); + () -> ctx.getServerPaths().getCompactor(rg -> rg.equals(TEST_RESOURCE_GROUP), null, true)); + assertTrue(ctx.getServerPaths().getCompactor(rg -> true, addr -> true, true).isEmpty()); assertTrue(ctx.getServerPaths() - .getCompactor(Optional.of(TEST_RESOURCE_GROUP), Optional.empty(), true).isEmpty()); + .getCompactor(rg -> rg.equals(TEST_RESOURCE_GROUP), addr -> true, true).isEmpty()); assertTrue(ctx.getServerPaths() - .getCompactor(Optional.of(TEST_RESOURCE_GROUP), Optional.of(hp), true).isEmpty()); + .getCompactor(rg -> rg.equals(TEST_RESOURCE_GROUP), addr -> addr.equals(hp), true) + .isEmpty()); EasyMock.verify(ctx, zc); @@ -441,7 +441,7 @@ public class ServiceLockPathsTest { // query for all Set<ServiceLockPath> results = - ctx.getServerPaths().getCompactor(Optional.empty(), Optional.empty(), false); + ctx.getServerPaths().getCompactor(rg -> true, addr -> true, false); assertEquals(4, results.size()); for (ServiceLockPath path : results) { assertEquals(ZCOMPACTORS, path.getType()); @@ -458,7 +458,7 @@ public class ServiceLockPathsTest { } // query for all with locks - results = ctx.getServerPaths().getCompactor(Optional.empty(), Optional.empty(), true); + results = ctx.getServerPaths().getCompactor(rg -> true, addr -> true, true); assertEquals(2, results.size()); Iterator<ServiceLockPath> iter = results.iterator(); ServiceLockPath slp1 = iter.next(); @@ -487,13 +487,13 @@ public class ServiceLockPathsTest { } // query for all in non-existent resource group - results = ctx.getServerPaths().getCompactor(Optional.of("FAKE_RESOURCE_GROUP"), - Optional.empty(), true); + results = ctx.getServerPaths().getCompactor(rg -> rg.equals("FAKE_RESOURCE_GROUP"), + addr -> true, true); assertEquals(0, results.size()); // query for all in test resource group results = - ctx.getServerPaths().getCompactor(Optional.of(TEST_RESOURCE_GROUP), Optional.empty(), true); + ctx.getServerPaths().getCompactor(rg -> rg.equals(TEST_RESOURCE_GROUP), addr -> true, true); assertEquals(1, results.size()); iter = results.iterator(); slp1 = iter.next(); @@ -503,8 +503,8 @@ public class ServiceLockPathsTest { assertEquals(ROOT + ZCOMPACTORS + "/" + TEST_RESOURCE_GROUP + "/" + HOSTNAME, slp1.toString()); // query for a specific server - results = - ctx.getServerPaths().getCompactor(Optional.of(TEST_RESOURCE_GROUP), Optional.of(hp), true); + results = ctx.getServerPaths().getCompactor(rg -> rg.equals(TEST_RESOURCE_GROUP), + addr -> addr.equals(hp), true); assertEquals(1, results.size()); iter = results.iterator(); slp1 = iter.next(); @@ -514,8 +514,8 @@ public class ServiceLockPathsTest { assertEquals(ROOT + ZCOMPACTORS + "/" + TEST_RESOURCE_GROUP + "/" + HOSTNAME, slp1.toString()); // query for a wrong server - results = ctx.getServerPaths().getCompactor(Optional.of(TEST_RESOURCE_GROUP), - Optional.of(HostAndPort.fromString("localhost:1234")), true); + results = ctx.getServerPaths().getCompactor(rg -> rg.equals(TEST_RESOURCE_GROUP), + addr -> addr.equals(HostAndPort.fromString("localhost:1234")), true); assertEquals(0, results.size()); EasyMock.verify(ctx, zc); @@ -537,13 +537,13 @@ public class ServiceLockPathsTest { assertThrows(NullPointerException.class, () -> ctx.getServerPaths().getScanServer(null, null, true)); assertThrows(NullPointerException.class, - () -> ctx.getServerPaths().getScanServer(Optional.of(TEST_RESOURCE_GROUP), null, true)); - assertTrue( - ctx.getServerPaths().getScanServer(Optional.empty(), Optional.empty(), true).isEmpty()); + () -> ctx.getServerPaths().getScanServer(rg -> rg.equals(TEST_RESOURCE_GROUP), null, true)); + assertTrue(ctx.getServerPaths().getScanServer(rg -> true, addr -> true, true).isEmpty()); assertTrue(ctx.getServerPaths() - .getScanServer(Optional.of(TEST_RESOURCE_GROUP), Optional.empty(), true).isEmpty()); + .getScanServer(rg -> rg.equals(TEST_RESOURCE_GROUP), addr -> true, true).isEmpty()); assertTrue(ctx.getServerPaths() - .getScanServer(Optional.of(TEST_RESOURCE_GROUP), Optional.of(hp), true).isEmpty()); + .getScanServer(rg -> rg.equals(TEST_RESOURCE_GROUP), addr -> addr.equals(hp), true) + .isEmpty()); EasyMock.verify(ctx, zc); @@ -597,7 +597,7 @@ public class ServiceLockPathsTest { // query for all Set<ServiceLockPath> results = - ctx.getServerPaths().getScanServer(Optional.empty(), Optional.empty(), false); + ctx.getServerPaths().getScanServer(rg -> true, addr -> true, false); assertEquals(4, results.size()); for (ServiceLockPath path : results) { assertEquals(ZSSERVERS, path.getType()); @@ -614,7 +614,7 @@ public class ServiceLockPathsTest { } // query for all with lock - results = ctx.getServerPaths().getScanServer(Optional.empty(), Optional.empty(), true); + results = ctx.getServerPaths().getScanServer(rg -> true, addr -> true, true); assertEquals(2, results.size()); Iterator<ServiceLockPath> iter = results.iterator(); ServiceLockPath slp1 = iter.next(); @@ -641,12 +641,12 @@ public class ServiceLockPathsTest { } // query for all in non-existent resource group - results = ctx.getServerPaths().getScanServer(Optional.of("FAKE_RESOURCE_GROUP"), - Optional.empty(), true); + results = ctx.getServerPaths().getScanServer(rg -> rg.equals("FAKE_RESOURCE_GROUP"), + addr -> true, true); assertEquals(0, results.size()); // query for all in test resource group - results = ctx.getServerPaths().getScanServer(Optional.of(TEST_RESOURCE_GROUP), Optional.empty(), + results = ctx.getServerPaths().getScanServer(rg -> rg.equals(TEST_RESOURCE_GROUP), addr -> true, true); assertEquals(1, results.size()); iter = results.iterator(); @@ -657,8 +657,8 @@ public class ServiceLockPathsTest { assertEquals(ROOT + ZSSERVERS + "/" + TEST_RESOURCE_GROUP + "/" + HOSTNAME, slp1.toString()); // query for a specific server - results = - ctx.getServerPaths().getScanServer(Optional.of(TEST_RESOURCE_GROUP), Optional.of(hp), true); + results = ctx.getServerPaths().getScanServer(rg -> rg.equals(TEST_RESOURCE_GROUP), + addr -> addr.equals(hp), true); assertEquals(1, results.size()); iter = results.iterator(); slp1 = iter.next(); @@ -668,8 +668,8 @@ public class ServiceLockPathsTest { assertEquals(ROOT + ZSSERVERS + "/" + TEST_RESOURCE_GROUP + "/" + HOSTNAME, slp1.toString()); // query for a wrong server - results = ctx.getServerPaths().getScanServer(Optional.of(TEST_RESOURCE_GROUP), - Optional.of(HostAndPort.fromString("localhost:1234")), true); + results = ctx.getServerPaths().getScanServer(rg -> rg.equals(TEST_RESOURCE_GROUP), + addr -> addr.equals(HostAndPort.fromString("localhost:1234")), true); assertEquals(0, results.size()); EasyMock.verify(ctx, zc); @@ -690,14 +690,14 @@ public class ServiceLockPathsTest { assertThrows(NullPointerException.class, () -> ctx.getServerPaths().getTabletServer(null, null, true)); - assertThrows(NullPointerException.class, - () -> ctx.getServerPaths().getTabletServer(Optional.of(TEST_RESOURCE_GROUP), null, true)); - assertTrue( - ctx.getServerPaths().getTabletServer(Optional.empty(), Optional.empty(), true).isEmpty()); + assertThrows(NullPointerException.class, () -> ctx.getServerPaths() + .getTabletServer(rg -> rg.equals(TEST_RESOURCE_GROUP), null, true)); + assertTrue(ctx.getServerPaths().getTabletServer(rg -> true, addr -> true, true).isEmpty()); assertTrue(ctx.getServerPaths() - .getTabletServer(Optional.of(TEST_RESOURCE_GROUP), Optional.empty(), true).isEmpty()); + .getTabletServer(rg -> rg.equals(TEST_RESOURCE_GROUP), addr -> true, true).isEmpty()); assertTrue(ctx.getServerPaths() - .getTabletServer(Optional.of(TEST_RESOURCE_GROUP), Optional.of(hp), true).isEmpty()); + .getTabletServer(rg -> rg.equals(TEST_RESOURCE_GROUP), addr -> addr.equals(hp), true) + .isEmpty()); EasyMock.verify(ctx, zc); @@ -751,7 +751,7 @@ public class ServiceLockPathsTest { // query for all Set<ServiceLockPath> results = - ctx.getServerPaths().getTabletServer(Optional.empty(), Optional.empty(), false); + ctx.getServerPaths().getTabletServer(rg -> true, addr -> true, false); assertEquals(4, results.size()); for (ServiceLockPath path : results) { assertEquals(ZTSERVERS, path.getType()); @@ -768,7 +768,7 @@ public class ServiceLockPathsTest { } // query for all with lock - results = ctx.getServerPaths().getTabletServer(Optional.empty(), Optional.empty(), true); + results = ctx.getServerPaths().getTabletServer(rg -> true, addr -> true, true); assertEquals(2, results.size()); Iterator<ServiceLockPath> iter = results.iterator(); ServiceLockPath slp1 = iter.next(); @@ -795,13 +795,13 @@ public class ServiceLockPathsTest { } // query for all in non-existent resource group - results = ctx.getServerPaths().getTabletServer(Optional.of("FAKE_RESOURCE_GROUP"), - Optional.empty(), true); + results = ctx.getServerPaths().getTabletServer(rg -> rg.equals("FAKE_RESOURCE_GROUP"), + addr -> true, true); assertEquals(0, results.size()); // query for all in test resource group - results = ctx.getServerPaths().getTabletServer(Optional.of(TEST_RESOURCE_GROUP), - Optional.empty(), true); + results = ctx.getServerPaths().getTabletServer(rg -> rg.equals(TEST_RESOURCE_GROUP), + addr -> true, true); assertEquals(1, results.size()); iter = results.iterator(); slp1 = iter.next(); @@ -811,8 +811,8 @@ public class ServiceLockPathsTest { assertEquals(ROOT + ZTSERVERS + "/" + TEST_RESOURCE_GROUP + "/" + HOSTNAME, slp1.toString()); // query for a specific server - results = ctx.getServerPaths().getTabletServer(Optional.of(TEST_RESOURCE_GROUP), - Optional.of(hp), true); + results = ctx.getServerPaths().getTabletServer(rg -> rg.equals(TEST_RESOURCE_GROUP), + addr -> addr.equals(hp), true); assertEquals(1, results.size()); iter = results.iterator(); slp1 = iter.next(); @@ -822,8 +822,8 @@ public class ServiceLockPathsTest { assertEquals(ROOT + ZTSERVERS + "/" + TEST_RESOURCE_GROUP + "/" + HOSTNAME, slp1.toString()); // query for a wrong server - results = ctx.getServerPaths().getTabletServer(Optional.of(TEST_RESOURCE_GROUP), - Optional.of(HostAndPort.fromString("localhost:1234")), true); + results = ctx.getServerPaths().getTabletServer(rg -> rg.equals(TEST_RESOURCE_GROUP), + addr -> addr.equals(HostAndPort.fromString("localhost:1234")), true); assertEquals(0, results.size()); EasyMock.verify(ctx, zc); @@ -845,13 +845,13 @@ public class ServiceLockPathsTest { assertThrows(NullPointerException.class, () -> ctx.getServerPaths().getDeadTabletServer(null, null, false)); assertThrows(NullPointerException.class, () -> ctx.getServerPaths() - .getDeadTabletServer(Optional.of(TEST_RESOURCE_GROUP), null, false)); - assertTrue(ctx.getServerPaths().getDeadTabletServer(Optional.empty(), Optional.empty(), false) - .isEmpty()); + .getDeadTabletServer(rg -> rg.equals(TEST_RESOURCE_GROUP), null, false)); + assertTrue(ctx.getServerPaths().getDeadTabletServer(rg -> true, addr -> true, false).isEmpty()); assertTrue(ctx.getServerPaths() - .getDeadTabletServer(Optional.of(TEST_RESOURCE_GROUP), Optional.empty(), false).isEmpty()); + .getDeadTabletServer(rg -> rg.equals(TEST_RESOURCE_GROUP), addr -> true, false).isEmpty()); assertTrue(ctx.getServerPaths() - .getDeadTabletServer(Optional.of(TEST_RESOURCE_GROUP), Optional.of(hp), false).isEmpty()); + .getDeadTabletServer(rg -> rg.equals(TEST_RESOURCE_GROUP), addr -> addr.equals(hp), false) + .isEmpty()); EasyMock.verify(ctx, zc); @@ -899,7 +899,7 @@ public class ServiceLockPathsTest { // query for all Set<ServiceLockPath> results = - ctx.getServerPaths().getDeadTabletServer(Optional.empty(), Optional.empty(), false); + ctx.getServerPaths().getDeadTabletServer(rg -> true, addr -> true, false); assertEquals(2, results.size()); Iterator<ServiceLockPath> iter = results.iterator(); ServiceLockPath slp1 = iter.next(); @@ -928,13 +928,13 @@ public class ServiceLockPathsTest { } // query for all in non-existent resource group - results = ctx.getServerPaths().getDeadTabletServer(Optional.of("FAKE_RESOURCE_GROUP"), - Optional.empty(), false); + results = ctx.getServerPaths().getDeadTabletServer(rg -> rg.equals("FAKE_RESOURCE_GROUP"), + addr -> true, false); assertEquals(0, results.size()); // query for all in test resource group - results = ctx.getServerPaths().getDeadTabletServer(Optional.of(TEST_RESOURCE_GROUP), - Optional.empty(), false); + results = ctx.getServerPaths().getDeadTabletServer(rg -> rg.equals(TEST_RESOURCE_GROUP), + addr -> true, false); assertEquals(1, results.size()); iter = results.iterator(); slp1 = iter.next(); @@ -945,8 +945,8 @@ public class ServiceLockPathsTest { slp1.toString()); // query for a specific server - results = ctx.getServerPaths().getDeadTabletServer(Optional.of(TEST_RESOURCE_GROUP), - Optional.of(hp), false); + results = ctx.getServerPaths().getDeadTabletServer(rg -> rg.equals(TEST_RESOURCE_GROUP), + addr -> addr.equals(hp), false); assertEquals(1, results.size()); iter = results.iterator(); slp1 = iter.next(); @@ -957,8 +957,8 @@ public class ServiceLockPathsTest { slp1.toString()); // query for a wrong server - results = ctx.getServerPaths().getDeadTabletServer(Optional.of(TEST_RESOURCE_GROUP), - Optional.of(HostAndPort.fromString("localhost:1234")), false); + results = ctx.getServerPaths().getDeadTabletServer(rg -> rg.equals(TEST_RESOURCE_GROUP), + addr -> addr.equals(HostAndPort.fromString("localhost:1234")), false); assertEquals(0, results.size()); EasyMock.verify(ctx, zc); diff --git a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java index 9970fd387e..5fe5656147 100644 --- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java +++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java @@ -44,7 +44,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Optional; import java.util.Properties; import java.util.Set; import java.util.UUID; @@ -863,7 +862,7 @@ public class MiniAccumuloClusterImpl implements AccumuloCluster { int tsActualCount = 0; while (tsActualCount < tsExpectedCount) { Set<ServiceLockPath> tservers = - context.getServerPaths().getTabletServer(Optional.empty(), Optional.empty(), true); + context.getServerPaths().getTabletServer(rg -> true, addr -> true, true); tsActualCount = tservers.size(); log.info(tsActualCount + " of " + tsExpectedCount + " tablet servers present in ZooKeeper"); Thread.sleep(500); @@ -872,7 +871,7 @@ public class MiniAccumuloClusterImpl implements AccumuloCluster { int ssActualCount = 0; while (ssActualCount < ssExpectedCount) { Set<ServiceLockPath> tservers = - context.getServerPaths().getScanServer(Optional.empty(), Optional.empty(), true); + context.getServerPaths().getScanServer(rg -> true, addr -> true, true); ssActualCount = tservers.size(); log.info(ssActualCount + " of " + ssExpectedCount + " scan servers present in ZooKeeper"); Thread.sleep(500); @@ -881,7 +880,7 @@ public class MiniAccumuloClusterImpl implements AccumuloCluster { int ecActualCount = 0; while (ecActualCount < ecExpectedCount) { Set<ServiceLockPath> compactors = - context.getServerPaths().getCompactor(Optional.empty(), Optional.empty(), true); + context.getServerPaths().getCompactor(rg -> true, addr -> true, true); ecActualCount = compactors.size(); log.info(ecActualCount + " of " + ecExpectedCount + " compactors present in ZooKeeper"); Thread.sleep(500); diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java index 305b277bc2..e5ed27b6c7 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java @@ -30,6 +30,7 @@ import java.util.Map.Entry; import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException; @@ -40,6 +41,7 @@ import org.apache.accumulo.core.fate.zookeeper.ZooCache.ZcStat; import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.lock.ServiceLockData; import org.apache.accumulo.core.lock.ServiceLockPaths; +import org.apache.accumulo.core.lock.ServiceLockPaths.ResourceGroupPredicate; import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath; import org.apache.accumulo.core.manager.thrift.TabletServerStatus; import org.apache.accumulo.core.metadata.TServerInstance; @@ -230,7 +232,7 @@ public class LiveTServerSet implements Watcher { final Set<TServerInstance> updates = new HashSet<>(); final Set<TServerInstance> doomed = new HashSet<>(); final Set<ServiceLockPath> tservers = - context.getServerPaths().getTabletServer(Optional.empty(), Optional.empty(), false); + context.getServerPaths().getTabletServer(rg -> true, addr -> true, false); locklessServers.keySet().retainAll(tservers); @@ -459,8 +461,16 @@ public class LiveTServerSet implements Watcher { } current.remove(address.orElseThrow().toString()); + ResourceGroupPredicate rgPredicate = resourceGroup.map(rg -> { + ResourceGroupPredicate rgp = rg2 -> rg.equals(rg2); + return rgp; + }).orElse(rg -> true); + Predicate<HostAndPort> addrPredicate = address.map(addr -> { + Predicate<HostAndPort> ap = addr2 -> addr.equals(addr2); + return ap; + }).orElse(addr -> true); Set<ServiceLockPath> paths = - context.getServerPaths().getTabletServer(resourceGroup, address, false); + context.getServerPaths().getTabletServer(rgPredicate, addrPredicate, false); if (paths.isEmpty() || paths.size() > 1) { log.error("Zero or many zookeeper entries match input arguments."); } else { diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/DeadServerList.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/DeadServerList.java index bc26293954..c204c04d05 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/DeadServerList.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/DeadServerList.java @@ -22,7 +22,6 @@ import static java.nio.charset.StandardCharsets.UTF_8; import java.util.ArrayList; import java.util.List; -import java.util.Optional; import java.util.Set; import org.apache.accumulo.core.Constants; @@ -69,7 +68,7 @@ public class DeadServerList { List<DeadServer> result = new ArrayList<>(); try { Set<ServiceLockPath> deadServers = - ctx.getServerPaths().getDeadTabletServer(Optional.empty(), Optional.empty(), false); + ctx.getServerPaths().getDeadTabletServer(rg -> true, addr -> true, false); for (ServiceLockPath path : deadServers) { Stat stat = new Stat(); byte[] data; diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/AccumuloStatus.java b/server/base/src/main/java/org/apache/accumulo/server/util/AccumuloStatus.java index 24b75b89bc..9b3229d4d6 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/AccumuloStatus.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/AccumuloStatus.java @@ -18,7 +18,6 @@ */ package org.apache.accumulo.server.util; -import java.util.Optional; import java.util.Set; import org.apache.accumulo.core.clientImpl.ClientContext; @@ -33,7 +32,7 @@ public class AccumuloStatus { */ public static boolean isAccumuloOffline(ClientContext context) { Set<ServiceLockPath> tservers = - context.getServerPaths().getTabletServer(Optional.empty(), Optional.empty(), true); + context.getServerPaths().getTabletServer(rg -> true, addr -> true, true); if (!tservers.isEmpty()) { return false; } diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java index 5e26567ac4..f1a6c492d4 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java @@ -678,8 +678,9 @@ public class Admin implements KeywordExecutable { */ static String qualifyWithZooKeeperSessionId(ClientContext context, ZooCache zooCache, String hostAndPort) { - Set<ServiceLockPath> paths = context.getServerPaths().getTabletServer(Optional.empty(), - Optional.of(HostAndPort.fromString(hostAndPort)), true); + var hpObj = HostAndPort.fromString(hostAndPort); + Set<ServiceLockPath> paths = + context.getServerPaths().getTabletServer(rg -> true, addr -> addr.equals(hpObj), true); if (paths.size() != 1) { return hostAndPort; } diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ServiceStatusCmd.java b/server/base/src/main/java/org/apache/accumulo/server/util/ServiceStatusCmd.java index c14dfe42f4..70cf41f254 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/ServiceStatusCmd.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/ServiceStatusCmd.java @@ -21,7 +21,6 @@ package org.apache.accumulo.server.util; import static java.nio.charset.StandardCharsets.UTF_8; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; @@ -112,7 +111,7 @@ public class ServiceStatusCmd { final AtomicInteger errors = new AtomicInteger(0); final Map<String,Set<String>> hostsByGroups = new TreeMap<>(); final Set<ServiceLockPath> compactors = - context.getServerPaths().getTabletServer(Optional.empty(), Optional.empty(), true); + context.getServerPaths().getTabletServer(rg -> true, addr -> true, true); compactors.forEach(c -> hostsByGroups .computeIfAbsent(c.getResourceGroup(), (k) -> new TreeSet<>()).add(c.getServer())); return new StatusSummary(ServiceStatusReport.ReportKey.T_SERVER, hostsByGroups.keySet(), @@ -129,7 +128,7 @@ public class ServiceStatusCmd { final AtomicInteger errors = new AtomicInteger(0); final Map<String,Set<String>> hostsByGroups = new TreeMap<>(); final Set<ServiceLockPath> scanServers = - context.getServerPaths().getScanServer(Optional.empty(), Optional.empty(), true); + context.getServerPaths().getScanServer(rg -> true, addr -> true, true); scanServers.forEach(c -> hostsByGroups .computeIfAbsent(c.getResourceGroup(), (k) -> new TreeSet<>()).add(c.getServer())); return new StatusSummary(ServiceStatusReport.ReportKey.S_SERVER, hostsByGroups.keySet(), @@ -156,7 +155,7 @@ public class ServiceStatusCmd { final AtomicInteger errors = new AtomicInteger(0); final Map<String,Set<String>> hostsByGroups = new TreeMap<>(); final Set<ServiceLockPath> compactors = - context.getServerPaths().getCompactor(Optional.empty(), Optional.empty(), true); + context.getServerPaths().getCompactor(rg -> true, addr -> true, true); compactors.forEach(c -> hostsByGroups .computeIfAbsent(c.getResourceGroup(), (k) -> new TreeSet<>()).add(c.getServer())); return new StatusSummary(ServiceStatusReport.ReportKey.COMPACTOR, hostsByGroups.keySet(), diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/TabletServerLocks.java b/server/base/src/main/java/org/apache/accumulo/server/util/TabletServerLocks.java index ab18fc13d0..32255b1e19 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/TabletServerLocks.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/TabletServerLocks.java @@ -42,7 +42,7 @@ public class TabletServerLocks { if (delete == null) { Set<ServiceLockPath> tabletServers = - context.getServerPaths().getTabletServer(Optional.empty(), Optional.empty(), false); + context.getServerPaths().getTabletServer(rg -> true, addr -> true, false); if (tabletServers.isEmpty()) { System.err.println("No tservers found in ZK"); } @@ -62,8 +62,9 @@ public class TabletServerLocks { if (lock == null) { printUsage(); } else { - Set<ServiceLockPath> paths = context.getServerPaths().getTabletServer(Optional.empty(), - Optional.of(HostAndPort.fromString(lock)), true); + var hostAndPort = HostAndPort.fromString(lock); + Set<ServiceLockPath> paths = context.getServerPaths().getTabletServer(rg -> true, + addr -> addr.equals(hostAndPort), true); Preconditions.checkArgument(paths.size() == 1, lock + " does not match a single ZooKeeper TabletServer lock. matches=" + paths); ServiceLockPath path = paths.iterator().next(); diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java b/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java index a24b873eb1..26d0e322dd 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java @@ -20,7 +20,6 @@ package org.apache.accumulo.server.util; import java.util.HashSet; import java.util.List; -import java.util.Optional; import java.util.Set; import org.apache.accumulo.core.cli.Help; @@ -111,7 +110,7 @@ public class ZooZap implements KeywordExecutable { if (opts.zapTservers) { try { Set<ServiceLockPath> tserverLockPaths = - context.getServerPaths().getTabletServer(Optional.empty(), Optional.empty(), false); + context.getServerPaths().getTabletServer(rg -> true, addr -> true, false); for (ServiceLockPath tserverPath : tserverLockPaths) { message("Deleting " + tserverPath + " from zookeeper", opts); @@ -133,7 +132,7 @@ public class ZooZap implements KeywordExecutable { if (opts.zapCompactors) { Set<ServiceLockPath> compactorLockPaths = - context.getServerPaths().getCompactor(Optional.empty(), Optional.empty(), false); + context.getServerPaths().getCompactor(rg -> true, addr -> true, false); Set<String> compactorResourceGroupPaths = new HashSet<>(); compactorLockPaths.forEach(p -> compactorResourceGroupPaths .add(p.toString().substring(0, p.toString().lastIndexOf('/')))); @@ -151,7 +150,7 @@ public class ZooZap implements KeywordExecutable { if (opts.zapScanServers) { try { Set<ServiceLockPath> sserverLockPaths = - context.getServerPaths().getScanServer(Optional.empty(), Optional.empty(), false); + context.getServerPaths().getScanServer(rg -> true, addr -> true, false); for (ServiceLockPath sserverPath : sserverLockPaths) { message("Deleting " + sserverPath + " from zookeeper", opts); if (!zoo.getChildren(sserverPath.toString()).isEmpty()) { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index ae75437225..7fd30d6003 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -644,8 +644,8 @@ public class Manager extends AbstractServer while (stillManager()) { try { - Set<ServiceLockPath> scanServerPaths = getContext().getServerPaths() - .getScanServer(Optional.empty(), Optional.empty(), false); + Set<ServiceLockPath> scanServerPaths = + getContext().getServerPaths().getScanServer(rg -> true, addr -> true, false); for (ServiceLockPath path : scanServerPaths) { ZcStat stat = new ZcStat(); diff --git a/test/src/main/java/org/apache/accumulo/test/ScanServerConcurrentTabletScanIT.java b/test/src/main/java/org/apache/accumulo/test/ScanServerConcurrentTabletScanIT.java index 6022f53b62..f80b6364fd 100644 --- a/test/src/main/java/org/apache/accumulo/test/ScanServerConcurrentTabletScanIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ScanServerConcurrentTabletScanIT.java @@ -28,7 +28,6 @@ import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; -import java.util.Optional; import java.util.Properties; import org.apache.accumulo.core.client.Accumulo; @@ -93,7 +92,7 @@ public class ScanServerConcurrentTabletScanIT extends SharedMiniClusterBase { 1, null); Wait.waitFor(() -> !getCluster().getServerContext().getServerPaths() - .getScanServer(Optional.empty(), Optional.empty(), true).isEmpty()); + .getScanServer(rg -> true, addr -> true, true).isEmpty()); } diff --git a/test/src/main/java/org/apache/accumulo/test/ScanServerGroupConfigurationIT.java b/test/src/main/java/org/apache/accumulo/test/ScanServerGroupConfigurationIT.java index 54a641c7ac..47365f22ef 100644 --- a/test/src/main/java/org/apache/accumulo/test/ScanServerGroupConfigurationIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ScanServerGroupConfigurationIT.java @@ -21,7 +21,6 @@ package org.apache.accumulo.test; import static org.junit.jupiter.api.Assertions.assertEquals; import java.util.Map; -import java.util.Optional; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; @@ -126,7 +125,7 @@ public class ScanServerGroupConfigurationIT extends SharedMiniClusterBase { // Ensure no scan servers running Wait.waitFor(() -> getCluster().getServerContext().getServerPaths() - .getScanServer(Optional.empty(), Optional.empty(), true).isEmpty()); + .getScanServer(rg -> true, addr -> true, true).isEmpty()); try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { final String tableName = getUniqueNames(1)[0]; @@ -148,7 +147,7 @@ public class ScanServerGroupConfigurationIT extends SharedMiniClusterBase { // Start a ScanServer. No group specified, should be in the default group. getCluster().getClusterControl().start(ServerType.SCAN_SERVER, "localhost"); Wait.waitFor(() -> getCluster().getServerContext().getServerPaths() - .getScanServer(Optional.empty(), Optional.empty(), true).size() == 1, 30_000); + .getScanServer(rg -> true, addr -> true, true).size() == 1, 30_000); Wait.waitFor(() -> ((ClientContext) client).getScanServers().values().stream().anyMatch( (p) -> p.getSecond().equals(ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME)) == true); @@ -166,7 +165,7 @@ public class ScanServerGroupConfigurationIT extends SharedMiniClusterBase { .addScanServerResourceGroup("GROUP1", 1); getCluster().getClusterControl().start(ServerType.SCAN_SERVER); Wait.waitFor(() -> getCluster().getServerContext().getServerPaths() - .getScanServer(Optional.empty(), Optional.empty(), true).size() == 2, 30_000); + .getScanServer(rg -> true, addr -> true, true).size() == 2, 30_000); Wait.waitFor(() -> ((ClientContext) client).getScanServers().values().stream().anyMatch( (p) -> p.getSecond().equals(ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME)) == true); diff --git a/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java b/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java index 3b12abaa59..0bfa121a9e 100644 --- a/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java @@ -31,7 +31,6 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Objects; -import java.util.Optional; import java.util.Properties; import java.util.SortedSet; import java.util.TreeSet; @@ -116,7 +115,7 @@ public class ScanServerIT extends SharedMiniClusterBase { "localhost"); Wait.waitFor(() -> !getCluster().getServerContext().getServerPaths() - .getScanServer(Optional.empty(), Optional.empty(), true).isEmpty()); + .getScanServer(rg -> true, addr -> true, true).isEmpty()); } @AfterAll diff --git a/test/src/main/java/org/apache/accumulo/test/ScanServerMetadataEntriesIT.java b/test/src/main/java/org/apache/accumulo/test/ScanServerMetadataEntriesIT.java index a6ea8d5648..176fc7dd66 100644 --- a/test/src/main/java/org/apache/accumulo/test/ScanServerMetadataEntriesIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ScanServerMetadataEntriesIT.java @@ -30,7 +30,6 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map.Entry; -import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; @@ -92,7 +91,7 @@ public class ScanServerMetadataEntriesIT extends SharedMiniClusterBase { "localhost"); Wait.waitFor(() -> !getCluster().getServerContext().getServerPaths() - .getScanServer(Optional.empty(), Optional.empty(), true).isEmpty()); + .getScanServer(rg -> true, addr -> true, true).isEmpty()); } diff --git a/test/src/main/java/org/apache/accumulo/test/ScanServerMultipleScansIT.java b/test/src/main/java/org/apache/accumulo/test/ScanServerMultipleScansIT.java index 4df1a4b8ad..4c2e37a18e 100644 --- a/test/src/main/java/org/apache/accumulo/test/ScanServerMultipleScansIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ScanServerMultipleScansIT.java @@ -28,7 +28,6 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; -import java.util.Optional; import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.CountDownLatch; @@ -90,7 +89,7 @@ public class ScanServerMultipleScansIT extends SharedMiniClusterBase { "localhost"); Wait.waitFor(() -> !getCluster().getServerContext().getServerPaths() - .getScanServer(Optional.empty(), Optional.empty(), true).isEmpty()); + .getScanServer(rg -> true, addr -> true, true).isEmpty()); } @AfterAll diff --git a/test/src/main/java/org/apache/accumulo/test/ScanServerShutdownIT.java b/test/src/main/java/org/apache/accumulo/test/ScanServerShutdownIT.java index 7f53579b6d..99694f633a 100644 --- a/test/src/main/java/org/apache/accumulo/test/ScanServerShutdownIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ScanServerShutdownIT.java @@ -25,7 +25,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.Collections; import java.util.Iterator; import java.util.Map.Entry; -import java.util.Optional; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; @@ -86,8 +85,8 @@ public class ScanServerShutdownIT extends SharedMiniClusterBase { ServerContext ctx = getCluster().getServerContext(); - Wait.waitFor(() -> !ctx.getServerPaths().getScanServer(Optional.empty(), Optional.empty(), true) - .isEmpty()); + Wait.waitFor( + () -> !ctx.getServerPaths().getScanServer(rg -> true, addr -> true, true).isEmpty()); try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { final String tableName = getUniqueNames(1)[0]; diff --git a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java index ffef68465d..6f219be23f 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java @@ -200,7 +200,7 @@ public class MemoryStarvedScanIT extends SharedMiniClusterBase { final ZooCache zc = context.getZooCache(); Set<ServiceLockPath> servers = - context.getServerPaths().getTabletServer(Optional.empty(), Optional.empty(), true); + context.getServerPaths().getTabletServer(rg -> true, addr -> true, true); for (ServiceLockPath server : servers) { Optional<ServiceLockData> data = zc.getLockData(server); if (data != null && data.isPresent()) { diff --git a/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java b/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java index 0364bcdcc6..02f2ede2b3 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java @@ -36,7 +36,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Optional; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; @@ -576,8 +575,8 @@ public class TabletManagementIteratorIT extends AccumuloClusterHarness { tableId -> context.getTableState(tableId) == TableState.ONLINE); HashSet<TServerInstance> tservers = new HashSet<>(); - for (ServiceLockPath tserver : context.getServerPaths().getTabletServer(Optional.empty(), - Optional.empty(), true)) { + for (ServiceLockPath tserver : context.getServerPaths().getTabletServer(rg -> true, + addr -> true, true)) { try { long sessionId = ServiceLock.getSessionId(context.getZooCache(), tserver); tservers.add(new TServerInstance(tserver.getServer(), sessionId)); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/TabletResourceGroupBalanceIT.java b/test/src/main/java/org/apache/accumulo/test/functional/TabletResourceGroupBalanceIT.java index 36bceaa383..ce8d3fee0f 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/TabletResourceGroupBalanceIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/TabletResourceGroupBalanceIT.java @@ -27,7 +27,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.atomic.AtomicInteger; @@ -98,7 +97,7 @@ public class TabletResourceGroupBalanceIT extends SharedMiniClusterBase { Map<String,String> tservers = new HashMap<>(); for (ServiceLockPath tserver : cluster.getServerContext().getServerPaths() - .getTabletServer(Optional.empty(), Optional.empty(), true)) { + .getTabletServer(rg -> true, addr -> true, true)) { tservers.put(tserver.getServer(), tserver.getResourceGroup()); } return tservers; diff --git a/test/src/main/java/org/apache/accumulo/test/lock/ServiceLockPathsIT.java b/test/src/main/java/org/apache/accumulo/test/lock/ServiceLockPathsIT.java index a39b85466a..dd2e2d079f 100644 --- a/test/src/main/java/org/apache/accumulo/test/lock/ServiceLockPathsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/lock/ServiceLockPathsIT.java @@ -22,8 +22,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; -import java.util.Optional; - import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.lock.ServiceLockPaths; @@ -53,40 +51,40 @@ public class ServiceLockPathsIT extends AccumuloClusterHarness { assertNotNull(paths.getGarbageCollector(true)); assertNotNull(paths.getManager(true)); assertNull(paths.getMonitor(true)); // monitor not started - assertEquals(2, paths.getTabletServer(Optional.empty(), Optional.empty(), true).size()); + assertEquals(2, paths.getTabletServer(rg -> true, addr -> true, true).size()); assertEquals(1, paths - .getTabletServer(Optional.of(Constants.DEFAULT_RESOURCE_GROUP_NAME), Optional.empty(), true) + .getTabletServer(rg -> rg.equals(Constants.DEFAULT_RESOURCE_GROUP_NAME), addr -> true, true) .size()); - assertEquals(1, paths.getTabletServer(Optional.of("TTEST"), Optional.empty(), true).size()); - assertEquals(0, paths.getTabletServer(Optional.of("FAKE"), Optional.empty(), true).size()); - assertEquals(0, paths.getTabletServer(Optional.of("CTEST"), Optional.empty(), true).size()); - assertEquals(0, paths.getTabletServer(Optional.of("STEST"), Optional.empty(), true).size()); + assertEquals(1, paths.getTabletServer(rg -> rg.equals("TTEST"), addr -> true, true).size()); + assertEquals(0, paths.getTabletServer(rg -> rg.equals("FAKE"), addr -> true, true).size()); + assertEquals(0, paths.getTabletServer(rg -> rg.equals("CTEST"), addr -> true, true).size()); + assertEquals(0, paths.getTabletServer(rg -> rg.equals("STEST"), addr -> true, true).size()); - assertEquals(4, paths.getCompactor(Optional.empty(), Optional.empty(), true).size()); + assertEquals(4, paths.getCompactor(rg -> true, addr -> true, true).size()); assertEquals(1, paths - .getCompactor(Optional.of(Constants.DEFAULT_RESOURCE_GROUP_NAME), Optional.empty(), true) + .getCompactor(rg -> rg.equals(Constants.DEFAULT_RESOURCE_GROUP_NAME), addr -> true, true) .size()); - assertEquals(3, paths.getCompactor(Optional.of("CTEST"), Optional.empty(), true).size()); - assertEquals(0, paths.getCompactor(Optional.of("FAKE"), Optional.empty(), true).size()); - assertEquals(0, paths.getCompactor(Optional.of("TTEST"), Optional.empty(), true).size()); - assertEquals(0, paths.getCompactor(Optional.of("STEST"), Optional.empty(), true).size()); + assertEquals(3, paths.getCompactor(rg -> rg.equals("CTEST"), addr -> true, true).size()); + assertEquals(0, paths.getCompactor(rg -> rg.equals("FAKE"), addr -> true, true).size()); + assertEquals(0, paths.getCompactor(rg -> rg.equals("TTEST"), addr -> true, true).size()); + assertEquals(0, paths.getCompactor(rg -> rg.equals("STEST"), addr -> true, true).size()); - assertEquals(3, paths.getScanServer(Optional.empty(), Optional.empty(), true).size()); + assertEquals(3, paths.getScanServer(rg -> true, addr -> true, true).size()); assertEquals(1, paths - .getScanServer(Optional.of(Constants.DEFAULT_RESOURCE_GROUP_NAME), Optional.empty(), true) + .getScanServer(rg -> rg.equals(Constants.DEFAULT_RESOURCE_GROUP_NAME), addr -> true, true) .size()); - assertEquals(2, paths.getScanServer(Optional.of("STEST"), Optional.empty(), true).size()); - assertEquals(0, paths.getScanServer(Optional.of("FAKE"), Optional.empty(), true).size()); - assertEquals(0, paths.getScanServer(Optional.of("CTEST"), Optional.empty(), true).size()); - assertEquals(0, paths.getScanServer(Optional.of("TTEST"), Optional.empty(), true).size()); + assertEquals(2, paths.getScanServer(rg -> rg.equals("STEST"), addr -> true, true).size()); + assertEquals(0, paths.getScanServer(rg -> rg.equals("FAKE"), addr -> true, true).size()); + assertEquals(0, paths.getScanServer(rg -> rg.equals("CTEST"), addr -> true, true).size()); + assertEquals(0, paths.getScanServer(rg -> rg.equals("TTEST"), addr -> true, true).size()); getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR); - Wait.waitFor(() -> paths.getCompactor(Optional.empty(), Optional.empty(), true).size() == 0); + Wait.waitFor(() -> paths.getCompactor(rg -> true, addr -> true, true).size() == 0); getCluster().getClusterControl().stopAllServers(ServerType.SCAN_SERVER); - Wait.waitFor(() -> paths.getScanServer(Optional.empty(), Optional.empty(), true).size() == 0); + Wait.waitFor(() -> paths.getScanServer(rg -> true, addr -> true, true).size() == 0); getCluster().getClusterControl().stopAllServers(ServerType.GARBAGE_COLLECTOR); @@ -98,9 +96,8 @@ public class ServiceLockPathsIT extends AccumuloClusterHarness { getCluster().getClusterControl().stopAllServers(ServerType.TABLET_SERVER); - Wait.waitFor(() -> paths.getTabletServer(Optional.empty(), Optional.empty(), true).size() == 0); - Wait.waitFor( - () -> paths.getTabletServer(Optional.empty(), Optional.empty(), false).size() == 2); + Wait.waitFor(() -> paths.getTabletServer(rg -> true, addr -> true, true).size() == 0); + Wait.waitFor(() -> paths.getTabletServer(rg -> true, addr -> true, false).size() == 2); }