This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new 5c454e12a1 Refactors tablet mgmt iterator to share decision code w/ TGW (#3904) 5c454e12a1 is described below commit 5c454e12a144e3543175d2cbc0b54bc0900749b1 Author: Keith Turner <ktur...@apache.org> AuthorDate: Tue Oct 31 11:00:52 2023 -0400 Refactors tablet mgmt iterator to share decision code w/ TGW (#3904) The TabletGroupWatcher uses the TabletManagementIterator to filter tablets that need attention. The TabletMgmtIter had its own custom code to make decisions about tablets. This commit modifies the TabletMgmtIterator to use the same code as the TGW when making decisions about tablets. This makes it easier to reason about and change the behavior of the TGW. In making the TGW and TabletMgmtIter share code, a change was also made to how they get the information used to make decisions. A new class called TabletManagementParameters was introduced that contains an immutable snapshot of the information that both classes need to make decisions. With this change, the iterator and TGW use the same information for a pass over the metadata table; in the past the information was obtained at different times and could have been different due to race conditions. These race conditions were probably not harmful, but removing them makes the code easier to reason about. Also moved some code outside of the TGW away from using the TabletMgmtIterator. All code outside of the TGW needs to move away from using this iterator in order to make the code easier to maintain. Left TODOs in the code about this and will open follow-on issues. This commit also changes how shutting down servers are handled. Each TGW gets its servers to shut down from its dependent TGW. If the TGW has no dependent, it gets them from the manager. This allows the set of servers to shut down to percolate through the TGWs, each filtering out any active servers it sees at its level. 
So for example the USER TGW will only make a server to shutdown available to the METADATA TGW when it has not seen it in a full scan. --- .../core/manager/balancer/TabletServerIdImpl.java | 4 + .../apache/accumulo/core/metadata/TabletState.java | 49 +--- .../server/manager/state/CurrentState.java | 53 ----- .../manager/state/LoggingTabletStateStore.java | 5 +- .../server/manager/state/MetaDataStateStore.java | 17 +- .../server/manager/state/RootTabletStateStore.java | 12 +- .../server/manager/state/TabletGoalState.java | 184 ++++++++++++++ .../manager/state/TabletManagementIterator.java | 263 +++------------------ .../manager/state/TabletManagementParameters.java | 252 ++++++++++++++++++++ .../manager/state/TabletManagementScanner.java | 11 +- .../server/manager/state/TabletStateStore.java | 19 +- .../server/manager/state/ZooTabletStateStore.java | 6 +- .../accumulo/server/util/FindOfflineTablets.java | 37 +-- .../server/util/ListOnlineOnDemandTablets.java | 25 +- .../accumulo/gc/GarbageCollectWriteAheadLogs.java | 29 ++- .../gc/GarbageCollectWriteAheadLogsTest.java | 20 +- .../java/org/apache/accumulo/manager/Manager.java | 138 ++--------- .../accumulo/manager/TabletGroupWatcher.java | 205 ++++++++-------- .../accumulo/manager/tableOps/delete/CleanUp.java | 37 ++- .../manager/tableOps/merge/DeleteRows.java | 1 - .../manager/upgrade/UpgradeCoordinator.java | 18 +- server/monitor/pom.xml | 4 - .../monitor/rest/tables/TablesResource.java | 34 +-- .../java/org/apache/accumulo/test/LocatorIT.java | 27 +-- .../test/ManagerRepairsDualAssignmentIT.java | 50 ++-- .../test/functional/AssignLocationModeIT.java | 11 +- .../test/functional/CompactLocationModeIT.java | 14 +- .../test/functional/ManagerAssignmentIT.java | 47 ++-- .../functional/TabletManagementIteratorIT.java | 117 +++------ .../accumulo/test/manager/SuspendedTabletsIT.java | 24 +- .../accumulo/test/performance/NullTserver.java | 7 +- 31 files changed, 812 insertions(+), 908 deletions(-) diff --git 
a/core/src/main/java/org/apache/accumulo/core/manager/balancer/TabletServerIdImpl.java b/core/src/main/java/org/apache/accumulo/core/manager/balancer/TabletServerIdImpl.java index 2fefb466d3..013515a253 100644 --- a/core/src/main/java/org/apache/accumulo/core/manager/balancer/TabletServerIdImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/manager/balancer/TabletServerIdImpl.java @@ -44,6 +44,10 @@ public class TabletServerIdImpl implements TabletServerId { this.tServerInstance = requireNonNull(tServerInstance); } + public TServerInstance getTServerInstance() { + return tServerInstance; + } + @Override public String getHost() { return tServerInstance.getHostAndPort().getHost(); diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/TabletState.java b/core/src/main/java/org/apache/accumulo/core/metadata/TabletState.java index 4be39cf906..9fcf8add3f 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/TabletState.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/TabletState.java @@ -18,29 +18,18 @@ */ package org.apache.accumulo.core.metadata; -import java.util.Map; import java.util.Set; -import org.apache.accumulo.core.data.TabletId; -import org.apache.accumulo.core.dataImpl.TabletIdImpl; -import org.apache.accumulo.core.manager.balancer.TabletServerIdImpl; import org.apache.accumulo.core.metadata.schema.TabletMetadata; -import org.apache.accumulo.core.spi.balancer.TabletBalancer; -import org.apache.accumulo.core.spi.balancer.data.TabletServerId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public enum TabletState { - UNASSIGNED, ASSIGNED, HOSTED, ASSIGNED_TO_DEAD_SERVER, SUSPENDED, NEEDS_REASSIGNMENT; + UNASSIGNED, ASSIGNED, HOSTED, ASSIGNED_TO_DEAD_SERVER, SUSPENDED; private static Logger log = LoggerFactory.getLogger(TabletState.class); public static TabletState compute(TabletMetadata tm, Set<TServerInstance> liveTServers) { - return compute(tm, liveTServers, null, null); - } - - public static TabletState 
compute(TabletMetadata tm, Set<TServerInstance> liveTServers, - TabletBalancer balancer, Map<TabletServerId,String> tserverGroups) { TabletMetadata.Location current = null; TabletMetadata.Location future = null; if (tm.hasCurrent()) { @@ -53,42 +42,6 @@ public enum TabletState { : TabletState.ASSIGNED_TO_DEAD_SERVER; } else if (current != null) { if (liveTServers.contains(current.getServerInstance())) { - if (balancer != null) { - var tsii = new TabletServerIdImpl(current.getServerInstance()); - var resourceGroup = tserverGroups.get(tsii); - - if (resourceGroup != null) { - var reassign = balancer.needsReassignment(new TabletBalancer.CurrentAssignment() { - @Override - public TabletId getTablet() { - return new TabletIdImpl(tm.getExtent()); - } - - @Override - public TabletServerId getTabletServer() { - return tsii; - } - - @Override - public String getResourceGroup() { - return resourceGroup; - } - }); - - if (reassign) { - return TabletState.NEEDS_REASSIGNMENT; - } - } else { - // A tablet server should always have a resource group, however there is a race - // conditions where the resource group map was read before a tablet server came into - // existence. Another possible cause for an absent resource group is a bug in accumulo. - // In either case do not call the balancer for now with the assumption that the resource - // group will be available later. Log a message in case it is a bug. - log.trace( - "Could not find resource group for tserver {}, so did not consult balancer. 
Assuming this is a temporary race condition.", - current.getServerInstance()); - } - } return TabletState.HOSTED; } else { return TabletState.ASSIGNED_TO_DEAD_SERVER; diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/CurrentState.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/CurrentState.java deleted file mode 100644 index 1b72fa2a55..0000000000 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/CurrentState.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.accumulo.server.manager.state; - -import java.util.Map; -import java.util.Set; - -import org.apache.accumulo.core.data.TableId; -import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.manager.thrift.ManagerState; -import org.apache.accumulo.core.metadata.TServerInstance; -import org.apache.accumulo.server.manager.LiveTServerSet.LiveTServersSnapshot; - -public interface CurrentState { - - Set<TableId> onlineTables(); - - Set<TServerInstance> onlineTabletServers(); - - LiveTServersSnapshot tserversSnapshot(); - - Set<TServerInstance> shutdownServers(); - - /** - * Provide an immutable snapshot view of migrating tablets. Objects contained in the set may still - * be mutable. - */ - Set<KeyExtent> migrationsSnapshot(); - - ManagerState getManagerState(); - - // ELASTICITIY_TODO it would be nice if this method could take DataLevel as an argument and only - // retrieve information about compactions in that data level. Attempted this and a lot of - // refactoring was needed to get that small bit of information to this method. Would be best to - // address this after issue. May be best to attempt this after #3576. 
- Map<Long,Map<String,String>> getCompactionHints(); -} diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/LoggingTabletStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/LoggingTabletStateStore.java index ce2b3ede02..8545ecd7c7 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/LoggingTabletStateStore.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/LoggingTabletStateStore.java @@ -55,8 +55,9 @@ class LoggingTabletStateStore implements TabletStateStore { } @Override - public ClosableIterator<TabletManagement> iterator(List<Range> ranges) { - return wrapped.iterator(ranges); + public ClosableIterator<TabletManagement> iterator(List<Range> ranges, + TabletManagementParameters parameters) { + return wrapped.iterator(ranges, parameters); } @Override diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java index 2a2ee11c09..a182745da5 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java @@ -30,26 +30,25 @@ import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status; import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import com.google.common.base.Preconditions; + class MetaDataStateStore extends AbstractTabletStateStore implements TabletStateStore { protected final ClientContext context; - protected final CurrentState state; private final String targetTableName; private final Ample ample; private final DataLevel level; - protected MetaDataStateStore(DataLevel level, ClientContext context, CurrentState state, - String targetTableName) { + protected 
MetaDataStateStore(DataLevel level, ClientContext context, String targetTableName) { super(context); this.level = level; this.context = context; - this.state = state; this.ample = context.getAmple(); this.targetTableName = targetTableName; } - MetaDataStateStore(DataLevel level, ClientContext context, CurrentState state) { - this(level, context, state, MetadataTable.NAME); + MetaDataStateStore(DataLevel level, ClientContext context) { + this(level, context, MetadataTable.NAME); } @Override @@ -58,8 +57,10 @@ class MetaDataStateStore extends AbstractTabletStateStore implements TabletState } @Override - public ClosableIterator<TabletManagement> iterator(List<Range> ranges) { - return new TabletManagementScanner(context, ranges, state, targetTableName); + public ClosableIterator<TabletManagement> iterator(List<Range> ranges, + TabletManagementParameters parameters) { + Preconditions.checkArgument(parameters.getLevel() == getLevel()); + return new TabletManagementScanner(context, ranges, parameters, targetTableName); } @Override diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/RootTabletStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/RootTabletStateStore.java index 4b574ade13..97c9f7ec32 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/RootTabletStateStore.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/RootTabletStateStore.java @@ -26,15 +26,19 @@ import org.apache.accumulo.core.manager.state.TabletManagement; import org.apache.accumulo.core.metadata.RootTable; import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; +import com.google.common.base.Preconditions; + class RootTabletStateStore extends MetaDataStateStore { - RootTabletStateStore(DataLevel level, ClientContext context, CurrentState state) { - super(level, context, state, RootTable.NAME); + RootTabletStateStore(DataLevel level, ClientContext context) { + super(level, 
context, RootTable.NAME); } @Override - public ClosableIterator<TabletManagement> iterator(List<Range> ranges) { - return new TabletManagementScanner(context, ranges, state, RootTable.NAME); + public ClosableIterator<TabletManagement> iterator(List<Range> ranges, + TabletManagementParameters parameters) { + Preconditions.checkArgument(parameters.getLevel() == getLevel()); + return new TabletManagementScanner(context, ranges, parameters, RootTable.NAME); } @Override diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletGoalState.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletGoalState.java new file mode 100644 index 0000000000..4dedbe0bf8 --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletGoalState.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.server.manager.state; + +import org.apache.accumulo.core.data.TabletId; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.dataImpl.TabletIdImpl; +import org.apache.accumulo.core.manager.balancer.TabletServerIdImpl; +import org.apache.accumulo.core.metadata.TServerInstance; +import org.apache.accumulo.core.metadata.TabletState; +import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.TabletOperationType; +import org.apache.accumulo.core.spi.balancer.TabletBalancer; +import org.apache.accumulo.core.spi.balancer.data.TabletServerId; +import org.apache.accumulo.core.tablet.thrift.TUnloadTabletGoal; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +public enum TabletGoalState { + + HOSTED(TUnloadTabletGoal.UNKNOWN), + UNASSIGNED(TUnloadTabletGoal.UNASSIGNED), + SUSPENDED(TUnloadTabletGoal.SUSPENDED); + + private final TUnloadTabletGoal unloadGoal; + + TabletGoalState(TUnloadTabletGoal unloadGoal) { + this.unloadGoal = unloadGoal; + } + + /** The purpose of unloading this tablet. */ + public TUnloadTabletGoal howUnload() { + return unloadGoal; + } + + private static final Logger log = LoggerFactory.getLogger(TabletGoalState.class); + + public static TabletGoalState compute(TabletMetadata tm, TabletState currentState, + TabletBalancer balancer, TabletManagementParameters params) { + Preconditions.checkArgument(Ample.DataLevel.of(tm.getTableId()) == params.getLevel(), + "Tablet %s not in expected level %s", tm.getExtent(), params.getLevel()); + + // Always follow through with assignments + if (currentState == TabletState.ASSIGNED) { + return HOSTED; + } + + KeyExtent extent = tm.getExtent(); + // Shutting down? 
+ TabletGoalState systemGoalState = getSystemGoalState(tm, params); + + if (systemGoalState == TabletGoalState.HOSTED) { + if (!params.isParentLevelUpgraded()) { + // The place where this tablet stores its metadata was not upgraded, so do not assign this + // tablet yet. + return UNASSIGNED; + } + + // When an operation id is set tablets need to be unassigned unless there are still wals. When + // there are wals the tablet needs to be hosted to recover data in them. However, deleting + // tablets do not need to recover wals. + if (tm.getOperationId() != null && (tm.getLogs().isEmpty() + || tm.getOperationId().getType() == TabletOperationType.DELETING)) { + return TabletGoalState.UNASSIGNED; + } + + if (!params.isTableOnline(tm.getTableId())) { + return UNASSIGNED; + } + + switch (tm.getHostingGoal()) { + case NEVER: + return UNASSIGNED; + case ONDEMAND: + if (!tm.getHostingRequested()) { + return UNASSIGNED; + } + } + + TServerInstance dest = params.getMigrations().get(extent); + if (dest != null && tm.hasCurrent() && !dest.equals(tm.getLocation().getServerInstance())) { + return UNASSIGNED; + } + + if (currentState == TabletState.HOSTED && balancer != null) { + // see if the balancer thinks this tablet needs to be unassigned + + Preconditions.checkArgument( + tm.getLocation().getType() == TabletMetadata.LocationType.CURRENT, + "Expected current tablet location %s %s", tm.getExtent(), tm.getLocation()); + var tsii = new TabletServerIdImpl(tm.getLocation().getServerInstance()); + + var resourceGroup = params.getResourceGroup(tm.getLocation().getServerInstance()); + + if (resourceGroup != null) { + var reassign = balancer.needsReassignment(new TabletBalancer.CurrentAssignment() { + @Override + public TabletId getTablet() { + return new TabletIdImpl(tm.getExtent()); + } + + @Override + public TabletServerId getTabletServer() { + return tsii; + } + + @Override + public String getResourceGroup() { + return resourceGroup; + } + }); + + if (reassign) { + return 
UNASSIGNED; + } + } else { + // ELASTICITY_TODO this log level was set to error so that this case can be examined for + // bugs. A tablet server should always have a resource group. If there are unavoidable + // race conditions for getting tablet servers and their RGs, that that should be handled + // in the TabletManagementParameters data acquisition phase so that not all code has to + // deal with it. Eventually this log level should possibly be adjusted or converted to an + // exception. + log.error( + "Could not find resource group for tserver {}, so did not consult balancer. Need to determine the cause of this.", + tm.getLocation().getServerInstance()); + } + } + + if (tm.hasCurrent() + && params.getServersToShutdown().contains(tm.getLocation().getServerInstance())) { + if (params.canSuspendTablets()) { + return SUSPENDED; + } else { + return UNASSIGNED; + } + } + } + return systemGoalState; + } + + private static TabletGoalState getSystemGoalState(TabletMetadata tm, + TabletManagementParameters params) { + switch (params.getManagerState()) { + case NORMAL: + return HOSTED; + case HAVE_LOCK: // fall-through intended + case INITIAL: // fall-through intended + case SAFE_MODE: + if (tm.getExtent().isMeta()) { + return HOSTED; + } + return TabletGoalState.UNASSIGNED; + case UNLOAD_METADATA_TABLETS: + if (tm.getExtent().isRootTablet()) { + return HOSTED; + } + return UNASSIGNED; + case UNLOAD_ROOT_TABLET: + case STOP: + return UNASSIGNED; + default: + throw new IllegalStateException("Unknown Manager State"); + } + } +} diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java index 76ac7576a0..c4e3d70bdb 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java @@ 
-18,19 +18,11 @@ */ package org.apache.accumulo.server.manager.state; -import static org.apache.accumulo.core.util.LazySingletons.GSON; - import java.io.IOException; -import java.lang.reflect.Type; import java.util.ArrayList; -import java.util.Arrays; -import java.util.Base64; -import java.util.Collection; import java.util.Collections; import java.util.EnumSet; -import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; @@ -38,24 +30,19 @@ import java.util.SortedMap; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.ScannerBase; -import org.apache.accumulo.core.client.admin.TabletHostingGoal; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.ConfigurationCopy; import org.apache.accumulo.core.conf.ConfigurationTypeHelper; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.iterators.IteratorEnvironment; import org.apache.accumulo.core.iterators.SkippingIterator; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.accumulo.core.iterators.user.WholeRowIterator; -import org.apache.accumulo.core.manager.balancer.TabletServerIdImpl; import org.apache.accumulo.core.manager.state.TabletManagement; import org.apache.accumulo.core.manager.state.TabletManagement.ManagementAction; import org.apache.accumulo.core.manager.thrift.ManagerState; -import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.TabletState; import org.apache.accumulo.core.metadata.schema.DataFileValue; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily; @@ -69,24 +56,15 @@ import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Se import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily; import org.apache.accumulo.core.metadata.schema.TabletMetadata; -import org.apache.accumulo.core.metadata.schema.TabletOperationType; import org.apache.accumulo.core.spi.balancer.SimpleLoadBalancer; import org.apache.accumulo.core.spi.balancer.TabletBalancer; -import org.apache.accumulo.core.spi.balancer.data.TabletServerId; import org.apache.accumulo.core.spi.compaction.CompactionKind; -import org.apache.accumulo.core.util.AddressUtil; import org.apache.accumulo.server.compaction.CompactionJobGenerator; import org.apache.accumulo.server.iterators.TabletIteratorEnvironment; -import org.apache.accumulo.server.manager.LiveTServerSet.LiveTServersSnapshot; import org.apache.accumulo.server.manager.balancer.BalancerEnvironmentImpl; -import org.apache.hadoop.io.DataInputBuffer; -import org.apache.hadoop.io.DataOutputBuffer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Joiner; -import com.google.gson.reflect.TypeToken; - /** * Iterator used by the TabletGroupWatcher threads in the Manager. 
This iterator returns * TabletManagement objects for each Tablet that needs some type of action performed on it by the @@ -94,146 +72,10 @@ import com.google.gson.reflect.TypeToken; */ public class TabletManagementIterator extends SkippingIterator { private static final Logger LOG = LoggerFactory.getLogger(TabletManagementIterator.class); - - private static final String SERVERS_OPTION = "servers"; - private static final String TABLES_OPTION = "tables"; - private static final String MIGRATIONS_OPTION = "migrations"; - private static final String MANAGER_STATE_OPTION = "managerState"; - private static final String SHUTTING_DOWN_OPTION = "shuttingDown"; - private static final String RESOURCE_GROUPS = "resourceGroups"; - private static final String TSERVER_GROUP_PREFIX = "serverGroups_"; - private static final String COMPACTION_HINTS_OPTIONS = "compactionHints"; + private static final String TABLET_GOAL_STATE_PARAMS_OPTION = "tgsParams"; private CompactionJobGenerator compactionGenerator; private TabletBalancer balancer; - private static void setCurrentServers(final IteratorSetting cfg, - final Set<TServerInstance> goodServers) { - if (goodServers != null) { - List<String> servers = new ArrayList<>(); - for (TServerInstance server : goodServers) { - servers.add(server.getHostPortSession()); - } - cfg.addOption(SERVERS_OPTION, Joiner.on(",").join(servers)); - } - } - - private static void setOnlineTables(final IteratorSetting cfg, final Set<TableId> onlineTables) { - if (onlineTables != null) { - cfg.addOption(TABLES_OPTION, Joiner.on(",").join(onlineTables)); - } - } - - private static void setMigrations(final IteratorSetting cfg, - final Collection<KeyExtent> migrations) { - DataOutputBuffer buffer = new DataOutputBuffer(); - try { - for (KeyExtent extent : migrations) { - extent.writeTo(buffer); - } - } catch (Exception ex) { - throw new RuntimeException(ex); - } - String encoded = - Base64.getEncoder().encodeToString(Arrays.copyOf(buffer.getData(), 
buffer.getLength())); - cfg.addOption(MIGRATIONS_OPTION, encoded); - } - - private static void setManagerState(final IteratorSetting cfg, final ManagerState state) { - cfg.addOption(MANAGER_STATE_OPTION, state.toString()); - } - - private static void setShuttingDown(final IteratorSetting cfg, - final Set<TServerInstance> servers) { - if (servers != null) { - cfg.addOption(SHUTTING_DOWN_OPTION, Joiner.on(",").join(servers)); - } - } - - private static void setCompactionHints(final IteratorSetting cfg, - Map<Long,Map<String,String>> allHints) { - cfg.addOption(COMPACTION_HINTS_OPTIONS, GSON.get().toJson(allHints)); - } - - private static void setTServerResourceGroups(final IteratorSetting cfg, - Map<String,Set<TServerInstance>> tServerResourceGroups) { - if (tServerResourceGroups == null) { - return; - } - cfg.addOption(RESOURCE_GROUPS, Joiner.on(",").join(tServerResourceGroups.keySet())); - for (Entry<String,Set<TServerInstance>> entry : tServerResourceGroups.entrySet()) { - cfg.addOption(TSERVER_GROUP_PREFIX + entry.getKey(), Joiner.on(",").join(entry.getValue())); - } - } - - private static Map<TabletServerId,String> parseTServerResourceGroups(Map<String,String> options) { - Map<TabletServerId,String> resourceGroups = new HashMap<>(); - String groups = options.get(RESOURCE_GROUPS); - if (groups != null) { - for (String groupName : groups.split(",")) { - String groupServers = options.get(TSERVER_GROUP_PREFIX + groupName); - if (groupServers != null) { - Set<TServerInstance> servers = parseServers(groupServers); - servers.forEach(server -> resourceGroups.put(new TabletServerIdImpl(server), groupName)); - } - } - } - return resourceGroups; - } - - private static Set<KeyExtent> parseMigrations(final String migrations) { - Set<KeyExtent> result = new HashSet<>(); - if (migrations != null) { - try { - DataInputBuffer buffer = new DataInputBuffer(); - byte[] data = Base64.getDecoder().decode(migrations); - buffer.reset(data, data.length); - while (buffer.available() > 0) 
{ - result.add(KeyExtent.readFrom(buffer)); - } - } catch (Exception ex) { - throw new RuntimeException(ex); - } - } - return result; - } - - private static Set<TableId> parseTableIDs(final String tableIDs) { - Set<TableId> result = new HashSet<>(); - if (tableIDs != null) { - for (String tableID : tableIDs.split(",")) { - result.add(TableId.of(tableID)); - } - } - return result; - } - - private static Set<TServerInstance> parseServers(final String servers) { - Set<TServerInstance> result = new HashSet<>(); - if (servers != null) { - // parse "host:port[INSTANCE]" - if (!servers.isEmpty()) { - for (String part : servers.split(",")) { - String[] parts = part.split("\\[", 2); - String hostport = parts[0]; - String instance = parts[1]; - if (instance != null && instance.endsWith("]")) { - instance = instance.substring(0, instance.length() - 1); - } - result.add(new TServerInstance(AddressUtil.parseAddress(hostport, false), instance)); - } - } - } - return result; - } - - private static Map<Long,Map<String,String>> parseCompactionHints(String json) { - if (json == null) { - return Map.of(); - } - Type tt = new TypeToken<Map<Long,Map<String,String>>>() {}.getType(); - return GSON.get().fromJson(json, tt); - } - private static boolean shouldReturnDueToSplit(final TabletMetadata tm, final long splitThreshold) { final long sumOfFileSizes = @@ -244,57 +86,38 @@ public class TabletManagementIterator extends SkippingIterator { return shouldSplit; } - private boolean shouldReturnDueToLocation(final TabletMetadata tm, - final Set<TableId> onlineTables, final Set<TServerInstance> current) { + private boolean shouldReturnDueToLocation(final TabletMetadata tm) { - if (migrations.contains(tm.getExtent())) { + if (tabletMgmtParams.getMigrations().containsKey(tm.getExtent())) { + // Ideally only the state and goalState would need to be used to determine if a tablet should + // be returned. 
However, the Manager/TGW currently needs everything in the migrating set + // returned so it can update in memory maps it has. If this were improved then this case would + // not be needed. return true; } - // is the table supposed to be online or offline? - final boolean shouldBeOnline = - onlineTables.contains(tm.getTableId()) && tm.getOperationId() == null; - - TabletState state = TabletState.compute(tm, current, balancer, tserverResourceGroups); + TabletState state = TabletState.compute(tm, tabletMgmtParams.getOnlineTsevers()); + TabletGoalState goalState = TabletGoalState.compute(tm, state, balancer, tabletMgmtParams); if (LOG.isTraceEnabled()) { - LOG.trace( - "{} is {}. Table is {}line. Tablet hosting goal is {}, hostingRequested: {}, opId: {}", - tm.getExtent(), state, (shouldBeOnline ? "on" : "off"), tm.getHostingGoal(), - tm.getHostingRequested(), tm.getOperationId()); + LOG.trace("extent:{} state:{} goalState:{} hostingGoal:{}, hostingRequested: {}, opId: {}", + tm.getExtent(), state, goalState, tm.getHostingGoal(), tm.getHostingRequested(), + tm.getOperationId()); } - switch (state) { - case ASSIGNED: - // we always want data about assigned tablets - return true; + + switch (goalState) { case HOSTED: - if (!shouldBeOnline || tm.getHostingGoal() == TabletHostingGoal.NEVER - || (tm.getHostingGoal() == TabletHostingGoal.ONDEMAND && !tm.getHostingRequested())) { - return true; - } - break; - case ASSIGNED_TO_DEAD_SERVER: - case NEEDS_REASSIGNMENT: - return true; + return state != TabletState.HOSTED; case SUSPENDED: + return state != TabletState.SUSPENDED; case UNASSIGNED: - if (shouldBeOnline && (tm.getHostingGoal() == TabletHostingGoal.ALWAYS - || (tm.getHostingGoal() == TabletHostingGoal.ONDEMAND && tm.getHostingRequested()))) { - return true; - } - // If the Tablet has walogs and operation id then need to return so - // TGW can bring online to process the logs - if (!tm.getLogs().isEmpty() && tm.getOperationId() != null - && tm.getOperationId().getType() 
== TabletOperationType.MERGING) { - return true; - } - break; + return state != TabletState.UNASSIGNED; default: - throw new AssertionError("Inconceivable! The tablet is an unrecognized state: " + state); + throw new IllegalStateException("unknown goal state " + goalState); } - return false; } - public static void configureScanner(final ScannerBase scanner, final CurrentState state) { + public static void configureScanner(final ScannerBase scanner, + final TabletManagementParameters tabletMgmtParams) { // TODO so many columns are being fetch it may not make sense to fetch columns TabletColumnFamily.PREV_ROW_COLUMN.fetch(scanner); ServerColumnFamily.DIRECTORY_COLUMN.fetch(scanner); @@ -311,17 +134,7 @@ public class TabletManagementIterator extends SkippingIterator { scanner.addScanIterator(new IteratorSetting(1000, "wholeRows", WholeRowIterator.class)); IteratorSetting tabletChange = new IteratorSetting(1001, "ManagerTabletInfoIterator", TabletManagementIterator.class); - if (state != null) { - LiveTServersSnapshot tserversSnapshot = state.tserversSnapshot(); - TabletManagementIterator.setCurrentServers(tabletChange, tserversSnapshot.getTservers()); - TabletManagementIterator.setOnlineTables(tabletChange, state.onlineTables()); - TabletManagementIterator.setMigrations(tabletChange, state.migrationsSnapshot()); - TabletManagementIterator.setManagerState(tabletChange, state.getManagerState()); - TabletManagementIterator.setShuttingDown(tabletChange, state.shutdownServers()); - TabletManagementIterator.setTServerResourceGroups(tabletChange, - tserversSnapshot.getTserverGroups()); - setCompactionHints(tabletChange, state.getCompactionHints()); - } + tabletChange.addOption(TABLET_GOAL_STATE_PARAMS_OPTION, tabletMgmtParams.serialize()); scanner.addScanIterator(tabletChange); } @@ -329,38 +142,20 @@ public class TabletManagementIterator extends SkippingIterator { return new TabletManagement(e.getKey(), e.getValue()); } - private final Set<TServerInstance> current = new 
HashSet<>(); - private final Set<TableId> onlineTables = new HashSet<>(); - private final Map<TabletServerId,String> tserverResourceGroups = new HashMap<>(); - private final Set<KeyExtent> migrations = new HashSet<>(); - private ManagerState managerState = ManagerState.NORMAL; private IteratorEnvironment env; private Key topKey = null; private Value topValue = null; + private TabletManagementParameters tabletMgmtParams = null; @Override public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException { super.init(source, options, env); this.env = env; - current.addAll(parseServers(options.get(SERVERS_OPTION))); - onlineTables.addAll(parseTableIDs(options.get(TABLES_OPTION))); - tserverResourceGroups.putAll(parseTServerResourceGroups(options)); - migrations.addAll(parseMigrations(options.get(MIGRATIONS_OPTION))); - String managerStateOptionValue = options.get(MANAGER_STATE_OPTION); - try { - managerState = ManagerState.valueOf(managerStateOptionValue); - } catch (RuntimeException ex) { - if (managerStateOptionValue != null) { - LOG.error("Unable to decode managerState {}", managerStateOptionValue); - } - } - Set<TServerInstance> shuttingDown = parseServers(options.get(SHUTTING_DOWN_OPTION)); - if (shuttingDown != null) { - current.removeAll(shuttingDown); - } - compactionGenerator = new CompactionJobGenerator(env.getPluginEnv(), - parseCompactionHints(options.get(COMPACTION_HINTS_OPTIONS))); + tabletMgmtParams = + TabletManagementParameters.deserialize(options.get(TABLET_GOAL_STATE_PARAMS_OPTION)); + compactionGenerator = + new CompactionJobGenerator(env.getPluginEnv(), tabletMgmtParams.getCompactionHints()); final AccumuloConfiguration conf = new ConfigurationCopy(env.getPluginEnv().getConfiguration()); BalancerEnvironmentImpl benv = new BalancerEnvironmentImpl(((TabletIteratorEnvironment) env).getServerContext()); @@ -400,7 +195,9 @@ public class TabletManagementIterator extends SkippingIterator { 
actions.clear(); Exception error = null; try { - if (managerState != ManagerState.NORMAL || current.isEmpty() || onlineTables.isEmpty()) { + if (tabletMgmtParams.getManagerState() != ManagerState.NORMAL + || tabletMgmtParams.getOnlineTsevers().isEmpty() + || tabletMgmtParams.getOnlineTables().isEmpty()) { // when manager is in the process of starting up or shutting down return everything. actions.add(ManagementAction.NEEDS_LOCATION_UPDATE); } else { @@ -449,7 +246,7 @@ public class TabletManagementIterator extends SkippingIterator { return; } - if (shouldReturnDueToLocation(tm, onlineTables, current)) { + if (shouldReturnDueToLocation(tm)) { reasonsToReturnThisTablet.add(ManagementAction.NEEDS_LOCATION_UPDATE); } diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementParameters.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementParameters.java new file mode 100644 index 0000000000..36f880e5d2 --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementParameters.java @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.server.manager.state; + +import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toMap; +import static java.util.stream.Collectors.toSet; +import static java.util.stream.Collectors.toUnmodifiableMap; +import static java.util.stream.Collectors.toUnmodifiableSet; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.Base64; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.function.Supplier; + +import org.apache.accumulo.core.data.AbstractId; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.manager.thrift.ManagerState; +import org.apache.accumulo.core.metadata.TServerInstance; +import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.server.manager.LiveTServerSet; +import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.io.DataOutputBuffer; + +import com.google.common.base.Suppliers; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; + +/** + * An immutable snapshot of the information needed by the TabletGroupWatcher and the + * {@link TabletManagementIterator} to make decisions about tablets. + */ +public class TabletManagementParameters { + // ELASTICITY_TODO need to unit test serialization and deserialization of this class. 
+ private final ManagerState managerState; + private final Map<Ample.DataLevel,Boolean> parentUpgradeMap; + private final Set<TableId> onlineTables; + private final Set<TServerInstance> serversToShutdown; + private final Map<KeyExtent,TServerInstance> migrations; + + private final Ample.DataLevel level; + + private final Supplier<Map<TServerInstance,String>> resourceGroups; + private final Map<String,Set<TServerInstance>> tserverGroups; + private final Map<Long,Map<String,String>> compactionHints; + private final Set<TServerInstance> onlineTservers; + private final boolean canSuspendTablets; + + public TabletManagementParameters(ManagerState managerState, + Map<Ample.DataLevel,Boolean> parentUpgradeMap, Set<TableId> onlineTables, + LiveTServerSet.LiveTServersSnapshot liveTServersSnapshot, + Set<TServerInstance> serversToShutdown, Map<KeyExtent,TServerInstance> migrations, + Ample.DataLevel level, Map<Long,Map<String,String>> compactionHints, + boolean canSuspendTablets) { + this.managerState = managerState; + this.parentUpgradeMap = Map.copyOf(parentUpgradeMap); + // TODO could filter by level + this.onlineTables = Set.copyOf(onlineTables); + // This is already immutable, so no need to copy + this.onlineTservers = liveTServersSnapshot.getTservers(); + this.serversToShutdown = Set.copyOf(serversToShutdown); + // TODO could filter by level + this.migrations = Map.copyOf(migrations); + this.level = level; + // This is already immutable, so no need to copy + this.tserverGroups = liveTServersSnapshot.getTserverGroups(); + this.compactionHints = makeImmutable(compactionHints); + this.resourceGroups = Suppliers.memoize(() -> { + Map<TServerInstance,String> resourceGroups = new HashMap<>(); + TabletManagementParameters.this.tserverGroups.forEach((resourceGroup, tservers) -> tservers + .forEach(tserver -> resourceGroups.put(tserver, resourceGroup))); + return Map.copyOf(resourceGroups); + }); + this.canSuspendTablets = canSuspendTablets; + } + + private 
TabletManagementParameters(JsonData jdata) { + this.managerState = jdata.managerState; + this.parentUpgradeMap = Map.copyOf(jdata.parentUpgradeMap); + this.onlineTables = jdata.onlineTables.stream().map(TableId::of).collect(toUnmodifiableSet()); + this.onlineTservers = + jdata.onlineTservers.stream().map(TServerInstance::new).collect(toUnmodifiableSet()); + this.serversToShutdown = + jdata.serversToShutdown.stream().map(TServerInstance::new).collect(toUnmodifiableSet()); + this.migrations = jdata.migrations.entrySet().stream() + .collect(toUnmodifiableMap(entry -> JsonData.strToExtent(entry.getKey()), + entry -> new TServerInstance(entry.getValue()))); + this.level = jdata.level; + this.compactionHints = makeImmutable(jdata.compactionHints); + this.tserverGroups = jdata.tserverGroups.entrySet().stream().collect(toUnmodifiableMap( + Map.Entry::getKey, + entry -> entry.getValue().stream().map(TServerInstance::new).collect(toUnmodifiableSet()))); + this.resourceGroups = Suppliers.memoize(() -> { + Map<TServerInstance,String> resourceGroups = new HashMap<>(); + TabletManagementParameters.this.tserverGroups.forEach((resourceGroup, tservers) -> tservers + .forEach(tserver -> resourceGroups.put(tserver, resourceGroup))); + return Map.copyOf(resourceGroups); + }); + this.canSuspendTablets = jdata.canSuspendTablets; + ; + } + + public ManagerState getManagerState() { + return managerState; + } + + public boolean isParentLevelUpgraded() { + return parentUpgradeMap.get(level); + } + + public Set<TServerInstance> getOnlineTsevers() { + return onlineTservers; + } + + public Set<TServerInstance> getServersToShutdown() { + return serversToShutdown; + } + + public boolean isTableOnline(TableId tableId) { + return onlineTables.contains(tableId); + } + + public Map<KeyExtent,TServerInstance> getMigrations() { + return migrations; + } + + public Ample.DataLevel getLevel() { + return level; + } + + public String getResourceGroup(TServerInstance tserver) { + return 
resourceGroups.get().get(tserver); + } + + public Map<String,Set<TServerInstance>> getGroupedTServers() { + return tserverGroups; + } + + public Set<TableId> getOnlineTables() { + return onlineTables; + } + + public Map<Long,Map<String,String>> getCompactionHints() { + return compactionHints; + } + + public boolean canSuspendTablets() { + return canSuspendTablets; + } + + private static Map<Long,Map<String,String>> + makeImmutable(Map<Long,Map<String,String>> compactionHints) { + var copy = new HashMap<Long,Map<String,String>>(); + compactionHints.forEach((ftxid, hints) -> copy.put(ftxid, Map.copyOf(hints))); + return Collections.unmodifiableMap(copy); + } + + private static class JsonData { + final ManagerState managerState; + final Map<Ample.DataLevel,Boolean> parentUpgradeMap; + final Collection<String> onlineTables; + final Collection<String> onlineTservers; + final Collection<String> serversToShutdown; + final Map<String,String> migrations; + + final Ample.DataLevel level; + + final Map<String,Set<String>> tserverGroups; + + final Map<Long,Map<String,String>> compactionHints; + + final boolean canSuspendTablets; + + private static String toString(KeyExtent extent) { + DataOutputBuffer buffer = new DataOutputBuffer(); + try { + extent.writeTo(buffer); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + + return Base64.getEncoder() + .encodeToString(Arrays.copyOf(buffer.getData(), buffer.getLength())); + + } + + private static KeyExtent strToExtent(String kes) { + byte[] data = Base64.getDecoder().decode(kes); + DataInputBuffer buffer = new DataInputBuffer(); + buffer.reset(data, data.length); + try { + return KeyExtent.readFrom(buffer); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + JsonData(TabletManagementParameters params) { + managerState = params.managerState; + parentUpgradeMap = params.parentUpgradeMap; + onlineTables = params.onlineTables.stream().map(AbstractId::canonical).collect(toList()); + 
onlineTservers = params.getOnlineTsevers().stream().map(TServerInstance::getHostPortSession) + .collect(toList()); + serversToShutdown = params.serversToShutdown.stream().map(TServerInstance::getHostPortSession) + .collect(toList()); + migrations = params.migrations.entrySet().stream().collect( + toMap(entry -> toString(entry.getKey()), entry -> entry.getValue().getHostPortSession())); + level = params.level; + tserverGroups = params.getGroupedTServers().entrySet().stream() + .collect(toMap(Map.Entry::getKey, entry -> entry.getValue().stream() + .map(TServerInstance::getHostPortSession).collect(toSet()))); + compactionHints = params.compactionHints; + canSuspendTablets = params.canSuspendTablets; + } + + } + + public String serialize() { + Gson gson = new GsonBuilder().disableHtmlEscaping().create(); + return gson.toJson(new JsonData(this)); + } + + public static TabletManagementParameters deserialize(String json) { + Gson gson = new GsonBuilder().disableHtmlEscaping().create(); + JsonData jdata = gson.fromJson(json, JsonData.class); + return new TabletManagementParameters(jdata); + } + +} diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementScanner.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementScanner.java index e34d16304c..c5a02ef93a 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementScanner.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementScanner.java @@ -59,8 +59,8 @@ public class TabletManagementScanner implements ClosableIterator<TabletManagemen private final AtomicBoolean closed = new AtomicBoolean(false); // This constructor is called from TabletStateStore implementations - public TabletManagementScanner(ClientContext context, List<Range> ranges, CurrentState state, - String tableName) { + public TabletManagementScanner(ClientContext context, List<Range> ranges, + 
TabletManagementParameters tmgmtParams, String tableName) { // scan over metadata table, looking for tablets in the wrong state based on the live servers // and online tables try { @@ -90,16 +90,11 @@ public class TabletManagementScanner implements ClosableIterator<TabletManagemen throw new RuntimeException("Error obtaining locations for table: " + tableName); } cleanable = CleanerUtil.unclosed(this, TabletManagementScanner.class, closed, log, mdScanner); - TabletManagementIterator.configureScanner(mdScanner, state); + TabletManagementIterator.configureScanner(mdScanner, tmgmtParams); mdScanner.setRanges(ranges); iter = mdScanner.iterator(); } - // This constructor is called from utilities and tests - public TabletManagementScanner(ClientContext context, Range range, String tableName) { - this(context, List.of(range), null, tableName); - } - @Override public void close() { if (closed.compareAndSet(false, true)) { diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateStore.java index 19cc5b5832..ee81950fe7 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateStore.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletStateStore.java @@ -36,7 +36,7 @@ import org.apache.hadoop.fs.Path; /** * Interface for storing information about tablet assignments. There are three implementations: */ -public interface TabletStateStore extends ClosableIterable<TabletManagement> { +public interface TabletStateStore { /** * Get the level for this state store @@ -52,14 +52,14 @@ public interface TabletStateStore extends ClosableIterable<TabletManagement> { * Scan the information about the tablets covered by this store that have end row in the specified * ranges. 
*/ - ClosableIterator<TabletManagement> iterator(List<Range> ranges); + ClosableIterator<TabletManagement> iterator(List<Range> ranges, + TabletManagementParameters parameters); /** * Scan the information about all tablets covered by this store.. */ - @Override - default ClosableIterator<TabletManagement> iterator() { - return iterator(List.of(MetadataSchema.TabletsSection.getRange())); + default ClosableIterator<TabletManagement> iterator(TabletManagementParameters parameters) { + return iterator(List.of(MetadataSchema.TabletsSection.getRange()), parameters); } /** @@ -118,11 +118,6 @@ public interface TabletStateStore extends ClosableIterable<TabletManagement> { } public static TabletStateStore getStoreForLevel(DataLevel level, ServerContext context) { - return getStoreForLevel(level, context, null); - } - - public static TabletStateStore getStoreForLevel(DataLevel level, ServerContext context, - CurrentState state) { TabletStateStore tss; switch (level) { @@ -130,10 +125,10 @@ public interface TabletStateStore extends ClosableIterable<TabletManagement> { tss = new ZooTabletStateStore(level, context); break; case METADATA: - tss = new RootTabletStateStore(level, context, state); + tss = new RootTabletStateStore(level, context); break; case USER: - tss = new MetaDataStateStore(level, context, state); + tss = new MetaDataStateStore(level, context); break; default: throw new IllegalArgumentException("Unknown level " + level); diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/ZooTabletStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/ZooTabletStateStore.java index c88847c536..05f5648497 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/ZooTabletStateStore.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/ZooTabletStateStore.java @@ -40,6 +40,8 @@ import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import 
com.google.common.base.Preconditions; + class ZooTabletStateStore extends AbstractTabletStateStore implements TabletStateStore { private static final Logger log = LoggerFactory.getLogger(ZooTabletStateStore.class); @@ -60,7 +62,9 @@ class ZooTabletStateStore extends AbstractTabletStateStore implements TabletStat } @Override - public ClosableIterator<TabletManagement> iterator(List<Range> ranges) { + public ClosableIterator<TabletManagement> iterator(List<Range> ranges, + TabletManagementParameters parameters) { + Preconditions.checkArgument(parameters.getLevel() == getLevel()); return new ClosableIterator<>() { boolean finished = false; diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/FindOfflineTablets.java b/server/base/src/main/java/org/apache/accumulo/server/util/FindOfflineTablets.java index 2cd208a629..fb28ed13e8 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/FindOfflineTablets.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/FindOfflineTablets.java @@ -23,25 +23,18 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.TableId; -import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.manager.state.TabletManagement; import org.apache.accumulo.core.manager.state.tables.TableState; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.RootTable; import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.TabletState; import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.server.ServerContext; import 
org.apache.accumulo.server.cli.ServerUtilOpts; import org.apache.accumulo.server.manager.LiveTServerSet; import org.apache.accumulo.server.manager.LiveTServerSet.Listener; -import org.apache.accumulo.server.manager.state.TabletManagementScanner; -import org.apache.accumulo.server.manager.state.TabletStateStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -82,8 +75,8 @@ public class FindOfflineTablets { tservers.startListeningForTabletServerChanges(); scanning.set(true); - Iterator<TabletManagement> zooScanner = - TabletStateStore.getStoreForLevel(DataLevel.ROOT, context).iterator(); + Iterator<TabletMetadata> zooScanner = + context.getAmple().readTablets().forLevel(DataLevel.ROOT).build().iterator(); int offline = 0; @@ -97,8 +90,8 @@ public class FindOfflineTablets { } System.out.println("Scanning " + RootTable.NAME); - Iterator<TabletManagement> rootScanner = - new TabletManagementScanner(context, TabletsSection.getRange(), RootTable.NAME); + Iterator<TabletMetadata> rootScanner = + context.getAmple().readTablets().forLevel(DataLevel.METADATA).build().iterator(); if ((offline = checkTablets(context, rootScanner, tservers)) > 0) { return offline; } @@ -109,32 +102,24 @@ public class FindOfflineTablets { System.out.println("Scanning " + MetadataTable.NAME); - Range range = TabletsSection.getRange(); - if (tableName != null) { - TableId tableId = context.getTableId(tableName); - range = new KeyExtent(tableId, null, null).toMetaRange(); - } - - try (TabletManagementScanner metaScanner = - new TabletManagementScanner(context, range, MetadataTable.NAME)) { - return checkTablets(context, metaScanner, tservers); + try (var metaScanner = context.getAmple().readTablets().forLevel(DataLevel.USER).build()) { + return checkTablets(context, metaScanner.iterator(), tservers); } } - private static int checkTablets(ServerContext context, Iterator<TabletManagement> scanner, + private static int checkTablets(ServerContext context, Iterator<TabletMetadata> scanner, 
LiveTServerSet tservers) { int offline = 0; while (scanner.hasNext() && !System.out.checkError()) { - TabletManagement mti = scanner.next(); - TabletMetadata tabletMetadata = mti.getTabletMetadata(); + TabletMetadata tabletMetadata = scanner.next(); Set<TServerInstance> liveTServers = tservers.getCurrentServers(); TabletState state = TabletState.compute(tabletMetadata, liveTServers); if (state != null && state != TabletState.HOSTED - && context.getTableManager().getTableState(mti.getTabletMetadata().getTableId()) + && context.getTableManager().getTableState(tabletMetadata.getTableId()) != TableState.OFFLINE) { - System.out.println( - mti + " is " + state + " #walogs:" + mti.getTabletMetadata().getLogs().size()); + System.out.println(tabletMetadata.getExtent() + " is " + state + " #walogs:" + + tabletMetadata.getLogs().size()); offline++; } } diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ListOnlineOnDemandTablets.java b/server/base/src/main/java/org/apache/accumulo/server/util/ListOnlineOnDemandTablets.java index 8bf64157b8..70e02e5ebc 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/ListOnlineOnDemandTablets.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/ListOnlineOnDemandTablets.java @@ -24,19 +24,17 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.admin.TabletHostingGoal; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.manager.state.TabletManagement; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.TabletState; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; +import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import 
org.apache.accumulo.core.metadata.schema.TabletsMetadata; import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.cli.ServerUtilOpts; import org.apache.accumulo.server.manager.LiveTServerSet; import org.apache.accumulo.server.manager.LiveTServerSet.Listener; -import org.apache.accumulo.server.manager.state.TabletManagementScanner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -79,27 +77,24 @@ public class ListOnlineOnDemandTablets { System.out.println("Scanning " + MetadataTable.NAME); - Range range = TabletsSection.getRange(); - - try (TabletManagementScanner metaScanner = - new TabletManagementScanner(context, range, MetadataTable.NAME)) { - return checkTablets(context, metaScanner, tservers); + try (TabletsMetadata metaScanner = + context.getAmple().readTablets().forLevel(Ample.DataLevel.USER).build()) { + return checkTablets(context, metaScanner.iterator(), tservers); } } - private static int checkTablets(ServerContext context, Iterator<TabletManagement> scanner, + private static int checkTablets(ServerContext context, Iterator<TabletMetadata> scanner, LiveTServerSet tservers) { int online = 0; while (scanner.hasNext() && !System.out.checkError()) { - final TabletManagement mti = scanner.next(); - TabletMetadata tabletMetadata = mti.getTabletMetadata(); + TabletMetadata tabletMetadata = scanner.next(); Set<TServerInstance> liveTServers = tservers.getCurrentServers(); TabletState state = TabletState.compute(tabletMetadata, liveTServers); if (state == TabletState.HOSTED - && mti.getTabletMetadata().getHostingGoal() == TabletHostingGoal.ONDEMAND) { - System.out.println( - mti + " is " + state + " #walogs:" + mti.getTabletMetadata().getLogs().size()); + && tabletMetadata.getHostingGoal() == TabletHostingGoal.ONDEMAND) { + System.out.println(tabletMetadata.getExtent() + " is " + state + " #walogs:" + + tabletMetadata.getLogs().size()); online++; } } diff --git 
a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java index f783d77ac5..fba1867b47 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java @@ -18,6 +18,12 @@ */ package org.apache.accumulo.gc; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LAST; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOGS; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SUSPEND; + import java.io.FileNotFoundException; import java.io.IOException; import java.util.Collection; @@ -31,7 +37,6 @@ import java.util.UUID; import org.apache.accumulo.core.gc.thrift.GCStatus; import org.apache.accumulo.core.gc.thrift.GcCycleStats; -import org.apache.accumulo.core.manager.state.TabletManagement; import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.TabletState; import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; @@ -45,7 +50,6 @@ import org.apache.accumulo.server.log.WalStateManager; import org.apache.accumulo.server.log.WalStateManager.WalMarkerException; import org.apache.accumulo.server.log.WalStateManager.WalState; import org.apache.accumulo.server.manager.LiveTServerSet; -import org.apache.accumulo.server.manager.state.TabletStateStore; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.slf4j.Logger; @@ -64,7 +68,7 @@ public class GarbageCollectWriteAheadLogs { private final VolumeManager fs; private final LiveTServerSet liveServers; private final WalStateManager walMarker; - private final 
Iterable<TabletManagement> store; + private final Iterable<TabletMetadata> store; /** * Creates a new GC WAL object. @@ -79,9 +83,12 @@ public class GarbageCollectWriteAheadLogs { this.liveServers = liveServers; this.walMarker = new WalStateManager(context); this.store = () -> Iterators.concat( - TabletStateStore.getStoreForLevel(DataLevel.ROOT, context).iterator(), - TabletStateStore.getStoreForLevel(DataLevel.METADATA, context).iterator(), - TabletStateStore.getStoreForLevel(DataLevel.USER, context).iterator()); + context.getAmple().readTablets().forLevel(DataLevel.ROOT) + .fetch(LOCATION, LAST, LOGS, PREV_ROW, SUSPEND).checkConsistency().build().iterator(), + context.getAmple().readTablets().forLevel(DataLevel.METADATA) + .fetch(LOCATION, LAST, LOGS, PREV_ROW, SUSPEND).checkConsistency().build().iterator(), + context.getAmple().readTablets().forLevel(DataLevel.USER) + .fetch(LOCATION, LAST, LOGS, PREV_ROW, SUSPEND).checkConsistency().build().iterator()); } /** @@ -93,7 +100,7 @@ public class GarbageCollectWriteAheadLogs { */ @VisibleForTesting GarbageCollectWriteAheadLogs(ServerContext context, VolumeManager fs, - LiveTServerSet liveTServerSet, WalStateManager walMarker, Iterable<TabletManagement> store) { + LiveTServerSet liveTServerSet, WalStateManager walMarker, Iterable<TabletMetadata> store) { this.context = context; this.fs = fs; this.liveServers = liveTServerSet; @@ -275,13 +282,11 @@ public class GarbageCollectWriteAheadLogs { } // remove any entries if there's a log reference (recovery hasn't finished) - for (TabletManagement mti : store) { + for (TabletMetadata tabletMetadata : store) { // Tablet is still assigned to a dead server. Manager has moved markers and reassigned it // Easiest to just ignore all the WALs for the dead server. 
- TabletMetadata tabletMetadata = mti.getTabletMetadata(); if (TabletState.compute(tabletMetadata, liveServers) == TabletState.ASSIGNED_TO_DEAD_SERVER) { - Set<UUID> idsToIgnore = - candidates.remove(mti.getTabletMetadata().getLocation().getServerInstance()); + Set<UUID> idsToIgnore = candidates.remove(tabletMetadata.getLocation().getServerInstance()); if (idsToIgnore != null) { result.keySet().removeAll(idsToIgnore); recoveryLogs.keySet().removeAll(idsToIgnore); @@ -289,7 +294,7 @@ public class GarbageCollectWriteAheadLogs { } // Tablet is being recovered and has WAL references, remove all the WALs for the dead server // that made the WALs. - for (LogEntry wals : mti.getTabletMetadata().getLogs()) { + for (LogEntry wals : tabletMetadata.getLogs()) { String wal = wals.filename; UUID walUUID = path2uuid(new Path(wal)); TServerInstance dead = result.get(walUUID); diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java index f8e9c8b151..989fd43bd4 100644 --- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java +++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java @@ -25,14 +25,12 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.UUID; import org.apache.accumulo.core.client.admin.TabletHostingGoal; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.gc.thrift.GCStatus; import org.apache.accumulo.core.gc.thrift.GcCycleStats; -import org.apache.accumulo.core.manager.state.TabletManagement; import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location; @@ -60,27 +58,25 
@@ public class GarbageCollectWriteAheadLogsTest { private final Path path = new Path("hdfs://localhost:9000/accumulo/wal/localhost+1234/" + id); private final KeyExtent extent = KeyExtent.fromMetaRow(new Text("1<")); private final List<LogEntry> walogs = Collections.emptyList(); - private final TabletManagement tabletAssignedToServer1; - private final TabletManagement tabletAssignedToServer2; + private final TabletMetadata tabletAssignedToServer1; + private final TabletMetadata tabletAssignedToServer2; { try { - tabletAssignedToServer1 = new TabletManagement(Set.of(), + tabletAssignedToServer1 = TabletMetadata.builder(extent).putLocation(Location.current(server1)) - .putHostingGoal(TabletHostingGoal.ALWAYS).build(LAST, SUSPEND, LOGS), - ""); - tabletAssignedToServer2 = new TabletManagement(Set.of(), + .putHostingGoal(TabletHostingGoal.ALWAYS).build(LAST, SUSPEND, LOGS); + tabletAssignedToServer2 = TabletMetadata.builder(extent).putLocation(Location.current(server2)) - .putHostingGoal(TabletHostingGoal.NEVER).build(LAST, SUSPEND, LOGS), - ""); + .putHostingGoal(TabletHostingGoal.NEVER).build(LAST, SUSPEND, LOGS); } catch (Exception ex) { throw new RuntimeException(ex); } } - private final Iterable<TabletManagement> tabletOnServer1List = + private final Iterable<TabletMetadata> tabletOnServer1List = Collections.singletonList(tabletAssignedToServer1); - private final Iterable<TabletManagement> tabletOnServer2List = + private final Iterable<TabletMetadata> tabletOnServer2List = Collections.singletonList(tabletAssignedToServer2); @Test diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index b5de7196d0..6608b56638 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -99,7 +99,6 @@ import org.apache.accumulo.core.metadata.RootTable; import 
org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily; -import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metrics.MetricsUtil; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.spi.balancer.BalancerEnvironment; @@ -108,7 +107,6 @@ import org.apache.accumulo.core.spi.balancer.TabletBalancer; import org.apache.accumulo.core.spi.balancer.data.TServerStatus; import org.apache.accumulo.core.spi.balancer.data.TabletMigration; import org.apache.accumulo.core.spi.balancer.data.TabletServerId; -import org.apache.accumulo.core.tablet.thrift.TUnloadTabletGoal; import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.util.Halt; import org.apache.accumulo.core.util.Retry; @@ -132,7 +130,6 @@ import org.apache.accumulo.server.manager.LiveTServerSet; import org.apache.accumulo.server.manager.LiveTServerSet.LiveTServersSnapshot; import org.apache.accumulo.server.manager.LiveTServerSet.TServerConnection; import org.apache.accumulo.server.manager.balancer.BalancerEnvironmentImpl; -import org.apache.accumulo.server.manager.state.CurrentState; import org.apache.accumulo.server.manager.state.DeadServerList; import org.apache.accumulo.server.manager.state.TabletServerState; import org.apache.accumulo.server.manager.state.TabletStateStore; @@ -174,7 +171,7 @@ import io.opentelemetry.context.Scope; * The manager will also coordinate log recoveries and reports general status. 
*/ public class Manager extends AbstractServer - implements LiveTServerSet.Listener, TableObserver, CurrentState, HighlyAvailableService { + implements LiveTServerSet.Listener, TableObserver, HighlyAvailableService { static final Logger log = LoggerFactory.getLogger(Manager.class); @@ -236,12 +233,14 @@ public class Manager extends AbstractServer private final TabletStateStore metadataTabletStore; private final TabletStateStore userTabletStore; - @Override public synchronized ManagerState getManagerState() { return state; } - @Override + // ELASTICITIY_TODO it would be nice if this method could take DataLevel as an argument and only + // retrieve information about compactions in that data level. Attempted this and a lot of + // refactoring was needed to get that small bit of information to this method. Would be best to + // address this after issue. May be best to attempt this after #3576. public Map<Long,Map<String,String>> getCompactionHints() { Map<Long,CompactionConfig> allConfig = null; try { @@ -455,9 +454,9 @@ public class Manager extends AbstractServer final long tokenLifetime = aconf.getTimeInMillis(Property.GENERAL_DELEGATION_TOKEN_LIFETIME); - this.rootTabletStore = TabletStateStore.getStoreForLevel(DataLevel.ROOT, context, this); - this.metadataTabletStore = TabletStateStore.getStoreForLevel(DataLevel.METADATA, context, this); - this.userTabletStore = TabletStateStore.getStoreForLevel(DataLevel.USER, context, this); + this.rootTabletStore = TabletStateStore.getStoreForLevel(DataLevel.ROOT, context); + this.metadataTabletStore = TabletStateStore.getStoreForLevel(DataLevel.METADATA, context); + this.userTabletStore = TabletStateStore.getStoreForLevel(DataLevel.USER, context); authenticationTokenKeyManager = null; keyDistributor = null; @@ -543,110 +542,8 @@ public class Manager extends AbstractServer return compactionJobQueues; } - enum TabletGoalState { - HOSTED(TUnloadTabletGoal.UNKNOWN), - UNASSIGNED(TUnloadTabletGoal.UNASSIGNED), - 
DELETED(TUnloadTabletGoal.DELETED), - SUSPENDED(TUnloadTabletGoal.SUSPENDED); - - private final TUnloadTabletGoal unloadGoal; - - TabletGoalState(TUnloadTabletGoal unloadGoal) { - this.unloadGoal = unloadGoal; - } - - /** The purpose of unloading this tablet. */ - public TUnloadTabletGoal howUnload() { - return unloadGoal; - } - } - - TabletGoalState getSystemGoalState(TabletMetadata tm) { - switch (getManagerState()) { - case NORMAL: - return TabletGoalState.HOSTED; - case HAVE_LOCK: // fall-through intended - case INITIAL: // fall-through intended - case SAFE_MODE: - if (tm.getExtent().isMeta()) { - return TabletGoalState.HOSTED; - } - return TabletGoalState.UNASSIGNED; - case UNLOAD_METADATA_TABLETS: - if (tm.getExtent().isRootTablet()) { - return TabletGoalState.HOSTED; - } - return TabletGoalState.UNASSIGNED; - case UNLOAD_ROOT_TABLET: - return TabletGoalState.UNASSIGNED; - case STOP: - return TabletGoalState.UNASSIGNED; - default: - throw new IllegalStateException("Unknown Manager State"); - } - } - - TabletGoalState getTableGoalState(TabletMetadata tm) { - TableState tableState = getContext().getTableManager().getTableState(tm.getTableId()); - if (tableState == null) { - return TabletGoalState.DELETED; - } - switch (tableState) { - case DELETING: - return TabletGoalState.DELETED; - case OFFLINE: - case NEW: - return TabletGoalState.UNASSIGNED; - default: - switch (tm.getHostingGoal()) { - case ALWAYS: - return TabletGoalState.HOSTED; - case NEVER: - return TabletGoalState.UNASSIGNED; - case ONDEMAND: - if (tm.getHostingRequested()) { - return TabletGoalState.HOSTED; - } else { - return TabletGoalState.UNASSIGNED; - } - default: - throw new IllegalStateException( - "Tablet Hosting Goal is unhandled: " + tm.getHostingGoal()); - } - } - } - - TabletGoalState getGoalState(TabletMetadata tm) { - KeyExtent extent = tm.getExtent(); - // Shutting down? 
- TabletGoalState state = getSystemGoalState(tm); - - if (state == TabletGoalState.HOSTED) { - if (!upgradeCoordinator.getStatus().isParentLevelUpgraded(extent)) { - // The place where this tablet stores its metadata was not upgraded, so do not assign this - // tablet yet. - return TabletGoalState.UNASSIGNED; - } - - if (tm.getOperationId() != null) { - return TabletGoalState.UNASSIGNED; - } - - if (tm.hasCurrent() && serversToShutdown.contains(tm.getLocation().getServerInstance())) { - return TabletGoalState.SUSPENDED; - } - - // taking table offline? - state = getTableGoalState(tm); - if (state == TabletGoalState.HOSTED) { - // Maybe this tablet needs to be migrated - TServerInstance dest = migrations.get(extent); - if (dest != null && tm.hasCurrent() && !dest.equals(tm.getLocation().getServerInstance())) { - return TabletGoalState.UNASSIGNED; - } - } - } - return state; + public UpgradeCoordinator.UpgradeStatus getUpgradeStatus() { + return upgradeCoordinator.getStatus(); } private class MigrationCleanupThread implements Runnable { @@ -878,7 +775,7 @@ public class Manager extends AbstractServer private long balanceTablets() { BalanceParamsImpl params = BalanceParamsImpl.fromThrift(tserverStatusForBalancer, - tServerGroupingForBalancer, tserverStatus, migrationsSnapshot()); + tServerGroupingForBalancer, tserverStatus, migrationsSnapshot().keySet()); long wait = tabletBalancer.balance(params); for (TabletMigration m : checkMigrationSanity(tserverStatusForBalancer.keySet(), @@ -1580,7 +1477,6 @@ public class Manager extends AbstractServer @Override public void sessionExpired() {} - @Override public Set<TableId> onlineTables() { Set<TableId> result = new HashSet<>(); if (getManagerState() != ManagerState.NORMAL) { @@ -1604,12 +1500,10 @@ public class Manager extends AbstractServer return result; } - @Override public Set<TServerInstance> onlineTabletServers() { return tserverSet.getSnapshot().getTservers(); } - @Override public LiveTServersSnapshot tserversSnapshot() 
{ return tserverSet.getSnapshot(); } @@ -1695,19 +1589,15 @@ public class Manager extends AbstractServer return delegationTokensAvailable; } - @Override - public Set<KeyExtent> migrationsSnapshot() { - Set<KeyExtent> migrationKeys; + public Map<KeyExtent,TServerInstance> migrationsSnapshot() { synchronized (migrations) { - migrationKeys = new HashSet<>(migrations.keySet()); + return Map.copyOf(migrations); } - return Collections.unmodifiableSet(migrationKeys); } - @Override public Set<TServerInstance> shutdownServers() { synchronized (serversToShutdown) { - return new HashSet<>(serversToShutdown); + return Set.copyOf(serversToShutdown); } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java index 0b7902bd46..3031590ec4 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java @@ -51,7 +51,6 @@ import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.logging.TabletLogger; -import org.apache.accumulo.core.manager.balancer.TabletServerIdImpl; import org.apache.accumulo.core.manager.state.TabletManagement; import org.apache.accumulo.core.manager.state.TabletManagement.ManagementAction; import org.apache.accumulo.core.manager.state.tables.TableState; @@ -62,33 +61,32 @@ import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.RootTable; import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.TabletState; -import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; +import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily; 
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location; -import org.apache.accumulo.core.metadata.schema.TabletOperationType; import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.core.spi.balancer.data.TabletServerId; import org.apache.accumulo.core.util.TextUtil; import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.core.util.threads.Threads.AccumuloDaemonThread; -import org.apache.accumulo.manager.Manager.TabletGoalState; import org.apache.accumulo.manager.metrics.ManagerMetrics; import org.apache.accumulo.manager.split.SplitTask; import org.apache.accumulo.manager.state.TableCounts; import org.apache.accumulo.manager.state.TableStats; +import org.apache.accumulo.manager.upgrade.UpgradeCoordinator; import org.apache.accumulo.server.ServiceEnvironmentImpl; import org.apache.accumulo.server.compaction.CompactionJobGenerator; import org.apache.accumulo.server.conf.TableConfiguration; import org.apache.accumulo.server.log.WalStateManager; import org.apache.accumulo.server.log.WalStateManager.WalMarkerException; -import org.apache.accumulo.server.manager.LiveTServerSet; import org.apache.accumulo.server.manager.LiveTServerSet.TServerConnection; import org.apache.accumulo.server.manager.state.Assignment; import org.apache.accumulo.server.manager.state.ClosableIterator; import org.apache.accumulo.server.manager.state.DistributedStoreException; +import org.apache.accumulo.server.manager.state.TabletGoalState; import org.apache.accumulo.server.manager.state.TabletManagementIterator; +import org.apache.accumulo.server.manager.state.TabletManagementParameters; import org.apache.accumulo.server.manager.state.TabletStateStore; import org.apache.accumulo.server.manager.state.UnassignedTablet; import org.apache.hadoop.fs.Path; @@ -126,8 
+124,8 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { private SortedSet<TServerInstance> lastScanServers = Collections.emptySortedSet(); private final EventHandler eventHandler; private final ManagerMetrics metrics; - private WalStateManager walStateManager; + private volatile Set<TServerInstance> filteredServersToShutdown = Set.of(); TabletGroupWatcher(Manager manager, TabletStateStore store, TabletGroupWatcher dependentWatcher, ManagerMetrics metrics) { @@ -174,32 +172,31 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { private final SortedMap<TServerInstance,TabletServerStatus> destinations; private final Map<String,Set<TServerInstance>> currentTServerGrouping; - public TabletLists(Manager m, SortedMap<TServerInstance,TabletServerStatus> curTServers, - Map<String,Set<TServerInstance>> grouping) { - synchronized (m.serversToShutdown) { - var destinationsMod = new TreeMap<>(curTServers); - if (!m.serversToShutdown.isEmpty()) { - // Remove servers that are in the process of shutting down from the lists of tablet - // servers. 
- destinationsMod.keySet().removeAll(m.serversToShutdown); - HashMap<String,Set<TServerInstance>> groupingCopy = new HashMap<>(); - grouping.forEach((group, groupsServers) -> { - if (Collections.disjoint(groupsServers, m.serversToShutdown)) { - groupingCopy.put(group, groupsServers); - } else { - var serversCopy = new HashSet<>(groupsServers); - serversCopy.removeAll(m.serversToShutdown); - groupingCopy.put(group, Collections.unmodifiableSet(serversCopy)); - } - }); - - this.currentTServerGrouping = Collections.unmodifiableMap(groupingCopy); - } else { - this.currentTServerGrouping = grouping; - } + public TabletLists(SortedMap<TServerInstance,TabletServerStatus> curTServers, + Map<String,Set<TServerInstance>> grouping, Set<TServerInstance> serversToShutdown) { + + var destinationsMod = new TreeMap<>(curTServers); + if (!serversToShutdown.isEmpty()) { + // Remove servers that are in the process of shutting down from the lists of tablet + // servers. + destinationsMod.keySet().removeAll(serversToShutdown); + HashMap<String,Set<TServerInstance>> groupingCopy = new HashMap<>(); + grouping.forEach((group, groupsServers) -> { + if (Collections.disjoint(groupsServers, serversToShutdown)) { + groupingCopy.put(group, groupsServers); + } else { + var serversCopy = new HashSet<>(groupsServers); + serversCopy.removeAll(serversToShutdown); + groupingCopy.put(group, Collections.unmodifiableSet(serversCopy)); + } + }); - this.destinations = Collections.unmodifiableSortedMap(destinationsMod); + this.currentTServerGrouping = Collections.unmodifiableMap(groupingCopy); + } else { + this.currentTServerGrouping = grouping; } + + this.destinations = Collections.unmodifiableSortedMap(destinationsMod); } public void reset() { @@ -241,17 +238,17 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { continue; } - LiveTServerSet.LiveTServersSnapshot tservers = manager.tserverSet.getSnapshot(); - var currentTservers = getTserversStatus(tservers.getTservers()); + 
TabletManagementParameters tabletMgmtParams = createTabletManagementParameters(); + var currentTservers = getCurrentTservers(tabletMgmtParams.getOnlineTsevers()); if (currentTservers.isEmpty()) { setNeedsFullScan(); continue; } - try (var iter = store.iterator(ranges)) { + try (var iter = store.iterator(ranges, tabletMgmtParams)) { long t1 = System.currentTimeMillis(); - manageTablets(iter, currentTservers, tservers.getTserverGroups(), false); + manageTablets(iter, tabletMgmtParams, currentTservers, false); long t2 = System.currentTimeMillis(); Manager.log.debug(String.format("[%s]: partial scan time %.2f seconds for %,d ranges", store.name(), (t2 - t1) / 1000., ranges.size())); @@ -315,20 +312,49 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { } } + private TabletManagementParameters createTabletManagementParameters() { + + HashMap<Ample.DataLevel,Boolean> parentLevelUpgrade = new HashMap<>(); + UpgradeCoordinator.UpgradeStatus upgradeStatus = manager.getUpgradeStatus(); + for (var level : Ample.DataLevel.values()) { + parentLevelUpgrade.put(level, upgradeStatus.isParentLevelUpgraded(level)); + } + + Set<TServerInstance> shutdownServers; + if (store.getLevel() == Ample.DataLevel.USER) { + shutdownServers = manager.shutdownServers(); + } else { + // Use the servers to shutdown filtered by the dependent watcher. These are servers to + // shutdown that the dependent watcher has determined it has no tablets hosted on or assigned + // to. 
+ shutdownServers = dependentWatcher.getFilteredServersToShutdown(); + } + + var tServersSnapshot = manager.tserversSnapshot(); + + return new TabletManagementParameters(manager.getManagerState(), parentLevelUpgrade, + manager.onlineTables(), tServersSnapshot, shutdownServers, manager.migrationsSnapshot(), + store.getLevel(), manager.getCompactionHints(), canSuspendTablets()); + } + + private Set<TServerInstance> getFilteredServersToShutdown() { + return filteredServersToShutdown; + } + private static class TableMgmtStats { int[] counts = new int[TabletState.values().length]; private int totalUnloaded; } private TableMgmtStats manageTablets(Iterator<TabletManagement> iter, - SortedMap<TServerInstance,TabletServerStatus> currentTServers, - Map<String,Set<TServerInstance>> tserverGroups, boolean isFullScan) + TabletManagementParameters tableMgmtParams, + SortedMap<TServerInstance,TabletServerStatus> currentTServers, boolean isFullScan) throws BadLocationStateException, TException, DistributedStoreException, WalMarkerException, IOException { TableMgmtStats tableMgmtStats = new TableMgmtStats(); final boolean shuttingDownAllTabletServers = - manager.serversToShutdown.equals(currentTServers.keySet()); + tableMgmtParams.getServersToShutdown().equals(currentTServers.keySet()); if (shuttingDownAllTabletServers && !isFullScan) { // If we are shutting down all of the TabletServers, then don't process any events // from the EventCoordinator. 
@@ -338,16 +364,14 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { int unloaded = 0; - TabletLists tLists = new TabletLists(manager, currentTServers, tserverGroups); + TabletLists tLists = new TabletLists(currentTServers, tableMgmtParams.getGroupedTServers(), + tableMgmtParams.getServersToShutdown()); CompactionJobGenerator compactionGenerator = new CompactionJobGenerator( - new ServiceEnvironmentImpl(manager.getContext()), manager.getCompactionHints()); + new ServiceEnvironmentImpl(manager.getContext()), tableMgmtParams.getCompactionHints()); - final Map<TabletServerId,String> resourceGroups = new HashMap<>(); - tserverGroups.forEach((group, tservers) -> { - tservers.stream().map(TabletServerIdImpl::new) - .forEach(tabletServerId -> resourceGroups.put(tabletServerId, group)); - }); + Set<TServerInstance> filteredServersToShutdown = + new HashSet<>(tableMgmtParams.getServersToShutdown()); while (iter.hasNext()) { final TabletManagement mti = iter.next(); @@ -391,9 +415,12 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { final TableConfiguration tableConf = manager.getContext().getTableConfiguration(tableId); - TabletGoalState goal = manager.getGoalState(tm); - TabletState state = - TabletState.compute(tm, currentTServers.keySet(), manager.tabletBalancer, resourceGroups); + final TabletState state = TabletState.compute(tm, currentTServers.keySet()); + // This is final because nothing in this method should change the goal. All computation of the + // goal should be done in TabletGoalState.compute() so that all parts of the Accumulo code + // will compute a consistent goal. 
+ final TabletGoalState goal = + TabletGoalState.compute(tm, state, manager.tabletBalancer, tableMgmtParams); final Location location = tm.getLocation(); Location current = null; @@ -411,68 +438,14 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { stats.update(tableId, state); } - // Always follow through with assignments - if (state == TabletState.ASSIGNED) { - goal = TabletGoalState.HOSTED; - } else if (state == TabletState.NEEDS_REASSIGNMENT) { - goal = TabletGoalState.UNASSIGNED; - } - - if (tm.getOperationId() != null) { - // If there are still wals the tablet needs to be hosted - // to process the wals before starting the merge op - if (!tm.getLogs().isEmpty() - && tm.getOperationId().getType() == TabletOperationType.MERGING) { - goal = TabletGoalState.HOSTED; - } else { - goal = TabletGoalState.UNASSIGNED; - } - } - if (Manager.log.isTraceEnabled()) { Manager.log.trace( "[{}] Shutting down all Tservers: {}, dependentCount: {} Extent: {}, state: {}, goal: {} actions:{}", - store.name(), manager.serversToShutdown.equals(currentTServers.keySet()), + store.name(), tableMgmtParams.getServersToShutdown().equals(currentTServers.keySet()), dependentWatcher == null ? "null" : dependentWatcher.assignedOrHosted(), tm.getExtent(), state, goal, actions); } - // if we are shutting down all the tabletservers, we have to do it in order - if (shuttingDownAllTabletServers - && (goal == TabletGoalState.SUSPENDED && state == TabletState.HOSTED)) { - if (dependentWatcher != null) { - // If the dependentWatcher is for the user tables, check to see - // that user tables exist. 
- DataLevel dependentLevel = dependentWatcher.store.getLevel(); - boolean userTablesExist = true; - switch (dependentLevel) { - case USER: - Set<TableId> onlineTables = manager.onlineTables(); - onlineTables.remove(RootTable.ID); - onlineTables.remove(MetadataTable.ID); - userTablesExist = !onlineTables.isEmpty(); - break; - case METADATA: - case ROOT: - default: - break; - } - // If the stats object in the dependentWatcher is empty, then it - // currently does not have data about what is hosted or not. In - // that case host these tablets until the dependent watcher can - // gather some data. - final Map<TableId,TableCounts> stats = dependentWatcher.getStats(); - if (dependentLevel == DataLevel.USER) { - if (userTablesExist - && (stats == null || stats.isEmpty() || assignedOrHosted(stats) > 0)) { - goal = TabletGoalState.HOSTED; - } - } else if (stats == null || stats.isEmpty() || assignedOrHosted(stats) > 0) { - goal = TabletGoalState.HOSTED; - } - } - } - if (actions.contains(ManagementAction.NEEDS_SPLITTING)) { LOG.debug("{} may need splitting.", tm.getExtent()); if (manager.getSplitter().isSplittable(tm)) { @@ -502,6 +475,11 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { // metadata scan could remove any tablets that were not updated during the scan. 
if (actions.contains(ManagementAction.NEEDS_LOCATION_UPDATE)) { + + if (tm.getLocation() != null) { + filteredServersToShutdown.remove(tm.getLocation().getServerInstance()); + } + if (goal == TabletGoalState.HOSTED) { if ((state != TabletState.HOSTED && !tm.getLogs().isEmpty()) && manager.recoveryManager.recoverLogs(tm.getExtent(), tm.getLogs())) { @@ -544,7 +522,6 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { case ASSIGNED_TO_DEAD_SERVER: unassignDeadTablet(tLists, tm); break; - case NEEDS_REASSIGNMENT: case HOSTED: TServerConnection client = manager.tserverSet.getConnection(location.getServerInstance()); @@ -568,14 +545,19 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { } flushChanges(tLists); + + if (isFullScan) { + this.filteredServersToShutdown = Set.copyOf(filteredServersToShutdown); + } + return tableMgmtStats; } private SortedMap<TServerInstance,TabletServerStatus> - getTserversStatus(Set<TServerInstance> currentServers) { + getCurrentTservers(Set<TServerInstance> onlineTservers) { // Get the current status for the current list of tservers final SortedMap<TServerInstance,TabletServerStatus> currentTServers = new TreeMap<>(); - for (TServerInstance entry : currentServers) { + for (TServerInstance entry : onlineTservers) { currentTServers.put(entry, manager.tserverStatus.get(entry)); } return currentTServers; @@ -594,8 +576,8 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { final long waitTimeBetweenScans = manager.getConfiguration() .getTimeInMillis(Property.MANAGER_TABLET_GROUP_WATCHER_INTERVAL); - LiveTServerSet.LiveTServersSnapshot tservers = manager.tserverSet.getSnapshot(); - var currentTServers = getTserversStatus(tservers.getTservers()); + TabletManagementParameters tableMgmtParams = createTabletManagementParameters(); + var currentTServers = getCurrentTservers(tableMgmtParams.getOnlineTsevers()); ClosableIterator<TabletManagement> iter = null; try { @@ -609,15 +591,14 @@ abstract class 
TabletGroupWatcher extends AccumuloDaemonThread { stats.begin(); - ManagerState managerState = manager.getManagerState(); + ManagerState managerState = tableMgmtParams.getManagerState(); // Clear the need for a full scan before starting a full scan inorder to detect events that // happen during the full scan. eventHandler.clearNeedsFullScan(); - iter = store.iterator(); - var tabletMgmtStats = - manageTablets(iter, currentTServers, tservers.getTserverGroups(), true); + iter = store.iterator(tableMgmtParams); + var tabletMgmtStats = manageTablets(iter, tableMgmtParams, currentTServers, true); // provide stats after flushing changes to avoid race conditions w/ delete table stats.end(managerState); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/CleanUp.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/CleanUp.java index c0d60c8234..0b0512b072 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/CleanUp.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/CleanUp.java @@ -18,6 +18,10 @@ */ package org.apache.accumulo.manager.tableOps.delete; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SUSPEND; + import java.io.IOException; import java.net.UnknownHostException; import java.util.Arrays; @@ -27,17 +31,14 @@ import java.util.Set; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.BatchScanner; import org.apache.accumulo.core.client.IteratorSetting; -import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.NamespaceId; import 
org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.fate.Repo; import org.apache.accumulo.core.iterators.user.GrepIterator; -import org.apache.accumulo.core.manager.state.TabletManagement; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.TabletState; @@ -49,7 +50,6 @@ import org.apache.accumulo.manager.Manager; import org.apache.accumulo.manager.tableOps.ManagerRepo; import org.apache.accumulo.manager.tableOps.Utils; import org.apache.accumulo.server.fs.VolumeManager; -import org.apache.accumulo.server.manager.state.TabletManagementIterator; import org.apache.accumulo.server.problems.ProblemReports; import org.apache.accumulo.server.util.MetadataTableUtil; import org.apache.hadoop.fs.Path; @@ -92,23 +92,20 @@ class CleanUp extends ManagerRepo { } boolean done = true; - Range tableRange = new KeyExtent(tableId, null, null).toMetaRange(); - Scanner scanner = manager.getContext().createScanner(MetadataTable.NAME, Authorizations.EMPTY); - TabletManagementIterator.configureScanner(scanner, manager); - scanner.setRange(tableRange); - - for (Entry<Key,Value> entry : scanner) { - final TabletManagement mti = TabletManagementIterator.decode(entry); - final TabletMetadata tm = mti.getTabletMetadata(); + + try (var tablets = manager.getContext().getAmple().readTablets().forTable(tableId) + .fetch(LOCATION, PREV_ROW, SUSPEND).checkConsistency().build()) { Set<TServerInstance> liveTServers = manager.onlineTabletServers(); - TabletState state = TabletState.compute(tm, liveTServers); - if (!state.equals(TabletState.UNASSIGNED)) { - // This code will even wait on tablets that are assigned to dead tablets servers. This is - // intentional because the manager may make metadata writes for these tablets. 
See #587 - log.debug("Still waiting for table({}) to be deleted; Target tablet state: UNASSIGNED, " - + "Current tablet state: {}, locationState: {}", tableId, state, tm); - done = false; - break; + for (TabletMetadata tm : tablets) { + TabletState state = TabletState.compute(tm, liveTServers); + if (!state.equals(TabletState.UNASSIGNED)) { + // This code will even wait on tablets that are assigned to dead tablets servers. This is + // intentional because the manager may make metadata writes for these tablets. See #587 + log.debug("Still waiting for table({}) to be deleted; Target tablet state: UNASSIGNED, " + + "Current tablet state: {}, locationState: {}", tableId, state, tm); + done = false; + break; + } } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteRows.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteRows.java index 6413e83c14..0cc31fcaa9 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteRows.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/merge/DeleteRows.java @@ -99,7 +99,6 @@ public class DeleteRows extends ManagerRepo { for (var tabletMetadata : tabletsMetadata) { validateTablet(tabletMetadata, fateStr, opid, data.tableId); - var tabletMutator = tabletsMutator.mutateTablet(tabletMetadata.getExtent()) .requireOperation(opid).requireAbsentLocation(); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java index 32ce0c2934..dedda83872 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java @@ -32,9 +32,9 @@ import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.Constants; import 
org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.fate.ReadOnlyTStore; import org.apache.accumulo.core.fate.ZooStore; +import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.core.volume.Volume; import org.apache.accumulo.manager.EventCoordinator; @@ -59,7 +59,7 @@ public class UpgradeCoordinator { */ INITIAL { @Override - public boolean isParentLevelUpgraded(KeyExtent extent) { + public boolean isParentLevelUpgraded(Ample.DataLevel level) { return false; } }, @@ -68,8 +68,8 @@ public class UpgradeCoordinator { */ UPGRADED_ZOOKEEPER { @Override - public boolean isParentLevelUpgraded(KeyExtent extent) { - return extent.isRootTablet(); + public boolean isParentLevelUpgraded(Ample.DataLevel level) { + return level == Ample.DataLevel.ROOT; } }, /** @@ -77,8 +77,8 @@ public class UpgradeCoordinator { */ UPGRADED_ROOT { @Override - public boolean isParentLevelUpgraded(KeyExtent extent) { - return extent.isMeta(); + public boolean isParentLevelUpgraded(Ample.DataLevel level) { + return level == Ample.DataLevel.METADATA || level == Ample.DataLevel.ROOT; } }, /** @@ -86,7 +86,7 @@ public class UpgradeCoordinator { */ COMPLETE { @Override - public boolean isParentLevelUpgraded(KeyExtent extent) { + public boolean isParentLevelUpgraded(Ample.DataLevel level) { return true; } }, @@ -95,7 +95,7 @@ public class UpgradeCoordinator { */ FAILED { @Override - public boolean isParentLevelUpgraded(KeyExtent extent) { + public boolean isParentLevelUpgraded(Ample.DataLevel level) { return false; } }; @@ -104,7 +104,7 @@ public class UpgradeCoordinator { * Determines if the place where this extent stores its metadata was upgraded for a given * upgrade status. 
*/ - public abstract boolean isParentLevelUpgraded(KeyExtent extent); + public abstract boolean isParentLevelUpgraded(Ample.DataLevel level); } private static final Logger log = LoggerFactory.getLogger(UpgradeCoordinator.class); diff --git a/server/monitor/pom.xml b/server/monitor/pom.xml index c52fe1fb62..49534f0d1e 100644 --- a/server/monitor/pom.xml +++ b/server/monitor/pom.xml @@ -80,10 +80,6 @@ <groupId>org.apache.accumulo</groupId> <artifactId>accumulo-start</artifactId> </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client-api</artifactId> - </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-api</artifactId> diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/tables/TablesResource.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/tables/TablesResource.java index 39a6b51cf7..ccc5eb7ca5 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/tables/TablesResource.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/tables/TablesResource.java @@ -36,23 +36,20 @@ import jakarta.ws.rs.PathParam; import jakarta.ws.rs.Produces; import jakarta.ws.rs.core.MediaType; -import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.manager.state.tables.TableState; import org.apache.accumulo.core.manager.thrift.ManagerMonitorInfo; import org.apache.accumulo.core.manager.thrift.TableInfo; import org.apache.accumulo.core.manager.thrift.TabletServerStatus; -import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.RootTable; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; +import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.TabletsMetadata; import 
org.apache.accumulo.monitor.Monitor; import org.apache.accumulo.monitor.rest.tservers.TabletServer; import org.apache.accumulo.monitor.rest.tservers.TabletServers; -import org.apache.accumulo.server.manager.state.TabletManagementScanner; import org.apache.accumulo.server.tables.TableManager; import org.apache.accumulo.server.util.TableInfoUtil; -import org.apache.hadoop.io.Text; /** * Generates a tables list from the Monitor as a JSON object @@ -145,25 +142,20 @@ public class TablesResource { if (RootTable.ID.equals(tableId)) { locs.add(rootTabletLocation); } else { - String systemTableName = - MetadataTable.ID.equals(tableId) ? RootTable.NAME : MetadataTable.NAME; - TabletManagementScanner scanner = new TabletManagementScanner(monitor.getContext(), - new Range(TabletsSection.encodeRow(tableId, new Text()), - TabletsSection.encodeRow(tableId, null)), - systemTableName); - - while (scanner.hasNext()) { - final TabletMetadata tm = scanner.next().getTabletMetadata(); - if (tm.hasCurrent()) { - try { - locs.add(tm.getLocation().getHostPort()); - } catch (Exception ex) { - scanner.close(); - return tabletServers; + var level = Ample.DataLevel.of(tableId); + try (TabletsMetadata tablets = + monitor.getContext().getAmple().readTablets().forLevel(level).build()) { + + for (TabletMetadata tm : tablets) { + if (tm.hasCurrent()) { + try { + locs.add(tm.getLocation().getHostPort()); + } catch (Exception ex) { + return tabletServers; + } } } } - scanner.close(); } List<TabletServerStatus> tservers = new ArrayList<>(); diff --git a/test/src/main/java/org/apache/accumulo/test/LocatorIT.java b/test/src/main/java/org/apache/accumulo/test/LocatorIT.java index fa301cb5ae..3bad4dbd45 100644 --- a/test/src/main/java/org/apache/accumulo/test/LocatorIT.java +++ b/test/src/main/java/org/apache/accumulo/test/LocatorIT.java @@ -52,10 +52,8 @@ import org.apache.accumulo.harness.AccumuloClusterHarness; import org.apache.accumulo.test.functional.ManagerAssignmentIT; import 
org.apache.accumulo.test.util.Wait; import org.apache.hadoop.io.Text; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; -@Disabled // ELASTICITY_TODO public class LocatorIT extends AccumuloClusterHarness { @Override @@ -129,10 +127,8 @@ public class LocatorIT extends AccumuloClusterHarness { ranges.clear(); tableOps.setTabletHostingGoal(tableName, new Range(), TabletHostingGoal.ALWAYS); - Wait.waitFor( - () -> alwaysHostedAndCurrentNotNull.test( - ManagerAssignmentIT.getManagerTabletInfo(client, tableId, null).getTabletMetadata()), - 60000, 250); + Wait.waitFor(() -> alwaysHostedAndCurrentNotNull + .test(ManagerAssignmentIT.getTabletMetadata(client, tableId, null)), 60000, 250); ranges.add(r1); Locations ret = tableOps.locate(tableName, ranges); @@ -147,13 +143,8 @@ public class LocatorIT extends AccumuloClusterHarness { splits.add(new Text("r")); tableOps.addSplits(tableName, splits); - // ELASTICITY_TODO split does not set hosting goal, so this throws exception - assertThrows(AccumuloException.class, () -> tableOps.locate(tableName, ranges)); - tableOps.setTabletHostingGoal(tableName, new Range(), TabletHostingGoal.ALWAYS); - Wait.waitFor( - () -> alwaysHostedAndCurrentNotNull.test( - ManagerAssignmentIT.getManagerTabletInfo(client, tableId, null).getTabletMetadata()), - 60000, 250); + Wait.waitFor(() -> alwaysHostedAndCurrentNotNull + .test(ManagerAssignmentIT.getTabletMetadata(client, tableId, null)), 60000, 250); ret = tableOps.locate(tableName, ranges); assertContains(ret, tservers, Map.of(r1, Set.of(t2), r2, Set.of(t2, t3)), @@ -165,15 +156,9 @@ public class LocatorIT extends AccumuloClusterHarness { tableOps.online(tableName, true); - // ELASTICITY_TODO Split does not set hosting goal - tableOps.setTabletHostingGoal(tableName, new Range(), TabletHostingGoal.ALWAYS); - - // TabletGroupWatcher interval set to 5s - Thread.sleep(7000); - Wait.waitFor( - () -> alwaysHostedAndCurrentNotNull.test(ManagerAssignmentIT - 
.getManagerTabletInfo(client, tableId, new Text("r")).getTabletMetadata()), + () -> alwaysHostedAndCurrentNotNull + .test(ManagerAssignmentIT.getTabletMetadata(client, tableId, new Text("r"))), 60000, 250); ArrayList<Range> ranges2 = new ArrayList<>(); diff --git a/test/src/main/java/org/apache/accumulo/test/ManagerRepairsDualAssignmentIT.java b/test/src/main/java/org/apache/accumulo/test/ManagerRepairsDualAssignmentIT.java index c4094cfe08..502921e975 100644 --- a/test/src/main/java/org/apache/accumulo/test/ManagerRepairsDualAssignmentIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ManagerRepairsDualAssignmentIT.java @@ -26,15 +26,14 @@ import java.util.HashSet; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.admin.NewTableConfiguration; import org.apache.accumulo.core.client.admin.TabletHostingGoal; -import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.manager.state.TabletManagement; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.RootTable; import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; @@ -45,8 +44,6 @@ import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; import org.apache.accumulo.server.ServerContext; -import org.apache.accumulo.server.manager.state.ClosableIterator; -import org.apache.accumulo.server.manager.state.TabletStateStore; import org.apache.accumulo.test.functional.ConfigurableMacBase; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.RawLocalFileSystem; @@ -74,7 +71,6 @@ public 
class ManagerRepairsDualAssignmentIT extends ConfigurableMacBase { public void test() throws Exception { // make some tablets, spread 'em around try (AccumuloClient c = Accumulo.newClient().from(getClientProperties()).build()) { - ClientContext context = (ClientContext) c; ServerContext serverContext = cluster.getServerContext(); String table = this.getUniqueNames(1)[0]; c.securityOperations().grantTablePermission("root", MetadataTable.NAME, @@ -90,16 +86,19 @@ public class ManagerRepairsDualAssignmentIT extends ConfigurableMacBase { // scan the metadata table and get the two table location states Set<TabletMetadata.Location> states = new HashSet<>(); Set<TabletMetadata> oldLocations = new HashSet<>(); - TabletStateStore store = TabletStateStore.getStoreForLevel(DataLevel.USER, serverContext); while (states.size() < 2) { UtilWaitThread.sleep(250); oldLocations.clear(); - for (TabletManagement mti : store) { - if (mti.getTabletMetadata().hasCurrent()) { - states.add(mti.getTabletMetadata().getLocation()); - oldLocations.add(mti.getTabletMetadata()); - } + try ( + var tablets = serverContext.getAmple().readTablets().forLevel(DataLevel.USER).build()) { + tablets.iterator().forEachRemaining(tm -> { + if (tm.hasCurrent()) { + states.add(tm.getLocation()); + oldLocations.add(tm); + } + }); } + } assertEquals(2, states.size()); // Kill a tablet server... we don't care which one... 
wait for everything to be reassigned @@ -109,16 +108,19 @@ public class ManagerRepairsDualAssignmentIT extends ConfigurableMacBase { while (true) { UtilWaitThread.sleep(1000); states.clear(); - boolean allAssigned = true; - for (TabletManagement mti : store) { - if (mti.getTabletMetadata().hasCurrent()) { - states.add(mti.getTabletMetadata().getLocation()); - } else { - allAssigned = false; - } + AtomicBoolean allAssigned = new AtomicBoolean(true); + try ( + var tablets = serverContext.getAmple().readTablets().forLevel(DataLevel.USER).build()) { + tablets.iterator().forEachRemaining(tm -> { + if (tm.hasCurrent()) { + states.add(tm.getLocation()); + } else { + allAssigned.set(false); + } + }); } System.out.println(states + " size " + states.size() + " allAssigned " + allAssigned); - if (states.size() != 2 && allAssigned) { + if (states.size() != 2 && allAssigned.get()) { break; } } @@ -136,20 +138,20 @@ public class ManagerRepairsDualAssignmentIT extends ConfigurableMacBase { tabletMutator.putLocation(moved.getLocation()); tabletMutator.mutate(); // wait for the manager to fix the problem - waitForCleanStore(store); + waitForCleanStore(serverContext, DataLevel.USER); // now jam up the metadata table tabletMutator = serverContext.getAmple().mutateTablet(new KeyExtent(MetadataTable.ID, null, null)); tabletMutator.putLocation(moved.getLocation()); tabletMutator.mutate(); - waitForCleanStore(TabletStateStore.getStoreForLevel(DataLevel.METADATA, serverContext)); + waitForCleanStore(serverContext, DataLevel.METADATA); } } - private void waitForCleanStore(TabletStateStore store) { + private void waitForCleanStore(ServerContext serverContext, DataLevel level) { while (true) { - try (ClosableIterator<TabletManagement> iter = store.iterator()) { - iter.forEachRemaining(t -> {}); + try (var tablets = serverContext.getAmple().readTablets().forLevel(level).build()) { + tablets.iterator().forEachRemaining(t -> {}); } catch (Exception ex) { System.out.println(ex); 
UtilWaitThread.sleep(250); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/AssignLocationModeIT.java b/test/src/main/java/org/apache/accumulo/test/functional/AssignLocationModeIT.java index e742cbbde4..92805e572a 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/AssignLocationModeIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/AssignLocationModeIT.java @@ -66,7 +66,7 @@ public class AssignLocationModeIT extends ConfigurableMacBase { TabletMetadata newTablet; do { UtilWaitThread.sleep(250); - newTablet = ManagerAssignmentIT.getManagerTabletInfo(c, tableId, null).getTabletMetadata(); + newTablet = ManagerAssignmentIT.getTabletMetadata(c, tableId, null); } while (!newTablet.hasCurrent()); // this would be null if the mode was not "assign" assertEquals(newTablet.getLocation().getHostPort(), newTablet.getLast().getHostPort()); @@ -82,24 +82,21 @@ public class AssignLocationModeIT extends ConfigurableMacBase { .get(Property.TSERV_LAST_LOCATION_MODE.getKey())); // last location should not be set yet - TabletMetadata unflushed = - ManagerAssignmentIT.getManagerTabletInfo(c, tableId, null).getTabletMetadata(); + TabletMetadata unflushed = ManagerAssignmentIT.getTabletMetadata(c, tableId, null); assertEquals(newTablet.getLocation().getHostPort(), unflushed.getLocation().getHostPort()); assertEquals(newTablet.getLocation().getHostPort(), unflushed.getLast().getHostPort()); assertTrue(newTablet.hasCurrent()); // take the tablet offline c.tableOperations().offline(tableName, true); - TabletMetadata offline = - ManagerAssignmentIT.getManagerTabletInfo(c, tableId, null).getTabletMetadata(); + TabletMetadata offline = ManagerAssignmentIT.getTabletMetadata(c, tableId, null); assertNull(offline.getLocation()); assertFalse(offline.hasCurrent()); assertEquals(newTablet.getLocation().getHostPort(), offline.getLast().getHostPort()); // put it back online, should have the same last location c.tableOperations().online(tableName, 
true); - TabletMetadata online = - ManagerAssignmentIT.getManagerTabletInfo(c, tableId, null).getTabletMetadata(); + TabletMetadata online = ManagerAssignmentIT.getTabletMetadata(c, tableId, null); assertTrue(online.hasCurrent()); assertNotNull(online.getLocation()); assertEquals(newTablet.getLast().getHostPort(), online.getLast().getHostPort()); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/CompactLocationModeIT.java b/test/src/main/java/org/apache/accumulo/test/functional/CompactLocationModeIT.java index 471aedf566..eb672574f0 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/CompactLocationModeIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/CompactLocationModeIT.java @@ -63,7 +63,7 @@ public class CompactLocationModeIT extends ConfigurableMacBase { TabletMetadata newTablet; do { UtilWaitThread.sleep(250); - newTablet = ManagerAssignmentIT.getManagerTabletInfo(c, tableId, null).getTabletMetadata(); + newTablet = ManagerAssignmentIT.getTabletMetadata(c, tableId, null); } while (!newTablet.hasCurrent()); assertNull(newTablet.getLast()); assertNotNull(newTablet.getLocation()); @@ -79,8 +79,7 @@ public class CompactLocationModeIT extends ConfigurableMacBase { .get(Property.TSERV_LAST_LOCATION_MODE.getKey())); // no last location should be set yet - TabletMetadata unflushed = - ManagerAssignmentIT.getManagerTabletInfo(c, tableId, null).getTabletMetadata(); + TabletMetadata unflushed = ManagerAssignmentIT.getTabletMetadata(c, tableId, null); assertEquals(newTablet.getLocation().getHostPort(), unflushed.getLocation().getHostPort()); assertNull(unflushed.getLast()); assertTrue(newTablet.hasCurrent()); @@ -88,23 +87,20 @@ public class CompactLocationModeIT extends ConfigurableMacBase { // This should give it a last location if the mode is being used correctly c.tableOperations().flush(tableName, null, null, true); - TabletMetadata flushed = - ManagerAssignmentIT.getManagerTabletInfo(c, tableId, 
null).getTabletMetadata(); + TabletMetadata flushed = ManagerAssignmentIT.getTabletMetadata(c, tableId, null); assertEquals(newTablet.getLocation().getHostPort(), flushed.getLocation().getHostPort()); assertEquals(flushed.getLocation().getHostPort(), flushed.getLast().getHostPort()); assertTrue(newTablet.hasCurrent()); // take the tablet offline c.tableOperations().offline(tableName, true); - TabletMetadata offline = - ManagerAssignmentIT.getManagerTabletInfo(c, tableId, null).getTabletMetadata(); + TabletMetadata offline = ManagerAssignmentIT.getTabletMetadata(c, tableId, null); assertNull(offline.getLocation()); assertEquals(flushed.getLocation().getHostPort(), offline.getLast().getHostPort()); // put it back online, should have the same last location c.tableOperations().online(tableName, true); - TabletMetadata online = - ManagerAssignmentIT.getManagerTabletInfo(c, tableId, null).getTabletMetadata(); + TabletMetadata online = ManagerAssignmentIT.getTabletMetadata(c, tableId, null); assertTrue(online.hasCurrent()); assertNotNull(online.getLocation()); assertEquals(offline.getLast().getHostPort(), online.getLast().getHostPort()); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ManagerAssignmentIT.java b/test/src/main/java/org/apache/accumulo/test/functional/ManagerAssignmentIT.java index 0f82311d7a..739add9599 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/ManagerAssignmentIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/ManagerAssignmentIT.java @@ -63,7 +63,6 @@ import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.lock.ServiceLock; -import org.apache.accumulo.core.manager.state.TabletManagement; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.RootTable; import org.apache.accumulo.core.metadata.schema.Ample; @@ -79,7 +78,6 @@ import 
org.apache.accumulo.core.tabletserver.thrift.TabletStats; import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.harness.SharedMiniClusterBase; import org.apache.accumulo.minicluster.ServerType; -import org.apache.accumulo.server.manager.state.TabletManagementScanner; import org.apache.accumulo.test.util.Wait; import org.apache.hadoop.io.Text; import org.junit.jupiter.api.BeforeAll; @@ -133,10 +131,8 @@ public class ManagerAssignmentIT extends SharedMiniClusterBase { // wait for the tablet to exist in the metadata table. The tablet // will not be hosted so the current location will be empty. - Wait.waitFor( - () -> getManagerTabletInfo(c, tableId, null).getTabletMetadata().getExtent() != null, - 10000, 250); - TabletMetadata newTablet = getManagerTabletInfo(c, tableId, null).getTabletMetadata(); + Wait.waitFor(() -> getTabletMetadata(c, tableId, null) != null, 10000, 250); + TabletMetadata newTablet = getTabletMetadata(c, tableId, null); assertNotNull(newTablet.getExtent()); assertFalse(newTablet.hasCurrent()); assertNull(newTablet.getLast()); @@ -152,7 +148,7 @@ public class ManagerAssignmentIT extends SharedMiniClusterBase { // give it a last location c.tableOperations().flush(tableName, null, null, true); - TabletMetadata flushed = getManagerTabletInfo(c, tableId, null).getTabletMetadata(); + TabletMetadata flushed = getTabletMetadata(c, tableId, null); assertTrue(flushed.hasCurrent()); assertNotNull(flushed.getLocation()); assertEquals(flushed.getLocation().getHostPort(), flushed.getLast().getHostPort()); @@ -161,7 +157,7 @@ public class ManagerAssignmentIT extends SharedMiniClusterBase { // take the tablet offline c.tableOperations().offline(tableName, true); - TabletMetadata offline = getManagerTabletInfo(c, tableId, null).getTabletMetadata(); + TabletMetadata offline = getTabletMetadata(c, tableId, null); assertFalse(offline.hasCurrent()); assertNull(offline.getLocation()); assertEquals(flushed.getLocation().getHostPort(), 
offline.getLast().getHostPort()); @@ -169,7 +165,7 @@ public class ManagerAssignmentIT extends SharedMiniClusterBase { // put it back online c.tableOperations().online(tableName, true); - TabletMetadata online = getManagerTabletInfo(c, tableId, null).getTabletMetadata(); + TabletMetadata online = getTabletMetadata(c, tableId, null); assertTrue(online.hasCurrent()); assertNotNull(online.getLocation()); assertEquals(online.getLocation().getHostPort(), online.getLast().getHostPort()); @@ -181,10 +177,10 @@ public class ManagerAssignmentIT extends SharedMiniClusterBase { Predicate<TabletMetadata> alwaysHostedOrCurrentNotNull = t -> (t.getHostingGoal() == TabletHostingGoal.ALWAYS && t.hasCurrent()); - Wait.waitFor(() -> alwaysHostedOrCurrentNotNull - .test(getManagerTabletInfo(c, tableId, null).getTabletMetadata()), 60000, 250); + Wait.waitFor(() -> alwaysHostedOrCurrentNotNull.test(getTabletMetadata(c, tableId, null)), + 60000, 250); - final TabletMetadata always = getManagerTabletInfo(c, tableId, null).getTabletMetadata(); + final TabletMetadata always = getTabletMetadata(c, tableId, null); assertTrue(alwaysHostedOrCurrentNotNull.test(always)); assertTrue(always.hasCurrent()); assertEquals(flushed.getLocation().getHostPort(), always.getLast().getHostPort()); @@ -194,10 +190,10 @@ public class ManagerAssignmentIT extends SharedMiniClusterBase { c.tableOperations().setTabletHostingGoal(tableName, new Range(), TabletHostingGoal.NEVER); Predicate<TabletMetadata> neverHostedOrCurrentNull = t -> (t.getHostingGoal() == TabletHostingGoal.NEVER && !t.hasCurrent()); - Wait.waitFor(() -> neverHostedOrCurrentNull - .test(getManagerTabletInfo(c, tableId, null).getTabletMetadata()), 60000, 250); + Wait.waitFor(() -> neverHostedOrCurrentNull.test(getTabletMetadata(c, tableId, null)), 60000, + 250); - final TabletMetadata never = getManagerTabletInfo(c, tableId, null).getTabletMetadata(); + final TabletMetadata never = getTabletMetadata(c, tableId, null); 
assertTrue(neverHostedOrCurrentNull.test(never)); assertNull(never.getLocation()); assertEquals(flushed.getLocation().getHostPort(), never.getLast().getHostPort()); @@ -207,10 +203,8 @@ public class ManagerAssignmentIT extends SharedMiniClusterBase { c.tableOperations().setTabletHostingGoal(tableName, new Range(), TabletHostingGoal.ONDEMAND); Predicate<TabletMetadata> ondemandHosted = t -> t.getHostingGoal() == TabletHostingGoal.ONDEMAND; - Wait.waitFor( - () -> ondemandHosted.test(getManagerTabletInfo(c, tableId, null).getTabletMetadata()), - 60000, 250); - final TabletMetadata ondemand = getManagerTabletInfo(c, tableId, null).getTabletMetadata(); + Wait.waitFor(() -> ondemandHosted.test(getTabletMetadata(c, tableId, null)), 60000, 250); + final TabletMetadata ondemand = getTabletMetadata(c, tableId, null); assertTrue(ondemandHosted.test(ondemand)); assertNull(ondemand.getLocation()); assertEquals(flushed.getLocation().getHostPort(), ondemand.getLast().getHostPort()); @@ -621,11 +615,16 @@ public class ManagerAssignmentIT extends SharedMiniClusterBase { } } - public static TabletManagement getManagerTabletInfo(AccumuloClient c, String tableId, - Text endRow) { - try (TabletManagementScanner s = new TabletManagementScanner((ClientContext) c, - new Range(TabletsSection.encodeRow(TableId.of(tableId), endRow)), MetadataTable.NAME)) { - return s.next(); + public static TabletMetadata getTabletMetadata(AccumuloClient c, String tableId, Text endRow) { + var ctx = (ClientContext) c; + try (var tablets = ctx.getAmple().readTablets().forTable(TableId.of(tableId)) + .overlapping(endRow, null).build()) { + var iter = tablets.iterator(); + if (iter.hasNext()) { + return iter.next(); + } } + + return null; } } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java b/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java index 3b1a688ca8..f8edad4551 100644 --- 
a/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java @@ -24,7 +24,6 @@ import java.io.IOException; import java.time.Duration; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -63,6 +62,7 @@ import org.apache.accumulo.core.manager.state.tables.TableState; import org.apache.accumulo.core.manager.thrift.ManagerState; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.TServerInstance; +import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.MetadataSchema; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.HostingColumnFamily; @@ -72,8 +72,8 @@ import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.harness.AccumuloClusterHarness; import org.apache.accumulo.server.manager.LiveTServerSet; -import org.apache.accumulo.server.manager.state.CurrentState; import org.apache.accumulo.server.manager.state.TabletManagementIterator; +import org.apache.accumulo.server.manager.state.TabletManagementParameters; import org.apache.hadoop.io.Text; import org.junit.jupiter.api.Test; import org.slf4j.Logger; @@ -123,15 +123,15 @@ public class TabletManagementIteratorIT extends AccumuloClusterHarness { // examine a clone of the metadata table, so we can manipulate it copyTable(client, MetadataTable.NAME, metaCopy1); - State state = new State(client); - int tabletsInFlux = findTabletsNeedingAttention(client, metaCopy1, state); + TabletManagementParameters tabletMgmtParams = createParameters(client); + int tabletsInFlux = 
findTabletsNeedingAttention(client, metaCopy1, tabletMgmtParams); while (tabletsInFlux > 0) { log.debug("Waiting for {} tablets for {}", tabletsInFlux, metaCopy1); UtilWaitThread.sleep(500); copyTable(client, MetadataTable.NAME, metaCopy1); - tabletsInFlux = findTabletsNeedingAttention(client, metaCopy1, state); + tabletsInFlux = findTabletsNeedingAttention(client, metaCopy1, tabletMgmtParams); } - assertEquals(0, findTabletsNeedingAttention(client, metaCopy1, state), + assertEquals(0, findTabletsNeedingAttention(client, metaCopy1, tabletMgmtParams), "No tables should need attention"); // The metadata table stabilized and metaCopy1 contains a copy suitable for testing. Before @@ -143,30 +143,30 @@ public class TabletManagementIteratorIT extends AccumuloClusterHarness { setTabletHostingGoal(client, metaCopy1, t1, TabletHostingGoal.ALWAYS.name()); // t3 is hosted, setting to never will generate a change to unhost tablets setTabletHostingGoal(client, metaCopy1, t3, TabletHostingGoal.NEVER.name()); - state = new State(client); - assertEquals(4, findTabletsNeedingAttention(client, metaCopy1, state), + tabletMgmtParams = createParameters(client); + assertEquals(4, findTabletsNeedingAttention(client, metaCopy1, tabletMgmtParams), "Should have four tablets with hosting goal changes"); // test the assigned case (no location) removeLocation(client, metaCopy1, t3); - assertEquals(2, findTabletsNeedingAttention(client, metaCopy1, state), + assertEquals(2, findTabletsNeedingAttention(client, metaCopy1, tabletMgmtParams), "Should have two tablets without a loc"); // Test setting the operation id on one of the tablets in table t1. Table t1 has two tablets // w/o a location. Only one should need attention because of the operation id. 
setOperationId(client, metaCopy1, t1); - assertEquals(1, findTabletsNeedingAttention(client, metaCopy1, state), + assertEquals(1, findTabletsNeedingAttention(client, metaCopy1, tabletMgmtParams), "Should have tablets needing attention because of operation id"); // test the cases where the assignment is to a dead tserver reassignLocation(client, metaCopy2, t3); - assertEquals(1, findTabletsNeedingAttention(client, metaCopy2, state), + assertEquals(1, findTabletsNeedingAttention(client, metaCopy2, tabletMgmtParams), "Only 1 of 2 tablets in table t1 should be returned"); // test the bad tablet location state case (inconsistent metadata) - state = new State(client); + tabletMgmtParams = createParameters(client); addDuplicateLocation(client, metaCopy3, t3); - assertEquals(1, findTabletsNeedingAttention(client, metaCopy3, state), + assertEquals(1, findTabletsNeedingAttention(client, metaCopy3, tabletMgmtParams), "Should have 1 tablet that needs a metadata repair"); // clean up @@ -252,13 +252,12 @@ public class TabletManagementIteratorIT extends AccumuloClusterHarness { deleter.close(); } - private int findTabletsNeedingAttention(AccumuloClient client, String table, State state) - throws TableNotFoundException, IOException { + private int findTabletsNeedingAttention(AccumuloClient client, String table, + TabletManagementParameters tabletMgmtParams) throws TableNotFoundException, IOException { int results = 0; List<KeyExtent> resultList = new ArrayList<>(); try (Scanner scanner = client.createScanner(table, Authorizations.EMPTY)) { - TabletManagementIterator.configureScanner(scanner, state); - log.debug("Current state = {}", state); + TabletManagementIterator.configureScanner(scanner, tabletMgmtParams); scanner.updateScanIteratorOption("tabletChange", "debug", "1"); for (Entry<Key,Value> e : scanner) { if (e != null) { @@ -345,71 +344,29 @@ public class TabletManagementIteratorIT extends AccumuloClusterHarness { } } - private static class State implements CurrentState { - 
- final ClientContext context; - - State(AccumuloClient client) { - this.context = (ClientContext) client; - } - - private Set<TServerInstance> tservers; - private Set<TableId> onlineTables; - - @Override - public Set<TServerInstance> onlineTabletServers() { - HashSet<TServerInstance> tservers = new HashSet<>(); - for (String tserver : context.instanceOperations().getTabletServers()) { - try { - var zPath = ServiceLock.path(ZooUtil.getRoot(context.instanceOperations().getInstanceId()) - + Constants.ZTSERVERS + "/" + tserver); - long sessionId = ServiceLock.getSessionId(context.getZooCache(), zPath); - tservers.add(new TServerInstance(tserver, sessionId)); - } catch (Exception e) { - throw new RuntimeException(e); - } + private static TabletManagementParameters createParameters(AccumuloClient client) { + var context = (ClientContext) client; + Set<TableId> onlineTables = Sets.filter(context.getTableIdToNameMap().keySet(), + tableId -> context.getTableState(tableId) == TableState.ONLINE); + + HashSet<TServerInstance> tservers = new HashSet<>(); + for (String tserver : context.instanceOperations().getTabletServers()) { + try { + var zPath = ServiceLock.path(ZooUtil.getRoot(context.instanceOperations().getInstanceId()) + + Constants.ZTSERVERS + "/" + tserver); + long sessionId = ServiceLock.getSessionId(context.getZooCache(), zPath); + tservers.add(new TServerInstance(tserver, sessionId)); + } catch (Exception e) { + throw new RuntimeException(e); } - this.tservers = Collections.unmodifiableSet(tservers); - return tservers; } - @Override - public LiveTServerSet.LiveTServersSnapshot tserversSnapshot() { - return new LiveTServerSet.LiveTServersSnapshot(onlineTabletServers(), new HashMap<>()); - } - - @Override - public Set<TableId> onlineTables() { - Set<TableId> onlineTables = context.getTableIdToNameMap().keySet(); - this.onlineTables = - Sets.filter(onlineTables, tableId -> context.getTableState(tableId) == TableState.ONLINE); - return this.onlineTables; - } - - 
@Override - public Set<KeyExtent> migrationsSnapshot() { - return Collections.emptySet(); - } - - @Override - public Set<TServerInstance> shutdownServers() { - return Collections.emptySet(); - } - - @Override - public ManagerState getManagerState() { - return ManagerState.NORMAL; - } - - @Override - public Map<Long,Map<String,String>> getCompactionHints() { - return Map.of(); - } - - @Override - public String toString() { - return "tservers: " + tservers + " onlineTables: " + onlineTables; - } + return new TabletManagementParameters(ManagerState.NORMAL, + Map.of( + Ample.DataLevel.ROOT, true, Ample.DataLevel.USER, true, Ample.DataLevel.METADATA, true), + onlineTables, + new LiveTServerSet.LiveTServersSnapshot(tservers, + Map.of(Constants.DEFAULT_RESOURCE_GROUP_NAME, tservers)), + Set.of(), Map.of(), Ample.DataLevel.USER, Map.of(), true); } - } diff --git a/test/src/main/java/org/apache/accumulo/test/manager/SuspendedTabletsIT.java b/test/src/main/java/org/apache/accumulo/test/manager/SuspendedTabletsIT.java index cd183ef364..e8c38a5bc4 100644 --- a/test/src/main/java/org/apache/accumulo/test/manager/SuspendedTabletsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/manager/SuspendedTabletsIT.java @@ -52,11 +52,11 @@ import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.clientImpl.ClientTabletCache; import org.apache.accumulo.core.conf.ClientProperty; import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.metadata.MetadataTable; -import org.apache.accumulo.core.metadata.RootTable; import org.apache.accumulo.core.metadata.TServerInstance; +import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType; import 
org.apache.accumulo.core.rpc.clients.ThriftClientTypes; @@ -65,7 +65,6 @@ import org.apache.accumulo.core.spi.balancer.data.TabletServerId; import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; import org.apache.accumulo.miniclusterImpl.ProcessReference; -import org.apache.accumulo.server.manager.state.TabletManagementScanner; import org.apache.accumulo.test.functional.ConfigurableMacBase; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; @@ -135,11 +134,11 @@ public class SuspendedTabletsIT extends ConfigurableMacBase { // Wait for the balancer to assign all metadata tablets to the chosen server. ClientContext ctx = (ClientContext) client; - TabletLocations tl = TabletLocations.retrieve(ctx, MetadataTable.NAME, RootTable.NAME); + TabletLocations tl = TabletLocations.retrieve(ctx, MetadataTable.NAME); while (tl.hosted.keySet().size() != 1 || !tl.hosted.containsKey(metadataServer)) { log.info("Metadata tablets are not hosted on the correct server. 
Waiting for balancer..."); Thread.sleep(1000L); - tl = TabletLocations.retrieve(ctx, MetadataTable.NAME, RootTable.NAME); + tl = TabletLocations.retrieve(ctx, MetadataTable.NAME); } log.info("Metadata tablets are now hosted on {}", metadataServer); } @@ -403,11 +402,6 @@ public class SuspendedTabletsIT extends ConfigurableMacBase { public static TabletLocations retrieve(final ClientContext ctx, final String tableName) throws Exception { - return retrieve(ctx, tableName, MetadataTable.NAME); - } - - public static TabletLocations retrieve(final ClientContext ctx, final String tableName, - final String metaName) throws Exception { int sleepTime = 200; int remainingAttempts = 30; @@ -415,7 +409,7 @@ public class SuspendedTabletsIT extends ConfigurableMacBase { try { FutureTask<TabletLocations> tlsFuture = new FutureTask<>(() -> { TabletLocations answer = new TabletLocations(); - answer.scan(ctx, tableName, metaName); + answer.scan(ctx, tableName); return answer; }); THREAD_POOL.execute(tlsFuture); @@ -434,12 +428,14 @@ public class SuspendedTabletsIT extends ConfigurableMacBase { } } - private void scan(ClientContext ctx, String tableName, String metaName) { + private void scan(ClientContext ctx, String tableName) { Map<String,String> idMap = ctx.tableOperations().tableIdMap(); String tableId = Objects.requireNonNull(idMap.get(tableName)); - try (var scanner = new TabletManagementScanner(ctx, new Range(), metaName)) { + var level = Ample.DataLevel.of(TableId.of(tableId)); + try (var tablets = ctx.getAmple().readTablets().forLevel(level).build()) { + var scanner = tablets.iterator(); while (scanner.hasNext()) { - final TabletMetadata tm = scanner.next().getTabletMetadata(); + final TabletMetadata tm = scanner.next(); final KeyExtent ke = tm.getExtent(); if (!tm.getTableId().canonical().equals(tableId)) { diff --git a/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java b/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java index 
63175fb26c..af8f9ddd02 100644 --- a/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java +++ b/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java @@ -53,7 +53,6 @@ import org.apache.accumulo.core.dataImpl.thrift.TSummaries; import org.apache.accumulo.core.dataImpl.thrift.TSummaryRequest; import org.apache.accumulo.core.dataImpl.thrift.UpdateErrors; import org.apache.accumulo.core.manager.thrift.TabletServerStatus; -import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.metadata.schema.TabletMetadata; @@ -73,7 +72,6 @@ import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.client.ClientServiceHandler; import org.apache.accumulo.server.manager.state.Assignment; -import org.apache.accumulo.server.manager.state.TabletManagementScanner; import org.apache.accumulo.server.manager.state.TabletStateStore; import org.apache.accumulo.server.rpc.TServerUtils; import org.apache.accumulo.server.rpc.ThriftProcessorTypes; @@ -311,12 +309,13 @@ public class NullTserver { // read the locations for the table Range tableRange = new KeyExtent(tableId, null, null).toMetaRange(); List<Assignment> assignments = new ArrayList<>(); - try (var s = new TabletManagementScanner(context, tableRange, MetadataTable.NAME)) { + try (var tablets = context.getAmple().readTablets().forLevel(DataLevel.USER).build()) { long randomSessionID = opts.port; TServerInstance instance = new TServerInstance(addr, randomSessionID); + var s = tablets.iterator(); while (s.hasNext()) { - TabletMetadata next = s.next().getTabletMetadata(); + TabletMetadata next = s.next(); assignments.add(new Assignment(next.getExtent(), instance, next.getLast())); } }