This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 0ce85b7a3fcd8a5b9a04cb8b5a29d5250e8c8a7e
Merge: 53336d23b4 0b8ddf9745
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Fri Jul 5 20:09:50 2024 +0000

    Merge branch 'main' into elasticity

 .../core/metadata/schema/DataLevelOrderTest.java  | 40 +++++++++++++++
 .../java/org/apache/accumulo/manager/Manager.java | 60 ++++++++++++++++------
 2 files changed, 84 insertions(+), 16 deletions(-)

diff --cc server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 28be0b7ffb,10913b920c..d2f2f0eab1
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@@ -844,22 -956,52 +842,52 @@@ public class Manager extends AbstractSe
    }
  
    private long balanceTablets() {
-     BalanceParamsImpl params = BalanceParamsImpl.fromThrift(tserverStatusForBalancer,
-         tServerGroupingForBalancer, tserverStatus, migrationsSnapshot().keySet());
-     long wait = tabletBalancer.balance(params);
- 
-     for (TabletMigration m : checkMigrationSanity(tserverStatusForBalancer.keySet(),
-         params.migrationsOut())) {
-       KeyExtent ke = KeyExtent.fromTabletId(m.getTablet());
-       if (migrations.containsKey(ke)) {
-         log.warn("balancer requested migration more than once, skipping {}", m);
+ 
+     Map<DataLevel,Set<KeyExtent>> partitionedMigrations =
+         new HashMap<>(DataLevel.values().length);
 -    migrationsSnapshot().forEach(ke -> {
++    migrationsSnapshot().keySet().forEach(ke -> {
+       partitionedMigrations.computeIfAbsent(DataLevel.of(ke.tableId()), f -> new HashSet<>())
+           .add(ke);
+     });
+ 
+     final int tabletsNotHosted = notHosted();
+     BalanceParamsImpl params = null;
+     long wait = 0;
+     long totalMigrationsOut = 0;
+     for (DataLevel dl : DataLevel.values()) {
+       final Set<KeyExtent> migrationsForLevel = partitionedMigrations.get(dl);
+       if (migrationsForLevel == null) {
+         continue;
+       }
+       if (dl == DataLevel.USER && tabletsNotHosted > 0) {
+         log.debug("not balancing user tablets because there are {} unhosted tablets",
+             tabletsNotHosted);
          continue;
        }
-       TServerInstance tserverInstance = TabletServerIdImpl.toThrift(m.getNewTabletServer());
-       migrations.put(ke, tserverInstance);
-       log.debug("migration {}", m);
+       long migrationsOutForLevel = 0;
+       int i = 0;
+       do {
+         i++;
+         log.debug("Balancing for tables at level {}, times-in-loop: {}", dl, i);
 -        params = BalanceParamsImpl.fromThrift(tserverStatusForBalancer, tserverStatus,
 -            migrationsForLevel);
++        params = BalanceParamsImpl.fromThrift(tserverStatusForBalancer,
++            tServerGroupingForBalancer, tserverStatus, migrationsForLevel);
+         wait = Math.max(tabletBalancer.balance(params), wait);
+         migrationsOutForLevel = params.migrationsOut().size();
+         for (TabletMigration m : checkMigrationSanity(tserverStatusForBalancer.keySet(),
+             params.migrationsOut())) {
+           final KeyExtent ke = KeyExtent.fromTabletId(m.getTablet());
+           if (migrations.containsKey(ke)) {
+             log.warn("balancer requested migration more than once, skipping {}", m);
+             continue;
+           }
+           migrations.put(ke, TabletServerIdImpl.toThrift(m.getNewTabletServer()));
+           log.debug("migration {}", m);
+         }
+       } while (migrationsOutForLevel > 0 && (dl == DataLevel.ROOT || dl == DataLevel.METADATA));
+       totalMigrationsOut += migrationsOutForLevel;
      }
-     if (params.migrationsOut().isEmpty()) {
+ 
+     if (totalMigrationsOut == 0) {
        synchronized (balancedNotifier) {
          balancedNotifier.notifyAll();
        }
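The merged balanceTablets() first buckets the pending migrations by DataLevel with computeIfAbsent and then balances each level on its own: the do/while keeps re-balancing ROOT and METADATA until they report no outgoing migrations, while USER is balanced at most once per pass and skipped entirely while tablets remain unhosted. A minimal, self-contained sketch of that grouping step follows; the Level enum, Extent record, and levelOf() classifier are simplified stand-ins for illustration, not the Accumulo types.

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class PartitionByLevelSketch {

  // Simplified stand-in for Accumulo's DataLevel.
  enum Level { ROOT, METADATA, USER }

  // Simplified stand-in for a tablet extent; only the table id matters here.
  record Extent(String tableId) {}

  // Hypothetical classifier; the real code derives the level from the table id.
  static Level levelOf(Extent e) {
    switch (e.tableId()) {
      case "+r":
        return Level.ROOT;
      case "!0":
        return Level.METADATA;
      default:
        return Level.USER;
    }
  }

  public static void main(String[] args) {
    List<Extent> migrations =
        List.of(new Extent("+r"), new Extent("!0"), new Extent("2a"), new Extent("2b"));

    // Group pending migrations by level, creating each per-level set on first use,
    // mirroring the computeIfAbsent call in the merged balanceTablets().
    Map<Level,Set<Extent>> partitioned = new HashMap<>(Level.values().length);
    migrations.forEach(
        e -> partitioned.computeIfAbsent(levelOf(e), lvl -> new HashSet<>()).add(e));

    // Each level can now be handed to a balancer independently; levels with no
    // pending migrations are skipped, as the diff does with its null check.
    for (Level lvl : Level.values()) {
      Set<Extent> forLevel = partitioned.get(lvl);
      if (forLevel == null) {
        continue;
      }
      System.out.println(lvl + " -> " + forLevel);
    }
  }
}

Partitioning up front is what lets the loop give the metadata levels different treatment from user tables instead of passing one undifferentiated migration set to the balancer.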