This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new 128a8d0ed1 Fix inconsistent view of TabletServers in Manager (#3901) 128a8d0ed1 is described below commit 128a8d0ed1ef76d487259e2ac8236383e963b667 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Mon Oct 30 13:55:15 2023 -0400 Fix inconsistent view of TabletServers in Manager (#3901) Modified LiveTServerSet so that the set of tablet servers and the tablet servers' resource groups are acquired atomically. The code was acquiring this information at two different times with two different lock acquisitions, which could have led to race conditions resulting in differences between the set and the map. Fixed some cases in the balancers that were not handling an empty or non-existent group correctly. Co-authored-by: Keith Turner <ktur...@apache.org> --- .../manager/balancer/AssignmentParamsImpl.java | 27 +++++-- .../core/spi/balancer/SimpleLoadBalancer.java | 4 ++ .../core/spi/balancer/TableLoadBalancer.java | 8 ++- .../accumulo/server/manager/LiveTServerSet.java | 84 ++++++++++++++++------ .../server/manager/state/CurrentState.java | 3 +- .../manager/state/TabletManagementIterator.java | 6 +- .../java/org/apache/accumulo/manager/Manager.java | 16 ++--- .../accumulo/manager/TabletGroupWatcher.java | 57 ++++++++++----- .../functional/TabletManagementIteratorIT.java | 11 +-- 9 files changed, 154 insertions(+), 62 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/manager/balancer/AssignmentParamsImpl.java b/core/src/main/java/org/apache/accumulo/core/manager/balancer/AssignmentParamsImpl.java index 7bdbf70fc1..bf5e387fac 100644 --- a/core/src/main/java/org/apache/accumulo/core/manager/balancer/AssignmentParamsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/manager/balancer/AssignmentParamsImpl.java @@ -34,8 +34,13 @@ import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.spi.balancer.TabletBalancer; import 
org.apache.accumulo.core.spi.balancer.data.TServerStatus; import org.apache.accumulo.core.spi.balancer.data.TabletServerId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class AssignmentParamsImpl implements TabletBalancer.AssignmentParameters { + + private static final Logger LOG = LoggerFactory.getLogger(AssignmentParamsImpl.class); + private final SortedMap<TabletServerId,TServerStatus> currentStatus; private final Map<TabletId,TabletServerId> unassigned; private final Map<TabletId,TabletServerId> assignmentsOut; @@ -50,16 +55,26 @@ public class AssignmentParamsImpl implements TabletBalancer.AssignmentParameters Map<KeyExtent,TServerInstance> unassigned, Map<KeyExtent,TServerInstance> assignmentsOut) { SortedMap<TabletServerId,TServerStatus> currentStatusNew = new TreeMap<>(); - currentStatus.forEach((tsi, status) -> currentStatusNew.put(new TabletServerIdImpl(tsi), - TServerStatusImpl.fromThrift(status))); - Map<String,Set<TabletServerId>> tserverGroups = new HashMap<>(); - currentTServerGrouping.forEach((k, v) -> { + currentTServerGrouping.forEach((group, serversInGroup) -> { Set<TabletServerId> servers = new HashSet<>(); - v.forEach(tsi -> servers.add(TabletServerIdImpl.fromThrift(tsi))); - tserverGroups.put(k, servers); + serversInGroup.forEach(tsi -> { + TabletServerIdImpl id = TabletServerIdImpl.fromThrift(tsi); + if (currentStatus.containsKey(tsi)) { + currentStatusNew.put(id, TServerStatusImpl.fromThrift(currentStatus.get(tsi))); + servers.add(id); + } else { + LOG.debug("Dropping tserver {} from group {} as it's not in set of all servers", id, + group); + } + }); + if (!servers.isEmpty()) { + tserverGroups.put(group, servers); + } }); + LOG.debug("TServer groups for balancer assignment: {}", tserverGroups); + Map<TabletId,TabletServerId> unassignedNew = new HashMap<>(); unassigned.forEach( (ke, tsi) -> unassignedNew.put(new TabletIdImpl(ke), TabletServerIdImpl.fromThrift(tsi))); diff --git 
a/core/src/main/java/org/apache/accumulo/core/spi/balancer/SimpleLoadBalancer.java b/core/src/main/java/org/apache/accumulo/core/spi/balancer/SimpleLoadBalancer.java index 50fcd8417f..09189c0192 100644 --- a/core/src/main/java/org/apache/accumulo/core/spi/balancer/SimpleLoadBalancer.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/balancer/SimpleLoadBalancer.java @@ -356,6 +356,10 @@ public class SimpleLoadBalancer implements TabletBalancer { @Override public void getAssignments(AssignmentParameters params) { + if (params.currentStatus().isEmpty()) { + log.debug("No known TabletServers, skipping tablet assignment for now."); + return; + } params.unassignedTablets().forEach((tabletId, tserverId) -> params.addAssignment(tabletId, getAssignment(params.currentStatus(), tserverId))); } diff --git a/core/src/main/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancer.java b/core/src/main/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancer.java index 853608018f..44cf29d1e8 100644 --- a/core/src/main/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancer.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancer.java @@ -149,10 +149,12 @@ public class TableLoadBalancer implements TabletBalancer { tserversInGroup.forEach(tsid -> { TServerStatus tss = allTServers.get(tsid); if (tss == null) { - throw new IllegalStateException("TabletServer " + tsid + " in " + groupNameInUse - + " TabletServer group, but not in set of all TabletServers"); + log.warn( + "Excluding TabletServer {} from group {} because TabletServerStatus is null, likely that Manager.StatusThread.updateStatus has not discovered it yet.", + tsid, groupNameInUse); + } else { + group.put(tsid, tss); } - group.put(tsid, tss); }); return group; } diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java index 018b7eaf89..f5914c4a3f 100644 --- 
a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java @@ -22,6 +22,7 @@ import static java.util.concurrent.TimeUnit.MINUTES; import static org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy.SKIP; import java.nio.ByteBuffer; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -60,6 +61,7 @@ import org.apache.zookeeper.Watcher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; import com.google.common.net.HostAndPort; public class LiveTServerSet implements Watcher { @@ -204,12 +206,11 @@ public class LiveTServerSet implements Watcher { } } - // The set of active tservers with locks, indexed by their name in zookeeper + // The set of active tservers with locks, indexed by their name in zookeeper. When the contents of + // this map are modified, tServersSnapshot should be set to null. 
private final Map<String,TServerInfo> current = new HashMap<>(); - // as above, indexed by TServerInstance - private final Map<TServerInstance,TServerInfo> currentInstances = new HashMap<>(); - // as above, grouped by resource group name - private final Map<String,Set<TServerInstance>> currentGroups = new HashMap<>(); + + private LiveTServersSnapshot tServersSnapshot = null; // The set of entries in zookeeper without locks, and the first time each was noticed private final Map<String,Long> locklessServers = new HashMap<>(); @@ -270,6 +271,9 @@ public class LiveTServerSet implements Watcher { final Set<TServerInstance> doomed, final String path, final String zPath) throws InterruptedException, KeeperException { + // invalidate the snapshot forcing it to be recomputed the next time its requested + tServersSnapshot = null; + TServerInfo info = current.get(zPath); final var zLockPath = ServiceLock.path(path + "/" + zPath); @@ -280,8 +284,6 @@ public class LiveTServerSet implements Watcher { if (info != null) { doomed.add(info.instance); current.remove(zPath); - currentInstances.remove(info.instance); - currentGroups.get(info.resourceGroup).remove(info.instance); } Long firstSeen = locklessServers.get(zPath); @@ -302,18 +304,12 @@ public class LiveTServerSet implements Watcher { TServerInfo tServerInfo = new TServerInfo(instance, new TServerConnection(address), resourceGroup); current.put(zPath, tServerInfo); - currentInstances.put(instance, tServerInfo); - currentGroups.computeIfAbsent(resourceGroup, rg -> new HashSet<>()).add(instance); } else if (!info.instance.equals(instance)) { doomed.add(info.instance); updates.add(instance); TServerInfo tServerInfo = new TServerInfo(instance, new TServerConnection(address), resourceGroup); current.put(zPath, tServerInfo); - currentInstances.remove(info.instance); - currentGroups.getOrDefault(resourceGroup, new HashSet<>()).remove(instance); - currentInstances.put(instance, tServerInfo); - 
currentGroups.computeIfAbsent(resourceGroup, rg -> new HashSet<>()).add(instance); } } } @@ -356,21 +352,64 @@ public class LiveTServerSet implements Watcher { if (server == null) { return null; } - TServerInfo tServerInfo = currentInstances.get(server); + TServerInfo tServerInfo = getSnapshot().tserversInfo.get(server); if (tServerInfo == null) { return null; } return tServerInfo.connection; } - public synchronized Set<TServerInstance> getCurrentServers() { - return new HashSet<>(currentInstances.keySet()); + public static class LiveTServersSnapshot { + private final Set<TServerInstance> tservers; + private final Map<String,Set<TServerInstance>> tserverGroups; + + // TServerInfo is only for internal use, so this field is private w/o a getter. + private final Map<TServerInstance,TServerInfo> tserversInfo; + + @VisibleForTesting + public LiveTServersSnapshot(Set<TServerInstance> currentServers, + Map<String,Set<TServerInstance>> serverGroups) { + this.tserversInfo = null; + this.tservers = Set.copyOf(currentServers); + Map<String,Set<TServerInstance>> copy = new HashMap<>(); + serverGroups.forEach((k, v) -> copy.put(k, Set.copyOf(v))); + this.tserverGroups = Collections.unmodifiableMap(copy); + } + + public LiveTServersSnapshot(Map<TServerInstance,TServerInfo> currentServers, + Map<String,Set<TServerInstance>> serverGroups) { + this.tserversInfo = Map.copyOf(currentServers); + this.tservers = this.tserversInfo.keySet(); + Map<String,Set<TServerInstance>> copy = new HashMap<>(); + serverGroups.forEach((k, v) -> copy.put(k, Set.copyOf(v))); + this.tserverGroups = Collections.unmodifiableMap(copy); + } + + public Set<TServerInstance> getTservers() { + return tservers; + } + + public Map<String,Set<TServerInstance>> getTserverGroups() { + return tserverGroups; + } + } + + public synchronized LiveTServersSnapshot getSnapshot() { + if (tServersSnapshot == null) { + HashMap<TServerInstance,TServerInfo> tServerInstances = new HashMap<>(); + Map<String,Set<TServerInstance>> 
tserversGroups = new HashMap<>(); + current.values().forEach(tServerInfo -> { + tServerInstances.put(tServerInfo.instance, tServerInfo); + tserversGroups.computeIfAbsent(tServerInfo.resourceGroup, rg -> new HashSet<>()) + .add(tServerInfo.instance); + }); + tServersSnapshot = new LiveTServersSnapshot(tServerInstances, tserversGroups); + } + return tServersSnapshot; } - public synchronized Map<String,Set<TServerInstance>> getCurrentServersGroups() { - Map<String,Set<TServerInstance>> copy = new HashMap<>(); - currentGroups.forEach((k, v) -> copy.put(k, new HashSet<>(v))); - return copy; + public synchronized Set<TServerInstance> getCurrentServers() { + return getSnapshot().getTservers(); } public synchronized int size() { @@ -407,6 +446,10 @@ public class LiveTServerSet implements Watcher { } public synchronized void remove(TServerInstance server) { + + // invalidate the snapshot forcing it to be recomputed the next time its requested + tServersSnapshot = null; + String zPath = null; for (Entry<String,TServerInfo> entry : current.entrySet()) { if (entry.getValue().instance.equals(server)) { @@ -418,7 +461,6 @@ public class LiveTServerSet implements Watcher { return; } current.remove(zPath); - currentInstances.remove(server); log.info("Removing zookeeper lock for {}", server); String fullpath = context.getZooKeeperRoot() + Constants.ZTSERVERS + "/" + zPath; diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/CurrentState.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/CurrentState.java index 09caba293e..1b72fa2a55 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/CurrentState.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/CurrentState.java @@ -25,6 +25,7 @@ import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.manager.thrift.ManagerState; import 
org.apache.accumulo.core.metadata.TServerInstance; +import org.apache.accumulo.server.manager.LiveTServerSet.LiveTServersSnapshot; public interface CurrentState { @@ -32,7 +33,7 @@ public interface CurrentState { Set<TServerInstance> onlineTabletServers(); - Map<String,Set<TServerInstance>> tServerResourceGroups(); + LiveTServersSnapshot tserversSnapshot(); Set<TServerInstance> shutdownServers(); diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java index 41646f4145..b44bc5d37e 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java @@ -76,6 +76,7 @@ import org.apache.accumulo.core.spi.compaction.CompactionKind; import org.apache.accumulo.core.util.AddressUtil; import org.apache.accumulo.server.compaction.CompactionJobGenerator; import org.apache.accumulo.server.iterators.TabletIteratorEnvironment; +import org.apache.accumulo.server.manager.LiveTServerSet.LiveTServersSnapshot; import org.apache.accumulo.server.manager.balancer.BalancerEnvironmentImpl; import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.DataOutputBuffer; @@ -304,13 +305,14 @@ public class TabletManagementIterator extends SkippingIterator { IteratorSetting tabletChange = new IteratorSetting(1001, "ManagerTabletInfoIterator", TabletManagementIterator.class); if (state != null) { - TabletManagementIterator.setCurrentServers(tabletChange, state.onlineTabletServers()); + LiveTServersSnapshot tserversSnapshot = state.tserversSnapshot(); + TabletManagementIterator.setCurrentServers(tabletChange, tserversSnapshot.getTservers()); TabletManagementIterator.setOnlineTables(tabletChange, state.onlineTables()); TabletManagementIterator.setMigrations(tabletChange, 
state.migrationsSnapshot()); TabletManagementIterator.setManagerState(tabletChange, state.getManagerState()); TabletManagementIterator.setShuttingDown(tabletChange, state.shutdownServers()); TabletManagementIterator.setTServerResourceGroups(tabletChange, - state.tServerResourceGroups()); + tserversSnapshot.getTserverGroups()); setCompactionHints(tabletChange, state.getCompactionHints()); } scanner.addScanIterator(tabletChange); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 5257a610a3..b5de7196d0 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -129,6 +129,7 @@ import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.compaction.CompactionConfigStorage; import org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.server.manager.LiveTServerSet; +import org.apache.accumulo.server.manager.LiveTServerSet.LiveTServersSnapshot; import org.apache.accumulo.server.manager.LiveTServerSet.TServerConnection; import org.apache.accumulo.server.manager.balancer.BalancerEnvironmentImpl; import org.apache.accumulo.server.manager.state.CurrentState; @@ -819,12 +820,11 @@ public class Manager extends AbstractServer } private long updateStatus() { - Set<TServerInstance> currentServers = tserverSet.getCurrentServers(); + var tseversSnapshot = tserverSet.getSnapshot(); TreeMap<TabletServerId,TServerStatus> temp = new TreeMap<>(); - tserverStatus = gatherTableInformation(currentServers, temp); + tserverStatus = gatherTableInformation(tseversSnapshot.getTservers(), temp); tserverStatusForBalancer = Collections.unmodifiableSortedMap(temp); - tServerGroupingForBalancer = - Collections.unmodifiableMap(tserverSet.getCurrentServersGroups()); + tServerGroupingForBalancer = tseversSnapshot.getTserverGroups(); 
checkForHeldServer(tserverStatus); if (!badServers.isEmpty()) { @@ -838,7 +838,7 @@ public class Manager extends AbstractServer log.debug("not balancing while shutting down servers {}", serversToShutdown); } else { for (TabletGroupWatcher tgw : watchers) { - if (!tgw.isSameTserversAsLastScan(currentServers)) { + if (!tgw.isSameTserversAsLastScan(tseversSnapshot.getTservers())) { log.debug("not balancing just yet, as collection of live tservers is in flux"); return DEFAULT_WAIT_FOR_WATCHER; } @@ -1606,12 +1606,12 @@ public class Manager extends AbstractServer @Override public Set<TServerInstance> onlineTabletServers() { - return tserverSet.getCurrentServers(); + return tserverSet.getSnapshot().getTservers(); } @Override - public Map<String,Set<TServerInstance>> tServerResourceGroups() { - return tserverSet.getCurrentServersGroups(); + public LiveTServersSnapshot tserversSnapshot() { + return tserverSet.getSnapshot(); } // recovers state from the persistent transaction to shutdown a server diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java index cdb6953e8a..e0d0e5db7e 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java @@ -26,6 +26,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -81,6 +82,7 @@ import org.apache.accumulo.server.compaction.CompactionJobGenerator; import org.apache.accumulo.server.conf.TableConfiguration; import org.apache.accumulo.server.log.WalStateManager; import org.apache.accumulo.server.log.WalStateManager.WalMarkerException; +import org.apache.accumulo.server.manager.LiveTServerSet; import 
org.apache.accumulo.server.manager.LiveTServerSet.TServerConnection; import org.apache.accumulo.server.manager.state.Assignment; import org.apache.accumulo.server.manager.state.ClosableIterator; @@ -173,10 +175,30 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { public TabletLists(Manager m, SortedMap<TServerInstance,TabletServerStatus> curTServers, Map<String,Set<TServerInstance>> grouping) { - var destinationsMod = new TreeMap<>(curTServers); - destinationsMod.keySet().removeAll(m.serversToShutdown); - this.destinations = Collections.unmodifiableSortedMap(destinationsMod); - this.currentTServerGrouping = grouping; + synchronized (m.serversToShutdown) { + var destinationsMod = new TreeMap<>(curTServers); + if (!m.serversToShutdown.isEmpty()) { + // Remove servers that are in the process of shutting down from the lists of tablet + // servers. + destinationsMod.keySet().removeAll(m.serversToShutdown); + HashMap<String,Set<TServerInstance>> groupingCopy = new HashMap<>(); + grouping.forEach((group, groupsServers) -> { + if (Collections.disjoint(groupsServers, m.serversToShutdown)) { + groupingCopy.put(group, groupsServers); + } else { + var serversCopy = new HashSet<>(groupsServers); + serversCopy.removeAll(m.serversToShutdown); + groupingCopy.put(group, Collections.unmodifiableSet(serversCopy)); + } + }); + + this.currentTServerGrouping = Collections.unmodifiableMap(groupingCopy); + } else { + this.currentTServerGrouping = grouping; + } + + this.destinations = Collections.unmodifiableSortedMap(destinationsMod); + } } public void reset() { @@ -218,7 +240,9 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { continue; } - var currentTservers = getCurrentTservers(); + LiveTServerSet.LiveTServersSnapshot tservers = manager.tserverSet.getSnapshot(); + var currentTservers = getTserversStatus(tservers.getTservers()); + if (currentTservers.isEmpty()) { setNeedsFullScan(); continue; @@ -226,7 +250,7 @@ abstract class TabletGroupWatcher 
extends AccumuloDaemonThread { try (var iter = store.iterator(ranges)) { long t1 = System.currentTimeMillis(); - manageTablets(iter, currentTservers, false); + manageTablets(iter, currentTservers, tservers.getTserverGroups(), false); long t2 = System.currentTimeMillis(); Manager.log.debug(String.format("[%s]: partial scan time %.2f seconds for %,d ranges", store.name(), (t2 - t1) / 1000., ranges.size())); @@ -296,7 +320,8 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { } private TableMgmtStats manageTablets(Iterator<TabletManagement> iter, - SortedMap<TServerInstance,TabletServerStatus> currentTServers, boolean isFullScan) + SortedMap<TServerInstance,TabletServerStatus> currentTServers, + Map<String,Set<TServerInstance>> tserverGroups, boolean isFullScan) throws BadLocationStateException, TException, DistributedStoreException, WalMarkerException, IOException { @@ -312,16 +337,13 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { int unloaded = 0; - final Map<String,Set<TServerInstance>> currentTServerGrouping = - manager.tserverSet.getCurrentServersGroups(); - - TabletLists tLists = new TabletLists(manager, currentTServers, currentTServerGrouping); + TabletLists tLists = new TabletLists(manager, currentTServers, tserverGroups); CompactionJobGenerator compactionGenerator = new CompactionJobGenerator( new ServiceEnvironmentImpl(manager.getContext()), manager.getCompactionHints()); final Map<TabletServerId,String> resourceGroups = new HashMap<>(); - manager.tServerResourceGroups().forEach((group, tservers) -> { + tserverGroups.forEach((group, tservers) -> { tservers.stream().map(TabletServerIdImpl::new) .forEach(tabletServerId -> resourceGroups.put(tabletServerId, group)); }); @@ -541,10 +563,11 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { return tableMgmtStats; } - private SortedMap<TServerInstance,TabletServerStatus> getCurrentTservers() { + private SortedMap<TServerInstance,TabletServerStatus> + 
getTserversStatus(Set<TServerInstance> currentServers) { // Get the current status for the current list of tservers final SortedMap<TServerInstance,TabletServerStatus> currentTServers = new TreeMap<>(); - for (TServerInstance entry : manager.tserverSet.getCurrentServers()) { + for (TServerInstance entry : currentServers) { currentTServers.put(entry, manager.tserverStatus.get(entry)); } return currentTServers; @@ -563,7 +586,8 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { final long waitTimeBetweenScans = manager.getConfiguration() .getTimeInMillis(Property.MANAGER_TABLET_GROUP_WATCHER_INTERVAL); - var currentTServers = getCurrentTservers(); + LiveTServerSet.LiveTServersSnapshot tservers = manager.tserverSet.getSnapshot(); + var currentTServers = getTserversStatus(tservers.getTservers()); ClosableIterator<TabletManagement> iter = null; try { @@ -584,7 +608,8 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { eventHandler.clearNeedsFullScan(); iter = store.iterator(); - var tabletMgmtStats = manageTablets(iter, currentTServers, true); + var tabletMgmtStats = + manageTablets(iter, currentTServers, tservers.getTserverGroups(), true); // provide stats after flushing changes to avoid race conditions w/ delete table stats.end(managerState); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java b/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java index 9c3c2a37f8..3b1a688ca8 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java @@ -71,6 +71,7 @@ import org.apache.accumulo.core.metadata.schema.TabletOperationType; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.harness.AccumuloClusterHarness; +import 
org.apache.accumulo.server.manager.LiveTServerSet; import org.apache.accumulo.server.manager.state.CurrentState; import org.apache.accumulo.server.manager.state.TabletManagementIterator; import org.apache.hadoop.io.Text; @@ -372,6 +373,11 @@ public class TabletManagementIteratorIT extends AccumuloClusterHarness { return tservers; } + @Override + public LiveTServerSet.LiveTServersSnapshot tserversSnapshot() { + return new LiveTServerSet.LiveTServersSnapshot(onlineTabletServers(), new HashMap<>()); + } + @Override public Set<TableId> onlineTables() { Set<TableId> onlineTables = context.getTableIdToNameMap().keySet(); @@ -380,11 +386,6 @@ public class TabletManagementIteratorIT extends AccumuloClusterHarness { return this.onlineTables; } - @Override - public Map<String,Set<TServerInstance>> tServerResourceGroups() { - return new HashMap<>(); - } - @Override public Set<KeyExtent> migrationsSnapshot() { return Collections.emptySet();