This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 530e39757b6427203ffeb1ee5b2d06578234aaac
Merge: 696030e506 8f1019a10b
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Tue Jul 23 15:27:29 2024 +0000

    Merge branch 'main' into elasticity

 .../java/org/apache/accumulo/manager/Manager.java  | 103 ++++++++++++++++-----
 1 file changed, 80 insertions(+), 23 deletions(-)

diff --cc server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 7557dc78fb,039282cb66..b989de3998
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@@ -225,17 -217,13 +225,16 @@@ public class Manager extends AbstractSe
  
    private ManagerState state = ManagerState.INITIAL;
  
 -  // fateReadyLatch and fateRef go together; when this latch is ready, then the fate reference
 -  // should already have been set; still need to use atomic reference or volatile for fateRef, so no
 -  // thread's cached view shows that fateRef is still null after the latch is ready
 +  // fateReadyLatch and fateRefs go together; when this latch is ready, then the fate references
 +  // should already have been set; ConcurrentHashMap will guarantee that all threads will see
 +  // the initialized fate references after the latch is ready
    private final CountDownLatch fateReadyLatch = new CountDownLatch(1);
 -  private final AtomicReference<Fate<Manager>> fateRef = new AtomicReference<>(null);
 +  private final AtomicReference<Map<FateInstanceType,Fate<Manager>>> fateRefs =
 +      new AtomicReference<>();
  
   volatile SortedMap<TServerInstance,TabletServerStatus> tserverStatus = emptySortedMap();
-   volatile SortedMap<TabletServerId,TServerStatus> tserverStatusForBalancer = emptySortedMap();
 +  volatile Map<String,Set<TServerInstance>> tServerGroupingForBalancer = emptyMap();
 +
   final ServerBulkImportStatus bulkImportStatus = new ServerBulkImportStatus();
  
    private final AtomicBoolean managerInitialized = new AtomicBoolean(false);
@@@ -791,11 -905,8 +790,10 @@@
      }
  
      private long updateStatus() {
 -      Set<TServerInstance> currentServers = tserverSet.getCurrentServers();
 -      tserverStatus = gatherTableInformation(currentServers);
 +      var tseversSnapshot = tserverSet.getSnapshot();
-       TreeMap<TabletServerId,TServerStatus> temp = new TreeMap<>();
-       tserverStatus = gatherTableInformation(tseversSnapshot.getTservers(), temp);
-       tserverStatusForBalancer = Collections.unmodifiableSortedMap(temp);
++      tserverStatus = gatherTableInformation(tseversSnapshot.getTservers());
 +      tServerGroupingForBalancer = tseversSnapshot.getTserverGroups();
++
        checkForHeldServer(tserverStatus);
  
        if (!badServers.isEmpty()) {
@@@ -845,39 -956,100 +843,100 @@@
        }
      }
  
-     private long balanceTablets() {
- 
-       Map<DataLevel,Set<KeyExtent>> partitionedMigrations =
+     /**
+      * balanceTablets() balances tables by DataLevel. Return the current set of migrations
+      * partitioned by DataLevel
+      */
+     private Map<DataLevel,Set<KeyExtent>> partitionMigrations(final Set<KeyExtent> migrations) {
+       final Map<DataLevel,Set<KeyExtent>> partitionedMigrations =
            new HashMap<>(DataLevel.values().length);
+       // populate to prevent NPE
+       for (DataLevel dl : DataLevel.values()) {
+         partitionedMigrations.put(dl, new HashSet<>());
+       }
 -      migrations.forEach(ke -> {
 +      migrationsSnapshot().keySet().forEach(ke -> {
-         partitionedMigrations.computeIfAbsent(DataLevel.of(ke.tableId()), f -> new HashSet<>())
-             .add(ke);
+         partitionedMigrations.get(DataLevel.of(ke.tableId())).add(ke);
        });
+       return partitionedMigrations;
+     }
+ 
+     /**
+      * Given the current tserverStatus map and a DataLevel, return a view of the tserverStatus map
+      * that only contains entries for tables in the DataLevel
+      */
+     private SortedMap<TServerInstance,TabletServerStatus> createTServerStatusView(
+         final DataLevel dl, final SortedMap<TServerInstance,TabletServerStatus> status) {
+       final SortedMap<TServerInstance,TabletServerStatus> tserverStatusForLevel = new TreeMap<>();
+       status.forEach((tsi, tss) -> {
+         final TabletServerStatus copy = tss.deepCopy();
+         final Map<String,TableInfo> oldTableMap = copy.getTableMap();
+         final Map<String,TableInfo> newTableMap =
+             new HashMap<>(dl == DataLevel.USER ? oldTableMap.size() : 1);
+         if (dl == DataLevel.ROOT) {
+           if (oldTableMap.containsKey(AccumuloTable.ROOT.tableName())) {
+             newTableMap.put(AccumuloTable.ROOT.tableName(),
+                 oldTableMap.get(AccumuloTable.ROOT.tableName()));
+           }
+         } else if (dl == DataLevel.METADATA) {
+           if (oldTableMap.containsKey(AccumuloTable.METADATA.tableName())) {
+             newTableMap.put(AccumuloTable.METADATA.tableName(),
+                 oldTableMap.get(AccumuloTable.METADATA.tableName()));
+           }
+         } else if (dl == DataLevel.USER) {
+           if (!oldTableMap.containsKey(AccumuloTable.METADATA.tableName())
+               && !oldTableMap.containsKey(AccumuloTable.ROOT.tableName())) {
+             newTableMap.putAll(oldTableMap);
+           } else {
+             oldTableMap.forEach((table, info) -> {
+               if (!table.equals(AccumuloTable.ROOT.tableName())
+                   && !table.equals(AccumuloTable.METADATA.tableName())) {
+                 newTableMap.put(table, info);
+               }
+             });
+           }
+         } else {
+           throw new IllegalArgumentException("Unhandled DataLevel value: " + dl);
+         }
+         copy.setTableMap(newTableMap);
+         tserverStatusForLevel.put(tsi, copy);
+       });
+       return tserverStatusForLevel;
+     }
+ 
+     private long balanceTablets() {
  
        final int tabletsNotHosted = notHosted();
        BalanceParamsImpl params = null;
        long wait = 0;
        long totalMigrationsOut = 0;
+       final Map<DataLevel,Set<KeyExtent>> partitionedMigrations =
 -          partitionMigrations(migrationsSnapshot());
++          partitionMigrations(migrationsSnapshot().keySet());
+ 
        for (DataLevel dl : DataLevel.values()) {
-         final Set<KeyExtent> migrationsForLevel = partitionedMigrations.get(dl);
-         if (migrationsForLevel == null) {
-           continue;
-         }
          if (dl == DataLevel.USER && tabletsNotHosted > 0) {
            log.debug("not balancing user tablets because there are {} unhosted 
tablets",
                tabletsNotHosted);
            continue;
          }
+         // Create a view of the tserver status such that it only contains the tables
+         // for this level in the tableMap.
+         final SortedMap<TServerInstance,TabletServerStatus> tserverStatusForLevel =
+             createTServerStatusView(dl, tserverStatus);
+         // Construct the Thrift variant of the map above for the BalancerParams
+         final SortedMap<TabletServerId,TServerStatus> tserverStatusForBalancerLevel =
+             new TreeMap<>();
+         tserverStatusForLevel.forEach((tsi, status) -> tserverStatusForBalancerLevel
+             .put(new TabletServerIdImpl(tsi), TServerStatusImpl.fromThrift(status)));
+ 
          long migrationsOutForLevel = 0;
-         int i = 0;
+         int attemptNum = 0;
          do {
-           i++;
-           log.debug("Balancing for tables at level {}, times-in-loop: {}", 
dl, i);
-           params = BalanceParamsImpl.fromThrift(tserverStatusForBalancer,
-               tServerGroupingForBalancer, tserverStatus, migrationsForLevel);
+           log.debug("Balancing for tables at level {}, times-in-loop: {}", 
dl, ++attemptNum);
+           params = BalanceParamsImpl.fromThrift(tserverStatusForBalancerLevel,
 -              tserverStatusForLevel, partitionedMigrations.get(dl));
++              tServerGroupingForBalancer, tserverStatusForLevel, partitionedMigrations.get(dl));
            wait = Math.max(tabletBalancer.balance(params), wait);
            migrationsOutForLevel = params.migrationsOut().size();
-           for (TabletMigration m : checkMigrationSanity(tserverStatusForBalancer.keySet(),
+           for (TabletMigration m : checkMigrationSanity(tserverStatusForBalancerLevel.keySet(),
                params.migrationsOut())) {
              final KeyExtent ke = KeyExtent.fromTabletId(m.getTablet());
              if (migrations.containsKey(ke)) {

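The new fateReadyLatch/fateRefs comment in the first hunk describes a publish-then-signal handoff: the map of Fate instances is stored on an AtomicReference before the CountDownLatch opens, so any thread that returns from await() sees the initialized references. A minimal standalone sketch of that pattern, with FateInstanceType reduced to a local enum and the Fate value stubbed as a String (these stand-ins are not the Manager code itself):

import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

// Sketch of the latch-plus-reference handoff; names are simplified stand-ins.
class FateHandoffSketch {
  enum FateInstanceType { META, USER }

  private final CountDownLatch fateReadyLatch = new CountDownLatch(1);
  private final AtomicReference<Map<FateInstanceType,String>> fateRefs = new AtomicReference<>();

  // Publisher: store the references first, then open the latch.
  void initialize(Map<FateInstanceType,String> fates) {
    fateRefs.set(fates);
    fateReadyLatch.countDown();
  }

  // Reader: once await() returns, fateRefs.get() is non-null, because set()
  // happens-before countDown(), which happens-before await() returning.
  Map<FateInstanceType,String> awaitFates() throws InterruptedException {
    fateReadyLatch.await();
    return fateRefs.get();
  }
}
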
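The new partitionMigrations helper pre-populates one set per DataLevel (the "populate to prevent NPE" step) and then buckets each migrating extent by its level, which is why the old migrationsForLevel == null guard could be dropped from the balancing loop. A simplified, self-contained version of that idea; the DataLevel enum and String extents here are illustrative stand-ins rather than Accumulo's DataLevel and KeyExtent types:

import java.util.EnumMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Sketch of bucketing migrations by level with every bucket pre-created.
class PartitionMigrationsSketch {
  enum DataLevel { ROOT, METADATA, USER }

  static <E> Map<DataLevel,Set<E>> partition(Set<E> migrations, Function<E,DataLevel> levelOf) {
    Map<DataLevel,Set<E>> partitioned = new EnumMap<>(DataLevel.class);
    for (DataLevel dl : DataLevel.values()) {
      partitioned.put(dl, new HashSet<>()); // every level present, even if empty
    }
    migrations.forEach(e -> partitioned.get(levelOf.apply(e)).add(e));
    return partitioned;
  }

  public static void main(String[] args) {
    Set<String> migrations = Set.of("user:2;m", "meta:!0<");
    System.out.println(partition(migrations,
        e -> e.startsWith("meta") ? DataLevel.METADATA : DataLevel.USER));
    // -> {ROOT=[], METADATA=[meta:!0<], USER=[user:2;m]}
  }
}
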
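createTServerStatusView deep-copies each server's status and trims its table map to the tables owned by the requested level: the root table for ROOT, the metadata table for METADATA, and everything else for USER. The sketch below shows the same filtering over plain maps; the nested Map stands in for TabletServerStatus and its table map, and the table names and predicate are only examples, not the Accumulo implementation:

import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

// Sketch of building a per-level view of tserver status; every server stays in
// the view, only its table entries are filtered (mirroring the deep copy plus
// trimmed table map in the diff).
class StatusViewSketch {
  static Map<String,Map<String,Long>> viewForLevel(
      Map<String,Map<String,Long>> statusByServer, Predicate<String> tableInLevel) {
    Map<String,Map<String,Long>> view = new HashMap<>();
    statusByServer.forEach((server, tables) -> {
      Map<String,Long> filtered = new HashMap<>();
      tables.forEach((table, tabletCount) -> {
        if (tableInLevel.test(table)) {
          filtered.put(table, tabletCount);
        }
      });
      view.put(server, filtered);
    });
    return view;
  }

  public static void main(String[] args) {
    Map<String,Map<String,Long>> status =
        Map.of("tserver1", Map.of("accumulo.root", 1L, "accumulo.metadata", 4L, "trades", 100L));
    // USER-level view: drop the root and metadata tables, keep user tables.
    System.out.println(viewForLevel(status,
        t -> !t.equals("accumulo.root") && !t.equals("accumulo.metadata")));
  }
}
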
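The rewritten balanceTablets() drives the balancer once per DataLevel, handing it only that level's migrations and status view and repeating while it keeps producing migrations. The skeleton below shows just that control flow; balanceOnce, the nested map types, and the retry cap are hypothetical stand-ins for BalanceParamsImpl, TabletBalancer.balance(), and the actual loop condition:

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;

// Skeletal sketch of per-level balancing: each DataLevel gets its own migration
// set and its own filtered status view.
class BalanceLoopSketch {
  enum DataLevel { ROOT, METADATA, USER }

  static long balanceAllLevels(
      Map<DataLevel,Set<String>> partitionedMigrations,
      Map<DataLevel,Map<String,Map<String,Long>>> statusViewByLevel,
      BiFunction<Map<String,Map<String,Long>>,Set<String>,List<String>> balanceOnce) {
    long totalMigrationsOut = 0;
    for (DataLevel dl : DataLevel.values()) {
      Set<String> migrationsForLevel = partitionedMigrations.get(dl); // never null when pre-populated
      int attemptNum = 0;
      List<String> migrationsOut;
      do {
        attemptNum++;
        migrationsOut = balanceOnce.apply(statusViewByLevel.get(dl), migrationsForLevel);
        totalMigrationsOut += migrationsOut.size();
      } while (!migrationsOut.isEmpty() && attemptNum < 3); // retry cap illustrative only
    }
    return totalMigrationsOut;
  }
}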