This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push: new f3b7a300a4 adapts new scan session tests to also run on scan servers (#4844) f3b7a300a4 is described below commit f3b7a300a41b23d45991edecbc19695c1c790dbb Author: Keith Turner <ktur...@apache.org> AuthorDate: Fri Sep 13 12:31:17 2024 -0400 adapts new scan session tests to also run on scan servers (#4844) New ITs were added in #4840 and #4841 for zombie scan metrics and ensuring that scanners close scan sessions. Ths commit adapts these new test to also run against scan servers. For the zombie scan case found a bug in the scan sever where the session manager cleanup thread would get stuck forever in SnapshotTablet.close() preventing any further session cleanup. Fixed this bug by changing SnapshotTablet.close() to no longer wait forever for scans to finish. --- .../core/client/admin/InstanceOperations.java | 1 + .../accumulo/tserver/tablet/SnapshotTablet.java | 15 ++--- .../org/apache/accumulo/tserver/tablet/Tablet.java | 9 --- .../apache/accumulo/tserver/tablet/TabletBase.java | 9 +++ .../org/apache/accumulo/test/CloseScannerIT.java | 9 ++- .../org/apache/accumulo/test/ZombieScanIT.java | 61 +++++++++++++------ .../test/functional/ScanSessionTimeOutIT.java | 11 ++-- .../apache/accumulo/test/functional/ScannerIT.java | 71 +++++++++++++++++----- 8 files changed, 125 insertions(+), 61 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperations.java b/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperations.java index 033dacbe94..cce51b7981 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperations.java +++ b/core/src/main/java/org/apache/accumulo/core/client/admin/InstanceOperations.java @@ -176,6 +176,7 @@ public interface InstanceOperations { * Returns the locations of the active scan servers * * @return A set of currently active scan servers. + * @since 2.1.0 */ Set<String> getScanServers(); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SnapshotTablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SnapshotTablet.java index 1f0f754082..985f7339dd 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SnapshotTablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/SnapshotTablet.java @@ -127,16 +127,11 @@ public class SnapshotTablet extends TabletBase { for (ScanDataSource activeScan : activeScans) { activeScan.interrupt(); - } - - // wait for reads and writes to complete - while (!activeScans.isEmpty()) { - try { - log.debug("Closing tablet {} waiting for {} scans", extent, activeScans.size()); - this.wait(50); - } catch (InterruptedException e) { - log.error(e.toString()); - } + // The tablet server will wait for this to return true because it wants to be sure no scans + // will run after the tablet is unloaded to ensure immediate consistency. Scans running in the + // scan server are eventually consistent, so there is no need to wait here. Disallow future + // activity on the scan session so it can be cleaned up eventually, but do not need to wait. + disallowNewReservations(activeScan.getScanParameters()); } } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index f229799f6d..70c200bb7f 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -1131,15 +1131,6 @@ public class Tablet extends TabletBase { } } - private boolean disallowNewReservations(ScanParameters scanParameters) { - var scanSessId = scanParameters.getScanSessionId(); - if (scanSessId != null) { - return getTabletServer().getSessionManager().disallowNewReservations(scanSessId); - } else { - return true; - } - } - private void closeConsistencyCheck() { long num = tabletMemory.getMemTable().getNumEntries(); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java index 12890106d4..dcc54e9da7 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletBase.java @@ -110,6 +110,15 @@ public abstract class TabletBase { } } + protected boolean disallowNewReservations(ScanParameters scanParameters) { + var scanSessId = scanParameters.getScanSessionId(); + if (scanSessId != null) { + return server.getSessionManager().disallowNewReservations(scanSessId); + } else { + return true; + } + } + public abstract boolean isClosed(); public abstract SortedMap<StoredTabletFile,DataFileValue> getDatafiles(); diff --git a/test/src/main/java/org/apache/accumulo/test/CloseScannerIT.java b/test/src/main/java/org/apache/accumulo/test/CloseScannerIT.java index f6c157fd2b..6a4c4f6812 100644 --- a/test/src/main/java/org/apache/accumulo/test/CloseScannerIT.java +++ b/test/src/main/java/org/apache/accumulo/test/CloseScannerIT.java @@ -27,11 +27,13 @@ import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.IsolatedScanner; import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.ScannerBase; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.Timer; import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; import org.apache.accumulo.test.functional.ReadWriteIT; import org.apache.accumulo.test.util.Wait; @@ -55,7 +57,8 @@ public class CloseScannerIT extends AccumuloClusterHarness { } /** - * {@link org.apache.accumulo.test.functional.ScannerIT#testSessionCleanup()} is a similar test. + * {@link org.apache.accumulo.test.functional.ScannerIT#testSessionCleanup(ScannerBase.ConsistencyLevel)} + * is a similar test. */ @Test public void testManyScans() throws Exception { @@ -104,8 +107,8 @@ public class CloseScannerIT extends AccumuloClusterHarness { // time the test will allow to 3s+13s=16s which is less than the 20s when idle session clean // starts. - Wait.waitFor(() -> countActiveScans(client, tableName) < 1, 13000, 250, - "Found active scans after closing all scanners. Expected to find no scans"); + Wait.waitFor(() -> countActiveScans(client, ServerType.TABLET_SERVER, tableName) < 1, 13000, + 250, "Found active scans after closing all scanners. Expected to find no scans"); var elasped = timer.elapsed(TimeUnit.MILLISECONDS); if (elasped > 20000) { diff --git a/test/src/main/java/org/apache/accumulo/test/ZombieScanIT.java b/test/src/main/java/org/apache/accumulo/test/ZombieScanIT.java index e8c3a47a3e..bd0be182c0 100644 --- a/test/src/main/java/org/apache/accumulo/test/ZombieScanIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ZombieScanIT.java @@ -18,6 +18,9 @@ */ package org.apache.accumulo.test; +import static org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel.IMMEDIATE; +import static org.apache.accumulo.minicluster.ServerType.SCAN_SERVER; +import static org.apache.accumulo.minicluster.ServerType.TABLET_SERVER; import static org.apache.accumulo.test.functional.ScannerIT.countActiveScans; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -38,6 +41,7 @@ import java.util.stream.Collectors; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel; import org.apache.accumulo.core.client.admin.NewTableConfiguration; import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.conf.Property; @@ -58,6 +62,8 @@ import org.apache.hadoop.io.Text; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; public class ZombieScanIT extends ConfigurableMacBase { @@ -242,8 +248,9 @@ public class ZombieScanIT extends ConfigurableMacBase { /** * Create some zombie scans and ensure metrics for them show up. */ - @Test - public void testMetrics() throws Exception { + @ParameterizedTest + @EnumSource + public void testMetrics(ConsistencyLevel consistency) throws Exception { Wait.waitFor(() -> { var zsmc = getZombieScansMetric(); @@ -252,8 +259,18 @@ public class ZombieScanIT extends ConfigurableMacBase { String table = getUniqueNames(1)[0]; + final ServerType serverType = consistency == IMMEDIATE ? TABLET_SERVER : SCAN_SERVER; + try (AccumuloClient c = Accumulo.newClient().from(getClientProperties()).build()) { + if (serverType == SCAN_SERVER) { + getCluster().getConfig().setNumScanServers(1); + getCluster().getClusterControl().startAllServers(SCAN_SERVER); + // Scans will fall back to tablet servers when no scan servers are present. So wait for scan + // servers to show up in zookeeper. Can remove this in 3.1. + Wait.waitFor(() -> !c.instanceOperations().getScanServers().isEmpty()); + } + c.tableOperations().create(table); var executor = Executors.newCachedThreadPool(); @@ -262,21 +279,21 @@ public class ZombieScanIT extends ConfigurableMacBase { List<Future<String>> futures = new ArrayList<>(); for (var row : List.of("2", "4")) { // start a scan with an iterator that gets stuck and can not be interrupted - futures.add(startStuckScan(c, table, executor, row, false)); + futures.add(startStuckScan(c, table, executor, row, false, consistency)); // start a scan with an iterator that gets stuck and can be interrupted - futures.add(startStuckScan(c, table, executor, row, true)); + futures.add(startStuckScan(c, table, executor, row, true, consistency)); } // start four stuck scans, using a batch scanner, that should never return data for (var row : List.of("6", "8")) { // start a scan with an iterator that gets stuck and can not be interrupted - futures.add(startStuckBatchScan(c, table, executor, row, false)); + futures.add(startStuckBatchScan(c, table, executor, row, false, consistency)); // start a scan with an iterator that gets stuck and can be interrupted - futures.add(startStuckBatchScan(c, table, executor, row, true)); + futures.add(startStuckBatchScan(c, table, executor, row, true, consistency)); } // should eventually see the eight stuck scans running - Wait.waitFor(() -> countActiveScans(c, table) == 8); + Wait.waitFor(() -> countActiveScans(c, serverType, table) == 8); // Cancel the scan threads. This will cause the sessions on the server side to timeout and // become inactive. The stuck threads on the server side related to the timed out sessions @@ -287,20 +304,20 @@ public class ZombieScanIT extends ConfigurableMacBase { }); // Four of the eight running scans should respond to thread interrupts and exit - Wait.waitFor(() -> countActiveScans(c, table) == 4); + Wait.waitFor(() -> countActiveScans(c, serverType, table) == 4); Wait.waitFor(() -> getZombieScansMetric() == 4); - assertEquals(4, countActiveScans(c, table)); + assertEquals(4, countActiveScans(c, serverType, table)); // start four more stuck scans with two that will ignore interrupts futures.clear(); - futures.add(startStuckScan(c, table, executor, "0", false)); - futures.add(startStuckScan(c, table, executor, "0", true)); - futures.add(startStuckBatchScan(c, table, executor, "99", false)); - futures.add(startStuckBatchScan(c, table, executor, "0", true)); + futures.add(startStuckScan(c, table, executor, "0", false, consistency)); + futures.add(startStuckScan(c, table, executor, "0", true, consistency)); + futures.add(startStuckBatchScan(c, table, executor, "99", false, consistency)); + futures.add(startStuckBatchScan(c, table, executor, "0", true, consistency)); - Wait.waitFor(() -> countActiveScans(c, table) == 8); + Wait.waitFor(() -> countActiveScans(c, serverType, table) == 8); // Cancel the client side scan threads. Should cause the server side threads to be // interrupted. @@ -310,15 +327,19 @@ public class ZombieScanIT extends ConfigurableMacBase { }); // Two of the stuck threads should respond to interrupts on the server side and exit. - Wait.waitFor(() -> countActiveScans(c, table) == 6); + Wait.waitFor(() -> countActiveScans(c, serverType, table) == 6); Wait.waitFor(() -> getZombieScansMetric() == 6); - assertEquals(6, countActiveScans(c, table)); + assertEquals(6, countActiveScans(c, serverType, table)); executor.shutdownNow(); + } finally { + if (serverType == SCAN_SERVER) { + getCluster().getConfig().setNumScanServers(0); + getCluster().getClusterControl().stopAllServers(SCAN_SERVER); + } } - } private static long countLocations(String table, AccumuloClient client) throws Exception { @@ -341,7 +362,7 @@ public class ZombieScanIT extends ConfigurableMacBase { } private Future<String> startStuckScan(AccumuloClient c, String table, ExecutorService executor, - String row, boolean canInterrupt) { + String row, boolean canInterrupt, ConsistencyLevel consistency) { return executor.submit(() -> { try (var scanner = c.createScanner(table)) { String className; @@ -351,6 +372,7 @@ public class ZombieScanIT extends ConfigurableMacBase { className = ZombieIterator.class.getName(); } IteratorSetting iter = new IteratorSetting(100, "Z", className); + scanner.setConsistencyLevel(consistency); scanner.addScanIterator(iter); scanner.setRange(new Range(row)); return scanner.stream().findFirst().map(e -> e.getKey().getRowData().toString()) @@ -360,7 +382,7 @@ public class ZombieScanIT extends ConfigurableMacBase { } private Future<String> startStuckBatchScan(AccumuloClient c, String table, - ExecutorService executor, String row, boolean canInterrupt) { + ExecutorService executor, String row, boolean canInterrupt, ConsistencyLevel consistency) { return executor.submit(() -> { try (var scanner = c.createBatchScanner(table)) { String className; @@ -373,6 +395,7 @@ public class ZombieScanIT extends ConfigurableMacBase { IteratorSetting iter = new IteratorSetting(100, "Z", className); scanner.addScanIterator(iter); scanner.setRanges(List.of(new Range(row))); + scanner.setConsistencyLevel(consistency); return scanner.stream().findFirst().map(e -> e.getKey().getRowData().toString()) .orElse("none"); } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ScanSessionTimeOutIT.java b/test/src/main/java/org/apache/accumulo/test/functional/ScanSessionTimeOutIT.java index 741a02a18a..8b7bdaa7d4 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/ScanSessionTimeOutIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/ScanSessionTimeOutIT.java @@ -19,6 +19,7 @@ package org.apache.accumulo.test.functional; import static org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly; +import static org.apache.accumulo.minicluster.ServerType.TABLET_SERVER; import static org.apache.accumulo.test.functional.ScannerIT.countActiveScans; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -121,24 +122,24 @@ public class ScanSessionTimeOutIT extends AccumuloClusterHarness { verify(iter, 0, 200); // There should be a scan session open since not all data was read from the iterator - assertEquals(1L, countActiveScans(c, tableName)); + assertEquals(1L, countActiveScans(c, TABLET_SERVER, tableName)); // sleep three times the session timeout sleepUninterruptibly(9, TimeUnit.SECONDS); // The scan session should have timed out and the next read should create a new one - assertEquals(0L, countActiveScans(c, tableName)); + assertEquals(0L, countActiveScans(c, TABLET_SERVER, tableName)); verify(iter, 200, 50000); // Reading part of the data in the range should cause a new scan session to be created - assertEquals(1L, countActiveScans(c, tableName)); + assertEquals(1L, countActiveScans(c, TABLET_SERVER, tableName)); verify(iter, 50000, 100000); // Once all of the data in the range was read the scanner should automatically close the // scan session - assertEquals(0L, countActiveScans(c, tableName)); + assertEquals(0L, countActiveScans(c, TABLET_SERVER, tableName)); } // Nothing should have created any ew scan sessions for the table - assertEquals(0L, countActiveScans(c, tableName)); + assertEquals(0L, countActiveScans(c, TABLET_SERVER, tableName)); } } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java b/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java index f767df4fdf..48db8bbe29 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java @@ -18,10 +18,15 @@ */ package org.apache.accumulo.test.functional; +import static org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel.EVENTUAL; +import static org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel.IMMEDIATE; +import static org.apache.accumulo.minicluster.ServerType.SCAN_SERVER; +import static org.apache.accumulo.minicluster.ServerType.TABLET_SERVER; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import java.time.Duration; +import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Map.Entry; @@ -31,6 +36,7 @@ import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; @@ -38,12 +44,14 @@ import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.UtilWaitThread; -import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.test.CloseScannerIT; import org.apache.accumulo.test.util.Wait; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; -public class ScannerIT extends AccumuloClusterHarness { +public class ScannerIT extends ConfigurableMacBase { @Override protected Duration defaultTimeout() { @@ -53,7 +61,7 @@ public class ScannerIT extends AccumuloClusterHarness { @Test public void testScannerReadaheadConfiguration() throws Exception { final String table = getUniqueNames(1)[0]; - try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + try (AccumuloClient c = Accumulo.newClient().from(getClientProperties()).build()) { c.tableOperations().create(table); try (BatchWriter bw = c.createBatchWriter(table)) { @@ -122,10 +130,20 @@ public class ScannerIT extends AccumuloClusterHarness { /** * {@link CloseScannerIT#testManyScans()} is a similar test. */ - @Test - public void testSessionCleanup() throws Exception { + @ParameterizedTest + @EnumSource + public void testSessionCleanup(ConsistencyLevel consistency) throws Exception { final String tableName = getUniqueNames(1)[0]; - try (AccumuloClient accumuloClient = Accumulo.newClient().from(getClientProps()).build()) { + final ServerType serverType = consistency == IMMEDIATE ? TABLET_SERVER : SCAN_SERVER; + try (AccumuloClient accumuloClient = Accumulo.newClient().from(getClientProperties()).build()) { + + if (serverType == SCAN_SERVER) { + getCluster().getConfig().setNumScanServers(1); + getCluster().getClusterControl().startAllServers(SCAN_SERVER); + // Scans will fall back to tablet servers when no scan servers are present. So wait for scan + // servers to show up in zookeeper. Can remove this in 3.1. + Wait.waitFor(() -> !accumuloClient.instanceOperations().getScanServers().isEmpty()); + } accumuloClient.tableOperations().create(tableName); @@ -137,6 +155,10 @@ public class ScannerIT extends AccumuloClusterHarness { } } + if (consistency == EVENTUAL) { + accumuloClient.tableOperations().flush(tableName, null, null, true); + } + // The test assumes the session timeout is configured to 1 minute, validate this. Later in the // test 10s is given for session to disappear and we want this 10s to be much smaller than the // configured session timeout. @@ -147,53 +169,72 @@ public class ScannerIT extends AccumuloClusterHarness { // closed that any open sessions will be closed. for (int i = 0; i < 3; i++) { try (var scanner = accumuloClient.createScanner(tableName)) { + scanner.setConsistencyLevel(consistency); assertEquals(10, scanner.stream().limit(10).count()); assertEquals(10000, scanner.stream().limit(10000).count()); // since not all data in the range was read from the scanner it should leave an active // scan session per scanner iterator created - assertEquals(2, countActiveScans(accumuloClient, tableName)); + assertEquals(2, countActiveScans(accumuloClient, serverType, tableName)); } // When close is called on on the scanner it should close the scan session. The session // cleanup is async on the server because task may still be running server side, but it // should happen in less than the session timeout. Also the server should start working on // it immediately. - Wait.waitFor(() -> countActiveScans(accumuloClient, tableName) == 0, 10000); + Wait.waitFor(() -> countActiveScans(accumuloClient, serverType, tableName) == 0, 10000); try (var scanner = accumuloClient.createBatchScanner(tableName)) { + scanner.setConsistencyLevel(consistency); scanner.setRanges(List.of(new Range())); assertEquals(10, scanner.stream().limit(10).count()); assertEquals(10000, scanner.stream().limit(10000).count()); - assertEquals(2, countActiveScans(accumuloClient, tableName)); + assertEquals(2, countActiveScans(accumuloClient, serverType, tableName)); } - Wait.waitFor(() -> countActiveScans(accumuloClient, tableName) == 0, 10000); + Wait.waitFor(() -> countActiveScans(accumuloClient, serverType, tableName) == 0, 10000); } // Test the case where all data is read from a scanner. In this case the scanner should close // the scan session at the end of the range even before the scanner itself is closed. for (int i = 0; i < 3; i++) { try (var scanner = accumuloClient.createScanner(tableName)) { + scanner.setConsistencyLevel(consistency); assertEquals(100000, scanner.stream().count()); assertEquals(100000, scanner.stream().count()); // The server side cleanup of the session should be able to happen immediately in this // case because nothing should be running on the server side to fetch data because all // data in the range was fetched. - assertEquals(0, countActiveScans(accumuloClient, tableName)); + assertEquals(0, countActiveScans(accumuloClient, serverType, tableName)); } try (var scanner = accumuloClient.createBatchScanner(tableName)) { + scanner.setConsistencyLevel(consistency); scanner.setRanges(List.of(new Range())); assertEquals(100000, scanner.stream().count()); assertEquals(100000, scanner.stream().count()); - assertEquals(0, countActiveScans(accumuloClient, tableName)); + assertEquals(0, countActiveScans(accumuloClient, serverType, tableName)); } } + } finally { + if (serverType == SCAN_SERVER) { + getCluster().getConfig().setNumScanServers(0); + getCluster().getClusterControl().stopAllServers(SCAN_SERVER); + } } } - public static long countActiveScans(AccumuloClient c, String tableName) throws Exception { + public static long countActiveScans(AccumuloClient c, ServerType serverType, String tableName) + throws Exception { + final Collection<String> servers; + if (serverType == TABLET_SERVER) { + servers = c.instanceOperations().getTabletServers(); + } else if (serverType == SCAN_SERVER) { + servers = c.instanceOperations().getScanServers(); + } else { + throw new IllegalArgumentException("Unsupported server type " + serverType); + } + long count = 0; - for (String tserver : c.instanceOperations().getTabletServers()) { - count += c.instanceOperations().getActiveScans(tserver).stream() + for (String server : servers) { + count += c.instanceOperations().getActiveScans(server).stream() .filter(activeScan -> activeScan.getTable().equals(tableName)).count(); } return count;