This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 8f1019a10bc3d3c331dd4bde8432b2cab6f75a3c Merge: 97f2f624ee 6f7033ab3f Author: Dave Marion <dlmar...@apache.org> AuthorDate: Tue Jul 23 15:13:57 2024 +0000 Merge branch '2.1' .../java/org/apache/accumulo/manager/Manager.java | 104 ++++++++++++++++----- 1 file changed, 80 insertions(+), 24 deletions(-) diff --cc server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index e4d8247004,d61c43a491..039282cb66 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@@ -959,14 -957,64 +956,67 @@@ public class Manager extends AbstractSe } } - private long balanceTablets() { - - Map<DataLevel,Set<KeyExtent>> partitionedMigrations = + /** + * balanceTablets() balances tables by DataLevel. Return the current set of migrations + * partitioned by DataLevel + */ + private Map<DataLevel,Set<KeyExtent>> partitionMigrations(final Set<KeyExtent> migrations) { + final Map<DataLevel,Set<KeyExtent>> partitionedMigrations = new HashMap<>(DataLevel.values().length); - migrationsSnapshot().forEach(ke -> { - partitionedMigrations.computeIfAbsent(DataLevel.of(ke.tableId()), f -> new HashSet<>()) - .add(ke); + // populate to prevent NPE + for (DataLevel dl : DataLevel.values()) { + partitionedMigrations.put(dl, new HashSet<>()); + } + migrations.forEach(ke -> { + partitionedMigrations.get(DataLevel.of(ke.tableId())).add(ke); + }); + return partitionedMigrations; + } + + /** + * Given the current tserverStatus map and a DataLevel, return a view of the tserverStatus map + * that only contains entries for tables in the DataLevel + */ + private SortedMap<TServerInstance,TabletServerStatus> createTServerStatusView( + final DataLevel dl, final SortedMap<TServerInstance,TabletServerStatus> status) { + final SortedMap<TServerInstance,TabletServerStatus> tserverStatusForLevel = new TreeMap<>(); + status.forEach((tsi, tss) -> { + final TabletServerStatus copy = tss.deepCopy(); + final Map<String,TableInfo> oldTableMap = copy.getTableMap(); + final Map<String,TableInfo> newTableMap = + new HashMap<>(dl == DataLevel.USER ? oldTableMap.size() : 1); + if (dl == DataLevel.ROOT) { - if (oldTableMap.containsKey(RootTable.NAME)) { - newTableMap.put(RootTable.NAME, oldTableMap.get(RootTable.NAME)); ++ if (oldTableMap.containsKey(AccumuloTable.ROOT.tableName())) { ++ newTableMap.put(AccumuloTable.ROOT.tableName(), ++ oldTableMap.get(AccumuloTable.ROOT.tableName())); + } + } else if (dl == DataLevel.METADATA) { - if (oldTableMap.containsKey(MetadataTable.NAME)) { - newTableMap.put(MetadataTable.NAME, oldTableMap.get(MetadataTable.NAME)); ++ if (oldTableMap.containsKey(AccumuloTable.METADATA.tableName())) { ++ newTableMap.put(AccumuloTable.METADATA.tableName(), ++ oldTableMap.get(AccumuloTable.METADATA.tableName())); + } + } else if (dl == DataLevel.USER) { - if (!oldTableMap.containsKey(MetadataTable.NAME) - && !oldTableMap.containsKey(RootTable.NAME)) { ++ if (!oldTableMap.containsKey(AccumuloTable.METADATA.tableName()) ++ && !oldTableMap.containsKey(AccumuloTable.ROOT.tableName())) { + newTableMap.putAll(oldTableMap); + } else { + oldTableMap.forEach((table, info) -> { - if (!table.equals(RootTable.NAME) && !table.equals(MetadataTable.NAME)) { ++ if (!table.equals(AccumuloTable.ROOT.tableName()) ++ && !table.equals(AccumuloTable.METADATA.tableName())) { + newTableMap.put(table, info); + } + }); + } + } else { + throw new IllegalArgumentException("Unhandled DataLevel value: " + dl); + } + copy.setTableMap(newTableMap); + tserverStatusForLevel.put(tsi, copy); }); + return tserverStatusForLevel; + } + + private long balanceTablets() { final int tabletsNotHosted = notHosted(); BalanceParamsImpl params = null; @@@ -1039,10 -1095,12 +1097,10 @@@ } - private SortedMap<TServerInstance,TabletServerStatus> gatherTableInformation( - Set<TServerInstance> currentServers, SortedMap<TabletServerId,TServerStatus> balancerMap) { + private SortedMap<TServerInstance,TabletServerStatus> + gatherTableInformation(Set<TServerInstance> currentServers) { final long rpcTimeout = getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT); int threads = getConfiguration().getCount(Property.MANAGER_STATUS_THREAD_POOL_SIZE); - ExecutorService tp = ThreadPools.getServerThreadPools() - .createExecutorService(getConfiguration(), Property.MANAGER_STATUS_THREAD_POOL_SIZE); long start = System.currentTimeMillis(); final SortedMap<TServerInstance,TabletServerStatus> result = new ConcurrentSkipListMap<>(); final RateLimiter shutdownServerRateLimiter = RateLimiter.create(MAX_SHUTDOWNS_PER_SEC); @@@ -1104,33 -1161,19 +1162,31 @@@ badServers.remove(server); } } - }); - } - tp.shutdown(); - try { - tp.awaitTermination(Math.max(10000, rpcTimeout / 3), MILLISECONDS); - } catch (InterruptedException e) { - log.debug("Interrupted while fetching status"); + })); + } + // wait at least 10 seconds + final Duration timeToWait = + Comparators.max(Duration.ofSeconds(10), Duration.ofMillis(rpcTimeout / 3)); + final NanoTime startTime = NanoTime.now(); + // Wait for all tasks to complete + while (!tasks.isEmpty()) { + boolean cancel = (startTime.elapsed().compareTo(timeToWait) > 0); + Iterator<Future<?>> iter = tasks.iterator(); + while (iter.hasNext()) { + Future<?> f = iter.next(); + if (cancel) { + f.cancel(true); + } else { + if (f.isDone()) { + iter.remove(); + } + } + } + Uninterruptibles.sleepUninterruptibly(1, MILLISECONDS); } - tp.shutdownNow(); - // Threads may still modify map after shutdownNow is called, so create an immutable snapshot. SortedMap<TServerInstance,TabletServerStatus> info = ImmutableSortedMap.copyOf(result); - tserverStatus.forEach((tsi, status) -> balancerMap.put(new TabletServerIdImpl(tsi), - TServerStatusImpl.fromThrift(status))); synchronized (badServers) { badServers.keySet().retainAll(currentServers);