This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 8f266f4123 Moves balancing code out of Manager class (#5537)
8f266f4123 is described below

commit 8f266f412371c579508ade8fb73aec754c14236d
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Thu Jun 26 15:09:54 2025 -0400

    Moves balancing code out of Manager class (#5537)
    
    The balancing code needs some improvements.  Currently the code is
    entangled in the manager code in odd places (like under some of the
    balancing code is un the Manager.StatusThread class).  This change moves
    the balancing code to its own class w/o making any changes to the code
    (except fixing two small existing bugs in shouldCleanupMigration()).
    
    Making this change as a first step in making the balancer more
    independent of other manager code and threads.
    
    Co-authored-by: Dom G. <domgargu...@apache.org>
---
 .../apache/accumulo/manager/BalanceManager.java    | 460 +++++++++++++++++++++
 .../java/org/apache/accumulo/manager/Manager.java  | 398 +-----------------
 .../manager/ManagerClientServiceHandler.java       |   2 +-
 .../accumulo/manager/TabletGroupWatcher.java       |   8 +-
 4 files changed, 478 insertions(+), 390 deletions(-)

diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/BalanceManager.java 
b/server/manager/src/main/java/org/apache/accumulo/manager/BalanceManager.java
new file mode 100644
index 0000000000..b7ac49ec7d
--- /dev/null
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/BalanceManager.java
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.manager;
+
+import static 
com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+import static java.util.concurrent.TimeUnit.MINUTES;
+
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.manager.balancer.AssignmentParamsImpl;
+import org.apache.accumulo.core.manager.balancer.BalanceParamsImpl;
+import org.apache.accumulo.core.manager.balancer.TServerStatusImpl;
+import org.apache.accumulo.core.manager.balancer.TabletServerIdImpl;
+import org.apache.accumulo.core.manager.state.tables.TableState;
+import org.apache.accumulo.core.manager.thrift.TableInfo;
+import org.apache.accumulo.core.manager.thrift.TabletServerStatus;
+import org.apache.accumulo.core.metadata.SystemTables;
+import org.apache.accumulo.core.metadata.TServerInstance;
+import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.filters.HasMigrationFilter;
+import org.apache.accumulo.core.metrics.MetricsProducer;
+import org.apache.accumulo.core.spi.balancer.BalancerEnvironment;
+import org.apache.accumulo.core.spi.balancer.DoNothingBalancer;
+import org.apache.accumulo.core.spi.balancer.TabletBalancer;
+import org.apache.accumulo.core.spi.balancer.data.TServerStatus;
+import org.apache.accumulo.core.spi.balancer.data.TabletMigration;
+import org.apache.accumulo.core.spi.balancer.data.TabletServerId;
+import org.apache.accumulo.core.util.threads.Threads;
+import org.apache.accumulo.manager.metrics.BalancerMetrics;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.manager.balancer.BalancerEnvironmentImpl;
+import org.apache.accumulo.server.manager.state.UnassignedTablet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+class BalanceManager {
+
+  private static final Logger log = 
LoggerFactory.getLogger(BalanceManager.class);
+
+  private final AtomicReference<Manager> manager;
+  // all access to this should be through getBalancer()
+  private TabletBalancer tabletBalancer;
+  private volatile BalancerEnvironment balancerEnvironment;
+  private final BalancerMetrics balancerMetrics = new BalancerMetrics();
+  private final Object balancedNotifier = new Object();
+  private static final long CLEANUP_INTERVAL_MINUTES = 
Manager.CLEANUP_INTERVAL_MINUTES;
+
+  BalanceManager() {
+    this.manager = new AtomicReference<>(null);
+  }
+
+  void setManager(Manager manager) {
+    Objects.requireNonNull(manager);
+    if (this.manager.compareAndSet(null, manager)) {
+      this.balancerEnvironment = new 
BalancerEnvironmentImpl(manager.getContext());
+    } else if (this.manager.get() != manager) {
+      throw new IllegalStateException("Attempted to set different manager 
object");
+    }
+  }
+
+  private Manager getManager() {
+    // fail fast if not yet set
+    return Objects.requireNonNull(manager.get(), "Manager has not been set.");
+  }
+
+  synchronized TabletBalancer getBalancer() {
+    String configuredBalancerClass =
+        getManager().getConfiguration().get(Property.MANAGER_TABLET_BALANCER);
+    try {
+      if (tabletBalancer == null
+          || 
!tabletBalancer.getClass().getName().equals(configuredBalancerClass)) {
+        log.debug("Attempting to initialize balancer using class {}, was {}",
+            configuredBalancerClass,
+            tabletBalancer == null ? "null" : 
tabletBalancer.getClass().getName());
+        var localTabletBalancer =
+            
Property.createInstanceFromPropertyName(getManager().getConfiguration(),
+                Property.MANAGER_TABLET_BALANCER, TabletBalancer.class, new 
DoNothingBalancer());
+        localTabletBalancer.init(balancerEnvironment);
+        tabletBalancer = localTabletBalancer;
+        log.info("tablet balancer changed to {}", 
localTabletBalancer.getClass().getName());
+      }
+    } catch (Exception e) {
+      log.warn("Failed to create balancer {} using {} instead", 
configuredBalancerClass,
+          DoNothingBalancer.class, e);
+      var localTabletBalancer = new DoNothingBalancer();
+      localTabletBalancer.init(balancerEnvironment);
+      tabletBalancer = localTabletBalancer;
+    }
+
+    return tabletBalancer;
+  }
+
+  private ServerContext getContext() {
+    return getManager().getContext();
+  }
+
+  MetricsProducer getMetrics() {
+    return balancerMetrics;
+  }
+
+  /**
+   * balanceTablets() balances tables by DataLevel. Return the current set of 
migrations partitioned
+   * by DataLevel
+   */
+  private Map<Ample.DataLevel,Set<KeyExtent>> partitionMigrations() {
+    final Map<Ample.DataLevel,Set<KeyExtent>> partitionedMigrations =
+        new EnumMap<>(Ample.DataLevel.class);
+    for (Ample.DataLevel dl : Ample.DataLevel.values()) {
+      Set<KeyExtent> extents = new HashSet<>();
+      // prev row needed for the extent
+      try (var tabletsMetadata = getContext()
+          
.getAmple().readTablets().forLevel(dl).fetch(TabletMetadata.ColumnType.PREV_ROW,
+              TabletMetadata.ColumnType.MIGRATION, 
TabletMetadata.ColumnType.LOCATION)
+          .filter(new HasMigrationFilter()).build()) {
+        // filter out migrations that are awaiting cleanup
+        tabletsMetadata.stream().filter(tm -> !shouldCleanupMigration(tm))
+            .forEach(tm -> extents.add(tm.getExtent()));
+      }
+      partitionedMigrations.put(dl, extents);
+    }
+    return partitionedMigrations;
+  }
+
+  /**
+   * Given the current tserverStatus map and a DataLevel, return a view of the 
tserverStatus map
+   * that only contains entries for tables in the DataLevel
+   */
+  private SortedMap<TServerInstance,TabletServerStatus> 
createTServerStatusView(
+      final Ample.DataLevel dl, final 
SortedMap<TServerInstance,TabletServerStatus> status) {
+    final SortedMap<TServerInstance,TabletServerStatus> tserverStatusForLevel 
= new TreeMap<>();
+    final String METADATA_TABLE_ID = 
SystemTables.METADATA.tableId().canonical();
+    final String ROOT_TABLE_ID = SystemTables.ROOT.tableId().canonical();
+    status.forEach((tsi, tss) -> {
+      final TabletServerStatus copy = tss.deepCopy();
+      final Map<String,TableInfo> oldTableMap = copy.getTableMap();
+      final Map<String,TableInfo> newTableMap =
+          new HashMap<>(dl == Ample.DataLevel.USER ? oldTableMap.size() : 1);
+      switch (dl) {
+        case ROOT: {
+          var tableInfo = oldTableMap.get(ROOT_TABLE_ID);
+          if (tableInfo != null) {
+            newTableMap.put(ROOT_TABLE_ID, tableInfo);
+          }
+          break;
+        }
+        case METADATA: {
+          var tableInfo = oldTableMap.get(METADATA_TABLE_ID);
+          if (tableInfo != null) {
+            newTableMap.put(METADATA_TABLE_ID, tableInfo);
+          }
+          break;
+        }
+        case USER:
+          if (!oldTableMap.containsKey(METADATA_TABLE_ID)
+              && !oldTableMap.containsKey(ROOT_TABLE_ID)) {
+            newTableMap.putAll(oldTableMap);
+          } else {
+            oldTableMap.forEach((table, info) -> {
+              if (!table.equals(ROOT_TABLE_ID) && 
!table.equals(METADATA_TABLE_ID)) {
+                newTableMap.put(table, info);
+              }
+            });
+          }
+          break;
+
+        default:
+          throw new IllegalArgumentException("Unhandled DataLevel value: " + 
dl);
+      }
+      copy.setTableMap(newTableMap);
+      tserverStatusForLevel.put(tsi, copy);
+    });
+    return tserverStatusForLevel;
+  }
+
+  private Map<String,TableId> getTablesForLevel(Ample.DataLevel dataLevel) {
+    switch (dataLevel) {
+      case ROOT:
+        return Map.of(SystemTables.ROOT.tableName(), 
SystemTables.ROOT.tableId());
+      case METADATA:
+        return Map.of(SystemTables.METADATA.tableName(), 
SystemTables.METADATA.tableId());
+      case USER: {
+        Map<String,TableId> userTables = 
getContext().createQualifiedTableNameToIdMap();
+        for (var accumuloTable : SystemTables.values()) {
+          if (Ample.DataLevel.of(accumuloTable.tableId()) != 
Ample.DataLevel.USER) {
+            userTables.remove(accumuloTable.tableName());
+          }
+        }
+        return Collections.unmodifiableMap(userTables);
+      }
+      default:
+        throw new IllegalArgumentException("Unknown data level " + dataLevel);
+    }
+  }
+
+  private List<TabletMigration> checkMigrationSanity(Set<TabletServerId> 
current,
+      List<TabletMigration> migrations, Ample.DataLevel level) {
+    return migrations.stream().filter(m -> {
+      boolean includeMigration = false;
+      if (m.getTablet() == null) {
+        log.error("Balancer gave back a null tablet {}", m);
+      } else if (Ample.DataLevel.of(m.getTablet().getTable()) != level) {
+        log.warn(
+            "Balancer wants to move a tablet ({}) outside of the current 
processing level ({}), "
+                + "ignoring and should be processed at the correct level ({})",
+            m.getTablet(), level, 
Ample.DataLevel.of(m.getTablet().getTable()));
+      } else if (m.getNewTabletServer() == null) {
+        log.error("Balancer did not set the destination {}", m);
+      } else if (m.getOldTabletServer() == null) {
+        log.error("Balancer did not set the source {}", m);
+      } else if (!current.contains(m.getOldTabletServer())) {
+        log.warn("Balancer wants to move a tablet from a server that is not 
current: {}", m);
+      } else if (!current.contains(m.getNewTabletServer())) {
+        log.warn("Balancer wants to move a tablet to a server that is not 
current: {}", m);
+      } else {
+        includeMigration = true;
+      }
+      return includeMigration;
+    }).collect(Collectors.toList());
+  }
+
+  long balanceTablets() {
+    final int tabletsNotHosted = getManager().notHosted();
+    BalanceParamsImpl params = null;
+    long wait = 0;
+    long totalMigrationsOut = 0;
+    final Map<Ample.DataLevel,Set<KeyExtent>> partitionedMigrations = 
partitionMigrations();
+    int levelsCompleted = 0;
+
+    for (Ample.DataLevel dl : Ample.DataLevel.values()) {
+      if (dl == Ample.DataLevel.USER && tabletsNotHosted > 0) {
+        log.debug("not balancing user tablets because there are {} unhosted 
tablets",
+            tabletsNotHosted);
+        continue;
+      }
+
+      if (dl == Ample.DataLevel.USER && !canAssignAndBalance()) {
+        log.debug("not balancing user tablets because not enough tablet 
servers");
+        continue;
+      }
+
+      if ((dl == Ample.DataLevel.METADATA || dl == Ample.DataLevel.USER)
+          && !partitionedMigrations.get(Ample.DataLevel.ROOT).isEmpty()) {
+        log.debug("Not balancing {} because {} has migrations", dl, 
Ample.DataLevel.ROOT);
+        continue;
+      }
+
+      if (dl == Ample.DataLevel.USER
+          && !partitionedMigrations.get(Ample.DataLevel.METADATA).isEmpty()) {
+        log.debug("Not balancing {} because {} has migrations", dl, 
Ample.DataLevel.METADATA);
+        continue;
+      }
+
+      // Create a view of the tserver status such that it only contains the 
tables
+      // for this level in the tableMap.
+      SortedMap<TServerInstance,TabletServerStatus> tserverStatusForLevel =
+          createTServerStatusView(dl, getManager().tserverStatus);
+      // Construct the Thrift variant of the map above for the BalancerParams
+      final SortedMap<TabletServerId,TServerStatus> 
tserverStatusForBalancerLevel = new TreeMap<>();
+      tserverStatusForLevel.forEach((tsi, status) -> 
tserverStatusForBalancerLevel
+          .put(new TabletServerIdImpl(tsi), 
TServerStatusImpl.fromThrift(status)));
+
+      log.debug("Balancing for tables at level {}", dl);
+
+      SortedMap<TabletServerId,TServerStatus> statusForBalancerLevel =
+          tserverStatusForBalancerLevel;
+      params = BalanceParamsImpl.fromThrift(statusForBalancerLevel,
+          getManager().tServerGroupingForBalancer, tserverStatusForLevel,
+          partitionedMigrations.get(dl), dl, getTablesForLevel(dl));
+      wait = Math.max(getBalancer().balance(params), wait);
+      long migrationsOutForLevel = 0;
+      try (var tabletsMutator = 
getContext().getAmple().conditionallyMutateTablets(result -> {})) {
+        for (TabletMigration m : 
checkMigrationSanity(statusForBalancerLevel.keySet(),
+            params.migrationsOut(), dl)) {
+          final KeyExtent ke = KeyExtent.fromTabletId(m.getTablet());
+          if (partitionedMigrations.get(dl).contains(ke)) {
+            log.warn("balancer requested migration more than once, skipping 
{}", m);
+            continue;
+          }
+          migrationsOutForLevel++;
+          var migration = TabletServerIdImpl.toThrift(m.getNewTabletServer());
+          tabletsMutator.mutateTablet(ke).requireAbsentOperation()
+              
.requireCurrentLocationNotEqualTo(migration).putMigration(migration)
+              .submit(tm -> false);
+          log.debug("migration {}", m);
+        }
+      }
+      totalMigrationsOut += migrationsOutForLevel;
+
+      // increment this at end of loop to signal complete run w/o any continue
+      levelsCompleted++;
+    }
+    final long totalMigrations =
+        totalMigrationsOut + 
partitionedMigrations.values().stream().mapToLong(Set::size).sum();
+    balancerMetrics.assignMigratingCount(() -> totalMigrations);
+
+    if (totalMigrationsOut == 0 && levelsCompleted == 
Ample.DataLevel.values().length) {
+      synchronized (balancedNotifier) {
+        balancedNotifier.notifyAll();
+      }
+    } else if (totalMigrationsOut > 0) {
+      getManager().nextEvent.event("Migrating %d more tablets, %d total", 
totalMigrationsOut,
+          totalMigrations);
+    }
+    return wait;
+  }
+
+  @SuppressFBWarnings(value = "UW_UNCOND_WAIT", justification = "TODO needs 
triage")
+  void waitForBalance() {
+    synchronized (balancedNotifier) {
+      long eventCounter;
+      do {
+        eventCounter = getManager().nextEvent.waitForEvents(0, 0);
+        try {
+          balancedNotifier.wait();
+        } catch (InterruptedException e) {
+          log.debug(e.toString(), e);
+        }
+      } while (getManager().displayUnassigned() > 0 || numMigrations() > 0
+          || eventCounter != getManager().nextEvent.waitForEvents(0, 0));
+    }
+  }
+
+  long numMigrations() {
+    long count = 0;
+    for (Ample.DataLevel dl : Ample.DataLevel.values()) {
+      try (var tabletsMetadata = 
getContext().getAmple().readTablets().forLevel(dl)
+          .fetch(TabletMetadata.ColumnType.MIGRATION).filter(new 
HasMigrationFilter()).build()) {
+        count += tabletsMetadata.stream().count();
+      }
+    }
+    return count;
+  }
+
+  void getAssignments(SortedMap<TServerInstance,TabletServerStatus> 
currentStatus,
+      Map<String,Set<TServerInstance>> currentTServerGroups,
+      Map<KeyExtent,UnassignedTablet> unassigned, 
Map<KeyExtent,TServerInstance> assignedOut) {
+    AssignmentParamsImpl params =
+        AssignmentParamsImpl.fromThrift(currentStatus, currentTServerGroups,
+            unassigned.entrySet().stream().collect(HashMap::new,
+                (m, e) -> m.put(e.getKey(),
+                    e.getValue().getLastLocation() == null ? null
+                        : e.getValue().getLastLocation().getServerInstance()),
+                Map::putAll),
+            assignedOut);
+    getBalancer().getAssignments(params);
+    if (!canAssignAndBalance()) {
+      // remove assignment for user tables
+      Iterator<KeyExtent> iter = assignedOut.keySet().iterator();
+      while (iter.hasNext()) {
+        KeyExtent ke = iter.next();
+        if (!ke.isMeta()) {
+          iter.remove();
+          log.trace("Removed assignment for {} as assignments for user tables 
is disabled.", ke);
+        }
+      }
+    }
+  }
+
+  private boolean canAssignAndBalance() {
+    final int threshold = getManager().getConfiguration()
+        .getCount(Property.MANAGER_TABLET_BALANCER_TSERVER_THRESHOLD);
+    if (threshold == 0) {
+      return true;
+    }
+    final int numTServers = getManager().tserverSet.size();
+    final boolean result = numTServers >= threshold;
+    if (!result) {
+      log.warn("Not assigning or balancing as number of tservers ({}) is below 
threshold ({})",
+          numTServers, threshold);
+    }
+    return result;
+  }
+
+  private boolean shouldCleanupMigration(TabletMetadata tabletMetadata) {
+    var tableState = 
getContext().getTableManager().getTableState(tabletMetadata.getTableId());
+    var migration = tabletMetadata.getMigration();
+    Preconditions.checkState(migration != null,
+        "This method should only be called if there is a migration");
+    return tableState == TableState.OFFLINE
+        || !getManager().onlineTabletServers().contains(migration)
+        || (tabletMetadata.getLocation() != null
+            && 
tabletMetadata.getLocation().getServerInstance().equals(migration));
+  }
+
+  void startMigrationCleanupThread() {
+    Threads.createCriticalThread("Migration Cleanup Thread", new 
MigrationCleanupThread()).start();
+  }
+
+  private class MigrationCleanupThread implements Runnable {
+
+    @Override
+    public void run() {
+      while (getManager().stillManager()) {
+        try {
+          // - Remove any migrations for tablets of offline tables, as the 
migration can never
+          // succeed because no tablet server will load the tablet
+          // - Remove any migrations to tablet servers that are not live
+          // - Remove any migrations where the tablets current location equals 
the migration
+          // (the migration has completed)
+          var ample = getContext().getAmple();
+          for (Ample.DataLevel dl : Ample.DataLevel.values()) {
+            // prev row needed for the extent
+            try (var tabletsMetadata = ample.readTablets().forLevel(dl)
+                .fetch(TabletMetadata.ColumnType.PREV_ROW, 
TabletMetadata.ColumnType.MIGRATION,
+                    TabletMetadata.ColumnType.LOCATION)
+                .filter(new HasMigrationFilter()).build();
+                var tabletsMutator = ample.conditionallyMutateTablets(result 
-> {})) {
+              for (var tabletMetadata : tabletsMetadata) {
+                var migration = tabletMetadata.getMigration();
+                if (shouldCleanupMigration(tabletMetadata)) {
+                  
tabletsMutator.mutateTablet(tabletMetadata.getExtent()).requireAbsentOperation()
+                      .requireMigration(migration).deleteMigration().submit(tm 
-> false);
+                }
+              }
+            }
+          }
+        } catch (Exception ex) {
+          log.error("Error cleaning up migrations", ex);
+        }
+        sleepUninterruptibly(CLEANUP_INTERVAL_MINUTES, MINUTES);
+      }
+    }
+  }
+}
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java 
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 28b435ef13..5504ae1efc 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -34,7 +34,6 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -44,7 +43,6 @@ import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.Set;
 import java.util.SortedMap;
-import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CountDownLatch;
@@ -56,7 +54,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.function.Predicate;
-import java.util.stream.Collectors;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.cli.ConfigOpts;
@@ -89,10 +86,6 @@ import 
org.apache.accumulo.core.lock.ServiceLockData.ThriftService;
 import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector;
 import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;
 import org.apache.accumulo.core.lock.ServiceLockSupport.HAServiceLockWatcher;
-import org.apache.accumulo.core.manager.balancer.AssignmentParamsImpl;
-import org.apache.accumulo.core.manager.balancer.BalanceParamsImpl;
-import org.apache.accumulo.core.manager.balancer.TServerStatusImpl;
-import org.apache.accumulo.core.manager.balancer.TabletServerIdImpl;
 import org.apache.accumulo.core.manager.state.tables.TableState;
 import org.apache.accumulo.core.manager.thrift.BulkImportState;
 import org.apache.accumulo.core.manager.thrift.ManagerGoalState;
@@ -103,16 +96,8 @@ import 
org.apache.accumulo.core.manager.thrift.TabletServerStatus;
 import org.apache.accumulo.core.metadata.SystemTables;
 import org.apache.accumulo.core.metadata.TServerInstance;
 import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
-import org.apache.accumulo.core.metadata.schema.TabletMetadata;
-import org.apache.accumulo.core.metadata.schema.filters.HasMigrationFilter;
 import org.apache.accumulo.core.metrics.MetricsInfo;
 import org.apache.accumulo.core.metrics.MetricsProducer;
-import org.apache.accumulo.core.spi.balancer.BalancerEnvironment;
-import org.apache.accumulo.core.spi.balancer.DoNothingBalancer;
-import org.apache.accumulo.core.spi.balancer.TabletBalancer;
-import org.apache.accumulo.core.spi.balancer.data.TServerStatus;
-import org.apache.accumulo.core.spi.balancer.data.TabletMigration;
-import org.apache.accumulo.core.spi.balancer.data.TabletServerId;
 import org.apache.accumulo.core.trace.TraceUtil;
 import org.apache.accumulo.core.util.Retry;
 import org.apache.accumulo.core.util.Timer;
@@ -122,7 +107,6 @@ import org.apache.accumulo.core.util.time.SteadyTime;
 import org.apache.accumulo.core.zookeeper.ZcStat;
 import 
org.apache.accumulo.manager.compaction.coordinator.CompactionCoordinator;
 import org.apache.accumulo.manager.merge.FindMergeableRangeTask;
-import org.apache.accumulo.manager.metrics.BalancerMetrics;
 import org.apache.accumulo.manager.metrics.ManagerMetrics;
 import org.apache.accumulo.manager.recovery.RecoveryManager;
 import org.apache.accumulo.manager.split.Splitter;
@@ -137,11 +121,9 @@ import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.manager.LiveTServerSet;
 import org.apache.accumulo.server.manager.LiveTServerSet.LiveTServersSnapshot;
 import org.apache.accumulo.server.manager.LiveTServerSet.TServerConnection;
-import org.apache.accumulo.server.manager.balancer.BalancerEnvironmentImpl;
 import org.apache.accumulo.server.manager.state.DeadServerList;
 import org.apache.accumulo.server.manager.state.TabletServerState;
 import org.apache.accumulo.server.manager.state.TabletStateStore;
-import org.apache.accumulo.server.manager.state.UnassignedTablet;
 import org.apache.accumulo.server.rpc.ServerAddress;
 import org.apache.accumulo.server.rpc.TServerUtils;
 import org.apache.accumulo.server.rpc.ThriftProcessorTypes;
@@ -167,7 +149,6 @@ import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.RateLimiter;
 import com.google.common.util.concurrent.Uninterruptibles;
 
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import io.micrometer.core.instrument.MeterRegistry;
 import io.opentelemetry.api.trace.Span;
 import io.opentelemetry.context.Scope;
@@ -182,7 +163,7 @@ public class Manager extends AbstractServer implements 
LiveTServerSet.Listener {
   static final Logger log = LoggerFactory.getLogger(Manager.class);
 
   static final int ONE_SECOND = 1000;
-  private static final long CLEANUP_INTERVAL_MINUTES = 5;
+  static final long CLEANUP_INTERVAL_MINUTES = 5;
   static final long WAIT_BETWEEN_ERRORS = ONE_SECOND;
   private static final long DEFAULT_WAIT_FOR_WATCHER = 10 * ONE_SECOND;
   private static final int MAX_CLEANUP_WAIT_TIME = ONE_SECOND;
@@ -191,7 +172,6 @@ public class Manager extends AbstractServer implements 
LiveTServerSet.Listener {
   private static final int MAX_BAD_STATUS_COUNT = 3;
   private static final double MAX_SHUTDOWNS_PER_SEC = 10D / 60D;
 
-  private final Object balancedNotifier = new Object();
   final LiveTServerSet tserverSet;
   private final List<TabletGroupWatcher> watchers = new ArrayList<>();
   final Map<TServerInstance,AtomicInteger> badServers =
@@ -208,9 +188,7 @@ public class Manager extends AbstractServer implements 
LiveTServerSet.Listener {
 
   ServiceLock managerLock = null;
   private TServer clientService = null;
-  protected volatile TabletBalancer tabletBalancer = null;
-  private final BalancerEnvironment balancerEnvironment;
-  private final BalancerMetrics balancerMetrics = new BalancerMetrics();
+  private final BalanceManager balanceManager;
 
   private ManagerState state = ManagerState.INITIAL;
 
@@ -238,6 +216,10 @@ public class Manager extends AbstractServer implements 
LiveTServerSet.Listener {
     return state;
   }
 
+  public BalanceManager getBalanceManager() {
+    return balanceManager;
+  }
+
   public Map<FateId,Map<String,String>> getCompactionHints(DataLevel level) {
     Predicate<TableId> tablePredicate = (tableId) -> DataLevel.of(tableId) == 
level;
     Map<FateId,CompactionConfig> allConfig;
@@ -377,7 +359,7 @@ public class Manager extends AbstractServer implements 
LiveTServerSet.Listener {
         - assignedOrHosted(SystemTables.ROOT.tableId());
   }
 
-  private int notHosted() {
+  int notHosted() {
     int result = 0;
     for (TabletGroupWatcher watcher : watchers) {
       for (TableCounts counts : watcher.getStats().values()) {
@@ -453,7 +435,7 @@ public class Manager extends AbstractServer implements 
LiveTServerSet.Listener {
     super(ServerId.Type.MANAGER, opts, serverContextFactory, args);
     ServerContext context = super.getContext();
     upgradeCoordinator = new UpgradeCoordinator(context);
-    balancerEnvironment = new BalancerEnvironmentImpl(context);
+    balanceManager = new BalanceManager();
 
     AccumuloConfiguration aconf = context.getConfiguration();
 
@@ -461,7 +443,6 @@ public class Manager extends AbstractServer implements 
LiveTServerSet.Listener {
     log.info("Instance {}", context.getInstanceID());
     timeKeeper = new ManagerTime(this, aconf);
     tserverSet = new LiveTServerSet(context, this);
-    initializeBalancer();
 
     final long tokenLifetime = 
aconf.getTimeInMillis(Property.GENERAL_DELEGATION_TOKEN_LIFETIME);
 
@@ -522,10 +503,6 @@ public class Manager extends AbstractServer implements 
LiveTServerSet.Listener {
     return splitter;
   }
 
-  public MetricsProducer getBalancerMetrics() {
-    return balancerMetrics;
-  }
-
   public UpgradeCoordinator.UpgradeStatus getUpgradeStatus() {
     return upgradeCoordinator.getStatus();
   }
@@ -544,52 +521,6 @@ public class Manager extends AbstractServer implements 
LiveTServerSet.Listener {
     }
   }
 
-  private class MigrationCleanupThread implements Runnable {
-
-    @Override
-    public void run() {
-      while (stillManager()) {
-        try {
-          // - Remove any migrations for tablets of offline tables, as the 
migration can never
-          // succeed because no tablet server will load the tablet
-          // - Remove any migrations to tablet servers that are not live
-          // - Remove any migrations where the tablets current location equals 
the migration
-          // (the migration has completed)
-          var ample = getContext().getAmple();
-          for (DataLevel dl : DataLevel.values()) {
-            // prev row needed for the extent
-            try (var tabletsMetadata = ample.readTablets().forLevel(dl)
-                .fetch(TabletMetadata.ColumnType.PREV_ROW, 
TabletMetadata.ColumnType.MIGRATION,
-                    TabletMetadata.ColumnType.LOCATION)
-                .filter(new HasMigrationFilter()).build();
-                var tabletsMutator = ample.conditionallyMutateTablets(result 
-> {})) {
-              for (var tabletMetadata : tabletsMetadata) {
-                var migration = tabletMetadata.getMigration();
-                if (shouldCleanupMigration(tabletMetadata)) {
-                  
tabletsMutator.mutateTablet(tabletMetadata.getExtent()).requireAbsentOperation()
-                      .requireMigration(migration).deleteMigration().submit(tm 
-> false);
-                }
-              }
-            }
-          }
-        } catch (Exception ex) {
-          log.error("Error cleaning up migrations", ex);
-        }
-        sleepUninterruptibly(CLEANUP_INTERVAL_MINUTES, MINUTES);
-      }
-    }
-  }
-
-  private boolean shouldCleanupMigration(TabletMetadata tabletMetadata) {
-    var tableState = 
getContext().getTableManager().getTableState(tabletMetadata.getTableId());
-    var migration = tabletMetadata.getMigration();
-    Preconditions.checkState(migration != null,
-        "This method should only be called if there is a migration");
-    return tableState == TableState.OFFLINE || 
!onlineTabletServers().contains(migration)
-        || (tabletMetadata.getLocation() != null
-            && 
tabletMetadata.getLocation().getServerInstance().equals(migration));
-  }
-
   private class ScanServerZKCleaner implements Runnable {
 
     @Override
@@ -632,28 +563,6 @@ public class Manager extends AbstractServer implements 
LiveTServerSet.Listener {
 
   }
 
-  /**
-   * balanceTablets() balances tables by DataLevel. Return the current set of 
migrations partitioned
-   * by DataLevel
-   */
-  private Map<DataLevel,Set<KeyExtent>> partitionMigrations() {
-    final Map<DataLevel,Set<KeyExtent>> partitionedMigrations = new 
EnumMap<>(DataLevel.class);
-    for (DataLevel dl : DataLevel.values()) {
-      Set<KeyExtent> extents = new HashSet<>();
-      // prev row needed for the extent
-      try (var tabletsMetadata = getContext()
-          
.getAmple().readTablets().forLevel(dl).fetch(TabletMetadata.ColumnType.PREV_ROW,
-              TabletMetadata.ColumnType.MIGRATION, 
TabletMetadata.ColumnType.LOCATION)
-          .filter(new HasMigrationFilter()).build()) {
-        // filter out migrations that are awaiting cleanup
-        tabletsMetadata.stream().filter(tm -> !shouldCleanupMigration(tm))
-            .forEach(tm -> extents.add(tm.getExtent()));
-      }
-      partitionedMigrations.put(dl, extents);
-    }
-    return partitionedMigrations;
-  }
-
   private class StatusThread implements Runnable {
 
     private boolean goodStats() {
@@ -798,7 +707,7 @@ public class Manager extends AbstractServer implements 
LiveTServerSet.Listener {
             return DEFAULT_WAIT_FOR_WATCHER;
           }
         }
-        return balanceTablets();
+        return balanceManager.balanceTablets();
       }
       return DEFAULT_WAIT_FOR_WATCHER;
     }
@@ -830,186 +739,6 @@ public class Manager extends AbstractServer implements 
LiveTServerSet.Listener {
         badServers.putIfAbsent(instance, new AtomicInteger(1));
       }
     }
-
-    /**
-     * Given the current tserverStatus map and a DataLevel, return a view of 
the tserverStatus map
-     * that only contains entries for tables in the DataLevel
-     */
-    private SortedMap<TServerInstance,TabletServerStatus> 
createTServerStatusView(
-        final DataLevel dl, final 
SortedMap<TServerInstance,TabletServerStatus> status) {
-      final SortedMap<TServerInstance,TabletServerStatus> 
tserverStatusForLevel = new TreeMap<>();
-      status.forEach((tsi, tss) -> {
-        final TabletServerStatus copy = tss.deepCopy();
-        final Map<String,TableInfo> oldTableMap = copy.getTableMap();
-        final Map<String,TableInfo> newTableMap =
-            new HashMap<>(dl == DataLevel.USER ? oldTableMap.size() : 1);
-        if (dl == DataLevel.ROOT) {
-          if 
(oldTableMap.containsKey(SystemTables.ROOT.tableId().canonical())) {
-            newTableMap.put(SystemTables.ROOT.tableId().canonical(),
-                oldTableMap.get(SystemTables.ROOT.tableId().canonical()));
-          }
-        } else if (dl == DataLevel.METADATA) {
-          if 
(oldTableMap.containsKey(SystemTables.METADATA.tableId().canonical())) {
-            newTableMap.put(SystemTables.METADATA.tableId().canonical(),
-                oldTableMap.get(SystemTables.METADATA.tableId().canonical()));
-          }
-        } else if (dl == DataLevel.USER) {
-          if 
(!oldTableMap.containsKey(SystemTables.METADATA.tableId().canonical())
-              && 
!oldTableMap.containsKey(SystemTables.ROOT.tableId().canonical())) {
-            newTableMap.putAll(oldTableMap);
-          } else {
-            oldTableMap.forEach((table, info) -> {
-              if (!table.equals(SystemTables.ROOT.tableId().canonical())
-                  && 
!table.equals(SystemTables.METADATA.tableId().canonical())) {
-                newTableMap.put(table, info);
-              }
-            });
-          }
-        } else {
-          throw new IllegalArgumentException("Unhandled DataLevel value: " + 
dl);
-        }
-        copy.setTableMap(newTableMap);
-        tserverStatusForLevel.put(tsi, copy);
-      });
-      return tserverStatusForLevel;
-    }
-
-    private Map<String,TableId> getTablesForLevel(DataLevel dataLevel) {
-      switch (dataLevel) {
-        case ROOT:
-          return Map.of(SystemTables.ROOT.tableName(), 
SystemTables.ROOT.tableId());
-        case METADATA:
-          return Map.of(SystemTables.METADATA.tableName(), 
SystemTables.METADATA.tableId());
-        case USER: {
-          Map<String,TableId> userTables = 
getContext().createQualifiedTableNameToIdMap();
-          for (var accumuloTable : SystemTables.values()) {
-            if (DataLevel.of(accumuloTable.tableId()) != DataLevel.USER) {
-              userTables.remove(accumuloTable.tableName());
-            }
-          }
-          return Collections.unmodifiableMap(userTables);
-        }
-        default:
-          throw new IllegalArgumentException("Unknown data level " + 
dataLevel);
-      }
-    }
-
-    private long balanceTablets() {
-
-      // Check for balancer property change
-      initializeBalancer();
-
-      final int tabletsNotHosted = notHosted();
-      BalanceParamsImpl params = null;
-      long wait = 0;
-      long totalMigrationsOut = 0;
-      final Map<DataLevel,Set<KeyExtent>> partitionedMigrations = 
partitionMigrations();
-      int levelsCompleted = 0;
-
-      for (DataLevel dl : DataLevel.values()) {
-
-        if (dl == DataLevel.USER && tabletsNotHosted > 0) {
-          log.debug("not balancing user tablets because there are {} unhosted 
tablets",
-              tabletsNotHosted);
-          continue;
-        }
-
-        if (dl == DataLevel.USER && !canAssignAndBalance()) {
-          log.debug("not balancing user tablets because not enough tablet 
servers");
-          continue;
-        }
-
-        if ((dl == DataLevel.METADATA || dl == DataLevel.USER)
-            && !partitionedMigrations.get(DataLevel.ROOT).isEmpty()) {
-          log.debug("Not balancing {} because {} has migrations", dl, 
DataLevel.ROOT);
-          continue;
-        }
-
-        if (dl == DataLevel.USER && 
!partitionedMigrations.get(DataLevel.METADATA).isEmpty()) {
-          log.debug("Not balancing {} because {} has migrations", dl, 
DataLevel.METADATA);
-          continue;
-        }
-
-        // Create a view of the tserver status such that it only contains the 
tables
-        // for this level in the tableMap.
-        SortedMap<TServerInstance,TabletServerStatus> tserverStatusForLevel =
-            createTServerStatusView(dl, tserverStatus);
-        // Construct the Thrift variant of the map above for the BalancerParams
-        final SortedMap<TabletServerId,TServerStatus> 
tserverStatusForBalancerLevel =
-            new TreeMap<>();
-        tserverStatusForLevel.forEach((tsi, status) -> 
tserverStatusForBalancerLevel
-            .put(new TabletServerIdImpl(tsi), 
TServerStatusImpl.fromThrift(status)));
-
-        log.debug("Balancing for tables at level {}", dl);
-
-        SortedMap<TabletServerId,TServerStatus> statusForBalancerLevel =
-            tserverStatusForBalancerLevel;
-        params = BalanceParamsImpl.fromThrift(statusForBalancerLevel, 
tServerGroupingForBalancer,
-            tserverStatusForLevel, partitionedMigrations.get(dl), dl, 
getTablesForLevel(dl));
-        wait = Math.max(tabletBalancer.balance(params), wait);
-        long migrationsOutForLevel = 0;
-        try (
-            var tabletsMutator = 
getContext().getAmple().conditionallyMutateTablets(result -> {})) {
-          for (TabletMigration m : 
checkMigrationSanity(statusForBalancerLevel.keySet(),
-              params.migrationsOut(), dl)) {
-            final KeyExtent ke = KeyExtent.fromTabletId(m.getTablet());
-            if (partitionedMigrations.get(dl).contains(ke)) {
-              log.warn("balancer requested migration more than once, skipping 
{}", m);
-              continue;
-            }
-            migrationsOutForLevel++;
-            var migration = 
TabletServerIdImpl.toThrift(m.getNewTabletServer());
-            tabletsMutator.mutateTablet(ke).requireAbsentOperation()
-                
.requireCurrentLocationNotEqualTo(migration).putMigration(migration)
-                .submit(tm -> false);
-            log.debug("migration {}", m);
-          }
-        }
-        totalMigrationsOut += migrationsOutForLevel;
-
-        // increment this at end of loop to signal complete run w/o any 
continue
-        levelsCompleted++;
-      }
-      final long totalMigrations =
-          totalMigrationsOut + 
partitionedMigrations.values().stream().mapToLong(Set::size).sum();
-      balancerMetrics.assignMigratingCount(() -> totalMigrations);
-
-      if (totalMigrationsOut == 0 && levelsCompleted == 
DataLevel.values().length) {
-        synchronized (balancedNotifier) {
-          balancedNotifier.notifyAll();
-        }
-      } else if (totalMigrationsOut > 0) {
-        nextEvent.event("Migrating %d more tablets, %d total", 
totalMigrationsOut, totalMigrations);
-      }
-      return wait;
-    }
-
-    private List<TabletMigration> checkMigrationSanity(Set<TabletServerId> 
current,
-        List<TabletMigration> migrations, DataLevel level) {
-      return migrations.stream().filter(m -> {
-        boolean includeMigration = false;
-        if (m.getTablet() == null) {
-          log.error("Balancer gave back a null tablet {}", m);
-        } else if (DataLevel.of(m.getTablet().getTable()) != level) {
-          log.warn(
-              "Balancer wants to move a tablet ({}) outside of the current 
processing level ({}), "
-                  + "ignoring and should be processed at the correct level 
({})",
-              m.getTablet(), level, DataLevel.of(m.getTablet().getTable()));
-        } else if (m.getNewTabletServer() == null) {
-          log.error("Balancer did not set the destination {}", m);
-        } else if (m.getOldTabletServer() == null) {
-          log.error("Balancer did not set the source {}", m);
-        } else if (!current.contains(m.getOldTabletServer())) {
-          log.warn("Balancer wants to move a tablet from a server that is not 
current: {}", m);
-        } else if (!current.contains(m.getNewTabletServer())) {
-          log.warn("Balancer wants to move a tablet to a server that is not 
current: {}", m);
-        } else {
-          includeMigration = true;
-        }
-        return includeMigration;
-      }).collect(Collectors.toList());
-    }
-
   }
 
   private SortedMap<TServerInstance,TabletServerStatus>
@@ -1115,6 +844,8 @@ public class Manager extends AbstractServer implements 
LiveTServerSet.Listener {
   public void run() {
     final ServerContext context = getContext();
 
+    balanceManager.setManager(this);
+
     // ACCUMULO-4424 Put up the Thrift servers before getting the lock as a 
sign of process health
     // when a hot-standby
     //
@@ -1203,7 +934,7 @@ public class Manager extends AbstractServer implements 
LiveTServerSet.Listener {
     MetricsInfo metricsInfo = getContext().getMetricsInfo();
     ManagerMetrics managerMetrics = new ManagerMetrics(getConfiguration(), 
this);
     var producers = managerMetrics.getProducers(getConfiguration(), this);
-    producers.add(balancerMetrics);
+    producers.add(balanceManager.getMetrics());
 
     final TabletGroupWatcher userTableTGW =
         new TabletGroupWatcher(this, this.userTabletStore, null, 
managerMetrics) {
@@ -1319,7 +1050,7 @@ public class Manager extends AbstractServer implements 
LiveTServerSet.Listener {
     metricsInfo.init(MetricsInfo.serviceTags(getContext().getInstanceName(), 
getApplicationName(),
         sa.getAddress(), getResourceGroup()));
 
-    Threads.createCriticalThread("Migration Cleanup Thread", new 
MigrationCleanupThread()).start();
+    balanceManager.startMigrationCleanupThread();
     Threads.createCriticalThread("ScanServer Cleanup Thread", new 
ScanServerZKCleaner()).start();
 
     // Don't call start the CompactionCoordinator until we have tservers and 
upgrade is complete.
@@ -1742,22 +1473,6 @@ public class Manager extends AbstractServer implements 
LiveTServerSet.Listener {
     }
   }
 
-  @SuppressFBWarnings(value = "UW_UNCOND_WAIT", justification = "TODO needs 
triage")
-  public void waitForBalance() {
-    synchronized (balancedNotifier) {
-      long eventCounter;
-      do {
-        eventCounter = nextEvent.waitForEvents(0, 0);
-        try {
-          balancedNotifier.wait();
-        } catch (InterruptedException e) {
-          log.debug(e.toString(), e);
-        }
-      } while (displayUnassigned() > 0 || numMigrations() > 0
-          || eventCounter != nextEvent.waitForEvents(0, 0));
-    }
-  }
-
   public ManagerMonitorInfo getManagerMonitorInfo() {
     final ManagerMonitorInfo result = new ManagerMonitorInfo();
 
@@ -1826,67 +1541,6 @@ public class Manager extends AbstractServer implements 
LiveTServerSet.Listener {
     return upgradeCoordinator.getStatus() != 
UpgradeCoordinator.UpgradeStatus.COMPLETE;
   }
 
-  private void initializeBalancer() {
-    String configuredBalancerClass = 
getConfiguration().get(Property.MANAGER_TABLET_BALANCER);
-    try {
-      if (tabletBalancer == null
-          || 
!tabletBalancer.getClass().getName().equals(configuredBalancerClass)) {
-        log.debug("Attempting to initialize balancer using class {}, was {}",
-            configuredBalancerClass,
-            tabletBalancer == null ? "null" : 
tabletBalancer.getClass().getName());
-        var localTabletBalancer = 
Property.createInstanceFromPropertyName(getConfiguration(),
-            Property.MANAGER_TABLET_BALANCER, TabletBalancer.class, new 
DoNothingBalancer());
-        localTabletBalancer.init(balancerEnvironment);
-        tabletBalancer = localTabletBalancer;
-        log.info("tablet balancer changed to {}", 
localTabletBalancer.getClass().getName());
-      }
-    } catch (Exception e) {
-      log.warn("Failed to create balancer {} using {} instead", 
configuredBalancerClass,
-          DoNothingBalancer.class, e);
-      var localTabletBalancer = new DoNothingBalancer();
-      localTabletBalancer.init(balancerEnvironment);
-      tabletBalancer = localTabletBalancer;
-    }
-  }
-
-  void getAssignments(SortedMap<TServerInstance,TabletServerStatus> 
currentStatus,
-      Map<String,Set<TServerInstance>> currentTServerGroups,
-      Map<KeyExtent,UnassignedTablet> unassigned, 
Map<KeyExtent,TServerInstance> assignedOut) {
-    AssignmentParamsImpl params =
-        AssignmentParamsImpl.fromThrift(currentStatus, currentTServerGroups,
-            unassigned.entrySet().stream().collect(HashMap::new,
-                (m, e) -> m.put(e.getKey(),
-                    e.getValue().getLastLocation() == null ? null
-                        : e.getValue().getLastLocation().getServerInstance()),
-                Map::putAll),
-            assignedOut);
-    tabletBalancer.getAssignments(params);
-    if (!canAssignAndBalance()) {
-      // remove assignment for user tables
-      Iterator<KeyExtent> iter = assignedOut.keySet().iterator();
-      while (iter.hasNext()) {
-        KeyExtent ke = iter.next();
-        if (!ke.isMeta()) {
-          iter.remove();
-          log.trace("Removed assignment for {} as assignments for user tables 
is disabled.", ke);
-        }
-      }
-    }
-  }
-
-  public TabletStateStore getTabletStateStore(DataLevel level) {
-    switch (level) {
-      case METADATA:
-        return this.metadataTabletStore;
-      case ROOT:
-        return this.rootTabletStore;
-      case USER:
-        return this.userTabletStore;
-      default:
-        throw new IllegalStateException("Unhandled DataLevel value: " + level);
-    }
-  }
-
   @Override
   public void registerMetrics(MeterRegistry registry) {
     super.registerMetrics(registry);
@@ -1903,30 +1557,4 @@ public class Manager extends AbstractServer implements 
LiveTServerSet.Listener {
   public ServiceLock getLock() {
     return managerLock;
   }
-
-  private long numMigrations() {
-    long count = 0;
-    for (DataLevel dl : DataLevel.values()) {
-      try (var tabletsMetadata = 
getContext().getAmple().readTablets().forLevel(dl)
-          .fetch(TabletMetadata.ColumnType.MIGRATION).filter(new 
HasMigrationFilter()).build()) {
-        count += tabletsMetadata.stream().count();
-      }
-    }
-    return count;
-  }
-
-  private boolean canAssignAndBalance() {
-    final int threshold =
-        
getConfiguration().getCount(Property.MANAGER_TABLET_BALANCER_TSERVER_THRESHOLD);
-    if (threshold == 0) {
-      return true;
-    }
-    final int numTServers = tserverSet.size();
-    final boolean result = numTServers >= threshold;
-    if (!result) {
-      log.warn("Not assigning or balancing as number of tservers ({}) is below 
threshold ({})",
-          numTServers, threshold);
-    }
-    return result;
-  }
 }
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java
index 6c8ff0ef3f..2be39fdfcf 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java
@@ -549,7 +549,7 @@ public class ManagerClientServiceHandler implements 
ManagerClientService.Iface {
 
   @Override
   public void waitForBalance(TInfo tinfo) {
-    manager.waitForBalance();
+    manager.getBalanceManager().waitForBalance();
   }
 
   @Override
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
index c9ff9ba426..6e306936a6 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
@@ -533,8 +533,8 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
       // This is final because nothing in this method should change the goal. 
All computation of the
       // goal should be done in TabletGoalState.compute() so that all parts of 
the Accumulo code
       // will compute a consistent goal.
-      final TabletGoalState goal =
-          TabletGoalState.compute(tm, state, manager.tabletBalancer, 
tableMgmtParams);
+      final TabletGoalState goal = TabletGoalState.compute(tm, state,
+          manager.getBalanceManager().getBalancer(), tableMgmtParams);
 
       final Set<ManagementAction> actions = mti.getActions();
 
@@ -969,8 +969,8 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
       Map<KeyExtent,UnassignedTablet> unassigned) {
     if (!tLists.destinations.isEmpty()) {
       Map<KeyExtent,TServerInstance> assignedOut = new HashMap<>();
-      manager.getAssignments(tLists.destinations, 
tLists.currentTServerGrouping, unassigned,
-          assignedOut);
+      manager.getBalanceManager().getAssignments(tLists.destinations, 
tLists.currentTServerGrouping,
+          unassigned, assignedOut);
       for (Entry<KeyExtent,TServerInstance> assignment : 
assignedOut.entrySet()) {
         if (unassigned.containsKey(assignment.getKey())) {
           if (assignment.getValue() != null) {

Reply via email to