This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 8f1019a10bc3d3c331dd4bde8432b2cab6f75a3c
Merge: 97f2f624ee 6f7033ab3f
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Tue Jul 23 15:13:57 2024 +0000

    Merge branch '2.1'

 .../java/org/apache/accumulo/manager/Manager.java  | 104 ++++++++++++++++-----
 1 file changed, 80 insertions(+), 24 deletions(-)

diff --cc server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index e4d8247004,d61c43a491..039282cb66
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@@ -959,14 -957,64 +956,67 @@@ public class Manager extends AbstractSe
        }
      }
  
-     private long balanceTablets() {
- 
-       Map<DataLevel,Set<KeyExtent>> partitionedMigrations =
+     /**
+      * balanceTablets() balances tables by DataLevel. Return the current set 
of migrations
+      * partitioned by DataLevel
+      */
+     private Map<DataLevel,Set<KeyExtent>> partitionMigrations(final 
Set<KeyExtent> migrations) {
+       final Map<DataLevel,Set<KeyExtent>> partitionedMigrations =
            new HashMap<>(DataLevel.values().length);
-       migrationsSnapshot().forEach(ke -> {
-         partitionedMigrations.computeIfAbsent(DataLevel.of(ke.tableId()), f 
-> new HashSet<>())
-             .add(ke);
+       // populate to prevent NPE
+       for (DataLevel dl : DataLevel.values()) {
+         partitionedMigrations.put(dl, new HashSet<>());
+       }
+       migrations.forEach(ke -> {
+         partitionedMigrations.get(DataLevel.of(ke.tableId())).add(ke);
+       });
+       return partitionedMigrations;
+     }
+ 
+     /**
+      * Given the current tserverStatus map and a DataLevel, return a view of 
the tserverStatus map
+      * that only contains entries for tables in the DataLevel
+      */
+     private SortedMap<TServerInstance,TabletServerStatus> 
createTServerStatusView(
+         final DataLevel dl, final 
SortedMap<TServerInstance,TabletServerStatus> status) {
+       final SortedMap<TServerInstance,TabletServerStatus> 
tserverStatusForLevel = new TreeMap<>();
+       status.forEach((tsi, tss) -> {
+         final TabletServerStatus copy = tss.deepCopy();
+         final Map<String,TableInfo> oldTableMap = copy.getTableMap();
+         final Map<String,TableInfo> newTableMap =
+             new HashMap<>(dl == DataLevel.USER ? oldTableMap.size() : 1);
+         if (dl == DataLevel.ROOT) {
 -          if (oldTableMap.containsKey(RootTable.NAME)) {
 -            newTableMap.put(RootTable.NAME, oldTableMap.get(RootTable.NAME));
++          if (oldTableMap.containsKey(AccumuloTable.ROOT.tableName())) {
++            newTableMap.put(AccumuloTable.ROOT.tableName(),
++                oldTableMap.get(AccumuloTable.ROOT.tableName()));
+           }
+         } else if (dl == DataLevel.METADATA) {
 -          if (oldTableMap.containsKey(MetadataTable.NAME)) {
 -            newTableMap.put(MetadataTable.NAME, 
oldTableMap.get(MetadataTable.NAME));
++          if (oldTableMap.containsKey(AccumuloTable.METADATA.tableName())) {
++            newTableMap.put(AccumuloTable.METADATA.tableName(),
++                oldTableMap.get(AccumuloTable.METADATA.tableName()));
+           }
+         } else if (dl == DataLevel.USER) {
 -          if (!oldTableMap.containsKey(MetadataTable.NAME)
 -              && !oldTableMap.containsKey(RootTable.NAME)) {
++          if (!oldTableMap.containsKey(AccumuloTable.METADATA.tableName())
++              && !oldTableMap.containsKey(AccumuloTable.ROOT.tableName())) {
+             newTableMap.putAll(oldTableMap);
+           } else {
+             oldTableMap.forEach((table, info) -> {
 -              if (!table.equals(RootTable.NAME) && 
!table.equals(MetadataTable.NAME)) {
++              if (!table.equals(AccumuloTable.ROOT.tableName())
++                  && !table.equals(AccumuloTable.METADATA.tableName())) {
+                 newTableMap.put(table, info);
+               }
+             });
+           }
+         } else {
+           throw new IllegalArgumentException("Unhandled DataLevel value: " + 
dl);
+         }
+         copy.setTableMap(newTableMap);
+         tserverStatusForLevel.put(tsi, copy);
        });
+       return tserverStatusForLevel;
+     }
+ 
+     private long balanceTablets() {
  
        final int tabletsNotHosted = notHosted();
        BalanceParamsImpl params = null;
@@@ -1039,10 -1095,12 +1097,10 @@@
  
    }
  
-   private SortedMap<TServerInstance,TabletServerStatus> 
gatherTableInformation(
-       Set<TServerInstance> currentServers, 
SortedMap<TabletServerId,TServerStatus> balancerMap) {
+   private SortedMap<TServerInstance,TabletServerStatus>
+       gatherTableInformation(Set<TServerInstance> currentServers) {
      final long rpcTimeout = 
getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT);
      int threads = 
getConfiguration().getCount(Property.MANAGER_STATUS_THREAD_POOL_SIZE);
 -    ExecutorService tp = ThreadPools.getServerThreadPools()
 -        .createExecutorService(getConfiguration(), 
Property.MANAGER_STATUS_THREAD_POOL_SIZE);
      long start = System.currentTimeMillis();
      final SortedMap<TServerInstance,TabletServerStatus> result = new 
ConcurrentSkipListMap<>();
      final RateLimiter shutdownServerRateLimiter = 
RateLimiter.create(MAX_SHUTDOWNS_PER_SEC);
@@@ -1104,33 -1161,19 +1162,31 @@@
              badServers.remove(server);
            }
          }
 -      });
 -    }
 -    tp.shutdown();
 -    try {
 -      tp.awaitTermination(Math.max(10000, rpcTimeout / 3), MILLISECONDS);
 -    } catch (InterruptedException e) {
 -      log.debug("Interrupted while fetching status");
 +      }));
 +    }
 +    // wait at least 10 seconds
 +    final Duration timeToWait =
 +        Comparators.max(Duration.ofSeconds(10), Duration.ofMillis(rpcTimeout 
/ 3));
 +    final NanoTime startTime = NanoTime.now();
 +    // Wait for all tasks to complete
 +    while (!tasks.isEmpty()) {
 +      boolean cancel = (startTime.elapsed().compareTo(timeToWait) > 0);
 +      Iterator<Future<?>> iter = tasks.iterator();
 +      while (iter.hasNext()) {
 +        Future<?> f = iter.next();
 +        if (cancel) {
 +          f.cancel(true);
 +        } else {
 +          if (f.isDone()) {
 +            iter.remove();
 +          }
 +        }
 +      }
 +      Uninterruptibles.sleepUninterruptibly(1, MILLISECONDS);
      }
  
 -    tp.shutdownNow();
 -
      // Threads may still modify map after shutdownNow is called, so create an 
immutable snapshot.
      SortedMap<TServerInstance,TabletServerStatus> info = 
ImmutableSortedMap.copyOf(result);
-     tserverStatus.forEach((tsi, status) -> balancerMap.put(new 
TabletServerIdImpl(tsi),
-         TServerStatusImpl.fromThrift(status)));
  
      synchronized (badServers) {
        badServers.keySet().retainAll(currentServers);

Reply via email to