This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new 8a19bc9368 Cleanup InstanceOperations API for active scans/compactions (#5102) 8a19bc9368 is described below commit 8a19bc936892806fc8fb4067349cd9aaca2bf99b Author: Christopher L. Shannon <cshan...@apache.org> AuthorDate: Fri Dec 6 13:04:10 2024 -0500 Cleanup InstanceOperations API for active scans/compactions (#5102) This simplifies the API for getting active scans and active compactions. The new API has been updated to just use a collection of servers instead of trying to also support other arguments (like a single server). Other methods were removed if new or deprecated if old, such as getActiveCompactions(). Getting active scans was updated to be multi-threaded like getting active compactions, and they now share the same code. If there are not multiple servers to query then the code now just uses a direct executor to prevent creating a thread pool. The listscans command was updated to take advantage of the new API and to use multiple threads to collect the scans to speed things up. Lastly, the tests have been updated to account for the API changes. This closes #5096 --- .../core/client/admin/ActiveCompaction.java | 6 +- .../accumulo/core/client/admin/ActiveScan.java | 8 ++ .../core/client/admin/InstanceOperations.java | 28 ++---- .../core/clientImpl/ActiveCompactionImpl.java | 2 +- .../accumulo/core/clientImpl/ActiveScanImpl.java | 27 ++++-- .../core/clientImpl/InstanceOperationsImpl.java | 98 ++++++++++--------- .../core/util/threads/ThreadPoolNames.java | 1 + .../shell/commands/ActiveCompactionHelper.java | 22 +++-- .../shell/commands/ActiveScanIterator.java | 104 --------------------- .../accumulo/shell/commands/ListScansCommand.java | 45 ++++++++- .../apache/accumulo/test/InstanceOperationsIT.java | 13 +-- .../accumulo/test/InterruptibleScannersIT.java | 3 +- .../org/apache/accumulo/test/ZombieScanIT.java | 24 ++--- .../compaction/ExternalCompactionTestUtils.java | 11 +++ .../accumulo/test/functional/CompactionIT.java | 7 +- .../test/functional/MemoryStarvedMajCIT.java | 3 +- .../test/functional/MemoryStarvedMinCIT.java | 3 +- .../apache/accumulo/test/functional/ScanIdIT.java | 2 +- .../apache/accumulo/test/functional/ScannerIT.java | 8 +- .../test/functional/SessionBlockVerifyIT.java | 7 +- 20 files changed, 188 insertions(+), 234 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/ActiveCompaction.java b/core/src/main/java/org/apache/accumulo/core/client/admin/ActiveCompaction.java index c3b7831162..3b461e0758 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/admin/ActiveCompaction.java +++ b/core/src/main/java/org/apache/accumulo/core/client/admin/ActiveCompaction.java @@ -134,9 +134,9 @@ public abstract class ActiveCompaction { public abstract List<IteratorSetting> getIterators(); /** - * Return the host where the compaction is running. + * Return the server where the compaction is running. * - * @since 2.1.0 + * @since 4.0.0 */ - public abstract ServerId getHost(); + public abstract ServerId getServerId(); } diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/ActiveScan.java b/core/src/main/java/org/apache/accumulo/core/client/admin/ActiveScan.java index d294acf6bf..d5659f80a1 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/admin/ActiveScan.java +++ b/core/src/main/java/org/apache/accumulo/core/client/admin/ActiveScan.java @@ -21,6 +21,7 @@ package org.apache.accumulo.core.client.admin; import java.util.List; import java.util.Map; +import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.data.Column; import org.apache.accumulo.core.data.TabletId; import org.apache.accumulo.core.security.Authorizations; @@ -96,4 +97,11 @@ public abstract class ActiveScan { * @since 1.5.0 */ public abstract long getIdleTime(); + + /** + * Return the server where the scan is running. + * + * @since 4.0.0 + */ + public abstract ServerId getServerId(); } diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperations.java b/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperations.java index b325eaf52b..8545cd3898 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperations.java +++ b/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperations.java @@ -259,22 +259,22 @@ public interface InstanceOperations { * @param tserver The tablet server address. This should be of the form * {@code <ip address>:<port>} * @return A list of active scans on tablet server. - * @deprecated see {@link #getActiveScans(ServerId)} + * @deprecated see {@link #getActiveScans(Collection)} */ @Deprecated(since = "4.0.0") List<ActiveScan> getActiveScans(String tserver) throws AccumuloException, AccumuloSecurityException; /** - * List the active scans on a server. + * List the active scans on a collection of servers. * - * @param server server type and address - * @return A stream of active scans on server. + * @param servers Collection of server types and addresses + * @return A list of active scans on the given servers. * @throws IllegalArgumentException when the type of the server is not TABLET_SERVER or * SCAN_SERVER * @since 4.0.0 */ - List<ActiveScan> getActiveScans(ServerId server) + List<ActiveScan> getActiveScans(Collection<ServerId> servers) throws AccumuloException, AccumuloSecurityException; /** @@ -286,32 +286,20 @@ public interface InstanceOperations { * @param tserver The server address. This should be of the form {@code <ip address>:<port>} * @return the list of active compactions * @since 1.5.0 - * @deprecated see {@link #getActiveCompactions(ServerId server)} + * @deprecated see {@link #getActiveCompactions(Collection)} */ @Deprecated(since = "4.0.0") List<ActiveCompaction> getActiveCompactions(String tserver) throws AccumuloException, AccumuloSecurityException; - /** - * List the active compaction running on a TabletServer or Compactor. The server address can be - * retrieved using {@link #getCompactors()} or {@link #getTabletServers()}. Use - * {@link #getActiveCompactions()} to get a list of all compactions running on tservers and - * compactors. - * - * @param server The ServerId object - * @return the list of active compactions - * @throws IllegalArgumentException when the type of the server is not TABLET_SERVER or COMPACTOR - * @since 4.0.0 - */ - List<ActiveCompaction> getActiveCompactions(ServerId server) - throws AccumuloException, AccumuloSecurityException; - /** * List all internal and external compactions running in Accumulo. * * @return the list of active compactions * @since 2.1.0 + * @deprecated see {@link #getActiveCompactions(Collection)} */ + @Deprecated(since = "4.0.0") List<ActiveCompaction> getActiveCompactions() throws AccumuloException, AccumuloSecurityException; /** diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ActiveCompactionImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ActiveCompactionImpl.java index a041220fd8..f27aba8add 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ActiveCompactionImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ActiveCompactionImpl.java @@ -119,7 +119,7 @@ public class ActiveCompactionImpl extends ActiveCompaction { } @Override - public ServerId getHost() { + public ServerId getServerId() { return server; } } diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ActiveScanImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ActiveScanImpl.java index a3b177c4a8..cf8c845f5f 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ActiveScanImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ActiveScanImpl.java @@ -21,11 +21,13 @@ package org.apache.accumulo.core.clientImpl; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Objects; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.admin.ActiveScan; import org.apache.accumulo.core.client.admin.ScanState; import org.apache.accumulo.core.client.admin.ScanType; +import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.data.Column; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.TabletId; @@ -44,20 +46,21 @@ public class ActiveScanImpl extends ActiveScan { private final long scanId; private final String client; - private String tableName; + private final String tableName; private final long age; private final long idle; - private ScanType type; - private ScanState state; - private KeyExtent extent; - private List<Column> columns; - private List<String> ssiList; - private Map<String,Map<String,String>> ssio; + private final ScanType type; + private final ScanState state; + private final KeyExtent extent; + private final List<Column> columns; + private final List<String> ssiList; + private final Map<String,Map<String,String>> ssio; private final String user; - private Authorizations authorizations; + private final Authorizations authorizations; + private final ServerId server; ActiveScanImpl(ClientContext context, - org.apache.accumulo.core.tabletscan.thrift.ActiveScan activeScan) + org.apache.accumulo.core.tabletscan.thrift.ActiveScan activeScan, ServerId server) throws TableNotFoundException { this.scanId = activeScan.scanId; this.client = activeScan.client; @@ -81,6 +84,7 @@ public class ActiveScanImpl extends ActiveScan { this.ssiList.add(ii.iterName + "=" + ii.priority + "," + ii.className); } this.ssio = activeScan.ssio; + this.server = Objects.requireNonNull(server); } @Override @@ -152,4 +156,9 @@ public class ActiveScanImpl extends ActiveScan { public long getIdleTime() { return idle; } + + @Override + public ServerId getServerId() { + return server; + } } diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java index 7886b9739b..e17ca1f0a4 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/InstanceOperationsImpl.java @@ -25,6 +25,7 @@ import static org.apache.accumulo.core.rpc.ThriftUtil.createTransport; import static org.apache.accumulo.core.rpc.ThriftUtil.getClient; import static org.apache.accumulo.core.rpc.ThriftUtil.returnClient; import static org.apache.accumulo.core.util.threads.ThreadPoolNames.INSTANCE_OPS_COMPACTIONS_FINDER_POOL; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.INSTANCE_OPS_SCANS_FINDER_POOL; import java.time.Duration; import java.util.ArrayList; @@ -38,6 +39,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.function.BiPredicate; import java.util.function.Consumer; @@ -51,6 +53,7 @@ import org.apache.accumulo.core.client.admin.ActiveCompaction; import org.apache.accumulo.core.client.admin.ActiveScan; import org.apache.accumulo.core.client.admin.InstanceOperations; import org.apache.accumulo.core.client.admin.servers.ServerId; +import org.apache.accumulo.core.client.admin.servers.ServerId.Type; import org.apache.accumulo.core.clientImpl.thrift.ClientService; import org.apache.accumulo.core.clientImpl.thrift.ConfigurationType; import org.apache.accumulo.core.clientImpl.thrift.TVersionedProperties; @@ -72,12 +75,14 @@ import org.apache.accumulo.core.util.LocalityGroupUtil; import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError; import org.apache.accumulo.core.util.Retry; import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; +import org.apache.accumulo.core.util.threads.ThreadPoolNames; import org.apache.thrift.TException; import org.apache.thrift.transport.TTransport; import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; import com.google.common.net.HostAndPort; +import com.google.common.util.concurrent.MoreExecutors; /** * Provides a class for administering the accumulo instance @@ -265,33 +270,18 @@ public class InstanceOperationsImpl implements InstanceOperations { @Deprecated(since = "4.0.0") public List<ActiveScan> getActiveScans(String tserver) throws AccumuloException, AccumuloSecurityException { - final var parsedTserver = HostAndPort.fromString(tserver); - TabletScanClientService.Client client = null; - try { - client = getClient(ThriftClientTypes.TABLET_SCAN, parsedTserver, context); - - List<ActiveScan> as = new ArrayList<>(); - for (var activeScan : client.getActiveScans(TraceUtil.traceInfo(), context.rpcCreds())) { - try { - as.add(new ActiveScanImpl(context, activeScan)); - } catch (TableNotFoundException e) { - throw new AccumuloException(e); - } - } - return as; - } catch (ThriftSecurityException e) { - throw new AccumuloSecurityException(e.user, e.code, e); - } catch (TException e) { - throw new AccumuloException(e); - } finally { - if (client != null) { - returnClient(client, context); - } - } + var si = getServerId(tserver, List.of(Type.TABLET_SERVER, Type.SCAN_SERVER)); + // getActiveScans throws exceptions so we can't use Optional.map() here + return si.isPresent() ? getActiveScans(si.orElseThrow()) : List.of(); } @Override - public List<ActiveScan> getActiveScans(ServerId server) + public List<ActiveScan> getActiveScans(Collection<ServerId> servers) + throws AccumuloException, AccumuloSecurityException { + return queryServers(servers, this::getActiveScans, INSTANCE_OPS_SCANS_FINDER_POOL); + } + + private List<ActiveScan> getActiveScans(ServerId server) throws AccumuloException, AccumuloSecurityException { Objects.requireNonNull(server); @@ -309,7 +299,7 @@ public class InstanceOperationsImpl implements InstanceOperations { List<ActiveScan> as = new ArrayList<>(); for (var activeScan : rpcClient.getActiveScans(TraceUtil.traceInfo(), context.rpcCreds())) { try { - as.add(new ActiveScanImpl(context, activeScan)); + as.add(new ActiveScanImpl(context, activeScan, server)); } catch (TableNotFoundException e) { throw new AccumuloException(e); } @@ -337,21 +327,12 @@ public class InstanceOperationsImpl implements InstanceOperations { @Deprecated public List<ActiveCompaction> getActiveCompactions(String server) throws AccumuloException, AccumuloSecurityException { - - HostAndPort hp = HostAndPort.fromString(server); - - ServerId si = getServer(ServerId.Type.COMPACTOR, null, hp.getHost(), hp.getPort()); - if (si == null) { - si = getServer(ServerId.Type.TABLET_SERVER, null, hp.getHost(), hp.getPort()); - } - if (si == null) { - return List.of(); - } - return getActiveCompactions(si); + var si = getServerId(server, List.of(Type.COMPACTOR, Type.TABLET_SERVER)); + // getActiveCompactions throws exceptions so we can't use Optional.map() here + return si.isPresent() ? getActiveCompactions(si.orElseThrow()) : List.of(); } - @Override - public List<ActiveCompaction> getActiveCompactions(ServerId server) + private List<ActiveCompaction> getActiveCompactions(ServerId server) throws AccumuloException, AccumuloSecurityException { Objects.requireNonNull(server); @@ -391,6 +372,7 @@ public class InstanceOperationsImpl implements InstanceOperations { } @Override + @Deprecated public List<ActiveCompaction> getActiveCompactions() throws AccumuloException, AccumuloSecurityException { @@ -404,19 +386,34 @@ public class InstanceOperationsImpl implements InstanceOperations { @Override public List<ActiveCompaction> getActiveCompactions(Collection<ServerId> compactionServers) throws AccumuloException, AccumuloSecurityException { + return queryServers(compactionServers, this::getActiveCompactions, + INSTANCE_OPS_COMPACTIONS_FINDER_POOL); + } + + private <T> List<T> queryServers(Collection<ServerId> servers, ServerQuery<List<T>> serverQuery, + ThreadPoolNames pool) throws AccumuloException, AccumuloSecurityException { + + final ExecutorService executorService; + // If size 0 or 1 there's no need to create a thread pool + if (servers.isEmpty()) { + return List.of(); + } else if (servers.size() == 1) { + executorService = MoreExecutors.newDirectExecutorService(); + } else { + int numThreads = Math.max(4, Math.min((servers.size()) / 10, 256)); + executorService = + context.threadPools().getPoolBuilder(pool).numCoreThreads(numThreads).build(); + } - int numThreads = Math.max(4, Math.min((compactionServers.size()) / 10, 256)); - var executorService = context.threadPools().getPoolBuilder(INSTANCE_OPS_COMPACTIONS_FINDER_POOL) - .numCoreThreads(numThreads).build(); try { - List<Future<List<ActiveCompaction>>> futures = new ArrayList<>(); + List<Future<List<T>>> futures = new ArrayList<>(); - for (ServerId server : compactionServers) { - futures.add(executorService.submit(() -> getActiveCompactions(server))); + for (ServerId server : servers) { + futures.add(executorService.submit(() -> serverQuery.execute(server))); } - List<ActiveCompaction> ret = new ArrayList<>(); - for (Future<List<ActiveCompaction>> future : futures) { + List<T> ret = new ArrayList<>(); + for (Future<List<T>> future : futures) { try { ret.addAll(future.get()); } catch (InterruptedException | ExecutionException e) { @@ -635,4 +632,13 @@ public class InstanceOperationsImpl implements InstanceOperations { return new ServerId(type, resourceGroup, host, port); } + private Optional<ServerId> getServerId(String server, List<Type> types) { + HostAndPort hp = HostAndPort.fromString(server); + return types.stream().map(type -> getServer(type, null, hp.getHost(), hp.getPort())) + .findFirst(); + } + + interface ServerQuery<T> { + T execute(ServerId server) throws AccumuloException, AccumuloSecurityException; + } } diff --git a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java index 87fab3bc91..b57baf0d5d 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java +++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java @@ -39,6 +39,7 @@ public enum ThreadPoolNames { SERVICE_LOCK_POOL("accumulo.pool.service.lock"), IMPORT_TABLE_RENAME_POOL("accumulo.pool.import.table.rename"), INSTANCE_OPS_COMPACTIONS_FINDER_POOL("accumulo.pool.instance.ops.active.compactions.finder"), + INSTANCE_OPS_SCANS_FINDER_POOL("accumulo.pool.instance.ops.active.scans.finder"), MANAGER_FATE_POOL("accumulo.pool.manager.fate"), MANAGER_STATUS_POOL("accumulo.pool.manager.status"), MANAGER_UPGRADE_COORDINATOR_METADATA_POOL("accumulo.pool.manager.upgrade.metadata"), diff --git a/shell/src/main/java/org/apache/accumulo/shell/commands/ActiveCompactionHelper.java b/shell/src/main/java/org/apache/accumulo/shell/commands/ActiveCompactionHelper.java index 732c68db57..87815512a4 100644 --- a/shell/src/main/java/org/apache/accumulo/shell/commands/ActiveCompactionHelper.java +++ b/shell/src/main/java/org/apache/accumulo/shell/commands/ActiveCompactionHelper.java @@ -86,7 +86,7 @@ class ActiveCompactionHelper { } String hostSuffix; - switch (ac.getHost().getType()) { + switch (ac.getServerId().getType()) { case TABLET_SERVER: hostSuffix = ""; break; @@ -94,17 +94,17 @@ class ActiveCompactionHelper { hostSuffix = " (ext)"; break; default: - hostSuffix = ac.getHost().getType().name(); + hostSuffix = ac.getServerId().getType().name(); break; } - String host = ac.getHost().toHostPortString() + hostSuffix; + String host = ac.getServerId().toHostPortString() + hostSuffix; try { var dur = new DurationFormat(ac.getAge(), ""); return String.format( "%21s | %21s | %9s | %5s | %6s | %5s | %5s | %15s | %-40s | %5s | %35s | %9s | %s", - ac.getHost().getResourceGroup(), host, dur, ac.getType(), ac.getReason(), + ac.getServerId().getResourceGroup(), host, dur, ac.getType(), ac.getReason(), shortenCount(ac.getEntriesRead()), shortenCount(ac.getEntriesWritten()), ac.getTable(), ac.getTablet(), ac.getInputFiles().size(), output, iterList, iterOpts); } catch (TableNotFoundException e) { @@ -132,7 +132,8 @@ class ActiveCompactionHelper { return Stream.of(); } else { try { - return instanceOps.getActiveCompactions(server).stream().sorted(COMPACTION_AGE_DESCENDING) + return instanceOps.getActiveCompactions(List.of(server)).stream() + .sorted(COMPACTION_AGE_DESCENDING) .map(ActiveCompactionHelper::formatActiveCompactionLine); } catch (Exception e) { LOG.debug("Failed to list active compactions for server {}", tserver, e); @@ -160,16 +161,19 @@ class ActiveCompactionHelper { public static Stream<String> activeCompactions(InstanceOperations instanceOps) { try { - return sortActiveCompactions(instanceOps.getActiveCompactions()); + Set<ServerId> compactionServers = new HashSet<>(); + compactionServers.addAll(instanceOps.getServers(ServerId.Type.COMPACTOR)); + compactionServers.addAll(instanceOps.getServers(ServerId.Type.TABLET_SERVER)); + return sortActiveCompactions(instanceOps.getActiveCompactions(compactionServers)); } catch (AccumuloException | AccumuloSecurityException e) { return Stream.of("ERROR " + e.getMessage()); } } private static Stream<String> sortActiveCompactions(List<ActiveCompaction> activeCompactions) { - Comparator<ActiveCompaction> comparator = - Comparator.comparing((ActiveCompaction ac) -> ac.getHost().getHost()) - .thenComparing(ac -> ac.getHost().getPort()).thenComparing(COMPACTION_AGE_DESCENDING); + Comparator<ActiveCompaction> comparator = Comparator + .comparing((ActiveCompaction ac) -> ac.getServerId().getHost()) + .thenComparing(ac -> ac.getServerId().getPort()).thenComparing(COMPACTION_AGE_DESCENDING); return activeCompactions.stream().sorted(comparator) .map(ActiveCompactionHelper::formatActiveCompactionLine); } diff --git a/shell/src/main/java/org/apache/accumulo/shell/commands/ActiveScanIterator.java b/shell/src/main/java/org/apache/accumulo/shell/commands/ActiveScanIterator.java deleted file mode 100644 index 6eb6a076d8..0000000000 --- a/shell/src/main/java/org/apache/accumulo/shell/commands/ActiveScanIterator.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.accumulo.shell.commands; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Set; - -import org.apache.accumulo.core.client.admin.ActiveScan; -import org.apache.accumulo.core.client.admin.InstanceOperations; -import org.apache.accumulo.core.client.admin.ScanType; -import org.apache.accumulo.core.client.admin.servers.ServerId; -import org.apache.accumulo.core.util.DurationFormat; - -class ActiveScanIterator implements Iterator<String> { - - private InstanceOperations instanceOps; - private Iterator<ServerId> tsIter; - private Iterator<String> scansIter; - - private void readNext() { - final List<String> scans = new ArrayList<>(); - - while (tsIter.hasNext()) { - - final ServerId server = tsIter.next(); - try { - final List<ActiveScan> asl = instanceOps.getActiveScans(server); - - for (ActiveScan as : asl) { - var dur = new DurationFormat(as.getAge(), ""); - var dur2 = new DurationFormat(as.getLastContactTime(), ""); - scans.add(String.format( - "%21s |%21s |%21s |%9s |%9s |%7s |%6s |%8s |%8s |%10s |%20s |%10s |%20s |%10s | %s", - server.getResourceGroup(), server.toHostPortString(), as.getClient(), dur, dur2, - as.getState(), as.getType(), as.getUser(), as.getTable(), as.getColumns(), - as.getAuthorizations(), (as.getType() == ScanType.SINGLE ? as.getTablet() : "N/A"), - as.getScanid(), as.getSsiList(), as.getSsio())); - } - } catch (Exception e) { - scans.add(server + " ERROR " + e.getMessage()); - } - - if (!scans.isEmpty()) { - break; - } - } - - scansIter = scans.iterator(); - } - - ActiveScanIterator(Set<ServerId> tservers, InstanceOperations instanceOps) { - this.instanceOps = instanceOps; - this.tsIter = tservers.iterator(); - - final String header = String.format( - " %-21s| %-21s| %-21s| %-9s| %-9s| %-7s| %-6s|" - + " %-8s| %-8s| %-10s| %-20s| %-10s| %-10s | %-20s | %s", - "GROUP", "SERVER", "CLIENT", "AGE", "LAST", "STATE", "TYPE", "USER", "TABLE", "COLUMNS", - "AUTHORIZATIONS", "TABLET", "SCAN ID", "ITERATORS", "ITERATOR OPTIONS"); - - scansIter = Collections.singletonList(header).iterator(); - } - - @Override - public boolean hasNext() { - return scansIter.hasNext(); - } - - @Override - public String next() { - final String next = scansIter.next(); - - if (!scansIter.hasNext()) { - readNext(); - } - - return next; - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - -} diff --git a/shell/src/main/java/org/apache/accumulo/shell/commands/ListScansCommand.java b/shell/src/main/java/org/apache/accumulo/shell/commands/ListScansCommand.java index c8283cfb22..dbda36a8c0 100644 --- a/shell/src/main/java/org/apache/accumulo/shell/commands/ListScansCommand.java +++ b/shell/src/main/java/org/apache/accumulo/shell/commands/ListScansCommand.java @@ -18,15 +18,20 @@ */ package org.apache.accumulo.shell.commands; -import java.util.HashSet; +import java.util.ArrayList; +import java.util.List; import java.util.Optional; -import java.util.Set; import java.util.function.BiPredicate; import java.util.function.Predicate; import java.util.regex.Pattern; +import java.util.stream.Stream; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.admin.InstanceOperations; +import org.apache.accumulo.core.client.admin.ScanType; import org.apache.accumulo.core.client.admin.servers.ServerId; +import org.apache.accumulo.core.util.DurationFormat; import org.apache.accumulo.shell.Shell; import org.apache.accumulo.shell.Shell.Command; import org.apache.commons.cli.CommandLine; @@ -34,6 +39,7 @@ import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; public class ListScansCommand extends Command { @@ -51,7 +57,7 @@ public class ListScansCommand extends Command { final InstanceOperations instanceOps = shellState.getAccumuloClient().instanceOperations(); final boolean paginate = !cl.hasOption(disablePaginationOpt.getOpt()); - final Set<ServerId> servers = new HashSet<>(); + final List<ServerId> servers = new ArrayList<>(); String serverValue = getServerOptValue(cl, serverOpt, tserverOption); if (serverValue != null || cl.hasOption(rgOpt)) { @@ -66,11 +72,34 @@ public class ListScansCommand extends Command { servers.addAll(instanceOps.getServers(ServerId.Type.TABLET_SERVER)); } - shellState.printLines(new ActiveScanIterator(servers, instanceOps), paginate); + Stream<String> activeScans = getActiveScans(instanceOps, servers); + activeScans = appendHeader(activeScans); + shellState.printLines(activeScans.iterator(), paginate); return 0; } + private Stream<String> getActiveScans(InstanceOperations instanceOps, List<ServerId> servers) { + List<List<ServerId>> partServerIds = Lists.partition(servers, 100); + return partServerIds.stream().flatMap(ids -> { + try { + return instanceOps.getActiveScans(ids).stream().map(as -> { + var dur = new DurationFormat(as.getAge(), ""); + var dur2 = new DurationFormat(as.getLastContactTime(), ""); + var server = as.getServerId(); + return (String.format( + "%21s |%21s |%21s |%9s |%9s |%7s |%6s |%8s |%8s |%10s |%20s |%10s |%20s |%10s | %s", + server.getResourceGroup(), server.toHostPortString(), as.getClient(), dur, dur2, + as.getState(), as.getType(), as.getUser(), as.getTable(), as.getColumns(), + as.getAuthorizations(), (as.getType() == ScanType.SINGLE ? as.getTablet() : "N/A"), + as.getScanid(), as.getSsiList(), as.getSsio())); + }); + } catch (AccumuloException | AccumuloSecurityException e) { + return Stream.of("ERROR " + e.getMessage()); + } + }); + } + @Override public int numArgs() { return 0; @@ -120,4 +149,12 @@ public class ListScansCommand extends Command { .orElse(rg -> true); } + private static Stream<String> appendHeader(Stream<String> stream) { + Stream<String> header = Stream.of(String.format( + " %-21s| %-21s| %-21s| %-9s| %-9s| %-7s| %-6s|" + + " %-8s| %-8s| %-10s| %-20s| %-10s| %-10s | %-20s | %s", + "GROUP", "SERVER", "CLIENT", "AGE", "LAST", "STATE", "TYPE", "USER", "TABLE", "COLUMNS", + "AUTHORIZATIONS", "TABLET", "SCAN ID", "ITERATORS", "ITERATOR OPTIONS")); + return Stream.concat(header, stream); + } } diff --git a/test/src/main/java/org/apache/accumulo/test/InstanceOperationsIT.java b/test/src/main/java/org/apache/accumulo/test/InstanceOperationsIT.java index 5fd3df9ab4..17ec90fa68 100644 --- a/test/src/main/java/org/apache/accumulo/test/InstanceOperationsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/InstanceOperationsIT.java @@ -154,18 +154,19 @@ public class InstanceOperationsIT extends AccumuloClusterHarness { assertEquals(0, iops.getServers(ServerId.Type.MONITOR).size()); for (ServerId compactor : iops.getServers(ServerId.Type.COMPACTOR)) { - assertNotNull(iops.getActiveCompactions(compactor)); - assertThrows(IllegalArgumentException.class, () -> iops.getActiveScans(compactor)); + assertNotNull(iops.getActiveCompactions(List.of(compactor))); + assertThrows(IllegalArgumentException.class, () -> iops.getActiveScans(List.of(compactor))); } for (ServerId tserver : iops.getServers(ServerId.Type.TABLET_SERVER)) { - assertNotNull(iops.getActiveCompactions(tserver)); - assertNotNull(iops.getActiveScans(tserver)); + assertNotNull(iops.getActiveCompactions(List.of(tserver))); + assertNotNull(iops.getActiveScans(List.of(tserver))); } for (ServerId sserver : iops.getServers(ServerId.Type.SCAN_SERVER)) { - assertThrows(IllegalArgumentException.class, () -> iops.getActiveCompactions(sserver)); - assertNotNull(iops.getActiveScans(sserver)); + assertThrows(IllegalArgumentException.class, + () -> iops.getActiveCompactions(List.of(sserver))); + assertNotNull(iops.getActiveScans(List.of(sserver))); } } diff --git a/test/src/main/java/org/apache/accumulo/test/InterruptibleScannersIT.java b/test/src/main/java/org/apache/accumulo/test/InterruptibleScannersIT.java index eced2eb8b4..9705a69f7d 100644 --- a/test/src/main/java/org/apache/accumulo/test/InterruptibleScannersIT.java +++ b/test/src/main/java/org/apache/accumulo/test/InterruptibleScannersIT.java @@ -23,6 +23,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import java.time.Duration; import java.util.ArrayList; +import java.util.List; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; @@ -74,7 +75,7 @@ public class InterruptibleScannersIT extends AccumuloClusterHarness { .iterator().next(); do { ArrayList<ActiveScan> scans = - new ArrayList<>(client.instanceOperations().getActiveScans(tserver)); + new ArrayList<>(client.instanceOperations().getActiveScans(List.of(tserver))); // Remove scans not against our table and not owned by us scans.removeIf(scan -> !getAdminPrincipal().equals(scan.getUser()) || !tableName.equals(scan.getTable())); diff --git a/test/src/main/java/org/apache/accumulo/test/ZombieScanIT.java b/test/src/main/java/org/apache/accumulo/test/ZombieScanIT.java index 7fc9d5c797..210cd67373 100644 --- a/test/src/main/java/org/apache/accumulo/test/ZombieScanIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ZombieScanIT.java @@ -46,6 +46,7 @@ import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel; +import org.apache.accumulo.core.client.admin.ActiveScan; import org.apache.accumulo.core.client.admin.NewTableConfiguration; import org.apache.accumulo.core.client.admin.ScanType; import org.apache.accumulo.core.client.admin.servers.ServerId; @@ -238,7 +239,7 @@ public class ZombieScanIT extends ConfigurableMacBase { // system. Set<ServerId> tabletServersWithZombieScans = new HashSet<>(); for (ServerId tserver : c.instanceOperations().getServers(ServerId.Type.TABLET_SERVER)) { - if (c.instanceOperations().getActiveScans(tserver).stream() + if (c.instanceOperations().getActiveScans(List.of(tserver)).stream() .flatMap(activeScan -> activeScan.getSsiList().stream()) .anyMatch(scanIters -> scanIters.contains(ZombieIterator.class.getName()))) { tabletServersWithZombieScans.add(tserver); @@ -355,13 +356,9 @@ public class ZombieScanIT extends ConfigurableMacBase { private static long countDistinctTabletsScans(String table, AccumuloClient client) throws Exception { Set<ServerId> tservers = client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER); - long count = 0; - for (ServerId tserver : tservers) { - count += client.instanceOperations().getActiveScans(tserver).stream() - .filter(activeScan -> activeScan.getTable().equals(table)) - .map(activeScan -> activeScan.getTablet()).distinct().count(); - } - return count; + return client.instanceOperations().getActiveScans(tservers).stream() + .filter(activeScan -> activeScan.getTable().equals(table)).map(ActiveScan::getTablet) + .distinct().count(); } private Future<String> startStuckScan(AccumuloClient c, String table, ExecutorService executor, @@ -420,12 +417,11 @@ public class ZombieScanIT extends ConfigurableMacBase { throws AccumuloException, AccumuloSecurityException { Set<Long> scanIds = new HashSet<>(); Set<ScanType> scanTypes = new HashSet<>(); - for (ServerId tserver : c.instanceOperations().getServers(ServerId.Type.TABLET_SERVER)) { - c.instanceOperations().getActiveScans(tserver).forEach(activeScan -> { - scanIds.add(activeScan.getScanid()); - scanTypes.add(activeScan.getType()); - }); - } + var tservers = c.instanceOperations().getServers(ServerId.Type.TABLET_SERVER); + c.instanceOperations().getActiveScans(tservers).forEach(activeScan -> { + scanIds.add(activeScan.getScanid()); + scanTypes.add(activeScan.getType()); + }); assertNotEquals(0, scanIds.size()); scanIds.forEach(id -> assertTrue(id != 0L && id != -1L)); // ensure coverage of both batch and single scans diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java index c3e24401a2..b3ed3c26fa 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java @@ -43,8 +43,11 @@ import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.ActiveCompaction; import org.apache.accumulo.core.client.admin.CompactionConfig; +import org.apache.accumulo.core.client.admin.InstanceOperations; import org.apache.accumulo.core.client.admin.NewTableConfiguration; +import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService; import org.apache.accumulo.core.compaction.thrift.TCompactionState; @@ -389,4 +392,12 @@ public class ExternalCompactionTestUtils { assertEquals(Set.of(), tabletMetadata.getExternalCompactions().keySet()); assertEquals(Set.of(), tabletMetadata.getUserCompactionsRequested()); } + + public static List<ActiveCompaction> getActiveCompactions(InstanceOperations instanceOps) + throws AccumuloException, AccumuloSecurityException { + Set<ServerId> compactionServers = new HashSet<>(); + compactionServers.addAll(instanceOps.getServers(ServerId.Type.COMPACTOR)); + compactionServers.addAll(instanceOps.getServers(ServerId.Type.TABLET_SERVER)); + return instanceOps.getActiveCompactions(compactionServers); + } } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java index 0f7acde3b3..b7cf0f4bc8 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java @@ -21,6 +21,7 @@ package org.apache.accumulo.test.functional; import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toSet; +import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.getActiveCompactions; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -1158,7 +1159,7 @@ public class CompactionIT extends CompactionBaseIT { List<ActiveCompaction> compactions = new ArrayList<>(); do { - client.instanceOperations().getActiveCompactions().forEach((ac) -> { + getActiveCompactions(client.instanceOperations()).forEach((ac) -> { try { if (ac.getTable().equals(table1)) { compactions.add(ac); @@ -1171,12 +1172,12 @@ public class CompactionIT extends CompactionBaseIT { } while (compactions.isEmpty()); ActiveCompaction running1 = compactions.get(0); - ServerId host = running1.getHost(); + ServerId host = running1.getServerId(); assertTrue(host.getType() == ServerId.Type.COMPACTOR); compactions.clear(); do { - client.instanceOperations().getActiveCompactions(host).forEach((ac) -> { + client.instanceOperations().getActiveCompactions(List.of(host)).forEach((ac) -> { try { if (ac.getTable().equals(table1)) { compactions.add(ac); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java index 87cf984f90..48b2bdb632 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java @@ -19,6 +19,7 @@ package org.apache.accumulo.test.functional; import static org.apache.accumulo.core.metrics.Metric.MAJC_PAUSED; +import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.getActiveCompactions; import static org.apache.accumulo.test.util.Wait.waitFor; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; @@ -188,7 +189,7 @@ public class MemoryStarvedMajCIT extends SharedMiniClusterBase { compactionThread.interrupt(); compactionThread.join(); assertNull(error.get()); - assertTrue(client.instanceOperations().getActiveCompactions().stream() + assertTrue(getActiveCompactions(client.instanceOperations()).stream() .anyMatch(ac -> ac.getPausedCount() > 0)); } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMinCIT.java b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMinCIT.java index f3d957f437..5ceee6c601 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMinCIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMinCIT.java @@ -19,6 +19,7 @@ package org.apache.accumulo.test.functional; import static org.apache.accumulo.core.metrics.Metric.MINC_PAUSED; +import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.getActiveCompactions; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -155,7 +156,7 @@ public class MemoryStarvedMinCIT extends SharedMiniClusterBase { ingestThread.interrupt(); ingestThread.join(); assertNull(error.get()); - assertTrue(client.instanceOperations().getActiveCompactions().stream() + assertTrue(getActiveCompactions(client.instanceOperations()).stream() .anyMatch(ac -> ac.getPausedCount() > 0)); } } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ScanIdIT.java b/test/src/main/java/org/apache/accumulo/test/functional/ScanIdIT.java index 02519e10af..5b974944d8 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/ScanIdIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/ScanIdIT.java @@ -189,7 +189,7 @@ public class ScanIdIT extends AccumuloClusterHarness { List<ActiveScan> activeScans = null; for (int i = 0; i < 10; i++) { try { - activeScans = client.instanceOperations().getActiveScans(tserver); + activeScans = client.instanceOperations().getActiveScans(List.of(tserver)); break; } catch (AccumuloException e) { if (e.getCause() instanceof TableNotFoundException) { diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java b/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java index 1a19e0cae6..3458ceb9b3 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java @@ -226,11 +226,7 @@ public class ScannerIT extends ConfigurableMacBase { throw new IllegalArgumentException("Unsupported server type " + serverType); } - long count = 0; - for (ServerId tserver : servers) { - count += c.instanceOperations().getActiveScans(tserver).stream() - .filter(activeScan -> activeScan.getTable().equals(tableName)).count(); - } - return count; + return c.instanceOperations().getActiveScans(servers).stream() + .filter(activeScan -> activeScan.getTable().equals(tableName)).count(); } } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/SessionBlockVerifyIT.java b/test/src/main/java/org/apache/accumulo/test/functional/SessionBlockVerifyIT.java index 2f3288433b..4754e41885 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/SessionBlockVerifyIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/SessionBlockVerifyIT.java @@ -145,11 +145,8 @@ public class SessionBlockVerifyIT extends ScanSessionTimeOutIT { } int sessionsFound = 0; - // we have configured 1 tserver, so we can grab the one and only - ServerId tserver = - getOnlyElement(c.instanceOperations().getServers(ServerId.Type.TABLET_SERVER)); - - final List<ActiveScan> scans = c.instanceOperations().getActiveScans(tserver); + var tservers = c.instanceOperations().getServers(ServerId.Type.TABLET_SERVER); + final List<ActiveScan> scans = c.instanceOperations().getActiveScans(tservers); for (ActiveScan scan : scans) { // only here to minimize chance of seeing meta extent scans