This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new dcbc5d606a Stops waiting on scans during tablet close. (#4819)
dcbc5d606a is described below

commit dcbc5d606ae5be4fa692ee5985640c42ef50cbf8
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Tue Aug 27 14:58:17 2024 -0700

    Stops waiting on scans during tablet close. (#4819)
    
    Before this change, when a tablet was closing it would prevent new scans
    from starting and wait for running scans to finish.  This could cause the
    tablet to become unavailable for long periods of time.  After this change,
    tablets will no longer wait on running scans to complete.  They will set
    something to interrupt the thread and then disable the client scan session,
    which prevents the client from ever seeing any data after the tablet is
    closed.  Once all scan sessions are disabled, the tablet will proceed to
    close.  Disabling the scan session will cause the client side scanner to
    eventually switch to the new tablet server where the tablet is loaded.
    
    It's possible that threads may be left running on the tablet server,
    therefore it will be important to also implement #4756.
    
    fixes #4757
---
 .../accumulo/tserver/ThriftScanClientHandler.java  |   4 +
 .../accumulo/tserver/scan/ScanParameters.java      |  10 ++
 .../apache/accumulo/tserver/session/Session.java   |   1 +
 .../accumulo/tserver/session/SessionManager.java   |  36 ++++-
 .../accumulo/tserver/tablet/ScanDataSource.java    |   4 +
 .../org/apache/accumulo/tserver/tablet/Tablet.java |  39 ++++-
 .../tserver/session/SessionManagerTest.java        |  42 ++++++
 .../org/apache/accumulo/test/ZombieScanIT.java     | 166 ++++++++++++++++++---
 8 files changed, 278 insertions(+), 24 deletions(-)

diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java
index 6bacdcab0c..b127c8ba31 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java
@@ -214,6 +214,8 @@ public class ThriftScanClientHandler implements 
TabletScanClientService.Iface {
 
     long sid = server.getSessionManager().createSession(scanSession, true);
 
+    scanParams.setScanSessionId(sid);
+
     ScanResult scanResult;
     try {
       scanResult = continueScan(tinfo, sid, scanSession, busyTimeout);
@@ -440,6 +442,8 @@ public class ThriftScanClientHandler implements 
TabletScanClientService.Iface {
 
     long sid = server.getSessionManager().createSession(mss, true);
 
+    scanParams.setScanSessionId(sid);
+
     MultiScanResult result;
     try {
       result = continueMultiScan(sid, mss, busyTimeout);
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/ScanParameters.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/ScanParameters.java
index 3ce3b4a24f..ff601fff1f 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/ScanParameters.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/ScanParameters.java
@@ -43,6 +43,7 @@ public final class ScanParameters {
   private final SamplerConfiguration samplerConfig;
   private final long batchTimeOut;
   private final String classLoaderContext;
+  private volatile Long scanSessionId = null;
   private volatile ScanDispatch dispatch;
 
   public ScanParameters(int maxEntries, Authorizations authorizations, 
Set<Column> columnSet,
@@ -106,6 +107,14 @@ public final class ScanParameters {
     return dispatch;
   }
 
+  public void setScanSessionId(long scanSessionId) {
+    this.scanSessionId = scanSessionId;
+  }
+
+  public Long getScanSessionId() {
+    return scanSessionId;
+  }
+
   @Override
   public String toString() {
     StringBuilder buf = new StringBuilder();
@@ -118,6 +127,7 @@ public final class ScanParameters {
     buf.append(", maxEntries=").append(this.maxEntries);
     buf.append(", num=").append(this.maxEntries);
     buf.append(", samplerConfig=").append(this.samplerConfig);
+    buf.append(", scanSessionId=").append(this.scanSessionId);
     buf.append("]");
     return buf.toString();
   }
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java
index 3f170331c8..29247247c3 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java
@@ -34,6 +34,7 @@ public class Session {
   public long lastAccessTime;
   public long startTime;
   private State state = State.NEW;
+  boolean allowReservation = true;
   private final Timer stateChangeTimer = Timer.startNew();
   private final TCredentials credentials;
 
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
index 76ecdb62ba..d32dbcd14c 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
@@ -120,9 +120,9 @@ public class SessionManager {
       synchronized (session) {
         if (session.getState() == State.RESERVED) {
           throw new IllegalStateException(
-              "Attempted to reserved session that is already reserved " + 
sessionId);
+              "Attempted to reserve session that is already reserved " + 
sessionId);
         }
-        if (session.getState() == State.REMOVED) {
+        if (session.getState() == State.REMOVED || !session.allowReservation) {
           return null;
         }
         session.setState(State.RESERVED);
@@ -138,7 +138,7 @@ public class SessionManager {
     if (session != null) {
       synchronized (session) {
 
-        if (session.getState() == State.REMOVED) {
+        if (session.getState() == State.REMOVED || !session.allowReservation) {
           return null;
         }
 
@@ -154,7 +154,7 @@ public class SessionManager {
           throw new IllegalStateException(
               "Attempted to reserved session that is already reserved " + 
sessionId);
         }
-        if (session.getState() == State.REMOVED) {
+        if (session.getState() == State.REMOVED || !session.allowReservation) {
           return null;
         }
         session.setState(State.RESERVED);
@@ -173,6 +173,7 @@ public class SessionManager {
       if (session.getState() != State.RESERVED) {
         throw new IllegalStateException("Cannon unreserve, state: " + 
session.getState());
       }
+
       session.notifyAll();
       session.setState(State.UNRESERVED);
       session.lastAccessTime = System.currentTimeMillis();
@@ -253,6 +254,33 @@ public class SessionManager {
     return removed;
   }
 
+  /**
+   * Prevents a session from ever being reserved in the future. This method 
can be called
+   * concurrently when another thread has the session reserved w/o impacting 
the other thread. When
+   * the session is currently reserved by another thread that thread can 
unreserve as normal and
+   * after that this session can never be reserved again. Since the session 
can never be reserved
+   * after this call it will eventually age off and be cleaned up.
+   *
+   * @return true if the sessions is currently not reserved, false otherwise
+   */
+  public boolean disallowNewReservations(long sessionId) {
+    var session = getSession(sessionId);
+    if (session == null) {
+      return true;
+    }
+    synchronized (session) {
+      if (session.allowReservation) {
+        // Prevent future reservations of this session.
+        session.allowReservation = false;
+        log.debug("disabled session {}", sessionId);
+      }
+
+      // If nothing can reserve the session and it is not currently reserved 
then the session is
+      // disabled and will eventually be cleaned up.
+      return session.getState() != State.RESERVED;
+    }
+  }
+
   static void cleanup(BlockingQueue<Session> deferredCleanupQueue, Session 
session) {
     if (!session.cleanup()) {
       var retry = Retry.builder().infiniteRetries().retryAfter(25, 
MILLISECONDS)
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java
index d6af088cbd..8b7aca78f7 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java
@@ -321,4 +321,8 @@ class ScanDataSource implements DataSource {
         .append("expectedDeletionCount", 
expectedDeletionCount).append("scanParams", scanParams)
         .toString();
   }
+
+  public ScanParameters getScanParameters() {
+    return scanParams;
+  }
 }
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index f74eb0912f..f229799f6d 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -1038,13 +1038,31 @@ public class Tablet extends TabletBase {
       activeScan.interrupt();
     }
 
+    // create a copy so that it can be whittled down as client sessions are 
disabled
+    List<ScanDataSource> runningScans = new ArrayList<>(this.activeScans);
+
+    runningScans.removeIf(scanDataSource -> {
+      boolean currentlyUnreserved = 
disallowNewReservations(scanDataSource.getScanParameters());
+      if (currentlyUnreserved) {
+        log.debug("Disabled scan session in tablet close {} {}", extent, 
scanDataSource);
+      }
+      return currentlyUnreserved;
+    });
+
     long lastLogTime = System.nanoTime();
 
     // wait for reads and writes to complete
-    while (writesInProgress > 0 || !activeScans.isEmpty()) {
+    while (writesInProgress > 0 || !runningScans.isEmpty()) {
+      runningScans.removeIf(scanDataSource -> {
+        boolean currentlyUnreserved = 
disallowNewReservations(scanDataSource.getScanParameters());
+        if (currentlyUnreserved) {
+          log.debug("Disabled scan session in tablet close {} {}", extent, 
scanDataSource);
+        }
+        return currentlyUnreserved;
+      });
 
       if (log.isDebugEnabled() && System.nanoTime() - lastLogTime > 
TimeUnit.SECONDS.toNanos(60)) {
-        for (ScanDataSource activeScan : activeScans) {
+        for (ScanDataSource activeScan : runningScans) {
           log.debug("Waiting on scan in completeClose {} {}", extent, 
activeScan);
         }
 
@@ -1053,13 +1071,19 @@ public class Tablet extends TabletBase {
 
       try {
         log.debug("Waiting to completeClose for {}. {} writes {} scans", 
extent, writesInProgress,
-            activeScans.size());
+            runningScans.size());
         this.wait(50);
       } catch (InterruptedException e) {
         log.error("Interrupted waiting to completeClose for extent {}", 
extent, e);
       }
     }
 
+    // It is assumed that nothing new would have been added to activeScans 
since it was copied, so
+    // check that assumption. At this point activeScans should be empty or 
everything in it should
+    // be disabled.
+    Preconditions.checkState(activeScans.stream()
+        .allMatch(scanDataSource -> 
disallowNewReservations(scanDataSource.getScanParameters())));
+
     getTabletMemory().waitForMinC();
 
     if (saveState && getTabletMemory().getMemTable().getNumEntries() > 0) {
@@ -1107,6 +1131,15 @@ public class Tablet extends TabletBase {
     }
   }
 
+  private boolean disallowNewReservations(ScanParameters scanParameters) {
+    var scanSessId = scanParameters.getScanSessionId();
+    if (scanSessId != null) {
+      return 
getTabletServer().getSessionManager().disallowNewReservations(scanSessId);
+    } else {
+      return true;
+    }
+  }
+
   private void closeConsistencyCheck() {
 
     long num = tabletMemory.getMemTable().getNumEntries();
diff --git 
a/server/tserver/src/test/java/org/apache/accumulo/tserver/session/SessionManagerTest.java
 
b/server/tserver/src/test/java/org/apache/accumulo/tserver/session/SessionManagerTest.java
index be3626ea71..38565f0a04 100644
--- 
a/server/tserver/src/test/java/org/apache/accumulo/tserver/session/SessionManagerTest.java
+++ 
b/server/tserver/src/test/java/org/apache/accumulo/tserver/session/SessionManagerTest.java
@@ -18,12 +18,20 @@
  */
 package org.apache.accumulo.tserver.session;
 
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.server.ServerContext;
 import org.junit.jupiter.api.Test;
 
 public class SessionManagerTest {
@@ -106,4 +114,38 @@ public class SessionManagerTest {
     assertEquals(2,
         deferredCleanupQeue.stream().filter(s -> ((TestSession) 
s).cleanupCount == 2).count());
   }
+
+  @Test
+  public void testDisallowNewReservation() {
+    var sessionManager = createSessionManager();
+
+    var sid = sessionManager.createSession(new TestSession(2), true);
+
+    // this should prevent future reservation and return false because its 
currently reserved
+    assertFalse(sessionManager.disallowNewReservations(sid));
+
+    // should not have a problem un-reserving
+    sessionManager.unreserveSession(sid);
+
+    // should not be able to reserve the session because reservations were 
disabled
+    assertNull(sessionManager.reserveSession(sid));
+    assertNull(sessionManager.reserveSession(sid, false));
+
+    // should return true now that its not reserved
+    assertTrue(sessionManager.disallowNewReservations(sid));
+
+    sessionManager.removeSession(sid);
+
+    // should return true for nonexistent session
+    assertTrue(sessionManager.disallowNewReservations(sid));
+  }
+
+  private SessionManager createSessionManager() {
+    ServerContext ctx = createMock(ServerContext.class);
+    
expect(ctx.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes();
+    var executor = (ScheduledThreadPoolExecutor) 
Executors.newScheduledThreadPool(1);
+    expect(ctx.getScheduledExecutor()).andReturn(executor).anyTimes();
+    replay(ctx);
+    return new SessionManager(ctx);
+  }
 }
diff --git a/test/src/main/java/org/apache/accumulo/test/ZombieScanIT.java 
b/test/src/main/java/org/apache/accumulo/test/ZombieScanIT.java
index 25f5ab3195..e8c3a47a3e 100644
--- a/test/src/main/java/org/apache/accumulo/test/ZombieScanIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ZombieScanIT.java
@@ -18,30 +18,43 @@
  */
 package org.apache.accumulo.test;
 
+import static org.apache.accumulo.test.functional.ScannerIT.countActiveScans;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeSet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.Semaphore;
+import java.util.stream.Collectors;
 
 import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.clientImpl.ClientContext;
 import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.iterators.WrappingIterator;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.core.metrics.MetricsProducer;
 import org.apache.accumulo.core.spi.metrics.LoggingMeterRegistryFactory;
+import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
 import org.apache.accumulo.test.functional.ConfigurableMacBase;
 import org.apache.accumulo.test.metrics.TestStatsDRegistryFactory;
 import org.apache.accumulo.test.metrics.TestStatsDSink;
 import org.apache.accumulo.test.util.Wait;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
@@ -115,6 +128,117 @@ public class ZombieScanIT extends ConfigurableMacBase {
     }
   }
 
+  /**
+   * This test ensure that scans threads that run forever do not prevent 
tablets from unloading.
+   */
+  @Test
+  public void testZombieScan() throws Exception {
+
+    String table = getUniqueNames(1)[0];
+
+    try (AccumuloClient c = 
Accumulo.newClient().from(getClientProperties()).build()) {
+
+      var splits = new TreeSet<Text>();
+      splits.add(new Text("3"));
+      splits.add(new Text("5"));
+      splits.add(new Text("7"));
+      var ntc = new NewTableConfiguration().withSplits(splits);
+      c.tableOperations().create(table, ntc);
+
+      try (var writer = c.createBatchWriter(table)) {
+        for (var row : List.of("2", "4", "6", "8")) {
+          Mutation m = new Mutation(row);
+          m.put("f", "q", "v");
+          writer.addMutation(m);
+        }
+      }
+
+      // Flush the data otherwise when the tablet attempts to close with an 
active scan reading from
+      // the in memory map it will wait for 15 seconds for the scan
+      c.tableOperations().flush(table, null, null, true);
+
+      var executor = Executors.newCachedThreadPool();
+
+      // start two zombie scans that should never return using a normal scanner
+      List<Future<String>> futures = new ArrayList<>();
+      for (var row : List.of("2", "4")) {
+        var future = executor.submit(() -> {
+          try (var scanner = c.createScanner(table)) {
+            IteratorSetting iter = new IteratorSetting(100, "Z", 
ZombieIterator.class);
+            scanner.addScanIterator(iter);
+            scanner.setRange(new Range(row));
+            return scanner.stream().findFirst().map(e -> 
e.getKey().getRowData().toString())
+                .orElse("none");
+          }
+        });
+        futures.add(future);
+      }
+
+      // start two zombie scans that should never return using a batch scanner
+      for (var row : List.of("6", "8")) {
+        var future = executor.submit(() -> {
+          try (var scanner = c.createBatchScanner(table)) {
+            IteratorSetting iter = new IteratorSetting(100, "Z", 
ZombieIterator.class);
+            scanner.addScanIterator(iter);
+            scanner.setRanges(List.of(new Range(row)));
+            return scanner.stream().findFirst().map(e -> 
e.getKey().getRowData().toString())
+                .orElse("none");
+          }
+        });
+        futures.add(future);
+      }
+
+      // should eventually see the four zombie scans running against four 
tablets
+      Wait.waitFor(() -> countDistinctTabletsScans(table, c) == 4);
+
+      assertEquals(1, c.instanceOperations().getTabletServers().size());
+
+      // Start 3 new tablet servers, this should cause the table to balance 
and the tablets with
+      // zombie scans to unload. The Zombie scans should not prevent the table 
from unloading. The
+      // scan threads will still be running on the old tablet servers.
+      getCluster().getConfig().setNumTservers(4);
+      
getCluster().getClusterControl().startAllServers(ServerType.TABLET_SERVER);
+
+      // Wait for all tablets servers
+      Wait.waitFor(() -> c.instanceOperations().getTabletServers().size() == 
4);
+
+      // The table should eventually balance across the 4 tablet servers
+      Wait.waitFor(() -> countLocations(table, c) == 4);
+
+      // The zombie scans should still be running
+      assertTrue(futures.stream().noneMatch(Future::isDone));
+
+      // Should be able to scan all the tablets at the new locations.
+      try (var scanner = c.createScanner(table)) {
+        var rows = scanner.stream().map(e -> 
e.getKey().getRowData().toString())
+            .collect(Collectors.toSet());
+        assertEquals(Set.of("2", "4", "6", "8"), rows);
+      }
+
+      try (var scanner = c.createBatchScanner(table)) {
+        scanner.setRanges(List.of(new Range()));
+        var rows = scanner.stream().map(e -> 
e.getKey().getRowData().toString())
+            .collect(Collectors.toSet());
+        assertEquals(Set.of("2", "4", "6", "8"), rows);
+      }
+
+      // The zombie scans should migrate with the tablets, taking up more scan 
threads in the
+      // system.
+      Set<String> tabletSeversWithZombieScans = new HashSet<>();
+      for (String tserver : c.instanceOperations().getTabletServers()) {
+        if (c.instanceOperations().getActiveScans(tserver).stream()
+            .flatMap(activeScan -> activeScan.getSsiList().stream())
+            .anyMatch(scanIters -> 
scanIters.contains(ZombieIterator.class.getName()))) {
+          tabletSeversWithZombieScans.add(tserver);
+        }
+      }
+      assertEquals(4, tabletSeversWithZombieScans.size());
+
+      executor.shutdownNow();
+    }
+
+  }
+
   /**
    * Create some zombie scans and ensure metrics for them show up.
    */
@@ -152,7 +276,7 @@ public class ZombieScanIT extends ConfigurableMacBase {
       }
 
       // should eventually see the eight stuck scans running
-      Wait.waitFor(() -> countScansForTable(table, c) == 8);
+      Wait.waitFor(() -> countActiveScans(c, table) == 8);
 
       // Cancel the scan threads. This will cause the sessions on the server 
side to timeout and
       // become inactive. The stuck threads on the server side related to the 
timed out sessions
@@ -163,11 +287,11 @@ public class ZombieScanIT extends ConfigurableMacBase {
       });
 
       // Four of the eight running scans should respond to thread interrupts 
and exit
-      Wait.waitFor(() -> countScansForTable(table, c) == 4);
+      Wait.waitFor(() -> countActiveScans(c, table) == 4);
 
       Wait.waitFor(() -> getZombieScansMetric() == 4);
 
-      assertEquals(4, countScansForTable(table, c));
+      assertEquals(4, countActiveScans(c, table));
 
       // start four more stuck scans with two that will ignore interrupts
       futures.clear();
@@ -176,7 +300,7 @@ public class ZombieScanIT extends ConfigurableMacBase {
       futures.add(startStuckBatchScan(c, table, executor, "99", false));
       futures.add(startStuckBatchScan(c, table, executor, "0", true));
 
-      Wait.waitFor(() -> countScansForTable(table, c) == 8);
+      Wait.waitFor(() -> countActiveScans(c, table) == 8);
 
       // Cancel the client side scan threads. Should cause the server side 
threads to be
       // interrupted.
@@ -186,17 +310,36 @@ public class ZombieScanIT extends ConfigurableMacBase {
       });
 
       // Two of the stuck threads should respond to interrupts on the server 
side and exit.
-      Wait.waitFor(() -> countScansForTable(table, c) == 6);
+      Wait.waitFor(() -> countActiveScans(c, table) == 6);
 
       Wait.waitFor(() -> getZombieScansMetric() == 6);
 
-      assertEquals(6, countScansForTable(table, c));
+      assertEquals(6, countActiveScans(c, table));
 
       executor.shutdownNow();
     }
 
   }
 
+  private static long countLocations(String table, AccumuloClient client) 
throws Exception {
+    var ctx = (ClientContext) client;
+    var tableId = ctx.getTableId(table);
+    return ctx.getAmple().readTablets().forTable(tableId).build().stream()
+        
.map(TabletMetadata::getLocation).filter(Objects::nonNull).distinct().count();
+  }
+
+  private static long countDistinctTabletsScans(String table, AccumuloClient 
client)
+      throws Exception {
+    var tservers = client.instanceOperations().getTabletServers();
+    long count = 0;
+    for (String tserver : tservers) {
+      count += client.instanceOperations().getActiveScans(tserver).stream()
+          .filter(activeScan -> activeScan.getTable().equals(table))
+          .map(activeScan -> activeScan.getTablet()).distinct().count();
+    }
+    return count;
+  }
+
   private Future<String> startStuckScan(AccumuloClient c, String table, 
ExecutorService executor,
       String row, boolean canInterrupt) {
     return executor.submit(() -> {
@@ -241,15 +384,4 @@ public class ZombieScanIT extends ConfigurableMacBase {
         .filter(metric -> 
metric.getName().equals(MetricsProducer.METRICS_SCAN_ZOMBIE_THREADS))
         .mapToInt(metric -> 
Integer.parseInt(metric.getValue())).max().orElse(-1);
   }
-
-  private static long countScansForTable(String table, AccumuloClient client) 
throws Exception {
-    var tservers = client.instanceOperations().getTabletServers();
-    long count = 0;
-    for (String tserver : tservers) {
-      count += client.instanceOperations().getActiveScans(tserver).stream()
-          .filter(activeScan -> activeScan.getTable().equals(table)).count();
-    }
-    return count;
-  }
-
 }

Reply via email to