This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new b01528f665 Adds needsReassignment to balancer (#3603) b01528f665 is described below commit b01528f665926a9cbfd5e53200467d50e0df4c54 Author: Keith Turner <ktur...@apache.org> AuthorDate: Fri Jul 28 12:53:43 2023 -0400 Adds needsReassignment to balancer (#3603) Adds a new method to the balancer that can be used to efficiently detect when a tablet needs reassignment based on configuration changes in balancer. fixes #3590 --- .../apache/accumulo/core/metadata/TabletState.java | 81 +++++++++++++++++- .../core/metadata/schema/TabletMetadata.java | 46 ----------- .../spi/balancer/HostRegexTableLoadBalancer.java | 23 ++++++ .../core/spi/balancer/TableLoadBalancer.java | 27 +++--- .../accumulo/core/spi/balancer/TabletBalancer.java | 50 ++++++++++-- .../core/metadata/schema/TabletMetadataTest.java | 10 +-- .../core/spi/balancer/TableLoadBalancerTest.java | 95 ++++++++++++++++++++++ .../manager/state/TabletManagementIterator.java | 15 ++-- .../accumulo/server/util/FindOfflineTablets.java | 5 +- .../server/util/ListOnlineOnDemandTablets.java | 5 +- .../accumulo/gc/GarbageCollectWriteAheadLogs.java | 5 +- .../java/org/apache/accumulo/manager/Manager.java | 3 +- .../accumulo/manager/TabletGroupWatcher.java | 17 ++-- .../apache/accumulo/manager/state/MergeStats.java | 18 ++-- .../accumulo/manager/tableOps/delete/CleanUp.java | 5 +- .../shell/commands/ListTabletsCommand.java | 3 +- .../apache/accumulo/test/manager/MergeStateIT.java | 4 +- 17 files changed, 312 insertions(+), 100 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/TabletState.java b/core/src/main/java/org/apache/accumulo/core/metadata/TabletState.java index 0d4b74d060..4be39cf906 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/TabletState.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/TabletState.java @@ -18,6 +18,85 @@ */ package org.apache.accumulo.core.metadata; +import 
java.util.Map; +import java.util.Set; + +import org.apache.accumulo.core.data.TabletId; +import org.apache.accumulo.core.dataImpl.TabletIdImpl; +import org.apache.accumulo.core.manager.balancer.TabletServerIdImpl; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.spi.balancer.TabletBalancer; +import org.apache.accumulo.core.spi.balancer.data.TabletServerId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public enum TabletState { - UNASSIGNED, ASSIGNED, HOSTED, ASSIGNED_TO_DEAD_SERVER, SUSPENDED, ASSIGNED_TO_WRONG_GROUP + UNASSIGNED, ASSIGNED, HOSTED, ASSIGNED_TO_DEAD_SERVER, SUSPENDED, NEEDS_REASSIGNMENT; + + private static Logger log = LoggerFactory.getLogger(TabletState.class); + + public static TabletState compute(TabletMetadata tm, Set<TServerInstance> liveTServers) { + return compute(tm, liveTServers, null, null); + } + + public static TabletState compute(TabletMetadata tm, Set<TServerInstance> liveTServers, + TabletBalancer balancer, Map<TabletServerId,String> tserverGroups) { + TabletMetadata.Location current = null; + TabletMetadata.Location future = null; + if (tm.hasCurrent()) { + current = tm.getLocation(); + } else { + future = tm.getLocation(); + } + if (future != null) { + return liveTServers.contains(future.getServerInstance()) ? 
TabletState.ASSIGNED + : TabletState.ASSIGNED_TO_DEAD_SERVER; + } else if (current != null) { + if (liveTServers.contains(current.getServerInstance())) { + if (balancer != null) { + var tsii = new TabletServerIdImpl(current.getServerInstance()); + var resourceGroup = tserverGroups.get(tsii); + + if (resourceGroup != null) { + var reassign = balancer.needsReassignment(new TabletBalancer.CurrentAssignment() { + @Override + public TabletId getTablet() { + return new TabletIdImpl(tm.getExtent()); + } + + @Override + public TabletServerId getTabletServer() { + return tsii; + } + + @Override + public String getResourceGroup() { + return resourceGroup; + } + }); + + if (reassign) { + return TabletState.NEEDS_REASSIGNMENT; + } + } else { + // A tablet server should always have a resource group; however, there is a race + // condition where the resource group map was read before a tablet server came into + // existence. Another possible cause for an absent resource group is a bug in Accumulo. + // In either case do not call the balancer for now with the assumption that the resource + // group will be available later. Log a message in case it is a bug. + log.trace( + "Could not find resource group for tserver {}, so did not consult balancer.
Assuming this is a temporary race condition.", + current.getServerInstance()); + } + } + return TabletState.HOSTED; + } else { + return TabletState.ASSIGNED_TO_DEAD_SERVER; + } + } else if (tm.getSuspend() != null) { + return TabletState.SUSPENDED; + } else { + return TabletState.UNASSIGNED; + } + } } diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java index 9a05ceb26c..31c1ffaf04 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java @@ -52,18 +52,15 @@ import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.dataImpl.TabletIdImpl; import org.apache.accumulo.core.fate.FateTxId; import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.lock.ServiceLockData; -import org.apache.accumulo.core.manager.balancer.TabletServerIdImpl; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.RootTable; import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.SuspendingTServer; import org.apache.accumulo.core.metadata.TServerInstance; -import org.apache.accumulo.core.metadata.TabletState; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ClonedColumnFamily; @@ -79,8 +76,6 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Sc import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily; -import org.apache.accumulo.core.spi.balancer.TabletBalancer; -import org.apache.accumulo.core.spi.balancer.data.TabletServerId; import org.apache.accumulo.core.tabletserver.log.LogEntry; import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.commons.lang3.builder.ToStringStyle; @@ -430,47 +425,6 @@ public class TabletMetadata { return keyValues; } - public TabletState getTabletState(Set<TServerInstance> liveTServers) { - return getTabletState(liveTServers, null, null); - } - - public TabletState getTabletState(Set<TServerInstance> liveTServers, TabletBalancer balancer, - Map<String,Set<TabletServerId>> currentTServerGrouping) { - ensureFetched(ColumnType.LOCATION); - ensureFetched(ColumnType.LAST); - ensureFetched(ColumnType.SUSPEND); - Location current = null; - Location future = null; - if (hasCurrent()) { - current = location; - } else { - future = location; - } - if (future != null) { - return liveTServers.contains(future.getServerInstance()) ? 
TabletState.ASSIGNED - : TabletState.ASSIGNED_TO_DEAD_SERVER; - } else if (current != null) { - if (liveTServers.contains(current.getServerInstance())) { - if (balancer != null) { - String resourceGroup = balancer.getResourceGroup(new TabletIdImpl(extent)); - log.trace("Resource Group for extent {} is {}", extent, resourceGroup); - Set<TabletServerId> tservers = currentTServerGrouping.get(resourceGroup); - if (tservers == null - || !tservers.contains(new TabletServerIdImpl(current.getServerInstance()))) { - return TabletState.ASSIGNED_TO_WRONG_GROUP; - } - } - return TabletState.HOSTED; - } else { - return TabletState.ASSIGNED_TO_DEAD_SERVER; - } - } else if (getSuspend() != null) { - return TabletState.SUSPENDED; - } else { - return TabletState.UNASSIGNED; - } - } - public Map<ExternalCompactionId,ExternalCompactionMetadata> getExternalCompactions() { ensureFetched(ColumnType.ECOMP); return extCompactions; diff --git a/core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java b/core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java index 0fe3ed04fc..3d81810a73 100644 --- a/core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java @@ -41,6 +41,7 @@ import java.util.stream.Collectors; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.PluginEnvironment; +import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.conf.ConfigurationTypeHelper; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.TableId; @@ -566,4 +567,26 @@ public class HostRegexTableLoadBalancer extends TableLoadBalancer { .collect(Collectors.joining(", ", "[", "]")); } + @Override + public boolean needsReassignment(CurrentAssignment 
currentAssignment) { + String tableName; + try { + tableName = environment.getTableName(currentAssignment.getTablet().getTable()); + } catch (TableNotFoundException e) { + LOG.trace("Table name not found for {}, assuming table was deleted", + currentAssignment.getTablet().getTable(), e); + // if the table was deleted, then other parts of Accumulo can sort that out + return false; + } + + var hostPools = getPoolNamesForHost(currentAssignment.getTabletServer()); + var poolForTable = getPoolNameForTable(tableName); + + if (!hostPools.contains(poolForTable)) { + return true; + } + + return getBalancerForTable(currentAssignment.getTablet().getTable()) + .needsReassignment(currentAssignment); + } } diff --git a/core/src/main/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancer.java b/core/src/main/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancer.java index 915f4acb7c..853608018f 100644 --- a/core/src/main/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancer.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancer.java @@ -48,7 +48,7 @@ import org.slf4j.LoggerFactory; * <p> * Note that in versions prior to 4.0 this class would pass all known TabletServers to the Table * load balancers. In version 4.0 this changed with the introduction of the - * {@link TABLE_ASSIGNMENT_GROUP_PROPERTY} table property. If defined, this balancer passes the + * {@value #TABLE_ASSIGNMENT_GROUP_PROPERTY} table property. If defined, this balancer passes the * TabletServers that have the corresponding {@link Property#TSERV_GROUP_NAME} property to the Table * load balancer. 
* @@ -188,6 +188,22 @@ public class TableLoadBalancer implements TabletBalancer { } } + @Override + public boolean needsReassignment(CurrentAssignment currentAssignment) { + var tableId = currentAssignment.getTablet().getTable(); + String value = environment.getConfiguration(tableId).get(TABLE_ASSIGNMENT_GROUP_PROPERTY); + String expectedGroup = (value == null || StringUtils.isEmpty(value)) + ? Constants.DEFAULT_RESOURCE_GROUP_NAME : value; + + if (!expectedGroup.equals(currentAssignment.getResourceGroup())) { + // The tablet is not in the expected resource group, so it needs to be reassigned + return true; + } + + // defer to the per table balancer + return getBalancerForTable(tableId).needsReassignment(currentAssignment); + } + @Override public long balance(BalanceParameters params) { long minBalanceTime = 5_000; @@ -212,13 +228,4 @@ public class TableLoadBalancer implements TabletBalancer { } return minBalanceTime; } - - @Override - public String getResourceGroup(TabletId tabletId) { - String value = - environment.getConfiguration(tabletId.getTable()).get(TABLE_ASSIGNMENT_GROUP_PROPERTY); - return (value == null || StringUtils.isEmpty(value)) ? 
Constants.DEFAULT_RESOURCE_GROUP_NAME - : value; - } - } diff --git a/core/src/main/java/org/apache/accumulo/core/spi/balancer/TabletBalancer.java b/core/src/main/java/org/apache/accumulo/core/spi/balancer/TabletBalancer.java index f20cb3c835..6e4952a381 100644 --- a/core/src/main/java/org/apache/accumulo/core/spi/balancer/TabletBalancer.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/balancer/TabletBalancer.java @@ -23,7 +23,6 @@ import java.util.Map; import java.util.Set; import java.util.SortedMap; -import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.TabletId; import org.apache.accumulo.core.spi.balancer.data.TServerStatus; @@ -135,13 +134,52 @@ public interface TabletBalancer { long balance(BalanceParameters params); /** - * Get the ResourceGroup name for this tablet + * Provides access to information related to a tablet that is currently assigned to a tablet + * server. * - * @param tabletId id of tablet - * @return resource group name * @since 4.0.0 */ - default String getResourceGroup(TabletId tabletId) { - return Constants.DEFAULT_RESOURCE_GROUP_NAME; + interface CurrentAssignment { + TabletId getTablet(); + + TabletServerId getTabletServer(); + + String getResourceGroup(); + } + + /** + * <p> + * The manager periodically scans all tablets looking for tablets that are assigned to dead tablet + * servers or unassigned. During the scan this method is also called for tablets that are + * currently assigned to a live tserver to see if they should be unassigned and reassigned. If + * this method returns true the tablet will be unloaded from the tablet sever and then later the + * tablet will be passed to {@link #getAssignments(AssignmentParameters)}. + * </p> + * + * <p> + * One example use case for this method is a balancer that partitions tablet servers into groups. 
+ * If the balancer's config is changed such that a table that was assigned to tablet server group A + * should now be assigned to tablet server group B, then this method can return true for the tablets in + * that table assigned to tablet server group A. After those tablets are unloaded and passed to + * the {@link #getAssignments(AssignmentParameters)} method, it can reassign them to tablet server + * group B. + * </p> + * + * <p> + * Accumulo may instantiate this plugin in different processes and call this method. When the + * manager looks for tablets that need reassignment it currently uses an Accumulo iterator to + * scan the metadata table and filter tablets. That iterator may run on multiple tablet servers + * and call this plugin. Keep this in mind when implementing this plugin and considering keeping + * state between calls to this method. + * </p> + * + * <p> + * This new method may be used instead of or in addition to {@link #balance(BalanceParameters)}. + * </p> + * + * @since 4.0.0 + */ + default boolean needsReassignment(CurrentAssignment currentAssignment) { + return false; } } diff --git a/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java b/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java index 117440910c..3e53f4d12e 100644 --- a/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java +++ b/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java @@ -238,7 +238,7 @@ public class TabletMetadataTest { TabletMetadata tm = TabletMetadata.convertRow(rowMap.entrySet().iterator(), colsToFetch, false, false); - TabletState state = tm.getTabletState(tservers); + TabletState state = TabletState.compute(tm, tservers); assertEquals(TabletState.ASSIGNED, state); assertEquals(ser1, tm.getLocation().getServerInstance()); @@ -254,7 +254,7 @@ public class TabletMetadataTest { tm = TabletMetadata.convertRow(rowMap.entrySet().iterator(), colsToFetch,
false, false); - assertEquals(TabletState.HOSTED, tm.getTabletState(tservers)); + assertEquals(TabletState.HOSTED, TabletState.compute(tm, tservers)); assertEquals(ser2, tm.getLocation().getServerInstance()); assertEquals(ser2.getSession(), tm.getLocation().getSession()); assertEquals(LocationType.CURRENT, tm.getLocation().getType()); @@ -268,7 +268,7 @@ public class TabletMetadataTest { tm = TabletMetadata.convertRow(rowMap.entrySet().iterator(), colsToFetch, false, false); - assertEquals(TabletState.ASSIGNED_TO_DEAD_SERVER, tm.getTabletState(tservers)); + assertEquals(TabletState.ASSIGNED_TO_DEAD_SERVER, TabletState.compute(tm, tservers)); assertEquals(deadSer, tm.getLocation().getServerInstance()); assertEquals(deadSer.getSession(), tm.getLocation().getSession()); assertEquals(LocationType.CURRENT, tm.getLocation().getType()); @@ -280,7 +280,7 @@ public class TabletMetadataTest { tm = TabletMetadata.convertRow(rowMap.entrySet().iterator(), colsToFetch, false, false); - assertEquals(TabletState.UNASSIGNED, tm.getTabletState(tservers)); + assertEquals(TabletState.UNASSIGNED, TabletState.compute(tm, tservers)); assertNull(tm.getLocation()); assertFalse(tm.hasCurrent()); @@ -293,7 +293,7 @@ public class TabletMetadataTest { tm = TabletMetadata.convertRow(rowMap.entrySet().iterator(), colsToFetch, false, false); - assertEquals(TabletState.SUSPENDED, tm.getTabletState(tservers)); + assertEquals(TabletState.SUSPENDED, TabletState.compute(tm, tservers)); assertEquals(1000L, tm.getSuspend().suspensionTime); assertEquals(ser2.getHostAndPort(), tm.getSuspend().server); assertNull(tm.getLocation()); diff --git a/core/src/test/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancerTest.java b/core/src/test/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancerTest.java index 13b55931ca..0ce1cff351 100644 --- a/core/src/test/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancerTest.java +++ 
b/core/src/test/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancerTest.java @@ -23,6 +23,8 @@ import static org.easymock.EasyMock.createMock; import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.replay; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.ArrayList; import java.util.Collections; @@ -40,6 +42,7 @@ import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.TabletId; import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.dataImpl.TabletIdImpl; import org.apache.accumulo.core.manager.balancer.BalanceParamsImpl; import org.apache.accumulo.core.manager.balancer.TServerStatusImpl; import org.apache.accumulo.core.manager.balancer.TabletServerIdImpl; @@ -169,4 +172,96 @@ public class TableLoadBalancerTest { } } + private static class TestCurrAssignment implements TabletBalancer.CurrentAssignment { + + private final TabletIdImpl tablet; + private final String resourceGroup; + + TestCurrAssignment(TableId tid, String rg) { + this.tablet = new TabletIdImpl(new KeyExtent(tid, null, null)); + this.resourceGroup = rg; + } + + @Override + public TabletId getTablet() { + return tablet; + } + + @Override + public TabletServerId getTabletServer() { + return null; + } + + @Override + public String getResourceGroup() { + return resourceGroup; + } + } + + @Test + public void testNeedsReassignment() { + + ConfigurationCopy cc1 = + new ConfigurationCopy(Map.of(TableLoadBalancer.TABLE_ASSIGNMENT_GROUP_PROPERTY, "G1")); + ConfigurationImpl table1Config = new ConfigurationImpl(cc1); + + ConfigurationCopy cc2 = + new ConfigurationCopy(Map.of(TableLoadBalancer.TABLE_ASSIGNMENT_GROUP_PROPERTY, "G2")); + ConfigurationImpl table2Config = new ConfigurationImpl(cc2); + + var tid1 = TableId.of("1"); + 
var tid2 = TableId.of("2"); + var tid3 = TableId.of("3"); + + BalancerEnvironment environment = createMock(BalancerEnvironment.class); + expect(environment.getConfiguration(tid1)).andReturn(table1Config).anyTimes(); + expect(environment.getConfiguration(tid2)).andReturn(table2Config).anyTimes(); + expect(environment.getConfiguration(tid3)) + .andReturn(new ConfigurationImpl(new ConfigurationCopy())).anyTimes(); + replay(environment); + + var tls = new TableLoadBalancer() { + @Override + protected TabletBalancer getBalancerForTable(TableId tableId) { + TabletBalancer balancer = createMock(TabletBalancer.class); + expect(balancer.needsReassignment(anyObject())).andReturn(false); + replay(balancer); + return balancer; + } + }; + tls.init(environment); + + assertFalse(tls.needsReassignment(new TestCurrAssignment(tid1, "G1"))); + assertTrue(tls.needsReassignment(new TestCurrAssignment(tid1, "G2"))); + + assertFalse(tls.needsReassignment(new TestCurrAssignment(tid2, "G2"))); + assertTrue(tls.needsReassignment(new TestCurrAssignment(tid2, "G1"))); + + assertFalse( + tls.needsReassignment(new TestCurrAssignment(tid3, Constants.DEFAULT_RESOURCE_GROUP_NAME))); + assertTrue(tls.needsReassignment(new TestCurrAssignment(tid3, "G1"))); + + // test when the delegated table balancer returns true for one table and false for others + var tls2 = new TableLoadBalancer() { + @Override + protected TabletBalancer getBalancerForTable(TableId tableId) { + TabletBalancer balancer = createMock(TabletBalancer.class); + expect(balancer.needsReassignment(anyObject())).andReturn(tableId.equals(tid1)); + replay(balancer); + return balancer; + } + }; + tls2.init(environment); + + assertTrue(tls2.needsReassignment(new TestCurrAssignment(tid1, "G1"))); + assertTrue(tls2.needsReassignment(new TestCurrAssignment(tid1, "G2"))); + + assertFalse(tls2.needsReassignment(new TestCurrAssignment(tid2, "G2"))); + assertTrue(tls2.needsReassignment(new TestCurrAssignment(tid2, "G1"))); + + assertFalse(tls2 + 
.needsReassignment(new TestCurrAssignment(tid3, Constants.DEFAULT_RESOURCE_GROUP_NAME))); + assertTrue(tls2.needsReassignment(new TestCurrAssignment(tid3, "G1"))); + } + } diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java index f1b169d006..4d12d5112a 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java @@ -35,7 +35,6 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.SortedMap; -import java.util.stream.Collectors; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.ScannerBase; @@ -184,17 +183,15 @@ public class TabletManagementIterator extends SkippingIterator { } } - private static Map<String,Set<TabletServerId>> - parseTServerResourceGroups(Map<String,String> options) { - Map<String,Set<TabletServerId>> resourceGroups = new HashMap<>(); + private static Map<TabletServerId,String> parseTServerResourceGroups(Map<String,String> options) { + Map<TabletServerId,String> resourceGroups = new HashMap<>(); String groups = options.get(RESOURCE_GROUPS); if (groups != null) { for (String groupName : groups.split(",")) { String groupServers = options.get(TSERVER_GROUP_PREFIX + groupName); if (groupServers != null) { Set<TServerInstance> servers = parseServers(groupServers); - resourceGroups.put(groupName, - servers.stream().map(s -> new TabletServerIdImpl(s)).collect(Collectors.toSet())); + servers.forEach(server -> resourceGroups.put(new TabletServerIdImpl(server), groupName)); } } } @@ -295,7 +292,7 @@ public class TabletManagementIterator extends SkippingIterator { final boolean shouldBeOnline = onlineTables.contains(tm.getTableId()) && tm.getOperationId() == null; - 
TabletState state = tm.getTabletState(current, balancer, tserverResourceGroups); + TabletState state = TabletState.compute(tm, current, balancer, tserverResourceGroups); if (LOG.isDebugEnabled()) { LOG.debug("{} is {}. Table is {}line. Tablet hosting goal is {}, hostingRequested: {}", tm.getExtent(), state, (shouldBeOnline ? "on" : "off"), tm.getHostingGoal(), @@ -312,7 +309,7 @@ public class TabletManagementIterator extends SkippingIterator { } break; case ASSIGNED_TO_DEAD_SERVER: - case ASSIGNED_TO_WRONG_GROUP: + case NEEDS_REASSIGNMENT: return true; case SUSPENDED: case UNASSIGNED: @@ -369,7 +366,7 @@ public class TabletManagementIterator extends SkippingIterator { private final Set<TServerInstance> current = new HashSet<>(); private final Set<TableId> onlineTables = new HashSet<>(); - private final Map<String,Set<TabletServerId>> tserverResourceGroups = new HashMap<>(); + private final Map<TabletServerId,String> tserverResourceGroups = new HashMap<>(); private final Map<TableId,MergeInfo> merges = new HashMap<>(); private final Set<KeyExtent> migrations = new HashSet<>(); private ManagerState managerState = ManagerState.NORMAL; diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/FindOfflineTablets.java b/server/base/src/main/java/org/apache/accumulo/server/util/FindOfflineTablets.java index 3227ae93db..2cd208a629 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/FindOfflineTablets.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/FindOfflineTablets.java @@ -34,6 +34,7 @@ import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.TabletState; import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.server.ServerContext; import 
org.apache.accumulo.server.cli.ServerUtilOpts; @@ -126,7 +127,9 @@ public class FindOfflineTablets { while (scanner.hasNext() && !System.out.checkError()) { TabletManagement mti = scanner.next(); - TabletState state = mti.getTabletMetadata().getTabletState(tservers.getCurrentServers()); + TabletMetadata tabletMetadata = mti.getTabletMetadata(); + Set<TServerInstance> liveTServers = tservers.getCurrentServers(); + TabletState state = TabletState.compute(tabletMetadata, liveTServers); if (state != null && state != TabletState.HOSTED && context.getTableManager().getTableState(mti.getTabletMetadata().getTableId()) != TableState.OFFLINE) { diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ListOnlineOnDemandTablets.java b/server/base/src/main/java/org/apache/accumulo/server/util/ListOnlineOnDemandTablets.java index 235e42fbe0..8bf64157b8 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/ListOnlineOnDemandTablets.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/ListOnlineOnDemandTablets.java @@ -30,6 +30,7 @@ import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.TabletState; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.cli.ServerUtilOpts; @@ -92,7 +93,9 @@ public class ListOnlineOnDemandTablets { while (scanner.hasNext() && !System.out.checkError()) { final TabletManagement mti = scanner.next(); - TabletState state = mti.getTabletMetadata().getTabletState(tservers.getCurrentServers()); + TabletMetadata tabletMetadata = mti.getTabletMetadata(); + Set<TServerInstance> liveTServers = tservers.getCurrentServers(); + TabletState state = TabletState.compute(tabletMetadata, liveTServers); if 
(state == TabletState.HOSTED && mti.getTabletMetadata().getHostingGoal() == TabletHostingGoal.ONDEMAND) { System.out.println( diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java index e5ece17e2c..f783d77ac5 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java @@ -35,6 +35,7 @@ import org.apache.accumulo.core.manager.state.TabletManagement; import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.TabletState; import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.tabletserver.log.LogEntry; import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.util.Pair; @@ -277,8 +278,8 @@ public class GarbageCollectWriteAheadLogs { for (TabletManagement mti : store) { // Tablet is still assigned to a dead server. Manager has moved markers and reassigned it // Easiest to just ignore all the WALs for the dead server. 
- if (mti.getTabletMetadata().getTabletState(liveServers) - == TabletState.ASSIGNED_TO_DEAD_SERVER) { + TabletMetadata tabletMetadata = mti.getTabletMetadata(); + if (TabletState.compute(tabletMetadata, liveServers) == TabletState.ASSIGNED_TO_DEAD_SERVER) { Set<UUID> idsToIgnore = candidates.remove(mti.getTabletMetadata().getLocation().getServerInstance()); if (idsToIgnore != null) { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index aa6ef53d19..0590a13e6e 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -750,7 +750,8 @@ public class Manager extends AbstractServer case SPLITTING: return TabletGoalState.HOSTED; case WAITING_FOR_CHOPPED: - if (tm.getTabletState(tserverSet.getCurrentServers()).equals(TabletState.HOSTED)) { + Set<TServerInstance> liveTServers = tserverSet.getCurrentServers(); + if (TabletState.compute(tm, liveTServers).equals(TabletState.HOSTED)) { if (tm.hasChopped()) { return TabletGoalState.UNASSIGNED; } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java index ffc88ccf6c..0bdc4b0dc0 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java @@ -37,7 +37,6 @@ import java.util.SortedSet; import java.util.TreeMap; import java.util.TreeSet; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.AccumuloException; @@ -253,10 +252,10 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { CompactionJobGenerator compactionGenerator = new CompactionJobGenerator( new 
ServiceEnvironmentImpl(manager.getContext()), manager.getCompactionHints()); - final Map<String,Set<TabletServerId>> resourceGroups = new HashMap<>(); - manager.tServerResourceGroups().forEach((k, v) -> { - resourceGroups.put(k, - v.stream().map(s -> new TabletServerIdImpl(s)).collect(Collectors.toSet())); + final Map<TabletServerId,String> resourceGroups = new HashMap<>(); + manager.tServerResourceGroups().forEach((group, tservers) -> { + tservers.stream().map(TabletServerIdImpl::new) + .forEach(tabletServerId -> resourceGroups.put(tabletServerId, group)); }); // Walk through the tablets in our store, and work tablets @@ -303,8 +302,8 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { return mStats != null ? mStats : new MergeStats(new MergeInfo()); }); TabletGoalState goal = manager.getGoalState(tm, mergeStats.getMergeInfo()); - TabletState state = - tm.getTabletState(currentTServers.keySet(), manager.tabletBalancer, resourceGroups); + TabletState state = TabletState.compute(tm, currentTServers.keySet(), + manager.tabletBalancer, resourceGroups); final Location location = tm.getLocation(); Location current = null; @@ -325,7 +324,7 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { // Always follow through with assignments if (state == TabletState.ASSIGNED) { goal = TabletGoalState.HOSTED; - } else if (state == TabletState.ASSIGNED_TO_WRONG_GROUP) { + } else if (state == TabletState.NEEDS_REASSIGNMENT) { goal = TabletGoalState.UNASSIGNED; } if (Manager.log.isTraceEnabled()) { @@ -444,7 +443,7 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { case ASSIGNED_TO_DEAD_SERVER: unassignDeadTablet(tLists, tm, wals); break; - case ASSIGNED_TO_WRONG_GROUP: + case NEEDS_REASSIGNMENT: case HOSTED: TServerConnection client = manager.tserverSet.getConnection(location.getServerInstance()); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/state/MergeStats.java 
b/server/manager/src/main/java/org/apache/accumulo/manager/state/MergeStats.java index 1114ef978f..eee0dd0ba6 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/state/MergeStats.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/state/MergeStats.java @@ -21,6 +21,7 @@ package org.apache.accumulo.manager.state; import java.io.IOException; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.Accumulo; @@ -37,6 +38,7 @@ import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.core.manager.state.TabletManagement; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.RootTable; +import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.TabletState; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; import org.apache.accumulo.core.metadata.schema.TabletMetadata; @@ -240,10 +242,13 @@ public class MergeStats { return false; } - if (tm.getTabletState(manager.onlineTabletServers()) != TabletState.UNASSIGNED - && tm.getTabletState(manager.onlineTabletServers()) != TabletState.SUSPENDED) { - log.debug("failing consistency: assigned or hosted {}", tm); - return false; + Set<TServerInstance> liveTServers1 = manager.onlineTabletServers(); + if (TabletState.compute(tm, liveTServers1) != TabletState.UNASSIGNED) { + Set<TServerInstance> liveTServers = manager.onlineTabletServers(); + if (TabletState.compute(tm, liveTServers) != TabletState.SUSPENDED) { + log.debug("failing consistency: assigned or hosted {}", tm); + return false; + } } } else if (!tm.getExtent().isPreviousExtent(prevExtent)) { @@ -253,8 +258,9 @@ public class MergeStats { prevExtent = tm.getExtent(); - verify.update(tm.getExtent(), tm.getTabletState(manager.onlineTabletServers()), - tm.hasChopped(), !tm.getLogs().isEmpty()); + 
Set<TServerInstance> liveTServers = manager.onlineTabletServers(); + verify.update(tm.getExtent(), TabletState.compute(tm, liveTServers), tm.hasChopped(), + !tm.getLogs().isEmpty()); // stop when we've seen the tablet just beyond our range if (tm.getExtent().prevEndRow() != null && extent.endRow() != null && tm.getExtent().prevEndRow().compareTo(extent.endRow()) > 0) { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/CleanUp.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/CleanUp.java index 78db748ece..c0d60c8234 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/CleanUp.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/delete/CleanUp.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.net.UnknownHostException; import java.util.Arrays; import java.util.Map.Entry; +import java.util.Set; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.BatchScanner; @@ -38,6 +39,7 @@ import org.apache.accumulo.core.fate.Repo; import org.apache.accumulo.core.iterators.user.GrepIterator; import org.apache.accumulo.core.manager.state.TabletManagement; import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.TabletState; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; @@ -98,7 +100,8 @@ class CleanUp extends ManagerRepo { for (Entry<Key,Value> entry : scanner) { final TabletManagement mti = TabletManagementIterator.decode(entry); final TabletMetadata tm = mti.getTabletMetadata(); - TabletState state = tm.getTabletState(manager.onlineTabletServers()); + Set<TServerInstance> liveTServers = manager.onlineTabletServers(); + TabletState state = TabletState.compute(tm, liveTServers); 
if (!state.equals(TabletState.UNASSIGNED)) { // This code will even wait on tablets that are assigned to dead tablets servers. This is // intentional because the manager may make metadata writes for these tablets. See #587 diff --git a/shell/src/main/java/org/apache/accumulo/shell/commands/ListTabletsCommand.java b/shell/src/main/java/org/apache/accumulo/shell/commands/ListTabletsCommand.java index 71b2a18b75..3e0782f723 100644 --- a/shell/src/main/java/org/apache/accumulo/shell/commands/ListTabletsCommand.java +++ b/shell/src/main/java/org/apache/accumulo/shell/commands/ListTabletsCommand.java @@ -35,6 +35,7 @@ import org.apache.accumulo.core.data.NamespaceId; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.metadata.TServerInstance; +import org.apache.accumulo.core.metadata.TabletState; import org.apache.accumulo.core.metadata.schema.DataFileValue; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location; @@ -252,7 +253,7 @@ public class ListTabletsCommand extends Command { factory.numWalLogs(md.getLogs().size()); factory.dir(md.getDirName()); factory.location(md.getLocation()); - factory.status(md.getTabletState(liveTserverSet).toString()); + factory.status(TabletState.compute(md, liveTserverSet).toString()); results.add(factory.build()); } } diff --git a/test/src/main/java/org/apache/accumulo/test/manager/MergeStateIT.java b/test/src/main/java/org/apache/accumulo/test/manager/MergeStateIT.java index 546427694c..ac618b01e0 100644 --- a/test/src/main/java/org/apache/accumulo/test/manager/MergeStateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/manager/MergeStateIT.java @@ -45,6 +45,7 @@ import org.apache.accumulo.core.manager.state.TabletManagement; import org.apache.accumulo.core.manager.thrift.ManagerState; import org.apache.accumulo.core.metadata.MetadataTable; import 
org.apache.accumulo.core.metadata.TServerInstance; +import org.apache.accumulo.core.metadata.TabletState; import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily; @@ -248,8 +249,9 @@ public class MergeStateIT extends ConfigurableMacBase { MergeStats stats = new MergeStats(state.mergeInfo); stats.getMergeInfo().setState(MergeState.WAITING_FOR_OFFLINE); for (TabletManagement tm : metaDataStateStore) { + TabletMetadata tabletMetadata = tm.getTabletMetadata(); stats.update(tm.getTabletMetadata().getExtent(), - tm.getTabletMetadata().getTabletState(state.onlineTabletServers()), + TabletState.compute(tabletMetadata, state.onlineTabletServers()), tm.getTabletMetadata().hasChopped(), false); } return stats;