This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new 128a8d0ed1 Fix inconsistent view of TabletServers in Manager (#3901)
128a8d0ed1 is described below

commit 128a8d0ed1ef76d487259e2ac8236383e963b667
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Mon Oct 30 13:55:15 2023 -0400

    Fix inconsistent view of TabletServers in Manager (#3901)
    
    Modified LiveTServerSet so that the set of tablet servers and the tablet
    servers' resource groups are acquired atomically. The code was
    acquiring this information at two different times with two different lock
    acquisitions, which could have led to race conditions resulting in
    differences between the set and the map. Fixed some cases in the balancers
    that were not handling an empty or non-existent group correctly.
    
    
    Co-authored-by: Keith Turner <ktur...@apache.org>
---
 .../manager/balancer/AssignmentParamsImpl.java     | 27 +++++--
 .../core/spi/balancer/SimpleLoadBalancer.java      |  4 ++
 .../core/spi/balancer/TableLoadBalancer.java       |  8 ++-
 .../accumulo/server/manager/LiveTServerSet.java    | 84 ++++++++++++++++------
 .../server/manager/state/CurrentState.java         |  3 +-
 .../manager/state/TabletManagementIterator.java    |  6 +-
 .../java/org/apache/accumulo/manager/Manager.java  | 16 ++---
 .../accumulo/manager/TabletGroupWatcher.java       | 57 ++++++++++-----
 .../functional/TabletManagementIteratorIT.java     | 11 +--
 9 files changed, 154 insertions(+), 62 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/manager/balancer/AssignmentParamsImpl.java
 
b/core/src/main/java/org/apache/accumulo/core/manager/balancer/AssignmentParamsImpl.java
index 7bdbf70fc1..bf5e387fac 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/manager/balancer/AssignmentParamsImpl.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/manager/balancer/AssignmentParamsImpl.java
@@ -34,8 +34,13 @@ import org.apache.accumulo.core.metadata.TServerInstance;
 import org.apache.accumulo.core.spi.balancer.TabletBalancer;
 import org.apache.accumulo.core.spi.balancer.data.TServerStatus;
 import org.apache.accumulo.core.spi.balancer.data.TabletServerId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class AssignmentParamsImpl implements 
TabletBalancer.AssignmentParameters {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(AssignmentParamsImpl.class);
+
   private final SortedMap<TabletServerId,TServerStatus> currentStatus;
   private final Map<TabletId,TabletServerId> unassigned;
   private final Map<TabletId,TabletServerId> assignmentsOut;
@@ -50,16 +55,26 @@ public class AssignmentParamsImpl implements 
TabletBalancer.AssignmentParameters
       Map<KeyExtent,TServerInstance> unassigned, 
Map<KeyExtent,TServerInstance> assignmentsOut) {
 
     SortedMap<TabletServerId,TServerStatus> currentStatusNew = new TreeMap<>();
-    currentStatus.forEach((tsi, status) -> currentStatusNew.put(new 
TabletServerIdImpl(tsi),
-        TServerStatusImpl.fromThrift(status)));
-
     Map<String,Set<TabletServerId>> tserverGroups = new HashMap<>();
-    currentTServerGrouping.forEach((k, v) -> {
+    currentTServerGrouping.forEach((group, serversInGroup) -> {
       Set<TabletServerId> servers = new HashSet<>();
-      v.forEach(tsi -> servers.add(TabletServerIdImpl.fromThrift(tsi)));
-      tserverGroups.put(k, servers);
+      serversInGroup.forEach(tsi -> {
+        TabletServerIdImpl id = TabletServerIdImpl.fromThrift(tsi);
+        if (currentStatus.containsKey(tsi)) {
+          currentStatusNew.put(id, 
TServerStatusImpl.fromThrift(currentStatus.get(tsi)));
+          servers.add(id);
+        } else {
+          LOG.debug("Dropping tserver {} from group {} as it's not in set of 
all servers", id,
+              group);
+        }
+      });
+      if (!servers.isEmpty()) {
+        tserverGroups.put(group, servers);
+      }
     });
 
+    LOG.debug("TServer groups for balancer assignment: {}", tserverGroups);
+
     Map<TabletId,TabletServerId> unassignedNew = new HashMap<>();
     unassigned.forEach(
         (ke, tsi) -> unassignedNew.put(new TabletIdImpl(ke), 
TabletServerIdImpl.fromThrift(tsi)));
diff --git 
a/core/src/main/java/org/apache/accumulo/core/spi/balancer/SimpleLoadBalancer.java
 
b/core/src/main/java/org/apache/accumulo/core/spi/balancer/SimpleLoadBalancer.java
index 50fcd8417f..09189c0192 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/spi/balancer/SimpleLoadBalancer.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/spi/balancer/SimpleLoadBalancer.java
@@ -356,6 +356,10 @@ public class SimpleLoadBalancer implements TabletBalancer {
 
   @Override
   public void getAssignments(AssignmentParameters params) {
+    if (params.currentStatus().isEmpty()) {
+      log.debug("No known TabletServers, skipping tablet assignment for now.");
+      return;
+    }
     params.unassignedTablets().forEach((tabletId, tserverId) -> 
params.addAssignment(tabletId,
         getAssignment(params.currentStatus(), tserverId)));
   }
diff --git 
a/core/src/main/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancer.java
 
b/core/src/main/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancer.java
index 853608018f..44cf29d1e8 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancer.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancer.java
@@ -149,10 +149,12 @@ public class TableLoadBalancer implements TabletBalancer {
     tserversInGroup.forEach(tsid -> {
       TServerStatus tss = allTServers.get(tsid);
       if (tss == null) {
-        throw new IllegalStateException("TabletServer " + tsid + " in " + 
groupNameInUse
-            + " TabletServer group, but not in set of all TabletServers");
+        log.warn(
+            "Excluding TabletServer {}  from group {} because 
TabletServerStatus is null, likely that Manager.StatusThread.updateStatus has 
not discovered it yet.",
+            tsid, groupNameInUse);
+      } else {
+        group.put(tsid, tss);
       }
-      group.put(tsid, tss);
     });
     return group;
   }
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
 
b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
index 018b7eaf89..f5914c4a3f 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
@@ -22,6 +22,7 @@ import static java.util.concurrent.TimeUnit.MINUTES;
 import static 
org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy.SKIP;
 
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -60,6 +61,7 @@ import org.apache.zookeeper.Watcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.net.HostAndPort;
 
 public class LiveTServerSet implements Watcher {
@@ -204,12 +206,11 @@ public class LiveTServerSet implements Watcher {
     }
   }
 
-  // The set of active tservers with locks, indexed by their name in zookeeper
+  // The set of active tservers with locks, indexed by their name in 
zookeeper. When the contents of
+  // this map are modified, tServersSnapshot should be set to null.
   private final Map<String,TServerInfo> current = new HashMap<>();
-  // as above, indexed by TServerInstance
-  private final Map<TServerInstance,TServerInfo> currentInstances = new 
HashMap<>();
-  // as above, grouped by resource group name
-  private final Map<String,Set<TServerInstance>> currentGroups = new 
HashMap<>();
+
+  private LiveTServersSnapshot tServersSnapshot = null;
 
   // The set of entries in zookeeper without locks, and the first time each 
was noticed
   private final Map<String,Long> locklessServers = new HashMap<>();
@@ -270,6 +271,9 @@ public class LiveTServerSet implements Watcher {
       final Set<TServerInstance> doomed, final String path, final String zPath)
       throws InterruptedException, KeeperException {
 
+    // invalidate the snapshot forcing it to be recomputed the next time its 
requested
+    tServersSnapshot = null;
+
     TServerInfo info = current.get(zPath);
 
     final var zLockPath = ServiceLock.path(path + "/" + zPath);
@@ -280,8 +284,6 @@ public class LiveTServerSet implements Watcher {
       if (info != null) {
         doomed.add(info.instance);
         current.remove(zPath);
-        currentInstances.remove(info.instance);
-        currentGroups.get(info.resourceGroup).remove(info.instance);
       }
 
       Long firstSeen = locklessServers.get(zPath);
@@ -302,18 +304,12 @@ public class LiveTServerSet implements Watcher {
         TServerInfo tServerInfo =
             new TServerInfo(instance, new TServerConnection(address), 
resourceGroup);
         current.put(zPath, tServerInfo);
-        currentInstances.put(instance, tServerInfo);
-        currentGroups.computeIfAbsent(resourceGroup, rg -> new 
HashSet<>()).add(instance);
       } else if (!info.instance.equals(instance)) {
         doomed.add(info.instance);
         updates.add(instance);
         TServerInfo tServerInfo =
             new TServerInfo(instance, new TServerConnection(address), 
resourceGroup);
         current.put(zPath, tServerInfo);
-        currentInstances.remove(info.instance);
-        currentGroups.getOrDefault(resourceGroup, new 
HashSet<>()).remove(instance);
-        currentInstances.put(instance, tServerInfo);
-        currentGroups.computeIfAbsent(resourceGroup, rg -> new 
HashSet<>()).add(instance);
       }
     }
   }
@@ -356,21 +352,64 @@ public class LiveTServerSet implements Watcher {
     if (server == null) {
       return null;
     }
-    TServerInfo tServerInfo = currentInstances.get(server);
+    TServerInfo tServerInfo = getSnapshot().tserversInfo.get(server);
     if (tServerInfo == null) {
       return null;
     }
     return tServerInfo.connection;
   }
 
-  public synchronized Set<TServerInstance> getCurrentServers() {
-    return new HashSet<>(currentInstances.keySet());
+  public static class LiveTServersSnapshot {
+    private final Set<TServerInstance> tservers;
+    private final Map<String,Set<TServerInstance>> tserverGroups;
+
+    // TServerInfo is only for internal use, so this field is private w/o a 
getter.
+    private final Map<TServerInstance,TServerInfo> tserversInfo;
+
+    @VisibleForTesting
+    public LiveTServersSnapshot(Set<TServerInstance> currentServers,
+        Map<String,Set<TServerInstance>> serverGroups) {
+      this.tserversInfo = null;
+      this.tservers = Set.copyOf(currentServers);
+      Map<String,Set<TServerInstance>> copy = new HashMap<>();
+      serverGroups.forEach((k, v) -> copy.put(k, Set.copyOf(v)));
+      this.tserverGroups = Collections.unmodifiableMap(copy);
+    }
+
+    public LiveTServersSnapshot(Map<TServerInstance,TServerInfo> 
currentServers,
+        Map<String,Set<TServerInstance>> serverGroups) {
+      this.tserversInfo = Map.copyOf(currentServers);
+      this.tservers = this.tserversInfo.keySet();
+      Map<String,Set<TServerInstance>> copy = new HashMap<>();
+      serverGroups.forEach((k, v) -> copy.put(k, Set.copyOf(v)));
+      this.tserverGroups = Collections.unmodifiableMap(copy);
+    }
+
+    public Set<TServerInstance> getTservers() {
+      return tservers;
+    }
+
+    public Map<String,Set<TServerInstance>> getTserverGroups() {
+      return tserverGroups;
+    }
+  }
+
+  public synchronized LiveTServersSnapshot getSnapshot() {
+    if (tServersSnapshot == null) {
+      HashMap<TServerInstance,TServerInfo> tServerInstances = new HashMap<>();
+      Map<String,Set<TServerInstance>> tserversGroups = new HashMap<>();
+      current.values().forEach(tServerInfo -> {
+        tServerInstances.put(tServerInfo.instance, tServerInfo);
+        tserversGroups.computeIfAbsent(tServerInfo.resourceGroup, rg -> new 
HashSet<>())
+            .add(tServerInfo.instance);
+      });
+      tServersSnapshot = new LiveTServersSnapshot(tServerInstances, 
tserversGroups);
+    }
+    return tServersSnapshot;
   }
 
-  public synchronized Map<String,Set<TServerInstance>> 
getCurrentServersGroups() {
-    Map<String,Set<TServerInstance>> copy = new HashMap<>();
-    currentGroups.forEach((k, v) -> copy.put(k, new HashSet<>(v)));
-    return copy;
+  public synchronized Set<TServerInstance> getCurrentServers() {
+    return getSnapshot().getTservers();
   }
 
   public synchronized int size() {
@@ -407,6 +446,10 @@ public class LiveTServerSet implements Watcher {
   }
 
   public synchronized void remove(TServerInstance server) {
+
+    // invalidate the snapshot forcing it to be recomputed the next time its 
requested
+    tServersSnapshot = null;
+
     String zPath = null;
     for (Entry<String,TServerInfo> entry : current.entrySet()) {
       if (entry.getValue().instance.equals(server)) {
@@ -418,7 +461,6 @@ public class LiveTServerSet implements Watcher {
       return;
     }
     current.remove(zPath);
-    currentInstances.remove(server);
 
     log.info("Removing zookeeper lock for {}", server);
     String fullpath = context.getZooKeeperRoot() + Constants.ZTSERVERS + "/" + 
zPath;
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/CurrentState.java
 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/CurrentState.java
index 09caba293e..1b72fa2a55 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/CurrentState.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/CurrentState.java
@@ -25,6 +25,7 @@ import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.manager.thrift.ManagerState;
 import org.apache.accumulo.core.metadata.TServerInstance;
+import org.apache.accumulo.server.manager.LiveTServerSet.LiveTServersSnapshot;
 
 public interface CurrentState {
 
@@ -32,7 +33,7 @@ public interface CurrentState {
 
   Set<TServerInstance> onlineTabletServers();
 
-  Map<String,Set<TServerInstance>> tServerResourceGroups();
+  LiveTServersSnapshot tserversSnapshot();
 
   Set<TServerInstance> shutdownServers();
 
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java
 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java
index 41646f4145..b44bc5d37e 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java
@@ -76,6 +76,7 @@ import org.apache.accumulo.core.spi.compaction.CompactionKind;
 import org.apache.accumulo.core.util.AddressUtil;
 import org.apache.accumulo.server.compaction.CompactionJobGenerator;
 import org.apache.accumulo.server.iterators.TabletIteratorEnvironment;
+import org.apache.accumulo.server.manager.LiveTServerSet.LiveTServersSnapshot;
 import org.apache.accumulo.server.manager.balancer.BalancerEnvironmentImpl;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -304,13 +305,14 @@ public class TabletManagementIterator extends 
SkippingIterator {
     IteratorSetting tabletChange =
         new IteratorSetting(1001, "ManagerTabletInfoIterator", 
TabletManagementIterator.class);
     if (state != null) {
-      TabletManagementIterator.setCurrentServers(tabletChange, 
state.onlineTabletServers());
+      LiveTServersSnapshot tserversSnapshot = state.tserversSnapshot();
+      TabletManagementIterator.setCurrentServers(tabletChange, 
tserversSnapshot.getTservers());
       TabletManagementIterator.setOnlineTables(tabletChange, 
state.onlineTables());
       TabletManagementIterator.setMigrations(tabletChange, 
state.migrationsSnapshot());
       TabletManagementIterator.setManagerState(tabletChange, 
state.getManagerState());
       TabletManagementIterator.setShuttingDown(tabletChange, 
state.shutdownServers());
       TabletManagementIterator.setTServerResourceGroups(tabletChange,
-          state.tServerResourceGroups());
+          tserversSnapshot.getTserverGroups());
       setCompactionHints(tabletChange, state.getCompactionHints());
     }
     scanner.addScanIterator(tabletChange);
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java 
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 5257a610a3..b5de7196d0 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -129,6 +129,7 @@ import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.compaction.CompactionConfigStorage;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.manager.LiveTServerSet;
+import org.apache.accumulo.server.manager.LiveTServerSet.LiveTServersSnapshot;
 import org.apache.accumulo.server.manager.LiveTServerSet.TServerConnection;
 import org.apache.accumulo.server.manager.balancer.BalancerEnvironmentImpl;
 import org.apache.accumulo.server.manager.state.CurrentState;
@@ -819,12 +820,11 @@ public class Manager extends AbstractServer
     }
 
     private long updateStatus() {
-      Set<TServerInstance> currentServers = tserverSet.getCurrentServers();
+      var tseversSnapshot = tserverSet.getSnapshot();
       TreeMap<TabletServerId,TServerStatus> temp = new TreeMap<>();
-      tserverStatus = gatherTableInformation(currentServers, temp);
+      tserverStatus = gatherTableInformation(tseversSnapshot.getTservers(), 
temp);
       tserverStatusForBalancer = Collections.unmodifiableSortedMap(temp);
-      tServerGroupingForBalancer =
-          Collections.unmodifiableMap(tserverSet.getCurrentServersGroups());
+      tServerGroupingForBalancer = tseversSnapshot.getTserverGroups();
       checkForHeldServer(tserverStatus);
 
       if (!badServers.isEmpty()) {
@@ -838,7 +838,7 @@ public class Manager extends AbstractServer
         log.debug("not balancing while shutting down servers {}", 
serversToShutdown);
       } else {
         for (TabletGroupWatcher tgw : watchers) {
-          if (!tgw.isSameTserversAsLastScan(currentServers)) {
+          if (!tgw.isSameTserversAsLastScan(tseversSnapshot.getTservers())) {
             log.debug("not balancing just yet, as collection of live tservers 
is in flux");
             return DEFAULT_WAIT_FOR_WATCHER;
           }
@@ -1606,12 +1606,12 @@ public class Manager extends AbstractServer
 
   @Override
   public Set<TServerInstance> onlineTabletServers() {
-    return tserverSet.getCurrentServers();
+    return tserverSet.getSnapshot().getTservers();
   }
 
   @Override
-  public Map<String,Set<TServerInstance>> tServerResourceGroups() {
-    return tserverSet.getCurrentServersGroups();
+  public LiveTServersSnapshot tserversSnapshot() {
+    return tserverSet.getSnapshot();
   }
 
   // recovers state from the persistent transaction to shutdown a server
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
index cdb6953e8a..e0d0e5db7e 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
@@ -26,6 +26,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -81,6 +82,7 @@ import 
org.apache.accumulo.server.compaction.CompactionJobGenerator;
 import org.apache.accumulo.server.conf.TableConfiguration;
 import org.apache.accumulo.server.log.WalStateManager;
 import org.apache.accumulo.server.log.WalStateManager.WalMarkerException;
+import org.apache.accumulo.server.manager.LiveTServerSet;
 import org.apache.accumulo.server.manager.LiveTServerSet.TServerConnection;
 import org.apache.accumulo.server.manager.state.Assignment;
 import org.apache.accumulo.server.manager.state.ClosableIterator;
@@ -173,10 +175,30 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
 
     public TabletLists(Manager m, 
SortedMap<TServerInstance,TabletServerStatus> curTServers,
         Map<String,Set<TServerInstance>> grouping) {
-      var destinationsMod = new TreeMap<>(curTServers);
-      destinationsMod.keySet().removeAll(m.serversToShutdown);
-      this.destinations = Collections.unmodifiableSortedMap(destinationsMod);
-      this.currentTServerGrouping = grouping;
+      synchronized (m.serversToShutdown) {
+        var destinationsMod = new TreeMap<>(curTServers);
+        if (!m.serversToShutdown.isEmpty()) {
+          // Remove servers that are in the process of shutting down from the 
lists of tablet
+          // servers.
+          destinationsMod.keySet().removeAll(m.serversToShutdown);
+          HashMap<String,Set<TServerInstance>> groupingCopy = new HashMap<>();
+          grouping.forEach((group, groupsServers) -> {
+            if (Collections.disjoint(groupsServers, m.serversToShutdown)) {
+              groupingCopy.put(group, groupsServers);
+            } else {
+              var serversCopy = new HashSet<>(groupsServers);
+              serversCopy.removeAll(m.serversToShutdown);
+              groupingCopy.put(group, 
Collections.unmodifiableSet(serversCopy));
+            }
+          });
+
+          this.currentTServerGrouping = 
Collections.unmodifiableMap(groupingCopy);
+        } else {
+          this.currentTServerGrouping = grouping;
+        }
+
+        this.destinations = Collections.unmodifiableSortedMap(destinationsMod);
+      }
     }
 
     public void reset() {
@@ -218,7 +240,9 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
               continue;
             }
 
-            var currentTservers = getCurrentTservers();
+            LiveTServerSet.LiveTServersSnapshot tservers = 
manager.tserverSet.getSnapshot();
+            var currentTservers = getTserversStatus(tservers.getTservers());
+
             if (currentTservers.isEmpty()) {
               setNeedsFullScan();
               continue;
@@ -226,7 +250,7 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
 
             try (var iter = store.iterator(ranges)) {
               long t1 = System.currentTimeMillis();
-              manageTablets(iter, currentTservers, false);
+              manageTablets(iter, currentTservers, 
tservers.getTserverGroups(), false);
               long t2 = System.currentTimeMillis();
               Manager.log.debug(String.format("[%s]: partial scan time %.2f 
seconds for %,d ranges",
                   store.name(), (t2 - t1) / 1000., ranges.size()));
@@ -296,7 +320,8 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
   }
 
   private TableMgmtStats manageTablets(Iterator<TabletManagement> iter,
-      SortedMap<TServerInstance,TabletServerStatus> currentTServers, boolean 
isFullScan)
+      SortedMap<TServerInstance,TabletServerStatus> currentTServers,
+      Map<String,Set<TServerInstance>> tserverGroups, boolean isFullScan)
       throws BadLocationStateException, TException, DistributedStoreException, 
WalMarkerException,
       IOException {
 
@@ -312,16 +337,13 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
 
     int unloaded = 0;
 
-    final Map<String,Set<TServerInstance>> currentTServerGrouping =
-        manager.tserverSet.getCurrentServersGroups();
-
-    TabletLists tLists = new TabletLists(manager, currentTServers, 
currentTServerGrouping);
+    TabletLists tLists = new TabletLists(manager, currentTServers, 
tserverGroups);
 
     CompactionJobGenerator compactionGenerator = new CompactionJobGenerator(
         new ServiceEnvironmentImpl(manager.getContext()), 
manager.getCompactionHints());
 
     final Map<TabletServerId,String> resourceGroups = new HashMap<>();
-    manager.tServerResourceGroups().forEach((group, tservers) -> {
+    tserverGroups.forEach((group, tservers) -> {
       tservers.stream().map(TabletServerIdImpl::new)
           .forEach(tabletServerId -> resourceGroups.put(tabletServerId, 
group));
     });
@@ -541,10 +563,11 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
     return tableMgmtStats;
   }
 
-  private SortedMap<TServerInstance,TabletServerStatus> getCurrentTservers() {
+  private SortedMap<TServerInstance,TabletServerStatus>
+      getTserversStatus(Set<TServerInstance> currentServers) {
     // Get the current status for the current list of tservers
     final SortedMap<TServerInstance,TabletServerStatus> currentTServers = new 
TreeMap<>();
-    for (TServerInstance entry : manager.tserverSet.getCurrentServers()) {
+    for (TServerInstance entry : currentServers) {
       currentTServers.put(entry, manager.tserverStatus.get(entry));
     }
     return currentTServers;
@@ -563,7 +586,8 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
       final long waitTimeBetweenScans = manager.getConfiguration()
           .getTimeInMillis(Property.MANAGER_TABLET_GROUP_WATCHER_INTERVAL);
 
-      var currentTServers = getCurrentTservers();
+      LiveTServerSet.LiveTServersSnapshot tservers = 
manager.tserverSet.getSnapshot();
+      var currentTServers = getTserversStatus(tservers.getTservers());
 
       ClosableIterator<TabletManagement> iter = null;
       try {
@@ -584,7 +608,8 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
         eventHandler.clearNeedsFullScan();
 
         iter = store.iterator();
-        var tabletMgmtStats = manageTablets(iter, currentTServers, true);
+        var tabletMgmtStats =
+            manageTablets(iter, currentTServers, tservers.getTserverGroups(), 
true);
 
         // provide stats after flushing changes to avoid race conditions w/ 
delete table
         stats.end(managerState);
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java
 
b/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java
index 9c3c2a37f8..3b1a688ca8 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java
@@ -71,6 +71,7 @@ import 
org.apache.accumulo.core.metadata.schema.TabletOperationType;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.server.manager.LiveTServerSet;
 import org.apache.accumulo.server.manager.state.CurrentState;
 import org.apache.accumulo.server.manager.state.TabletManagementIterator;
 import org.apache.hadoop.io.Text;
@@ -372,6 +373,11 @@ public class TabletManagementIteratorIT extends 
AccumuloClusterHarness {
       return tservers;
     }
 
+    @Override
+    public LiveTServerSet.LiveTServersSnapshot tserversSnapshot() {
+      return new LiveTServerSet.LiveTServersSnapshot(onlineTabletServers(), 
new HashMap<>());
+    }
+
     @Override
     public Set<TableId> onlineTables() {
       Set<TableId> onlineTables = context.getTableIdToNameMap().keySet();
@@ -380,11 +386,6 @@ public class TabletManagementIteratorIT extends 
AccumuloClusterHarness {
       return this.onlineTables;
     }
 
-    @Override
-    public Map<String,Set<TServerInstance>> tServerResourceGroups() {
-      return new HashMap<>();
-    }
-
     @Override
     public Set<KeyExtent> migrationsSnapshot() {
       return Collections.emptySet();

Reply via email to