This is an automated email from the ASF dual-hosted git repository. ctubbsii pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 48d39719148454f002ab346610ad39b911662acc Merge: 374f2c1a6c 2890cc2c99 Author: Christopher Tubbs <ctubb...@apache.org> AuthorDate: Thu Dec 21 16:27:48 2023 -0500 Merge branch 'main' into elasticity pom.xml | 3 +- .../org/apache/accumulo/server/ServerContext.java | 4 +- .../org/apache/accumulo/server/ServerDirs.java | 33 +++++----- .../org/apache/accumulo/server/fs/VolumeUtil.java | 61 +++++++----------- .../apache/accumulo/server/init/Initialize.java | 5 +- .../manager/state/TabletManagementParameters.java | 12 ++-- .../apache/accumulo/server/fs/VolumeUtilTest.java | 75 ++++++++++------------ .../main/java/org/apache/accumulo/gc/GCRun.java | 3 +- .../accumulo/manager/TabletGroupWatcher.java | 2 +- .../accumulo/manager/recovery/RecoveryManager.java | 2 +- .../org/apache/accumulo/tserver/log/DfsLogger.java | 28 ++++---- .../apache/accumulo/tserver/logger/LogReader.java | 5 -- .../apache/accumulo/tserver/log/DfsLoggerTest.java | 9 +++ .../functional/TabletManagementIteratorIT.java | 10 ++- 14 files changed, 111 insertions(+), 141 deletions(-) diff --cc server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java index 5eda27a18d,a994d16edb..66c527b6d4 --- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java @@@ -18,25 -18,24 +18,25 @@@ */ package org.apache.accumulo.server.fs; + import static java.util.Objects.requireNonNull; + import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; - import java.util.Objects; import java.util.SortedMap; import java.util.TreeMap; +import java.util.function.BiConsumer; +import java.util.function.Consumer; -import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.metadata.ReferencedTabletFile; import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.tabletserver.log.LogEntry; - import org.apache.accumulo.core.util.Pair; -import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.fs.VolumeManager.FileType; -import org.apache.accumulo.server.util.MetadataTableUtil; +import org.apache.commons.lang3.mutable.MutableBoolean; import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@@ -119,59 -103,37 +104,59 @@@ public class VolumeUtil } } - public static boolean needsVolumeReplacement(final List<Pair<Path,Path>> replacements, - /** - * This method does two things. First, it switches any volumes a tablet is using that are - * configured in instance.volumes.replacements. Second, if a tablet dir is no longer configured - * for use it chooses a new tablet directory. - */ - public static TabletFiles updateTabletVolumes(ServerContext context, ServiceLock zooLock, - KeyExtent extent, TabletFiles tabletFiles) { - Map<Path,Path> replacements = context.getVolumeReplacements(); ++ public static boolean needsVolumeReplacement(final Map<Path,Path> replacements, + final TabletMetadata tm) { if (replacements.isEmpty()) { - return tabletFiles; + return false; } - log.trace("Using volume replacements: {}", replacements); - List<LogEntry> logsToRemove = new ArrayList<>(); - List<LogEntry> logsToAdd = new ArrayList<>(); + MutableBoolean needsReplacement = new MutableBoolean(false); + + Consumer<LogEntry> consumer = le -> needsReplacement.setTrue(); + + volumeReplacementEvaluation(replacements, tm, consumer, consumer, + f -> needsReplacement.setTrue(), (f, dfv) -> needsReplacement.setTrue()); + + return needsReplacement.booleanValue(); + } - List<StoredTabletFile> filesToRemove = new ArrayList<>(); - SortedMap<ReferencedTabletFile,DataFileValue> filesToAdd = new TreeMap<>(); + public static class VolumeReplacements { + public final TabletMetadata tabletMeta; + public final List<LogEntry> logsToRemove = new ArrayList<>(); + public final List<LogEntry> logsToAdd = new ArrayList<>(); + public final List<StoredTabletFile> filesToRemove = new ArrayList<>(); + public final Map<ReferencedTabletFile,DataFileValue> filesToAdd = new HashMap<>(); - TabletFiles ret = new TabletFiles(); + public VolumeReplacements(TabletMetadata tabletMeta) { + this.tabletMeta = tabletMeta; + } + } - public static VolumeReplacements - computeVolumeReplacements(final List<Pair<Path,Path>> replacements, final TabletMetadata tm) { - for (LogEntry logEntry : tabletFiles.logEntries) { ++ public static VolumeReplacements computeVolumeReplacements(final Map<Path,Path> replacements, ++ final TabletMetadata tm) { + var vr = new VolumeReplacements(tm); + volumeReplacementEvaluation(replacements, tm, vr.logsToRemove::add, vr.logsToAdd::add, + vr.filesToRemove::add, vr.filesToAdd::put); + return vr; + } + - public static void volumeReplacementEvaluation(final List<Pair<Path,Path>> replacements, ++ public static void volumeReplacementEvaluation(final Map<Path,Path> replacements, + final TabletMetadata tm, final Consumer<LogEntry> logsToRemove, + final Consumer<LogEntry> logsToAdd, final Consumer<StoredTabletFile> filesToRemove, + final BiConsumer<ReferencedTabletFile,DataFileValue> filesToAdd) { + if (replacements.isEmpty() || (tm.getFilesMap().isEmpty() && tm.getLogs().isEmpty())) { + return; + } + + log.trace("Using volume replacements: {}", replacements); + for (LogEntry logEntry : tm.getLogs()) { + log.trace("Evaluating walog {} for replacement.", logEntry); - LogEntry switchedLogEntry = switchVolumes(logEntry, replacements); + LogEntry switchedLogEntry = switchVolume(logEntry, replacements); if (switchedLogEntry != null) { - logsToRemove.add(logEntry); - logsToAdd.add(switchedLogEntry); - ret.logEntries.add(switchedLogEntry); - log.debug("Replacing volume {} : {} -> {}", extent, logEntry.getPath(), + logsToRemove.accept(logEntry); + logsToAdd.accept(switchedLogEntry); + log.trace("Replacing volume {} : {} -> {}", tm.getExtent(), logEntry.getPath(), switchedLogEntry.getPath()); - } else { - ret.logEntries.add(logEntry); } } diff --cc server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementParameters.java index 4d183cd6c6,0000000000..c6bfbe3fc3 mode 100644,000000..100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementParameters.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementParameters.java @@@ -1,263 -1,0 +1,265 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.server.manager.state; + +import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toMap; +import static java.util.stream.Collectors.toSet; +import static java.util.stream.Collectors.toUnmodifiableMap; +import static java.util.stream.Collectors.toUnmodifiableSet; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.Base64; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Supplier; + +import org.apache.accumulo.core.data.AbstractId; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.manager.thrift.ManagerState; +import org.apache.accumulo.core.metadata.TServerInstance; +import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.server.manager.LiveTServerSet; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.io.DataOutputBuffer; + +import com.google.common.base.Suppliers; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; + +/** + * An immutable snapshot of the information needed by the TabletGroupWatcher and the + * {@link TabletManagementIterator} to make decisions about tablets. + */ +public class TabletManagementParameters { + // ELASTICITY_TODO need to unit test serialization and deserialization of this class. + private final ManagerState managerState; + private final Map<Ample.DataLevel,Boolean> parentUpgradeMap; + private final Set<TableId> onlineTables; + private final Set<TServerInstance> serversToShutdown; + private final Map<KeyExtent,TServerInstance> migrations; + + private final Ample.DataLevel level; + + private final Supplier<Map<TServerInstance,String>> resourceGroups; + private final Map<String,Set<TServerInstance>> tserverGroups; + private final Map<Long,Map<String,String>> compactionHints; + private final Set<TServerInstance> onlineTservers; + private final boolean canSuspendTablets; - private final List<Pair<Path,Path>> volumeReplacements; ++ private final Map<Path,Path> volumeReplacements; + + public TabletManagementParameters(ManagerState managerState, + Map<Ample.DataLevel,Boolean> parentUpgradeMap, Set<TableId> onlineTables, + LiveTServerSet.LiveTServersSnapshot liveTServersSnapshot, + Set<TServerInstance> serversToShutdown, Map<KeyExtent,TServerInstance> migrations, + Ample.DataLevel level, Map<Long,Map<String,String>> compactionHints, - boolean canSuspendTablets, List<Pair<Path,Path>> volumeReplacements) { ++ boolean canSuspendTablets, Map<Path,Path> volumeReplacements) { + this.managerState = managerState; + this.parentUpgradeMap = Map.copyOf(parentUpgradeMap); + // TODO could filter by level + this.onlineTables = Set.copyOf(onlineTables); + // This is already immutable, so no need to copy + this.onlineTservers = liveTServersSnapshot.getTservers(); + this.serversToShutdown = Set.copyOf(serversToShutdown); + // TODO could filter by level + this.migrations = Map.copyOf(migrations); + this.level = level; + // This is already immutable, so no need to copy + this.tserverGroups = liveTServersSnapshot.getTserverGroups(); + this.compactionHints = makeImmutable(compactionHints); + this.resourceGroups = Suppliers.memoize(() -> { + Map<TServerInstance,String> resourceGroups = new HashMap<>(); + TabletManagementParameters.this.tserverGroups.forEach((resourceGroup, tservers) -> tservers + .forEach(tserver -> resourceGroups.put(tserver, resourceGroup))); + return Map.copyOf(resourceGroups); + }); + this.canSuspendTablets = canSuspendTablets; + this.volumeReplacements = volumeReplacements; + } + + private TabletManagementParameters(JsonData jdata) { + this.managerState = jdata.managerState; + this.parentUpgradeMap = Map.copyOf(jdata.parentUpgradeMap); + this.onlineTables = jdata.onlineTables.stream().map(TableId::of).collect(toUnmodifiableSet()); + this.onlineTservers = + jdata.onlineTservers.stream().map(TServerInstance::new).collect(toUnmodifiableSet()); + this.serversToShutdown = + jdata.serversToShutdown.stream().map(TServerInstance::new).collect(toUnmodifiableSet()); + this.migrations = jdata.migrations.entrySet().stream() + .collect(toUnmodifiableMap(entry -> JsonData.strToExtent(entry.getKey()), + entry -> new TServerInstance(entry.getValue()))); + this.level = jdata.level; + this.compactionHints = makeImmutable(jdata.compactionHints); + this.tserverGroups = jdata.tserverGroups.entrySet().stream().collect(toUnmodifiableMap( + Map.Entry::getKey, + entry -> entry.getValue().stream().map(TServerInstance::new).collect(toUnmodifiableSet()))); + this.resourceGroups = Suppliers.memoize(() -> { + Map<TServerInstance,String> resourceGroups = new HashMap<>(); + TabletManagementParameters.this.tserverGroups.forEach((resourceGroup, tservers) -> tservers + .forEach(tserver -> resourceGroups.put(tserver, resourceGroup))); + return Map.copyOf(resourceGroups); + }); + this.canSuspendTablets = jdata.canSuspendTablets; - this.volumeReplacements = jdata.volumeReplacements; ++ this.volumeReplacements = jdata.volumeReplacements.stream() ++ .collect(toUnmodifiableMap(Pair::getFirst, Pair::getSecond)); + } + + public ManagerState getManagerState() { + return managerState; + } + + public boolean isParentLevelUpgraded() { + return parentUpgradeMap.get(level); + } + + public Set<TServerInstance> getOnlineTsevers() { + return onlineTservers; + } + + public Set<TServerInstance> getServersToShutdown() { + return serversToShutdown; + } + + public boolean isTableOnline(TableId tableId) { + return onlineTables.contains(tableId); + } + + public Map<KeyExtent,TServerInstance> getMigrations() { + return migrations; + } + + public Ample.DataLevel getLevel() { + return level; + } + + public String getResourceGroup(TServerInstance tserver) { + return resourceGroups.get().get(tserver); + } + + public Map<String,Set<TServerInstance>> getGroupedTServers() { + return tserverGroups; + } + + public Set<TableId> getOnlineTables() { + return onlineTables; + } + + public Map<Long,Map<String,String>> getCompactionHints() { + return compactionHints; + } + + public boolean canSuspendTablets() { + return canSuspendTablets; + } + - public List<Pair<Path,Path>> getVolumeReplacements() { ++ public Map<Path,Path> getVolumeReplacements() { + return volumeReplacements; + } + + private static Map<Long,Map<String,String>> + makeImmutable(Map<Long,Map<String,String>> compactionHints) { + var copy = new HashMap<Long,Map<String,String>>(); + compactionHints.forEach((ftxid, hints) -> copy.put(ftxid, Map.copyOf(hints))); + return Collections.unmodifiableMap(copy); + } + + private static class JsonData { + final ManagerState managerState; + final Map<Ample.DataLevel,Boolean> parentUpgradeMap; + final Collection<String> onlineTables; + final Collection<String> onlineTservers; + final Collection<String> serversToShutdown; + final Map<String,String> migrations; + + final Ample.DataLevel level; + + final Map<String,Set<String>> tserverGroups; + + final Map<Long,Map<String,String>> compactionHints; + + final boolean canSuspendTablets; + final List<Pair<Path,Path>> volumeReplacements; + + private static String toString(KeyExtent extent) { + DataOutputBuffer buffer = new DataOutputBuffer(); + try { + extent.writeTo(buffer); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + + return Base64.getEncoder() + .encodeToString(Arrays.copyOf(buffer.getData(), buffer.getLength())); + + } + + private static KeyExtent strToExtent(String kes) { + byte[] data = Base64.getDecoder().decode(kes); + DataInputBuffer buffer = new DataInputBuffer(); + buffer.reset(data, data.length); + try { + return KeyExtent.readFrom(buffer); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + JsonData(TabletManagementParameters params) { + managerState = params.managerState; + parentUpgradeMap = params.parentUpgradeMap; + onlineTables = params.onlineTables.stream().map(AbstractId::canonical).collect(toList()); + onlineTservers = params.getOnlineTsevers().stream().map(TServerInstance::getHostPortSession) + .collect(toList()); + serversToShutdown = params.serversToShutdown.stream().map(TServerInstance::getHostPortSession) + .collect(toList()); + migrations = params.migrations.entrySet().stream().collect( + toMap(entry -> toString(entry.getKey()), entry -> entry.getValue().getHostPortSession())); + level = params.level; + tserverGroups = params.getGroupedTServers().entrySet().stream() + .collect(toMap(Map.Entry::getKey, entry -> entry.getValue().stream() + .map(TServerInstance::getHostPortSession).collect(toSet()))); + compactionHints = params.compactionHints; + canSuspendTablets = params.canSuspendTablets; - volumeReplacements = params.volumeReplacements; ++ volumeReplacements = ++ params.volumeReplacements.entrySet().stream().map(Pair::fromEntry).collect(toList()); + } + + } + + public String serialize() { + Gson gson = new GsonBuilder().disableHtmlEscaping().create(); + return gson.toJson(new JsonData(this)); + } + + public static TabletManagementParameters deserialize(String json) { + Gson gson = new GsonBuilder().disableHtmlEscaping().create(); + JsonData jdata = gson.fromJson(json, JsonData.class); + return new TabletManagementParameters(jdata); + } + +} diff --cc server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java index 3d9e840af2,068bb4930b..6d90fa5423 --- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java @@@ -214,465 -178,198 +214,465 @@@ abstract class TabletGroupWatcher exten } } - @Override - public void run() { - int[] oldCounts = new int[TabletState.values().length]; - EventCoordinator.Listener eventListener = this.manager.nextEvent.getListener(); + class EventHandler implements EventCoordinator.Listener { - WalStateManager wals = new WalStateManager(manager.getContext()); + // Setting this to true to start with because its not know what happended before this object was + // created, so just start off with full scan. + private boolean needsFullScan = true; - while (manager.stillManager()) { - // slow things down a little, otherwise we spam the logs when there are many wake-up events - sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + private final BlockingQueue<Range> rangesToProcess; - final long waitTimeBetweenScans = manager.getConfiguration() - .getTimeInMillis(Property.MANAGER_TABLET_GROUP_WATCHER_INTERVAL); + class RangeProccessor implements Runnable { + @Override + public void run() { + try { + while (manager.stillManager()) { + var range = rangesToProcess.poll(100, TimeUnit.MILLISECONDS); + if (range == null) { + // check to see if still the manager + continue; + } - int totalUnloaded = 0; - int unloaded = 0; - ClosableIterator<TabletLocationState> iter = null; - try { - Map<TableId,MergeStats> mergeStatsCache = new HashMap<>(); - Map<TableId,MergeStats> currentMerges = new HashMap<>(); - for (MergeInfo merge : manager.merges()) { - if (merge.getExtent() != null) { - currentMerges.put(merge.getExtent().tableId(), new MergeStats(merge)); + ArrayList<Range> ranges = new ArrayList<>(); + ranges.add(range); + + rangesToProcess.drainTo(ranges); + + if (manager.getManagerGoalState() == ManagerGoalState.CLEAN_STOP) { + // only do full scans when trying to shutdown + setNeedsFullScan(); + continue; + } + + TabletManagementParameters tabletMgmtParams = createTabletManagementParameters(false); + + var currentTservers = getCurrentTservers(tabletMgmtParams.getOnlineTsevers()); + if (currentTservers.isEmpty()) { + setNeedsFullScan(); + continue; + } + + try (var iter = store.iterator(ranges, tabletMgmtParams)) { + long t1 = System.currentTimeMillis(); + manageTablets(iter, tabletMgmtParams, currentTservers, false); + long t2 = System.currentTimeMillis(); + Manager.log.debug(String.format("[%s]: partial scan time %.2f seconds for %,d ranges", + store.name(), (t2 - t1) / 1000., ranges.size())); + } catch (Exception e) { + Manager.log.error("Error processing {} ranges for store {} ", ranges.size(), + store.name(), e); + } } + } catch (InterruptedException e) { + throw new RuntimeException(e); } + } + } - // Get the current status for the current list of tservers - SortedMap<TServerInstance,TabletServerStatus> currentTServers = new TreeMap<>(); - for (TServerInstance entry : manager.tserverSet.getCurrentServers()) { - currentTServers.put(entry, manager.tserverStatus.get(entry)); - } + EventHandler() { + rangesToProcess = new ArrayBlockingQueue<>(3000); - if (currentTServers.isEmpty()) { - eventListener.waitForEvents(waitTimeBetweenScans); - synchronized (this) { - lastScanServers = Collections.emptySortedSet(); + Threads + .createThread("TGW [" + store.name() + "] event range processor", new RangeProccessor()) + .start(); + } + + private synchronized void setNeedsFullScan() { + needsFullScan = true; + notifyAll(); + } + + public synchronized void clearNeedsFullScan() { + needsFullScan = false; + } + + @Override + public void process(EventCoordinator.Event event) { + + switch (event.getScope()) { + case ALL: + case DATA_LEVEL: + setNeedsFullScan(); + break; + case TABLE: + case TABLE_RANGE: + if (!rangesToProcess.offer(event.getExtent().toMetaRange())) { + Manager.log.debug("[{}] unable to process event range {} because queue is full", + store.name(), event.getExtent()); + setNeedsFullScan(); } - continue; + break; + default: + throw new IllegalArgumentException("Unhandled scope " + event.getScope()); + } + } + + synchronized void waitForFullScan(long millis) { + if (!needsFullScan) { + try { + wait(millis); + } catch (InterruptedException e) { + throw new RuntimeException(e); } + } + } + } - TabletLists tLists = new TabletLists(manager, currentTServers); + private TabletManagementParameters + createTabletManagementParameters(boolean lookForTabletsNeedingVolReplacement) { - ManagerState managerState = manager.getManagerState(); - int[] counts = new int[TabletState.values().length]; - stats.begin(); - // Walk through the tablets in our store, and work tablets - // towards their goal - iter = store.iterator(); - while (iter.hasNext()) { - TabletLocationState tls = iter.next(); - if (tls == null) { - continue; - } + HashMap<Ample.DataLevel,Boolean> parentLevelUpgrade = new HashMap<>(); + UpgradeCoordinator.UpgradeStatus upgradeStatus = manager.getUpgradeStatus(); + for (var level : Ample.DataLevel.values()) { + parentLevelUpgrade.put(level, upgradeStatus.isParentLevelUpgraded(level)); + } - // ignore entries for tables that do not exist in zookeeper - if (manager.getTableManager().getTableState(tls.extent.tableId()) == null) { - continue; - } + Set<TServerInstance> shutdownServers; + if (store.getLevel() == Ample.DataLevel.USER) { + shutdownServers = manager.shutdownServers(); + } else { + // Use the servers to shutdown filtered by the dependent watcher. These are servers to + // shutdown that the dependent watcher has determined it has no tablets hosted on or assigned + // to. + shutdownServers = dependentWatcher.getFilteredServersToShutdown(); + } - // Don't overwhelm the tablet servers with work - if (tLists.unassigned.size() + unloaded - > Manager.MAX_TSERVER_WORK_CHUNK * currentTServers.size()) { - flushChanges(tLists, wals); - tLists.reset(); - unloaded = 0; - eventListener.waitForEvents(waitTimeBetweenScans); - } - TableId tableId = tls.extent.tableId(); - TableConfiguration tableConf = manager.getContext().getTableConfiguration(tableId); - - MergeStats mergeStats = mergeStatsCache.computeIfAbsent(tableId, k -> { - var mStats = currentMerges.get(k); - return mStats != null ? mStats : new MergeStats(new MergeInfo()); - }); - TabletGoalState goal = manager.getGoalState(tls, mergeStats.getMergeInfo()); - Location location = tls.getLocation(); - TabletState state = tls.getState(currentTServers.keySet()); - - TabletLogger.missassigned(tls.extent, goal.toString(), state.toString(), - tls.getFutureServer(), tls.getCurrentServer(), tls.walogs.size()); - - stats.update(tableId, state); - mergeStats.update(tls.extent, state); - - // Always follow through with assignments - if (state == TabletState.ASSIGNED) { - goal = TabletGoalState.HOSTED; + var tServersSnapshot = manager.tserversSnapshot(); + + return new TabletManagementParameters(manager.getManagerState(), parentLevelUpgrade, + manager.onlineTables(), tServersSnapshot, shutdownServers, manager.migrationsSnapshot(), + store.getLevel(), manager.getCompactionHints(), canSuspendTablets(), + lookForTabletsNeedingVolReplacement ? manager.getContext().getVolumeReplacements() - : List.of()); ++ : Map.of()); + } + + private Set<TServerInstance> getFilteredServersToShutdown() { + return filteredServersToShutdown; + } + + private static class TableMgmtStats { + int[] counts = new int[TabletState.values().length]; + private int totalUnloaded; + private long totalVolumeReplacements; + } + + private TableMgmtStats manageTablets(Iterator<TabletManagement> iter, + TabletManagementParameters tableMgmtParams, + SortedMap<TServerInstance,TabletServerStatus> currentTServers, boolean isFullScan) + throws BadLocationStateException, TException, DistributedStoreException, WalMarkerException, + IOException { + + TableMgmtStats tableMgmtStats = new TableMgmtStats(); + final boolean shuttingDownAllTabletServers = + tableMgmtParams.getServersToShutdown().equals(currentTServers.keySet()); + if (shuttingDownAllTabletServers && !isFullScan) { + // If we are shutting down all of the TabletServers, then don't process any events + // from the EventCoordinator. + LOG.debug("Partial scan requested, but aborted due to shutdown of all TabletServers"); + return tableMgmtStats; + } + + int unloaded = 0; + + TabletLists tLists = new TabletLists(currentTServers, tableMgmtParams.getGroupedTServers(), + tableMgmtParams.getServersToShutdown()); + + CompactionJobGenerator compactionGenerator = new CompactionJobGenerator( + new ServiceEnvironmentImpl(manager.getContext()), tableMgmtParams.getCompactionHints()); + + Set<TServerInstance> filteredServersToShutdown = + new HashSet<>(tableMgmtParams.getServersToShutdown()); + + while (iter.hasNext()) { + final TabletManagement mti = iter.next(); + if (mti == null) { + throw new IllegalStateException("State store returned a null ManagerTabletInfo object"); + } + + final TabletMetadata tm = mti.getTabletMetadata(); + + final String mtiError = mti.getErrorMessage(); + if (mtiError != null) { + // An error happened on the TabletServer in the TabletManagementIterator + // when trying to process this extent. + LOG.warn( + "Error on TabletServer trying to get Tablet management information for extent: {}. Error message: {}", + tm.getExtent(), mtiError); + this.metrics.incrementTabletGroupWatcherError(this.store.getLevel()); + continue; + } + + final Set<ManagementAction> actions = mti.getActions(); + if (actions.contains(ManagementAction.BAD_STATE) && tm.isFutureAndCurrentLocationSet()) { + throw new BadLocationStateException( + tm.getExtent() + " is both assigned and hosted, which should never happen: " + this, + tm.getExtent().toMetaRow()); + } + + final TableId tableId = tm.getTableId(); + // ignore entries for tables that do not exist in zookeeper + if (manager.getTableManager().getTableState(tableId) == null) { + continue; + } + + // Don't overwhelm the tablet servers with work + if (tLists.unassigned.size() + unloaded + > Manager.MAX_TSERVER_WORK_CHUNK * currentTServers.size() + || tLists.volumeReplacements.size() > 1000) { + flushChanges(tLists); + tLists.reset(); + unloaded = 0; + } + + final TableConfiguration tableConf = manager.getContext().getTableConfiguration(tableId); + + TabletState state = TabletState.compute(tm, currentTServers.keySet()); + if (state == TabletState.ASSIGNED_TO_DEAD_SERVER) { + /* + * This code exists to deal with a race condition caused by two threads running in this + * class that compute tablets actions. One thread does full scans and the other reacts to + * events and does partial scans. Below is an example of the race condition this is + * handling. + * + * - TGW Thread 1 : reads the set of tablets servers and its empty + * + * - TGW Thread 2 : reads the set of tablet servers and its [TS1] + * + * - TGW Thread 2 : Sees tabletX without a location and assigns it to TS1 + * + * - TGW Thread 1 : Sees tabletX assigned to TS1 and assumes it's assigned to a dead tablet + * server because its set of live servers is the empty set. + * + * To deal with this race condition, this code recomputes the tablet state using the latest + * tservers when a tablet is seen assigned to a dead tserver. + */ + + TabletState newState = TabletState.compute(tm, manager.tserversSnapshot().getTservers()); + if (newState != state) { + LOG.debug("Tablet state changed when using latest set of tservers {} {} {}", + tm.getExtent(), state, newState); + state = newState; + } + } + + // This is final because nothing in this method should change the goal. All computation of the + // goal should be done in TabletGoalState.compute() so that all parts of the Accumulo code + // will compute a consistent goal. + final TabletGoalState goal = + TabletGoalState.compute(tm, state, manager.tabletBalancer, tableMgmtParams); + + if (actions.contains(ManagementAction.NEEDS_VOLUME_REPLACEMENT)) { + tableMgmtStats.totalVolumeReplacements++; + if (state == TabletState.UNASSIGNED || state == TabletState.SUSPENDED) { + var volRep = + VolumeUtil.computeVolumeReplacements(tableMgmtParams.getVolumeReplacements(), tm); + if (volRep.logsToRemove.size() + volRep.filesToRemove.size() > 0) { + if (tm.getLocation() != null) { + // since the totalVolumeReplacements counter was incremented, should try this again + // later after its unassigned + LOG.debug("Volume replacement needed for {} but it has a location {}.", + tm.getExtent(), tm.getLocation()); + } else if (tm.getOperationId() != null) { + LOG.debug("Volume replacement needed for {} but it has an active operation {}.", + tm.getExtent(), tm.getOperationId()); + } else { + LOG.debug("Volume replacement needed for {}.", tm.getExtent()); + // buffer replacements so that multiple mutations can be done at once + tLists.volumeReplacements.add(volRep); + } + } else { + LOG.debug("Volume replacement evaluation for {} returned no changes.", tm.getExtent()); } - if (Manager.log.isTraceEnabled()) { - Manager.log.trace( - "[{}] Shutting down all Tservers: {}, dependentCount: {} Extent: {}, state: {}, goal: {}", - store.name(), manager.serversToShutdown.equals(currentTServers.keySet()), - dependentWatcher == null ? "null" : dependentWatcher.assignedOrHosted(), tls.extent, - state, goal); + } else { + LOG.debug("Volume replacement needed for {} but its tablet state is {}.", tm.getExtent(), + state); + } + } + + final Location location = tm.getLocation(); + Location current = null; + Location future = null; + if (tm.hasCurrent()) { + current = tm.getLocation(); + } else { + future = tm.getLocation(); + } + TabletLogger.missassigned(tm.getExtent(), goal.toString(), state.toString(), + future != null ? future.getServerInstance() : null, + current != null ? current.getServerInstance() : null, tm.getLogs().size()); + + if (isFullScan) { + stats.update(tableId, state); + } + + if (Manager.log.isTraceEnabled()) { + Manager.log.trace( + "[{}] Shutting down all Tservers: {}, dependentCount: {} Extent: {}, state: {}, goal: {} actions:{}", + store.name(), tableMgmtParams.getServersToShutdown().equals(currentTServers.keySet()), + dependentWatcher == null ? "null" : dependentWatcher.assignedOrHosted(), tm.getExtent(), + state, goal, actions); + } + + if (actions.contains(ManagementAction.NEEDS_SPLITTING) + && !actions.contains(ManagementAction.NEEDS_VOLUME_REPLACEMENT)) { + LOG.debug("{} may need splitting.", tm.getExtent()); + if (manager.getSplitter().isSplittable(tm)) { + if (manager.getSplitter().addSplitStarting(tm.getExtent())) { + LOG.debug("submitting tablet {} for split", tm.getExtent()); + manager.getSplitter().executeSplit(new SplitTask(manager.getContext(), tm, manager)); } + } else { + LOG.debug("{} is not splittable.", tm.getExtent()); + } + // ELASITICITY_TODO: See #3605. Merge is non-functional. Left this commented out code to + // show where merge used to make a call to split a tablet. + // sendSplitRequest(mergeStats.getMergeInfo(), state, tm); + } + + if (actions.contains(ManagementAction.NEEDS_COMPACTING) + && !actions.contains(ManagementAction.NEEDS_VOLUME_REPLACEMENT)) { + var jobs = compactionGenerator.generateJobs(tm, + TabletManagementIterator.determineCompactionKinds(actions)); + LOG.debug("{} may need compacting adding {} jobs", tm.getExtent(), jobs.size()); + manager.getCompactionCoordinator().addJobs(tm, jobs); + } + + // ELASITICITY_TODO the case where a planner generates compactions at time T1 for tablet + // and later at time T2 generates nothing for the same tablet is not being handled. At + // time T1 something could have been queued. However at time T2 we will not clear those + // entries from the queue because we see nothing here for that case. After a full + // metadata scan could remove any tablets that were not updated during the scan. + + if (actions.contains(ManagementAction.NEEDS_LOCATION_UPDATE)) { - // if we are shutting down all the tabletservers, we have to do it in order - if ((goal == TabletGoalState.SUSPENDED && state == TabletState.HOSTED) - && manager.serversToShutdown.equals(currentTServers.keySet())) { - if (dependentWatcher != null) { - // If the dependentWatcher is for the user tables, check to see - // that user tables exist. - DataLevel dependentLevel = dependentWatcher.store.getLevel(); - boolean userTablesExist = true; - switch (dependentLevel) { - case USER: - Set<TableId> onlineTables = manager.onlineTables(); - onlineTables.remove(RootTable.ID); - onlineTables.remove(MetadataTable.ID); - userTablesExist = !onlineTables.isEmpty(); - break; - case METADATA: - case ROOT: - default: - break; + if (tm.getLocation() != null) { + filteredServersToShutdown.remove(tm.getLocation().getServerInstance()); + } + + if (goal == TabletGoalState.HOSTED) { + if ((state != TabletState.HOSTED && !tm.getLogs().isEmpty()) + && manager.recoveryManager.recoverLogs(tm.getExtent(), tm.getLogs())) { + LOG.debug("Not hosting {} as it needs recovery, logs: {}", tm.getExtent(), + tm.getLogs().size()); + continue; + } + switch (state) { + case HOSTED: + if (location.getServerInstance().equals(manager.migrations.get(tm.getExtent()))) { + manager.migrations.remove(tm.getExtent()); } - // If the stats object in the dependentWatcher is empty, then it - // currently does not have data about what is hosted or not. In - // that case host these tablets until the dependent watcher can - // gather some data. - final Map<TableId,TableCounts> stats = dependentWatcher.getStats(); - if (dependentLevel == DataLevel.USER) { - if (userTablesExist - && (stats == null || stats.isEmpty() || assignedOrHosted(stats) > 0)) { - goal = TabletGoalState.HOSTED; - } - } else if (stats == null || stats.isEmpty() || assignedOrHosted(stats) > 0) { - goal = TabletGoalState.HOSTED; + break; + case ASSIGNED_TO_DEAD_SERVER: + hostDeadTablet(tLists, tm, location); + break; + case SUSPENDED: + hostSuspendedTablet(tLists, tm, location, tableConf); + break; + case UNASSIGNED: + hostUnassignedTablet(tLists, tm.getExtent(), + new UnassignedTablet(location, tm.getLast())); + break; + case ASSIGNED: + // Send another reminder + tLists.assigned.add(new Assignment(tm.getExtent(), + future != null ? future.getServerInstance() : null, tm.getLast())); + break; + default: + break; + } + } else { + switch (state) { + case SUSPENDED: + // Request a move to UNASSIGNED, so as to allow balancing to continue. + tLists.suspendedToGoneServers.add(tm); + cancelOfflineTableMigrations(tm.getExtent()); + break; + case UNASSIGNED: + cancelOfflineTableMigrations(tm.getExtent()); + break; + case ASSIGNED_TO_DEAD_SERVER: + unassignDeadTablet(tLists, tm); + break; + case HOSTED: + TServerConnection client = + manager.tserverSet.getConnection(location.getServerInstance()); + if (client != null) { + LOG.debug("Requesting tserver {} unload tablet {}", location.getServerInstance(), + tm.getExtent()); + client.unloadTablet(manager.managerLock, tm.getExtent(), goal.howUnload(), + manager.getSteadyTime()); + tableMgmtStats.totalUnloaded++; + unloaded++; + } else { + Manager.log.warn("Could not connect to server {}", location); } - } + break; + case ASSIGNED: + break; } + } + tableMgmtStats.counts[state.ordinal()]++; + } + } - if (goal == TabletGoalState.HOSTED) { - if ((state != TabletState.HOSTED && !tls.walogs.isEmpty()) - && manager.recoveryManager.recoverLogs(tls.extent, tls.walogs)) { - continue; - } - switch (state) { - case HOSTED: - if (location.getServerInstance().equals(manager.migrations.get(tls.extent))) { - manager.migrations.remove(tls.extent); - } - break; - case ASSIGNED_TO_DEAD_SERVER: - hostDeadTablet(tLists, tls, location, wals); - break; - case SUSPENDED: - hostSuspendedTablet(tLists, tls, location, tableConf); - break; - case UNASSIGNED: - hostUnassignedTablet(tLists, tls.extent, new UnassignedTablet(location, tls.last)); - break; - case ASSIGNED: - // Send another reminder - tLists.assigned.add(new Assignment(tls.extent, tls.getFutureServer(), tls.last)); - break; - } - } else { - switch (state) { - case SUSPENDED: - // Request a move to UNASSIGNED, so as to allow balancing to continue. - tLists.suspendedToGoneServers.add(tls); - cancelOfflineTableMigrations(tls.extent); - break; - case UNASSIGNED: - cancelOfflineTableMigrations(tls.extent); - break; - case ASSIGNED_TO_DEAD_SERVER: - unassignDeadTablet(tLists, tls, wals); - break; - case HOSTED: - TServerConnection client = - manager.tserverSet.getConnection(location.getServerInstance()); - if (client != null) { - Manager.log.trace("[{}] Requesting TabletServer {} unload {} {}", store.name(), - location.getServerInstance(), tls.extent, goal.howUnload()); - client.unloadTablet(manager.managerLock, tls.extent, goal.howUnload(), - manager.getSteadyTime()); - unloaded++; - totalUnloaded++; - } else { - Manager.log.warn("Could not connect to server {}", location); - } - break; - case ASSIGNED: - break; - } + flushChanges(tLists); + + if (isFullScan) { + this.filteredServersToShutdown = Set.copyOf(filteredServersToShutdown); + } + + return tableMgmtStats; + } + + private SortedMap<TServerInstance,TabletServerStatus> + getCurrentTservers(Set<TServerInstance> onlineTservers) { + // Get the current status for the current list of tservers + final SortedMap<TServerInstance,TabletServerStatus> currentTServers = new TreeMap<>(); + for (TServerInstance entry : onlineTservers) { + currentTServers.put(entry, manager.tserverStatus.get(entry)); + } + return currentTServers; + } + + @Override + public void run() { + int[] oldCounts = new int[TabletState.values().length]; + boolean lookForTabletsNeedingVolReplacement = true; + + while (manager.stillManager()) { + // slow things down a little, otherwise we spam the logs when there are many wake-up events + sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + // ELASTICITY_TODO above sleep in the case when not doing a full scan to make manager more + // responsive + + final long waitTimeBetweenScans = manager.getConfiguration() + .getTimeInMillis(Property.MANAGER_TABLET_GROUP_WATCHER_INTERVAL); + + TabletManagementParameters tableMgmtParams = + createTabletManagementParameters(lookForTabletsNeedingVolReplacement); + var currentTServers = getCurrentTservers(tableMgmtParams.getOnlineTsevers()); + + ClosableIterator<TabletManagement> iter = null; + try { + if (currentTServers.isEmpty()) { + eventHandler.waitForFullScan(waitTimeBetweenScans); + synchronized (this) { + lastScanServers = Collections.emptySortedSet(); } - counts[state.ordinal()]++; + continue; } - flushChanges(tLists, wals); + stats.begin(); + + ManagerState managerState = tableMgmtParams.getManagerState(); + + // Clear the need for a full scan before starting a full scan inorder to detect events that + // happen during the full scan. + eventHandler.clearNeedsFullScan(); + + iter = store.iterator(tableMgmtParams); + var tabletMgmtStats = manageTablets(iter, tableMgmtParams, currentTServers, true); + lookForTabletsNeedingVolReplacement = tabletMgmtStats.totalVolumeReplacements != 0; // provide stats after flushing changes to avoid race conditions w/ delete table stats.end(managerState); diff --cc test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java index 5ca097da33,0000000000..06c3bec142 mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java @@@ -1,484 -1,0 +1,482 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.functional; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.UUID; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchDeleter; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.IsolatedScanner; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.RowIterator; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.NewTableConfiguration; +import org.apache.accumulo.core.client.admin.TabletHostingGoal; +import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.fate.zookeeper.ZooUtil; +import org.apache.accumulo.core.lock.ServiceLock; +import org.apache.accumulo.core.manager.state.TabletManagement; +import org.apache.accumulo.core.manager.state.tables.TableState; +import org.apache.accumulo.core.manager.thrift.ManagerState; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.StoredTabletFile; +import org.apache.accumulo.core.metadata.TServerInstance; +import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.core.metadata.schema.MetadataSchema; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.HostingColumnFamily; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; +import org.apache.accumulo.core.metadata.schema.TabletOperationId; +import org.apache.accumulo.core.metadata.schema.TabletOperationType; +import org.apache.accumulo.core.metadata.schema.TabletsMetadata; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.tabletserver.log.LogEntry; - import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.server.manager.LiveTServerSet; +import org.apache.accumulo.server.manager.state.TabletManagementIterator; +import org.apache.accumulo.server.manager.state.TabletManagementParameters; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; + +/** + * Test to ensure that the {@link TabletManagementIterator} properly skips over tablet information + * in the metadata table when there is no work to be done on the tablet (see ACCUMULO-3580) + */ +public class TabletManagementIteratorIT extends AccumuloClusterHarness { + private final static Logger log = LoggerFactory.getLogger(TabletManagementIteratorIT.class); + + @Override + protected Duration defaultTimeout() { + return Duration.ofMinutes(3); + } + + @Test + public void test() throws AccumuloException, AccumuloSecurityException, TableExistsException, + TableNotFoundException, IOException { + + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + + String[] tables = getUniqueNames(8); + final String t1 = tables[0]; + final String t2 = tables[1]; + final String t3 = tables[2]; + final String t4 = tables[3]; + final String metaCopy1 = tables[4]; + final String metaCopy2 = tables[5]; + final String metaCopy3 = tables[6]; + final String metaCopy4 = tables[7]; + + // create some metadata + createTable(client, t1, true); + createTable(client, t2, false); + createTable(client, t3, true); + createTable(client, t4, true); + + // Scan table t3 which will cause it's tablets + // to be hosted. Then, remove the location. + Scanner s = client.createScanner(t3); + s.setRange(new Range()); + @SuppressWarnings("unused") + var unused = Iterables.size(s); // consume all the data + + // examine a clone of the metadata table, so we can manipulate it + copyTable(client, MetadataTable.NAME, metaCopy1); + + TabletManagementParameters tabletMgmtParams = createParameters(client); + int tabletsInFlux = findTabletsNeedingAttention(client, metaCopy1, tabletMgmtParams); + while (tabletsInFlux > 0) { + log.debug("Waiting for {} tablets for {}", tabletsInFlux, metaCopy1); + UtilWaitThread.sleep(500); + copyTable(client, MetadataTable.NAME, metaCopy1); + tabletsInFlux = findTabletsNeedingAttention(client, metaCopy1, tabletMgmtParams); + } + assertEquals(0, findTabletsNeedingAttention(client, metaCopy1, tabletMgmtParams), + "No tables should need attention"); + + // The metadata table stabilized and metaCopy1 contains a copy suitable for testing. Before + // metaCopy1 is modified, copy it for subsequent test. + copyTable(client, metaCopy1, metaCopy2); + copyTable(client, metaCopy1, metaCopy3); + copyTable(client, metaCopy1, metaCopy4); + + // t1 is unassigned, setting to always will generate a change to host tablets + setTabletHostingGoal(client, metaCopy1, t1, TabletHostingGoal.ALWAYS.name()); + // t3 is hosted, setting to never will generate a change to unhost tablets + setTabletHostingGoal(client, metaCopy1, t3, TabletHostingGoal.NEVER.name()); + tabletMgmtParams = createParameters(client); + assertEquals(4, findTabletsNeedingAttention(client, metaCopy1, tabletMgmtParams), + "Should have four tablets with hosting goal changes"); + + // test the assigned case (no location) + removeLocation(client, metaCopy1, t3); + assertEquals(2, findTabletsNeedingAttention(client, metaCopy1, tabletMgmtParams), + "Should have two tablets without a loc"); + + // Test setting the operation id on one of the tablets in table t1. Table t1 has two tablets + // w/o a location. Only one should need attention because of the operation id. + setOperationId(client, metaCopy1, t1, new Text("some split"), TabletOperationType.SPLITTING); + assertEquals(1, findTabletsNeedingAttention(client, metaCopy1, tabletMgmtParams), + "Should have tablets needing attention because of operation id"); + + // test the cases where the assignment is to a dead tserver + reassignLocation(client, metaCopy2, t3); + assertEquals(1, findTabletsNeedingAttention(client, metaCopy2, tabletMgmtParams), + "Only 1 of 2 tablets in table t1 should be returned"); + + // Remove location and set merge operation id on both tablets + // These tablets should not need attention as they have no WALs + setTabletHostingGoal(client, metaCopy4, t4, TabletHostingGoal.ALWAYS.name()); + removeLocation(client, metaCopy4, t4); + assertEquals(2, findTabletsNeedingAttention(client, metaCopy4, tabletMgmtParams), + "Tablets have no location and a hosting goal of always, so they should need attention"); + + // Test MERGING and SPLITTING do not need attention with no location or wals + setOperationId(client, metaCopy4, t4, null, TabletOperationType.MERGING); + assertEquals(0, findTabletsNeedingAttention(client, metaCopy4, tabletMgmtParams), + "Should have no tablets needing attention for merge as they have no location"); + setOperationId(client, metaCopy4, t4, null, TabletOperationType.SPLITTING); + assertEquals(0, findTabletsNeedingAttention(client, metaCopy4, tabletMgmtParams), + "Should have no tablets needing attention for merge as they have no location"); + + // Create a log entry for one of the tablets, this tablet will now need attention + // for both MERGING and SPLITTING + setOperationId(client, metaCopy4, t4, null, TabletOperationType.MERGING); + createLogEntry(client, metaCopy4, t4); + assertEquals(1, findTabletsNeedingAttention(client, metaCopy4, tabletMgmtParams), + "Should have a tablet needing attention because of wals"); + // Switch op to SPLITTING which should also need attention like MERGING + setOperationId(client, metaCopy4, t4, null, TabletOperationType.SPLITTING); + assertEquals(1, findTabletsNeedingAttention(client, metaCopy4, tabletMgmtParams), + "Should have a tablet needing attention because of wals"); + + // Switch op to delete, no tablets should need attention even with WALs + setOperationId(client, metaCopy4, t4, null, TabletOperationType.DELETING); + assertEquals(0, findTabletsNeedingAttention(client, metaCopy4, tabletMgmtParams), + "Should have no tablets needing attention for delete"); + + // test the bad tablet location state case (inconsistent metadata) + tabletMgmtParams = createParameters(client); + addDuplicateLocation(client, metaCopy3, t3); + assertEquals(1, findTabletsNeedingAttention(client, metaCopy3, tabletMgmtParams), + "Should have 1 tablet that needs a metadata repair"); + + // test the volume replacements case. Need to insert some files into + // the metadata for t4, then run the TabletManagementIterator with + // volume replacements + addFiles(client, metaCopy4, t4); - List<Pair<Path,Path>> replacements = new ArrayList<>(); - replacements.add(new Pair<Path,Path>(new Path("file:/vol1/accumulo/inst_id"), - new Path("file:/vol2/accumulo/inst_id"))); ++ Map<Path,Path> replacements = ++ Map.of(new Path("file:/vol1/accumulo/inst_id"), new Path("file:/vol2/accumulo/inst_id")); + tabletMgmtParams = createParameters(client, replacements); + assertEquals(1, findTabletsNeedingAttention(client, metaCopy4, tabletMgmtParams), + "Should have one tablet that needs a volume replacement"); + + // clean up + dropTables(client, t1, t2, t3, t4, metaCopy1, metaCopy2, metaCopy3, metaCopy4); + } + } + + private void setTabletHostingGoal(AccumuloClient client, String table, String tableNameToModify, + String state) throws TableNotFoundException, MutationsRejectedException { + TableId tableIdToModify = + TableId.of(client.tableOperations().tableIdMap().get(tableNameToModify)); + + try (Scanner scanner = client.createScanner(table, Authorizations.EMPTY)) { + scanner.setRange(new KeyExtent(tableIdToModify, null, null).toMetaRange()); + for (Entry<Key,Value> entry : scanner) { + Mutation m = new Mutation(entry.getKey().getRow()); + m.put(HostingColumnFamily.GOAL_COLUMN.getColumnFamily(), + HostingColumnFamily.GOAL_COLUMN.getColumnQualifier(), entry.getKey().getTimestamp() + 1, + new Value(state)); + try (BatchWriter bw = client.createBatchWriter(table)) { + bw.addMutation(m); + } + } + } + } + + private void addDuplicateLocation(AccumuloClient client, String table, String tableNameToModify) + throws TableNotFoundException, MutationsRejectedException { + TableId tableIdToModify = + TableId.of(client.tableOperations().tableIdMap().get(tableNameToModify)); + Mutation m = new Mutation(new KeyExtent(tableIdToModify, null, null).toMetaRow()); + m.put(CurrentLocationColumnFamily.NAME, new Text("1234567"), new Value("fake:9005")); + try (BatchWriter bw = client.createBatchWriter(table)) { + bw.addMutation(m); + } + } + + private void addFiles(AccumuloClient client, String table, String tableNameToModify) + throws TableNotFoundException, MutationsRejectedException { + TableId tableIdToModify = + TableId.of(client.tableOperations().tableIdMap().get(tableNameToModify)); + Mutation m = new Mutation(new KeyExtent(tableIdToModify, null, null).toMetaRow()); + m.put(DataFileColumnFamily.NAME, + new Text(StoredTabletFile + .serialize("file:/vol1/accumulo/inst_id/tables/2a/default_tablet/F0000072.rf")), + new Value(new DataFileValue(0, 0, 0).encode())); + try (BatchWriter bw = client.createBatchWriter(table)) { + bw.addMutation(m); + } + try { + client.createScanner(table).iterator() + .forEachRemaining(e -> System.out.println(e.getKey() + "-> " + e.getValue())); + } catch (TableNotFoundException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } catch (AccumuloSecurityException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } catch (AccumuloException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + + private void reassignLocation(AccumuloClient client, String table, String tableNameToModify) + throws TableNotFoundException, MutationsRejectedException { + TableId tableIdToModify = + TableId.of(client.tableOperations().tableIdMap().get(tableNameToModify)); + try (Scanner scanner = client.createScanner(table, Authorizations.EMPTY)) { + scanner.setRange(new KeyExtent(tableIdToModify, null, null).toMetaRange()); + scanner.fetchColumnFamily(CurrentLocationColumnFamily.NAME); + Entry<Key,Value> entry = scanner.iterator().next(); + Mutation m = new Mutation(entry.getKey().getRow()); + m.putDelete(entry.getKey().getColumnFamily(), entry.getKey().getColumnQualifier(), + entry.getKey().getTimestamp()); + m.put(entry.getKey().getColumnFamily(), new Text("1234567"), + entry.getKey().getTimestamp() + 1, new Value("fake:9005")); + try (BatchWriter bw = client.createBatchWriter(table)) { + bw.addMutation(m); + } + } + } + + // Sets an operation type on all tablets up to the end row + private void setOperationId(AccumuloClient client, String table, String tableNameToModify, + Text end, TabletOperationType opType) throws TableNotFoundException { + var opid = TabletOperationId.from(opType, 42L); + TableId tableIdToModify = + TableId.of(client.tableOperations().tableIdMap().get(tableNameToModify)); + try (TabletsMetadata tabletsMetadata = + getServerContext().getAmple().readTablets().forTable(tableIdToModify) + .overlapping(null, end != null ? TabletsSection.encodeRow(tableIdToModify, end) : null) + .fetch(ColumnType.PREV_ROW).build()) { + for (TabletMetadata tabletMetadata : tabletsMetadata) { + Mutation m = new Mutation(tabletMetadata.getExtent().toMetaRow()); + MetadataSchema.TabletsSection.ServerColumnFamily.OPID_COLUMN.put(m, + new Value(opid.canonical())); + try (BatchWriter bw = client.createBatchWriter(table)) { + bw.addMutation(m); + } catch (MutationsRejectedException e) { + throw new RuntimeException(e); + } + } + } + } + + private void removeLocation(AccumuloClient client, String table, String tableNameToModify) + throws TableNotFoundException, MutationsRejectedException { + TableId tableIdToModify = + TableId.of(client.tableOperations().tableIdMap().get(tableNameToModify)); + BatchDeleter deleter = client.createBatchDeleter(table, Authorizations.EMPTY, 1); + deleter + .setRanges(Collections.singleton(new KeyExtent(tableIdToModify, null, null).toMetaRange())); + deleter.fetchColumnFamily(CurrentLocationColumnFamily.NAME); + deleter.delete(); + deleter.close(); + } + + private int findTabletsNeedingAttention(AccumuloClient client, String table, + TabletManagementParameters tabletMgmtParams) throws TableNotFoundException, IOException { + int results = 0; + List<KeyExtent> resultList = new ArrayList<>(); + try (Scanner scanner = client.createScanner(table, Authorizations.EMPTY)) { + TabletManagementIterator.configureScanner(scanner, tabletMgmtParams); + scanner.updateScanIteratorOption("tabletChange", "debug", "1"); + for (Entry<Key,Value> e : scanner) { + if (e != null) { + TabletManagement mti = TabletManagementIterator.decode(e); + results++; + log.debug("Found tablets that changed state: {}", mti.getTabletMetadata().getExtent()); + log.debug("metadata: {}", mti.getTabletMetadata()); + resultList.add(mti.getTabletMetadata().getExtent()); + } + } + } + log.debug("Tablets in flux: {}", resultList); + return results; + } + + private void createTable(AccumuloClient client, String t, boolean online) + throws AccumuloSecurityException, AccumuloException, TableNotFoundException, + TableExistsException { + SortedSet<Text> partitionKeys = new TreeSet<>(); + partitionKeys.add(new Text("some split")); + NewTableConfiguration ntc = new NewTableConfiguration().withSplits(partitionKeys); + client.tableOperations().create(t, ntc); + client.tableOperations().online(t); + if (!online) { + client.tableOperations().offline(t, true); + } + } + + /** + * Create a copy of the source table by first gathering all the rows of the source in a list of + * mutations. Then create the copy of the table and apply the mutations to the copy. + */ + private void copyTable(AccumuloClient client, String source, String copy) + throws AccumuloException, AccumuloSecurityException, TableNotFoundException, + TableExistsException { + try { + dropTables(client, copy); + } catch (TableNotFoundException ex) { + // ignored + } + + log.info("Gathering rows to copy {} ", source); + List<Mutation> mutations = new ArrayList<>(); + + try (Scanner scanner = client.createScanner(source, Authorizations.EMPTY)) { + RowIterator rows = new RowIterator(new IsolatedScanner(scanner)); + + while (rows.hasNext()) { + Iterator<Entry<Key,Value>> row = rows.next(); + Mutation m = null; + + while (row.hasNext()) { + Entry<Key,Value> entry = row.next(); + Key k = entry.getKey(); + if (m == null) { + m = new Mutation(k.getRow()); + } + + m.put(k.getColumnFamily(), k.getColumnQualifier(), k.getColumnVisibilityParsed(), + k.getTimestamp(), entry.getValue()); + } + + mutations.add(m); + } + } + + // metadata should be stable with only 6 rows (2 for each table) + log.debug("Gathered {} rows to create copy {}", mutations.size(), copy); + assertEquals(8, mutations.size(), "Metadata should have 8 rows (2 for each table)"); + client.tableOperations().create(copy); + + try (BatchWriter writer = client.createBatchWriter(copy)) { + for (Mutation m : mutations) { + writer.addMutation(m); + } + } + + log.info("Finished creating copy " + copy); + } + + private void dropTables(AccumuloClient client, String... tables) + throws AccumuloException, AccumuloSecurityException, TableNotFoundException { + for (String t : tables) { + client.tableOperations().delete(t); + } + } + + // Creates a log entry on the "some split" extent, this could be modified easily to support + // other extents + private void createLogEntry(AccumuloClient client, String table, String tableNameToModify) + throws MutationsRejectedException, TableNotFoundException { + TableId tableIdToModify = + TableId.of(client.tableOperations().tableIdMap().get(tableNameToModify)); + KeyExtent extent = new KeyExtent(tableIdToModify, new Text("some split"), null); + Mutation m = new Mutation(extent.toMetaRow()); + String fileName = "file:/accumulo/wal/localhost+9997/" + UUID.randomUUID().toString(); + LogEntry logEntry = LogEntry.fromPath(fileName); + logEntry.addToMutation(m); + try (BatchWriter bw = client.createBatchWriter(table)) { + bw.addMutation(m); + } + } + + private static TabletManagementParameters createParameters(AccumuloClient client) { - return createParameters(client, List.of()); ++ return createParameters(client, Map.of()); + } + + private static TabletManagementParameters createParameters(AccumuloClient client, - List<Pair<Path,Path>> replacements) { ++ Map<Path,Path> replacements) { + var context = (ClientContext) client; + Set<TableId> onlineTables = Sets.filter(context.getTableIdToNameMap().keySet(), + tableId -> context.getTableState(tableId) == TableState.ONLINE); + + HashSet<TServerInstance> tservers = new HashSet<>(); + for (String tserver : context.instanceOperations().getTabletServers()) { + try { + var zPath = ServiceLock.path(ZooUtil.getRoot(context.instanceOperations().getInstanceId()) + + Constants.ZTSERVERS + "/" + tserver); + long sessionId = ServiceLock.getSessionId(context.getZooCache(), zPath); + tservers.add(new TServerInstance(tserver, sessionId)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + return new TabletManagementParameters(ManagerState.NORMAL, + Map.of( + Ample.DataLevel.ROOT, true, Ample.DataLevel.USER, true, Ample.DataLevel.METADATA, true), + onlineTables, + new LiveTServerSet.LiveTServersSnapshot(tservers, + Map.of(Constants.DEFAULT_RESOURCE_GROUP_NAME, tservers)), + Set.of(), Map.of(), Ample.DataLevel.USER, Map.of(), true, replacements); + } +}