This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new c133b44133 Changes to public API to expose resource groups (#4851) c133b44133 is described below commit c133b44133a59ee2ee62d0040f9a8f9e6a19e14e Author: Dave Marion <dlmar...@apache.org> AuthorDate: Tue Oct 8 06:46:37 2024 -0400 Changes to public API to expose resource groups (#4851) Added methods to InstanceOperations that return server information (host, port, resource group, and type) using a new ServerId type. Deprecated older methods on InstanceOperations that just returned the address of the servers as Strings. Modified existing code to use the new methods and added a new test class, InstanceOperationsIT. This work leverages the new ServiceLockPaths class that was added in #4861 to get information about the servers from their respective paths in ZooKeeper. Closes #4849 --- .../core/client/admin/ActiveCompaction.java | 2 + .../core/client/admin/InstanceOperations.java | 69 ++++++ .../core/client/admin/servers/ServerId.java | 122 +++++++++++ .../core/clientImpl/ActiveCompactionImpl.java | 15 +- .../accumulo/core/clientImpl/ClientContext.java | 61 +----- .../core/clientImpl/InstanceOperationsImpl.java | 242 +++++++++++++++++---- .../accumulo/core/lock/ServiceLockPaths.java | 10 + .../accumulo/core/rpc/clients/ManagerClient.java | 14 +- .../org/apache/accumulo/core/summary/Gatherer.java | 8 +- .../util/compaction/ExternalCompactionUtil.java | 23 +- .../org/apache/accumulo/server/util/Admin.java | 9 +- .../org/apache/accumulo/server/util/ECAdmin.java | 17 +- .../java/org/apache/accumulo/server/util/Info.java | 11 +- .../coordinator/CompactionCoordinator.java | 19 +- .../compaction/CompactionCoordinatorTest.java | 5 +- .../java/org/apache/accumulo/monitor/Monitor.java | 62 +++--- .../rest/compactions/external/Compactors.java | 5 +- .../rest/compactions/external/CoordinatorInfo.java | 10 +- .../external/ExternalCompactionInfo.java | 11 +- .../monitor/rest/manager/ManagerResource.java | 13 +- 
.../monitor/rest/status/StatusResource.java | 7 +- .../org/apache/accumulo/tserver/TabletServer.java | 7 +- .../shell/commands/ActiveCompactionHelper.java | 43 ++-- .../shell/commands/ActiveScanIterator.java | 26 ++- .../accumulo/shell/commands/ListScansCommand.java | 25 ++- .../accumulo/shell/commands/PingCommand.java | 7 +- .../apache/accumulo/test/ComprehensiveBaseIT.java | 4 +- .../apache/accumulo/test/InstanceOperationsIT.java | 102 +++++++++ .../accumulo/test/InterruptibleScannersIT.java | 4 +- .../java/org/apache/accumulo/test/LocatorIT.java | 5 +- .../java/org/apache/accumulo/test/RecoveryIT.java | 7 +- .../test/ScanServerGroupConfigurationIT.java | 19 +- .../accumulo/test/ScanServerMaxLatencyIT.java | 4 +- .../apache/accumulo/test/ScanServerShutdownIT.java | 5 +- .../accumulo/test/TabletServerGivesUpIT.java | 10 +- .../accumulo/test/TabletServerHdfsRestartIT.java | 3 +- .../org/apache/accumulo/test/TotalQueuedIT.java | 8 +- .../apache/accumulo/test/TransportCachingIT.java | 19 +- .../org/apache/accumulo/test/ZombieScanIT.java | 22 +- .../CompactionPriorityQueueMetricsIT.java | 5 +- .../accumulo/test/fate/FateOpsCommandsIT.java | 4 +- .../BalanceInPresenceOfOfflineTableIT.java | 8 +- .../accumulo/test/functional/CompactionIT.java | 21 +- .../test/functional/DebugClientConnectionIT.java | 13 +- .../test/functional/HalfDeadTServerIT.java | 5 +- .../test/functional/ManagerAssignmentIT.java | 21 +- .../test/functional/MemoryStarvedMajCIT.java | 16 +- .../apache/accumulo/test/functional/ScanIdIT.java | 5 +- .../apache/accumulo/test/functional/ScannerIT.java | 14 +- .../test/functional/SessionBlockVerifyIT.java | 4 +- .../accumulo/test/functional/ShutdownIT.java | 14 +- .../test/functional/SimpleBalancerFairnessIT.java | 4 +- .../accumulo/test/functional/TabletMetadataIT.java | 7 +- .../accumulo/test/manager/SuspendedTabletsIT.java | 4 +- .../apache/accumulo/test/shell/ShellServerIT.java | 12 +- 55 files changed, 871 insertions(+), 341 deletions(-) diff --git 
a/core/src/main/java/org/apache/accumulo/core/client/admin/ActiveCompaction.java b/core/src/main/java/org/apache/accumulo/core/client/admin/ActiveCompaction.java index 2ca9f69fb5..8aa35ae498 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/admin/ActiveCompaction.java +++ b/core/src/main/java/org/apache/accumulo/core/client/admin/ActiveCompaction.java @@ -145,6 +145,8 @@ public abstract class ActiveCompaction { String getAddress(); int getPort(); + + String getResourceGroup(); } /** diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperations.java b/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperations.java index 9906dd4d36..935fcb0283 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperations.java +++ b/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperations.java @@ -22,9 +22,11 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.function.Consumer; +import java.util.function.Predicate; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.data.InstanceId; public interface InstanceOperations { @@ -181,7 +183,9 @@ public interface InstanceOperations { * * @return a list of locations in <code>hostname:port</code> form. * @since 2.1.0 + * @deprecated see {@link #getServers(ServerId.Type)} */ + @Deprecated(since = "4.0.0") List<String> getManagerLocations(); /** @@ -189,34 +193,83 @@ public interface InstanceOperations { * * @return A set of currently active compactors. * @since 2.1.4 + * @deprecated see {@link #getServers(ServerId.Type)} */ + @Deprecated(since = "4.0.0") Set<String> getCompactors(); /** * Returns the locations of the active scan servers * * @return A set of currently active scan servers. 
+ * @deprecated see {@link #getServers(ServerId.Type)} * @since 2.1.0 */ + @Deprecated(since = "4.0.0") Set<String> getScanServers(); /** * List the currently active tablet servers participating in the accumulo instance * * @return A list of currently active tablet servers. + * @deprecated see {@link #getServers(ServerId.Type)} */ + @Deprecated(since = "4.0.0") List<String> getTabletServers(); + /** + * Resolve the server of the given type and address to a ServerId + * + * @param type type of server + * @param resourceGroup group of server, can be null + * @param host host name, cannot be null + * @param port host port + * @return ServerId if found, else null + * @since 4.0.0 + */ + ServerId getServer(ServerId.Type type, String resourceGroup, String host, int port); + + /** + * Returns all servers of the given type. For the Manager, the result will contain only one + * element for the current active Manager. + * + * @return set of servers of the supplied type + * @since 4.0.0 + */ + Set<ServerId> getServers(ServerId.Type type); + + /** + * Returns the servers of a given type that match the given criteria + * + * @return set of servers of the supplied type matching the supplied test + * @since 4.0.0 + */ + Set<ServerId> getServers(ServerId.Type type, Predicate<ServerId> test); + /** * List the active scans on a tablet server. * * @param tserver The tablet server address. This should be of the form * {@code <ip address>:<port>} * @return A list of active scans on tablet server. + * @deprecated see {@link #getActiveScans(ServerId)} */ + @Deprecated(since = "4.0.0") List<ActiveScan> getActiveScans(String tserver) throws AccumuloException, AccumuloSecurityException; + /** + * List the active scans on a server. + * + * @param server server type and address + * @return A list of active scans on server. 
+ * @throws IllegalArgumentException when the type of the server is not TABLET_SERVER or + * SCAN_SERVER + * @since 4.0.0 + */ + List<ActiveScan> getActiveScans(ServerId server) + throws AccumuloException, AccumuloSecurityException; + /** * List the active compaction running on a TabletServer or Compactor. The server address can be * retrieved using {@link #getCompactors()} or {@link #getTabletServers()}. Use @@ -226,10 +279,26 @@ public interface InstanceOperations { * @param tserver The server address. This should be of the form {@code <ip address>:<port>} * @return the list of active compactions * @since 1.5.0 + * @deprecated see {@link #getActiveCompactions(ServerId server)} */ + @Deprecated(since = "4.0.0") List<ActiveCompaction> getActiveCompactions(String tserver) throws AccumuloException, AccumuloSecurityException; + /** + * List the active compaction running on a TabletServer or Compactor. The ServerId can be + * retrieved using {@link #getServers(ServerId.Type)}. Use + * {@link #getActiveCompactions()} to get a list of all compactions running on tservers and + * compactors. + * + * @param server The ServerId object + * @return the list of active compactions + * @throws IllegalArgumentException when the type of the server is not TABLET_SERVER or COMPACTOR + * @since 4.0.0 + */ + List<ActiveCompaction> getActiveCompactions(ServerId server) + throws AccumuloException, AccumuloSecurityException; + /** * List all internal and external compactions running in Accumulo. * diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/servers/ServerId.java b/core/src/main/java/org/apache/accumulo/core/client/admin/servers/ServerId.java new file mode 100644 index 0000000000..b1c7a44d2b --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/client/admin/servers/ServerId.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.client.admin.servers; + +import java.util.Objects; + +import org.apache.accumulo.core.conf.PropertyType.PortRange; + +import com.google.common.base.Preconditions; + +/** + * Object representing the type, resource group, and address of a server process. + * + * @since 4.0.0 + */ +public final class ServerId implements Comparable<ServerId> { + + /** + * Server process type names that a client can be expected to interact with. Clients are not + * expected to interact directly with the GarbageCollector or Monitor processes. 
+ * + * @since 4.0.0 + */ + public enum Type { + MANAGER, COMPACTOR, SCAN_SERVER, TABLET_SERVER; + } + + private final Type type; + private final String resourceGroup; + private final String host; + private final int port; + + public ServerId(Type type, String resourceGroup, String host, int port) { + super(); + Preconditions.checkArgument(port == 0 || PortRange.VALID_RANGE.contains(port), + "invalid server port value: " + port); + this.type = Objects.requireNonNull(type); + this.resourceGroup = Objects.requireNonNull(resourceGroup); + this.host = Objects.requireNonNull(host); + this.port = port; + } + + public Type getType() { + return type; + } + + public String getResourceGroup() { + return this.resourceGroup; + } + + public String getHost() { + return host; + } + + public int getPort() { + return port; + } + + @Override + public int compareTo(ServerId other) { + if (this == other) { + return 0; + } + int result = this.getType().compareTo(other.getType()); + if (result == 0) { + result = this.getResourceGroup().compareTo(other.getResourceGroup()); + if (result == 0) { + result = this.getHost().compareTo(other.getHost()); + if (result == 0) { + result = Integer.compare(this.getPort(), other.getPort()); + } + } + } + return result; + } + + @Override + public int hashCode() { + return Objects.hash(host, port, type, resourceGroup); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + ServerId other = (ServerId) obj; + return 0 == compareTo(other); + } + + @Override + public String toString() { + return "Server [type= " + type + ", resource group= " + resourceGroup + ", host= " + host + + ", port= " + port + "]"; + } + + public String toHostPortString() { + return host + ":" + port; + } +} diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ActiveCompactionImpl.java 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ActiveCompactionImpl.java index d744789fe8..c6c89e6f47 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ActiveCompactionImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ActiveCompactionImpl.java @@ -26,6 +26,7 @@ import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.admin.ActiveCompaction; import org.apache.accumulo.core.client.admin.ActiveCompaction.CompactionHost.Type; +import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.data.TabletId; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.dataImpl.TabletIdImpl; @@ -42,14 +43,15 @@ public class ActiveCompactionImpl extends ActiveCompaction { private final ClientContext context; private final HostAndPort hostport; private final Type type; + private final String resourceGroup; ActiveCompactionImpl(ClientContext context, - org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction tac, HostAndPort hostport, - CompactionHost.Type type) { + org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction tac, ServerId server) { this.tac = tac; this.context = context; - this.hostport = hostport; - this.type = type; + this.hostport = HostAndPort.fromParts(server.getHost(), server.getPort()); + this.type = server.getType() == ServerId.Type.COMPACTOR ? 
Type.COMPACTOR : Type.TSERVER; + this.resourceGroup = server.getResourceGroup(); } @Override @@ -140,6 +142,11 @@ public class ActiveCompactionImpl extends ActiveCompaction { public int getPort() { return hostport.getPort(); } + + @Override + public String getResourceGroup() { + return resourceGroup; + } }; } } diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java index 1369094328..0017809259 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java @@ -100,7 +100,6 @@ import org.apache.accumulo.core.spi.common.ServiceEnvironment; import org.apache.accumulo.core.spi.scan.ScanServerInfo; import org.apache.accumulo.core.spi.scan.ScanServerSelector; import org.apache.accumulo.core.util.Pair; -import org.apache.accumulo.core.util.Timer; import org.apache.accumulo.core.util.cache.Caches; import org.apache.accumulo.core.util.tables.TableZooHelper; import org.apache.accumulo.core.util.threads.ThreadPools; @@ -197,16 +196,16 @@ public class ClientContext implements AccumuloClient { @Override public Supplier<Collection<ScanServerInfo>> getScanServers() { - return () -> ClientContext.this.getScanServers().entrySet().stream() + return () -> getServerPaths().getScanServer(rg -> true, addr -> true, true).stream() .map(entry -> new ScanServerInfo() { @Override public String getAddress() { - return entry.getKey(); + return entry.getServer(); } @Override public String getGroup() { - return entry.getValue().getSecond(); + return entry.getResourceGroup(); } }).collect(Collectors.toSet()); } @@ -400,6 +399,15 @@ public class ClientContext implements AccumuloClient { return batchWriterConfig; } + /** + * @return the scan server selector implementation used for determining which scan servers will be + * used when performing an eventually consistent scan + */ + public 
ScanServerSelector getScanServerSelector() { + ensureOpen(); + return scanServerSelectorSupplier.get(); + } + /** * @return map of live scan server addresses to lock uuids. */ @@ -425,15 +433,6 @@ public class ClientContext implements AccumuloClient { return liveScanServers; } - /** - * @return the scan server selector implementation used for determining which scan servers will be - * used when performing an eventually consistent scan - */ - public ScanServerSelector getScanServerSelector() { - ensureOpen(); - return scanServerSelectorSupplier.get(); - } - static ConditionalWriterConfig getConditionalWriterConfig(Properties props) { ConditionalWriterConfig conditionalWriterConfig = new ConditionalWriterConfig(); @@ -476,42 +475,6 @@ public class ClientContext implements AccumuloClient { return rpcCreds; } - /** - * Returns the location(s) of the accumulo manager and any redundant servers. - * - * @return a list of locations in "hostname:port" form - */ - public List<String> getManagerLocations() { - ensureOpen(); - var zLockManagerPath = getServerPaths().getManager(true); - - Timer timer = null; - - if (log.isTraceEnabled()) { - log.trace("tid={} Looking up manager location in zookeeper at {}.", - Thread.currentThread().getId(), zLockManagerPath); - timer = Timer.startNew(); - } - - Optional<ServiceLockData> sld = zooCache.getLockData(zLockManagerPath); - String location = null; - if (sld.isPresent()) { - location = sld.orElseThrow().getAddressString(ThriftService.MANAGER); - } - - if (timer != null) { - log.trace("tid={} Found manager at {} in {}", Thread.currentThread().getId(), - (location == null ? "null" : location), - String.format("%.3f secs", timer.elapsed(MILLISECONDS) / 1000.0)); - } - - if (location == null) { - return Collections.emptyList(); - } - - return Collections.singletonList(location); - } - /** * Returns a unique string that identifies this instance of accumulo. 
* diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java index 8ea74e90bb..fa9dd6ba44 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java @@ -20,7 +20,6 @@ package org.apache.accumulo.core.clientImpl; import static com.google.common.base.Preconditions.checkArgument; import static java.nio.charset.StandardCharsets.UTF_8; -import static java.util.stream.Collectors.toList; import static org.apache.accumulo.core.rpc.ThriftUtil.createClient; import static org.apache.accumulo.core.rpc.ThriftUtil.createTransport; import static org.apache.accumulo.core.rpc.ThriftUtil.getClient; @@ -29,31 +28,38 @@ import static org.apache.accumulo.core.util.threads.ThreadPoolNames.INSTANCE_OPS import java.time.Duration; import java.util.ArrayList; +import java.util.Collections; import java.util.ConcurrentModificationException; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; -import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.function.Consumer; +import java.util.function.Predicate; +import java.util.stream.Collectors; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.admin.ActiveCompaction; -import org.apache.accumulo.core.client.admin.ActiveCompaction.CompactionHost; import org.apache.accumulo.core.client.admin.ActiveScan; import org.apache.accumulo.core.client.admin.InstanceOperations; +import 
org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.clientImpl.thrift.ConfigurationType; import org.apache.accumulo.core.clientImpl.thrift.TVersionedProperties; import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException; import org.apache.accumulo.core.conf.DeprecatedPropertyUtil; import org.apache.accumulo.core.data.InstanceId; import org.apache.accumulo.core.fate.zookeeper.ZooCache; +import org.apache.accumulo.core.lock.ServiceLockData; +import org.apache.accumulo.core.lock.ServiceLockData.ThriftService; +import org.apache.accumulo.core.lock.ServiceLockPaths.AddressPredicate; +import org.apache.accumulo.core.lock.ServiceLockPaths.ResourceGroupPredicate; import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath; import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; import org.apache.accumulo.core.tabletscan.thrift.TabletScanClientService; @@ -68,6 +74,7 @@ import org.apache.thrift.TException; import org.apache.thrift.transport.TTransport; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; import com.google.common.net.HostAndPort; /** @@ -214,38 +221,46 @@ public class InstanceOperationsImpl implements InstanceOperations { } @Override + @Deprecated(since = "4.0.0") public List<String> getManagerLocations() { - return context.getManagerLocations(); + + Set<ServerId> managers = getServers(ServerId.Type.MANAGER); + if (managers == null || managers.isEmpty()) { + return List.of(); + } else { + return List.of(managers.iterator().next().toHostPortString()); + } } @Override + @Deprecated(since = "4.0.0") public Set<String> getCompactors() { - Set<String> compactors = new HashSet<>(); - ExternalCompactionUtil.getCompactorAddrs(context).values().forEach(addrs -> { - addrs.forEach(hp -> compactors.add(hp.toString())); - }); - return compactors; + Set<String> results = new HashSet<>(); + context.getServerPaths().getCompactor(rg -> true, addr -> true, true) + .forEach(t -> 
results.add(t.getServer())); + return results; } @Override + @Deprecated(since = "4.0.0") public Set<String> getScanServers() { - return Set.copyOf(context.getScanServers().keySet()); + Set<String> results = new HashSet<>(); + context.getServerPaths().getScanServer(rg -> true, addr -> true, true) + .forEach(t -> results.add(t.getServer())); + return results; } @Override + @Deprecated(since = "4.0.0") public List<String> getTabletServers() { - Set<ServiceLockPath> paths = - context.getServerPaths().getTabletServer(rg -> true, addr -> true, true); List<String> results = new ArrayList<>(); - paths.forEach(p -> { - if (!p.getServer().equals("manager")) { - results.add(p.getServer()); - } - }); + context.getServerPaths().getTabletServer(rg -> true, addr -> true, true) + .forEach(t -> results.add(t.getServer())); return results; } @Override + @Deprecated(since = "4.0.0") public List<ActiveScan> getActiveScans(String tserver) throws AccumuloException, AccumuloSecurityException { final var parsedTserver = HostAndPort.fromString(tserver); @@ -273,6 +288,42 @@ public class InstanceOperationsImpl implements InstanceOperations { } } + @Override + public List<ActiveScan> getActiveScans(ServerId server) + throws AccumuloException, AccumuloSecurityException { + + Objects.requireNonNull(server); + Preconditions.checkArgument( + server.getType() == ServerId.Type.SCAN_SERVER + || server.getType() == ServerId.Type.TABLET_SERVER, + "Server type %s is not %s or %s.", server.getType(), ServerId.Type.SCAN_SERVER, + ServerId.Type.TABLET_SERVER); + + final var parsedTserver = HostAndPort.fromParts(server.getHost(), server.getPort()); + TabletScanClientService.Client rpcClient = null; + try { + rpcClient = getClient(ThriftClientTypes.TABLET_SCAN, parsedTserver, context); + + List<ActiveScan> as = new ArrayList<>(); + for (var activeScan : rpcClient.getActiveScans(TraceUtil.traceInfo(), context.rpcCreds())) { + try { + as.add(new ActiveScanImpl(context, activeScan)); + } catch 
(TableNotFoundException e) { + throw new AccumuloException(e); + } + } + return as; + } catch (ThriftSecurityException e) { + throw new AccumuloSecurityException(e.user, e.code, e); + } catch (TException e) { + throw new AccumuloException(e); + } finally { + if (rpcClient != null) { + returnClient(rpcClient, context); + } + } + } + @Override public boolean testClassLoad(final String className, final String asTypeName) throws AccumuloException, AccumuloSecurityException { @@ -281,19 +332,42 @@ public class InstanceOperationsImpl implements InstanceOperations { } @Override + @Deprecated(since = "4.0.0") public List<ActiveCompaction> getActiveCompactions(String server) throws AccumuloException, AccumuloSecurityException { - final var serverHostAndPort = HostAndPort.fromString(server); + HostAndPort hp = HostAndPort.fromString(server); + + ServerId si = getServer(ServerId.Type.COMPACTOR, null, hp.getHost(), hp.getPort()); + if (si == null) { + si = getServer(ServerId.Type.TABLET_SERVER, null, hp.getHost(), hp.getPort()); + } + if (si == null) { + return List.of(); + } + return getActiveCompactions(si); + } + + @Override + public List<ActiveCompaction> getActiveCompactions(ServerId server) + throws AccumuloException, AccumuloSecurityException { + + Objects.requireNonNull(server); + Preconditions.checkArgument( + server.getType() == ServerId.Type.COMPACTOR + || server.getType() == ServerId.Type.TABLET_SERVER, + "Server type %s is not %s or %s.", server.getType(), ServerId.Type.COMPACTOR, + ServerId.Type.TABLET_SERVER); + + final HostAndPort serverHostAndPort = HostAndPort.fromParts(server.getHost(), server.getPort()); final List<ActiveCompaction> as = new ArrayList<>(); try { - if (context.getTServerLockChecker().doesTabletServerLockExist(server)) { + if (server.getType() == ServerId.Type.TABLET_SERVER) { Client client = null; try { client = getClient(ThriftClientTypes.TABLET_SERVER, serverHostAndPort, context); for (var tac : 
client.getActiveCompactions(TraceUtil.traceInfo(), 
context.rpcCreds())) { - as.add(new ActiveCompactionImpl(context, tac, serverHostAndPort, - CompactionHost.Type.TSERVER)); + as.add(new ActiveCompactionImpl(context, tac, server)); } } finally { if (client != null) { @@ -303,8 +377,7 @@ public class InstanceOperationsImpl implements InstanceOperations { } else { // if not a TabletServer address, maybe it's a Compactor for (var tac : ExternalCompactionUtil.getActiveCompaction(serverHostAndPort, context)) { - as.add(new ActiveCompactionImpl(context, tac, serverHostAndPort, - CompactionHost.Type.COMPACTOR)); + as.add(new ActiveCompactionImpl(context, tac, server)); } } return as; @@ -319,31 +392,20 @@ public class InstanceOperationsImpl implements InstanceOperations { public List<ActiveCompaction> getActiveCompactions() throws AccumuloException, AccumuloSecurityException { - Map<String,Set<HostAndPort>> compactors = ExternalCompactionUtil.getCompactorAddrs(context); - List<String> tservers = getTabletServers(); + Set<ServerId> compactionServers = new HashSet<>(); + compactionServers.addAll(getServers(ServerId.Type.COMPACTOR)); + compactionServers.addAll(getServers(ServerId.Type.TABLET_SERVER)); - int numThreads = Math.max(4, Math.min((tservers.size() + compactors.size()) / 10, 256)); + int numThreads = Math.max(4, Math.min((compactionServers.size()) / 10, 256)); var executorService = context.threadPools().getPoolBuilder(INSTANCE_OPS_COMPACTIONS_FINDER_POOL) .numCoreThreads(numThreads).build(); try { List<Future<List<ActiveCompaction>>> futures = new ArrayList<>(); - for (String tserver : tservers) { - futures.add(executorService.submit(() -> getActiveCompactions(tserver))); + for (ServerId server : compactionServers) { + futures.add(executorService.submit(() -> getActiveCompactions(server))); } - compactors.values().forEach(compactorList -> { - for (HostAndPort compactorAddr : compactorList) { - Callable<List<ActiveCompaction>> task = - () -> ExternalCompactionUtil.getActiveCompaction(compactorAddr, context).stream() 
- .map(tac -> new ActiveCompactionImpl(context, tac, compactorAddr, - CompactionHost.Type.COMPACTOR)) - .collect(toList()); - - futures.add(executorService.submit(task)); - } - }); - List<ActiveCompaction> ret = new ArrayList<>(); for (Future<List<ActiveCompaction>> future : futures) { try { @@ -407,4 +469,104 @@ public class InstanceOperationsImpl implements InstanceOperations { return context.getInstanceID(); } + @Override + public ServerId getServer(ServerId.Type type, String resourceGroup, String host, int port) { + Objects.requireNonNull(type, "type parameter cannot be null"); + Objects.requireNonNull(host, "host parameter cannot be null"); + + final ResourceGroupPredicate rg = + resourceGroup == null ? rgt -> true : rgt -> rgt.equals(resourceGroup); + final AddressPredicate hp = AddressPredicate.exact(HostAndPort.fromParts(host, port)); + + switch (type) { + case COMPACTOR: + Set<ServiceLockPath> compactors = context.getServerPaths().getCompactor(rg, hp, true); + if (compactors.isEmpty()) { + return null; + } else if (compactors.size() == 1) { + return createServerId(type, compactors.iterator().next()); + } else { + throw new IllegalStateException("Multiple servers matching provided address"); + } + case MANAGER: + Set<ServerId> managers = getServers(type, null); + if (managers.isEmpty()) { + return null; + } else { + return managers.iterator().next(); + } + case SCAN_SERVER: + Set<ServiceLockPath> sservers = context.getServerPaths().getScanServer(rg, hp, true); + if (sservers.isEmpty()) { + return null; + } else if (sservers.size() == 1) { + return createServerId(type, sservers.iterator().next()); + } else { + throw new IllegalStateException("Multiple servers matching provided address"); + } + case TABLET_SERVER: + Set<ServiceLockPath> tservers = context.getServerPaths().getTabletServer(rg, hp, true); + if (tservers.isEmpty()) { + return null; + } else if (tservers.size() == 1) { + return createServerId(type, tservers.iterator().next()); + } else { + throw 
new 
IllegalStateException("Multiple servers matching provided address"); + } + default: + throw new IllegalArgumentException("Unhandled server type: " + type); + } + } + + @Override + public Set<ServerId> getServers(ServerId.Type type) { + return getServers(type, null); + } + + @Override + public Set<ServerId> getServers(ServerId.Type type, Predicate<ServerId> test) { + final Set<ServerId> results = new HashSet<>(); + switch (type) { + case COMPACTOR: + context.getServerPaths().getCompactor(rg -> true, addr -> true, true) + .forEach(c -> results.add(createServerId(type, c))); + break; + case MANAGER: + ServiceLockPath m = context.getServerPaths().getManager(true); + Optional<ServiceLockData> sld = context.getZooCache().getLockData(m); + String location = null; + if (sld.isPresent()) { + location = sld.orElseThrow().getAddressString(ThriftService.MANAGER); + HostAndPort hp = HostAndPort.fromString(location); + results.add(new ServerId(type, Constants.DEFAULT_RESOURCE_GROUP_NAME, hp.getHost(), + hp.getPort())); + } + break; + case SCAN_SERVER: + context.getServerPaths().getScanServer(rg -> true, addr -> true, true) + .forEach(s -> results.add(createServerId(type, s))); + break; + case TABLET_SERVER: + context.getServerPaths().getTabletServer(rg -> true, addr -> true, true) + .forEach(t -> results.add(createServerId(type, t))); + break; + default: + break; + } + if (test == null) { + return Collections.unmodifiableSet(results); + } + return results.stream().filter(test).collect(Collectors.toUnmodifiableSet()); + } + + private ServerId createServerId(ServerId.Type type, ServiceLockPath slp) { + Objects.requireNonNull(type); + Objects.requireNonNull(slp); + String resourceGroup = Objects.requireNonNull(slp.getResourceGroup()); + HostAndPort hp = HostAndPort.fromString(Objects.requireNonNull(slp.getServer())); + String host = hp.getHost(); + int port = hp.getPort(); + return new ServerId(type, resourceGroup, host, port); + } + } diff --git 
a/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java index b60712e92b..13752a9ae1 100644 --- a/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java +++ b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java @@ -288,6 +288,11 @@ public class ServiceLockPaths { return get(Constants.ZCOMPACTORS, resourceGroupPredicate, address, withLock); } + /** + * Note that the ServiceLockPath object returned by this method does not populate the server + * attribute. To get the location of the GarbageCollector you will need to parse the lock data at + * the ZooKeeper path. + */ public ServiceLockPath getGarbageCollector(boolean withLock) { Set<ServiceLockPath> results = get(Constants.ZGC_LOCK, rg -> true, addr -> true, withLock); if (results.isEmpty()) { @@ -297,6 +302,11 @@ public class ServiceLockPaths { } } + /** + * Note that the ServiceLockPath object returned by this method does not populate the server + * attribute. The location of the Manager is not in the ZooKeeper path. Instead, use + * InstanceOperations.getServers(ServerId.Type.MANAGER) to get the location. 
+ */ public ServiceLockPath getManager(boolean withLock) { Set<ServiceLockPath> results = get(Constants.ZMANAGER_LOCK, rg -> true, addr -> true, withLock); if (results.isEmpty()) { diff --git a/core/src/main/java/org/apache/accumulo/core/rpc/clients/ManagerClient.java b/core/src/main/java/org/apache/accumulo/core/rpc/clients/ManagerClient.java index d0076e69f1..29057cf3cb 100644 --- a/core/src/main/java/org/apache/accumulo/core/rpc/clients/ManagerClient.java +++ b/core/src/main/java/org/apache/accumulo/core/rpc/clients/ManagerClient.java @@ -21,8 +21,9 @@ package org.apache.accumulo.core.rpc.clients; import static com.google.common.base.Preconditions.checkArgument; import java.net.UnknownHostException; -import java.util.List; +import java.util.Set; +import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.thrift.TServiceClient; @@ -36,18 +37,14 @@ public interface ManagerClient<C extends TServiceClient> { default C getManagerConnection(Logger log, ThriftClientTypes<C> type, ClientContext context) { checkArgument(context != null, "context is null"); - List<String> locations = context.getManagerLocations(); + Set<ServerId> managers = context.instanceOperations().getServers(ServerId.Type.MANAGER); - if (locations.isEmpty()) { + if (managers == null || managers.isEmpty()) { log.debug("No managers..."); return null; } - HostAndPort manager = HostAndPort.fromString(locations.get(0)); - if (manager.getPort() == 0) { - return null; - } - + HostAndPort manager = HostAndPort.fromString(managers.iterator().next().toHostPortString()); try { // Manager requests can take a long time: don't ever time out return ThriftUtil.getClientNoTimeout(type, manager, context); @@ -60,7 +57,6 @@ public interface ManagerClient<C extends TServiceClient> { log.debug("Failed to connect to manager=" + manager + ", will retry... 
", tte); return null; } - } } diff --git a/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java b/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java index d775d61a0a..d9258a1130 100644 --- a/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java +++ b/core/src/main/java/org/apache/accumulo/core/summary/Gatherer.java @@ -49,6 +49,7 @@ import java.util.stream.Collectors; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.client.summary.SummarizerConfiguration; import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.clientImpl.thrift.TInfo; @@ -185,7 +186,7 @@ public class Gatherer { Map<String,Map<StoredTabletFile,List<TRowRange>>> locations = new HashMap<>(); - List<String> tservers = null; + List<ServerId> tservers = null; for (Entry<StoredTabletFile,List<TabletMetadata>> entry : files.entrySet()) { @@ -203,7 +204,8 @@ public class Gatherer { if (location == null) { if (tservers == null) { - tservers = ctx.instanceOperations().getTabletServers(); + tservers = + new ArrayList<>(ctx.instanceOperations().getServers(ServerId.Type.TABLET_SERVER)); Collections.sort(tservers); } @@ -211,7 +213,7 @@ public class Gatherer { // same file (as long as the set of tservers is stable). 
int idx = Math.abs(Hashing.murmur3_32_fixed() .hashString(entry.getKey().getNormalizedPathStr(), UTF_8).asInt()) % tservers.size(); - location = tservers.get(idx); + location = tservers.get(idx).toHostPortString(); } // merge contiguous ranges diff --git a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java index 7dafb38929..15c3a92019 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java @@ -62,10 +62,9 @@ public class ExternalCompactionUtil { private final HostAndPort compactor; private final Future<TExternalCompactionJob> future; - public RunningCompactionFuture(String group, HostAndPort compactor, - Future<TExternalCompactionJob> future) { - this.group = group; - this.compactor = compactor; + public RunningCompactionFuture(ServiceLockPath slp, Future<TExternalCompactionJob> future) { + this.group = slp.getResourceGroup(); + this.compactor = HostAndPort.fromString(slp.getServer()); this.future = future; } @@ -202,11 +201,10 @@ public class ExternalCompactionUtil { final ExecutorService executor = ThreadPools.getServerThreadPools() .getPoolBuilder(COMPACTOR_RUNNING_COMPACTIONS_POOL).numCoreThreads(16).build(); - getCompactorAddrs(context).forEach((group, hp) -> { - hp.forEach(hostAndPort -> { - rcFutures.add(new RunningCompactionFuture(group, hostAndPort, - executor.submit(() -> getRunningCompaction(hostAndPort, context)))); - }); + context.getServerPaths().getCompactor(rg -> true, addr -> true, true).forEach(slp -> { + final HostAndPort hp = HostAndPort.fromString(slp.getServer()); + rcFutures.add(new RunningCompactionFuture(slp, + executor.submit(() -> getRunningCompaction(hp, context)))); }); executor.shutdown(); @@ -231,10 +229,9 @@ public class ExternalCompactionUtil { 
.getPoolBuilder(COMPACTOR_RUNNING_COMPACTION_IDS_POOL).numCoreThreads(16).build(); List<Future<ExternalCompactionId>> futures = new ArrayList<>(); - getCompactorAddrs(context).forEach((q, hp) -> { - hp.forEach(hostAndPort -> { - futures.add(executor.submit(() -> getRunningCompactionId(hostAndPort, context))); - }); + context.getServerPaths().getCompactor(rg -> true, addr -> true, true).forEach(slp -> { + final HostAndPort hp = HostAndPort.fromString(slp.getServer()); + futures.add(executor.submit(() -> getRunningCompactionId(hp, context))); }); executor.shutdown(); diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java index 7290a53669..66b2601b13 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java @@ -61,6 +61,7 @@ import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.NamespaceNotFoundException; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.admin.InstanceOperations; +import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.DefaultConfiguration; @@ -550,7 +551,7 @@ public class Admin implements KeywordExecutable { InstanceOperations io = context.instanceOperations(); if (args.isEmpty()) { - args = io.getTabletServers(); + io.getServers(ServerId.Type.TABLET_SERVER).forEach(t -> args.add(t.toHostPortString())); } int unreachable = 0; @@ -631,7 +632,7 @@ public class Admin implements KeywordExecutable { private static void stopTabletServer(final ClientContext context, List<String> servers, final boolean force) throws AccumuloException, AccumuloSecurityException { - if 
(context.getManagerLocations().isEmpty()) { + if (context.instanceOperations().getServers(ServerId.Type.MANAGER).isEmpty()) { log.info("No managers running. Not attempting safe unload of tserver."); return; } @@ -641,10 +642,10 @@ public class Admin implements KeywordExecutable { } final ZooCache zc = context.getZooCache(); - List<String> runningServers; + Set<ServerId> runningServers; for (String server : servers) { - runningServers = context.instanceOperations().getTabletServers(); + runningServers = context.instanceOperations().getServers(ServerId.Type.TABLET_SERVER); if (runningServers.size() == 1 && !force) { log.info("Only 1 tablet server running. Not attempting shutdown of {}", server); return; diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ECAdmin.java b/server/base/src/main/java/org/apache/accumulo/server/util/ECAdmin.java index 416bbf39a2..b55ad71114 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/ECAdmin.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/ECAdmin.java @@ -18,6 +18,13 @@ */ package org.apache.accumulo.server.util; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; + +import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService; import org.apache.accumulo.core.compaction.thrift.TExternalCompactionList; import org.apache.accumulo.core.dataImpl.KeyExtent; @@ -145,11 +152,15 @@ public class ECAdmin implements KeywordExecutable { } private void listCompactorsByQueue(ServerContext context) { - var groupToCompactorsMap = ExternalCompactionUtil.getCompactorAddrs(context); - if (groupToCompactorsMap.isEmpty()) { + Set<ServerId> compactors = context.instanceOperations().getServers(ServerId.Type.COMPACTOR); + if (compactors.isEmpty()) { System.out.println("No Compactors found."); } else { - groupToCompactorsMap.forEach((q, 
compactors) -> System.out.println(q + ": " + compactors)); + Map<String,List<ServerId>> m = new TreeMap<>(); + compactors.forEach(csi -> { + m.putIfAbsent(csi.getResourceGroup(), new ArrayList<>()).add(csi); + }); + m.forEach((q, c) -> System.out.println(q + ": " + c)); } } diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/Info.java b/server/base/src/main/java/org/apache/accumulo/server/util/Info.java index b2645a7688..e1f3129106 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/Info.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/Info.java @@ -18,6 +18,9 @@ */ package org.apache.accumulo.server.util; +import java.util.Set; + +import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.conf.SiteConfiguration; import org.apache.accumulo.core.util.MonitorUtil; import org.apache.accumulo.server.ServerContext; @@ -47,8 +50,14 @@ public class Info implements KeywordExecutable { @Override public void execute(final String[] args) throws KeeperException, InterruptedException { var context = new ServerContext(SiteConfiguration.auto()); + Set<ServerId> managers = context.instanceOperations().getServers(ServerId.Type.MANAGER); + String manager = null; + if (managers != null && !managers.isEmpty()) { + manager = managers.iterator().next().getHost(); + } + System.out.println("monitor: " + MonitorUtil.getLocation(context)); - System.out.println("managers: " + context.getManagerLocations()); + System.out.println("managers: " + manager); System.out.println("zookeepers: " + context.getZooKeepers()); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java index 722e467120..3dae538065 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java +++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java @@ -60,6 +60,7 @@ import org.apache.accumulo.core.client.TableDeletedException; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.admin.CompactionConfig; import org.apache.accumulo.core.client.admin.compaction.CompactableFile; +import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.clientImpl.thrift.SecurityErrorCode; import org.apache.accumulo.core.clientImpl.thrift.TInfo; import org.apache.accumulo.core.clientImpl.thrift.TableOperation; @@ -311,12 +312,12 @@ public class CompactionCoordinator LOG.info("Shutting down"); } - private Map<String,Set<HostAndPort>> - getIdleCompactors(Map<String,Set<HostAndPort>> runningCompactors) { + private Map<String,Set<HostAndPort>> getIdleCompactors(Set<ServerId> runningCompactors) { final Map<String,Set<HostAndPort>> allCompactors = new HashMap<>(); - runningCompactors - .forEach((group, compactorList) -> allCompactors.put(group, new HashSet<>(compactorList))); + runningCompactors.forEach( + (csi) -> allCompactors.computeIfAbsent(csi.getResourceGroup(), (k) -> new HashSet<>()) + .add(HostAndPort.fromParts(csi.getHost(), csi.getPort()))); final Set<String> emptyQueues = new HashSet<>(); @@ -1009,8 +1010,8 @@ public class CompactionCoordinator } /* Method exists to be overridden in test to hide static method */ - protected Map<String,Set<HostAndPort>> getRunningCompactors() { - return ExternalCompactionUtil.getCompactorAddrs(this.ctx); + protected Set<ServerId> getRunningCompactors() { + return ctx.instanceOperations().getServers(ServerId.Type.COMPACTOR); } /* Method exists to be overridden in test to hide static method */ @@ -1171,11 +1172,11 @@ public class CompactionCoordinator Sets.difference(trackedGroups, TIME_COMPACTOR_LAST_CHECKED.keySet())); } - final Map<String,Set<HostAndPort>> runningCompactors = getRunningCompactors(); 
+ final Set<ServerId> runningCompactors = getRunningCompactors(); final Set<CompactorGroupId> runningCompactorGroups = new HashSet<>(); - runningCompactors.keySet() - .forEach(group -> runningCompactorGroups.add(CompactorGroupId.of(group))); + runningCompactors + .forEach(c -> runningCompactorGroups.add(CompactorGroupId.of(c.getResourceGroup()))); final Set<CompactorGroupId> groupsWithNoCompactors = Sets.difference(groupsInConfiguration, runningCompactorGroups); diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java index ae2cd37223..9a4237e619 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java @@ -49,6 +49,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import org.apache.accumulo.core.client.admin.CompactionConfig; +import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.clientImpl.thrift.TInfo; import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException; import org.apache.accumulo.core.compaction.thrift.TExternalCompaction; @@ -190,8 +191,8 @@ public class CompactionCoordinatorTest { } @Override - protected Map<String,Set<HostAndPort>> getRunningCompactors() { - return Map.of(); + protected Set<ServerId> getRunningCompactors() { + return Set.of(); } @Override diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java index 1f1cbbc4ce..70a8ba2229 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java @@ -47,7 +47,9 @@ import jakarta.inject.Singleton; 
import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.cli.ConfigOpts; +import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService; +import org.apache.accumulo.core.compaction.thrift.CompactorService; import org.apache.accumulo.core.compaction.thrift.TExternalCompaction; import org.apache.accumulo.core.compaction.thrift.TExternalCompactionList; import org.apache.accumulo.core.conf.Property; @@ -668,8 +670,8 @@ public class Monitor extends AbstractServer implements HighlyAvailableService { } if (System.nanoTime() - ecInfoFetchedNanos > fetchTimeNanos) { log.info("User initiated fetch of External Compaction info"); - Map<String,Set<HostAndPort>> compactors = - ExternalCompactionUtil.getCompactorAddrs(getContext()); + Set<ServerId> compactors = + getContext().instanceOperations().getServers(ServerId.Type.COMPACTOR); log.debug("Found compactors: " + compactors); ecInfo.setFetchedTimeMillis(System.currentTimeMillis()); ecInfo.setCompactors(compactors); @@ -734,11 +736,15 @@ public class Monitor extends AbstractServer implements HighlyAvailableService { } private void fetchScans() { - ServerContext context = getContext(); - for (String server : context.instanceOperations().getTabletServers()) { - final HostAndPort parsedServer = HostAndPort.fromString(server); + final ServerContext context = getContext(); + final Set<ServerId> servers = new HashSet<>(); + servers.addAll(context.instanceOperations().getServers(ServerId.Type.SCAN_SERVER)); + servers.addAll(context.instanceOperations().getServers(ServerId.Type.TABLET_SERVER)); + + for (ServerId server : servers) { TabletScanClientService.Client tserver = null; try { + HostAndPort parsedServer = HostAndPort.fromParts(server.getHost(), server.getPort()); tserver = ThriftUtil.getClient(ThriftClientTypes.TABLET_SCAN, parsedServer, context); List<ActiveScan> scans = tserver.getActiveScans(null, context.rpcCreds()); 
tserverScans.put(parsedServer, new ScanStats(scans)); @@ -759,38 +765,13 @@ public class Monitor extends AbstractServer implements HighlyAvailableService { tserverIter.remove(); } } - // Scan Servers - for (String server : context.instanceOperations().getScanServers()) { - final HostAndPort parsedServer = HostAndPort.fromString(server); - TabletScanClientService.Client sserver = null; - try { - sserver = ThriftUtil.getClient(ThriftClientTypes.TABLET_SCAN, parsedServer, context); - List<ActiveScan> scans = sserver.getActiveScans(null, context.rpcCreds()); - sserverScans.put(parsedServer, new ScanStats(scans)); - scansFetchedNanos = System.nanoTime(); - } catch (Exception ex) { - log.error("Failed to get active scans from {}", server, ex); - } finally { - ThriftUtil.returnClient(sserver, context); - } - } - // Age off old scan information - Iterator<Entry<HostAndPort,ScanStats>> sserverIter = sserverScans.entrySet().iterator(); - // clock time used for fetched for date friendly display - now = System.currentTimeMillis(); - while (sserverIter.hasNext()) { - Entry<HostAndPort,ScanStats> entry = sserverIter.next(); - if (now - entry.getValue().fetched > ageOffEntriesMillis) { - sserverIter.remove(); - } - } } private void fetchCompactions() { - ServerContext context = getContext(); + final ServerContext context = getContext(); - for (String server : context.instanceOperations().getTabletServers()) { - final HostAndPort parsedServer = HostAndPort.fromString(server); + for (ServerId server : context.instanceOperations().getServers(ServerId.Type.TABLET_SERVER)) { + final HostAndPort parsedServer = HostAndPort.fromParts(server.getHost(), server.getPort()); Client tserver = null; try { tserver = ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, parsedServer, context); @@ -803,6 +784,21 @@ public class Monitor extends AbstractServer implements HighlyAvailableService { ThriftUtil.returnClient(tserver, context); } } + for (ServerId server : 
context.instanceOperations().getServers(ServerId.Type.COMPACTOR)) { + final HostAndPort parsedServer = HostAndPort.fromParts(server.getHost(), server.getPort()); + CompactorService.Client compactor = null; + try { + compactor = ThriftUtil.getClient(ThriftClientTypes.COMPACTOR, parsedServer, context); + var compacts = compactor.getActiveCompactions(null, context.rpcCreds()); + allCompactions.put(parsedServer, new CompactionStats(compacts)); + compactsFetchedNanos = System.nanoTime(); + } catch (Exception ex) { + log.debug("Failed to get active compactions from {}", server, ex); + } finally { + ThriftUtil.returnClient(compactor, context); + } + } + // Age off old compaction information var entryIter = allCompactions.entrySet().iterator(); // clock time used for fetched for date friendly display diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/Compactors.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/Compactors.java index 87a899e6ff..e364d1eb5b 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/Compactors.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/Compactors.java @@ -31,9 +31,10 @@ public class Compactors { public final List<CompactorInfo> compactors = new ArrayList<>(); public Compactors(ExternalCompactionInfo ecInfo) { - ecInfo.getCompactors().forEach((q, c) -> { + ecInfo.getCompactors().forEach(csi -> { var fetchedTime = ecInfo.getFetchedTimeMillis(); - c.forEach(hp -> compactors.add(new CompactorInfo(fetchedTime, q, hp.toString()))); + compactors + .add(new CompactorInfo(fetchedTime, csi.getResourceGroup(), csi.toHostPortString())); }); numCompactors = compactors.size(); } diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CoordinatorInfo.java 
b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CoordinatorInfo.java index eccda4569e..059347e3f1 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CoordinatorInfo.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CoordinatorInfo.java @@ -21,6 +21,8 @@ package org.apache.accumulo.monitor.rest.compactions.external; import java.util.Optional; import java.util.Set; +import org.apache.accumulo.core.client.admin.servers.ServerId; + import com.google.common.net.HostAndPort; public class CoordinatorInfo { @@ -28,14 +30,14 @@ public class CoordinatorInfo { // Variable names become JSON keys public long lastContact; public String server; - public int numQueues; + public long numQueues; public int numCompactors; public CoordinatorInfo(Optional<HostAndPort> serverOpt, ExternalCompactionInfo ecInfo) { server = serverOpt.map(HostAndPort::toString).orElse("none"); - var groupToCompactors = ecInfo.getCompactors(); - numQueues = groupToCompactors.size(); - numCompactors = groupToCompactors.values().stream().mapToInt(Set::size).sum(); + Set<ServerId> compactors = ecInfo.getCompactors(); + numQueues = compactors.stream().map(csi -> csi.getResourceGroup()).distinct().count(); + numCompactors = compactors.size(); lastContact = System.currentTimeMillis() - ecInfo.getFetchedTimeMillis(); } } diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ExternalCompactionInfo.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ExternalCompactionInfo.java index 251eb16a32..6d0e9f798a 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ExternalCompactionInfo.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ExternalCompactionInfo.java @@ -18,11 +18,12 @@ */ package 
org.apache.accumulo.monitor.rest.compactions.external; -import java.util.HashMap; -import java.util.Map; +import java.util.HashSet; import java.util.Optional; import java.util.Set; +import org.apache.accumulo.core.client.admin.servers.ServerId; + import com.google.common.net.HostAndPort; /** @@ -31,7 +32,7 @@ import com.google.common.net.HostAndPort; public class ExternalCompactionInfo { private Optional<HostAndPort> coordinatorHost; - private Map<String,Set<HostAndPort>> compactors = new HashMap<>(); + private Set<ServerId> compactors = new HashSet<>(); private long fetchedTimeMillis; public void setCoordinatorHost(Optional<HostAndPort> coordinatorHost) { @@ -42,11 +43,11 @@ public class ExternalCompactionInfo { return coordinatorHost; } - public Map<String,Set<HostAndPort>> getCompactors() { + public Set<ServerId> getCompactors() { return compactors; } - public void setCompactors(Map<String,Set<HostAndPort>> compactors) { + public void setCompactors(Set<ServerId> compactors) { this.compactors = compactors; } diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/manager/ManagerResource.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/manager/ManagerResource.java index c2c20cc3e3..6abfc6e2fb 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/manager/ManagerResource.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/manager/ManagerResource.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import jakarta.inject.Inject; import jakarta.ws.rs.GET; @@ -30,11 +31,11 @@ import jakarta.ws.rs.Path; import jakarta.ws.rs.Produces; import jakarta.ws.rs.core.MediaType; +import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.gc.thrift.GCStatus; import org.apache.accumulo.core.manager.thrift.DeadServer; import 
org.apache.accumulo.core.manager.thrift.ManagerMonitorInfo; import org.apache.accumulo.core.manager.thrift.TabletServerStatus; -import org.apache.accumulo.core.util.AddressUtil; import org.apache.accumulo.monitor.Monitor; import org.apache.accumulo.monitor.rest.logs.DeadLoggerInformation; import org.apache.accumulo.monitor.rest.logs.DeadLoggerList; @@ -99,10 +100,14 @@ public class ManagerResource { for (DeadServer down : mmi.deadTabletServers) { tservers.add(down.server); } - List<String> managers = monitor.getContext().getManagerLocations(); - String manager = - managers.isEmpty() ? "Down" : AddressUtil.parseAddress(managers.get(0)).getHost(); + Set<ServerId> managers = + monitor.getContext().instanceOperations().getServers(ServerId.Type.MANAGER); + String manager = "Down"; + if (managers != null && !managers.isEmpty()) { + manager = managers.iterator().next().getHost(); + } + int onlineTabletServers = mmi.tServerInfo.size(); int totalTabletServers = tservers.size(); int tablets = monitor.getTotalTabletCount(); diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/status/StatusResource.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/status/StatusResource.java index 775f612a61..1b0140af47 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/status/StatusResource.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/status/StatusResource.java @@ -18,14 +18,13 @@ */ package org.apache.accumulo.monitor.rest.status; -import java.util.List; - import jakarta.inject.Inject; import jakarta.ws.rs.GET; import jakarta.ws.rs.Path; import jakarta.ws.rs.Produces; import jakarta.ws.rs.core.MediaType; +import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath; import org.apache.accumulo.core.manager.thrift.ManagerMonitorInfo; import org.apache.accumulo.monitor.Monitor; @@ -67,8 +66,8 @@ public class StatusResource { gcStatus = Status.ERROR; } - List<String> managers = 
monitor.getContext().getManagerLocations(); - managerStatus = managers.isEmpty() ? Status.ERROR : Status.OK; + ServiceLockPath slp = monitor.getContext().getServerPaths().getManager(true); + managerStatus = slp == null ? Status.ERROR : Status.OK; int tServerUp = mmi.getTServerInfoSize(); int tServerDown = mmi.getDeadTabletServersSize(); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index cb12d06359..5cb59315fe 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -65,6 +65,7 @@ import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.classloader.ClassLoaderUtil; import org.apache.accumulo.core.cli.ConfigOpts; import org.apache.accumulo.core.client.Durability; +import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.clientImpl.ClientTabletCache; import org.apache.accumulo.core.clientImpl.DurabilityImpl; import org.apache.accumulo.core.conf.AccumuloConfiguration; @@ -414,11 +415,11 @@ public class TabletServer extends AbstractServer implements TabletHostingServer private HostAndPort getManagerAddress() { try { - List<String> locations = getContext().getManagerLocations(); - if (locations.isEmpty()) { + Set<ServerId> managers = getContext().instanceOperations().getServers(ServerId.Type.MANAGER); + if (managers == null || managers.isEmpty()) { return null; } - return HostAndPort.fromString(locations.get(0)); + return HostAndPort.fromString(managers.iterator().next().toHostPortString()); } catch (Exception e) { log.warn("Failed to obtain manager host " + e); } diff --git a/shell/src/main/java/org/apache/accumulo/shell/commands/ActiveCompactionHelper.java b/shell/src/main/java/org/apache/accumulo/shell/commands/ActiveCompactionHelper.java index 62dd93f269..044973dd2d 100644 --- 
a/shell/src/main/java/org/apache/accumulo/shell/commands/ActiveCompactionHelper.java +++ b/shell/src/main/java/org/apache/accumulo/shell/commands/ActiveCompactionHelper.java @@ -31,11 +31,16 @@ import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.admin.ActiveCompaction; import org.apache.accumulo.core.client.admin.InstanceOperations; +import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.util.DurationFormat; -import org.apache.accumulo.shell.Shell; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.net.HostAndPort; class ActiveCompactionHelper { + private static final Logger LOG = LoggerFactory.getLogger(ActiveCompactionHelper.class); private static final Comparator<ActiveCompaction> COMPACTION_AGE_DESCENDING = Comparator.comparingLong(ActiveCompaction::getAge).reversed(); @@ -94,10 +99,10 @@ class ActiveCompactionHelper { try { var dur = new DurationFormat(ac.getAge(), ""); return String.format( - "%21s | %9s | %5s | %6s | %5s | %5s | %15s | %-40s | %5s | %35s | %9s | %s", host, dur, - ac.getType(), ac.getReason(), shortenCount(ac.getEntriesRead()), - shortenCount(ac.getEntriesWritten()), ac.getTable(), ac.getTablet(), - ac.getInputFiles().size(), output, iterList, iterOpts); + "%21s | %21s | %9s | %5s | %6s | %5s | %5s | %15s | %-40s | %5s | %35s | %9s | %s", + ac.getHost().getResourceGroup(), host, dur, ac.getType(), ac.getReason(), + shortenCount(ac.getEntriesRead()), shortenCount(ac.getEntriesWritten()), ac.getTable(), + ac.getTablet(), ac.getInputFiles().size(), output, iterList, iterOpts); } catch (TableNotFoundException e) { return "ERROR " + e.getMessage(); } @@ -105,20 +110,30 @@ class ActiveCompactionHelper { public static Stream<String> appendHeader(Stream<String> stream) { Stream<String> header = Stream.of(String.format( - " %-21s| %-9s | %-5s | %-6s | %-5s | %-5s | %-15s | 
%-40s | %-5s | %-35s | %-9s | %s", - "SERVER", "AGE", "TYPE", "REASON", "READ", "WROTE", "TABLE", "TABLET", "INPUT", "OUTPUT", - "ITERATORS", "ITERATOR OPTIONS")); + " %-21s| %-21s| %-9s | %-5s | %-6s | %-5s | %-5s | %-15s | %-40s | %-5s | %-35s | %-9s | %s", + "GROUP", "SERVER", "AGE", "TYPE", "REASON", "READ", "WROTE", "TABLE", "TABLET", "INPUT", + "OUTPUT", "ITERATORS", "ITERATOR OPTIONS")); return Stream.concat(header, stream); } public static Stream<String> activeCompactionsForServer(String tserver, InstanceOperations instanceOps) { - try { - return instanceOps.getActiveCompactions(tserver).stream().sorted(COMPACTION_AGE_DESCENDING) - .map(ActiveCompactionHelper::formatActiveCompactionLine); - } catch (Exception e) { - Shell.log.debug("Failed to list active compactions for server {}", tserver, e); - return Stream.of(tserver + " ERROR " + e.getMessage()); + final HostAndPort hp = HostAndPort.fromString(tserver); + ServerId server = + instanceOps.getServer(ServerId.Type.COMPACTOR, null, hp.getHost(), hp.getPort()); + if (server == null) { + server = instanceOps.getServer(ServerId.Type.TABLET_SERVER, null, hp.getHost(), hp.getPort()); + } + if (server == null) { + return Stream.of(); + } else { + try { + return instanceOps.getActiveCompactions(server).stream().sorted(COMPACTION_AGE_DESCENDING) + .map(ActiveCompactionHelper::formatActiveCompactionLine); + } catch (Exception e) { + LOG.debug("Failed to list active compactions for server {}", tserver, e); + return Stream.of(tserver + " ERROR " + e.getMessage()); + } } } diff --git a/shell/src/main/java/org/apache/accumulo/shell/commands/ActiveScanIterator.java b/shell/src/main/java/org/apache/accumulo/shell/commands/ActiveScanIterator.java index 77288369ce..6eb6a076d8 100644 --- a/shell/src/main/java/org/apache/accumulo/shell/commands/ActiveScanIterator.java +++ b/shell/src/main/java/org/apache/accumulo/shell/commands/ActiveScanIterator.java @@ -22,16 +22,18 @@ import java.util.ArrayList; import 
java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.Set; import org.apache.accumulo.core.client.admin.ActiveScan; import org.apache.accumulo.core.client.admin.InstanceOperations; import org.apache.accumulo.core.client.admin.ScanType; +import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.util.DurationFormat; class ActiveScanIterator implements Iterator<String> { private InstanceOperations instanceOps; - private Iterator<String> tsIter; + private Iterator<ServerId> tsIter; private Iterator<String> scansIter; private void readNext() { @@ -39,22 +41,22 @@ class ActiveScanIterator implements Iterator<String> { while (tsIter.hasNext()) { - final String tserver = tsIter.next(); + final ServerId server = tsIter.next(); try { - final List<ActiveScan> asl = instanceOps.getActiveScans(tserver); + final List<ActiveScan> asl = instanceOps.getActiveScans(server); for (ActiveScan as : asl) { var dur = new DurationFormat(as.getAge(), ""); var dur2 = new DurationFormat(as.getLastContactTime(), ""); scans.add(String.format( - "%21s |%21s |%9s |%9s |%7s |%6s |%8s |%8s |%10s |%20s |%10s |%20s |%10s | %s", - tserver, as.getClient(), dur, dur2, as.getState(), as.getType(), as.getUser(), - as.getTable(), as.getColumns(), as.getAuthorizations(), - (as.getType() == ScanType.SINGLE ? as.getTablet() : "N/A"), as.getScanid(), - as.getSsiList(), as.getSsio())); + "%21s |%21s |%21s |%9s |%9s |%7s |%6s |%8s |%8s |%10s |%20s |%10s |%20s |%10s | %s", + server.getResourceGroup(), server.toHostPortString(), as.getClient(), dur, dur2, + as.getState(), as.getType(), as.getUser(), as.getTable(), as.getColumns(), + as.getAuthorizations(), (as.getType() == ScanType.SINGLE ? 
as.getTablet() : "N/A"), + as.getScanid(), as.getSsiList(), as.getSsio())); } } catch (Exception e) { - scans.add(tserver + " ERROR " + e.getMessage()); + scans.add(server + " ERROR " + e.getMessage()); } if (!scans.isEmpty()) { @@ -65,14 +67,14 @@ class ActiveScanIterator implements Iterator<String> { scansIter = scans.iterator(); } - ActiveScanIterator(List<String> tservers, InstanceOperations instanceOps) { + ActiveScanIterator(Set<ServerId> tservers, InstanceOperations instanceOps) { this.instanceOps = instanceOps; this.tsIter = tservers.iterator(); final String header = String.format( - " %-21s| %-21s| %-9s| %-9s| %-7s| %-6s|" + " %-21s| %-21s| %-21s| %-9s| %-9s| %-7s| %-6s|" + " %-8s| %-8s| %-10s| %-20s| %-10s| %-10s | %-20s | %s", - "TABLET SERVER", "CLIENT", "AGE", "LAST", "STATE", "TYPE", "USER", "TABLE", "COLUMNS", + "GROUP", "SERVER", "CLIENT", "AGE", "LAST", "STATE", "TYPE", "USER", "TABLE", "COLUMNS", "AUTHORIZATIONS", "TABLET", "SCAN ID", "ITERATORS", "ITERATOR OPTIONS"); scansIter = Collections.singletonList(header).iterator(); diff --git a/shell/src/main/java/org/apache/accumulo/shell/commands/ListScansCommand.java b/shell/src/main/java/org/apache/accumulo/shell/commands/ListScansCommand.java index acca3aa838..54ec60055b 100644 --- a/shell/src/main/java/org/apache/accumulo/shell/commands/ListScansCommand.java +++ b/shell/src/main/java/org/apache/accumulo/shell/commands/ListScansCommand.java @@ -18,16 +18,19 @@ */ package org.apache.accumulo.shell.commands; -import java.util.ArrayList; -import java.util.List; +import java.util.HashSet; +import java.util.Set; import org.apache.accumulo.core.client.admin.InstanceOperations; +import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.shell.Shell; import org.apache.accumulo.shell.Shell.Command; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; +import com.google.common.net.HostAndPort; + public class 
ListScansCommand extends Command { private Option tserverOption, disablePaginationOpt; @@ -42,21 +45,23 @@ public class ListScansCommand extends Command { public int execute(final String fullCommand, final CommandLine cl, final Shell shellState) throws Exception { - List<String> tservers; - final InstanceOperations instanceOps = shellState.getAccumuloClient().instanceOperations(); - final boolean paginate = !cl.hasOption(disablePaginationOpt.getOpt()); + final Set<ServerId> servers = new HashSet<>(); if (cl.hasOption(tserverOption.getOpt())) { - tservers = new ArrayList<>(); - tservers.add(cl.getOptionValue(tserverOption.getOpt())); + String serverAddress = cl.getOptionValue(tserverOption.getOpt()); + final HostAndPort hp = HostAndPort.fromString(serverAddress); + servers + .add(instanceOps.getServer(ServerId.Type.SCAN_SERVER, null, hp.getHost(), hp.getPort())); + servers.add( + instanceOps.getServer(ServerId.Type.TABLET_SERVER, null, hp.getHost(), hp.getPort())); } else { - tservers = instanceOps.getTabletServers(); - tservers.addAll(instanceOps.getScanServers()); + servers.addAll(instanceOps.getServers(ServerId.Type.SCAN_SERVER)); + servers.addAll(instanceOps.getServers(ServerId.Type.TABLET_SERVER)); } - shellState.printLines(new ActiveScanIterator(tservers, instanceOps), paginate); + shellState.printLines(new ActiveScanIterator(servers, instanceOps), paginate); return 0; } diff --git a/shell/src/main/java/org/apache/accumulo/shell/commands/PingCommand.java b/shell/src/main/java/org/apache/accumulo/shell/commands/PingCommand.java index f9fdaa7b7b..006a7e3b12 100644 --- a/shell/src/main/java/org/apache/accumulo/shell/commands/PingCommand.java +++ b/shell/src/main/java/org/apache/accumulo/shell/commands/PingCommand.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.List; import org.apache.accumulo.core.client.admin.InstanceOperations; +import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.shell.Shell; import 
org.apache.accumulo.shell.Shell.Command; import org.apache.commons.cli.CommandLine; @@ -41,17 +42,17 @@ public class PingCommand extends Command { public int execute(final String fullCommand, final CommandLine cl, final Shell shellState) throws Exception { - List<String> tservers; + final List<String> tservers = new ArrayList<>(); final InstanceOperations instanceOps = shellState.getAccumuloClient().instanceOperations(); final boolean paginate = !cl.hasOption(disablePaginationOpt.getOpt()); if (cl.hasOption(tserverOption.getOpt())) { - tservers = new ArrayList<>(); tservers.add(cl.getOptionValue(tserverOption.getOpt())); } else { - tservers = instanceOps.getTabletServers(); + instanceOps.getServers(ServerId.Type.TABLET_SERVER) + .forEach(s -> tservers.add(s.toHostPortString())); } shellState.printLines(new PingIterator(tservers, instanceOps), paginate); diff --git a/test/src/main/java/org/apache/accumulo/test/ComprehensiveBaseIT.java b/test/src/main/java/org/apache/accumulo/test/ComprehensiveBaseIT.java index 1dc1c2c0f9..3513e1e90e 100644 --- a/test/src/main/java/org/apache/accumulo/test/ComprehensiveBaseIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ComprehensiveBaseIT.java @@ -69,6 +69,7 @@ import org.apache.accumulo.core.client.admin.TabletAvailability; import org.apache.accumulo.core.client.admin.TabletInformation; import org.apache.accumulo.core.client.admin.TimeType; import org.apache.accumulo.core.client.admin.compaction.CompactionSelector; +import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.client.rfile.RFile; import org.apache.accumulo.core.client.sample.Sampler; import org.apache.accumulo.core.client.sample.SamplerConfiguration; @@ -475,7 +476,8 @@ public abstract class ComprehensiveBaseIT extends SharedMiniClusterBase { public void invalidInstanceName() { try (var client = Accumulo.newClient().to("fake_instance_name", getCluster().getZooKeepers()) .as(getAdminPrincipal(), getToken()).build()) { - 
assertThrows(RuntimeException.class, () -> client.instanceOperations().getTabletServers()); + assertThrows(RuntimeException.class, + () -> client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER)); } } diff --git a/test/src/main/java/org/apache/accumulo/test/InstanceOperationsIT.java b/test/src/main/java/org/apache/accumulo/test/InstanceOperationsIT.java new file mode 100644 index 0000000000..5372bad803 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/InstanceOperationsIT.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.admin.InstanceOperations; +import org.apache.accumulo.core.client.admin.servers.ServerId; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.hadoop.conf.Configuration; +import org.junit.jupiter.api.Test; + +public class InstanceOperationsIT extends AccumuloClusterHarness { + + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "10s"); + cfg.getClusterServerConfiguration().setNumDefaultCompactors(3); + cfg.getClusterServerConfiguration().setNumDefaultScanServers(2); + cfg.getClusterServerConfiguration().setNumDefaultTabletServers(1); + } + + @SuppressWarnings("deprecation") + @Test + public void testGetServers() throws AccumuloException, AccumuloSecurityException { + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + InstanceOperations iops = client.instanceOperations(); + + assertEquals(3, iops.getServers(ServerId.Type.COMPACTOR).size()); + assertEquals(3, iops.getCompactors().size()); + validateAddresses(iops.getCompactors(), iops.getServers(ServerId.Type.COMPACTOR)); + + assertEquals(2, iops.getServers(ServerId.Type.SCAN_SERVER).size()); + assertEquals(2, 
iops.getScanServers().size()); + validateAddresses(iops.getScanServers(), iops.getServers(ServerId.Type.SCAN_SERVER)); + + assertEquals(1, iops.getServers(ServerId.Type.TABLET_SERVER).size()); + assertEquals(1, iops.getTabletServers().size()); + validateAddresses(iops.getTabletServers(), iops.getServers(ServerId.Type.TABLET_SERVER)); + + assertEquals(1, iops.getServers(ServerId.Type.MANAGER).size()); + assertEquals(1, iops.getManagerLocations().size()); + validateAddresses(iops.getManagerLocations(), iops.getServers(ServerId.Type.MANAGER)); + + for (ServerId compactor : iops.getServers(ServerId.Type.COMPACTOR)) { + assertNotNull(iops.getActiveCompactions(compactor)); + assertThrows(IllegalArgumentException.class, () -> iops.getActiveScans(compactor)); + } + + for (ServerId tserver : iops.getServers(ServerId.Type.TABLET_SERVER)) { + assertNotNull(iops.getActiveCompactions(tserver)); + assertNotNull(iops.getActiveScans(tserver)); + } + + for (ServerId sserver : iops.getServers(ServerId.Type.SCAN_SERVER)) { + assertThrows(IllegalArgumentException.class, () -> iops.getActiveCompactions(sserver)); + assertNotNull(iops.getActiveScans(sserver)); + } + + } + } + + private void validateAddresses(Collection<String> e, Set<ServerId> addresses) { + List<String> actual = new ArrayList<>(addresses.size()); + addresses.forEach(a -> actual.add(a.toHostPortString())); + List<String> expected = new ArrayList<>(e); + Collections.sort(expected); + Collections.sort(actual); + assertEquals(actual, expected); + } + +} diff --git a/test/src/main/java/org/apache/accumulo/test/InterruptibleScannersIT.java b/test/src/main/java/org/apache/accumulo/test/InterruptibleScannersIT.java index f93014febf..eced2eb8b4 100644 --- a/test/src/main/java/org/apache/accumulo/test/InterruptibleScannersIT.java +++ b/test/src/main/java/org/apache/accumulo/test/InterruptibleScannersIT.java @@ -29,6 +29,7 @@ import org.apache.accumulo.core.client.AccumuloClient; import 
org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.admin.ActiveScan; +import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.harness.AccumuloClusterHarness; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; @@ -69,7 +70,8 @@ public class InterruptibleScannersIT extends AccumuloClusterHarness { Thread thread = new Thread(() -> { try { // ensure the scan is running: not perfect, the metadata tables could be scanned, too. - String tserver = client.instanceOperations().getTabletServers().iterator().next(); + ServerId tserver = client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER) + .iterator().next(); do { ArrayList<ActiveScan> scans = new ArrayList<>(client.instanceOperations().getActiveScans(tserver)); diff --git a/test/src/main/java/org/apache/accumulo/test/LocatorIT.java b/test/src/main/java/org/apache/accumulo/test/LocatorIT.java index 175e342f5e..ff89e81b92 100644 --- a/test/src/main/java/org/apache/accumulo/test/LocatorIT.java +++ b/test/src/main/java/org/apache/accumulo/test/LocatorIT.java @@ -42,6 +42,7 @@ import org.apache.accumulo.core.client.TableOfflineException; import org.apache.accumulo.core.client.admin.Locations; import org.apache.accumulo.core.client.admin.TableOperations; import org.apache.accumulo.core.client.admin.TabletAvailability; +import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.TabletId; @@ -115,7 +116,9 @@ public class LocatorIT extends AccumuloClusterHarness { ArrayList<Range> ranges = new ArrayList<>(); - HashSet<String> tservers = new HashSet<>(client.instanceOperations().getTabletServers()); + HashSet<String> tservers = new HashSet<>(); + 
client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER) + .forEach((s) -> tservers.add(s.toHostPortString())); // locate won't find any locations, tablets are not hosted ranges.add(r1); diff --git a/test/src/main/java/org/apache/accumulo/test/RecoveryIT.java b/test/src/main/java/org/apache/accumulo/test/RecoveryIT.java index 6da4cd9d77..2b245a6ce7 100644 --- a/test/src/main/java/org/apache/accumulo/test/RecoveryIT.java +++ b/test/src/main/java/org/apache/accumulo/test/RecoveryIT.java @@ -18,7 +18,6 @@ */ package org.apache.accumulo.test; -import static org.apache.accumulo.core.util.compaction.ExternalCompactionUtil.getCompactorAddrs; import static org.apache.accumulo.harness.AccumuloITBase.MINI_CLUSTER_ONLY; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.fail; @@ -143,10 +142,12 @@ public class RecoveryIT extends AccumuloClusterHarness { // Stop any running Compactors and ScanServers control.stopAllServers(ServerType.COMPACTOR); - Wait.waitFor(() -> getCompactorAddrs(getCluster().getServerContext()).size() == 0, 60_000); + Wait.waitFor(() -> getServerContext().getServerPaths() + .getCompactor(rg -> true, addr -> true, true).isEmpty(), 60_000); control.stopAllServers(ServerType.SCAN_SERVER); - Wait.waitFor(() -> ((ClientContext) c).getScanServers().size() == 0, 60_000); + Wait.waitFor(() -> getServerContext().getServerPaths() + .getScanServer(rg -> true, addr -> true, true).size() == 0, 60_000); // Kill the TabletServer in resource group that is hosting the table List<Process> procs = control.getTabletServers(RESOURCE_GROUP); diff --git a/test/src/main/java/org/apache/accumulo/test/ScanServerGroupConfigurationIT.java b/test/src/main/java/org/apache/accumulo/test/ScanServerGroupConfigurationIT.java index 47365f22ef..91e6868665 100644 --- a/test/src/main/java/org/apache/accumulo/test/ScanServerGroupConfigurationIT.java +++ 
b/test/src/main/java/org/apache/accumulo/test/ScanServerGroupConfigurationIT.java @@ -26,7 +26,6 @@ import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel; -import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.conf.ClientProperty; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Range; @@ -148,9 +147,10 @@ public class ScanServerGroupConfigurationIT extends SharedMiniClusterBase { getCluster().getClusterControl().start(ServerType.SCAN_SERVER, "localhost"); Wait.waitFor(() -> getCluster().getServerContext().getServerPaths() .getScanServer(rg -> true, addr -> true, true).size() == 1, 30_000); - Wait.waitFor(() -> ((ClientContext) client).getScanServers().values().stream().anyMatch( - (p) -> p.getSecond().equals(ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME)) - == true); + Wait.waitFor(() -> getCluster().getServerContext().getServerPaths() + .getScanServer(rg -> rg.equals(ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME), + addr -> true, true) + .size() > 0); assertEquals(ingestedEntryCount, Iterables.size(scanner), "The scan server scanner should have seen all ingested and flushed entries"); @@ -166,11 +166,12 @@ public class ScanServerGroupConfigurationIT extends SharedMiniClusterBase { getCluster().getClusterControl().start(ServerType.SCAN_SERVER); Wait.waitFor(() -> getCluster().getServerContext().getServerPaths() .getScanServer(rg -> true, addr -> true, true).size() == 2, 30_000); - Wait.waitFor(() -> ((ClientContext) client).getScanServers().values().stream().anyMatch( - (p) -> p.getSecond().equals(ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME)) - == true); - Wait.waitFor(() -> ((ClientContext) client).getScanServers().values().stream() - .anyMatch((p) -> p.getSecond().equals("GROUP1")) == true); + Wait.waitFor(() -> 
getCluster().getServerContext().getServerPaths() + .getScanServer(rg -> rg.equals(ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME), + addr -> true, true) + .size() == 1); + Wait.waitFor(() -> getCluster().getServerContext().getServerPaths() + .getScanServer(rg -> rg.equals("GROUP1"), addr -> true, true).size() == 1); scanner.setExecutionHints(Map.of("scan_type", "use_group1")); assertEquals(ingestedEntryCount + additionalIngest1, Iterables.size(scanner), diff --git a/test/src/main/java/org/apache/accumulo/test/ScanServerMaxLatencyIT.java b/test/src/main/java/org/apache/accumulo/test/ScanServerMaxLatencyIT.java index 8e2fb7b796..2feb238fd2 100644 --- a/test/src/main/java/org/apache/accumulo/test/ScanServerMaxLatencyIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ScanServerMaxLatencyIT.java @@ -40,6 +40,7 @@ import org.apache.accumulo.core.client.MutationsRejectedException; import org.apache.accumulo.core.client.ScannerBase; import org.apache.accumulo.core.client.admin.NewTableConfiguration; import org.apache.accumulo.core.client.admin.TabletAvailability; +import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.util.Timer; @@ -67,7 +68,8 @@ public class ScanServerMaxLatencyIT extends ConfigurableMacBase { ExecutorService executor = Executors.newCachedThreadPool(); try (var client = Accumulo.newClient().from(getClientProperties()).build()) { - Wait.waitFor(() -> !client.instanceOperations().getScanServers().isEmpty()); + Wait.waitFor( + () -> !client.instanceOperations().getServers(ServerId.Type.SCAN_SERVER).isEmpty()); var ntc = new NewTableConfiguration(); ntc.setProperties(Map.of(Property.TABLE_MINC_COMPACT_MAXAGE.getKey(), "2s")); diff --git a/test/src/main/java/org/apache/accumulo/test/ScanServerShutdownIT.java b/test/src/main/java/org/apache/accumulo/test/ScanServerShutdownIT.java index 99694f633a..52a831a502 100644 --- 
a/test/src/main/java/org/apache/accumulo/test/ScanServerShutdownIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ScanServerShutdownIT.java @@ -30,7 +30,7 @@ import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.BatchScanner; import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel; -import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; @@ -115,7 +115,8 @@ public class ScanServerShutdownIT extends SharedMiniClusterBase { } // ScanServer should stop after the 3rd batch scan closes - Wait.waitFor(() -> ((ClientContext) client).getScanServers().size() == 0); + Wait.waitFor( + () -> client.instanceOperations().getServers(ServerId.Type.SCAN_SERVER).isEmpty()); // The ScanServer should clean up the references on normal shutdown Wait.waitFor(() -> ctx.getAmple().scanServerRefs().list().count() == 0); diff --git a/test/src/main/java/org/apache/accumulo/test/TabletServerGivesUpIT.java b/test/src/main/java/org/apache/accumulo/test/TabletServerGivesUpIT.java index ae4009ee07..faa1584d06 100644 --- a/test/src/main/java/org/apache/accumulo/test/TabletServerGivesUpIT.java +++ b/test/src/main/java/org/apache/accumulo/test/TabletServerGivesUpIT.java @@ -25,10 +25,12 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; import org.apache.accumulo.test.functional.ConfigurableMacBase; +import org.apache.accumulo.test.util.Wait; import 
org.apache.hadoop.conf.Configuration; import org.junit.jupiter.api.Test; @@ -53,10 +55,8 @@ public class TabletServerGivesUpIT extends ConfigurableMacBase { @Test public void test() throws Exception { try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) { - while (client.instanceOperations().getTabletServers().isEmpty()) { - // Wait until at least one tablet server is up - Thread.sleep(100); - } + Wait.waitFor( + () -> !client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).isEmpty()); final String tableName = getUniqueNames(1)[0]; client.tableOperations().create(tableName); @@ -90,7 +90,7 @@ public class TabletServerGivesUpIT extends ConfigurableMacBase { }); backgroundWriter.start(); // wait for the tserver to give up on writing to the WAL - while (client.instanceOperations().getTabletServers().size() == 1) { + while (client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).size() == 1) { Thread.sleep(SECONDS.toMillis(1)); } } diff --git a/test/src/main/java/org/apache/accumulo/test/TabletServerHdfsRestartIT.java b/test/src/main/java/org/apache/accumulo/test/TabletServerHdfsRestartIT.java index 782a9f8bec..de7fccefc5 100644 --- a/test/src/main/java/org/apache/accumulo/test/TabletServerHdfsRestartIT.java +++ b/test/src/main/java/org/apache/accumulo/test/TabletServerHdfsRestartIT.java @@ -25,6 +25,7 @@ import java.time.Duration; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.security.Authorizations; @@ -55,7 +56,7 @@ public class TabletServerHdfsRestartIT extends ConfigurableMacBase { public void test() throws Exception { try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) { // 
wait until a tablet server is up - while (client.instanceOperations().getTabletServers().isEmpty()) { + while (client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).isEmpty()) { Thread.sleep(50); } final String tableName = getUniqueNames(1)[0]; diff --git a/test/src/main/java/org/apache/accumulo/test/TotalQueuedIT.java b/test/src/main/java/org/apache/accumulo/test/TotalQueuedIT.java index 0dd6f24f82..85afbdf2eb 100644 --- a/test/src/main/java/org/apache/accumulo/test/TotalQueuedIT.java +++ b/test/src/main/java/org/apache/accumulo/test/TotalQueuedIT.java @@ -28,6 +28,7 @@ import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.manager.thrift.TabletServerStatus; @@ -127,9 +128,10 @@ public class TotalQueuedIT extends ConfigurableMacBase { private long getSyncs(AccumuloClient c) throws Exception { ServerContext context = getServerContext(); - for (String address : c.instanceOperations().getTabletServers()) { - TabletServerClientService.Client client = ThriftUtil - .getClient(ThriftClientTypes.TABLET_SERVER, HostAndPort.fromString(address), context); + for (ServerId tserver : c.instanceOperations().getServers(ServerId.Type.TABLET_SERVER)) { + TabletServerClientService.Client client = + ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, + HostAndPort.fromParts(tserver.getHost(), tserver.getPort()), context); TabletServerStatus status = client.getTabletServerStatus(null, context.rpcCreds()); return status.syncs; } diff --git a/test/src/main/java/org/apache/accumulo/test/TransportCachingIT.java b/test/src/main/java/org/apache/accumulo/test/TransportCachingIT.java index 5396db64da..b5e41e4139 100644 --- 
a/test/src/main/java/org/apache/accumulo/test/TransportCachingIT.java +++ b/test/src/main/java/org/apache/accumulo/test/TransportCachingIT.java @@ -27,6 +27,7 @@ import java.util.stream.Collectors; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.clientImpl.ThriftTransportKey; import org.apache.accumulo.core.clientImpl.ThriftTransportPool; @@ -35,6 +36,7 @@ import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.test.util.Wait; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; import org.junit.jupiter.api.Test; @@ -53,21 +55,18 @@ public class TransportCachingIT extends AccumuloClusterHarness { public void testCachedTransport() throws InterruptedException { try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { - List<String> tservers; - - while ((tservers = client.instanceOperations().getTabletServers()).isEmpty()) { - // sleep until a tablet server is up - Thread.sleep(50); - } + Wait.waitFor( + () -> !client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).isEmpty()); ClientContext context = (ClientContext) client; long rpcTimeout = ConfigurationTypeHelper.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT.getDefaultValue()); - List<ThriftTransportKey> servers = tservers.stream().map(serverStr -> { - return new ThriftTransportKey(ThriftClientTypes.CLIENT, HostAndPort.fromString(serverStr), - rpcTimeout, context); - }).collect(Collectors.toList()); + List<ThriftTransportKey> servers = + client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).stream().map(tsi -> { + return new 
ThriftTransportKey(ThriftClientTypes.CLIENT, + HostAndPort.fromParts(tsi.getHost(), tsi.getPort()), rpcTimeout, context); + }).collect(Collectors.toList()); // only want to use one server for all subsequent test ThriftTransportKey ttk = servers.get(0); diff --git a/test/src/main/java/org/apache/accumulo/test/ZombieScanIT.java b/test/src/main/java/org/apache/accumulo/test/ZombieScanIT.java index 60482e020c..7fc9d5c797 100644 --- a/test/src/main/java/org/apache/accumulo/test/ZombieScanIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ZombieScanIT.java @@ -48,6 +48,7 @@ import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel; import org.apache.accumulo.core.client.admin.NewTableConfiguration; import org.apache.accumulo.core.client.admin.ScanType; +import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Mutation; @@ -201,7 +202,7 @@ public class ZombieScanIT extends ConfigurableMacBase { // should eventually see the four zombie scans running against four tablets Wait.waitFor(() -> countDistinctTabletsScans(table, c) == 4); - assertEquals(1, c.instanceOperations().getTabletServers().size()); + assertEquals(1, c.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).size()); // Start 3 new tablet servers, this should cause the table to balance and the tablets with // zombie scans to unload. The Zombie scans should not prevent the table from unloading. 
The @@ -210,7 +211,8 @@ public class ZombieScanIT extends ConfigurableMacBase { getCluster().getClusterControl().start(ServerType.TABLET_SERVER, Map.of(), 4); // Wait for all tablets servers - Wait.waitFor(() -> c.instanceOperations().getTabletServers().size() == 4); + Wait.waitFor( + () -> c.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).size() == 4); // The table should eventually balance across the 4 tablet servers Wait.waitFor(() -> countLocations(table, c) == 4); @@ -234,15 +236,15 @@ public class ZombieScanIT extends ConfigurableMacBase { // The zombie scans should migrate with the tablets, taking up more scan threads in the // system. - Set<String> tabletSeversWithZombieScans = new HashSet<>(); - for (String tserver : c.instanceOperations().getTabletServers()) { + Set<ServerId> tabletServersWithZombieScans = new HashSet<>(); + for (ServerId tserver : c.instanceOperations().getServers(ServerId.Type.TABLET_SERVER)) { if (c.instanceOperations().getActiveScans(tserver).stream() .flatMap(activeScan -> activeScan.getSsiList().stream()) .anyMatch(scanIters -> scanIters.contains(ZombieIterator.class.getName()))) { - tabletSeversWithZombieScans.add(tserver); + tabletServersWithZombieScans.add(tserver); } } - assertEquals(4, tabletSeversWithZombieScans.size()); + assertEquals(4, tabletServersWithZombieScans.size()); // This check may be outside the scope of this test but works nicely for this check and is // simple enough to include @@ -274,7 +276,7 @@ public class ZombieScanIT extends ConfigurableMacBase { if (serverType == SCAN_SERVER) { // Scans will fall back to tablet servers when no scan servers are present. So wait for scan // servers to show up in zookeeper. Can remove this in 3.1. 
- Wait.waitFor(() -> !c.instanceOperations().getScanServers().isEmpty()); + Wait.waitFor(() -> !c.instanceOperations().getServers(ServerId.Type.SCAN_SERVER).isEmpty()); } c.tableOperations().create(table); @@ -352,9 +354,9 @@ public class ZombieScanIT extends ConfigurableMacBase { private static long countDistinctTabletsScans(String table, AccumuloClient client) throws Exception { - var tservers = client.instanceOperations().getTabletServers(); + Set<ServerId> tservers = client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER); long count = 0; - for (String tserver : tservers) { + for (ServerId tserver : tservers) { count += client.instanceOperations().getActiveScans(tserver).stream() .filter(activeScan -> activeScan.getTable().equals(table)) .map(activeScan -> activeScan.getTablet()).distinct().count(); @@ -418,7 +420,7 @@ public class ZombieScanIT extends ConfigurableMacBase { throws AccumuloException, AccumuloSecurityException { Set<Long> scanIds = new HashSet<>(); Set<ScanType> scanTypes = new HashSet<>(); - for (String tserver : c.instanceOperations().getTabletServers()) { + for (ServerId tserver : c.instanceOperations().getServers(ServerId.Type.TABLET_SERVER)) { c.instanceOperations().getActiveScans(tserver).forEach(activeScan -> { scanIds.add(activeScan.getScanid()); scanTypes.add(activeScan.getType()); diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java index b976aab38e..d569c4f511 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java @@ -23,7 +23,6 @@ import static org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUE import static org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED; import static 
org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED; import static org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_LENGTH; -import static org.apache.accumulo.core.util.compaction.ExternalCompactionUtil.getCompactorAddrs; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -119,7 +118,9 @@ public class CompactionPriorityQueueMetricsIT extends SharedMiniClusterBase { @BeforeEach public void setupMetricsTest() throws Exception { getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR); - Wait.waitFor(() -> getCompactorAddrs(getCluster().getServerContext()).isEmpty()); + Wait.waitFor(() -> getCluster().getServerContext().getServerPaths() + .getCompactor(rg -> true, addr -> true, true).isEmpty(), 60_000); + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { tableName = getUniqueNames(1)[0]; diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java b/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java index 0db83c044a..7676350790 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java @@ -19,7 +19,6 @@ package org.apache.accumulo.test.fate; import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; -import static org.apache.accumulo.core.util.compaction.ExternalCompactionUtil.getCompactorAddrs; import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.replay; import static org.easymock.EasyMock.verify; @@ -101,7 +100,8 @@ public abstract class FateOpsCommandsIT extends ConfigurableMacBase // initiated on starting the manager, causing the test to fail. Stopping the compactor fixes // this issue. 
getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR); - Wait.waitFor(() -> getCompactorAddrs(getCluster().getServerContext()).isEmpty(), 60_000); + Wait.waitFor(() -> getServerContext().getServerPaths() + .getCompactor(rg -> true, addr -> true, true).isEmpty(), 60_000); } @Test diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BalanceInPresenceOfOfflineTableIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BalanceInPresenceOfOfflineTableIT.java index f38a6d8bff..84dc9cd773 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/BalanceInPresenceOfOfflineTableIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/BalanceInPresenceOfOfflineTableIT.java @@ -36,6 +36,7 @@ import org.apache.accumulo.core.client.TableExistsException; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.admin.NewTableConfiguration; import org.apache.accumulo.core.client.admin.TabletAvailability; +import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.clientImpl.Credentials; import org.apache.accumulo.core.conf.Property; @@ -93,9 +94,12 @@ public class BalanceInPresenceOfOfflineTableIT extends AccumuloClusterHarness { accumuloClient = Accumulo.newClient().from(getClientProps()).build(); // Need at least two tservers -- wait for them to start before failing - Wait.waitFor(() -> accumuloClient.instanceOperations().getTabletServers().size() >= 2); + Wait.waitFor( + () -> accumuloClient.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).size() + >= 2); - assumeTrue(accumuloClient.instanceOperations().getTabletServers().size() >= 2, + assumeTrue( + accumuloClient.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).size() >= 2, "Not enough tservers to run test"); // set up splits diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java index 3f9d151a8c..687c233345 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java @@ -71,6 +71,7 @@ import org.apache.accumulo.core.client.admin.NewTableConfiguration; import org.apache.accumulo.core.client.admin.PluginConfig; import org.apache.accumulo.core.client.admin.compaction.CompactionConfigurer; import org.apache.accumulo.core.client.admin.compaction.CompactionSelector; +import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; @@ -1176,16 +1177,16 @@ public class CompactionIT extends CompactionBaseIT { compactions.clear(); do { - HostAndPort hp = HostAndPort.fromParts(host.getAddress(), host.getPort()); - client.instanceOperations().getActiveCompactions(hp.toString()).forEach((ac) -> { - try { - if (ac.getTable().equals(table1)) { - compactions.add(ac); - } - } catch (TableNotFoundException e1) { - fail("Table was deleted during test, should not happen"); - } - }); + client.instanceOperations().getActiveCompactions(new ServerId(ServerId.Type.COMPACTOR, + host.getResourceGroup(), host.getAddress(), host.getPort())).forEach((ac) -> { + try { + if (ac.getTable().equals(table1)) { + compactions.add(ac); + } + } catch (TableNotFoundException e1) { + fail("Table was deleted during test, should not happen"); + } + }); Thread.sleep(1000); } while (compactions.isEmpty()); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/DebugClientConnectionIT.java b/test/src/main/java/org/apache/accumulo/test/functional/DebugClientConnectionIT.java index 5bf6004fe1..84699dde5e 100644 --- 
a/test/src/main/java/org/apache/accumulo/test/functional/DebugClientConnectionIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/DebugClientConnectionIT.java @@ -23,10 +23,12 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import java.io.UncheckedIOException; -import java.util.List; +import java.util.Iterator; +import java.util.Set; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.rpc.clients.TServerClient; import org.apache.accumulo.harness.AccumuloClusterHarness; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; @@ -41,12 +43,12 @@ public class DebugClientConnectionIT extends AccumuloClusterHarness { cfg.getClusterServerConfiguration().setNumDefaultTabletServers(2); } - private List<String> tservers = null; + private Set<ServerId> tservers = null; @BeforeEach public void getTServerAddresses() { try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { - tservers = client.instanceOperations().getTabletServers(); + tservers = client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER); } assertNotNull(tservers); assertEquals(2, tservers.size()); @@ -54,11 +56,12 @@ public class DebugClientConnectionIT extends AccumuloClusterHarness { @Test public void testPreferredConnection() throws Exception { - System.setProperty(TServerClient.DEBUG_HOST, tservers.get(0)); + Iterator<ServerId> tsi = tservers.iterator(); + System.setProperty(TServerClient.DEBUG_HOST, tsi.next().toHostPortString()); try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { assertNotNull(client.instanceOperations().getSiteConfiguration()); } - System.setProperty(TServerClient.DEBUG_HOST, tservers.get(1)); + System.setProperty(TServerClient.DEBUG_HOST, 
tsi.next().toHostPortString()); try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { assertNotNull(client.instanceOperations().getSiteConfiguration()); } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/HalfDeadTServerIT.java b/test/src/main/java/org/apache/accumulo/test/functional/HalfDeadTServerIT.java index ad06b85d37..64b75d9f2e 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/HalfDeadTServerIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/HalfDeadTServerIT.java @@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; @@ -156,7 +157,7 @@ public class HalfDeadTServerIT extends ConfigurableMacBase { public String test(int seconds, boolean expectTserverDied) throws Exception { assumeTrue(sharedLibBuilt.get(), "Shared library did not build"); try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) { - while (client.instanceOperations().getTabletServers().isEmpty()) { + while (client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).isEmpty()) { // wait until the tserver that we need to kill is running Thread.sleep(50); } @@ -191,7 +192,7 @@ public class HalfDeadTServerIT extends ConfigurableMacBase { cluster.getProcesses().get(ServerType.TABLET_SERVER).iterator().next()); Thread.sleep(SECONDS.toMillis(1)); client.tableOperations().create("test_ingest"); - assertEquals(1, client.instanceOperations().getTabletServers().size()); + assertEquals(1, client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).size()); int rows = 100_000; ingest = cluster.exec(TestIngest.class, "-c", 
cluster.getClientPropsPath(), "--rows", rows + "") diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ManagerAssignmentIT.java b/test/src/main/java/org/apache/accumulo/test/functional/ManagerAssignmentIT.java index 75b7f6447d..5ed4913f8b 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/ManagerAssignmentIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/ManagerAssignmentIT.java @@ -55,6 +55,7 @@ import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.admin.Locations; import org.apache.accumulo.core.client.admin.NewTableConfiguration; import org.apache.accumulo.core.client.admin.TabletAvailability; +import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.clientImpl.ClientTabletCache; import org.apache.accumulo.core.conf.Property; @@ -476,7 +477,8 @@ public class ManagerAssignmentIT extends SharedMiniClusterBase { try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { - Wait.waitFor(() -> client.instanceOperations().getTabletServers().size() == 1, + Wait.waitFor( + () -> client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).size() == 1, SECONDS.toMillis(60), SECONDS.toMillis(2)); client.tableOperations().create(tableName); @@ -552,14 +554,17 @@ public class ManagerAssignmentIT extends SharedMiniClusterBase { }); - Wait.waitFor(() -> client.instanceOperations().getTabletServers().size() == 0); + Wait.waitFor( + () -> client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).size() == 0); // restart the tablet server for the other tests. Need to call stopAllServers // to clear out the process list because we shutdown the TabletServer outside // of MAC control. 
getCluster().getClusterControl().stopAllServers(ServerType.TABLET_SERVER); getCluster().getClusterControl().start(ServerType.TABLET_SERVER); - Wait.waitFor(() -> client.instanceOperations().getTabletServers().size() == 1, 60_000); + Wait.waitFor( + () -> client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).size() == 1, + 60_000); } } @@ -568,7 +573,8 @@ public class ManagerAssignmentIT extends SharedMiniClusterBase { try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { - Wait.waitFor(() -> client.instanceOperations().getTabletServers().size() == 1, + Wait.waitFor( + () -> client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).size() == 1, SECONDS.toMillis(60), SECONDS.toMillis(2)); client.instanceOperations().waitForBalance(); @@ -601,14 +607,17 @@ public class ManagerAssignmentIT extends SharedMiniClusterBase { } }); - Wait.waitFor(() -> client.instanceOperations().getTabletServers().size() == 0); + Wait.waitFor( + () -> client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).size() == 0); // restart the tablet server for the other tests. Need to call stopAllServers // to clear out the process list because we shutdown the TabletServer outside // of MAC control. 
getCluster().getClusterControl().stopAllServers(ServerType.TABLET_SERVER); getCluster().getClusterControl().start(ServerType.TABLET_SERVER); - Wait.waitFor(() -> client.instanceOperations().getTabletServers().size() == 1, 60_000); + Wait.waitFor( + () -> client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).size() == 1, + 60_000); } } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java index 8f9c38af3c..5464bbcf0e 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java @@ -24,10 +24,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; -import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.DoubleAdder; @@ -36,6 +34,7 @@ import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.admin.CompactionConfig; import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.util.UtilWaitThread; @@ -139,15 +138,12 @@ public class MemoryStarvedMajCIT extends SharedMiniClusterBase { ClientContext ctx = (ClientContext) client; - Wait.waitFor(() -> ExternalCompactionUtil.getCompactorAddrs(ctx).size() == 1, 60_000); - Wait.waitFor(() -> ExternalCompactionUtil.getCompactorAddrs(ctx) - .get(Constants.DEFAULT_RESOURCE_GROUP_NAME).size() == 1, 60_000); + Wait.waitFor(() -> ctx.getServerPaths() + 
.getCompactor(rg -> rg.equals(Constants.DEFAULT_RESOURCE_GROUP_NAME), addr -> true, true) + .size() == 1, 60_000); - Map<String,Set<HostAndPort>> groupedCompactors = - ExternalCompactionUtil.getCompactorAddrs(ctx); - List<HostAndPort> compactorAddresses = - new ArrayList<>(groupedCompactors.get(Constants.DEFAULT_RESOURCE_GROUP_NAME)); - HostAndPort compactorAddr = compactorAddresses.get(0); + ServerId csi = ctx.instanceOperations().getServers(ServerId.Type.COMPACTOR).iterator().next(); + HostAndPort compactorAddr = HostAndPort.fromParts(csi.getHost(), csi.getPort()); TableOperations to = client.tableOperations(); to.create(table); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ScanIdIT.java b/test/src/main/java/org/apache/accumulo/test/functional/ScanIdIT.java index 8ab1f4da36..02519e10af 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/ScanIdIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/ScanIdIT.java @@ -52,6 +52,7 @@ import org.apache.accumulo.core.client.MutationsRejectedException; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.admin.ActiveScan; +import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.clientImpl.Namespace; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; @@ -179,11 +180,11 @@ public class ScanIdIT extends AccumuloClusterHarness { // all scanner have reported at least 1 result, so check for unique scan ids. 
Set<Long> scanIds = new HashSet<>(); - List<String> tservers = client.instanceOperations().getTabletServers(); + Set<ServerId> tservers = client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER); log.debug("tablet servers {}", tservers); - for (String tserver : tservers) { + for (ServerId tserver : tservers) { List<ActiveScan> activeScans = null; for (int i = 0; i < 10; i++) { diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java b/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java index ef5089c961..1a19e0cae6 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java @@ -37,6 +37,7 @@ import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel; +import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; @@ -139,7 +140,8 @@ public class ScannerIT extends ConfigurableMacBase { if (serverType == SCAN_SERVER) { // Scans will fall back to tablet servers when no scan servers are present. So wait for scan // servers to show up in zookeeper. Can remove this in 3.1. 
- Wait.waitFor(() -> !accumuloClient.instanceOperations().getScanServers().isEmpty()); + Wait.waitFor(() -> !accumuloClient.instanceOperations() + .getServers(ServerId.Type.SCAN_SERVER).isEmpty()); } accumuloClient.tableOperations().create(tableName); @@ -215,18 +217,18 @@ public class ScannerIT extends ConfigurableMacBase { public static long countActiveScans(AccumuloClient c, ServerType serverType, String tableName) throws Exception { - final Collection<String> servers; + final Collection<ServerId> servers; if (serverType == TABLET_SERVER) { - servers = c.instanceOperations().getTabletServers(); + servers = c.instanceOperations().getServers(ServerId.Type.TABLET_SERVER); } else if (serverType == SCAN_SERVER) { - servers = c.instanceOperations().getScanServers(); + servers = c.instanceOperations().getServers(ServerId.Type.SCAN_SERVER); } else { throw new IllegalArgumentException("Unsupported server type " + serverType); } long count = 0; - for (String server : servers) { - count += c.instanceOperations().getActiveScans(server).stream() + for (ServerId tserver : servers) { + count += c.instanceOperations().getActiveScans(tserver).stream() .filter(activeScan -> activeScan.getTable().equals(tableName)).count(); } return count; diff --git a/test/src/main/java/org/apache/accumulo/test/functional/SessionBlockVerifyIT.java b/test/src/main/java/org/apache/accumulo/test/functional/SessionBlockVerifyIT.java index efd3962b8c..2f3288433b 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/SessionBlockVerifyIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/SessionBlockVerifyIT.java @@ -38,6 +38,7 @@ import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.admin.ActiveScan; +import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.conf.Property; import 
org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; @@ -145,7 +146,8 @@ public class SessionBlockVerifyIT extends ScanSessionTimeOutIT { int sessionsFound = 0; // we have configured 1 tserver, so we can grab the one and only - String tserver = getOnlyElement(c.instanceOperations().getTabletServers()); + ServerId tserver = + getOnlyElement(c.instanceOperations().getServers(ServerId.Type.TABLET_SERVER)); final List<ActiveScan> scans = c.instanceOperations().getActiveScans(tserver); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ShutdownIT.java b/test/src/main/java/org/apache/accumulo/test/functional/ShutdownIT.java index 60ab7dd662..966c0a82ca 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/ShutdownIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/ShutdownIT.java @@ -23,11 +23,12 @@ import static org.junit.jupiter.api.Assertions.assertNotEquals; import java.io.IOException; import java.time.Duration; -import java.util.List; +import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl; import org.apache.accumulo.server.util.Admin; import org.apache.accumulo.test.TestIngest; @@ -117,13 +118,14 @@ public class ShutdownIT extends ConfigurableMacBase { int x = cluster.exec(TestIngest.class, "-c", cluster.getClientPropsPath(), "--createTable") .getProcess().waitFor(); assertEquals(0, x); - List<String> tabletServers = c.instanceOperations().getTabletServers(); + Set<ServerId> tabletServers = c.instanceOperations().getServers(ServerId.Type.TABLET_SERVER); assertEquals(2, tabletServers.size()); - String doomed = tabletServers.get(0); + ServerId doomed = tabletServers.iterator().next(); log.info("Stopping " + doomed); - assertEquals(0, 
cluster.exec(Admin.class, "stop", doomed).getProcess().waitFor()); - tabletServers = c.instanceOperations().getTabletServers(); + assertEquals(0, + cluster.exec(Admin.class, "stop", doomed.toHostPortString()).getProcess().waitFor()); + tabletServers = c.instanceOperations().getServers(ServerId.Type.TABLET_SERVER); assertEquals(1, tabletServers.size()); - assertNotEquals(tabletServers.get(0), doomed); + assertNotEquals(tabletServers.iterator().next(), doomed); } } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/SimpleBalancerFairnessIT.java b/test/src/main/java/org/apache/accumulo/test/functional/SimpleBalancerFairnessIT.java index 6d8adf332f..c4bd7f1877 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/SimpleBalancerFairnessIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/SimpleBalancerFairnessIT.java @@ -24,10 +24,12 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.ArrayList; import java.util.List; +import java.util.Set; import java.util.TreeSet; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.client.security.tokens.PasswordToken; import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.clientImpl.Credentials; @@ -65,7 +67,7 @@ public class SimpleBalancerFairnessIT extends ConfigurableMacBase { TreeSet<Text> splits = TestIngest.getSplitPoints(0, 10000000, NUM_SPLITS); log.info("Creating {} splits", splits.size()); c.tableOperations().addSplits("unused", splits); - List<String> tservers = c.instanceOperations().getTabletServers(); + Set<ServerId> tservers = c.instanceOperations().getServers(ServerId.Type.TABLET_SERVER); TestIngest.IngestParams params = new TestIngest.IngestParams(getClientProperties()); params.rows = 5000; TestIngest.ingest(c, params); diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/TabletMetadataIT.java b/test/src/main/java/org/apache/accumulo/test/functional/TabletMetadataIT.java index 7f6d07abbb..4e9f9a3c35 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/TabletMetadataIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/TabletMetadataIT.java @@ -27,6 +27,7 @@ import java.util.Set; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.schema.TabletMetadata; @@ -56,7 +57,8 @@ public class TabletMetadataIT extends ConfigurableMacBase { @Test public void getLiveTServersTest() throws Exception { try (AccumuloClient c = Accumulo.newClient().from(getClientProperties()).build()) { - while (c.instanceOperations().getTabletServers().size() != NUM_TSERVERS) { + while (c.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).size() + != NUM_TSERVERS) { log.info("Waiting for tservers to start up..."); Thread.sleep(SECONDS.toMillis(5)); } @@ -67,7 +69,8 @@ public class TabletMetadataIT extends ConfigurableMacBase { getCluster().killProcess(TABLET_SERVER, getCluster().getProcesses().get(TABLET_SERVER).iterator().next()); - while (c.instanceOperations().getTabletServers().size() == NUM_TSERVERS) { + while (c.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).size() + == NUM_TSERVERS) { log.info("Waiting for a tserver to die..."); Thread.sleep(SECONDS.toMillis(5)); } diff --git a/test/src/main/java/org/apache/accumulo/test/manager/SuspendedTabletsIT.java b/test/src/main/java/org/apache/accumulo/test/manager/SuspendedTabletsIT.java index e43d0e621b..7e44e6d57a 100644 --- a/test/src/main/java/org/apache/accumulo/test/manager/SuspendedTabletsIT.java +++ 
b/test/src/main/java/org/apache/accumulo/test/manager/SuspendedTabletsIT.java @@ -47,6 +47,7 @@ import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.admin.NewTableConfiguration; import org.apache.accumulo.core.client.admin.TabletAvailability; +import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.conf.ClientProperty; import org.apache.accumulo.core.conf.Property; @@ -316,7 +317,8 @@ public class SuspendedTabletsIT extends AccumuloClusterHarness { try (AccumuloClient client = Accumulo.newClient().from(getCluster().getClientProperties()).build()) { - Wait.waitFor(() -> client.instanceOperations().getTabletServers().size() == 1); + Wait.waitFor( + () -> client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).size() == 1); } } diff --git a/test/src/main/java/org/apache/accumulo/test/shell/ShellServerIT.java b/test/src/main/java/org/apache/accumulo/test/shell/ShellServerIT.java index d6506c320b..8ced6d6128 100644 --- a/test/src/main/java/org/apache/accumulo/test/shell/ShellServerIT.java +++ b/test/src/main/java/org/apache/accumulo/test/shell/ShellServerIT.java @@ -60,6 +60,7 @@ import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.IteratorSetting.Column; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.admin.TabletAvailability; +import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.client.sample.RowColumnSampler; import org.apache.accumulo.core.client.sample.RowSampler; import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; @@ -1530,7 +1531,7 @@ public class ShellServerIT extends SharedMiniClusterBase { String[] lines = ts.output.get().split("\n"); String last = lines[lines.length - 1]; String[] parts = 
last.split("\\|"); - assertEquals(12, parts.length); + assertEquals(13, parts.length); ts.exec("deletetable -f " + table, true); } @@ -1669,18 +1670,19 @@ public class ShellServerIT extends SharedMiniClusterBase { continue; } String[] parts = scan.split("\\|"); - assertEquals(14, parts.length, "Expected 14 colums, but found " + parts.length + assertEquals(15, parts.length, "Expected 15 colums, but found " + parts.length + " instead for '" + Arrays.toString(parts) + "'"); - String tserver = parts[0].trim(); + String tserver = parts[1].trim(); // TODO: any way to tell if the client address is accurate? could be local IP, host, // loopback...? String hostPortPattern = ".+:\\d+"; assertMatches(tserver, hostPortPattern); - assertTrue(accumuloClient.instanceOperations().getTabletServers().contains(tserver)); + assertTrue(accumuloClient.instanceOperations().getServers(ServerId.Type.TABLET_SERVER) + .stream().anyMatch((srv) -> srv.toHostPortString().equals(tserver))); String client = parts[1].trim(); assertMatches(client, hostPortPattern); // Scan ID should be a long (throwing an exception if it fails to parse) - Long r = Long.parseLong(parts[11].trim()); + Long r = Long.parseLong(parts[12].trim()); assertNotNull(r); } }