This is an automated email from the ASF dual-hosted git repository.

ddanielr pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 9186574872 Fixes metadata balancing issues (#5358)
9186574872 is described below

commit 918657487286d2df9af0ea648bdfe77dc59ea1c2
Author: Daniel Roberts <ddani...@gmail.com>
AuthorDate: Wed Feb 26 13:29:05 2025 -0500

    Fixes metadata balancing issues (#5358)
    
    * Add new metadata balance test to BalanceIT
    
    Adds a new test for checking balancing of the metadata table.
    Before the fixes below, this test failed because of outstanding
    bugs in 2.1 related to balancing the metadata table.
    
    Fixes balance-related filtering that was using the table name instead of the table id
    
    This change passes each balancer a map of the tables it should
    balance. This reduces the possibility that a balancer will attempt
    to balance tables not assigned to it.
    
    Removes the do-while retry loop in Manager.balanceTablets() in favor
    of skipping the current data level when a higher data level (ROOT,
    or METADATA for the USER level) still has outstanding migrations.
    
    ---------
    
    Co-authored-by: Keith Turner <ktur...@apache.org>
    Co-authored-by: Christopher Tubbs <ctubb...@apache.org>
---
 .../core/manager/balancer/BalanceParamsImpl.java   |  22 ++++-
 .../core/spi/balancer/BalancerEnvironment.java     |   5 +
 .../spi/balancer/HostRegexTableLoadBalancer.java   |   6 +-
 .../core/spi/balancer/TableLoadBalancer.java       |  14 +--
 .../accumulo/core/spi/balancer/TabletBalancer.java |  10 ++
 .../core/spi/balancer/GroupBalancerTest.java       |   9 +-
 ...tRegexTableLoadBalancerReconfigurationTest.java |   4 +-
 .../balancer/HostRegexTableLoadBalancerTest.java   |  14 +--
 .../core/spi/balancer/SimpleLoadBalancerTest.java  |   4 +-
 .../core/spi/balancer/TableLoadBalancerTest.java   |   6 +-
 .../java/org/apache/accumulo/manager/Manager.java  | 103 ++++++++++++---------
 .../java/org/apache/accumulo/test/BalanceIT.java   |  96 ++++++++++++++++++-
 .../accumulo/test/ChaoticLoadBalancerTest.java     |   2 +-
 13 files changed, 213 insertions(+), 82 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/manager/balancer/BalanceParamsImpl.java
 
b/core/src/main/java/org/apache/accumulo/core/manager/balancer/BalanceParamsImpl.java
index 97b9315c6e..00f593d9f3 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/manager/balancer/BalanceParamsImpl.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/manager/balancer/BalanceParamsImpl.java
@@ -20,10 +20,12 @@ package org.apache.accumulo.core.manager.balancer;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.stream.Collectors;
 
+import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.data.TabletId;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.dataImpl.TabletIdImpl;
@@ -42,38 +44,43 @@ public class BalanceParamsImpl implements 
TabletBalancer.BalanceParameters {
   private final SortedMap<TServerInstance,TabletServerStatus> 
thriftCurrentStatus;
   private final Set<KeyExtent> thriftCurrentMigrations;
   private final DataLevel currentDataLevel;
+  private final Map<String,TableId> tablesToBalance;
 
   public static BalanceParamsImpl 
fromThrift(SortedMap<TabletServerId,TServerStatus> currentStatus,
       SortedMap<TServerInstance,TabletServerStatus> thriftCurrentStatus,
-      Set<KeyExtent> thriftCurrentMigrations, DataLevel currentLevel) {
+      Set<KeyExtent> thriftCurrentMigrations, DataLevel currentLevel,
+      Map<String,TableId> tablesToBalance) {
     Set<TabletId> currentMigrations = 
thriftCurrentMigrations.stream().map(TabletIdImpl::new)
         .collect(Collectors.toUnmodifiableSet());
 
     return new BalanceParamsImpl(currentStatus, currentMigrations, new 
ArrayList<>(),
-        thriftCurrentStatus, thriftCurrentMigrations, currentLevel);
+        thriftCurrentStatus, thriftCurrentMigrations, currentLevel, 
tablesToBalance);
   }
 
   public BalanceParamsImpl(SortedMap<TabletServerId,TServerStatus> 
currentStatus,
-      Set<TabletId> currentMigrations, List<TabletMigration> migrationsOut,
-      DataLevel currentLevel) {
+      Set<TabletId> currentMigrations, List<TabletMigration> migrationsOut, 
DataLevel currentLevel,
+      Map<String,TableId> tablesToBalance) {
     this.currentStatus = currentStatus;
     this.currentMigrations = currentMigrations;
     this.migrationsOut = migrationsOut;
     this.thriftCurrentStatus = null;
     this.thriftCurrentMigrations = null;
     this.currentDataLevel = currentLevel;
+    this.tablesToBalance = tablesToBalance;
   }
 
   private BalanceParamsImpl(SortedMap<TabletServerId,TServerStatus> 
currentStatus,
       Set<TabletId> currentMigrations, List<TabletMigration> migrationsOut,
       SortedMap<TServerInstance,TabletServerStatus> thriftCurrentStatus,
-      Set<KeyExtent> thriftCurrentMigrations, DataLevel currentLevel) {
+      Set<KeyExtent> thriftCurrentMigrations, DataLevel currentLevel,
+      Map<String,TableId> tablesToBalance) {
     this.currentStatus = currentStatus;
     this.currentMigrations = currentMigrations;
     this.migrationsOut = migrationsOut;
     this.thriftCurrentStatus = thriftCurrentStatus;
     this.thriftCurrentMigrations = thriftCurrentMigrations;
     this.currentDataLevel = currentLevel;
+    this.tablesToBalance = tablesToBalance;
   }
 
   @Override
@@ -110,4 +117,9 @@ public class BalanceParamsImpl implements 
TabletBalancer.BalanceParameters {
   public String currentLevel() {
     return currentDataLevel.name();
   }
+
+  @Override
+  public Map<String,TableId> getTablesToBalance() {
+    return tablesToBalance;
+  }
 }
diff --git 
a/core/src/main/java/org/apache/accumulo/core/spi/balancer/BalancerEnvironment.java
 
b/core/src/main/java/org/apache/accumulo/core/spi/balancer/BalancerEnvironment.java
index 733e847fe5..88aaf60d9a 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/spi/balancer/BalancerEnvironment.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/spi/balancer/BalancerEnvironment.java
@@ -40,6 +40,11 @@ public interface BalancerEnvironment extends 
ServiceEnvironment {
    * Many Accumulo plugins are given table IDs as this is what Accumulo uses 
internally to identify
    * tables. This provides a mapping of table names to table IDs for the 
purposes of translating
    * and/or enumerating the existing tables.
+   *
+   * <p>
+   * This returns all tables that exist in the system. Each request to 
balance should limit itself
+   * to {@link TabletBalancer.BalanceParameters#getTablesToBalance()} and not 
balance everything
+   * returned by this.
    */
   Map<String,TableId> getTableIdMap();
 
diff --git 
a/core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java
 
b/core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java
index cb88ce320c..f6b31af244 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java
@@ -380,7 +380,7 @@ public class HostRegexTableLoadBalancer extends 
TableLoadBalancer {
   public long balance(BalanceParameters params) {
     long minBalanceTime = 20_000;
     // Iterate over the tables and balance each of them
-    Map<String,TableId> tableIdMap = environment.getTableIdMap();
+    Map<String,TableId> tableIdMap = params.getTablesToBalance();
     Map<TableId,String> tableIdToTableName = tableIdMap.entrySet().stream()
         .collect(Collectors.toMap(Map.Entry::getValue, Map.Entry::getKey));
     tableIdToTableName.keySet().forEach(this::checkTableConfig);
@@ -511,8 +511,8 @@ public class HostRegexTableLoadBalancer extends 
TableLoadBalancer {
         continue;
       }
       ArrayList<TabletMigration> newMigrations = new ArrayList<>();
-      getBalancerForTable(tableId).balance(
-          new BalanceParamsImpl(currentView, migrations, newMigrations, 
DataLevel.of(tableId)));
+      getBalancerForTable(tableId).balance(new BalanceParamsImpl(currentView, 
migrations,
+          newMigrations, DataLevel.of(tableId), Map.of(tableName, tableId)));
 
       if (newMigrations.isEmpty()) {
         tableToTimeSinceNoMigrations.remove(tableId);
diff --git 
a/core/src/main/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancer.java
 
b/core/src/main/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancer.java
index 55a24c3094..84c9074b46 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancer.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancer.java
@@ -98,7 +98,8 @@ public class TableLoadBalancer implements TabletBalancer {
       }
 
       if (balancer == null) {
-        log.info("Using balancer {} for table {}", 
SimpleLoadBalancer.class.getName(), tableId);
+        log.info("Creating balancer {} limited to balancing table {}",
+            SimpleLoadBalancer.class.getName(), tableId);
         balancer = new SimpleLoadBalancer(tableId);
       }
       perTableBalancers.put(tableId, balancer);
@@ -124,13 +125,14 @@ public class TableLoadBalancer implements TabletBalancer {
   @Override
   public long balance(BalanceParameters params) {
     long minBalanceTime = 5_000;
-    // Iterate over the tables and balance each of them
     final DataLevel currentDataLevel = 
DataLevel.valueOf(params.currentLevel());
-    for (TableId tableId : environment.getTableIdMap().values()) {
+    for (Entry<String,TableId> entry : params.getTablesToBalance().entrySet()) 
{
+      String tableName = entry.getKey();
+      TableId tableId = entry.getValue();
       ArrayList<TabletMigration> newMigrations = new ArrayList<>();
-      long tableBalanceTime =
-          getBalancerForTable(tableId).balance(new 
BalanceParamsImpl(params.currentStatus(),
-              params.currentMigrations(), newMigrations, currentDataLevel));
+      long tableBalanceTime = getBalancerForTable(tableId)
+          .balance(new BalanceParamsImpl(params.currentStatus(), 
params.currentMigrations(),
+              newMigrations, currentDataLevel, Map.of(tableName, tableId)));
       if (tableBalanceTime < minBalanceTime) {
         minBalanceTime = tableBalanceTime;
       }
diff --git 
a/core/src/main/java/org/apache/accumulo/core/spi/balancer/TabletBalancer.java 
b/core/src/main/java/org/apache/accumulo/core/spi/balancer/TabletBalancer.java
index 06235a10a1..4731a5a844 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/spi/balancer/TabletBalancer.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/spi/balancer/TabletBalancer.java
@@ -24,6 +24,7 @@ import java.util.Set;
 import java.util.SortedMap;
 
 import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.data.TabletId;
 import org.apache.accumulo.core.spi.balancer.data.TServerStatus;
 import org.apache.accumulo.core.spi.balancer.data.TabletMigration;
@@ -102,6 +103,15 @@ public interface TabletBalancer {
      * @since 2.1.4
      */
     String currentLevel();
+
+    /**
+     * This is the set of tables the balancer should consider. Balancing any 
tables outside of this
+     * set will be ignored and result in an error in the logs.
+     *
+     * @return map of table names to table ids that should be balanced.
+     * @since 2.1.4
+     */
+    Map<String,TableId> getTablesToBalance();
   }
 
   /**
diff --git 
a/core/src/test/java/org/apache/accumulo/core/spi/balancer/GroupBalancerTest.java
 
b/core/src/test/java/org/apache/accumulo/core/spi/balancer/GroupBalancerTest.java
index e55eb379d2..f469b8fd2a 100644
--- 
a/core/src/test/java/org/apache/accumulo/core/spi/balancer/GroupBalancerTest.java
+++ 
b/core/src/test/java/org/apache/accumulo/core/spi/balancer/GroupBalancerTest.java
@@ -108,10 +108,11 @@ public class GroupBalancerTest {
         }
       };
 
-      balance(balancer, maxMigrations, tid);
+      balance(balancer, maxMigrations, tid, Map.of("1", tid));
     }
 
-    public void balance(TabletBalancer balancer, int maxMigrations, TableId 
tid) {
+    public void balance(TabletBalancer balancer, int maxMigrations, TableId 
tid,
+        Map<String,TableId> tablesToBalance) {
 
       while (true) {
         Set<TabletId> migrations = new HashSet<>();
@@ -123,8 +124,8 @@ public class GroupBalancerTest {
               new 
org.apache.accumulo.core.master.thrift.TabletServerStatus()));
         }
 
-        balancer
-            .balance(new BalanceParamsImpl(current, migrations, migrationsOut, 
DataLevel.of(tid)));
+        balancer.balance(new BalanceParamsImpl(current, migrations, 
migrationsOut,
+            DataLevel.of(tid), tablesToBalance));
 
         assertTrue(migrationsOut.size() <= (maxMigrations + 5),
             "Max Migration exceeded " + maxMigrations + " " + 
migrationsOut.size());
diff --git 
a/core/src/test/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancerReconfigurationTest.java
 
b/core/src/test/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancerReconfigurationTest.java
index 58a89ec626..5a201f15da 100644
--- 
a/core/src/test/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancerReconfigurationTest.java
+++ 
b/core/src/test/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancerReconfigurationTest.java
@@ -108,7 +108,7 @@ public class HostRegexTableLoadBalancerReconfigurationTest
     // getOnlineTabletsForTable
     UtilWaitThread.sleep(3000);
     this.balance(new 
BalanceParamsImpl(Collections.unmodifiableSortedMap(allTabletServers),
-        migrations, migrationsOut, DataLevel.USER));
+        migrations, migrationsOut, DataLevel.USER, tables));
     assertEquals(0, migrationsOut.size());
     // Change property, simulate call by TableConfWatcher
 
@@ -120,7 +120,7 @@ public class HostRegexTableLoadBalancerReconfigurationTest
     // populate the map with an older time value
     this.lastOOBCheckTimes.put(DataLevel.USER, System.currentTimeMillis() / 2);
     this.balance(new 
BalanceParamsImpl(Collections.unmodifiableSortedMap(allTabletServers),
-        migrations, migrationsOut, DataLevel.USER));
+        migrations, migrationsOut, DataLevel.USER, tables));
     assertEquals(5, migrationsOut.size());
     for (TabletMigration migration : migrationsOut) {
       
assertTrue(migration.getNewTabletServer().getHost().startsWith("192.168.0.1")
diff --git 
a/core/src/test/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancerTest.java
 
b/core/src/test/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancerTest.java
index 4d3162e02d..b508e0fb3a 100644
--- 
a/core/src/test/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancerTest.java
+++ 
b/core/src/test/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancerTest.java
@@ -98,7 +98,7 @@ public class HostRegexTableLoadBalancerTest extends 
BaseHostRegexTableLoadBalanc
     List<TabletMigration> migrationsOut = new ArrayList<>();
     long wait =
         this.balance(new 
BalanceParamsImpl(Collections.unmodifiableSortedMap(createCurrent(15)),
-            migrations, migrationsOut, DataLevel.USER));
+            migrations, migrationsOut, DataLevel.USER, 
environment.getTableIdMap()));
     assertEquals(20000, wait);
     // should balance four tablets in one of the tables before reaching max
     assertEquals(4, migrationsOut.size());
@@ -109,7 +109,7 @@ public class HostRegexTableLoadBalancerTest extends 
BaseHostRegexTableLoadBalanc
     }
     migrationsOut.clear();
     wait = this.balance(new 
BalanceParamsImpl(Collections.unmodifiableSortedMap(createCurrent(15)),
-        migrations, migrationsOut, DataLevel.USER));
+        migrations, migrationsOut, DataLevel.USER, 
environment.getTableIdMap()));
     assertEquals(20000, wait);
     // should balance four tablets in one of the other tables before reaching 
max
     assertEquals(4, migrationsOut.size());
@@ -120,7 +120,7 @@ public class HostRegexTableLoadBalancerTest extends 
BaseHostRegexTableLoadBalanc
     }
     migrationsOut.clear();
     wait = this.balance(new 
BalanceParamsImpl(Collections.unmodifiableSortedMap(createCurrent(15)),
-        migrations, migrationsOut, DataLevel.USER));
+        migrations, migrationsOut, DataLevel.USER, 
environment.getTableIdMap()));
     assertEquals(20000, wait);
     // should balance four tablets in one of the other tables before reaching 
max
     assertEquals(4, migrationsOut.size());
@@ -131,7 +131,7 @@ public class HostRegexTableLoadBalancerTest extends 
BaseHostRegexTableLoadBalanc
     }
     migrationsOut.clear();
     wait = this.balance(new 
BalanceParamsImpl(Collections.unmodifiableSortedMap(createCurrent(15)),
-        migrations, migrationsOut, DataLevel.USER));
+        migrations, migrationsOut, DataLevel.USER, 
environment.getTableIdMap()));
     assertEquals(20000, wait);
     // no more balancing to do
     assertEquals(0, migrationsOut.size());
@@ -148,7 +148,7 @@ public class HostRegexTableLoadBalancerTest extends 
BaseHostRegexTableLoadBalanc
     migrations.addAll(tableTablets.get(BAR.getTableName()));
     long wait =
         this.balance(new 
BalanceParamsImpl(Collections.unmodifiableSortedMap(createCurrent(15)),
-            migrations, migrationsOut, DataLevel.USER));
+            migrations, migrationsOut, DataLevel.USER, 
environment.getTableIdMap()));
     assertEquals(20000, wait);
     // no migrations should have occurred as 10 is the maxOutstandingMigrations
     assertEquals(0, migrationsOut.size());
@@ -495,8 +495,8 @@ public class HostRegexTableLoadBalancerTest extends 
BaseHostRegexTableLoadBalanc
     init(DEFAULT_TABLE_PROPERTIES);
     Set<TabletId> migrations = new HashSet<>();
     List<TabletMigration> migrationsOut = new ArrayList<>();
-    this.balance(
-        new BalanceParamsImpl(createCurrent(15), migrations, migrationsOut, 
DataLevel.USER));
+    this.balance(new BalanceParamsImpl(createCurrent(15), migrations, 
migrationsOut, DataLevel.USER,
+        environment.getTableIdMap()));
     assertEquals(2, migrationsOut.size());
   }
 
diff --git 
a/core/src/test/java/org/apache/accumulo/core/spi/balancer/SimpleLoadBalancerTest.java
 
b/core/src/test/java/org/apache/accumulo/core/spi/balancer/SimpleLoadBalancerTest.java
index 055898928b..41193380ac 100644
--- 
a/core/src/test/java/org/apache/accumulo/core/spi/balancer/SimpleLoadBalancerTest.java
+++ 
b/core/src/test/java/org/apache/accumulo/core/spi/balancer/SimpleLoadBalancerTest.java
@@ -204,7 +204,7 @@ public class SimpleLoadBalancerTest {
     while (true) {
       List<TabletMigration> migrationsOut = new ArrayList<>();
       balancer.balance(new BalanceParamsImpl(getAssignments(servers), 
migrations, migrationsOut,
-          DataLevel.USER));
+          DataLevel.USER, Map.of()));
       if (migrationsOut.isEmpty()) {
         break;
       }
@@ -247,7 +247,7 @@ public class SimpleLoadBalancerTest {
     while (true) {
       List<TabletMigration> migrationsOut = new ArrayList<>();
       balancer.balance(new BalanceParamsImpl(getAssignments(servers), 
migrations, migrationsOut,
-          DataLevel.USER));
+          DataLevel.USER, Map.of()));
       if (migrationsOut.isEmpty()) {
         break;
       }
diff --git 
a/core/src/test/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancerTest.java
 
b/core/src/test/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancerTest.java
index 8e9aefd028..6045f417a8 100644
--- 
a/core/src/test/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancerTest.java
+++ 
b/core/src/test/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancerTest.java
@@ -142,13 +142,15 @@ public class TableLoadBalancerTest {
     List<TabletMigration> migrationsOut = new ArrayList<>();
     TableLoadBalancer tls = new TableLoadBalancer();
     tls.init(environment);
-    tls.balance(new BalanceParamsImpl(state, migrations, migrationsOut, 
DataLevel.USER));
+    tls.balance(
+        new BalanceParamsImpl(state, migrations, migrationsOut, 
DataLevel.USER, tableIdMap));
     assertEquals(0, migrationsOut.size());
 
     state.put(mkts("10.0.0.2", 2345, "0x02030405"), status());
     tls = new TableLoadBalancer();
     tls.init(environment);
-    tls.balance(new BalanceParamsImpl(state, migrations, migrationsOut, 
DataLevel.USER));
+    tls.balance(
+        new BalanceParamsImpl(state, migrations, migrationsOut, 
DataLevel.USER, tableIdMap));
     int count = 0;
     Map<TableId,Integer> movedByTable = new HashMap<>();
     movedByTable.put(TableId.of(t1Id), 0);
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java 
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index aa47e713e4..67a6656164 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -99,7 +99,7 @@ import org.apache.accumulo.core.metrics.MetricsProducer;
 import org.apache.accumulo.core.process.thrift.ServerProcessService;
 import org.apache.accumulo.core.replication.thrift.ReplicationCoordinator;
 import org.apache.accumulo.core.spi.balancer.BalancerEnvironment;
-import org.apache.accumulo.core.spi.balancer.SimpleLoadBalancer;
+import org.apache.accumulo.core.spi.balancer.TableLoadBalancer;
 import org.apache.accumulo.core.spi.balancer.TabletBalancer;
 import org.apache.accumulo.core.spi.balancer.data.TServerStatus;
 import org.apache.accumulo.core.spi.balancer.data.TabletMigration;
@@ -999,20 +999,22 @@ public class Manager extends AbstractServer implements 
LiveTServerSet.Listener,
         final Map<String,TableInfo> newTableMap =
             new HashMap<>(dl == DataLevel.USER ? oldTableMap.size() : 1);
         if (dl == DataLevel.ROOT) {
-          if (oldTableMap.containsKey(RootTable.NAME)) {
-            newTableMap.put(RootTable.NAME, oldTableMap.get(RootTable.NAME));
+          if (oldTableMap.containsKey(RootTable.ID.canonical())) {
+            newTableMap.put(RootTable.ID.canonical(), 
oldTableMap.get(RootTable.ID.canonical()));
           }
         } else if (dl == DataLevel.METADATA) {
-          if (oldTableMap.containsKey(MetadataTable.NAME)) {
-            newTableMap.put(MetadataTable.NAME, 
oldTableMap.get(MetadataTable.NAME));
+          if (oldTableMap.containsKey(MetadataTable.ID.canonical())) {
+            newTableMap.put(MetadataTable.ID.canonical(),
+                oldTableMap.get(MetadataTable.ID.canonical()));
           }
         } else if (dl == DataLevel.USER) {
-          if (!oldTableMap.containsKey(MetadataTable.NAME)
-              && !oldTableMap.containsKey(RootTable.NAME)) {
+          if (!oldTableMap.containsKey(MetadataTable.ID.canonical())
+              && !oldTableMap.containsKey(RootTable.ID.canonical())) {
             newTableMap.putAll(oldTableMap);
           } else {
             oldTableMap.forEach((table, info) -> {
-              if (!table.equals(RootTable.NAME) && 
!table.equals(MetadataTable.NAME)) {
+              if (!table.equals(RootTable.ID.canonical())
+                  && !table.equals(MetadataTable.ID.canonical())) {
                 newTableMap.put(table, info);
               }
             });
@@ -1026,6 +1028,23 @@ public class Manager extends AbstractServer implements 
LiveTServerSet.Listener,
       return tserverStatusForLevel;
     }
 
+    private Map<String,TableId> getTablesForLevel(DataLevel dataLevel) {
+      switch (dataLevel) {
+        case ROOT:
+          return Map.of(RootTable.NAME, RootTable.ID);
+        case METADATA:
+          return Map.of(MetadataTable.NAME, MetadataTable.ID);
+        case USER: {
+          Map<String,TableId> userTables = new 
HashMap<>(getContext().getTableNameToIdMap());
+          userTables.remove(RootTable.NAME);
+          userTables.remove(MetadataTable.NAME);
+          return Collections.unmodifiableMap(userTables);
+        }
+        default:
+          throw new IllegalArgumentException("Unknown data level " + 
dataLevel);
+      }
+    }
+
     private long balanceTablets() {
 
       final int tabletsNotHosted = notHosted();
@@ -1042,6 +1061,18 @@ public class Manager extends AbstractServer implements 
LiveTServerSet.Listener,
               tabletsNotHosted);
           continue;
         }
+
+        if ((dl == DataLevel.METADATA || dl == DataLevel.USER)
+            && !partitionedMigrations.get(DataLevel.ROOT).isEmpty()) {
+          log.debug("Not balancing {} because {} has migrations", dl, 
DataLevel.ROOT);
+          continue;
+        }
+
+        if (dl == DataLevel.USER && 
!partitionedMigrations.get(DataLevel.METADATA).isEmpty()) {
+          log.debug("Not balancing {} because {} has migrations", dl, 
DataLevel.METADATA);
+          continue;
+        }
+
         // Create a view of the tserver status such that it only contains the 
tables
         // for this level in the tableMap.
         SortedMap<TServerInstance,TabletServerStatus> tserverStatusForLevel =
@@ -1052,44 +1083,26 @@ public class Manager extends AbstractServer implements 
LiveTServerSet.Listener,
         tserverStatusForLevel.forEach((tsi, status) -> 
tserverStatusForBalancerLevel
             .put(new TabletServerIdImpl(tsi), 
TServerStatusImpl.fromThrift(status)));
 
+        log.debug("Balancing for tables at level {}", dl);
+
+        SortedMap<TabletServerId,TServerStatus> statusForBalancerLevel =
+            tserverStatusForBalancerLevel;
+        params = BalanceParamsImpl.fromThrift(statusForBalancerLevel, 
tserverStatusForLevel,
+            partitionedMigrations.get(dl), dl, getTablesForLevel(dl));
+        wait = Math.max(tabletBalancer.balance(params), wait);
         long migrationsOutForLevel = 0;
-        int attemptNum = 0;
-        do {
-          log.debug("Balancing for tables at level {}, times-in-loop: {}", dl, 
++attemptNum);
-
-          SortedMap<TabletServerId,TServerStatus> statusForBalancerLevel =
-              tserverStatusForBalancerLevel;
-          if (attemptNum > 1 && (dl == DataLevel.ROOT || dl == 
DataLevel.METADATA)) {
-            // If we are still migrating then perform a re-check on the tablet
-            // servers to make sure non of them have failed.
-            Set<TServerInstance> currentServers = 
tserverSet.getCurrentServers();
-            tserverStatus = gatherTableInformation(currentServers);
-            // Create a view of the tserver status such that it only contains 
the tables
-            // for this level in the tableMap.
-            tserverStatusForLevel = createTServerStatusView(dl, tserverStatus);
-            final SortedMap<TabletServerId,TServerStatus> 
tserverStatusForBalancerLevel2 =
-                new TreeMap<>();
-            tserverStatusForLevel.forEach((tsi, status) -> 
tserverStatusForBalancerLevel2
-                .put(new TabletServerIdImpl(tsi), 
TServerStatusImpl.fromThrift(status)));
-            statusForBalancerLevel = tserverStatusForBalancerLevel2;
+        for (TabletMigration m : 
checkMigrationSanity(statusForBalancerLevel.keySet(),
+            params.migrationsOut(), dl)) {
+          final KeyExtent ke = KeyExtent.fromTabletId(m.getTablet());
+          if (partitionedMigrations.get(dl).contains(ke)) {
+            log.warn("balancer requested migration more than once, skipping 
{}", m);
+            continue;
           }
+          migrationsOutForLevel++;
+          migrations.put(ke, 
TabletServerIdImpl.toThrift(m.getNewTabletServer()));
+          log.debug("migration {}", m);
+        }
 
-          params = BalanceParamsImpl.fromThrift(statusForBalancerLevel, 
tserverStatusForLevel,
-              partitionedMigrations.get(dl), dl);
-          wait = Math.max(tabletBalancer.balance(params), wait);
-          migrationsOutForLevel = 0;
-          for (TabletMigration m : 
checkMigrationSanity(statusForBalancerLevel.keySet(),
-              params.migrationsOut(), dl)) {
-            final KeyExtent ke = KeyExtent.fromTabletId(m.getTablet());
-            if (migrations.containsKey(ke)) {
-              log.warn("balancer requested migration more than once, skipping 
{}", m);
-              continue;
-            }
-            migrationsOutForLevel++;
-            migrations.put(ke, 
TabletServerIdImpl.toThrift(m.getNewTabletServer()));
-            log.debug("migration {}", m);
-          }
-        } while (migrationsOutForLevel > 0 && (dl == DataLevel.ROOT || dl == 
DataLevel.METADATA));
         totalMigrationsOut += migrationsOutForLevel;
 
         // increment this at end of loop to signal complete run w/o any 
continue
@@ -1115,7 +1128,7 @@ public class Manager extends AbstractServer implements 
LiveTServerSet.Listener,
         if (m.getTablet() == null) {
           log.error("Balancer gave back a null tablet {}", m);
         } else if (DataLevel.of(m.getTablet().getTable()) != level) {
-          log.trace(
+          log.warn(
               "Balancer wants to move a tablet ({}) outside of the current 
processing level ({}), "
                   + "ignoring and should be processed at the correct level 
({})",
               m.getTablet(), level, DataLevel.of(m.getTablet().getTable()));
@@ -1946,7 +1959,7 @@ public class Manager extends AbstractServer implements 
LiveTServerSet.Listener,
 
   void initializeBalancer() {
     var localTabletBalancer = 
Property.createInstanceFromPropertyName(getConfiguration(),
-        Property.MANAGER_TABLET_BALANCER, TabletBalancer.class, new 
SimpleLoadBalancer());
+        Property.MANAGER_TABLET_BALANCER, TabletBalancer.class, new 
TableLoadBalancer());
     localTabletBalancer.init(balancerEnvironment);
     tabletBalancer = localTabletBalancer;
   }
diff --git a/test/src/main/java/org/apache/accumulo/test/BalanceIT.java 
b/test/src/main/java/org/apache/accumulo/test/BalanceIT.java
index 0164463903..a282388ce7 100644
--- a/test/src/main/java/org/apache/accumulo/test/BalanceIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/BalanceIT.java
@@ -18,27 +18,40 @@
  */
 package org.apache.accumulo.test;
 
+import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
+import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
 import java.time.Duration;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.clientImpl.ClientContext;
 import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.accumulo.test.util.Wait;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class BalanceIT extends AccumuloClusterHarness {
+public class BalanceIT extends ConfigurableMacBase {
   private static final Logger log = LoggerFactory.getLogger(BalanceIT.class);
 
   @Override
-  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration 
hadoopCoreSite) {
+  public void configure(MiniAccumuloConfigImpl cfg, Configuration 
hadoopCoreSite) {
     Map<String,String> siteConfig = cfg.getSiteConfig();
     siteConfig.put(Property.TSERV_MAXMEM.getKey(), "10K");
     siteConfig.put(Property.TSERV_MAJC_DELAY.getKey(), "50ms");
@@ -46,7 +59,7 @@ public class BalanceIT extends AccumuloClusterHarness {
     siteConfig.put("general.custom.metrics.opts.logging.step", "0.5s");
     cfg.setSiteConfig(siteConfig);
     // ensure we have two tservers
-    if (cfg.getNumTservers() < 2) {
+    if (cfg.getNumTservers() != 2) {
       cfg.setNumTservers(2);
     }
   }
@@ -59,7 +72,7 @@ public class BalanceIT extends AccumuloClusterHarness {
   @Test
   public void testBalance() throws Exception {
     String tableName = getUniqueNames(1)[0];
-    try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
+    try (AccumuloClient c = 
Accumulo.newClient().from(getClientProperties()).build()) {
       log.info("Creating table");
       c.tableOperations().create(tableName);
       SortedSet<Text> splits = new TreeSet<>();
@@ -72,4 +85,77 @@ public class BalanceIT extends AccumuloClusterHarness {
       c.instanceOperations().waitForBalance();
     }
   }
+
+  /**
+   * Verifies that the metadata table's tablets are spread evenly across the tservers, both with the
+   * initial two tservers and after two more are started, and that the user table's tablets are
+   * also spread evenly across all four tservers.
+   */
+  @Test
+  public void testBalanceMetadata() throws Exception {
+    String tableName = getUniqueNames(1)[0];
+    try (AccumuloClient c = Accumulo.newClient().from(getClientProperties()).build()) {
+      // ten split points give the user table eleven tablets
+      SortedSet<Text> splits = new TreeSet<>();
+      for (int i = 0; i < 10; i++) {
+        splits.add(new Text("" + i));
+      }
+      c.tableOperations().create(tableName, new NewTableConfiguration().withSplits(splits));
+
+      // add 99 split points (base-36 strings "1" through "2r") to the metadata table so there
+      // are enough metadata tablets for an uneven distribution to be detectable
+      var metaSplits = IntStream.range(1, 100).mapToObj(i -> Integer.toString(i, 36)).map(Text::new)
+          .collect(Collectors.toCollection(TreeSet::new));
+      c.tableOperations().addSplits(MetadataTable.NAME, metaSplits);
+
+      c.instanceOperations().waitForBalance();
+
+      // two tservers: metadata tablets should be split (nearly) evenly between them
+      var locCounts = countLocations(c, MetadataTable.NAME);
+      var stats = locCounts.values().stream().mapToInt(i -> i).summaryStatistics();
+      assertTrue(stats.getMax() <= 51, locCounts.toString());
+      assertTrue(stats.getMin() >= 50, locCounts.toString());
+      assertEquals(2, stats.getCount(), locCounts.toString());
+
+      assertEquals(2, getCluster().getConfig().getNumTservers());
+      getCluster().getConfig().setNumTservers(4);
+      getCluster().getClusterControl().start(ServerType.TABLET_SERVER);
+      getCluster().getClusterControl().start(ServerType.TABLET_SERVER);
+
+      // Wait until metadata tablets are hosted on four tservers. The " none" bucket (tablets with
+      // no location) is excluded so that three hosts plus unhosted tablets do not count as four.
+      Wait.waitFor(() -> {
+        var lc = countLocations(c, MetadataTable.NAME);
+        log.info("locations:{}", lc);
+        return lc.size() == 4 && !lc.containsKey(" none");
+      });
+
+      c.instanceOperations().waitForBalance();
+
+      locCounts = countLocations(c, MetadataTable.NAME);
+      stats = locCounts.values().stream().mapToInt(i -> i).summaryStatistics();
+      assertTrue(stats.getMax() <= 26, locCounts.toString());
+      assertTrue(stats.getMin() >= 25, locCounts.toString());
+      assertEquals(4, stats.getCount(), locCounts.toString());
+
+      // The user table should eventually balance. Wait on the same condition asserted below:
+      // seeing four locations alone does not mean the tablets are evenly spread yet.
+      Wait.waitFor(() -> {
+        var lc = countLocations(c, tableName);
+        log.info("locations:{}", lc);
+        var s = lc.values().stream().mapToInt(i -> i).summaryStatistics();
+        return lc.size() == 4 && !lc.containsKey(" none") && s.getMax() <= 3 && s.getMin() >= 2;
+      });
+
+      locCounts = countLocations(c, tableName);
+      stats = locCounts.values().stream().mapToInt(i -> i).summaryStatistics();
+      assertTrue(stats.getMax() <= 3, locCounts.toString());
+      assertTrue(stats.getMin() >= 2, locCounts.toString());
+      assertEquals(4, stats.getCount(), locCounts.toString());
+    }
+  }
+
+  /**
+   * Tallies, from the metadata table, how many tablets of {@code tableName} are at each location.
+   * Tablets whose location is null are counted under the key {@code " none"}.
+   *
+   * @param client a client that must be a {@link ClientContext}
+   * @param tableName name of the table whose tablets are counted
+   * @return map from location string to number of tablets at that location
+   */
+  private Map<String,Integer> countLocations(AccumuloClient client, String tableName)
+      throws Exception {
+    ClientContext context = (ClientContext) client;
+    var ample = context.getAmple();
+    Map<String,Integer> tally = new HashMap<>();
+    try (var tablets = ample.readTablets().forTable(context.getTableId(tableName))
+        .fetch(LOCATION, PREV_ROW).build()) {
+      for (var tablet : tablets) {
+        var location = tablet.getLocation();
+        String key = (location != null) ? location.toString() : " none";
+        tally.merge(key, 1, Integer::sum);
+      }
+    }
+    return tally;
+  }
 }
diff --git 
a/test/src/test/java/org/apache/accumulo/test/ChaoticLoadBalancerTest.java 
b/test/src/test/java/org/apache/accumulo/test/ChaoticLoadBalancerTest.java
index 57fbd33247..5594aec10a 100644
--- a/test/src/test/java/org/apache/accumulo/test/ChaoticLoadBalancerTest.java
+++ b/test/src/test/java/org/apache/accumulo/test/ChaoticLoadBalancerTest.java
@@ -159,7 +159,7 @@ public class ChaoticLoadBalancerTest {
     List<TabletMigration> migrationsOut = new ArrayList<>();
     while (!migrationsOut.isEmpty()) {
       balancer.balance(new BalanceParamsImpl(getAssignments(servers), 
migrations, migrationsOut,
-          DataLevel.USER));
+          DataLevel.USER, Map.of()));
     }
   }
 


Reply via email to