This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new 8375719384 Moved volume replacement to the Manager (#3893)
8375719384 is described below

commit 837571938436823aeef3954a564d7279807742f6
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Fri Nov 17 16:21:08 2023 -0500

    Moved volume replacement to the Manager (#3893)
    
    
    Fixes #3632
    
    
    Co-authored-by: Keith Turner <ktur...@apache.org>
---
 .../core/manager/state/TabletManagement.java       |  2 +-
 .../miniclusterImpl/MiniAccumuloClusterImpl.java   |  2 +
 .../org/apache/accumulo/server/fs/VolumeUtil.java  | 99 ++++++++++++----------
 .../server/manager/state/TabletGoalState.java      |  6 ++
 .../manager/state/TabletManagementIterator.java    |  5 ++
 .../manager/state/TabletManagementParameters.java  | 15 +++-
 .../server/manager/state/ZooTabletStateStore.java  |  6 ++
 .../accumulo/server/util/MetadataTableUtil.java    | 17 ----
 .../accumulo/manager/TabletGroupWatcher.java       | 81 ++++++++++++++++--
 .../java/org/apache/accumulo/test/VolumeIT.java    |  2 -
 .../functional/TabletManagementIteratorIT.java     | 56 +++++++++++-
 11 files changed, 215 insertions(+), 76 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/manager/state/TabletManagement.java
 
b/core/src/main/java/org/apache/accumulo/core/manager/state/TabletManagement.java
index 495f3e2ef0..4e4941d212 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/manager/state/TabletManagement.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/manager/state/TabletManagement.java
@@ -53,7 +53,7 @@ public class TabletManagement {
   private static final Text EMPTY = new Text("");
 
   public static enum ManagementAction {
-    BAD_STATE, NEEDS_COMPACTING, NEEDS_LOCATION_UPDATE, NEEDS_SPLITTING;
+    BAD_STATE, NEEDS_COMPACTING, NEEDS_LOCATION_UPDATE, NEEDS_SPLITTING, 
NEEDS_VOLUME_REPLACEMENT;
   }
 
   public static void addActions(final SortedMap<Key,Value> decodedRow,
diff --git 
a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
 
b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
index 14588b04a8..091b7e9c6a 100644
--- 
a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
+++ 
b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
@@ -934,6 +934,8 @@ public class MiniAccumuloClusterImpl implements 
AccumuloCluster {
     control.stop(ServerType.MANAGER, null);
     control.stop(ServerType.TABLET_SERVER, null);
     control.stop(ServerType.ZOOKEEPER, null);
+    control.stop(ServerType.COMPACTOR, null);
+    control.stop(ServerType.SCAN_SERVER, null);
 
     // ACCUMULO-2985 stop the ExecutorService after we finished using it to 
stop accumulo procs
     if (executor != null) {
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java 
b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
index c6bb965f31..98eb762b8e 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
@@ -19,22 +19,24 @@
 package org.apache.accumulo.server.fs;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
 
-import org.apache.accumulo.core.dataImpl.KeyExtent;
-import org.apache.accumulo.core.lock.ServiceLock;
 import org.apache.accumulo.core.metadata.ReferencedTabletFile;
 import org.apache.accumulo.core.metadata.StoredTabletFile;
 import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.core.tabletserver.log.LogEntry;
 import org.apache.accumulo.core.util.Pair;
-import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.fs.VolumeManager.FileType;
-import org.apache.accumulo.server.util.MetadataTableUtil;
+import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -128,64 +130,73 @@ public class VolumeUtil {
     }
   }
 
-  // ELASTICITY_TODO this method is no longer called because volume 
replacement needs to move from
-  // the tablet server to the manager. See #3625
-  /**
-   * This method does two things. First, it switches any volumes a tablet is 
using that are
-   * configured in instance.volumes.replacements. Second, if a tablet dir is 
no longer configured
-   * for use it chooses a new tablet directory.
-   */
-  public static TabletFiles updateTabletVolumes(ServerContext context, 
ServiceLock zooLock,
-      KeyExtent extent, TabletFiles tabletFiles) {
-    List<Pair<Path,Path>> replacements = context.getVolumeReplacements();
+  public static boolean needsVolumeReplacement(final List<Pair<Path,Path>> 
replacements,
+      final TabletMetadata tm) {
     if (replacements.isEmpty()) {
-      return tabletFiles;
+      return false;
     }
-    log.trace("Using volume replacements: {}", replacements);
 
-    List<LogEntry> logsToRemove = new ArrayList<>();
-    List<LogEntry> logsToAdd = new ArrayList<>();
+    MutableBoolean needsReplacement = new MutableBoolean(false);
+
+    Consumer<LogEntry> consumer = le -> needsReplacement.setTrue();
+
+    volumeReplacementEvaluation(replacements, tm, consumer, consumer,
+        f -> needsReplacement.setTrue(), (f, dfv) -> 
needsReplacement.setTrue());
+
+    return needsReplacement.booleanValue();
+  }
 
-    List<StoredTabletFile> filesToRemove = new ArrayList<>();
-    SortedMap<ReferencedTabletFile,DataFileValue> filesToAdd = new TreeMap<>();
+  public static class VolumeReplacements {
+    public final TabletMetadata tabletMeta;
+    public final List<LogEntry> logsToRemove = new ArrayList<>();
+    public final List<LogEntry> logsToAdd = new ArrayList<>();
+    public final List<StoredTabletFile> filesToRemove = new ArrayList<>();
+    public final Map<ReferencedTabletFile,DataFileValue> filesToAdd = new 
HashMap<>();
 
-    TabletFiles ret = new TabletFiles();
+    public VolumeReplacements(TabletMetadata tabletMeta) {
+      this.tabletMeta = tabletMeta;
+    }
+  }
 
-    for (LogEntry logEntry : tabletFiles.logEntries) {
+  public static VolumeReplacements
+      computeVolumeReplacements(final List<Pair<Path,Path>> replacements, 
final TabletMetadata tm) {
+    var vr = new VolumeReplacements(tm);
+    volumeReplacementEvaluation(replacements, tm, vr.logsToRemove::add, 
vr.logsToAdd::add,
+        vr.filesToRemove::add, vr.filesToAdd::put);
+    return vr;
+  }
+
+  public static void volumeReplacementEvaluation(final List<Pair<Path,Path>> 
replacements,
+      final TabletMetadata tm, final Consumer<LogEntry> logsToRemove,
+      final Consumer<LogEntry> logsToAdd, final Consumer<StoredTabletFile> 
filesToRemove,
+      final BiConsumer<ReferencedTabletFile,DataFileValue> filesToAdd) {
+    if (replacements.isEmpty() || (tm.getFilesMap().isEmpty() && 
tm.getLogs().isEmpty())) {
+      return;
+    }
+
+    log.trace("Using volume replacements: {}", replacements);
+    for (LogEntry logEntry : tm.getLogs()) {
+      log.trace("Evaluating walog {} for replacement.", logEntry);
       LogEntry switchedLogEntry = switchVolumes(logEntry, replacements);
       if (switchedLogEntry != null) {
-        logsToRemove.add(logEntry);
-        logsToAdd.add(switchedLogEntry);
-        ret.logEntries.add(switchedLogEntry);
-        log.debug("Replacing volume {} : {} -> {}", extent, 
logEntry.getFilePath(),
+        logsToRemove.accept(logEntry);
+        logsToAdd.accept(switchedLogEntry);
+        log.trace("Replacing volume {} : {} -> {}", tm.getExtent(), 
logEntry.getFilePath(),
             switchedLogEntry.getFilePath());
-      } else {
-        ret.logEntries.add(logEntry);
       }
     }
 
-    for (Entry<StoredTabletFile,DataFileValue> entry : 
tabletFiles.datafiles.entrySet()) {
+    for (Entry<StoredTabletFile,DataFileValue> entry : 
tm.getFilesMap().entrySet()) {
+      log.trace("Evaluating file {} for replacement.", 
entry.getKey().getPath());
       String metaPath = entry.getKey().getMetadata();
       Path switchedPath = switchVolume(entry.getKey().getPath(), 
FileType.TABLE, replacements);
       if (switchedPath != null) {
-        filesToRemove.add(entry.getKey());
+        filesToRemove.accept(entry.getKey());
         ReferencedTabletFile switchedFile =
             new ReferencedTabletFile(switchedPath, entry.getKey().getRange());
-        filesToAdd.put(switchedFile, entry.getValue());
-        ret.datafiles.put(switchedFile.insert(), entry.getValue());
-        log.debug("Replacing volume {} : {} -> {}", extent, metaPath, 
switchedPath);
-      } else {
-        ret.datafiles.put(entry.getKey(), entry.getValue());
+        filesToAdd.accept(switchedFile, entry.getValue());
+        log.trace("Replacing volume {} : {} -> {}", tm.getExtent(), metaPath, 
switchedPath);
       }
     }
-
-    if (logsToRemove.size() + filesToRemove.size() > 0) {
-      MetadataTableUtil.updateTabletVolumes(extent, logsToRemove, logsToAdd, 
filesToRemove,
-          filesToAdd, zooLock, context);
-    }
-
-    // method this should return the exact strings that are in the metadata 
table
-    ret.dirName = tabletFiles.dirName;
-    return ret;
   }
 }
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletGoalState.java
 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletGoalState.java
index 4dedbe0bf8..49606e3a87 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletGoalState.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletGoalState.java
@@ -30,6 +30,7 @@ import 
org.apache.accumulo.core.metadata.schema.TabletOperationType;
 import org.apache.accumulo.core.spi.balancer.TabletBalancer;
 import org.apache.accumulo.core.spi.balancer.data.TabletServerId;
 import org.apache.accumulo.core.tablet.thrift.TUnloadTabletGoal;
+import org.apache.accumulo.server.fs.VolumeUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -145,6 +146,11 @@ public enum TabletGoalState {
         }
       }
 
+      if (params.getVolumeReplacements().size() > 0
+          && VolumeUtil.needsVolumeReplacement(params.getVolumeReplacements(), 
tm)) {
+        return UNASSIGNED;
+      }
+
       if (tm.hasCurrent()
           && 
params.getServersToShutdown().contains(tm.getLocation().getServerInstance())) {
         if (params.canSuspendTablets()) {
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java
 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java
index c4e3d70bdb..2dbe7fefd2 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java
@@ -60,6 +60,7 @@ import 
org.apache.accumulo.core.spi.balancer.SimpleLoadBalancer;
 import org.apache.accumulo.core.spi.balancer.TabletBalancer;
 import org.apache.accumulo.core.spi.compaction.CompactionKind;
 import org.apache.accumulo.server.compaction.CompactionJobGenerator;
+import org.apache.accumulo.server.fs.VolumeUtil;
 import org.apache.accumulo.server.iterators.TabletIteratorEnvironment;
 import org.apache.accumulo.server.manager.balancer.BalancerEnvironmentImpl;
 import org.slf4j.Logger;
@@ -246,6 +247,10 @@ public class TabletManagementIterator extends 
SkippingIterator {
       return;
     }
 
+    if 
(VolumeUtil.needsVolumeReplacement(tabletMgmtParams.getVolumeReplacements(), 
tm)) {
+      reasonsToReturnThisTablet.add(ManagementAction.NEEDS_VOLUME_REPLACEMENT);
+    }
+
     if (shouldReturnDueToLocation(tm)) {
       reasonsToReturnThisTablet.add(ManagementAction.NEEDS_LOCATION_UPDATE);
     }
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementParameters.java
 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementParameters.java
index 36f880e5d2..4d183cd6c6 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementParameters.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementParameters.java
@@ -31,6 +31,7 @@ import java.util.Base64;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Supplier;
@@ -41,7 +42,9 @@ import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.manager.thrift.ManagerState;
 import org.apache.accumulo.core.metadata.TServerInstance;
 import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.server.manager.LiveTServerSet;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 
@@ -68,13 +71,14 @@ public class TabletManagementParameters {
   private final Map<Long,Map<String,String>> compactionHints;
   private final Set<TServerInstance> onlineTservers;
   private final boolean canSuspendTablets;
+  private final List<Pair<Path,Path>> volumeReplacements;
 
   public TabletManagementParameters(ManagerState managerState,
       Map<Ample.DataLevel,Boolean> parentUpgradeMap, Set<TableId> onlineTables,
       LiveTServerSet.LiveTServersSnapshot liveTServersSnapshot,
       Set<TServerInstance> serversToShutdown, Map<KeyExtent,TServerInstance> 
migrations,
       Ample.DataLevel level, Map<Long,Map<String,String>> compactionHints,
-      boolean canSuspendTablets) {
+      boolean canSuspendTablets, List<Pair<Path,Path>> volumeReplacements) {
     this.managerState = managerState;
     this.parentUpgradeMap = Map.copyOf(parentUpgradeMap);
     // TODO could filter by level
@@ -95,6 +99,7 @@ public class TabletManagementParameters {
       return Map.copyOf(resourceGroups);
     });
     this.canSuspendTablets = canSuspendTablets;
+    this.volumeReplacements = volumeReplacements;
   }
 
   private TabletManagementParameters(JsonData jdata) {
@@ -120,7 +125,7 @@ public class TabletManagementParameters {
       return Map.copyOf(resourceGroups);
     });
     this.canSuspendTablets = jdata.canSuspendTablets;
-    ;
+    this.volumeReplacements = jdata.volumeReplacements;
   }
 
   public ManagerState getManagerState() {
@@ -171,6 +176,10 @@ public class TabletManagementParameters {
     return canSuspendTablets;
   }
 
+  public List<Pair<Path,Path>> getVolumeReplacements() {
+    return volumeReplacements;
+  }
+
   private static Map<Long,Map<String,String>>
       makeImmutable(Map<Long,Map<String,String>> compactionHints) {
     var copy = new HashMap<Long,Map<String,String>>();
@@ -193,6 +202,7 @@ public class TabletManagementParameters {
     final Map<Long,Map<String,String>> compactionHints;
 
     final boolean canSuspendTablets;
+    final List<Pair<Path,Path>> volumeReplacements;
 
     private static String toString(KeyExtent extent) {
       DataOutputBuffer buffer = new DataOutputBuffer();
@@ -234,6 +244,7 @@ public class TabletManagementParameters {
               .map(TServerInstance::getHostPortSession).collect(toSet())));
       compactionHints = params.compactionHints;
       canSuspendTablets = params.canSuspendTablets;
+      volumeReplacements = params.volumeReplacements;
     }
 
   }
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/ZooTabletStateStore.java
 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/ZooTabletStateStore.java
index 05f5648497..8b7a5abb0c 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/ZooTabletStateStore.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/ZooTabletStateStore.java
@@ -36,6 +36,7 @@ import org.apache.accumulo.core.spi.compaction.CompactionKind;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.ServiceEnvironmentImpl;
 import org.apache.accumulo.server.compaction.CompactionJobGenerator;
+import org.apache.accumulo.server.fs.VolumeUtil;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -93,6 +94,11 @@ class ZooTabletStateStore extends AbstractTabletStateStore 
implements TabletStat
           log.error("Error computing tablet management actions for Root 
extent", e);
           error = e.getMessage();
         }
+
+        if 
(VolumeUtil.needsVolumeReplacement(parameters.getVolumeReplacements(), tm)) {
+          actions.add(ManagementAction.NEEDS_VOLUME_REPLACEMENT);
+        }
+
         return new TabletManagement(actions, tm, error);
 
       }
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
 
b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
index d06bebcc52..4da05f2a5f 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
@@ -170,23 +170,6 @@ public class MetadataTableUtil {
 
   }
 
-  public static void updateTabletVolumes(KeyExtent extent, List<LogEntry> 
logsToRemove,
-      List<LogEntry> logsToAdd, List<StoredTabletFile> filesToRemove,
-      SortedMap<ReferencedTabletFile,DataFileValue> filesToAdd, ServiceLock 
zooLock,
-      ServerContext context) {
-
-    TabletMutator tabletMutator = context.getAmple().mutateTablet(extent);
-    logsToRemove.forEach(tabletMutator::deleteWal);
-    logsToAdd.forEach(tabletMutator::putWal);
-
-    filesToRemove.forEach(tabletMutator::deleteFile);
-    filesToAdd.forEach(tabletMutator::putFile);
-
-    tabletMutator.putZooLock(context.getZooKeeperRoot(), zooLock);
-
-    tabletMutator.mutate();
-  }
-
   public static void rollBackSplit(Text metadataEntry, Text oldPrevEndRow, 
ServerContext context,
       ServiceLock zooLock) {
     KeyExtent ke = KeyExtent.fromMetaRow(metadataEntry, oldPrevEndRow);
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
index 3031590ec4..fd6b4ddb86 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
@@ -21,6 +21,7 @@ package org.apache.accumulo.manager;
 import static 
com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 import static java.lang.Math.min;
 import static java.util.Objects.requireNonNull;
+import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -40,6 +41,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
 
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.Scanner;
@@ -58,6 +60,7 @@ import 
org.apache.accumulo.core.manager.thrift.ManagerGoalState;
 import org.apache.accumulo.core.manager.thrift.ManagerState;
 import org.apache.accumulo.core.manager.thrift.TabletServerStatus;
 import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.ReferencedTabletFile;
 import org.apache.accumulo.core.metadata.RootTable;
 import org.apache.accumulo.core.metadata.TServerInstance;
 import org.apache.accumulo.core.metadata.TabletState;
@@ -78,6 +81,7 @@ import org.apache.accumulo.manager.upgrade.UpgradeCoordinator;
 import org.apache.accumulo.server.ServiceEnvironmentImpl;
 import org.apache.accumulo.server.compaction.CompactionJobGenerator;
 import org.apache.accumulo.server.conf.TableConfiguration;
+import org.apache.accumulo.server.fs.VolumeUtil;
 import org.apache.accumulo.server.log.WalStateManager;
 import org.apache.accumulo.server.log.WalStateManager.WalMarkerException;
 import org.apache.accumulo.server.manager.LiveTServerSet.TServerConnection;
@@ -171,6 +175,7 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
     // read only list of tablet servers that are not shutting down
     private final SortedMap<TServerInstance,TabletServerStatus> destinations;
     private final Map<String,Set<TServerInstance>> currentTServerGrouping;
+    private final List<VolumeUtil.VolumeReplacements> volumeReplacements = new 
ArrayList<>();
 
     public TabletLists(SortedMap<TServerInstance,TabletServerStatus> 
curTServers,
         Map<String,Set<TServerInstance>> grouping, Set<TServerInstance> 
serversToShutdown) {
@@ -205,6 +210,7 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
       assignedToDeadServers.clear();
       suspendedToGoneServers.clear();
       unassigned.clear();
+      volumeReplacements.clear();
     }
   }
 
@@ -238,7 +244,7 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
               continue;
             }
 
-            TabletManagementParameters tabletMgmtParams = 
createTabletManagementParameters();
+            TabletManagementParameters tabletMgmtParams = 
createTabletManagementParameters(false);
 
             var currentTservers = 
getCurrentTservers(tabletMgmtParams.getOnlineTsevers());
             if (currentTservers.isEmpty()) {
@@ -312,7 +318,8 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
     }
   }
 
-  private TabletManagementParameters createTabletManagementParameters() {
+  private TabletManagementParameters
+      createTabletManagementParameters(boolean 
lookForTabletsNeedingVolReplacement) {
 
     HashMap<Ample.DataLevel,Boolean> parentLevelUpgrade = new HashMap<>();
     UpgradeCoordinator.UpgradeStatus upgradeStatus = 
manager.getUpgradeStatus();
@@ -334,7 +341,9 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
 
     return new TabletManagementParameters(manager.getManagerState(), 
parentLevelUpgrade,
         manager.onlineTables(), tServersSnapshot, shutdownServers, 
manager.migrationsSnapshot(),
-        store.getLevel(), manager.getCompactionHints(), canSuspendTablets());
+        store.getLevel(), manager.getCompactionHints(), canSuspendTablets(),
+        lookForTabletsNeedingVolReplacement ? 
manager.getContext().getVolumeReplacements()
+            : List.of());
   }
 
   private Set<TServerInstance> getFilteredServersToShutdown() {
@@ -344,6 +353,7 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
   private static class TableMgmtStats {
     int[] counts = new int[TabletState.values().length];
     private int totalUnloaded;
+    private long totalVolumeReplacements;
   }
 
   private TableMgmtStats manageTablets(Iterator<TabletManagement> iter,
@@ -407,7 +417,8 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
 
       // Don't overwhelm the tablet servers with work
       if (tLists.unassigned.size() + unloaded
-          > Manager.MAX_TSERVER_WORK_CHUNK * currentTServers.size()) {
+          > Manager.MAX_TSERVER_WORK_CHUNK * currentTServers.size()
+          || tLists.volumeReplacements.size() > 1000) {
         flushChanges(tLists);
         tLists.reset();
         unloaded = 0;
@@ -422,6 +433,31 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
       final TabletGoalState goal =
           TabletGoalState.compute(tm, state, manager.tabletBalancer, 
tableMgmtParams);
 
+      if (actions.contains(ManagementAction.NEEDS_VOLUME_REPLACEMENT)
+          && state == TabletState.UNASSIGNED) {
+        tableMgmtStats.totalVolumeReplacements++;
+        var volRep =
+            
VolumeUtil.computeVolumeReplacements(tableMgmtParams.getVolumeReplacements(), 
tm);
+
+        if (volRep.logsToRemove.size() + volRep.filesToRemove.size() > 0) {
+          if (tm.getLocation() != null) {
+            // since the totalVolumeReplacements counter was incremented, 
should try this again
+            // later after its unassigned
+            LOG.debug("Volume replacement needed for {} but it has a location 
{}.", tm.getExtent(),
+                tm.getLocation());
+          } else if (tm.getOperationId() != null) {
+            LOG.debug("Volume replacement needed for {} but it has an active 
operation {}.",
+                tm.getExtent(), tm.getOperationId());
+          } else {
+            LOG.debug("Volume replacement needed for {}.", tm.getExtent());
+            // buffer replacements so that multiple mutations can be done at 
once
+            tLists.volumeReplacements.add(volRep);
+          }
+        } else {
+          LOG.debug("Volume replacement evaluation for {} returned no 
changes.", tm.getExtent());
+        }
+      }
+
       final Location location = tm.getLocation();
       Location current = null;
       Location future = null;
@@ -566,6 +602,7 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
   @Override
   public void run() {
     int[] oldCounts = new int[TabletState.values().length];
+    boolean lookForTabletsNeedingVolReplacement = true;
 
     while (manager.stillManager()) {
       // slow things down a little, otherwise we spam the logs when there are 
many wake-up events
@@ -576,7 +613,8 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
       final long waitTimeBetweenScans = manager.getConfiguration()
           .getTimeInMillis(Property.MANAGER_TABLET_GROUP_WATCHER_INTERVAL);
 
-      TabletManagementParameters tableMgmtParams = 
createTabletManagementParameters();
+      TabletManagementParameters tableMgmtParams =
+          
createTabletManagementParameters(lookForTabletsNeedingVolReplacement);
       var currentTServers = 
getCurrentTservers(tableMgmtParams.getOnlineTsevers());
 
       ClosableIterator<TabletManagement> iter = null;
@@ -599,6 +637,7 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
 
         iter = store.iterator(tableMgmtParams);
         var tabletMgmtStats = manageTablets(iter, tableMgmtParams, 
currentTServers, true);
+        lookForTabletsNeedingVolReplacement = 
tabletMgmtStats.totalVolumeReplacements != 0;
 
         // provide stats after flushing changes to avoid race conditions w/ 
delete table
         stats.end(managerState);
@@ -889,6 +928,37 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
       }
       manager.assignedTablet(a.tablet);
     }
+
+    replaceVolumes(tLists.volumeReplacements);
+  }
+
+  private void replaceVolumes(List<VolumeUtil.VolumeReplacements> 
volumeReplacementsList) {
+    try (var tabletsMutator = 
manager.getContext().getAmple().conditionallyMutateTablets()) {
+      for (VolumeUtil.VolumeReplacements vr : volumeReplacementsList) {
+        // ELASTICITY_TODO can require same on WALS once that is implemented, 
see #3948
+        var tabletMutator = 
tabletsMutator.mutateTablet(vr.tabletMeta.getExtent())
+            
.requireAbsentOperation().requireAbsentLocation().requireSame(vr.tabletMeta, 
FILES);
+        vr.logsToRemove.forEach(tabletMutator::deleteWal);
+        vr.logsToAdd.forEach(tabletMutator::putWal);
+
+        vr.filesToRemove.forEach(tabletMutator::deleteFile);
+        vr.filesToAdd.forEach(tabletMutator::putFile);
+
+        tabletMutator.putZooLock(manager.getContext().getZooKeeperRoot(), 
manager.getManagerLock());
+
+        tabletMutator.submit(
+            tm -> tm.getLogs().containsAll(vr.logsToAdd) && 
tm.getFiles().containsAll(vr.filesToAdd
+                
.keySet().stream().map(ReferencedTabletFile::insert).collect(Collectors.toSet())));
+      }
+
+      tabletsMutator.process().forEach((extent, result) -> {
+        if (result.getStatus() == Ample.ConditionalResult.Status.REJECTED) {
+          // log that failure happened, should try again later
+          LOG.debug("Failed to update volumes for tablet {}", extent);
+        }
+      });
+    }
+
   }
 
   private static void markDeadServerLogsAsClosed(WalStateManager mgr,
@@ -899,5 +969,4 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
       }
     }
   }
-
 }
diff --git a/test/src/main/java/org/apache/accumulo/test/VolumeIT.java 
b/test/src/main/java/org/apache/accumulo/test/VolumeIT.java
index bc448f2314..2e7dd8a5b0 100644
--- a/test/src/main/java/org/apache/accumulo/test/VolumeIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/VolumeIT.java
@@ -87,12 +87,10 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.io.Text;
 import org.apache.zookeeper.KeeperException.NoNodeException;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 
-@Disabled // ELASTICITY_TODO
 public class VolumeIT extends ConfigurableMacBase {
 
   private File volDirBase;
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java
 
b/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java
index f47d46b45c..5839a95b04 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java
@@ -21,7 +21,6 @@ package org.apache.accumulo.test.functional;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 import java.io.IOException;
-import java.nio.file.Path;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -63,11 +62,14 @@ import 
org.apache.accumulo.core.manager.state.TabletManagement;
 import org.apache.accumulo.core.manager.state.tables.TableState;
 import org.apache.accumulo.core.manager.thrift.ManagerState;
 import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.StoredTabletFile;
 import org.apache.accumulo.core.metadata.TServerInstance;
 import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.DataFileValue;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily;
+import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.HostingColumnFamily;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
@@ -77,11 +79,13 @@ import 
org.apache.accumulo.core.metadata.schema.TabletOperationType;
 import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.tabletserver.log.LogEntry;
+import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.harness.AccumuloClusterHarness;
 import org.apache.accumulo.server.manager.LiveTServerSet;
 import org.apache.accumulo.server.manager.state.TabletManagementIterator;
 import org.apache.accumulo.server.manager.state.TabletManagementParameters;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
@@ -215,6 +219,17 @@ public class TabletManagementIteratorIT extends 
AccumuloClusterHarness {
       assertEquals(1, findTabletsNeedingAttention(client, metaCopy3, 
tabletMgmtParams),
           "Should have 1 tablet that needs a metadata repair");
 
+      // test the volume replacements case. Need to insert some files into
+      // the metadata for t4, then run the TabletManagementIterator with
+      // volume replacements
+      addFiles(client, metaCopy4, t4);
+      List<Pair<Path,Path>> replacements = new ArrayList<>();
+      replacements.add(new Pair<Path,Path>(new 
Path("file:/vol1/accumulo/inst_id"),
+          new Path("file:/vol2/accumulo/inst_id")));
+      tabletMgmtParams = createParameters(client, replacements);
+      assertEquals(1, findTabletsNeedingAttention(client, metaCopy4, 
tabletMgmtParams),
+          "Should have one tablet that needs a volume replacement");
+
       // clean up
       dropTables(client, t1, t2, t3, t4, metaCopy1, metaCopy2, metaCopy3, 
metaCopy4);
     }
@@ -250,6 +265,33 @@ public class TabletManagementIteratorIT extends 
AccumuloClusterHarness {
     }
   }
 
+  /**
+   * Writes a DataFileColumnFamily entry for a file under file:/vol1/accumulo/inst_id into
+   * {@code table}, on the metadata row of the (tableId, null, null) extent of
+   * {@code tableNameToModify}, then logs the contents of {@code table} at debug level.
+   */
+  private void addFiles(AccumuloClient client, String table, String tableNameToModify)
+      throws TableNotFoundException, MutationsRejectedException {
+    TableId tableIdToModify =
+        TableId.of(client.tableOperations().tableIdMap().get(tableNameToModify));
+    Mutation m = new Mutation(new KeyExtent(tableIdToModify, null, null).toMetaRow());
+    m.put(DataFileColumnFamily.NAME,
+        new Text(StoredTabletFile
+            .serialize("file:/vol1/accumulo/inst_id/tables/2a/default_tablet/F0000072.rf")),
+        new Value(new DataFileValue(0, 0, 0).encode()));
+    try (BatchWriter bw = client.createBatchWriter(table)) {
+      bw.addMutation(m);
+    }
+    // Debug aid only: close the scanner when done and do not fail the test if the dump fails.
+    try (var scanner = client.createScanner(table)) {
+      for (var entry : scanner) {
+        log.debug("{} -> {}", entry.getKey(), entry.getValue());
+      }
+    } catch (AccumuloSecurityException | AccumuloException e) {
+      log.warn("Unable to dump contents of {}", table, e);
+    }
+  }
+
   private void reassignLocation(AccumuloClient client, String table, String 
tableNameToModify)
       throws TableNotFoundException, MutationsRejectedException {
     TableId tableIdToModify =
@@ -316,6 +358,7 @@ public class TabletManagementIteratorIT extends 
AccumuloClusterHarness {
           TabletManagement mti = TabletManagementIterator.decode(e);
           results++;
           log.debug("Found tablets that changed state: {}", 
mti.getTabletMetadata().getExtent());
+          log.debug("metadata: {}", mti.getTabletMetadata());
           resultList.add(mti.getTabletMetadata().getExtent());
         }
       }
@@ -404,8 +447,8 @@ public class TabletManagementIteratorIT extends 
AccumuloClusterHarness {
         
TableId.of(client.tableOperations().tableIdMap().get(tableNameToModify));
     KeyExtent extent = new KeyExtent(tableIdToModify, new Text("some split"), 
null);
     Mutation m = new Mutation(extent.toMetaRow());
-    LogEntry logEntry =
-        new LogEntry(Path.of(validHost.toString(), 
UUID.randomUUID().toString()).toString());
+    LogEntry logEntry = new LogEntry(
+        java.nio.file.Path.of(validHost.toString(), 
UUID.randomUUID().toString()).toString());
     
m.at().family(LogColumnFamily.NAME).qualifier(logEntry.getColumnQualifier())
         .put(logEntry.getValue());
     try (BatchWriter bw = client.createBatchWriter(table)) {
@@ -414,6 +457,11 @@ public class TabletManagementIteratorIT extends 
AccumuloClusterHarness {
   }
 
   private static TabletManagementParameters createParameters(AccumuloClient 
client) {
+    return createParameters(client, List.of());
+  }
+
+  private static TabletManagementParameters createParameters(AccumuloClient 
client,
+      List<Pair<Path,Path>> replacements) {
     var context = (ClientContext) client;
     Set<TableId> onlineTables = 
Sets.filter(context.getTableIdToNameMap().keySet(),
         tableId -> context.getTableState(tableId) == TableState.ONLINE);
@@ -436,6 +484,6 @@ public class TabletManagementIteratorIT extends 
AccumuloClusterHarness {
         onlineTables,
         new LiveTServerSet.LiveTServersSnapshot(tservers,
             Map.of(Constants.DEFAULT_RESOURCE_GROUP_NAME, tservers)),
-        Set.of(), Map.of(), Ample.DataLevel.USER, Map.of(), true);
+        Set.of(), Map.of(), Ample.DataLevel.USER, Map.of(), true, 
replacements);
   }
 }


Reply via email to