This is an automated email from the ASF dual-hosted git repository.

ddanielr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit de82d54847bb426da887d759ef1e7520c74be91d
Merge: 682e1c8ed8 950fce0d36
Author: Daniel Roberts ddanielr <[email protected]>
AuthorDate: Mon Nov 10 18:26:01 2025 +0000

    Merge branch '2.1'

 .../accumulo/manager/TabletGroupWatcher.java       |   6 +-
 .../accumulo/manager/recovery/RecoveryManager.java | 124 ++++++++++++---------
 .../shell/commands/ActiveCompactionHelper.java     |  55 ++++++++-
 .../apache/accumulo/test/shell/ShellServerIT.java  |   8 +-
 4 files changed, 138 insertions(+), 55 deletions(-)

diff --cc server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
index 29eb475c8e,215e0b8f74..ba278e0901
--- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
@@@ -57,35 -53,44 +57,36 @@@ import org.apache.accumulo.core.data.Re
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
-import org.apache.accumulo.core.gc.ReferenceFile;
 import org.apache.accumulo.core.logging.ConditionalLogger.EscalatingLogger;
 import org.apache.accumulo.core.logging.TabletLogger;
+import org.apache.accumulo.core.manager.state.TabletManagement;
+import org.apache.accumulo.core.manager.state.TabletManagement.ManagementAction;
 import org.apache.accumulo.core.manager.state.tables.TableState;
+import org.apache.accumulo.core.manager.thrift.ManagerGoalState;
 import org.apache.accumulo.core.manager.thrift.ManagerState;
-import org.apache.accumulo.core.master.thrift.TabletServerStatus;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.RootTable;
-import org.apache.accumulo.core.metadata.StoredTabletFile;
+import org.apache.accumulo.core.manager.thrift.TabletServerStatus;
 import org.apache.accumulo.core.metadata.TServerInstance;
-import org.apache.accumulo.core.metadata.TabletLocationState;
-import org.apache.accumulo.core.metadata.TabletLocationState.BadLocationStateException;
 import org.apache.accumulo.core.metadata.TabletState;
 import org.apache.accumulo.core.metadata.schema.Ample;
-import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ExternalCompactionColumnFamily;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
-import org.apache.accumulo.core.metadata.schema.MetadataTime;
+import org.apache.accumulo.core.metadata.schema.RootTabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location;
 import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
 import
org.apache.accumulo.core.util.Timer; +import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.core.util.threads.Threads.AccumuloDaemonThread; -import org.apache.accumulo.manager.Manager.TabletGoalState; +import org.apache.accumulo.manager.metrics.ManagerMetrics; + import org.apache.accumulo.manager.recovery.RecoveryManager; -import org.apache.accumulo.manager.state.MergeStats; import org.apache.accumulo.manager.state.TableCounts; import org.apache.accumulo.manager.state.TableStats; -import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.manager.upgrade.UpgradeCoordinator; +import org.apache.accumulo.server.ServiceEnvironmentImpl; +import org.apache.accumulo.server.compaction.CompactionJobGenerator; +import org.apache.accumulo.server.conf.CheckCompactionConfig; import org.apache.accumulo.server.conf.TableConfiguration; -import org.apache.accumulo.server.gc.AllVolumesDirectory; +import org.apache.accumulo.server.fs.VolumeUtil; import org.apache.accumulo.server.log.WalStateManager; import org.apache.accumulo.server.log.WalStateManager.WalMarkerException; import org.apache.accumulo.server.manager.LiveTServerSet.TServerConnection; @@@ -223,565 -186,208 +224,568 @@@ abstract class TabletGroupWatcher exten } } - @Override - public void run() { - int[] oldCounts = new int[TabletState.values().length]; - EventCoordinator.Listener eventListener = this.manager.nextEvent.getListener(); + class EventHandler implements EventCoordinator.Listener { - WalStateManager wals = new WalStateManager(manager.getContext()); + // Setting this to true to start with because its not know what happended before this object was + // created, so just start off with full scan. + private boolean needsFullScan = true; - while (manager.stillManager()) { - // slow things down a little, otherwise we spam the logs when there are many wake-up events - sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + private final BlockingQueue<Range> rangesToProcess; - final long waitTimeBetweenScans = manager.getConfiguration() - .getTimeInMillis(Property.MANAGER_TABLET_GROUP_WATCHER_INTERVAL); + class RangeProccessor implements Runnable { + @Override + public void run() { + try { + while (manager.stillManager()) { + var range = rangesToProcess.poll(100, TimeUnit.MILLISECONDS); + if (range == null) { + // check to see if still the manager + continue; + } - int totalUnloaded = 0; - int unloaded = 0; - ClosableIterator<TabletLocationState> iter = null; - try { - Map<TableId,MergeStats> mergeStatsCache = new HashMap<>(); - Map<TableId,MergeStats> currentMerges = new HashMap<>(); - for (MergeInfo merge : manager.merges()) { - if (merge.getExtent() != null) { - currentMerges.put(merge.getExtent().tableId(), new MergeStats(merge)); + ArrayList<Range> ranges = new ArrayList<>(); + ranges.add(range); + + rangesToProcess.drainTo(ranges); + + if (!processRanges(ranges)) { + setNeedsFullScan(); + } } + } catch (InterruptedException e) { + throw new RuntimeException(e); } + } + } - // Get the current status for the current list of tservers - SortedMap<TServerInstance,TabletServerStatus> currentTServers = new TreeMap<>(); - for (TServerInstance entry : manager.tserverSet.getCurrentServers()) { - currentTServers.put(entry, manager.tserverStatus.get(entry)); - } + EventHandler() { + rangesToProcess = new ArrayBlockingQueue<>(10000); - if (currentTServers.isEmpty()) { - eventListener.waitForEvents(waitTimeBetweenScans); - synchronized (this) { - lastScanServers = Collections.emptySortedSet(); + 
Threads.createCriticalThread("TGW [" + store.name() + "] event range processor", + new RangeProccessor()).start(); + } + + private synchronized void setNeedsFullScan() { + needsFullScan = true; + notifyAll(); + } + + public synchronized void clearNeedsFullScan() { + needsFullScan = false; + } + + public synchronized boolean isNeedsFullScan() { + return needsFullScan; + } + + @Override + public void process(EventCoordinator.Event event) { + + switch (event.getScope()) { + case ALL: + case DATA_LEVEL: + setNeedsFullScan(); + break; + case TABLE: + case TABLE_RANGE: + if (!rangesToProcess.offer(event.getExtent().toMetaRange())) { + Manager.log.debug("[{}] unable to process event range {} because queue is full", + store.name(), event.getExtent()); + setNeedsFullScan(); } - continue; + break; + default: + throw new IllegalArgumentException("Unhandled scope " + event.getScope()); + } + } + + synchronized void waitForFullScan(long millis) { + if (!needsFullScan) { + try { + wait(millis); + } catch (InterruptedException e) { + throw new RuntimeException(e); } + } + } + } - TabletLists tLists = new TabletLists(manager, currentTServers); + private boolean processRanges(List<Range> ranges) { + if (manager.getManagerGoalState() == ManagerGoalState.CLEAN_STOP) { + return false; + } - RecoveryManager.RecoverySession recoverySession = - manager.recoveryManager.newRecoverySession(); + TabletManagementParameters tabletMgmtParams = createTabletManagementParameters(false); - ManagerState managerState = manager.getManagerState(); - int[] counts = new int[TabletState.values().length]; - stats.begin(); - // Walk through the tablets in our store, and work tablets - // towards their goal - iter = store.iterator(); - while (iter.hasNext()) { - TabletLocationState tls = iter.next(); - if (tls == null) { - continue; - } + var currentTservers = getCurrentTservers(tabletMgmtParams.getOnlineTsevers()); + if (currentTservers.isEmpty()) { + return false; + } - // ignore entries for tables that do not exist in zookeeper - if (manager.getTableManager().getTableState(tls.extent.tableId()) == null) { - continue; + try (var iter = store.iterator(ranges, tabletMgmtParams)) { + long t1 = System.currentTimeMillis(); + manageTablets(iter, tabletMgmtParams, currentTservers, false); + long t2 = System.currentTimeMillis(); + Manager.log.debug(String.format("[%s]: partial scan time %.2f seconds for %,d ranges", + store.name(), (t2 - t1) / 1000., ranges.size())); + } catch (Exception e) { + Manager.log.error("Error processing {} ranges for store {} ", ranges.size(), store.name(), e); + } + + return true; + } + + private final Set<KeyExtent> hostingRequestInProgress = new ConcurrentSkipListSet<>(); + + public void hostOndemand(Collection<KeyExtent> extents) { + // This is only expected to be called for the user level + Preconditions.checkState(getLevel() == Ample.DataLevel.USER); + + final List<KeyExtent> inProgress = new ArrayList<>(); + extents.forEach(ke -> { + if (hostingRequestInProgress.add(ke)) { + LOG.info("Tablet hosting requested for: {} ", ke); + inProgress.add(ke); + } else { + LOG.trace("Ignoring hosting request because another thread is currently processing it {}", + ke); + } + }); + // Do not add any code here, it may interfere with the finally block removing extents from + // hostingRequestInProgress + try (var mutator = manager.getContext().getAmple().conditionallyMutateTablets()) { + inProgress.forEach(ke -> mutator.mutateTablet(ke).requireAbsentOperation() + 
.requireTabletAvailability(TabletAvailability.ONDEMAND).requireAbsentLocation() + .setHostingRequested() + .submit(TabletMetadata::getHostingRequested, () -> "host ondemand")); + + List<Range> ranges = new ArrayList<>(); + + mutator.process().forEach((extent, result) -> { + if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) { + // cache this success for a bit + ranges.add(extent.toMetaRange()); + } else { + if (LOG.isTraceEnabled()) { + // only read the metadata if the logging is enabled + LOG.trace("Failed to set hosting request {}", result.readMetadata()); } + } + }); + + if (!ranges.isEmpty()) { + processRanges(ranges); + } + } finally { + inProgress.forEach(hostingRequestInProgress::remove); + } + } + + private TabletManagementParameters + createTabletManagementParameters(boolean lookForTabletsNeedingVolReplacement) { + + HashMap<Ample.DataLevel,Boolean> parentLevelUpgrade = new HashMap<>(); + UpgradeCoordinator.UpgradeStatus upgradeStatus = manager.getUpgradeStatus(); + for (var level : Ample.DataLevel.values()) { + parentLevelUpgrade.put(level, upgradeStatus.isParentLevelUpgraded(level)); + } + + Set<TServerInstance> shutdownServers; + if (store.getLevel() == Ample.DataLevel.USER) { + shutdownServers = manager.shutdownServers(); + } else { + // Use the servers to shutdown filtered by the dependent watcher. These are servers to + // shutdown that the dependent watcher has determined it has no tablets hosted on or assigned + // to. + shutdownServers = dependentWatcher.getFilteredServersToShutdown(); + } + + var tServersSnapshot = manager.tserversSnapshot(); + + var tabletMgmtParams = new TabletManagementParameters(manager.getManagerState(), + parentLevelUpgrade, manager.onlineTables(), tServersSnapshot, shutdownServers, + store.getLevel(), manager.getCompactionHints(store.getLevel()), canSuspendTablets(), + lookForTabletsNeedingVolReplacement ? manager.getContext().getVolumeReplacements() + : Map.of(), + manager.getSteadyTime()); + + if (LOG.isTraceEnabled()) { + // Log the json that will be passed to iterators to make tablet filtering decisions. + LOG.trace("{}:{}", TabletManagementParameters.class.getSimpleName(), + tabletMgmtParams.serialize()); + } + + return tabletMgmtParams; + } + + private Set<TServerInstance> getFilteredServersToShutdown() { + return filteredServersToShutdown; + } + + private static class TableMgmtStats { + final int[] counts = new int[TabletState.values().length]; + private int totalUnloaded; + private long totalVolumeReplacements; + private int tabletsWithErrors; + } + + private TableMgmtStats manageTablets(Iterator<TabletManagement> iter, + TabletManagementParameters tableMgmtParams, + SortedMap<TServerInstance,TabletServerStatus> currentTServers, boolean isFullScan) + throws TException, DistributedStoreException, WalMarkerException, IOException { + + // When upgrading the Manager needs the TabletGroupWatcher + // to assign and balance the root and metadata tables, but + // the Manager does not fully start up until the upgrade + // is complete. This means that objects like the Splitter + // are not going to be initialized and the Coordinator + // is not going to be started. 
+ final boolean currentlyUpgrading = manager.isUpgrading(); + if (currentlyUpgrading) { + LOG.debug( + "Currently upgrading, splits and compactions for tables in level {} will occur once upgrade is completed.", + store.getLevel()); + } - // Don't overwhelm the tablet servers with work - if (tLists.unassigned.size() + unloaded - > Manager.MAX_TSERVER_WORK_CHUNK * currentTServers.size()) { - flushChanges(tLists, wals); - tLists.reset(); - unloaded = 0; - eventListener.waitForEvents(waitTimeBetweenScans); + final TableMgmtStats tableMgmtStats = new TableMgmtStats(); + final boolean shuttingDownAllTabletServers = + tableMgmtParams.getServersToShutdown().equals(currentTServers.keySet()); + if (shuttingDownAllTabletServers && !isFullScan) { + // If we are shutting down all of the TabletServers, then don't process any events + // from the EventCoordinator. + LOG.debug("Partial scan requested, but aborted due to shutdown of all TabletServers"); + return tableMgmtStats; + } + + int unloaded = 0; + + TabletLists tLists = new TabletLists(currentTServers, tableMgmtParams.getGroupedTServers(), + tableMgmtParams.getServersToShutdown()); + + CompactionJobGenerator compactionGenerator = + new CompactionJobGenerator(new ServiceEnvironmentImpl(manager.getContext()), + tableMgmtParams.getCompactionHints(), tableMgmtParams.getSteadyTime()); + + try { + CheckCompactionConfig.validate(manager.getConfiguration(), Level.TRACE); + this.metrics.clearCompactionServiceConfigurationError(); + } catch (RuntimeException | ReflectiveOperationException e) { + this.metrics.setCompactionServiceConfigurationError(); + LOG.error( + "Error validating compaction configuration, all {} compactions are paused until the configuration is fixed.", + store.getLevel(), e); + compactionGenerator = null; + } + + Set<TServerInstance> filteredServersToShutdown = + new HashSet<>(tableMgmtParams.getServersToShutdown()); + + while (iter.hasNext() && !manager.isShutdownRequested()) { + final TabletManagement mti = iter.next(); + if (mti == null) { + throw new IllegalStateException("State store returned a null ManagerTabletInfo object"); + } + + final String mtiError = mti.getErrorMessage(); + if (mtiError != null) { + LOG.warn( + "Error on TabletServer trying to get Tablet management information for metadata tablet. Error message: {}", + mtiError); + this.metrics.incrementTabletGroupWatcherError(this.store.getLevel()); + tableMgmtStats.tabletsWithErrors++; + continue; + } + ++ RecoveryManager.RecoverySession recoverySession = ++ manager.recoveryManager.newRecoverySession(); ++ + final TabletMetadata tm = mti.getTabletMetadata(); + final TableId tableId = tm.getTableId(); + // ignore entries for tables that do not exist in zookeeper + if (manager.getTableManager().getTableState(tableId) == TableState.UNKNOWN) { + continue; + } + + // Don't overwhelm the tablet servers with work + if (tLists.unassigned.size() + unloaded + > Manager.MAX_TSERVER_WORK_CHUNK * currentTServers.size() + || tLists.volumeReplacements.size() > 1000) { + flushChanges(tLists); + tLists.reset(); + unloaded = 0; + } + + final TableConfiguration tableConf = manager.getContext().getTableConfiguration(tableId); + + TabletState state = TabletState.compute(tm, currentTServers.keySet()); + if (state == TabletState.ASSIGNED_TO_DEAD_SERVER) { + /* + * This code exists to deal with a race condition caused by two threads running in this + * class that compute tablets actions. One thread does full scans and the other reacts to + * events and does partial scans. 
Below is an example of the race condition this is + * handling. + * + * - TGW Thread 1 : reads the set of tablets servers and its empty + * + * - TGW Thread 2 : reads the set of tablet servers and its [TS1] + * + * - TGW Thread 2 : Sees tabletX without a location and assigns it to TS1 + * + * - TGW Thread 1 : Sees tabletX assigned to TS1 and assumes it's assigned to a dead tablet + * server because its set of live servers is the empty set. + * + * To deal with this race condition, this code recomputes the tablet state using the latest + * tservers when a tablet is seen assigned to a dead tserver. + */ + + TabletState newState = TabletState.compute(tm, manager.tserversSnapshot().getTservers()); + if (newState != state) { + LOG.debug("Tablet state changed when using latest set of tservers {} {} {}", + tm.getExtent(), state, newState); + state = newState; + } + } + tableMgmtStats.counts[state.ordinal()]++; + + // This is final because nothing in this method should change the goal. All computation of the + // goal should be done in TabletGoalState.compute() so that all parts of the Accumulo code + // will compute a consistent goal. + final TabletGoalState goal = TabletGoalState.compute(tm, state, + manager.getBalanceManager().getBalancer(), tableMgmtParams); + + final Set<ManagementAction> actions = mti.getActions(); + + if (actions.contains(ManagementAction.NEEDS_RECOVERY) && goal != TabletGoalState.HOSTED) { + LOG.warn("Tablet has wals, but goal is not hosted. Tablet: {}, goal:{}", tm.getExtent(), + goal); + } + + if (actions.contains(ManagementAction.NEEDS_VOLUME_REPLACEMENT)) { + tableMgmtStats.totalVolumeReplacements++; + if (state == TabletState.UNASSIGNED || state == TabletState.SUSPENDED) { + var volRep = + VolumeUtil.computeVolumeReplacements(tableMgmtParams.getVolumeReplacements(), tm); + if (volRep.logsToRemove.size() + volRep.filesToRemove.size() > 0) { + if (tm.getLocation() != null) { + // since the totalVolumeReplacements counter was incremented, should try this again + // later after its unassigned + LOG.debug("Volume replacement needed for {} but it has a location {}.", + tm.getExtent(), tm.getLocation()); + } else if (tm.getOperationId() != null) { + LOG.debug("Volume replacement needed for {} but it has an active operation {}.", + tm.getExtent(), tm.getOperationId()); + } else { + LOG.debug("Volume replacement needed for {}.", tm.getExtent()); + // buffer replacements so that multiple mutations can be done at once + tLists.volumeReplacements.add(volRep); + } + } else { + LOG.debug("Volume replacement evaluation for {} returned no changes.", tm.getExtent()); } - TableId tableId = tls.extent.tableId(); - TableConfiguration tableConf = manager.getContext().getTableConfiguration(tableId); - - MergeStats mergeStats = mergeStatsCache.computeIfAbsent(tableId, k -> { - var mStats = currentMerges.get(k); - return mStats != null ? 
mStats : new MergeStats(new MergeInfo()); - }); - TabletGoalState goal = manager.getGoalState(tls, mergeStats.getMergeInfo()); - Location location = tls.getLocation(); - TabletState state = tls.getState(currentTServers.keySet()); - - TabletLogger.missassigned(tls.extent, goal.toString(), state.toString(), - tls.getFutureServer(), tls.getCurrentServer(), tls.walogs.size()); - - stats.update(tableId, state); - mergeStats.update(tls.extent, state, tls.chopped, !tls.walogs.isEmpty()); - sendChopRequest(mergeStats.getMergeInfo(), state, tls); - sendSplitRequest(mergeStats.getMergeInfo(), state, tls); - - // Always follow through with assignments - if (state == TabletState.ASSIGNED) { - goal = TabletGoalState.HOSTED; + } else { + LOG.debug("Volume replacement needed for {} but its tablet state is {}.", tm.getExtent(), + state); + } + } + + if (actions.contains(ManagementAction.BAD_STATE) && tm.isFutureAndCurrentLocationSet()) { + Manager.log.error("{}, saw tablet with multiple locations, which should not happen", + tm.getExtent()); + logIncorrectTabletLocations(tm); + // take no further action for this tablet + continue; + } + + final Location location = tm.getLocation(); + Location current = null; + Location future = null; + if (tm.hasCurrent()) { + current = tm.getLocation(); + } else { + future = tm.getLocation(); + } + TabletLogger.missassigned(tm.getExtent(), goal.toString(), state.toString(), + future != null ? future.getServerInstance() : null, + current != null ? current.getServerInstance() : null, tm.getLogs().size()); + + if (isFullScan) { + stats.update(tableId, state); + } + + if (Manager.log.isTraceEnabled()) { + Manager.log.trace( + "[{}] Shutting down all Tservers: {}, dependentCount: {} Extent: {}, state: {}, goal: {} actions:{} #wals:{}", + store.name(), tableMgmtParams.getServersToShutdown().equals(currentTServers.keySet()), + dependentWatcher == null ? "null" : dependentWatcher.assignedOrHosted(), tm.getExtent(), + state, goal, actions, tm.getLogs().size()); + } + + final boolean needsSplit = actions.contains(ManagementAction.NEEDS_SPLITTING); + if (!currentlyUpgrading && needsSplit) { + LOG.debug("{} may need splitting.", tm.getExtent()); + manager.getSplitter().initiateSplit(tm.getExtent()); + } + + if (!currentlyUpgrading && actions.contains(ManagementAction.NEEDS_COMPACTING) + && compactionGenerator != null) { + // Check if tablet needs splitting, priority should be giving to splits over + // compactions because it's best to compact after a split + if (!needsSplit) { + var jobs = compactionGenerator.generateJobs(tm, + TabletManagementIterator.determineCompactionKinds(actions)); + LOG.debug("{} may need compacting adding {} jobs", tm.getExtent(), jobs.size()); + manager.getCompactionCoordinator().addJobs(tm, jobs); + } else { + LOG.trace("skipping compaction job generation because {} may need splitting.", + tm.getExtent()); + } + } + + if (actions.contains(ManagementAction.NEEDS_LOCATION_UPDATE) + || actions.contains(ManagementAction.NEEDS_RECOVERY)) { + + if (tm.getLocation() != null) { + filteredServersToShutdown.remove(tm.getLocation().getServerInstance()); + } + + if (goal == TabletGoalState.HOSTED) { + + // RecoveryManager.recoverLogs will return false when all of the logs + // have been sorted so that recovery can occur. Delay the hosting of + // the Tablet until the sorting is finished. 
+ if ((state != TabletState.HOSTED && actions.contains(ManagementAction.NEEDS_RECOVERY)) - && manager.recoveryManager.recoverLogs(tm.getExtent(), tm.getLogs())) { ++ && recoverySession.recoverLogs(tm.getLogs())) { + LOG.debug("Not hosting {} as it needs recovery, logs: {}", tm.getExtent(), + tm.getLogs().size()); + continue; } - if (Manager.log.isTraceEnabled()) { - Manager.log.trace( - "[{}] Shutting down all Tservers: {}, dependentCount: {} Extent: {}, state: {}, goal: {}", - store.name(), manager.serversToShutdown.equals(currentTServers.keySet()), - dependentWatcher == null ? "null" : dependentWatcher.assignedOrHosted(), tls.extent, - state, goal); + switch (state) { + case ASSIGNED_TO_DEAD_SERVER: + hostDeadTablet(tLists, tm, location); + break; + case SUSPENDED: + hostSuspendedTablet(tLists, tm, location, tableConf); + break; + case UNASSIGNED: + hostUnassignedTablet(tLists, tm.getExtent(), + new UnassignedTablet(location, tm.getLast())); + break; + case ASSIGNED: + // Send another reminder + tLists.assigned.add(new Assignment(tm.getExtent(), + future != null ? future.getServerInstance() : null, tm.getLast())); + break; + case HOSTED: + break; } - - // if we are shutting down all the tabletservers, we have to do it in order - if ((goal == TabletGoalState.SUSPENDED && state == TabletState.HOSTED) - && manager.serversToShutdown.equals(currentTServers.keySet())) { - if (dependentWatcher != null) { - // If the dependentWatcher is for the user tables, check to see - // that user tables exist. - DataLevel dependentLevel = dependentWatcher.store.getLevel(); - boolean userTablesExist = true; - switch (dependentLevel) { - case USER: - Set<TableId> onlineTables = manager.onlineTables(); - onlineTables.remove(RootTable.ID); - onlineTables.remove(MetadataTable.ID); - userTablesExist = !onlineTables.isEmpty(); - break; - case METADATA: - case ROOT: - default: - break; - } - // If the stats object in the dependentWatcher is empty, then it - // currently does not have data about what is hosted or not. In - // that case host these tablets until the dependent watcher can - // gather some data. - final Map<TableId,TableCounts> stats = dependentWatcher.getStats(); - if (dependentLevel == DataLevel.USER) { - if (userTablesExist - && (stats == null || stats.isEmpty() || assignedOrHosted(stats) > 0)) { - goal = TabletGoalState.HOSTED; - } - } else if (stats == null || stats.isEmpty() || assignedOrHosted(stats) > 0) { - goal = TabletGoalState.HOSTED; + } else { + switch (state) { + case SUSPENDED: + // Request a move to UNASSIGNED, so as to allow balancing to continue. 
+ tLists.suspendedToGoneServers.add(tm); + break; + case ASSIGNED_TO_DEAD_SERVER: + unassignDeadTablet(tLists, tm); + break; + case HOSTED: + TServerConnection client = + manager.tserverSet.getConnection(location.getServerInstance()); + if (client != null) { + TABLET_UNLOAD_LOGGER.trace("[{}] Requesting TabletServer {} unload {} {}", + store.name(), location.getServerInstance(), tm.getExtent(), goal.howUnload()); + client.unloadTablet(manager.managerLock, tm.getExtent(), goal.howUnload(), + manager.getSteadyTime().getMillis()); + tableMgmtStats.totalUnloaded++; + unloaded++; + } else { + Manager.log.warn("Could not connect to server {}", location); } - } + break; + case ASSIGNED: + case UNASSIGNED: + break; } + } + } + } - if (goal == TabletGoalState.HOSTED) { - if ((state != TabletState.HOSTED && !tls.walogs.isEmpty()) - && recoverySession.recoverLogs(tls.walogs)) { - continue; - } - switch (state) { - case HOSTED: - if (location.getServerInstance().equals(manager.migrations.get(tls.extent))) { - manager.migrations.removeExtent(tls.extent); - } - break; - case ASSIGNED_TO_DEAD_SERVER: - hostDeadTablet(tLists, tls, location, wals); - break; - case SUSPENDED: - hostSuspendedTablet(tLists, tls, location, tableConf); - break; - case UNASSIGNED: - hostUnassignedTablet(tLists, tls.extent, new UnassignedTablet(location, tls.last)); - break; - case ASSIGNED: - // Send another reminder - tLists.assigned.add(new Assignment(tls.extent, tls.getFutureServer(), tls.last)); - break; - } - } else { - switch (state) { - case SUSPENDED: - // Request a move to UNASSIGNED, so as to allow balancing to continue. - tLists.suspendedToGoneServers.add(tls); - cancelOfflineTableMigrations(tls.extent); - break; - case UNASSIGNED: - cancelOfflineTableMigrations(tls.extent); - break; - case ASSIGNED_TO_DEAD_SERVER: - unassignDeadTablet(tLists, tls, wals); - break; - case HOSTED: - TServerConnection client = - manager.tserverSet.getConnection(location.getServerInstance()); - if (client != null) { - try { - TABLET_UNLOAD_LOGGER.trace("[{}] Requesting TabletServer {} unload {} {}", - store.name(), location.getServerInstance(), tls.extent, goal.howUnload()); - client.unloadTablet(manager.managerLock, tls.extent, goal.howUnload(), - manager.getSteadyTime()); - unloaded++; - totalUnloaded++; - } catch (TException tException) { - Manager.log.warn("[{}] Failed to request tablet unload {} {} {}", store.name(), - location.getServerInstance(), tls.extent, goal.howUnload(), tException); - } - } else { - Manager.log.warn("Could not connect to server {}", location); - } - break; - case ASSIGNED: - break; - } + flushChanges(tLists); + + if (isFullScan) { + this.filteredServersToShutdown = Set.copyOf(filteredServersToShutdown); + } + + return tableMgmtStats; + } + + private SortedMap<TServerInstance,TabletServerStatus> + getCurrentTservers(Set<TServerInstance> onlineTservers) { + // Get the current status for the current list of tservers + final SortedMap<TServerInstance,TabletServerStatus> currentTServers = new TreeMap<>(); + for (TServerInstance entry : onlineTservers) { + currentTServers.put(entry, manager.getTserverStatus().status.get(entry)); + } + return currentTServers; + } + + @Override + public void run() { + int[] oldCounts = new int[TabletState.values().length]; + boolean lookForTabletsNeedingVolReplacement = true; + + while (manager.stillManager() && !manager.isShutdownRequested()) { + if (!eventHandler.isNeedsFullScan()) { + // If an event handled by the EventHandler.RangeProcessor indicated + // that we need to do a 
full scan, then do it. Otherwise wait a bit + // before re-checking the tablets. + sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + } + + final long waitTimeBetweenScans = manager.getConfiguration() + .getTimeInMillis(Property.MANAGER_TABLET_GROUP_WATCHER_INTERVAL); + + TabletManagementParameters tableMgmtParams = + createTabletManagementParameters(lookForTabletsNeedingVolReplacement); + var currentTServers = getCurrentTservers(tableMgmtParams.getOnlineTsevers()); + + ClosableIterator<TabletManagement> iter = null; + try { + if (currentTServers.isEmpty()) { + eventHandler.waitForFullScan(waitTimeBetweenScans); + synchronized (this) { + lastScanServers = Collections.emptySortedSet(); } - counts[state.ordinal()]++; + continue; } - flushChanges(tLists, wals); + stats.begin(); + + ManagerState managerState = tableMgmtParams.getManagerState(); + + // Clear the need for a full scan before starting a full scan inorder to detect events that + // happen during the full scan. + eventHandler.clearNeedsFullScan(); + + iter = store.iterator(tableMgmtParams); + manager.getCompactionCoordinator().getJobQueues().beginFullScan(store.getLevel()); + var tabletMgmtStats = manageTablets(iter, tableMgmtParams, currentTServers, true); + manager.getCompactionCoordinator().getJobQueues().endFullScan(store.getLevel()); + + // If currently looking for volume replacements, determine if the next round needs to look. + if (lookForTabletsNeedingVolReplacement) { + // Continue to look for tablets needing volume replacement if there was an error + // processing tablets in the call to manageTablets() or if we are still performing volume + // replacement. We only want to stop looking for tablets that need volume replacement when + // we have successfully processed all tablet metadata and no more volume replacements are + // being performed. 
+ Manager.log.debug("[{}] saw {} tablets needing volume replacement", store.name(), + tabletMgmtStats.totalVolumeReplacements); + lookForTabletsNeedingVolReplacement = tabletMgmtStats.totalVolumeReplacements != 0 + || tabletMgmtStats.tabletsWithErrors != 0; + if (!lookForTabletsNeedingVolReplacement) { + Manager.log.debug("[{}] no longer looking for volume replacements", store.name()); + } + } // provide stats after flushing changes to avoid race conditions w/ delete table stats.end(managerState); diff --cc server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java index ecba0b20fe,666970cc09..8e1e00988f --- a/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java @@@ -36,11 -36,10 +36,10 @@@ import java.util.concurrent.TimeUnit import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; - import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.fate.zookeeper.ZooCache; +import org.apache.accumulo.core.tabletserver.log.LogEntry; +import org.apache.accumulo.core.util.cache.Caches.CacheName; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.manager.Manager; -import org.apache.accumulo.server.fs.VolumeManager.FileType; import org.apache.accumulo.server.fs.VolumeUtil; import org.apache.accumulo.server.log.SortedLogState; import org.apache.accumulo.server.manager.recovery.HadoopLogCloser; @@@ -153,71 -149,98 +152,94 @@@ public class RecoveryManager } } - public boolean recoverLogs(KeyExtent extent, Collection<LogEntry> walogs) throws IOException { - boolean recoveryNeeded = false; + // caches per log recovery decisions for its lifetime + public class RecoverySession { + - private HashMap<String,Boolean> needsRecovery = new HashMap<>(); ++ private HashMap<LogEntry,Boolean> needsRecovery = new HashMap<>(); - for (LogEntry walog : walogs) { - public boolean recoverLogs(Collection<Collection<String>> walogs) throws IOException { ++ public boolean recoverLogs(Collection<LogEntry> walogs) throws IOException { + boolean recoveryNeeded = false; - LogEntry switchedWalog = - VolumeUtil.switchVolume(walog, manager.getContext().getVolumeReplacements()); - if (switchedWalog != null) { - // replaces the volume used for sorting, but do not change entry in metadata table. When - // the tablet loads it will change the metadata table entry. If - // the tablet has the same replacement config, then it will find the sorted log. 
- log.info("Volume replaced {} -> {}", walog, switchedWalog); - walog = switchedWalog; - for (Collection<String> logs : walogs) { - for (String walog : logs) { - var logNeedsRecovery = needsRecovery.get(walog); - if (logNeedsRecovery == null) { - logNeedsRecovery = recoverLog(walog); - needsRecovery.put(walog, logNeedsRecovery); - } - recoveryNeeded |= logNeedsRecovery; ++ for (LogEntry walog : walogs) { ++ var logNeedsRecovery = needsRecovery.get(walog); ++ if (logNeedsRecovery == null) { ++ logNeedsRecovery = recoverLog(walog); ++ needsRecovery.put(walog, logNeedsRecovery); + } ++ recoveryNeeded |= logNeedsRecovery; } - + return recoveryNeeded; + } ++ + } + + public RecoverySession newRecoverySession() { + return new RecoverySession(); + } + - private boolean recoverLog(String walog) throws IOException { ++ private boolean recoverLog(LogEntry walog) throws IOException { + boolean recoveryNeeded = false; - String sortId = walog.getUniqueID().toString(); - String filename = walog.getPath(); - String dest = RecoveryPath.getRecoveryPath(new Path(filename)).toString(); - Path switchedWalog = - VolumeUtil.switchVolume(walog, FileType.WAL, manager.getContext().getVolumeReplacements()); ++ LogEntry switchedWalog = ++ VolumeUtil.switchVolume(walog, manager.getContext().getVolumeReplacements()); + if (switchedWalog != null) { + // replaces the volume used for sorting, but do not change entry in metadata table. When + // the tablet loads it will change the metadata table entry. If + // the tablet has the same replacement config, then it will find the sorted log. + log.info("Volume replaced {} -> {}", walog, switchedWalog); - walog = switchedWalog.toString(); ++ walog = switchedWalog; + } + - String[] parts = walog.split("/"); - String sortId = parts[parts.length - 1]; - String filename = new Path(walog).toString(); ++ String sortId = walog.getUniqueID().toString(); ++ String filename = walog.getPath(); + String dest = RecoveryPath.getRecoveryPath(new Path(filename)).toString(); + + boolean sortQueued; + synchronized (this) { + sortQueued = sortsQueued.contains(sortId); + } - boolean sortQueued; + if (sortQueued - && zooCache.get(manager.getZooKeeperRoot() + Constants.ZRECOVERY + "/" + sortId) == null) { ++ && this.manager.getContext().getZooCache().get(Constants.ZRECOVERY + "/" + sortId) ++ == null) { synchronized (this) { - sortQueued = sortsQueued.contains(sortId); + sortsQueued.remove(sortId); } + } - if (sortQueued - && this.manager.getContext().getZooCache().get(Constants.ZRECOVERY + "/" + sortId) - == null) { - synchronized (this) { - sortsQueued.remove(sortId); - } + if (exists(SortedLogState.getFinishedMarkerPath(dest))) { + synchronized (this) { + closeTasksQueued.remove(sortId); + recoveryDelay.remove(sortId); + sortsQueued.remove(sortId); } + return false; ++ // was continue; + } - if (exists(SortedLogState.getFinishedMarkerPath(dest))) { - synchronized (this) { - closeTasksQueued.remove(sortId); - recoveryDelay.remove(sortId); - sortsQueued.remove(sortId); + recoveryNeeded = true; + synchronized (this) { + if (!closeTasksQueued.contains(sortId) && !sortsQueued.contains(sortId)) { + AccumuloConfiguration aconf = manager.getConfiguration(); - @SuppressWarnings("deprecation") + LogCloser closer = Property.createInstanceFromPropertyName(aconf, - aconf.resolve(Property.MANAGER_WAL_CLOSER_IMPLEMENTATION, - Property.MANAGER_WALOG_CLOSER_IMPLEMETATION), - LogCloser.class, new HadoopLogCloser()); ++ Property.MANAGER_WAL_CLOSER_IMPLEMENTATION, LogCloser.class, new HadoopLogCloser()); + Long delay 
= recoveryDelay.get(sortId); + if (delay == null) { + delay = aconf.getTimeInMillis(Property.MANAGER_RECOVERY_DELAY); + } else { + delay = Math.min(2 * delay, 1000 * 60 * 5L); } - continue; - } - - recoveryNeeded = true; - synchronized (this) { - if (!closeTasksQueued.contains(sortId) && !sortsQueued.contains(sortId)) { - AccumuloConfiguration aconf = manager.getConfiguration(); - LogCloser closer = Property.createInstanceFromPropertyName(aconf, - Property.MANAGER_WAL_CLOSER_IMPLEMENTATION, LogCloser.class, new HadoopLogCloser()); - Long delay = recoveryDelay.get(sortId); - if (delay == null) { - delay = aconf.getTimeInMillis(Property.MANAGER_RECOVERY_DELAY); - } else { - delay = Math.min(2 * delay, 1000 * 60 * 5L); - } - log.info("Starting recovery of {} (in : {}s), tablet {} holds a reference", filename, - (delay / 1000), extent); + log.info("Starting recovery of {} (in : {}s)", filename, (delay / 1000)); - ScheduledFuture<?> future = executor.schedule( - new LogSortTask(closer, filename, dest, sortId), delay, TimeUnit.MILLISECONDS); - ThreadPools.watchNonCriticalScheduledTask(future); - closeTasksQueued.add(sortId); - recoveryDelay.put(sortId, delay); - } + ScheduledFuture<?> future = executor.schedule( + new LogSortTask(closer, filename, dest, sortId), delay, TimeUnit.MILLISECONDS); + ThreadPools.watchNonCriticalScheduledTask(future); + closeTasksQueued.add(sortId); + recoveryDelay.put(sortId, delay); } } + return recoveryNeeded; } } diff --cc shell/src/main/java/org/apache/accumulo/shell/commands/ActiveCompactionHelper.java index a83d617baa,fa93483d24..76c1da76aa --- a/shell/src/main/java/org/apache/accumulo/shell/commands/ActiveCompactionHelper.java +++ b/shell/src/main/java/org/apache/accumulo/shell/commands/ActiveCompactionHelper.java @@@ -35,13 -31,11 +35,16 @@@ import org.apache.accumulo.core.client. 
import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.admin.ActiveCompaction; import org.apache.accumulo.core.client.admin.InstanceOperations; +import org.apache.accumulo.core.client.admin.servers.ServerId; +import org.apache.accumulo.core.data.ResourceGroupId; + import org.apache.accumulo.core.data.TabletId; import org.apache.accumulo.core.util.DurationFormat; + import org.apache.accumulo.core.util.TextUtil; -import org.apache.accumulo.shell.Shell; + import org.apache.hadoop.io.Text; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.net.HostAndPort; class ActiveCompactionHelper { @@@ -97,20 -97,70 +100,70 @@@ try { var dur = new DurationFormat(ac.getAge(), ""); return String.format( - "%21s | %9s | %5s | %6s | %5s | %5s | %15s | %-40s | %5s | %35s | %9s | %s", host, dur, - ac.getType(), ac.getReason(), shortenCount(ac.getEntriesRead()), - shortenCount(ac.getEntriesWritten()), ac.getTable(), formatTablet(ac.getTablet()), - ac.getInputFiles().size(), output, iterList, iterOpts); + "%21s | %21s | %9s | %5s | %6s | %5s | %5s | %15s | %-40s | %5s | %35s | %9s | %s", + ac.getServerId().getResourceGroup(), host, dur, ac.getType(), ac.getReason(), + shortenCount(ac.getEntriesRead()), shortenCount(ac.getEntriesWritten()), ac.getTable(), - ac.getTablet(), ac.getInputFiles().size(), output, iterList, iterOpts); ++ formatTablet(ac.getTablet()), ac.getInputFiles().size(), output, iterList, iterOpts); } catch (TableNotFoundException e) { return "ERROR " + e.getMessage(); } } + private static String formatTablet(TabletId tabletId) { + if (tabletId == null) { + return ""; + } + StringBuilder sb = new StringBuilder(); + appendEscapedTableId(sb, tabletId.getTable().canonical()); + appendTabletRow(sb, tabletId.getEndRow()); + appendTabletRow(sb, tabletId.getPrevEndRow()); + return sb.toString(); + } + + private static void appendEscapedTableId(StringBuilder sb, String tableId) { + for (int i = 0; i < tableId.length(); i++) { + char c = tableId.charAt(i); + if (c == '\\') { + sb.append("\\\\"); + } else if (c == ';') { + sb.append("\\;"); + } else { + sb.append(c); + } + } + } + + private static void appendTabletRow(StringBuilder sb, Text row) { + if (row == null) { + sb.append("<"); + return; + } + sb.append(';'); + Text truncated = TextUtil.truncate(row); + byte[] bytes = TextUtil.getBytes(truncated); + appendEscapedBytes(sb, bytes); + } + + private static void appendEscapedBytes(StringBuilder sb, byte[] bytes) { + for (byte b : bytes) { + int c = b & 0xFF; + if (c == '\\') { + sb.append("\\\\"); + } else if (c == ';') { + sb.append("\\;"); + } else if (c >= 32 && c <= 126) { + sb.append((char) c); + } else { + sb.append("\\x").append(String.format("%02X", c)); + } + } + } + public static Stream<String> appendHeader(Stream<String> stream) { Stream<String> header = Stream.of(String.format( - " %-21s| %-9s | %-5s | %-6s | %-5s | %-5s | %-15s | %-40s | %-5s | %-35s | %-9s | %s", - "SERVER", "AGE", "TYPE", "REASON", "READ", "WROTE", "TABLE", "TABLET", "INPUT", "OUTPUT", - "ITERATORS", "ITERATOR OPTIONS")); + " %-21s| %-21s| %-9s | %-5s | %-6s | %-5s | %-5s | %-15s | %-40s | %-5s | %-35s | %-9s | %s", + "GROUP", "SERVER", "AGE", "TYPE", "REASON", "READ", "WROTE", "TABLE", "TABLET", "INPUT", + "OUTPUT", "ITERATORS", "ITERATOR OPTIONS")); return Stream.concat(header, stream); } diff --cc test/src/main/java/org/apache/accumulo/test/shell/ShellServerIT.java index 7f1b7c1b51,00e7cfdc03..262c7856f6 --- 
a/test/src/main/java/org/apache/accumulo/test/shell/ShellServerIT.java +++ b/test/src/main/java/org/apache/accumulo/test/shell/ShellServerIT.java @@@ -1514,23 -1459,15 +1515,26 @@@ public class ShellServerIT extends Shar ts.exec("insert d cf cq value", true); ts.exec("flush -t " + table, true); ts.exec("sleep 0.2", true); - ts.exec("listcompactions", true, "default_tablet"); + verifyListCompactions("listcompactions", "default_tablet"); + // basic regex filtering test, more tests are in ListCompactionsCommandTest + verifyListCompactions("listcompactions -s .*:[0-9]*", "default_tablet"); + verifyListCompactions("listcompactions -rg def.*", "default_tablet"); + verifyListCompactions("listcompactions -s .*:[0-9]* -rg def.*", "default_tablet"); + // non matching + assertFalse(ts.exec("listcompactions -s bad.*", true).contains("default_tablet")); + assertFalse(ts.exec("listcompactions -rg bad.*", true).contains("default_tablet")); + ts.exec("deletetable -f " + table, true); + } + + private void verifyListCompactions(String cmd, String expected) throws IOException { + ts.exec(cmd, true, expected); String[] lines = ts.output.get().split("\n"); - String last = lines[lines.length - 1]; - String[] parts = last.split("\\|"); + String compaction = Arrays.stream(lines).filter(line -> line.contains("default_tablet")) + .findFirst().orElseThrow(); + assertTrue(compaction.contains("\\x00"), + "Expected tablet to display \\x00 for null byte: " + compaction); + String[] parts = compaction.split("\\|"); - assertEquals(12, parts.length); - ts.exec("deletetable -f " + table, true); + assertEquals(13, parts.length); } @Test
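Note for reviewers: the main behavioral change brought over from 2.1 in RecoveryManager is that per-WAL recovery decisions are now cached for the lifetime of a RecoverySession ("caches per log recovery decisions for its lifetime") instead of being recomputed for every tablet that references the same log; TabletGroupWatcher obtains a session and skips hosting a tablet while any of its logs still needs sorting. Below is a minimal standalone sketch of that caching pattern. It uses a plain String log identifier and a pluggable check in place of the real LogEntry/VolumeUtil/ZooKeeper plumbing, so treat it as an illustration of the idea, not the merged code.

  import java.util.Collection;
  import java.util.HashMap;
  import java.util.Map;
  import java.util.function.Predicate;

  // Sketch of the session-scoped memoization used by RecoverySession.recoverLogs():
  // each distinct WAL is checked once per session and the result is reused for
  // every subsequent tablet that still references that WAL.
  class RecoverySessionSketch {

    // Stand-in for RecoveryManager.recoverLog(); returns true if the WAL still
    // needs to be sorted before its tablets can be hosted.
    private final Predicate<String> recoverLogCheck;
    private final Map<String,Boolean> needsRecovery = new HashMap<>();

    RecoverySessionSketch(Predicate<String> recoverLogCheck) {
      this.recoverLogCheck = recoverLogCheck;
    }

    // Returns true if any of the given WALs still needs recovery. Every log is
    // evaluated (no short-circuit), mirroring the merged code, so recovery work
    // is queued for all outstanding logs rather than just the first one found.
    boolean recoverLogs(Collection<String> walogs) {
      boolean recoveryNeeded = false;
      for (String walog : walogs) {
        recoveryNeeded |= needsRecovery.computeIfAbsent(walog, recoverLogCheck::test);
      }
      return recoveryNeeded;
    }
  }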
