This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 530e39757b6427203ffeb1ee5b2d06578234aaac
Merge: 696030e506 8f1019a10b
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Tue Jul 23 15:27:29 2024 +0000

    Merge branch 'main' into elasticity

 .../java/org/apache/accumulo/manager/Manager.java | 103 ++++++++++++++++-----
 1 file changed, 80 insertions(+), 23 deletions(-)

diff --cc server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 7557dc78fb,039282cb66..b989de3998
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@@ -225,17 -217,13 +225,16 @@@ public class Manager extends AbstractSe
    private ManagerState state = ManagerState.INITIAL;

-   // fateReadyLatch and fateRef go together; when this latch is ready, then the fate reference
-   // should already have been set; still need to use atomic reference or volatile for fateRef, so no
-   // thread's cached view shows that fateRef is still null after the latch is ready
+   // fateReadyLatch and fateRefs go together; when this latch is ready, then the fate references
+   // should already have been set; ConcurrentHashMap will guarantee that all threads will see
+   // the initialized fate references after the latch is ready
    private final CountDownLatch fateReadyLatch = new CountDownLatch(1);
-   private final AtomicReference<Fate<Manager>> fateRef = new AtomicReference<>(null);
+   private final AtomicReference<Map<FateInstanceType,Fate<Manager>>> fateRefs =
+       new AtomicReference<>();

    volatile SortedMap<TServerInstance,TabletServerStatus> tserverStatus = emptySortedMap();
-   volatile SortedMap<TabletServerId,TServerStatus> tserverStatusForBalancer = emptySortedMap();
+   volatile Map<String,Set<TServerInstance>> tServerGroupingForBalancer = emptyMap();
+   final ServerBulkImportStatus bulkImportStatus = new ServerBulkImportStatus();

    private final AtomicBoolean managerInitialized = new AtomicBoolean(false);
@@@ -791,11 -905,8 +790,10 @@@
    }

    private long updateStatus() {
-     Set<TServerInstance> currentServers = tserverSet.getCurrentServers();
-     tserverStatus = gatherTableInformation(currentServers);
+     var tseversSnapshot = tserverSet.getSnapshot();
-     TreeMap<TabletServerId,TServerStatus> temp = new TreeMap<>();
-     tserverStatus = gatherTableInformation(tseversSnapshot.getTservers(), temp);
-     tserverStatusForBalancer = Collections.unmodifiableSortedMap(temp);
++    tserverStatus = gatherTableInformation(tseversSnapshot.getTservers());
+     tServerGroupingForBalancer = tseversSnapshot.getTserverGroups();
++    checkForHeldServer(tserverStatus);

      if (!badServers.isEmpty()) {
@@@ -845,39 -956,100 +843,100 @@@
      }
    }

-   private long balanceTablets() {
-
-     Map<DataLevel,Set<KeyExtent>> partitionedMigrations =
+   /**
+    * balanceTablets() balances tables by DataLevel. Return the current set of migrations
+    * partitioned by DataLevel
+    */
+   private Map<DataLevel,Set<KeyExtent>> partitionMigrations(final Set<KeyExtent> migrations) {
+     final Map<DataLevel,Set<KeyExtent>> partitionedMigrations =
          new HashMap<>(DataLevel.values().length);
+     // populate to prevent NPE
+     for (DataLevel dl : DataLevel.values()) {
+       partitionedMigrations.put(dl, new HashSet<>());
+     }
-     migrations.forEach(ke -> {
+     migrationsSnapshot().keySet().forEach(ke -> {
-       partitionedMigrations.computeIfAbsent(DataLevel.of(ke.tableId()), f -> new HashSet<>())
-           .add(ke);
+       partitionedMigrations.get(DataLevel.of(ke.tableId())).add(ke);
      });
+     return partitionedMigrations;
+   }
+
+   /**
+    * Given the current tserverStatus map and a DataLevel, return a view of the tserverStatus map
+    * that only contains entries for tables in the DataLevel
+    */
+   private SortedMap<TServerInstance,TabletServerStatus> createTServerStatusView(
+       final DataLevel dl, final SortedMap<TServerInstance,TabletServerStatus> status) {
+     final SortedMap<TServerInstance,TabletServerStatus> tserverStatusForLevel = new TreeMap<>();
+     status.forEach((tsi, tss) -> {
+       final TabletServerStatus copy = tss.deepCopy();
+       final Map<String,TableInfo> oldTableMap = copy.getTableMap();
+       final Map<String,TableInfo> newTableMap =
+           new HashMap<>(dl == DataLevel.USER ? oldTableMap.size() : 1);
+       if (dl == DataLevel.ROOT) {
+         if (oldTableMap.containsKey(AccumuloTable.ROOT.tableName())) {
+           newTableMap.put(AccumuloTable.ROOT.tableName(),
+               oldTableMap.get(AccumuloTable.ROOT.tableName()));
+         }
+       } else if (dl == DataLevel.METADATA) {
+         if (oldTableMap.containsKey(AccumuloTable.METADATA.tableName())) {
+           newTableMap.put(AccumuloTable.METADATA.tableName(),
+               oldTableMap.get(AccumuloTable.METADATA.tableName()));
+         }
+       } else if (dl == DataLevel.USER) {
+         if (!oldTableMap.containsKey(AccumuloTable.METADATA.tableName())
+             && !oldTableMap.containsKey(AccumuloTable.ROOT.tableName())) {
+           newTableMap.putAll(oldTableMap);
+         } else {
+           oldTableMap.forEach((table, info) -> {
+             if (!table.equals(AccumuloTable.ROOT.tableName())
+                 && !table.equals(AccumuloTable.METADATA.tableName())) {
+               newTableMap.put(table, info);
+             }
+           });
+         }
+       } else {
+         throw new IllegalArgumentException("Unhandled DataLevel value: " + dl);
+       }
+       copy.setTableMap(newTableMap);
+       tserverStatusForLevel.put(tsi, copy);
+     });
+     return tserverStatusForLevel;
+   }
+
+   private long balanceTablets() {
      final int tabletsNotHosted = notHosted();
      BalanceParamsImpl params = null;
      long wait = 0;
      long totalMigrationsOut = 0;
+     final Map<DataLevel,Set<KeyExtent>> partitionedMigrations =
-         partitionMigrations(migrationsSnapshot());
++        partitionMigrations(migrationsSnapshot().keySet());
+     for (DataLevel dl : DataLevel.values()) {
-       final Set<KeyExtent> migrationsForLevel = partitionedMigrations.get(dl);
-       if (migrationsForLevel == null) {
-         continue;
-       }
        if (dl == DataLevel.USER && tabletsNotHosted > 0) {
          log.debug("not balancing user tablets because there are {} unhosted tablets",
              tabletsNotHosted);
          continue;
        }
+       // Create a view of the tserver status such that it only contains the tables
+       // for this level in the tableMap.
+       final SortedMap<TServerInstance,TabletServerStatus> tserverStatusForLevel =
+           createTServerStatusView(dl, tserverStatus);
+       // Construct the Thrift variant of the map above for the BalancerParams
+       final SortedMap<TabletServerId,TServerStatus> tserverStatusForBalancerLevel =
+           new TreeMap<>();
+       tserverStatusForLevel.forEach((tsi, status) -> tserverStatusForBalancerLevel
+           .put(new TabletServerIdImpl(tsi), TServerStatusImpl.fromThrift(status)));
+       long migrationsOutForLevel = 0;
-       int i = 0;
+       int attemptNum = 0;
        do {
-         i++;
-         log.debug("Balancing for tables at level {}, times-in-loop: {}", dl, i);
-         params = BalanceParamsImpl.fromThrift(tserverStatusForBalancer,
-             tServerGroupingForBalancer, tserverStatus, migrationsForLevel);
+         log.debug("Balancing for tables at level {}, times-in-loop: {}", dl, ++attemptNum);
+         params = BalanceParamsImpl.fromThrift(tserverStatusForBalancerLevel,
-             tserverStatusForLevel, partitionedMigrations.get(dl));
++            tServerGroupingForBalancer, tserverStatusForLevel, partitionedMigrations.get(dl));
          wait = Math.max(tabletBalancer.balance(params), wait);
          migrationsOutForLevel = params.migrationsOut().size();
-         for (TabletMigration m : checkMigrationSanity(tserverStatusForBalancer.keySet(),
+         for (TabletMigration m : checkMigrationSanity(tserverStatusForBalancerLevel.keySet(),
              params.migrationsOut())) {
            final KeyExtent ke = KeyExtent.fromTabletId(m.getTablet());
            if (migrations.containsKey(ke)) {
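
Editor's note: for readers following the balancer changes in the diff above, here is a minimal,
self-contained sketch of the partition-by-DataLevel idea that partitionMigrations() implements.
It uses hypothetical stand-in types (Level, Extent, a hard-coded table-to-level mapping) instead
of Accumulo's DataLevel, KeyExtent, and status classes, so it illustrates the technique only and
is not the project's API.

import java.util.EnumMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class PartitionByLevelSketch {

  // Stand-in for Accumulo's DataLevel (ROOT, METADATA, USER).
  enum Level { ROOT, METADATA, USER }

  // Stand-in for a KeyExtent; only the table name matters for this sketch.
  record Extent(String table) {
    Level level() {
      // Hypothetical mapping; Accumulo derives the level from the table id.
      if (table.equals("accumulo.root")) {
        return Level.ROOT;
      } else if (table.equals("accumulo.metadata")) {
        return Level.METADATA;
      }
      return Level.USER;
    }
  }

  // Mirrors the idea of partitionMigrations(): pre-populate every level with an
  // empty set so callers never see null, then bucket each migration by its level.
  static Map<Level,Set<Extent>> partitionMigrations(Set<Extent> migrations) {
    Map<Level,Set<Extent>> partitioned = new EnumMap<>(Level.class);
    for (Level l : Level.values()) {
      partitioned.put(l, new HashSet<>());
    }
    migrations.forEach(e -> partitioned.get(e.level()).add(e));
    return partitioned;
  }

  public static void main(String[] args) {
    Set<Extent> migrations = Set.of(
        new Extent("accumulo.metadata"), new Extent("trades"), new Extent("quotes"));
    // The balancer can then be invoked once per level, as balanceTablets() does above.
    partitionMigrations(migrations)
        .forEach((level, extents) -> System.out.println(level + " -> " + extents));
  }
}

Pre-populating each level up front is what lets the merged balanceTablets() loop over every
DataLevel without null checks, which is why the computeIfAbsent() variant was dropped in the diff.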