This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new 8375719384 Moved volume replacement to the Manager (#3893) 8375719384 is described below commit 837571938436823aeef3954a564d7279807742f6 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Fri Nov 17 16:21:08 2023 -0500 Moved volume replacement to the Manager (#3893) Fixes #3632 Co-authored-by: Keith Turner <ktur...@apache.org> --- .../core/manager/state/TabletManagement.java | 2 +- .../miniclusterImpl/MiniAccumuloClusterImpl.java | 2 + .../org/apache/accumulo/server/fs/VolumeUtil.java | 99 ++++++++++++---------- .../server/manager/state/TabletGoalState.java | 6 ++ .../manager/state/TabletManagementIterator.java | 5 ++ .../manager/state/TabletManagementParameters.java | 15 +++- .../server/manager/state/ZooTabletStateStore.java | 6 ++ .../accumulo/server/util/MetadataTableUtil.java | 17 ---- .../accumulo/manager/TabletGroupWatcher.java | 81 ++++++++++++++++-- .../java/org/apache/accumulo/test/VolumeIT.java | 2 - .../functional/TabletManagementIteratorIT.java | 56 +++++++++++- 11 files changed, 215 insertions(+), 76 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/manager/state/TabletManagement.java b/core/src/main/java/org/apache/accumulo/core/manager/state/TabletManagement.java index 495f3e2ef0..4e4941d212 100644 --- a/core/src/main/java/org/apache/accumulo/core/manager/state/TabletManagement.java +++ b/core/src/main/java/org/apache/accumulo/core/manager/state/TabletManagement.java @@ -53,7 +53,7 @@ public class TabletManagement { private static final Text EMPTY = new Text(""); public static enum ManagementAction { - BAD_STATE, NEEDS_COMPACTING, NEEDS_LOCATION_UPDATE, NEEDS_SPLITTING; + BAD_STATE, NEEDS_COMPACTING, NEEDS_LOCATION_UPDATE, NEEDS_SPLITTING, NEEDS_VOLUME_REPLACEMENT; } public static void addActions(final SortedMap<Key,Value> decodedRow, diff --git a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java index 14588b04a8..091b7e9c6a 100644 --- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java +++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java @@ -934,6 +934,8 @@ public class MiniAccumuloClusterImpl implements AccumuloCluster { control.stop(ServerType.MANAGER, null); control.stop(ServerType.TABLET_SERVER, null); control.stop(ServerType.ZOOKEEPER, null); + control.stop(ServerType.COMPACTOR, null); + control.stop(ServerType.SCAN_SERVER, null); // ACCUMULO-2985 stop the ExecutorService after we finished using it to stop accumulo procs if (executor != null) { diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java index c6bb965f31..98eb762b8e 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java @@ -19,22 +19,24 @@ package org.apache.accumulo.server.fs; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Map.Entry; import java.util.Objects; import java.util.SortedMap; import java.util.TreeMap; +import java.util.function.BiConsumer; +import java.util.function.Consumer; -import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.metadata.ReferencedTabletFile; import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.tabletserver.log.LogEntry; import org.apache.accumulo.core.util.Pair; -import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.fs.VolumeManager.FileType; -import org.apache.accumulo.server.util.MetadataTableUtil; +import org.apache.commons.lang3.mutable.MutableBoolean; import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -128,64 +130,73 @@ public class VolumeUtil { } } - // ELASTICITY_TODO this method is no longer called because volume replacement needs to move from - // the tablet server to the manager. See #3625 - /** - * This method does two things. First, it switches any volumes a tablet is using that are - * configured in instance.volumes.replacements. Second, if a tablet dir is no longer configured - * for use it chooses a new tablet directory. - */ - public static TabletFiles updateTabletVolumes(ServerContext context, ServiceLock zooLock, - KeyExtent extent, TabletFiles tabletFiles) { - List<Pair<Path,Path>> replacements = context.getVolumeReplacements(); + public static boolean needsVolumeReplacement(final List<Pair<Path,Path>> replacements, + final TabletMetadata tm) { if (replacements.isEmpty()) { - return tabletFiles; + return false; } - log.trace("Using volume replacements: {}", replacements); - List<LogEntry> logsToRemove = new ArrayList<>(); - List<LogEntry> logsToAdd = new ArrayList<>(); + MutableBoolean needsReplacement = new MutableBoolean(false); + + Consumer<LogEntry> consumer = le -> needsReplacement.setTrue(); + + volumeReplacementEvaluation(replacements, tm, consumer, consumer, + f -> needsReplacement.setTrue(), (f, dfv) -> needsReplacement.setTrue()); + + return needsReplacement.booleanValue(); + } - List<StoredTabletFile> filesToRemove = new ArrayList<>(); - SortedMap<ReferencedTabletFile,DataFileValue> filesToAdd = new TreeMap<>(); + public static class VolumeReplacements { + public final TabletMetadata tabletMeta; + public final List<LogEntry> logsToRemove = new ArrayList<>(); + public final List<LogEntry> logsToAdd = new ArrayList<>(); + public final List<StoredTabletFile> filesToRemove = new ArrayList<>(); + public final Map<ReferencedTabletFile,DataFileValue> filesToAdd = new HashMap<>(); - TabletFiles ret = new TabletFiles(); + public VolumeReplacements(TabletMetadata tabletMeta) { + this.tabletMeta = tabletMeta; + } + } - for (LogEntry logEntry : tabletFiles.logEntries) { + public static VolumeReplacements + computeVolumeReplacements(final List<Pair<Path,Path>> replacements, final TabletMetadata tm) { + var vr = new VolumeReplacements(tm); + volumeReplacementEvaluation(replacements, tm, vr.logsToRemove::add, vr.logsToAdd::add, + vr.filesToRemove::add, vr.filesToAdd::put); + return vr; + } + + public static void volumeReplacementEvaluation(final List<Pair<Path,Path>> replacements, + final TabletMetadata tm, final Consumer<LogEntry> logsToRemove, + final Consumer<LogEntry> logsToAdd, final Consumer<StoredTabletFile> filesToRemove, + final BiConsumer<ReferencedTabletFile,DataFileValue> filesToAdd) { + if (replacements.isEmpty() || (tm.getFilesMap().isEmpty() && tm.getLogs().isEmpty())) { + return; + } + + log.trace("Using volume replacements: {}", replacements); + for (LogEntry logEntry : tm.getLogs()) { + log.trace("Evaluating walog {} for replacement.", logEntry); LogEntry switchedLogEntry = switchVolumes(logEntry, replacements); if (switchedLogEntry != null) { - logsToRemove.add(logEntry); - logsToAdd.add(switchedLogEntry); - ret.logEntries.add(switchedLogEntry); - log.debug("Replacing volume {} : {} -> {}", extent, logEntry.getFilePath(), + logsToRemove.accept(logEntry); + logsToAdd.accept(switchedLogEntry); + log.trace("Replacing volume {} : {} -> {}", tm.getExtent(), logEntry.getFilePath(), switchedLogEntry.getFilePath()); - } else { - ret.logEntries.add(logEntry); } } - for (Entry<StoredTabletFile,DataFileValue> entry : tabletFiles.datafiles.entrySet()) { + for (Entry<StoredTabletFile,DataFileValue> entry : tm.getFilesMap().entrySet()) { + log.trace("Evaluating file {} for replacement.", entry.getKey().getPath()); String metaPath = entry.getKey().getMetadata(); Path switchedPath = switchVolume(entry.getKey().getPath(), FileType.TABLE, replacements); if (switchedPath != null) { - filesToRemove.add(entry.getKey()); + filesToRemove.accept(entry.getKey()); ReferencedTabletFile switchedFile = new ReferencedTabletFile(switchedPath, entry.getKey().getRange()); - filesToAdd.put(switchedFile, entry.getValue()); - ret.datafiles.put(switchedFile.insert(), entry.getValue()); - log.debug("Replacing volume {} : {} -> {}", extent, metaPath, switchedPath); - } else { - ret.datafiles.put(entry.getKey(), entry.getValue()); + filesToAdd.accept(switchedFile, entry.getValue()); + log.trace("Replacing volume {} : {} -> {}", tm.getExtent(), metaPath, switchedPath); } } - - if (logsToRemove.size() + filesToRemove.size() > 0) { - MetadataTableUtil.updateTabletVolumes(extent, logsToRemove, logsToAdd, filesToRemove, - filesToAdd, zooLock, context); - } - - // method this should return the exact strings that are in the metadata table - ret.dirName = tabletFiles.dirName; - return ret; } } diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletGoalState.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletGoalState.java index 4dedbe0bf8..49606e3a87 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletGoalState.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletGoalState.java @@ -30,6 +30,7 @@ import org.apache.accumulo.core.metadata.schema.TabletOperationType; import org.apache.accumulo.core.spi.balancer.TabletBalancer; import org.apache.accumulo.core.spi.balancer.data.TabletServerId; import org.apache.accumulo.core.tablet.thrift.TUnloadTabletGoal; +import org.apache.accumulo.server.fs.VolumeUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -145,6 +146,11 @@ public enum TabletGoalState { } } + if (params.getVolumeReplacements().size() > 0 + && VolumeUtil.needsVolumeReplacement(params.getVolumeReplacements(), tm)) { + return UNASSIGNED; + } + if (tm.hasCurrent() && params.getServersToShutdown().contains(tm.getLocation().getServerInstance())) { if (params.canSuspendTablets()) { diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java index c4e3d70bdb..2dbe7fefd2 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java @@ -60,6 +60,7 @@ import org.apache.accumulo.core.spi.balancer.SimpleLoadBalancer; import org.apache.accumulo.core.spi.balancer.TabletBalancer; import org.apache.accumulo.core.spi.compaction.CompactionKind; import org.apache.accumulo.server.compaction.CompactionJobGenerator; +import org.apache.accumulo.server.fs.VolumeUtil; import org.apache.accumulo.server.iterators.TabletIteratorEnvironment; import org.apache.accumulo.server.manager.balancer.BalancerEnvironmentImpl; import org.slf4j.Logger; @@ -246,6 +247,10 @@ public class TabletManagementIterator extends SkippingIterator { return; } + if (VolumeUtil.needsVolumeReplacement(tabletMgmtParams.getVolumeReplacements(), tm)) { + reasonsToReturnThisTablet.add(ManagementAction.NEEDS_VOLUME_REPLACEMENT); + } + if (shouldReturnDueToLocation(tm)) { reasonsToReturnThisTablet.add(ManagementAction.NEEDS_LOCATION_UPDATE); } diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementParameters.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementParameters.java index 36f880e5d2..4d183cd6c6 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementParameters.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementParameters.java @@ -31,6 +31,7 @@ import java.util.Base64; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.function.Supplier; @@ -41,7 +42,9 @@ import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.manager.thrift.ManagerState; import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.server.manager.LiveTServerSet; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.DataOutputBuffer; @@ -68,13 +71,14 @@ public class TabletManagementParameters { private final Map<Long,Map<String,String>> compactionHints; private final Set<TServerInstance> onlineTservers; private final boolean canSuspendTablets; + private final List<Pair<Path,Path>> volumeReplacements; public TabletManagementParameters(ManagerState managerState, Map<Ample.DataLevel,Boolean> parentUpgradeMap, Set<TableId> onlineTables, LiveTServerSet.LiveTServersSnapshot liveTServersSnapshot, Set<TServerInstance> serversToShutdown, Map<KeyExtent,TServerInstance> migrations, Ample.DataLevel level, Map<Long,Map<String,String>> compactionHints, - boolean canSuspendTablets) { + boolean canSuspendTablets, List<Pair<Path,Path>> volumeReplacements) { this.managerState = managerState; this.parentUpgradeMap = Map.copyOf(parentUpgradeMap); // TODO could filter by level @@ -95,6 +99,7 @@ public class TabletManagementParameters { return Map.copyOf(resourceGroups); }); this.canSuspendTablets = canSuspendTablets; + this.volumeReplacements = volumeReplacements; } private TabletManagementParameters(JsonData jdata) { @@ -120,7 +125,7 @@ public class TabletManagementParameters { return Map.copyOf(resourceGroups); }); this.canSuspendTablets = jdata.canSuspendTablets; - ; + this.volumeReplacements = jdata.volumeReplacements; } public ManagerState getManagerState() { @@ -171,6 +176,10 @@ public class TabletManagementParameters { return canSuspendTablets; } + public List<Pair<Path,Path>> getVolumeReplacements() { + return volumeReplacements; + } + private static Map<Long,Map<String,String>> makeImmutable(Map<Long,Map<String,String>> compactionHints) { var copy = new HashMap<Long,Map<String,String>>(); @@ -193,6 +202,7 @@ public class TabletManagementParameters { final Map<Long,Map<String,String>> compactionHints; final boolean canSuspendTablets; + final List<Pair<Path,Path>> volumeReplacements; private static String toString(KeyExtent extent) { DataOutputBuffer buffer = new DataOutputBuffer(); @@ -234,6 +244,7 @@ public class TabletManagementParameters { .map(TServerInstance::getHostPortSession).collect(toSet()))); compactionHints = params.compactionHints; canSuspendTablets = params.canSuspendTablets; + volumeReplacements = params.volumeReplacements; } } diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/ZooTabletStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/ZooTabletStateStore.java index 05f5648497..8b7a5abb0c 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/ZooTabletStateStore.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/ZooTabletStateStore.java @@ -36,6 +36,7 @@ import org.apache.accumulo.core.spi.compaction.CompactionKind; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.ServiceEnvironmentImpl; import org.apache.accumulo.server.compaction.CompactionJobGenerator; +import org.apache.accumulo.server.fs.VolumeUtil; import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -93,6 +94,11 @@ class ZooTabletStateStore extends AbstractTabletStateStore implements TabletStat log.error("Error computing tablet management actions for Root extent", e); error = e.getMessage(); } + + if (VolumeUtil.needsVolumeReplacement(parameters.getVolumeReplacements(), tm)) { + actions.add(ManagementAction.NEEDS_VOLUME_REPLACEMENT); + } + return new TabletManagement(actions, tm, error); } diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java index d06bebcc52..4da05f2a5f 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java @@ -170,23 +170,6 @@ public class MetadataTableUtil { } - public static void updateTabletVolumes(KeyExtent extent, List<LogEntry> logsToRemove, - List<LogEntry> logsToAdd, List<StoredTabletFile> filesToRemove, - SortedMap<ReferencedTabletFile,DataFileValue> filesToAdd, ServiceLock zooLock, - ServerContext context) { - - TabletMutator tabletMutator = context.getAmple().mutateTablet(extent); - logsToRemove.forEach(tabletMutator::deleteWal); - logsToAdd.forEach(tabletMutator::putWal); - - filesToRemove.forEach(tabletMutator::deleteFile); - filesToAdd.forEach(tabletMutator::putFile); - - tabletMutator.putZooLock(context.getZooKeeperRoot(), zooLock); - - tabletMutator.mutate(); - } - public static void rollBackSplit(Text metadataEntry, Text oldPrevEndRow, ServerContext context, ServiceLock zooLock) { KeyExtent ke = KeyExtent.fromMetaRow(metadataEntry, oldPrevEndRow); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java index 3031590ec4..fd6b4ddb86 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java @@ -21,6 +21,7 @@ package org.apache.accumulo.manager; import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; import static java.lang.Math.min; import static java.util.Objects.requireNonNull; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES; import java.io.IOException; import java.util.ArrayList; @@ -40,6 +41,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.Scanner; @@ -58,6 +60,7 @@ import org.apache.accumulo.core.manager.thrift.ManagerGoalState; import org.apache.accumulo.core.manager.thrift.ManagerState; import org.apache.accumulo.core.manager.thrift.TabletServerStatus; import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.ReferencedTabletFile; import org.apache.accumulo.core.metadata.RootTable; import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.TabletState; @@ -78,6 +81,7 @@ import org.apache.accumulo.manager.upgrade.UpgradeCoordinator; import org.apache.accumulo.server.ServiceEnvironmentImpl; import org.apache.accumulo.server.compaction.CompactionJobGenerator; import org.apache.accumulo.server.conf.TableConfiguration; +import org.apache.accumulo.server.fs.VolumeUtil; import org.apache.accumulo.server.log.WalStateManager; import org.apache.accumulo.server.log.WalStateManager.WalMarkerException; import org.apache.accumulo.server.manager.LiveTServerSet.TServerConnection; @@ -171,6 +175,7 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { // read only list of tablet servers that are not shutting down private final SortedMap<TServerInstance,TabletServerStatus> destinations; private final Map<String,Set<TServerInstance>> currentTServerGrouping; + private final List<VolumeUtil.VolumeReplacements> volumeReplacements = new ArrayList<>(); public TabletLists(SortedMap<TServerInstance,TabletServerStatus> curTServers, Map<String,Set<TServerInstance>> grouping, Set<TServerInstance> serversToShutdown) { @@ -205,6 +210,7 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { assignedToDeadServers.clear(); suspendedToGoneServers.clear(); unassigned.clear(); + volumeReplacements.clear(); } } @@ -238,7 +244,7 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { continue; } - TabletManagementParameters tabletMgmtParams = createTabletManagementParameters(); + TabletManagementParameters tabletMgmtParams = createTabletManagementParameters(false); var currentTservers = getCurrentTservers(tabletMgmtParams.getOnlineTsevers()); if (currentTservers.isEmpty()) { @@ -312,7 +318,8 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { } } - private TabletManagementParameters createTabletManagementParameters() { + private TabletManagementParameters + createTabletManagementParameters(boolean lookForTabletsNeedingVolReplacement) { HashMap<Ample.DataLevel,Boolean> parentLevelUpgrade = new HashMap<>(); UpgradeCoordinator.UpgradeStatus upgradeStatus = manager.getUpgradeStatus(); @@ -334,7 +341,9 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { return new TabletManagementParameters(manager.getManagerState(), parentLevelUpgrade, manager.onlineTables(), tServersSnapshot, shutdownServers, manager.migrationsSnapshot(), - store.getLevel(), manager.getCompactionHints(), canSuspendTablets()); + store.getLevel(), manager.getCompactionHints(), canSuspendTablets(), + lookForTabletsNeedingVolReplacement ? manager.getContext().getVolumeReplacements() + : List.of()); } private Set<TServerInstance> getFilteredServersToShutdown() { @@ -344,6 +353,7 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { private static class TableMgmtStats { int[] counts = new int[TabletState.values().length]; private int totalUnloaded; + private long totalVolumeReplacements; } private TableMgmtStats manageTablets(Iterator<TabletManagement> iter, @@ -407,7 +417,8 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { // Don't overwhelm the tablet servers with work if (tLists.unassigned.size() + unloaded - > Manager.MAX_TSERVER_WORK_CHUNK * currentTServers.size()) { + > Manager.MAX_TSERVER_WORK_CHUNK * currentTServers.size() + || tLists.volumeReplacements.size() > 1000) { flushChanges(tLists); tLists.reset(); unloaded = 0; @@ -422,6 +433,31 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { final TabletGoalState goal = TabletGoalState.compute(tm, state, manager.tabletBalancer, tableMgmtParams); + if (actions.contains(ManagementAction.NEEDS_VOLUME_REPLACEMENT) + && state == TabletState.UNASSIGNED) { + tableMgmtStats.totalVolumeReplacements++; + var volRep = + VolumeUtil.computeVolumeReplacements(tableMgmtParams.getVolumeReplacements(), tm); + + if (volRep.logsToRemove.size() + volRep.filesToRemove.size() > 0) { + if (tm.getLocation() != null) { + // since the totalVolumeReplacements counter was incremented, should try this again + // later after its unassigned + LOG.debug("Volume replacement needed for {} but it has a location {}.", tm.getExtent(), + tm.getLocation()); + } else if (tm.getOperationId() != null) { + LOG.debug("Volume replacement needed for {} but it has an active operation {}.", + tm.getExtent(), tm.getOperationId()); + } else { + LOG.debug("Volume replacement needed for {}.", tm.getExtent()); + // buffer replacements so that multiple mutations can be done at once + tLists.volumeReplacements.add(volRep); + } + } else { + LOG.debug("Volume replacement evaluation for {} returned no changes.", tm.getExtent()); + } + } + final Location location = tm.getLocation(); Location current = null; Location future = null; @@ -566,6 +602,7 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { @Override public void run() { int[] oldCounts = new int[TabletState.values().length]; + boolean lookForTabletsNeedingVolReplacement = true; while (manager.stillManager()) { // slow things down a little, otherwise we spam the logs when there are many wake-up events @@ -576,7 +613,8 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { final long waitTimeBetweenScans = manager.getConfiguration() .getTimeInMillis(Property.MANAGER_TABLET_GROUP_WATCHER_INTERVAL); - TabletManagementParameters tableMgmtParams = createTabletManagementParameters(); + TabletManagementParameters tableMgmtParams = + createTabletManagementParameters(lookForTabletsNeedingVolReplacement); var currentTServers = getCurrentTservers(tableMgmtParams.getOnlineTsevers()); ClosableIterator<TabletManagement> iter = null; @@ -599,6 +637,7 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { iter = store.iterator(tableMgmtParams); var tabletMgmtStats = manageTablets(iter, tableMgmtParams, currentTServers, true); + lookForTabletsNeedingVolReplacement = tabletMgmtStats.totalVolumeReplacements != 0; // provide stats after flushing changes to avoid race conditions w/ delete table stats.end(managerState); @@ -889,6 +928,37 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { } manager.assignedTablet(a.tablet); } + + replaceVolumes(tLists.volumeReplacements); + } + + private void replaceVolumes(List<VolumeUtil.VolumeReplacements> volumeReplacementsList) { + try (var tabletsMutator = manager.getContext().getAmple().conditionallyMutateTablets()) { + for (VolumeUtil.VolumeReplacements vr : volumeReplacementsList) { + // ELASTICITY_TODO can require same on WALS once that is implemented, see #3948 + var tabletMutator = tabletsMutator.mutateTablet(vr.tabletMeta.getExtent()) + .requireAbsentOperation().requireAbsentLocation().requireSame(vr.tabletMeta, FILES); + vr.logsToRemove.forEach(tabletMutator::deleteWal); + vr.logsToAdd.forEach(tabletMutator::putWal); + + vr.filesToRemove.forEach(tabletMutator::deleteFile); + vr.filesToAdd.forEach(tabletMutator::putFile); + + tabletMutator.putZooLock(manager.getContext().getZooKeeperRoot(), manager.getManagerLock()); + + tabletMutator.submit( + tm -> tm.getLogs().containsAll(vr.logsToAdd) && tm.getFiles().containsAll(vr.filesToAdd + .keySet().stream().map(ReferencedTabletFile::insert).collect(Collectors.toSet()))); + } + + tabletsMutator.process().forEach((extent, result) -> { + if (result.getStatus() == Ample.ConditionalResult.Status.REJECTED) { + // log that failure happened, should try again later + LOG.debug("Failed to update volumes for tablet {}", extent); + } + }); + } + } private static void markDeadServerLogsAsClosed(WalStateManager mgr, @@ -899,5 +969,4 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { } } } - } diff --git a/test/src/main/java/org/apache/accumulo/test/VolumeIT.java b/test/src/main/java/org/apache/accumulo/test/VolumeIT.java index bc448f2314..2e7dd8a5b0 100644 --- a/test/src/main/java/org/apache/accumulo/test/VolumeIT.java +++ b/test/src/main/java/org/apache/accumulo/test/VolumeIT.java @@ -87,12 +87,10 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RawLocalFileSystem; import org.apache.hadoop.io.Text; import org.apache.zookeeper.KeeperException.NoNodeException; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -@Disabled // ELASTICITY_TODO public class VolumeIT extends ConfigurableMacBase { private File volDirBase; diff --git a/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java b/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java index f47d46b45c..5839a95b04 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java @@ -21,7 +21,6 @@ package org.apache.accumulo.test.functional; import static org.junit.jupiter.api.Assertions.assertEquals; import java.io.IOException; -import java.nio.file.Path; import java.time.Duration; import java.util.ArrayList; import java.util.Collections; @@ -63,11 +62,14 @@ import org.apache.accumulo.core.manager.state.TabletManagement; import org.apache.accumulo.core.manager.state.tables.TableState; import org.apache.accumulo.core.manager.thrift.ManagerState; import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.DataFileValue; import org.apache.accumulo.core.metadata.schema.MetadataSchema; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.HostingColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily; import org.apache.accumulo.core.metadata.schema.TabletMetadata; @@ -77,11 +79,13 @@ import org.apache.accumulo.core.metadata.schema.TabletOperationType; import org.apache.accumulo.core.metadata.schema.TabletsMetadata; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.tabletserver.log.LogEntry; +import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.harness.AccumuloClusterHarness; import org.apache.accumulo.server.manager.LiveTServerSet; import org.apache.accumulo.server.manager.state.TabletManagementIterator; import org.apache.accumulo.server.manager.state.TabletManagementParameters; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.junit.jupiter.api.Test; import org.slf4j.Logger; @@ -215,6 +219,17 @@ public class TabletManagementIteratorIT extends AccumuloClusterHarness { assertEquals(1, findTabletsNeedingAttention(client, metaCopy3, tabletMgmtParams), "Should have 1 tablet that needs a metadata repair"); + // test the volume replacements case. Need to insert some files into + // the metadata for t4, then run the TabletManagementIterator with + // volume replacements + addFiles(client, metaCopy4, t4); + List<Pair<Path,Path>> replacements = new ArrayList<>(); + replacements.add(new Pair<Path,Path>(new Path("file:/vol1/accumulo/inst_id"), + new Path("file:/vol2/accumulo/inst_id"))); + tabletMgmtParams = createParameters(client, replacements); + assertEquals(1, findTabletsNeedingAttention(client, metaCopy4, tabletMgmtParams), + "Should have one tablet that needs a volume replacement"); + // clean up dropTables(client, t1, t2, t3, t4, metaCopy1, metaCopy2, metaCopy3, metaCopy4); } @@ -250,6 +265,33 @@ public class TabletManagementIteratorIT extends AccumuloClusterHarness { } } + private void addFiles(AccumuloClient client, String table, String tableNameToModify) + throws TableNotFoundException, MutationsRejectedException { + TableId tableIdToModify = + TableId.of(client.tableOperations().tableIdMap().get(tableNameToModify)); + Mutation m = new Mutation(new KeyExtent(tableIdToModify, null, null).toMetaRow()); + m.put(DataFileColumnFamily.NAME, + new Text(StoredTabletFile + .serialize("file:/vol1/accumulo/inst_id/tables/2a/default_tablet/F0000072.rf")), + new Value(new DataFileValue(0, 0, 0).encode())); + try (BatchWriter bw = client.createBatchWriter(table)) { + bw.addMutation(m); + } + try { + client.createScanner(table).iterator() + .forEachRemaining(e -> System.out.println(e.getKey() + "-> " + e.getValue())); + } catch (TableNotFoundException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } catch (AccumuloSecurityException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } catch (AccumuloException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + private void reassignLocation(AccumuloClient client, String table, String tableNameToModify) throws TableNotFoundException, MutationsRejectedException { TableId tableIdToModify = @@ -316,6 +358,7 @@ public class TabletManagementIteratorIT extends AccumuloClusterHarness { TabletManagement mti = TabletManagementIterator.decode(e); results++; log.debug("Found tablets that changed state: {}", mti.getTabletMetadata().getExtent()); + log.debug("metadata: {}", mti.getTabletMetadata()); resultList.add(mti.getTabletMetadata().getExtent()); } } @@ -404,8 +447,8 @@ public class TabletManagementIteratorIT extends AccumuloClusterHarness { TableId.of(client.tableOperations().tableIdMap().get(tableNameToModify)); KeyExtent extent = new KeyExtent(tableIdToModify, new Text("some split"), null); Mutation m = new Mutation(extent.toMetaRow()); - LogEntry logEntry = - new LogEntry(Path.of(validHost.toString(), UUID.randomUUID().toString()).toString()); + LogEntry logEntry = new LogEntry( + java.nio.file.Path.of(validHost.toString(), UUID.randomUUID().toString()).toString()); m.at().family(LogColumnFamily.NAME).qualifier(logEntry.getColumnQualifier()) .put(logEntry.getValue()); try (BatchWriter bw = client.createBatchWriter(table)) { @@ -414,6 +457,11 @@ public class TabletManagementIteratorIT extends AccumuloClusterHarness { } private static TabletManagementParameters createParameters(AccumuloClient client) { + return createParameters(client, List.of()); + } + + private static TabletManagementParameters createParameters(AccumuloClient client, + List<Pair<Path,Path>> replacements) { var context = (ClientContext) client; Set<TableId> onlineTables = Sets.filter(context.getTableIdToNameMap().keySet(), tableId -> context.getTableState(tableId) == TableState.ONLINE); @@ -436,6 +484,6 @@ public class TabletManagementIteratorIT extends AccumuloClusterHarness { onlineTables, new LiveTServerSet.LiveTServersSnapshot(tservers, Map.of(Constants.DEFAULT_RESOURCE_GROUP_NAME, tservers)), - Set.of(), Map.of(), Ample.DataLevel.USER, Map.of(), true); + Set.of(), Map.of(), Ample.DataLevel.USER, Map.of(), true, replacements); } }