This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 48d39719148454f002ab346610ad39b911662acc
Merge: 374f2c1a6c 2890cc2c99
Author: Christopher Tubbs <ctubb...@apache.org>
AuthorDate: Thu Dec 21 16:27:48 2023 -0500

    Merge branch 'main' into elasticity

 pom.xml                                            |  3 +-
 .../org/apache/accumulo/server/ServerContext.java  |  4 +-
 .../org/apache/accumulo/server/ServerDirs.java     | 33 +++++-----
 .../org/apache/accumulo/server/fs/VolumeUtil.java  | 61 +++++++-----------
 .../apache/accumulo/server/init/Initialize.java    |  5 +-
 .../manager/state/TabletManagementParameters.java  | 12 ++--
 .../apache/accumulo/server/fs/VolumeUtilTest.java  | 75 ++++++++++------------
 .../main/java/org/apache/accumulo/gc/GCRun.java    |  3 +-
 .../accumulo/manager/TabletGroupWatcher.java       |  2 +-
 .../accumulo/manager/recovery/RecoveryManager.java |  2 +-
 .../org/apache/accumulo/tserver/log/DfsLogger.java | 28 ++++----
 .../apache/accumulo/tserver/logger/LogReader.java  |  5 --
 .../apache/accumulo/tserver/log/DfsLoggerTest.java |  9 +++
 .../functional/TabletManagementIteratorIT.java     | 10 ++-
 14 files changed, 111 insertions(+), 141 deletions(-)

diff --cc server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
index 5eda27a18d,a994d16edb..66c527b6d4
--- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
@@@ -18,25 -18,24 +18,25 @@@
   */
  package org.apache.accumulo.server.fs;
  
+ import static java.util.Objects.requireNonNull;
+ 
  import java.util.ArrayList;
 +import java.util.HashMap;
  import java.util.List;
  import java.util.Map;
  import java.util.Map.Entry;
- import java.util.Objects;
  import java.util.SortedMap;
  import java.util.TreeMap;
 +import java.util.function.BiConsumer;
 +import java.util.function.Consumer;
  
 -import org.apache.accumulo.core.dataImpl.KeyExtent;
 -import org.apache.accumulo.core.lock.ServiceLock;
  import org.apache.accumulo.core.metadata.ReferencedTabletFile;
  import org.apache.accumulo.core.metadata.StoredTabletFile;
  import org.apache.accumulo.core.metadata.schema.DataFileValue;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata;
  import org.apache.accumulo.core.tabletserver.log.LogEntry;
- import org.apache.accumulo.core.util.Pair;
 -import org.apache.accumulo.server.ServerContext;
  import org.apache.accumulo.server.fs.VolumeManager.FileType;
 -import org.apache.accumulo.server.util.MetadataTableUtil;
 +import org.apache.commons.lang3.mutable.MutableBoolean;
  import org.apache.hadoop.fs.Path;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
@@@ -119,59 -103,37 +104,59 @@@ public class VolumeUtil 
      }
    }
  
-   public static boolean needsVolumeReplacement(final List<Pair<Path,Path>> replacements,
 -  /**
 -   * This method does two things. First, it switches any volumes a tablet is 
using that are
 -   * configured in instance.volumes.replacements. Second, if a tablet dir is 
no longer configured
 -   * for use it chooses a new tablet directory.
 -   */
 -  public static TabletFiles updateTabletVolumes(ServerContext context, 
ServiceLock zooLock,
 -      KeyExtent extent, TabletFiles tabletFiles) {
 -    Map<Path,Path> replacements = context.getVolumeReplacements();
++  public static boolean needsVolumeReplacement(final Map<Path,Path> replacements,
 +      final TabletMetadata tm) {
      if (replacements.isEmpty()) {
 -      return tabletFiles;
 +      return false;
      }
 -    log.trace("Using volume replacements: {}", replacements);
  
 -    List<LogEntry> logsToRemove = new ArrayList<>();
 -    List<LogEntry> logsToAdd = new ArrayList<>();
 +    MutableBoolean needsReplacement = new MutableBoolean(false);
 +
 +    Consumer<LogEntry> consumer = le -> needsReplacement.setTrue();
 +
 +    volumeReplacementEvaluation(replacements, tm, consumer, consumer,
 +        f -> needsReplacement.setTrue(), (f, dfv) -> 
needsReplacement.setTrue());
 +
 +    return needsReplacement.booleanValue();
 +  }
  
 -    List<StoredTabletFile> filesToRemove = new ArrayList<>();
 -    SortedMap<ReferencedTabletFile,DataFileValue> filesToAdd = new 
TreeMap<>();
 +  public static class VolumeReplacements {
 +    public final TabletMetadata tabletMeta;
 +    public final List<LogEntry> logsToRemove = new ArrayList<>();
 +    public final List<LogEntry> logsToAdd = new ArrayList<>();
 +    public final List<StoredTabletFile> filesToRemove = new ArrayList<>();
 +    public final Map<ReferencedTabletFile,DataFileValue> filesToAdd = new 
HashMap<>();
  
 -    TabletFiles ret = new TabletFiles();
 +    public VolumeReplacements(TabletMetadata tabletMeta) {
 +      this.tabletMeta = tabletMeta;
 +    }
 +  }
  
-   public static VolumeReplacements
-       computeVolumeReplacements(final List<Pair<Path,Path>> replacements, final TabletMetadata tm) {
 -    for (LogEntry logEntry : tabletFiles.logEntries) {
++  public static VolumeReplacements computeVolumeReplacements(final Map<Path,Path> replacements,
++      final TabletMetadata tm) {
 +    var vr = new VolumeReplacements(tm);
 +    volumeReplacementEvaluation(replacements, tm, vr.logsToRemove::add, 
vr.logsToAdd::add,
 +        vr.filesToRemove::add, vr.filesToAdd::put);
 +    return vr;
 +  }
 +
-   public static void volumeReplacementEvaluation(final List<Pair<Path,Path>> replacements,
++  public static void volumeReplacementEvaluation(final Map<Path,Path> replacements,
 +      final TabletMetadata tm, final Consumer<LogEntry> logsToRemove,
 +      final Consumer<LogEntry> logsToAdd, final Consumer<StoredTabletFile> 
filesToRemove,
 +      final BiConsumer<ReferencedTabletFile,DataFileValue> filesToAdd) {
 +    if (replacements.isEmpty() || (tm.getFilesMap().isEmpty() && 
tm.getLogs().isEmpty())) {
 +      return;
 +    }
 +
 +    log.trace("Using volume replacements: {}", replacements);
 +    for (LogEntry logEntry : tm.getLogs()) {
 +      log.trace("Evaluating walog {} for replacement.", logEntry);
-       LogEntry switchedLogEntry = switchVolumes(logEntry, replacements);
+       LogEntry switchedLogEntry = switchVolume(logEntry, replacements);
        if (switchedLogEntry != null) {
 -        logsToRemove.add(logEntry);
 -        logsToAdd.add(switchedLogEntry);
 -        ret.logEntries.add(switchedLogEntry);
 -        log.debug("Replacing volume {} : {} -> {}", extent, 
logEntry.getPath(),
 +        logsToRemove.accept(logEntry);
 +        logsToAdd.accept(switchedLogEntry);
 +        log.trace("Replacing volume {} : {} -> {}", tm.getExtent(), 
logEntry.getPath(),
              switchedLogEntry.getPath());
 -      } else {
 -        ret.logEntries.add(logEntry);
        }
      }
  
diff --cc server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementParameters.java
index 4d183cd6c6,0000000000..c6bfbe3fc3
mode 100644,000000..100644
--- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementParameters.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementParameters.java
@@@ -1,263 -1,0 +1,265 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.server.manager.state;
 +
 +import static java.util.stream.Collectors.toList;
 +import static java.util.stream.Collectors.toMap;
 +import static java.util.stream.Collectors.toSet;
 +import static java.util.stream.Collectors.toUnmodifiableMap;
 +import static java.util.stream.Collectors.toUnmodifiableSet;
 +
 +import java.io.IOException;
 +import java.io.UncheckedIOException;
 +import java.util.Arrays;
 +import java.util.Base64;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +import java.util.function.Supplier;
 +
 +import org.apache.accumulo.core.data.AbstractId;
 +import org.apache.accumulo.core.data.TableId;
 +import org.apache.accumulo.core.dataImpl.KeyExtent;
 +import org.apache.accumulo.core.manager.thrift.ManagerState;
 +import org.apache.accumulo.core.metadata.TServerInstance;
 +import org.apache.accumulo.core.metadata.schema.Ample;
 +import org.apache.accumulo.core.util.Pair;
 +import org.apache.accumulo.server.manager.LiveTServerSet;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.DataInputBuffer;
 +import org.apache.hadoop.io.DataOutputBuffer;
 +
 +import com.google.common.base.Suppliers;
 +import com.google.gson.Gson;
 +import com.google.gson.GsonBuilder;
 +
 +/**
 + * An immutable snapshot of the information needed by the TabletGroupWatcher and the
 + * {@link TabletManagementIterator} to make decisions about tablets.
 + */
 +public class TabletManagementParameters {
 +  // ELASTICITY_TODO need to unit test serialization and deserialization of this class.
 +  private final ManagerState managerState;
 +  private final Map<Ample.DataLevel,Boolean> parentUpgradeMap;
 +  private final Set<TableId> onlineTables;
 +  private final Set<TServerInstance> serversToShutdown;
 +  private final Map<KeyExtent,TServerInstance> migrations;
 +
 +  private final Ample.DataLevel level;
 +
 +  private final Supplier<Map<TServerInstance,String>> resourceGroups;
 +  private final Map<String,Set<TServerInstance>> tserverGroups;
 +  private final Map<Long,Map<String,String>> compactionHints;
 +  private final Set<TServerInstance> onlineTservers;
 +  private final boolean canSuspendTablets;
-   private final List<Pair<Path,Path>> volumeReplacements;
++  private final Map<Path,Path> volumeReplacements;
 +
 +  public TabletManagementParameters(ManagerState managerState,
 +      Map<Ample.DataLevel,Boolean> parentUpgradeMap, Set<TableId> 
onlineTables,
 +      LiveTServerSet.LiveTServersSnapshot liveTServersSnapshot,
 +      Set<TServerInstance> serversToShutdown, Map<KeyExtent,TServerInstance> 
migrations,
 +      Ample.DataLevel level, Map<Long,Map<String,String>> compactionHints,
-       boolean canSuspendTablets, List<Pair<Path,Path>> volumeReplacements) {
++      boolean canSuspendTablets, Map<Path,Path> volumeReplacements) {
 +    this.managerState = managerState;
 +    this.parentUpgradeMap = Map.copyOf(parentUpgradeMap);
 +    // TODO could filter by level
 +    this.onlineTables = Set.copyOf(onlineTables);
 +    // This is already immutable, so no need to copy
 +    this.onlineTservers = liveTServersSnapshot.getTservers();
 +    this.serversToShutdown = Set.copyOf(serversToShutdown);
 +    // TODO could filter by level
 +    this.migrations = Map.copyOf(migrations);
 +    this.level = level;
 +    // This is already immutable, so no need to copy
 +    this.tserverGroups = liveTServersSnapshot.getTserverGroups();
 +    this.compactionHints = makeImmutable(compactionHints);
 +    this.resourceGroups = Suppliers.memoize(() -> {
 +      Map<TServerInstance,String> resourceGroups = new HashMap<>();
 +      TabletManagementParameters.this.tserverGroups.forEach((resourceGroup, 
tservers) -> tservers
 +          .forEach(tserver -> resourceGroups.put(tserver, resourceGroup)));
 +      return Map.copyOf(resourceGroups);
 +    });
 +    this.canSuspendTablets = canSuspendTablets;
 +    this.volumeReplacements = volumeReplacements;
 +  }
 +
 +  private TabletManagementParameters(JsonData jdata) {
 +    this.managerState = jdata.managerState;
 +    this.parentUpgradeMap = Map.copyOf(jdata.parentUpgradeMap);
 +    this.onlineTables = jdata.onlineTables.stream().map(TableId::of).collect(toUnmodifiableSet());
 +    this.onlineTservers =
 +        
jdata.onlineTservers.stream().map(TServerInstance::new).collect(toUnmodifiableSet());
 +    this.serversToShutdown =
 +        
jdata.serversToShutdown.stream().map(TServerInstance::new).collect(toUnmodifiableSet());
 +    this.migrations = jdata.migrations.entrySet().stream()
 +        .collect(toUnmodifiableMap(entry -> 
JsonData.strToExtent(entry.getKey()),
 +            entry -> new TServerInstance(entry.getValue())));
 +    this.level = jdata.level;
 +    this.compactionHints = makeImmutable(jdata.compactionHints);
 +    this.tserverGroups = 
jdata.tserverGroups.entrySet().stream().collect(toUnmodifiableMap(
 +        Map.Entry::getKey,
 +        entry -> 
entry.getValue().stream().map(TServerInstance::new).collect(toUnmodifiableSet())));
 +    this.resourceGroups = Suppliers.memoize(() -> {
 +      Map<TServerInstance,String> resourceGroups = new HashMap<>();
 +      TabletManagementParameters.this.tserverGroups.forEach((resourceGroup, 
tservers) -> tservers
 +          .forEach(tserver -> resourceGroups.put(tserver, resourceGroup)));
 +      return Map.copyOf(resourceGroups);
 +    });
 +    this.canSuspendTablets = jdata.canSuspendTablets;
-     this.volumeReplacements = jdata.volumeReplacements;
++    this.volumeReplacements = jdata.volumeReplacements.stream()
++        .collect(toUnmodifiableMap(Pair::getFirst, Pair::getSecond));
 +  }
 +
 +  public ManagerState getManagerState() {
 +    return managerState;
 +  }
 +
 +  public boolean isParentLevelUpgraded() {
 +    return parentUpgradeMap.get(level);
 +  }
 +
 +  public Set<TServerInstance> getOnlineTsevers() {
 +    return onlineTservers;
 +  }
 +
 +  public Set<TServerInstance> getServersToShutdown() {
 +    return serversToShutdown;
 +  }
 +
 +  public boolean isTableOnline(TableId tableId) {
 +    return onlineTables.contains(tableId);
 +  }
 +
 +  public Map<KeyExtent,TServerInstance> getMigrations() {
 +    return migrations;
 +  }
 +
 +  public Ample.DataLevel getLevel() {
 +    return level;
 +  }
 +
 +  public String getResourceGroup(TServerInstance tserver) {
 +    return resourceGroups.get().get(tserver);
 +  }
 +
 +  public Map<String,Set<TServerInstance>> getGroupedTServers() {
 +    return tserverGroups;
 +  }
 +
 +  public Set<TableId> getOnlineTables() {
 +    return onlineTables;
 +  }
 +
 +  public Map<Long,Map<String,String>> getCompactionHints() {
 +    return compactionHints;
 +  }
 +
 +  public boolean canSuspendTablets() {
 +    return canSuspendTablets;
 +  }
 +
-   public List<Pair<Path,Path>> getVolumeReplacements() {
++  public Map<Path,Path> getVolumeReplacements() {
 +    return volumeReplacements;
 +  }
 +
 +  private static Map<Long,Map<String,String>>
 +      makeImmutable(Map<Long,Map<String,String>> compactionHints) {
 +    var copy = new HashMap<Long,Map<String,String>>();
 +    compactionHints.forEach((ftxid, hints) -> copy.put(ftxid, 
Map.copyOf(hints)));
 +    return Collections.unmodifiableMap(copy);
 +  }
 +
 +  private static class JsonData {
 +    final ManagerState managerState;
 +    final Map<Ample.DataLevel,Boolean> parentUpgradeMap;
 +    final Collection<String> onlineTables;
 +    final Collection<String> onlineTservers;
 +    final Collection<String> serversToShutdown;
 +    final Map<String,String> migrations;
 +
 +    final Ample.DataLevel level;
 +
 +    final Map<String,Set<String>> tserverGroups;
 +
 +    final Map<Long,Map<String,String>> compactionHints;
 +
 +    final boolean canSuspendTablets;
 +    final List<Pair<Path,Path>> volumeReplacements;
 +
 +    private static String toString(KeyExtent extent) {
 +      DataOutputBuffer buffer = new DataOutputBuffer();
 +      try {
 +        extent.writeTo(buffer);
 +      } catch (IOException e) {
 +        throw new UncheckedIOException(e);
 +      }
 +
 +      return Base64.getEncoder()
 +          .encodeToString(Arrays.copyOf(buffer.getData(), 
buffer.getLength()));
 +
 +    }
 +
 +    private static KeyExtent strToExtent(String kes) {
 +      byte[] data = Base64.getDecoder().decode(kes);
 +      DataInputBuffer buffer = new DataInputBuffer();
 +      buffer.reset(data, data.length);
 +      try {
 +        return KeyExtent.readFrom(buffer);
 +      } catch (IOException e) {
 +        throw new UncheckedIOException(e);
 +      }
 +    }
 +
 +    JsonData(TabletManagementParameters params) {
 +      managerState = params.managerState;
 +      parentUpgradeMap = params.parentUpgradeMap;
 +      onlineTables = 
params.onlineTables.stream().map(AbstractId::canonical).collect(toList());
 +      onlineTservers = 
params.getOnlineTsevers().stream().map(TServerInstance::getHostPortSession)
 +          .collect(toList());
 +      serversToShutdown = 
params.serversToShutdown.stream().map(TServerInstance::getHostPortSession)
 +          .collect(toList());
 +      migrations = params.migrations.entrySet().stream().collect(
 +          toMap(entry -> toString(entry.getKey()), entry -> 
entry.getValue().getHostPortSession()));
 +      level = params.level;
 +      tserverGroups = params.getGroupedTServers().entrySet().stream()
 +          .collect(toMap(Map.Entry::getKey, entry -> entry.getValue().stream()
 +              .map(TServerInstance::getHostPortSession).collect(toSet())));
 +      compactionHints = params.compactionHints;
 +      canSuspendTablets = params.canSuspendTablets;
-       volumeReplacements = params.volumeReplacements;
++      volumeReplacements =
++          params.volumeReplacements.entrySet().stream().map(Pair::fromEntry).collect(toList());
 +    }
 +
 +  }
 +
 +  public String serialize() {
 +    Gson gson = new GsonBuilder().disableHtmlEscaping().create();
 +    return gson.toJson(new JsonData(this));
 +  }
 +
 +  public static TabletManagementParameters deserialize(String json) {
 +    Gson gson = new GsonBuilder().disableHtmlEscaping().create();
 +    JsonData jdata = gson.fromJson(json, JsonData.class);
 +    return new TabletManagementParameters(jdata);
 +  }
 +
 +}
diff --cc server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
index 3d9e840af2,068bb4930b..6d90fa5423
--- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
@@@ -214,465 -178,198 +214,465 @@@ abstract class TabletGroupWatcher exten
      }
    }
  
 -  @Override
 -  public void run() {
 -    int[] oldCounts = new int[TabletState.values().length];
 -    EventCoordinator.Listener eventListener = 
this.manager.nextEvent.getListener();
 +  class EventHandler implements EventCoordinator.Listener {
  
 -    WalStateManager wals = new WalStateManager(manager.getContext());
 +    // Setting this to true to start with because its not know what happended 
before this object was
 +    // created, so just start off with full scan.
 +    private boolean needsFullScan = true;
  
 -    while (manager.stillManager()) {
 -      // slow things down a little, otherwise we spam the logs when there are 
many wake-up events
 -      sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
 +    private final BlockingQueue<Range> rangesToProcess;
  
 -      final long waitTimeBetweenScans = manager.getConfiguration()
 -          .getTimeInMillis(Property.MANAGER_TABLET_GROUP_WATCHER_INTERVAL);
 +    class RangeProccessor implements Runnable {
 +      @Override
 +      public void run() {
 +        try {
 +          while (manager.stillManager()) {
 +            var range = rangesToProcess.poll(100, TimeUnit.MILLISECONDS);
 +            if (range == null) {
 +              // check to see if still the manager
 +              continue;
 +            }
  
 -      int totalUnloaded = 0;
 -      int unloaded = 0;
 -      ClosableIterator<TabletLocationState> iter = null;
 -      try {
 -        Map<TableId,MergeStats> mergeStatsCache = new HashMap<>();
 -        Map<TableId,MergeStats> currentMerges = new HashMap<>();
 -        for (MergeInfo merge : manager.merges()) {
 -          if (merge.getExtent() != null) {
 -            currentMerges.put(merge.getExtent().tableId(), new 
MergeStats(merge));
 +            ArrayList<Range> ranges = new ArrayList<>();
 +            ranges.add(range);
 +
 +            rangesToProcess.drainTo(ranges);
 +
 +            if (manager.getManagerGoalState() == ManagerGoalState.CLEAN_STOP) 
{
 +              // only do full scans when trying to shutdown
 +              setNeedsFullScan();
 +              continue;
 +            }
 +
 +            TabletManagementParameters tabletMgmtParams = 
createTabletManagementParameters(false);
 +
 +            var currentTservers = 
getCurrentTservers(tabletMgmtParams.getOnlineTsevers());
 +            if (currentTservers.isEmpty()) {
 +              setNeedsFullScan();
 +              continue;
 +            }
 +
 +            try (var iter = store.iterator(ranges, tabletMgmtParams)) {
 +              long t1 = System.currentTimeMillis();
 +              manageTablets(iter, tabletMgmtParams, currentTservers, false);
 +              long t2 = System.currentTimeMillis();
 +              Manager.log.debug(String.format("[%s]: partial scan time %.2f 
seconds for %,d ranges",
 +                  store.name(), (t2 - t1) / 1000., ranges.size()));
 +            } catch (Exception e) {
 +              Manager.log.error("Error processing {} ranges for store {} ", 
ranges.size(),
 +                  store.name(), e);
 +            }
            }
 +        } catch (InterruptedException e) {
 +          throw new RuntimeException(e);
          }
 +      }
 +    }
  
 -        // Get the current status for the current list of tservers
 -        SortedMap<TServerInstance,TabletServerStatus> currentTServers = new 
TreeMap<>();
 -        for (TServerInstance entry : manager.tserverSet.getCurrentServers()) {
 -          currentTServers.put(entry, manager.tserverStatus.get(entry));
 -        }
 +    EventHandler() {
 +      rangesToProcess = new ArrayBlockingQueue<>(3000);
  
 -        if (currentTServers.isEmpty()) {
 -          eventListener.waitForEvents(waitTimeBetweenScans);
 -          synchronized (this) {
 -            lastScanServers = Collections.emptySortedSet();
 +      Threads
 +          .createThread("TGW [" + store.name() + "] event range processor", 
new RangeProccessor())
 +          .start();
 +    }
 +
 +    private synchronized void setNeedsFullScan() {
 +      needsFullScan = true;
 +      notifyAll();
 +    }
 +
 +    public synchronized void clearNeedsFullScan() {
 +      needsFullScan = false;
 +    }
 +
 +    @Override
 +    public void process(EventCoordinator.Event event) {
 +
 +      switch (event.getScope()) {
 +        case ALL:
 +        case DATA_LEVEL:
 +          setNeedsFullScan();
 +          break;
 +        case TABLE:
 +        case TABLE_RANGE:
 +          if (!rangesToProcess.offer(event.getExtent().toMetaRange())) {
 +            Manager.log.debug("[{}] unable to process event range {} because 
queue is full",
 +                store.name(), event.getExtent());
 +            setNeedsFullScan();
            }
 -          continue;
 +          break;
 +        default:
 +          throw new IllegalArgumentException("Unhandled scope " + 
event.getScope());
 +      }
 +    }
 +
 +    synchronized void waitForFullScan(long millis) {
 +      if (!needsFullScan) {
 +        try {
 +          wait(millis);
 +        } catch (InterruptedException e) {
 +          throw new RuntimeException(e);
          }
 +      }
 +    }
 +  }
  
 -        TabletLists tLists = new TabletLists(manager, currentTServers);
 +  private TabletManagementParameters
 +      createTabletManagementParameters(boolean 
lookForTabletsNeedingVolReplacement) {
  
 -        ManagerState managerState = manager.getManagerState();
 -        int[] counts = new int[TabletState.values().length];
 -        stats.begin();
 -        // Walk through the tablets in our store, and work tablets
 -        // towards their goal
 -        iter = store.iterator();
 -        while (iter.hasNext()) {
 -          TabletLocationState tls = iter.next();
 -          if (tls == null) {
 -            continue;
 -          }
 +    HashMap<Ample.DataLevel,Boolean> parentLevelUpgrade = new HashMap<>();
 +    UpgradeCoordinator.UpgradeStatus upgradeStatus = 
manager.getUpgradeStatus();
 +    for (var level : Ample.DataLevel.values()) {
 +      parentLevelUpgrade.put(level, 
upgradeStatus.isParentLevelUpgraded(level));
 +    }
  
 -          // ignore entries for tables that do not exist in zookeeper
 -          if (manager.getTableManager().getTableState(tls.extent.tableId()) 
== null) {
 -            continue;
 -          }
 +    Set<TServerInstance> shutdownServers;
 +    if (store.getLevel() == Ample.DataLevel.USER) {
 +      shutdownServers = manager.shutdownServers();
 +    } else {
 +      // Use the servers to shutdown filtered by the dependent watcher. These 
are servers to
 +      // shutdown that the dependent watcher has determined it has no tablets 
hosted on or assigned
 +      // to.
 +      shutdownServers = dependentWatcher.getFilteredServersToShutdown();
 +    }
  
 -          // Don't overwhelm the tablet servers with work
 -          if (tLists.unassigned.size() + unloaded
 -              > Manager.MAX_TSERVER_WORK_CHUNK * currentTServers.size()) {
 -            flushChanges(tLists, wals);
 -            tLists.reset();
 -            unloaded = 0;
 -            eventListener.waitForEvents(waitTimeBetweenScans);
 -          }
 -          TableId tableId = tls.extent.tableId();
 -          TableConfiguration tableConf = 
manager.getContext().getTableConfiguration(tableId);
 -
 -          MergeStats mergeStats = mergeStatsCache.computeIfAbsent(tableId, k 
-> {
 -            var mStats = currentMerges.get(k);
 -            return mStats != null ? mStats : new MergeStats(new MergeInfo());
 -          });
 -          TabletGoalState goal = manager.getGoalState(tls, 
mergeStats.getMergeInfo());
 -          Location location = tls.getLocation();
 -          TabletState state = tls.getState(currentTServers.keySet());
 -
 -          TabletLogger.missassigned(tls.extent, goal.toString(), 
state.toString(),
 -              tls.getFutureServer(), tls.getCurrentServer(), 
tls.walogs.size());
 -
 -          stats.update(tableId, state);
 -          mergeStats.update(tls.extent, state);
 -
 -          // Always follow through with assignments
 -          if (state == TabletState.ASSIGNED) {
 -            goal = TabletGoalState.HOSTED;
 +    var tServersSnapshot = manager.tserversSnapshot();
 +
 +    return new TabletManagementParameters(manager.getManagerState(), 
parentLevelUpgrade,
 +        manager.onlineTables(), tServersSnapshot, shutdownServers, 
manager.migrationsSnapshot(),
 +        store.getLevel(), manager.getCompactionHints(), canSuspendTablets(),
 +        lookForTabletsNeedingVolReplacement ? manager.getContext().getVolumeReplacements()
-             : List.of());
++            : Map.of());
 +  }
 +
 +  private Set<TServerInstance> getFilteredServersToShutdown() {
 +    return filteredServersToShutdown;
 +  }
 +
 +  private static class TableMgmtStats {
 +    int[] counts = new int[TabletState.values().length];
 +    private int totalUnloaded;
 +    private long totalVolumeReplacements;
 +  }
 +
 +  private TableMgmtStats manageTablets(Iterator<TabletManagement> iter,
 +      TabletManagementParameters tableMgmtParams,
 +      SortedMap<TServerInstance,TabletServerStatus> currentTServers, boolean 
isFullScan)
 +      throws BadLocationStateException, TException, 
DistributedStoreException, WalMarkerException,
 +      IOException {
 +
 +    TableMgmtStats tableMgmtStats = new TableMgmtStats();
 +    final boolean shuttingDownAllTabletServers =
 +        
tableMgmtParams.getServersToShutdown().equals(currentTServers.keySet());
 +    if (shuttingDownAllTabletServers && !isFullScan) {
 +      // If we are shutting down all of the TabletServers, then don't process 
any events
 +      // from the EventCoordinator.
 +      LOG.debug("Partial scan requested, but aborted due to shutdown of all 
TabletServers");
 +      return tableMgmtStats;
 +    }
 +
 +    int unloaded = 0;
 +
 +    TabletLists tLists = new TabletLists(currentTServers, 
tableMgmtParams.getGroupedTServers(),
 +        tableMgmtParams.getServersToShutdown());
 +
 +    CompactionJobGenerator compactionGenerator = new CompactionJobGenerator(
 +        new ServiceEnvironmentImpl(manager.getContext()), 
tableMgmtParams.getCompactionHints());
 +
 +    Set<TServerInstance> filteredServersToShutdown =
 +        new HashSet<>(tableMgmtParams.getServersToShutdown());
 +
 +    while (iter.hasNext()) {
 +      final TabletManagement mti = iter.next();
 +      if (mti == null) {
 +        throw new IllegalStateException("State store returned a null 
ManagerTabletInfo object");
 +      }
 +
 +      final TabletMetadata tm = mti.getTabletMetadata();
 +
 +      final String mtiError = mti.getErrorMessage();
 +      if (mtiError != null) {
 +        // An error happened on the TabletServer in the 
TabletManagementIterator
 +        // when trying to process this extent.
 +        LOG.warn(
 +            "Error on TabletServer trying to get Tablet management 
information for extent: {}. Error message: {}",
 +            tm.getExtent(), mtiError);
 +        this.metrics.incrementTabletGroupWatcherError(this.store.getLevel());
 +        continue;
 +      }
 +
 +      final Set<ManagementAction> actions = mti.getActions();
 +      if (actions.contains(ManagementAction.BAD_STATE) && 
tm.isFutureAndCurrentLocationSet()) {
 +        throw new BadLocationStateException(
 +            tm.getExtent() + " is both assigned and hosted, which should 
never happen: " + this,
 +            tm.getExtent().toMetaRow());
 +      }
 +
 +      final TableId tableId = tm.getTableId();
 +      // ignore entries for tables that do not exist in zookeeper
 +      if (manager.getTableManager().getTableState(tableId) == null) {
 +        continue;
 +      }
 +
 +      // Don't overwhelm the tablet servers with work
 +      if (tLists.unassigned.size() + unloaded
 +          > Manager.MAX_TSERVER_WORK_CHUNK * currentTServers.size()
 +          || tLists.volumeReplacements.size() > 1000) {
 +        flushChanges(tLists);
 +        tLists.reset();
 +        unloaded = 0;
 +      }
 +
 +      final TableConfiguration tableConf = 
manager.getContext().getTableConfiguration(tableId);
 +
 +      TabletState state = TabletState.compute(tm, currentTServers.keySet());
 +      if (state == TabletState.ASSIGNED_TO_DEAD_SERVER) {
 +        /*
 +         * This code exists to deal with a race condition caused by two 
threads running in this
 +         * class that compute tablets actions. One thread does full scans and 
the other reacts to
 +         * events and does partial scans. Below is an example of the race 
condition this is
 +         * handling.
 +         *
 +         * - TGW Thread 1 : reads the set of tablets servers and its empty
 +         *
 +         * - TGW Thread 2 : reads the set of tablet servers and its [TS1]
 +         *
 +         * - TGW Thread 2 : Sees tabletX without a location and assigns it to 
TS1
 +         *
 +         * - TGW Thread 1 : Sees tabletX assigned to TS1 and assumes it's 
assigned to a dead tablet
 +         * server because its set of live servers is the empty set.
 +         *
 +         * To deal with this race condition, this code recomputes the tablet 
state using the latest
 +         * tservers when a tablet is seen assigned to a dead tserver.
 +         */
 +
 +        TabletState newState = TabletState.compute(tm, 
manager.tserversSnapshot().getTservers());
 +        if (newState != state) {
 +          LOG.debug("Tablet state changed when using latest set of tservers 
{} {} {}",
 +              tm.getExtent(), state, newState);
 +          state = newState;
 +        }
 +      }
 +
 +      // This is final because nothing in this method should change the goal. 
All computation of the
 +      // goal should be done in TabletGoalState.compute() so that all parts 
of the Accumulo code
 +      // will compute a consistent goal.
 +      final TabletGoalState goal =
 +          TabletGoalState.compute(tm, state, manager.tabletBalancer, 
tableMgmtParams);
 +
 +      if (actions.contains(ManagementAction.NEEDS_VOLUME_REPLACEMENT)) {
 +        tableMgmtStats.totalVolumeReplacements++;
 +        if (state == TabletState.UNASSIGNED || state == 
TabletState.SUSPENDED) {
 +          var volRep =
 +              
VolumeUtil.computeVolumeReplacements(tableMgmtParams.getVolumeReplacements(), 
tm);
 +          if (volRep.logsToRemove.size() + volRep.filesToRemove.size() > 0) {
 +            if (tm.getLocation() != null) {
 +              // since the totalVolumeReplacements counter was incremented, 
should try this again
 +              // later after its unassigned
 +              LOG.debug("Volume replacement needed for {} but it has a 
location {}.",
 +                  tm.getExtent(), tm.getLocation());
 +            } else if (tm.getOperationId() != null) {
 +              LOG.debug("Volume replacement needed for {} but it has an 
active operation {}.",
 +                  tm.getExtent(), tm.getOperationId());
 +            } else {
 +              LOG.debug("Volume replacement needed for {}.", tm.getExtent());
 +              // buffer replacements so that multiple mutations can be done 
at once
 +              tLists.volumeReplacements.add(volRep);
 +            }
 +          } else {
 +            LOG.debug("Volume replacement evaluation for {} returned no 
changes.", tm.getExtent());
            }
 -          if (Manager.log.isTraceEnabled()) {
 -            Manager.log.trace(
 -                "[{}] Shutting down all Tservers: {}, dependentCount: {} 
Extent: {}, state: {}, goal: {}",
 -                store.name(), 
manager.serversToShutdown.equals(currentTServers.keySet()),
 -                dependentWatcher == null ? "null" : 
dependentWatcher.assignedOrHosted(), tls.extent,
 -                state, goal);
 +        } else {
 +          LOG.debug("Volume replacement needed for {} but its tablet state is 
{}.", tm.getExtent(),
 +              state);
 +        }
 +      }
 +
 +      final Location location = tm.getLocation();
 +      Location current = null;
 +      Location future = null;
 +      if (tm.hasCurrent()) {
 +        current = tm.getLocation();
 +      } else {
 +        future = tm.getLocation();
 +      }
 +      TabletLogger.missassigned(tm.getExtent(), goal.toString(), 
state.toString(),
 +          future != null ? future.getServerInstance() : null,
 +          current != null ? current.getServerInstance() : null, 
tm.getLogs().size());
 +
 +      if (isFullScan) {
 +        stats.update(tableId, state);
 +      }
 +
 +      if (Manager.log.isTraceEnabled()) {
 +        Manager.log.trace(
 +            "[{}] Shutting down all Tservers: {}, dependentCount: {} Extent: 
{}, state: {}, goal: {} actions:{}",
 +            store.name(), 
tableMgmtParams.getServersToShutdown().equals(currentTServers.keySet()),
 +            dependentWatcher == null ? "null" : 
dependentWatcher.assignedOrHosted(), tm.getExtent(),
 +            state, goal, actions);
 +      }
 +
 +      if (actions.contains(ManagementAction.NEEDS_SPLITTING)
 +          && !actions.contains(ManagementAction.NEEDS_VOLUME_REPLACEMENT)) {
 +        LOG.debug("{} may need splitting.", tm.getExtent());
 +        if (manager.getSplitter().isSplittable(tm)) {
 +          if (manager.getSplitter().addSplitStarting(tm.getExtent())) {
 +            LOG.debug("submitting tablet {} for split", tm.getExtent());
 +            manager.getSplitter().executeSplit(new 
SplitTask(manager.getContext(), tm, manager));
            }
 +        } else {
 +          LOG.debug("{} is not splittable.", tm.getExtent());
 +        }
 +        // ELASTICITY_TODO: See #3605. Merge is non-functional. Left this commented out code to
 +        // show where merge used to make a call to split a tablet.
 +        // sendSplitRequest(mergeStats.getMergeInfo(), state, tm);
 +      }
 +
 +      if (actions.contains(ManagementAction.NEEDS_COMPACTING)
 +          && !actions.contains(ManagementAction.NEEDS_VOLUME_REPLACEMENT)) {
 +        var jobs = compactionGenerator.generateJobs(tm,
 +            TabletManagementIterator.determineCompactionKinds(actions));
 +        LOG.debug("{} may need compacting adding {} jobs", tm.getExtent(), 
jobs.size());
 +        manager.getCompactionCoordinator().addJobs(tm, jobs);
 +      }
 +
 +      // ELASTICITY_TODO the case where a planner generates compactions at time T1 for tablet
 +      // and later at time T2 generates nothing for the same tablet is not 
being handled. At
 +      // time T1 something could have been queued. However at time T2 we will 
not clear those
 +      // entries from the queue because we see nothing here for that case. 
After a full
 +      // metadata scan could remove any tablets that were not updated during 
the scan.
 +
 +      if (actions.contains(ManagementAction.NEEDS_LOCATION_UPDATE)) {
  
 -          // if we are shutting down all the tabletservers, we have to do it 
in order
 -          if ((goal == TabletGoalState.SUSPENDED && state == 
TabletState.HOSTED)
 -              && manager.serversToShutdown.equals(currentTServers.keySet())) {
 -            if (dependentWatcher != null) {
 -              // If the dependentWatcher is for the user tables, check to see
 -              // that user tables exist.
 -              DataLevel dependentLevel = dependentWatcher.store.getLevel();
 -              boolean userTablesExist = true;
 -              switch (dependentLevel) {
 -                case USER:
 -                  Set<TableId> onlineTables = manager.onlineTables();
 -                  onlineTables.remove(RootTable.ID);
 -                  onlineTables.remove(MetadataTable.ID);
 -                  userTablesExist = !onlineTables.isEmpty();
 -                  break;
 -                case METADATA:
 -                case ROOT:
 -                default:
 -                  break;
 +        if (tm.getLocation() != null) {
 +          
filteredServersToShutdown.remove(tm.getLocation().getServerInstance());
 +        }
 +
 +        if (goal == TabletGoalState.HOSTED) {
 +          if ((state != TabletState.HOSTED && !tm.getLogs().isEmpty())
 +              && manager.recoveryManager.recoverLogs(tm.getExtent(), 
tm.getLogs())) {
 +            LOG.debug("Not hosting {} as it needs recovery, logs: {}", 
tm.getExtent(),
 +                tm.getLogs().size());
 +            continue;
 +          }
 +          switch (state) {
 +            case HOSTED:
 +              if 
(location.getServerInstance().equals(manager.migrations.get(tm.getExtent()))) {
 +                manager.migrations.remove(tm.getExtent());
                }
 -              // If the stats object in the dependentWatcher is empty, then it
 -              // currently does not have data about what is hosted or not. In
 -              // that case host these tablets until the dependent watcher can
 -              // gather some data.
 -              final Map<TableId,TableCounts> stats = 
dependentWatcher.getStats();
 -              if (dependentLevel == DataLevel.USER) {
 -                if (userTablesExist
 -                    && (stats == null || stats.isEmpty() || 
assignedOrHosted(stats) > 0)) {
 -                  goal = TabletGoalState.HOSTED;
 -                }
 -              } else if (stats == null || stats.isEmpty() || 
assignedOrHosted(stats) > 0) {
 -                goal = TabletGoalState.HOSTED;
 +              break;
 +            case ASSIGNED_TO_DEAD_SERVER:
 +              hostDeadTablet(tLists, tm, location);
 +              break;
 +            case SUSPENDED:
 +              hostSuspendedTablet(tLists, tm, location, tableConf);
 +              break;
 +            case UNASSIGNED:
 +              hostUnassignedTablet(tLists, tm.getExtent(),
 +                  new UnassignedTablet(location, tm.getLast()));
 +              break;
 +            case ASSIGNED:
 +              // Send another reminder
 +              tLists.assigned.add(new Assignment(tm.getExtent(),
 +                  future != null ? future.getServerInstance() : null, 
tm.getLast()));
 +              break;
 +            default:
 +              break;
 +          }
 +        } else {
 +          switch (state) {
 +            case SUSPENDED:
 +              // Request a move to UNASSIGNED, so as to allow balancing to 
continue.
 +              tLists.suspendedToGoneServers.add(tm);
 +              cancelOfflineTableMigrations(tm.getExtent());
 +              break;
 +            case UNASSIGNED:
 +              cancelOfflineTableMigrations(tm.getExtent());
 +              break;
 +            case ASSIGNED_TO_DEAD_SERVER:
 +              unassignDeadTablet(tLists, tm);
 +              break;
 +            case HOSTED:
 +              TServerConnection client =
 +                  
manager.tserverSet.getConnection(location.getServerInstance());
 +              if (client != null) {
 +                LOG.debug("Requesting tserver {} unload tablet {}", 
location.getServerInstance(),
 +                    tm.getExtent());
 +                client.unloadTablet(manager.managerLock, tm.getExtent(), 
goal.howUnload(),
 +                    manager.getSteadyTime());
 +                tableMgmtStats.totalUnloaded++;
 +                unloaded++;
 +              } else {
 +                Manager.log.warn("Could not connect to server {}", location);
                }
 -            }
 +              break;
 +            case ASSIGNED:
 +              break;
            }
 +        }
 +        tableMgmtStats.counts[state.ordinal()]++;
 +      }
 +    }
  
 -          if (goal == TabletGoalState.HOSTED) {
 -            if ((state != TabletState.HOSTED && !tls.walogs.isEmpty())
 -                && manager.recoveryManager.recoverLogs(tls.extent, 
tls.walogs)) {
 -              continue;
 -            }
 -            switch (state) {
 -              case HOSTED:
 -                if 
(location.getServerInstance().equals(manager.migrations.get(tls.extent))) {
 -                  manager.migrations.remove(tls.extent);
 -                }
 -                break;
 -              case ASSIGNED_TO_DEAD_SERVER:
 -                hostDeadTablet(tLists, tls, location, wals);
 -                break;
 -              case SUSPENDED:
 -                hostSuspendedTablet(tLists, tls, location, tableConf);
 -                break;
 -              case UNASSIGNED:
 -                hostUnassignedTablet(tLists, tls.extent, new 
UnassignedTablet(location, tls.last));
 -                break;
 -              case ASSIGNED:
 -                // Send another reminder
 -                tLists.assigned.add(new Assignment(tls.extent, 
tls.getFutureServer(), tls.last));
 -                break;
 -            }
 -          } else {
 -            switch (state) {
 -              case SUSPENDED:
 -                // Request a move to UNASSIGNED, so as to allow balancing to 
continue.
 -                tLists.suspendedToGoneServers.add(tls);
 -                cancelOfflineTableMigrations(tls.extent);
 -                break;
 -              case UNASSIGNED:
 -                cancelOfflineTableMigrations(tls.extent);
 -                break;
 -              case ASSIGNED_TO_DEAD_SERVER:
 -                unassignDeadTablet(tLists, tls, wals);
 -                break;
 -              case HOSTED:
 -                TServerConnection client =
 -                    
manager.tserverSet.getConnection(location.getServerInstance());
 -                if (client != null) {
 -                  Manager.log.trace("[{}] Requesting TabletServer {} unload 
{} {}", store.name(),
 -                      location.getServerInstance(), tls.extent, 
goal.howUnload());
 -                  client.unloadTablet(manager.managerLock, tls.extent, 
goal.howUnload(),
 -                      manager.getSteadyTime());
 -                  unloaded++;
 -                  totalUnloaded++;
 -                } else {
 -                  Manager.log.warn("Could not connect to server {}", 
location);
 -                }
 -                break;
 -              case ASSIGNED:
 -                break;
 -            }
 +    flushChanges(tLists);
 +
 +    if (isFullScan) {
 +      this.filteredServersToShutdown = Set.copyOf(filteredServersToShutdown);
 +    }
 +
 +    return tableMgmtStats;
 +  }
 +
 +  /**
 +   * Builds a sorted view of the given online tablet servers paired with the status the manager
 +   * currently holds for each of them.
 +   *
 +   * @param onlineTservers the tablet servers to look up
 +   * @return a new map keyed by tablet server; a server for which {@code manager.tserverStatus}
 +   *         has no entry is still present, mapped to whatever that lookup returned
 +   */
 +  private SortedMap<TServerInstance,TabletServerStatus>
 +      getCurrentTservers(Set<TServerInstance> onlineTservers) {
 +    final SortedMap<TServerInstance,TabletServerStatus> statusByServer = new TreeMap<>();
 +    onlineTservers
 +        .forEach(tserver -> statusByServer.put(tserver, manager.tserverStatus.get(tserver)));
 +    return statusByServer;
 +  }
 +
 +  @Override
 +  public void run() {
 +    int[] oldCounts = new int[TabletState.values().length];
 +    boolean lookForTabletsNeedingVolReplacement = true;
 +
 +    while (manager.stillManager()) {
 +      // slow things down a little, otherwise we spam the logs when there are 
many wake-up events
 +      sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
 +      // ELASTICITY_TODO above sleep in the case when not doing a full scan 
to make manager more
 +      // responsive
 +
 +      final long waitTimeBetweenScans = manager.getConfiguration()
 +          .getTimeInMillis(Property.MANAGER_TABLET_GROUP_WATCHER_INTERVAL);
 +
 +      TabletManagementParameters tableMgmtParams =
 +          
createTabletManagementParameters(lookForTabletsNeedingVolReplacement);
 +      var currentTServers = 
getCurrentTservers(tableMgmtParams.getOnlineTsevers());
 +
 +      ClosableIterator<TabletManagement> iter = null;
 +      try {
 +        if (currentTServers.isEmpty()) {
 +          eventHandler.waitForFullScan(waitTimeBetweenScans);
 +          synchronized (this) {
 +            lastScanServers = Collections.emptySortedSet();
            }
 -          counts[state.ordinal()]++;
 +          continue;
          }
  
 -        flushChanges(tLists, wals);
 +        stats.begin();
 +
 +        ManagerState managerState = tableMgmtParams.getManagerState();
 +
 +        // Clear the need for a full scan before starting a full scan in order to detect events that
 +        // happen during the full scan.
 +        eventHandler.clearNeedsFullScan();
 +
 +        iter = store.iterator(tableMgmtParams);
 +        var tabletMgmtStats = manageTablets(iter, tableMgmtParams, 
currentTServers, true);
 +        lookForTabletsNeedingVolReplacement = 
tabletMgmtStats.totalVolumeReplacements != 0;
  
          // provide stats after flushing changes to avoid race conditions w/ 
delete table
          stats.end(managerState);
diff --cc 
test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java
index 5ca097da33,0000000000..06c3bec142
mode 100644,000000..100644
--- 
a/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java
@@@ -1,484 -1,0 +1,482 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.test.functional;
 +
 +import static org.junit.jupiter.api.Assertions.assertEquals;
 +
 +import java.io.IOException;
 +import java.time.Duration;
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.HashSet;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Set;
 +import java.util.SortedSet;
 +import java.util.TreeSet;
 +import java.util.UUID;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.Accumulo;
 +import org.apache.accumulo.core.client.AccumuloClient;
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.BatchDeleter;
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.IsolatedScanner;
 +import org.apache.accumulo.core.client.MutationsRejectedException;
 +import org.apache.accumulo.core.client.RowIterator;
 +import org.apache.accumulo.core.client.Scanner;
 +import org.apache.accumulo.core.client.TableExistsException;
 +import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.client.admin.NewTableConfiguration;
 +import org.apache.accumulo.core.client.admin.TabletHostingGoal;
 +import org.apache.accumulo.core.clientImpl.ClientContext;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.TableId;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.dataImpl.KeyExtent;
 +import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
 +import org.apache.accumulo.core.lock.ServiceLock;
 +import org.apache.accumulo.core.manager.state.TabletManagement;
 +import org.apache.accumulo.core.manager.state.tables.TableState;
 +import org.apache.accumulo.core.manager.thrift.ManagerState;
 +import org.apache.accumulo.core.metadata.MetadataTable;
 +import org.apache.accumulo.core.metadata.StoredTabletFile;
 +import org.apache.accumulo.core.metadata.TServerInstance;
 +import org.apache.accumulo.core.metadata.schema.Ample;
 +import org.apache.accumulo.core.metadata.schema.DataFileValue;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 +import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily;
 +import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
 +import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.HostingColumnFamily;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
 +import org.apache.accumulo.core.metadata.schema.TabletOperationId;
 +import org.apache.accumulo.core.metadata.schema.TabletOperationType;
 +import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.core.tabletserver.log.LogEntry;
- import org.apache.accumulo.core.util.Pair;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.accumulo.harness.AccumuloClusterHarness;
 +import org.apache.accumulo.server.manager.LiveTServerSet;
 +import org.apache.accumulo.server.manager.state.TabletManagementIterator;
 +import org.apache.accumulo.server.manager.state.TabletManagementParameters;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.Text;
 +import org.junit.jupiter.api.Test;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.google.common.collect.Iterables;
 +import com.google.common.collect.Sets;
 +
 +/**
 + * Test to ensure that the {@link TabletManagementIterator} properly skips 
over tablet information
 + * in the metadata table when there is no work to be done on the tablet (see 
ACCUMULO-3580)
 + */
 +public class TabletManagementIteratorIT extends AccumuloClusterHarness {
 +  private final static Logger log = 
LoggerFactory.getLogger(TabletManagementIteratorIT.class);
 +
 +  /**
 +   * Supplies a three minute default timeout for this test class.
 +   *
 +   * @return a {@link Duration} of three minutes
 +   */
 +  @Override
 +  protected Duration defaultTimeout() {
 +    final Duration threeMinutes = Duration.ofMinutes(3);
 +    return threeMinutes;
 +  }
 +
 +  /**
 +   * Creates four tables, clones the metadata table into scratch copies, and then edits those
 +   * copies (hosting goal, location, operation id, WAL entry, duplicate location, data file on a
 +   * replaced volume). After each edit the number of tablets reported by
 +   * {@code findTabletsNeedingAttention} is compared with the expected count.
 +   */
 +  @Test
 +  public void test() throws AccumuloException, AccumuloSecurityException, TableExistsException,
 +      TableNotFoundException, IOException {
 +
 +    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
 +
 +      String[] tables = getUniqueNames(8);
 +      final String t1 = tables[0];
 +      final String t2 = tables[1];
 +      final String t3 = tables[2];
 +      final String t4 = tables[3];
 +      final String metaCopy1 = tables[4];
 +      final String metaCopy2 = tables[5];
 +      final String metaCopy3 = tables[6];
 +      final String metaCopy4 = tables[7];
 +
 +      // create some metadata
 +      createTable(client, t1, true);
 +      createTable(client, t2, false);
 +      createTable(client, t3, true);
 +      createTable(client, t4, true);
 +
 +      // Scan table t3 which will cause its tablets to be hosted. The scanner is only needed to
 +      // trigger hosting, so it is closed as soon as the data has been consumed.
 +      try (Scanner s = client.createScanner(t3)) {
 +        s.setRange(new Range());
 +        @SuppressWarnings("unused")
 +        var unused = Iterables.size(s); // consume all the data
 +      }
 +
 +      // examine a clone of the metadata table, so we can manipulate it
 +      copyTable(client, MetadataTable.NAME, metaCopy1);
 +
 +      TabletManagementParameters tabletMgmtParams = createParameters(client);
 +      int tabletsInFlux = findTabletsNeedingAttention(client, metaCopy1, tabletMgmtParams);
 +      while (tabletsInFlux > 0) {
 +        log.debug("Waiting for {} tablets for {}", tabletsInFlux, metaCopy1);
 +        UtilWaitThread.sleep(500);
 +        copyTable(client, MetadataTable.NAME, metaCopy1);
 +        tabletsInFlux = findTabletsNeedingAttention(client, metaCopy1, tabletMgmtParams);
 +      }
 +      assertEquals(0, findTabletsNeedingAttention(client, metaCopy1, tabletMgmtParams),
 +          "No tables should need attention");
 +
 +      // The metadata table stabilized and metaCopy1 contains a copy suitable for testing. Before
 +      // metaCopy1 is modified, copy it for subsequent test.
 +      copyTable(client, metaCopy1, metaCopy2);
 +      copyTable(client, metaCopy1, metaCopy3);
 +      copyTable(client, metaCopy1, metaCopy4);
 +
 +      // t1 is unassigned, setting to always will generate a change to host tablets
 +      setTabletHostingGoal(client, metaCopy1, t1, TabletHostingGoal.ALWAYS.name());
 +      // t3 is hosted, setting to never will generate a change to unhost tablets
 +      setTabletHostingGoal(client, metaCopy1, t3, TabletHostingGoal.NEVER.name());
 +      tabletMgmtParams = createParameters(client);
 +      assertEquals(4, findTabletsNeedingAttention(client, metaCopy1, tabletMgmtParams),
 +          "Should have four tablets with hosting goal changes");
 +
 +      // test the assigned case (no location)
 +      removeLocation(client, metaCopy1, t3);
 +      assertEquals(2, findTabletsNeedingAttention(client, metaCopy1, tabletMgmtParams),
 +          "Should have two tablets without a loc");
 +
 +      // Test setting the operation id on one of the tablets in table t1. Table t1 has two tablets
 +      // w/o a location. Only one should need attention because of the operation id.
 +      setOperationId(client, metaCopy1, t1, new Text("some split"), TabletOperationType.SPLITTING);
 +      assertEquals(1, findTabletsNeedingAttention(client, metaCopy1, tabletMgmtParams),
 +          "Should have tablets needing attention because of operation id");
 +
 +      // test the cases where the assignment is to a dead tserver
 +      reassignLocation(client, metaCopy2, t3);
 +      assertEquals(1, findTabletsNeedingAttention(client, metaCopy2, tabletMgmtParams),
 +          "Only 1 of 2 tablets in table t3 should be returned");
 +
 +      // Give t4 a hosting goal of always and remove its locations, so both of its tablets need
 +      // attention before any operation id is set.
 +      setTabletHostingGoal(client, metaCopy4, t4, TabletHostingGoal.ALWAYS.name());
 +      removeLocation(client, metaCopy4, t4);
 +      assertEquals(2, findTabletsNeedingAttention(client, metaCopy4, tabletMgmtParams),
 +          "Tablets have no location and a hosting goal of always, so they should need attention");
 +
 +      // Test MERGING and SPLITTING do not need attention with no location or wals
 +      setOperationId(client, metaCopy4, t4, null, TabletOperationType.MERGING);
 +      assertEquals(0, findTabletsNeedingAttention(client, metaCopy4, tabletMgmtParams),
 +          "Should have no tablets needing attention for merge as they have no location");
 +      setOperationId(client, metaCopy4, t4, null, TabletOperationType.SPLITTING);
 +      assertEquals(0, findTabletsNeedingAttention(client, metaCopy4, tabletMgmtParams),
 +          "Should have no tablets needing attention for split as they have no location");
 +
 +      // Create a log entry for one of the tablets, this tablet will now need attention
 +      // for both MERGING and SPLITTING
 +      setOperationId(client, metaCopy4, t4, null, TabletOperationType.MERGING);
 +      createLogEntry(client, metaCopy4, t4);
 +      assertEquals(1, findTabletsNeedingAttention(client, metaCopy4, tabletMgmtParams),
 +          "Should have a tablet needing attention because of wals");
 +      // Switch op to SPLITTING which should also need attention like MERGING
 +      setOperationId(client, metaCopy4, t4, null, TabletOperationType.SPLITTING);
 +      assertEquals(1, findTabletsNeedingAttention(client, metaCopy4, tabletMgmtParams),
 +          "Should have a tablet needing attention because of wals");
 +
 +      // Switch op to delete, no tablets should need attention even with WALs
 +      setOperationId(client, metaCopy4, t4, null, TabletOperationType.DELETING);
 +      assertEquals(0, findTabletsNeedingAttention(client, metaCopy4, tabletMgmtParams),
 +          "Should have no tablets needing attention for delete");
 +
 +      // test the bad tablet location state case (inconsistent metadata)
 +      tabletMgmtParams = createParameters(client);
 +      addDuplicateLocation(client, metaCopy3, t3);
 +      assertEquals(1, findTabletsNeedingAttention(client, metaCopy3, tabletMgmtParams),
 +          "Should have 1 tablet that needs a metadata repair");
 +
 +      // test the volume replacements case. Need to insert some files into
 +      // the metadata for t4, then run the TabletManagementIterator with
 +      // volume replacements
 +      addFiles(client, metaCopy4, t4);
-       List<Pair<Path,Path>> replacements = new ArrayList<>();
-       replacements.add(new Pair<Path,Path>(new Path("file:/vol1/accumulo/inst_id"),
-           new Path("file:/vol2/accumulo/inst_id")));
++      Map<Path,Path> replacements =
++          Map.of(new Path("file:/vol1/accumulo/inst_id"), new Path("file:/vol2/accumulo/inst_id"));
 +      tabletMgmtParams = createParameters(client, replacements);
 +      assertEquals(1, findTabletsNeedingAttention(client, metaCopy4, tabletMgmtParams),
 +          "Should have one tablet that needs a volume replacement");
 +
 +      // clean up
 +      dropTables(client, t1, t2, t3, t4, metaCopy1, metaCopy2, metaCopy3, metaCopy4);
 +    }
 +  }
 +
 +  /**
 +   * Rewrites the hosting goal column for every entry scanned in the metadata range of
 +   * {@code tableNameToModify} inside {@code table}.
 +   *
 +   * @param client client used for the table id lookup, the scan and the writes
 +   * @param table the (metadata copy) table that is scanned and written
 +   * @param tableNameToModify name of the table whose tablet rows are updated
 +   * @param state value stored in the hosting goal column
 +   */
 +  private void setTabletHostingGoal(AccumuloClient client, String table, String tableNameToModify,
 +      String state) throws TableNotFoundException, MutationsRejectedException {
 +    final String rawTableId = client.tableOperations().tableIdMap().get(tableNameToModify);
 +    final Range tabletRows = new KeyExtent(TableId.of(rawTableId), null, null).toMetaRange();
 +
 +    try (Scanner metaScanner = client.createScanner(table, Authorizations.EMPTY)) {
 +      metaScanner.setRange(tabletRows);
 +      for (Entry<Key,Value> kv : metaScanner) {
 +        final Key scanned = kv.getKey();
 +        final Mutation goalUpdate = new Mutation(scanned.getRow());
 +        // timestamp is one past the scanned entry's timestamp
 +        goalUpdate.put(HostingColumnFamily.GOAL_COLUMN.getColumnFamily(),
 +            HostingColumnFamily.GOAL_COLUMN.getColumnQualifier(), scanned.getTimestamp() + 1,
 +            new Value(state));
 +        // each mutation goes through its own short-lived writer
 +        try (BatchWriter writer = client.createBatchWriter(table)) {
 +          writer.addMutation(goalUpdate);
 +        }
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Writes one extra current-location entry (qualifier {@code 1234567}, value {@code fake:9005})
 +   * into {@code table}. The row is the metadata row of the {@code tableNameToModify} extent that
 +   * has neither an end row nor a previous end row.
 +   *
 +   * @param client client used for the table id lookup and the write
 +   * @param table the (metadata copy) table that receives the mutation
 +   * @param tableNameToModify name of the table whose tablet row is modified
 +   */
 +  private void addDuplicateLocation(AccumuloClient client, String table, String tableNameToModify)
 +      throws TableNotFoundException, MutationsRejectedException {
 +    final String rawTableId = client.tableOperations().tableIdMap().get(tableNameToModify);
 +    final KeyExtent extent = new KeyExtent(TableId.of(rawTableId), null, null);
 +
 +    final Mutation extraLocation = new Mutation(extent.toMetaRow());
 +    extraLocation.put(CurrentLocationColumnFamily.NAME, new Text("1234567"),
 +        new Value("fake:9005"));
 +
 +    try (BatchWriter writer = client.createBatchWriter(table)) {
 +      writer.addMutation(extraLocation);
 +    }
 +  }
 +
 +  /**
 +   * Adds a data file entry located on {@code file:/vol1/accumulo/inst_id} to {@code table}. The
 +   * row is the metadata row of the {@code tableNameToModify} extent that has neither an end row
 +   * nor a previous end row. When debug logging is enabled, the resulting contents of
 +   * {@code table} are logged afterwards.
 +   *
 +   * @param client client used for the table id lookup, the write and the optional debug scan
 +   * @param table the (metadata copy) table that receives the mutation
 +   * @param tableNameToModify name of the table whose tablet row gets the file entry
 +   */
 +  private void addFiles(AccumuloClient client, String table, String tableNameToModify)
 +      throws TableNotFoundException, MutationsRejectedException {
 +    TableId tableIdToModify =
 +        TableId.of(client.tableOperations().tableIdMap().get(tableNameToModify));
 +    Mutation m = new Mutation(new KeyExtent(tableIdToModify, null, null).toMetaRow());
 +    m.put(DataFileColumnFamily.NAME,
 +        new Text(StoredTabletFile
 +            .serialize("file:/vol1/accumulo/inst_id/tables/2a/default_tablet/F0000072.rf")),
 +        new Value(new DataFileValue(0, 0, 0).encode()));
 +    try (BatchWriter bw = client.createBatchWriter(table)) {
 +      bw.addMutation(m);
 +    }
 +
 +    // Diagnostic dump of the table. The scanner is closed via try-with-resources and uses the
 +    // same createScanner(table, Authorizations.EMPTY) call as the other helpers in this class, so
 +    // a failure propagates through the declared TableNotFoundException instead of being swallowed.
 +    if (log.isDebugEnabled()) {
 +      try (Scanner scanner = client.createScanner(table, Authorizations.EMPTY)) {
 +        for (Entry<Key,Value> e : scanner) {
 +          log.debug("{} -> {}", e.getKey(), e.getValue());
 +        }
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Replaces the first current-location entry found in the metadata range of
 +   * {@code tableNameToModify} inside {@code table}. The scanned entry is deleted and a new entry
 +   * in the same column family (qualifier {@code 1234567}, value {@code fake:9005}) is written
 +   * with a timestamp one past the original.
 +   *
 +   * @param client client used for the table id lookup, the scan and the write
 +   * @param table the (metadata copy) table that is scanned and written
 +   * @param tableNameToModify name of the table whose location entry is replaced
 +   */
 +  private void reassignLocation(AccumuloClient client, String table, String tableNameToModify)
 +      throws TableNotFoundException, MutationsRejectedException {
 +    final String rawTableId = client.tableOperations().tableIdMap().get(tableNameToModify);
 +    final Range tabletRows = new KeyExtent(TableId.of(rawTableId), null, null).toMetaRange();
 +
 +    try (Scanner locationScanner = client.createScanner(table, Authorizations.EMPTY)) {
 +      locationScanner.setRange(tabletRows);
 +      locationScanner.fetchColumnFamily(CurrentLocationColumnFamily.NAME);
 +
 +      // only the first location entry is touched
 +      final Key existing = locationScanner.iterator().next().getKey();
 +      final Text family = existing.getColumnFamily();
 +      final long originalTime = existing.getTimestamp();
 +
 +      final Mutation swap = new Mutation(existing.getRow());
 +      swap.putDelete(family, existing.getColumnQualifier(), originalTime);
 +      swap.put(family, new Text("1234567"), originalTime + 1, new Value("fake:9005"));
 +
 +      try (BatchWriter writer = client.createBatchWriter(table)) {
 +        writer.addMutation(swap);
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Sets an operation id of the given type (with the fixed id {@code 42}) on all tablets of
 +   * {@code tableNameToModify} up to the end row. The tablets are read through the server
 +   * context's Ample, while the operation id column is written into {@code table}.
 +   *
 +   * @param client client used for the table id lookup and the writes
 +   * @param table the (metadata copy) table that receives the mutations
 +   * @param tableNameToModify name of the table whose tablets get the operation id
 +   * @param end end row bounding the tablets to modify, or {@code null} for no upper bound
 +   * @param opType operation type encoded into the operation id
 +   */
 +  private void setOperationId(AccumuloClient client, String table, String tableNameToModify,
 +      Text end, TabletOperationType opType) throws TableNotFoundException {
 +    final var operationId = TabletOperationId.from(opType, 42L);
 +    final TableId targetId =
 +        TableId.of(client.tableOperations().tableIdMap().get(tableNameToModify));
 +
 +    Text encodedEnd = null;
 +    if (end != null) {
 +      encodedEnd = TabletsSection.encodeRow(targetId, end);
 +    }
 +
 +    try (TabletsMetadata tablets = getServerContext().getAmple().readTablets().forTable(targetId)
 +        .overlapping(null, encodedEnd).fetch(ColumnType.PREV_ROW).build()) {
 +      for (TabletMetadata tablet : tablets) {
 +        final Mutation opidUpdate = new Mutation(tablet.getExtent().toMetaRow());
 +        MetadataSchema.TabletsSection.ServerColumnFamily.OPID_COLUMN.put(opidUpdate,
 +            new Value(operationId.canonical()));
 +        try (BatchWriter writer = client.createBatchWriter(table)) {
 +          writer.addMutation(opidUpdate);
 +        } catch (MutationsRejectedException rejected) {
 +          // not part of the declared throws clause, so surface it unchecked
 +          throw new RuntimeException(rejected);
 +        }
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Deletes every current-location entry stored in {@code table} for the metadata range of
 +   * {@code tableNameToModify}.
 +   *
 +   * @param client client used to resolve the table id and to create the batch deleter
 +   * @param table name of the table the entries are deleted from
 +   * @param tableNameToModify name of the table whose location entries are removed
 +   * @throws TableNotFoundException if the batch deleter cannot be created for {@code table}
 +   * @throws MutationsRejectedException if the delete is rejected
 +   */
 +  private void removeLocation(AccumuloClient client, String table, String tableNameToModify)
 +      throws TableNotFoundException, MutationsRejectedException {
 +    TableId tableIdToModify =
 +        TableId.of(client.tableOperations().tableIdMap().get(tableNameToModify));
 +    // try-with-resources so the deleter is released even when setRanges or delete throws
 +    try (BatchDeleter deleter = client.createBatchDeleter(table, Authorizations.EMPTY, 1)) {
 +      deleter.setRanges(
 +          Collections.singleton(new KeyExtent(tableIdToModify, null, null).toMetaRange()));
 +      deleter.fetchColumnFamily(CurrentLocationColumnFamily.NAME);
 +      deleter.delete();
 +    }
 +  }
 +
 +  /**
 +   * Scans {@code table} with the TabletManagementIterator configured from
 +   * {@code tabletMgmtParams} and returns how many entries the scan produced. Each decoded
 +   * extent is logged at debug level.
 +   */
 +  private int findTabletsNeedingAttention(AccumuloClient client, String table,
 +      TabletManagementParameters tabletMgmtParams) throws TableNotFoundException, IOException {
 +    List<KeyExtent> flagged = new ArrayList<>();
 +    try (Scanner mgmtScanner = client.createScanner(table, Authorizations.EMPTY)) {
 +      TabletManagementIterator.configureScanner(mgmtScanner, tabletMgmtParams);
 +      mgmtScanner.updateScanIteratorOption("tabletChange", "debug", "1");
 +      for (Entry<Key,Value> scanned : mgmtScanner) {
 +        if (scanned == null) {
 +          continue;
 +        }
 +        var tabletMeta = TabletManagementIterator.decode(scanned).getTabletMetadata();
 +        KeyExtent extent = tabletMeta.getExtent();
 +        log.debug("Found tablets that changed state: {}", extent);
 +        log.debug("metadata: {}", tabletMeta);
 +        flagged.add(extent);
 +      }
 +    }
 +    log.debug("Tablets in flux: {}", flagged);
 +    // one list element is added per counted entry, so the size is the result count
 +    return flagged.size();
 +  }
 +
 +  /**
 +   * Creates table {@code t} with a single split at "some split" and requests it online. When
 +   * {@code online} is false the table is then taken offline, passing {@code true} to
 +   * {@code offline}.
 +   */
 +  private void createTable(AccumuloClient client, String t, boolean online)
 +      throws AccumuloSecurityException, AccumuloException, TableNotFoundException,
 +      TableExistsException {
 +    SortedSet<Text> splits = new TreeSet<>();
 +    splits.add(new Text("some split"));
 +
 +    var tableOps = client.tableOperations();
 +    tableOps.create(t, new NewTableConfiguration().withSplits(splits));
 +    tableOps.online(t);
 +    if (online) {
 +      return;
 +    }
 +    tableOps.offline(t, true);
 +  }
 +
 +  /**
 +   * Create a copy of the source table by first gathering all the rows of the source in a list of
 +   * mutations. Then create the copy of the table and apply the mutations to the copy.
 +   *
 +   * <p>
 +   * Any existing table named {@code copy} is dropped first. The method asserts that exactly 8
 +   * rows were gathered from the source before the copy is created.
 +   *
 +   * @param client client used for scanning, table operations and writing
 +   * @param source name of the table to read rows from
 +   * @param copy name of the table to (re)create and fill
 +   */
 +  private void copyTable(AccumuloClient client, String source, String copy)
 +      throws AccumuloException, AccumuloSecurityException, TableNotFoundException,
 +      TableExistsException {
 +    try {
 +      dropTables(client, copy);
 +    } catch (TableNotFoundException ex) {
 +      // ignored: the copy not existing yet is the normal case
 +    }
 +
 +    log.info("Gathering rows to copy {} ", source);
 +    List<Mutation> mutations = new ArrayList<>();
 +
 +    try (Scanner scanner = client.createScanner(source, Authorizations.EMPTY)) {
 +      RowIterator rows = new RowIterator(new IsolatedScanner(scanner));
 +
 +      while (rows.hasNext()) {
 +        Iterator<Entry<Key,Value>> row = rows.next();
 +        Mutation m = null;
 +
 +        // one mutation per row, created lazily from the first key of the row
 +        while (row.hasNext()) {
 +          Entry<Key,Value> entry = row.next();
 +          Key k = entry.getKey();
 +          if (m == null) {
 +            m = new Mutation(k.getRow());
 +          }
 +
 +          m.put(k.getColumnFamily(), k.getColumnQualifier(), k.getColumnVisibilityParsed(),
 +              k.getTimestamp(), entry.getValue());
 +        }
 +
 +        mutations.add(m);
 +      }
 +    }
 +
 +    // metadata should be stable with only 8 rows (2 for each table)
 +    log.debug("Gathered {} rows to create copy {}", mutations.size(), copy);
 +    assertEquals(8, mutations.size(), "Metadata should have 8 rows (2 for each table)");
 +    client.tableOperations().create(copy);
 +
 +    try (BatchWriter writer = client.createBatchWriter(copy)) {
 +      for (Mutation m : mutations) {
 +        writer.addMutation(m);
 +      }
 +    }
 +
 +    log.info("Finished creating copy {}", copy);
 +  }
 +
 +  /**
 +   * Deletes each of the named tables in the order given. The first failure propagates and
 +   * leaves the remaining tables untouched.
 +   */
 +  private void dropTables(AccumuloClient client, String... tables)
 +      throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
 +    for (String tableName : tables) {
 +      client.tableOperations().delete(tableName);
 +    }
 +  }
 +
 +  /**
 +   * Creates a log entry on the "some split" extent of {@code tableNameToModify} and writes it
 +   * to {@code table}. This could be modified easily to support other extents.
 +   */
 +  private void createLogEntry(AccumuloClient client, String table, String tableNameToModify)
 +      throws MutationsRejectedException, TableNotFoundException {
 +    String rawId = client.tableOperations().tableIdMap().get(tableNameToModify);
 +    KeyExtent splitExtent = new KeyExtent(TableId.of(rawId), new Text("some split"), null);
 +    Mutation walMutation = new Mutation(splitExtent.toMetaRow());
 +
 +    // string concatenation applies toString() to the UUID implicitly
 +    String walPath = "file:/accumulo/wal/localhost+9997/" + UUID.randomUUID();
 +    LogEntry.fromPath(walPath).addToMutation(walMutation);
 +
 +    try (BatchWriter writer = client.createBatchWriter(table)) {
 +      writer.addMutation(walMutation);
 +    }
 +  }
 +
 +  /**
 +   * Convenience overload that delegates to the two-argument {@code createParameters}, passing
 +   * an empty collection as the replacements argument.
 +   */
 +  private static TabletManagementParameters createParameters(AccumuloClient 
client) {
-     return createParameters(client, List.of());
++    return createParameters(client, Map.of());
 +  }
 +
 +  /**
 +   * Builds a TabletManagementParameters instance from the client's current view: the tables
 +   * whose state is ONLINE, one TServerInstance per tablet server reported by
 +   * {@code instanceOperations().getTabletServers()}, and the supplied {@code replacements}.
 +   *
 +   * @throws RuntimeException wrapping any exception raised while resolving a tablet server's
 +   *         session id
 +   */
 +  private static TabletManagementParameters createParameters(AccumuloClient 
client,
-       List<Pair<Path,Path>> replacements) {
++      Map<Path,Path> replacements) {
 +    // NOTE(review): assumes the client handed in is always a ClientContext - confirm
 +    var context = (ClientContext) client;
 +    Set<TableId> onlineTables = 
Sets.filter(context.getTableIdToNameMap().keySet(),
 +        tableId -> context.getTableState(tableId) == TableState.ONLINE);
 +
 +    // pair every reported tablet server name with the session id read for its lock path
 +    HashSet<TServerInstance> tservers = new HashSet<>();
 +    for (String tserver : context.instanceOperations().getTabletServers()) {
 +      try {
 +        var zPath = 
ServiceLock.path(ZooUtil.getRoot(context.instanceOperations().getInstanceId())
 +            + Constants.ZTSERVERS + "/" + tserver);
 +        long sessionId = ServiceLock.getSessionId(context.getZooCache(), 
zPath);
 +        tservers.add(new TServerInstance(tserver, sessionId));
 +      } catch (Exception e) {
 +        // any failure here is rethrown unchecked with the cause preserved
 +        throw new RuntimeException(e);
 +      }
 +    }
 +
 +    // NOTE(review): the meaning of each positional argument is defined by the
 +    // TabletManagementParameters constructor, which is not visible here - confirm against it
 +    return new TabletManagementParameters(ManagerState.NORMAL,
 +        Map.of(
 +            Ample.DataLevel.ROOT, true, Ample.DataLevel.USER, true, 
Ample.DataLevel.METADATA, true),
 +        onlineTables,
 +        new LiveTServerSet.LiveTServersSnapshot(tservers,
 +            Map.of(Constants.DEFAULT_RESOURCE_GROUP_NAME, tservers)),
 +        Set.of(), Map.of(), Ample.DataLevel.USER, Map.of(), true, 
replacements);
 +  }
 +}

Reply via email to