This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new 8f266f4123 Moves balancing code out of Manager class (#5537) 8f266f4123 is described below commit 8f266f412371c579508ade8fb73aec754c14236d Author: Keith Turner <ktur...@apache.org> AuthorDate: Thu Jun 26 15:09:54 2025 -0400 Moves balancing code out of Manager class (#5537) The balancing code needs some improvements. Currently the code is entangled in the manager code in odd places (for example, some of the balancing code is in the Manager.StatusThread class). This change moves the balancing code to its own class w/o making any changes to the code (except fixing two small existing bugs in shouldCleanupMigration()). This change is a first step toward making the balancer more independent of other manager code and threads. Co-authored-by: Dom G. <domgargu...@apache.org> --- .../apache/accumulo/manager/BalanceManager.java | 460 +++++++++++++++++++++ .../java/org/apache/accumulo/manager/Manager.java | 398 +----------------- .../manager/ManagerClientServiceHandler.java | 2 +- .../accumulo/manager/TabletGroupWatcher.java | 8 +- 4 files changed, 478 insertions(+), 390 deletions(-) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/BalanceManager.java b/server/manager/src/main/java/org/apache/accumulo/manager/BalanceManager.java new file mode 100644 index 0000000000..b7ac49ec7d --- /dev/null +++ b/server/manager/src/main/java/org/apache/accumulo/manager/BalanceManager.java @@ -0,0 +1,460 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.manager; + +import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; +import static java.util.concurrent.TimeUnit.MINUTES; + +import java.util.Collections; +import java.util.EnumMap; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.manager.balancer.AssignmentParamsImpl; +import org.apache.accumulo.core.manager.balancer.BalanceParamsImpl; +import org.apache.accumulo.core.manager.balancer.TServerStatusImpl; +import org.apache.accumulo.core.manager.balancer.TabletServerIdImpl; +import org.apache.accumulo.core.manager.state.tables.TableState; +import org.apache.accumulo.core.manager.thrift.TableInfo; +import org.apache.accumulo.core.manager.thrift.TabletServerStatus; +import org.apache.accumulo.core.metadata.SystemTables; +import org.apache.accumulo.core.metadata.TServerInstance; +import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.filters.HasMigrationFilter; +import org.apache.accumulo.core.metrics.MetricsProducer; +import 
org.apache.accumulo.core.spi.balancer.BalancerEnvironment; +import org.apache.accumulo.core.spi.balancer.DoNothingBalancer; +import org.apache.accumulo.core.spi.balancer.TabletBalancer; +import org.apache.accumulo.core.spi.balancer.data.TServerStatus; +import org.apache.accumulo.core.spi.balancer.data.TabletMigration; +import org.apache.accumulo.core.spi.balancer.data.TabletServerId; +import org.apache.accumulo.core.util.threads.Threads; +import org.apache.accumulo.manager.metrics.BalancerMetrics; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.manager.balancer.BalancerEnvironmentImpl; +import org.apache.accumulo.server.manager.state.UnassignedTablet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; + +class BalanceManager { + + private static final Logger log = LoggerFactory.getLogger(BalanceManager.class); + + private final AtomicReference<Manager> manager; + // all access to this should be through getBalancer() + private TabletBalancer tabletBalancer; + private volatile BalancerEnvironment balancerEnvironment; + private final BalancerMetrics balancerMetrics = new BalancerMetrics(); + private final Object balancedNotifier = new Object(); + private static final long CLEANUP_INTERVAL_MINUTES = Manager.CLEANUP_INTERVAL_MINUTES; + + BalanceManager() { + this.manager = new AtomicReference<>(null); + } + + void setManager(Manager manager) { + Objects.requireNonNull(manager); + if (this.manager.compareAndSet(null, manager)) { + this.balancerEnvironment = new BalancerEnvironmentImpl(manager.getContext()); + } else if (this.manager.get() != manager) { + throw new IllegalStateException("Attempted to set different manager object"); + } + } + + private Manager getManager() { + // fail fast if not yet set + return Objects.requireNonNull(manager.get(), "Manager has not been set."); + } + + synchronized TabletBalancer 
getBalancer() { + String configuredBalancerClass = + getManager().getConfiguration().get(Property.MANAGER_TABLET_BALANCER); + try { + if (tabletBalancer == null + || !tabletBalancer.getClass().getName().equals(configuredBalancerClass)) { + log.debug("Attempting to initialize balancer using class {}, was {}", + configuredBalancerClass, + tabletBalancer == null ? "null" : tabletBalancer.getClass().getName()); + var localTabletBalancer = + Property.createInstanceFromPropertyName(getManager().getConfiguration(), + Property.MANAGER_TABLET_BALANCER, TabletBalancer.class, new DoNothingBalancer()); + localTabletBalancer.init(balancerEnvironment); + tabletBalancer = localTabletBalancer; + log.info("tablet balancer changed to {}", localTabletBalancer.getClass().getName()); + } + } catch (Exception e) { + log.warn("Failed to create balancer {} using {} instead", configuredBalancerClass, + DoNothingBalancer.class, e); + var localTabletBalancer = new DoNothingBalancer(); + localTabletBalancer.init(balancerEnvironment); + tabletBalancer = localTabletBalancer; + } + + return tabletBalancer; + } + + private ServerContext getContext() { + return getManager().getContext(); + } + + MetricsProducer getMetrics() { + return balancerMetrics; + } + + /** + * balanceTablets() balances tables by DataLevel. 
Return the current set of migrations partitioned + * by DataLevel + */ + private Map<Ample.DataLevel,Set<KeyExtent>> partitionMigrations() { + final Map<Ample.DataLevel,Set<KeyExtent>> partitionedMigrations = + new EnumMap<>(Ample.DataLevel.class); + for (Ample.DataLevel dl : Ample.DataLevel.values()) { + Set<KeyExtent> extents = new HashSet<>(); + // prev row needed for the extent + try (var tabletsMetadata = getContext() + .getAmple().readTablets().forLevel(dl).fetch(TabletMetadata.ColumnType.PREV_ROW, + TabletMetadata.ColumnType.MIGRATION, TabletMetadata.ColumnType.LOCATION) + .filter(new HasMigrationFilter()).build()) { + // filter out migrations that are awaiting cleanup + tabletsMetadata.stream().filter(tm -> !shouldCleanupMigration(tm)) + .forEach(tm -> extents.add(tm.getExtent())); + } + partitionedMigrations.put(dl, extents); + } + return partitionedMigrations; + } + + /** + * Given the current tserverStatus map and a DataLevel, return a view of the tserverStatus map + * that only contains entries for tables in the DataLevel + */ + private SortedMap<TServerInstance,TabletServerStatus> createTServerStatusView( + final Ample.DataLevel dl, final SortedMap<TServerInstance,TabletServerStatus> status) { + final SortedMap<TServerInstance,TabletServerStatus> tserverStatusForLevel = new TreeMap<>(); + final String METADATA_TABLE_ID = SystemTables.METADATA.tableId().canonical(); + final String ROOT_TABLE_ID = SystemTables.ROOT.tableId().canonical(); + status.forEach((tsi, tss) -> { + final TabletServerStatus copy = tss.deepCopy(); + final Map<String,TableInfo> oldTableMap = copy.getTableMap(); + final Map<String,TableInfo> newTableMap = + new HashMap<>(dl == Ample.DataLevel.USER ? 
oldTableMap.size() : 1); + switch (dl) { + case ROOT: { + var tableInfo = oldTableMap.get(ROOT_TABLE_ID); + if (tableInfo != null) { + newTableMap.put(ROOT_TABLE_ID, tableInfo); + } + break; + } + case METADATA: { + var tableInfo = oldTableMap.get(METADATA_TABLE_ID); + if (tableInfo != null) { + newTableMap.put(METADATA_TABLE_ID, tableInfo); + } + break; + } + case USER: + if (!oldTableMap.containsKey(METADATA_TABLE_ID) + && !oldTableMap.containsKey(ROOT_TABLE_ID)) { + newTableMap.putAll(oldTableMap); + } else { + oldTableMap.forEach((table, info) -> { + if (!table.equals(ROOT_TABLE_ID) && !table.equals(METADATA_TABLE_ID)) { + newTableMap.put(table, info); + } + }); + } + break; + + default: + throw new IllegalArgumentException("Unhandled DataLevel value: " + dl); + } + copy.setTableMap(newTableMap); + tserverStatusForLevel.put(tsi, copy); + }); + return tserverStatusForLevel; + } + + private Map<String,TableId> getTablesForLevel(Ample.DataLevel dataLevel) { + switch (dataLevel) { + case ROOT: + return Map.of(SystemTables.ROOT.tableName(), SystemTables.ROOT.tableId()); + case METADATA: + return Map.of(SystemTables.METADATA.tableName(), SystemTables.METADATA.tableId()); + case USER: { + Map<String,TableId> userTables = getContext().createQualifiedTableNameToIdMap(); + for (var accumuloTable : SystemTables.values()) { + if (Ample.DataLevel.of(accumuloTable.tableId()) != Ample.DataLevel.USER) { + userTables.remove(accumuloTable.tableName()); + } + } + return Collections.unmodifiableMap(userTables); + } + default: + throw new IllegalArgumentException("Unknown data level " + dataLevel); + } + } + + private List<TabletMigration> checkMigrationSanity(Set<TabletServerId> current, + List<TabletMigration> migrations, Ample.DataLevel level) { + return migrations.stream().filter(m -> { + boolean includeMigration = false; + if (m.getTablet() == null) { + log.error("Balancer gave back a null tablet {}", m); + } else if (Ample.DataLevel.of(m.getTablet().getTable()) != level) { + 
log.warn( + "Balancer wants to move a tablet ({}) outside of the current processing level ({}), " + + "ignoring and should be processed at the correct level ({})", + m.getTablet(), level, Ample.DataLevel.of(m.getTablet().getTable())); + } else if (m.getNewTabletServer() == null) { + log.error("Balancer did not set the destination {}", m); + } else if (m.getOldTabletServer() == null) { + log.error("Balancer did not set the source {}", m); + } else if (!current.contains(m.getOldTabletServer())) { + log.warn("Balancer wants to move a tablet from a server that is not current: {}", m); + } else if (!current.contains(m.getNewTabletServer())) { + log.warn("Balancer wants to move a tablet to a server that is not current: {}", m); + } else { + includeMigration = true; + } + return includeMigration; + }).collect(Collectors.toList()); + } + + long balanceTablets() { + final int tabletsNotHosted = getManager().notHosted(); + BalanceParamsImpl params = null; + long wait = 0; + long totalMigrationsOut = 0; + final Map<Ample.DataLevel,Set<KeyExtent>> partitionedMigrations = partitionMigrations(); + int levelsCompleted = 0; + + for (Ample.DataLevel dl : Ample.DataLevel.values()) { + if (dl == Ample.DataLevel.USER && tabletsNotHosted > 0) { + log.debug("not balancing user tablets because there are {} unhosted tablets", + tabletsNotHosted); + continue; + } + + if (dl == Ample.DataLevel.USER && !canAssignAndBalance()) { + log.debug("not balancing user tablets because not enough tablet servers"); + continue; + } + + if ((dl == Ample.DataLevel.METADATA || dl == Ample.DataLevel.USER) + && !partitionedMigrations.get(Ample.DataLevel.ROOT).isEmpty()) { + log.debug("Not balancing {} because {} has migrations", dl, Ample.DataLevel.ROOT); + continue; + } + + if (dl == Ample.DataLevel.USER + && !partitionedMigrations.get(Ample.DataLevel.METADATA).isEmpty()) { + log.debug("Not balancing {} because {} has migrations", dl, Ample.DataLevel.METADATA); + continue; + } + + // Create a view of the 
tserver status such that it only contains the tables + // for this level in the tableMap. + SortedMap<TServerInstance,TabletServerStatus> tserverStatusForLevel = + createTServerStatusView(dl, getManager().tserverStatus); + // Construct the Thrift variant of the map above for the BalancerParams + final SortedMap<TabletServerId,TServerStatus> tserverStatusForBalancerLevel = new TreeMap<>(); + tserverStatusForLevel.forEach((tsi, status) -> tserverStatusForBalancerLevel + .put(new TabletServerIdImpl(tsi), TServerStatusImpl.fromThrift(status))); + + log.debug("Balancing for tables at level {}", dl); + + SortedMap<TabletServerId,TServerStatus> statusForBalancerLevel = + tserverStatusForBalancerLevel; + params = BalanceParamsImpl.fromThrift(statusForBalancerLevel, + getManager().tServerGroupingForBalancer, tserverStatusForLevel, + partitionedMigrations.get(dl), dl, getTablesForLevel(dl)); + wait = Math.max(getBalancer().balance(params), wait); + long migrationsOutForLevel = 0; + try (var tabletsMutator = getContext().getAmple().conditionallyMutateTablets(result -> {})) { + for (TabletMigration m : checkMigrationSanity(statusForBalancerLevel.keySet(), + params.migrationsOut(), dl)) { + final KeyExtent ke = KeyExtent.fromTabletId(m.getTablet()); + if (partitionedMigrations.get(dl).contains(ke)) { + log.warn("balancer requested migration more than once, skipping {}", m); + continue; + } + migrationsOutForLevel++; + var migration = TabletServerIdImpl.toThrift(m.getNewTabletServer()); + tabletsMutator.mutateTablet(ke).requireAbsentOperation() + .requireCurrentLocationNotEqualTo(migration).putMigration(migration) + .submit(tm -> false); + log.debug("migration {}", m); + } + } + totalMigrationsOut += migrationsOutForLevel; + + // increment this at end of loop to signal complete run w/o any continue + levelsCompleted++; + } + final long totalMigrations = + totalMigrationsOut + partitionedMigrations.values().stream().mapToLong(Set::size).sum(); + 
balancerMetrics.assignMigratingCount(() -> totalMigrations); + + if (totalMigrationsOut == 0 && levelsCompleted == Ample.DataLevel.values().length) { + synchronized (balancedNotifier) { + balancedNotifier.notifyAll(); + } + } else if (totalMigrationsOut > 0) { + getManager().nextEvent.event("Migrating %d more tablets, %d total", totalMigrationsOut, + totalMigrations); + } + return wait; + } + + @SuppressFBWarnings(value = "UW_UNCOND_WAIT", justification = "TODO needs triage") + void waitForBalance() { + synchronized (balancedNotifier) { + long eventCounter; + do { + eventCounter = getManager().nextEvent.waitForEvents(0, 0); + try { + balancedNotifier.wait(); + } catch (InterruptedException e) { + log.debug(e.toString(), e); + } + } while (getManager().displayUnassigned() > 0 || numMigrations() > 0 + || eventCounter != getManager().nextEvent.waitForEvents(0, 0)); + } + } + + long numMigrations() { + long count = 0; + for (Ample.DataLevel dl : Ample.DataLevel.values()) { + try (var tabletsMetadata = getContext().getAmple().readTablets().forLevel(dl) + .fetch(TabletMetadata.ColumnType.MIGRATION).filter(new HasMigrationFilter()).build()) { + count += tabletsMetadata.stream().count(); + } + } + return count; + } + + void getAssignments(SortedMap<TServerInstance,TabletServerStatus> currentStatus, + Map<String,Set<TServerInstance>> currentTServerGroups, + Map<KeyExtent,UnassignedTablet> unassigned, Map<KeyExtent,TServerInstance> assignedOut) { + AssignmentParamsImpl params = + AssignmentParamsImpl.fromThrift(currentStatus, currentTServerGroups, + unassigned.entrySet().stream().collect(HashMap::new, + (m, e) -> m.put(e.getKey(), + e.getValue().getLastLocation() == null ? 
null + : e.getValue().getLastLocation().getServerInstance()), + Map::putAll), + assignedOut); + getBalancer().getAssignments(params); + if (!canAssignAndBalance()) { + // remove assignment for user tables + Iterator<KeyExtent> iter = assignedOut.keySet().iterator(); + while (iter.hasNext()) { + KeyExtent ke = iter.next(); + if (!ke.isMeta()) { + iter.remove(); + log.trace("Removed assignment for {} as assignments for user tables is disabled.", ke); + } + } + } + } + + private boolean canAssignAndBalance() { + final int threshold = getManager().getConfiguration() + .getCount(Property.MANAGER_TABLET_BALANCER_TSERVER_THRESHOLD); + if (threshold == 0) { + return true; + } + final int numTServers = getManager().tserverSet.size(); + final boolean result = numTServers >= threshold; + if (!result) { + log.warn("Not assigning or balancing as number of tservers ({}) is below threshold ({})", + numTServers, threshold); + } + return result; + } + + private boolean shouldCleanupMigration(TabletMetadata tabletMetadata) { + var tableState = getContext().getTableManager().getTableState(tabletMetadata.getTableId()); + var migration = tabletMetadata.getMigration(); + Preconditions.checkState(migration != null, + "This method should only be called if there is a migration"); + return tableState == TableState.OFFLINE + || !getManager().onlineTabletServers().contains(migration) + || (tabletMetadata.getLocation() != null + && tabletMetadata.getLocation().getServerInstance().equals(migration)); + } + + void startMigrationCleanupThread() { + Threads.createCriticalThread("Migration Cleanup Thread", new MigrationCleanupThread()).start(); + } + + private class MigrationCleanupThread implements Runnable { + + @Override + public void run() { + while (getManager().stillManager()) { + try { + // - Remove any migrations for tablets of offline tables, as the migration can never + // succeed because no tablet server will load the tablet + // - Remove any migrations to tablet servers that are not 
live + // - Remove any migrations where the tablets current location equals the migration + // (the migration has completed) + var ample = getContext().getAmple(); + for (Ample.DataLevel dl : Ample.DataLevel.values()) { + // prev row needed for the extent + try (var tabletsMetadata = ample.readTablets().forLevel(dl) + .fetch(TabletMetadata.ColumnType.PREV_ROW, TabletMetadata.ColumnType.MIGRATION, + TabletMetadata.ColumnType.LOCATION) + .filter(new HasMigrationFilter()).build(); + var tabletsMutator = ample.conditionallyMutateTablets(result -> {})) { + for (var tabletMetadata : tabletsMetadata) { + var migration = tabletMetadata.getMigration(); + if (shouldCleanupMigration(tabletMetadata)) { + tabletsMutator.mutateTablet(tabletMetadata.getExtent()).requireAbsentOperation() + .requireMigration(migration).deleteMigration().submit(tm -> false); + } + } + } + } + } catch (Exception ex) { + log.error("Error cleaning up migrations", ex); + } + sleepUninterruptibly(CLEANUP_INTERVAL_MINUTES, MINUTES); + } + } + } +} diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 28b435ef13..5504ae1efc 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -34,7 +34,6 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.EnumMap; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -44,7 +43,6 @@ import java.util.Map.Entry; import java.util.Optional; import java.util.Set; import java.util.SortedMap; -import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CountDownLatch; @@ -56,7 +54,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import 
java.util.function.Function; import java.util.function.Predicate; -import java.util.stream.Collectors; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.cli.ConfigOpts; @@ -89,10 +86,6 @@ import org.apache.accumulo.core.lock.ServiceLockData.ThriftService; import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector; import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath; import org.apache.accumulo.core.lock.ServiceLockSupport.HAServiceLockWatcher; -import org.apache.accumulo.core.manager.balancer.AssignmentParamsImpl; -import org.apache.accumulo.core.manager.balancer.BalanceParamsImpl; -import org.apache.accumulo.core.manager.balancer.TServerStatusImpl; -import org.apache.accumulo.core.manager.balancer.TabletServerIdImpl; import org.apache.accumulo.core.manager.state.tables.TableState; import org.apache.accumulo.core.manager.thrift.BulkImportState; import org.apache.accumulo.core.manager.thrift.ManagerGoalState; @@ -103,16 +96,8 @@ import org.apache.accumulo.core.manager.thrift.TabletServerStatus; import org.apache.accumulo.core.metadata.SystemTables; import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; -import org.apache.accumulo.core.metadata.schema.TabletMetadata; -import org.apache.accumulo.core.metadata.schema.filters.HasMigrationFilter; import org.apache.accumulo.core.metrics.MetricsInfo; import org.apache.accumulo.core.metrics.MetricsProducer; -import org.apache.accumulo.core.spi.balancer.BalancerEnvironment; -import org.apache.accumulo.core.spi.balancer.DoNothingBalancer; -import org.apache.accumulo.core.spi.balancer.TabletBalancer; -import org.apache.accumulo.core.spi.balancer.data.TServerStatus; -import org.apache.accumulo.core.spi.balancer.data.TabletMigration; -import org.apache.accumulo.core.spi.balancer.data.TabletServerId; import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.util.Retry; import 
org.apache.accumulo.core.util.Timer; @@ -122,7 +107,6 @@ import org.apache.accumulo.core.util.time.SteadyTime; import org.apache.accumulo.core.zookeeper.ZcStat; import org.apache.accumulo.manager.compaction.coordinator.CompactionCoordinator; import org.apache.accumulo.manager.merge.FindMergeableRangeTask; -import org.apache.accumulo.manager.metrics.BalancerMetrics; import org.apache.accumulo.manager.metrics.ManagerMetrics; import org.apache.accumulo.manager.recovery.RecoveryManager; import org.apache.accumulo.manager.split.Splitter; @@ -137,11 +121,9 @@ import org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.server.manager.LiveTServerSet; import org.apache.accumulo.server.manager.LiveTServerSet.LiveTServersSnapshot; import org.apache.accumulo.server.manager.LiveTServerSet.TServerConnection; -import org.apache.accumulo.server.manager.balancer.BalancerEnvironmentImpl; import org.apache.accumulo.server.manager.state.DeadServerList; import org.apache.accumulo.server.manager.state.TabletServerState; import org.apache.accumulo.server.manager.state.TabletStateStore; -import org.apache.accumulo.server.manager.state.UnassignedTablet; import org.apache.accumulo.server.rpc.ServerAddress; import org.apache.accumulo.server.rpc.TServerUtils; import org.apache.accumulo.server.rpc.ThriftProcessorTypes; @@ -167,7 +149,6 @@ import com.google.common.collect.Maps; import com.google.common.util.concurrent.RateLimiter; import com.google.common.util.concurrent.Uninterruptibles; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.micrometer.core.instrument.MeterRegistry; import io.opentelemetry.api.trace.Span; import io.opentelemetry.context.Scope; @@ -182,7 +163,7 @@ public class Manager extends AbstractServer implements LiveTServerSet.Listener { static final Logger log = LoggerFactory.getLogger(Manager.class); static final int ONE_SECOND = 1000; - private static final long CLEANUP_INTERVAL_MINUTES = 5; + static final long CLEANUP_INTERVAL_MINUTES 
= 5; static final long WAIT_BETWEEN_ERRORS = ONE_SECOND; private static final long DEFAULT_WAIT_FOR_WATCHER = 10 * ONE_SECOND; private static final int MAX_CLEANUP_WAIT_TIME = ONE_SECOND; @@ -191,7 +172,6 @@ public class Manager extends AbstractServer implements LiveTServerSet.Listener { private static final int MAX_BAD_STATUS_COUNT = 3; private static final double MAX_SHUTDOWNS_PER_SEC = 10D / 60D; - private final Object balancedNotifier = new Object(); final LiveTServerSet tserverSet; private final List<TabletGroupWatcher> watchers = new ArrayList<>(); final Map<TServerInstance,AtomicInteger> badServers = @@ -208,9 +188,7 @@ public class Manager extends AbstractServer implements LiveTServerSet.Listener { ServiceLock managerLock = null; private TServer clientService = null; - protected volatile TabletBalancer tabletBalancer = null; - private final BalancerEnvironment balancerEnvironment; - private final BalancerMetrics balancerMetrics = new BalancerMetrics(); + private final BalanceManager balanceManager; private ManagerState state = ManagerState.INITIAL; @@ -238,6 +216,10 @@ public class Manager extends AbstractServer implements LiveTServerSet.Listener { return state; } + public BalanceManager getBalanceManager() { + return balanceManager; + } + public Map<FateId,Map<String,String>> getCompactionHints(DataLevel level) { Predicate<TableId> tablePredicate = (tableId) -> DataLevel.of(tableId) == level; Map<FateId,CompactionConfig> allConfig; @@ -377,7 +359,7 @@ public class Manager extends AbstractServer implements LiveTServerSet.Listener { - assignedOrHosted(SystemTables.ROOT.tableId()); } - private int notHosted() { + int notHosted() { int result = 0; for (TabletGroupWatcher watcher : watchers) { for (TableCounts counts : watcher.getStats().values()) { @@ -453,7 +435,7 @@ public class Manager extends AbstractServer implements LiveTServerSet.Listener { super(ServerId.Type.MANAGER, opts, serverContextFactory, args); ServerContext context = super.getContext(); 
upgradeCoordinator = new UpgradeCoordinator(context); - balancerEnvironment = new BalancerEnvironmentImpl(context); + balanceManager = new BalanceManager(); AccumuloConfiguration aconf = context.getConfiguration(); @@ -461,7 +443,6 @@ public class Manager extends AbstractServer implements LiveTServerSet.Listener { log.info("Instance {}", context.getInstanceID()); timeKeeper = new ManagerTime(this, aconf); tserverSet = new LiveTServerSet(context, this); - initializeBalancer(); final long tokenLifetime = aconf.getTimeInMillis(Property.GENERAL_DELEGATION_TOKEN_LIFETIME); @@ -522,10 +503,6 @@ public class Manager extends AbstractServer implements LiveTServerSet.Listener { return splitter; } - public MetricsProducer getBalancerMetrics() { - return balancerMetrics; - } - public UpgradeCoordinator.UpgradeStatus getUpgradeStatus() { return upgradeCoordinator.getStatus(); } @@ -544,52 +521,6 @@ public class Manager extends AbstractServer implements LiveTServerSet.Listener { } } - private class MigrationCleanupThread implements Runnable { - - @Override - public void run() { - while (stillManager()) { - try { - // - Remove any migrations for tablets of offline tables, as the migration can never - // succeed because no tablet server will load the tablet - // - Remove any migrations to tablet servers that are not live - // - Remove any migrations where the tablets current location equals the migration - // (the migration has completed) - var ample = getContext().getAmple(); - for (DataLevel dl : DataLevel.values()) { - // prev row needed for the extent - try (var tabletsMetadata = ample.readTablets().forLevel(dl) - .fetch(TabletMetadata.ColumnType.PREV_ROW, TabletMetadata.ColumnType.MIGRATION, - TabletMetadata.ColumnType.LOCATION) - .filter(new HasMigrationFilter()).build(); - var tabletsMutator = ample.conditionallyMutateTablets(result -> {})) { - for (var tabletMetadata : tabletsMetadata) { - var migration = tabletMetadata.getMigration(); - if 
(shouldCleanupMigration(tabletMetadata)) { - tabletsMutator.mutateTablet(tabletMetadata.getExtent()).requireAbsentOperation() - .requireMigration(migration).deleteMigration().submit(tm -> false); - } - } - } - } - } catch (Exception ex) { - log.error("Error cleaning up migrations", ex); - } - sleepUninterruptibly(CLEANUP_INTERVAL_MINUTES, MINUTES); - } - } - } - - private boolean shouldCleanupMigration(TabletMetadata tabletMetadata) { - var tableState = getContext().getTableManager().getTableState(tabletMetadata.getTableId()); - var migration = tabletMetadata.getMigration(); - Preconditions.checkState(migration != null, - "This method should only be called if there is a migration"); - return tableState == TableState.OFFLINE || !onlineTabletServers().contains(migration) - || (tabletMetadata.getLocation() != null - && tabletMetadata.getLocation().getServerInstance().equals(migration)); - } - private class ScanServerZKCleaner implements Runnable { @Override @@ -632,28 +563,6 @@ public class Manager extends AbstractServer implements LiveTServerSet.Listener { } - /** - * balanceTablets() balances tables by DataLevel. 
Return the current set of migrations partitioned - * by DataLevel - */ - private Map<DataLevel,Set<KeyExtent>> partitionMigrations() { - final Map<DataLevel,Set<KeyExtent>> partitionedMigrations = new EnumMap<>(DataLevel.class); - for (DataLevel dl : DataLevel.values()) { - Set<KeyExtent> extents = new HashSet<>(); - // prev row needed for the extent - try (var tabletsMetadata = getContext() - .getAmple().readTablets().forLevel(dl).fetch(TabletMetadata.ColumnType.PREV_ROW, - TabletMetadata.ColumnType.MIGRATION, TabletMetadata.ColumnType.LOCATION) - .filter(new HasMigrationFilter()).build()) { - // filter out migrations that are awaiting cleanup - tabletsMetadata.stream().filter(tm -> !shouldCleanupMigration(tm)) - .forEach(tm -> extents.add(tm.getExtent())); - } - partitionedMigrations.put(dl, extents); - } - return partitionedMigrations; - } - private class StatusThread implements Runnable { private boolean goodStats() { @@ -798,7 +707,7 @@ public class Manager extends AbstractServer implements LiveTServerSet.Listener { return DEFAULT_WAIT_FOR_WATCHER; } } - return balanceTablets(); + return balanceManager.balanceTablets(); } return DEFAULT_WAIT_FOR_WATCHER; } @@ -830,186 +739,6 @@ public class Manager extends AbstractServer implements LiveTServerSet.Listener { badServers.putIfAbsent(instance, new AtomicInteger(1)); } } - - /** - * Given the current tserverStatus map and a DataLevel, return a view of the tserverStatus map - * that only contains entries for tables in the DataLevel - */ - private SortedMap<TServerInstance,TabletServerStatus> createTServerStatusView( - final DataLevel dl, final SortedMap<TServerInstance,TabletServerStatus> status) { - final SortedMap<TServerInstance,TabletServerStatus> tserverStatusForLevel = new TreeMap<>(); - status.forEach((tsi, tss) -> { - final TabletServerStatus copy = tss.deepCopy(); - final Map<String,TableInfo> oldTableMap = copy.getTableMap(); - final Map<String,TableInfo> newTableMap = - new HashMap<>(dl == DataLevel.USER 
? oldTableMap.size() : 1); - if (dl == DataLevel.ROOT) { - if (oldTableMap.containsKey(SystemTables.ROOT.tableId().canonical())) { - newTableMap.put(SystemTables.ROOT.tableId().canonical(), - oldTableMap.get(SystemTables.ROOT.tableId().canonical())); - } - } else if (dl == DataLevel.METADATA) { - if (oldTableMap.containsKey(SystemTables.METADATA.tableId().canonical())) { - newTableMap.put(SystemTables.METADATA.tableId().canonical(), - oldTableMap.get(SystemTables.METADATA.tableId().canonical())); - } - } else if (dl == DataLevel.USER) { - if (!oldTableMap.containsKey(SystemTables.METADATA.tableId().canonical()) - && !oldTableMap.containsKey(SystemTables.ROOT.tableId().canonical())) { - newTableMap.putAll(oldTableMap); - } else { - oldTableMap.forEach((table, info) -> { - if (!table.equals(SystemTables.ROOT.tableId().canonical()) - && !table.equals(SystemTables.METADATA.tableId().canonical())) { - newTableMap.put(table, info); - } - }); - } - } else { - throw new IllegalArgumentException("Unhandled DataLevel value: " + dl); - } - copy.setTableMap(newTableMap); - tserverStatusForLevel.put(tsi, copy); - }); - return tserverStatusForLevel; - } - - private Map<String,TableId> getTablesForLevel(DataLevel dataLevel) { - switch (dataLevel) { - case ROOT: - return Map.of(SystemTables.ROOT.tableName(), SystemTables.ROOT.tableId()); - case METADATA: - return Map.of(SystemTables.METADATA.tableName(), SystemTables.METADATA.tableId()); - case USER: { - Map<String,TableId> userTables = getContext().createQualifiedTableNameToIdMap(); - for (var accumuloTable : SystemTables.values()) { - if (DataLevel.of(accumuloTable.tableId()) != DataLevel.USER) { - userTables.remove(accumuloTable.tableName()); - } - } - return Collections.unmodifiableMap(userTables); - } - default: - throw new IllegalArgumentException("Unknown data level " + dataLevel); - } - } - - private long balanceTablets() { - - // Check for balancer property change - initializeBalancer(); - - final int tabletsNotHosted = 
notHosted(); - BalanceParamsImpl params = null; - long wait = 0; - long totalMigrationsOut = 0; - final Map<DataLevel,Set<KeyExtent>> partitionedMigrations = partitionMigrations(); - int levelsCompleted = 0; - - for (DataLevel dl : DataLevel.values()) { - - if (dl == DataLevel.USER && tabletsNotHosted > 0) { - log.debug("not balancing user tablets because there are {} unhosted tablets", - tabletsNotHosted); - continue; - } - - if (dl == DataLevel.USER && !canAssignAndBalance()) { - log.debug("not balancing user tablets because not enough tablet servers"); - continue; - } - - if ((dl == DataLevel.METADATA || dl == DataLevel.USER) - && !partitionedMigrations.get(DataLevel.ROOT).isEmpty()) { - log.debug("Not balancing {} because {} has migrations", dl, DataLevel.ROOT); - continue; - } - - if (dl == DataLevel.USER && !partitionedMigrations.get(DataLevel.METADATA).isEmpty()) { - log.debug("Not balancing {} because {} has migrations", dl, DataLevel.METADATA); - continue; - } - - // Create a view of the tserver status such that it only contains the tables - // for this level in the tableMap. 
- SortedMap<TServerInstance,TabletServerStatus> tserverStatusForLevel = - createTServerStatusView(dl, tserverStatus); - // Construct the Thrift variant of the map above for the BalancerParams - final SortedMap<TabletServerId,TServerStatus> tserverStatusForBalancerLevel = - new TreeMap<>(); - tserverStatusForLevel.forEach((tsi, status) -> tserverStatusForBalancerLevel - .put(new TabletServerIdImpl(tsi), TServerStatusImpl.fromThrift(status))); - - log.debug("Balancing for tables at level {}", dl); - - SortedMap<TabletServerId,TServerStatus> statusForBalancerLevel = - tserverStatusForBalancerLevel; - params = BalanceParamsImpl.fromThrift(statusForBalancerLevel, tServerGroupingForBalancer, - tserverStatusForLevel, partitionedMigrations.get(dl), dl, getTablesForLevel(dl)); - wait = Math.max(tabletBalancer.balance(params), wait); - long migrationsOutForLevel = 0; - try ( - var tabletsMutator = getContext().getAmple().conditionallyMutateTablets(result -> {})) { - for (TabletMigration m : checkMigrationSanity(statusForBalancerLevel.keySet(), - params.migrationsOut(), dl)) { - final KeyExtent ke = KeyExtent.fromTabletId(m.getTablet()); - if (partitionedMigrations.get(dl).contains(ke)) { - log.warn("balancer requested migration more than once, skipping {}", m); - continue; - } - migrationsOutForLevel++; - var migration = TabletServerIdImpl.toThrift(m.getNewTabletServer()); - tabletsMutator.mutateTablet(ke).requireAbsentOperation() - .requireCurrentLocationNotEqualTo(migration).putMigration(migration) - .submit(tm -> false); - log.debug("migration {}", m); - } - } - totalMigrationsOut += migrationsOutForLevel; - - // increment this at end of loop to signal complete run w/o any continue - levelsCompleted++; - } - final long totalMigrations = - totalMigrationsOut + partitionedMigrations.values().stream().mapToLong(Set::size).sum(); - balancerMetrics.assignMigratingCount(() -> totalMigrations); - - if (totalMigrationsOut == 0 && levelsCompleted == DataLevel.values().length) { - 
synchronized (balancedNotifier) { - balancedNotifier.notifyAll(); - } - } else if (totalMigrationsOut > 0) { - nextEvent.event("Migrating %d more tablets, %d total", totalMigrationsOut, totalMigrations); - } - return wait; - } - - private List<TabletMigration> checkMigrationSanity(Set<TabletServerId> current, - List<TabletMigration> migrations, DataLevel level) { - return migrations.stream().filter(m -> { - boolean includeMigration = false; - if (m.getTablet() == null) { - log.error("Balancer gave back a null tablet {}", m); - } else if (DataLevel.of(m.getTablet().getTable()) != level) { - log.warn( - "Balancer wants to move a tablet ({}) outside of the current processing level ({}), " - + "ignoring and should be processed at the correct level ({})", - m.getTablet(), level, DataLevel.of(m.getTablet().getTable())); - } else if (m.getNewTabletServer() == null) { - log.error("Balancer did not set the destination {}", m); - } else if (m.getOldTabletServer() == null) { - log.error("Balancer did not set the source {}", m); - } else if (!current.contains(m.getOldTabletServer())) { - log.warn("Balancer wants to move a tablet from a server that is not current: {}", m); - } else if (!current.contains(m.getNewTabletServer())) { - log.warn("Balancer wants to move a tablet to a server that is not current: {}", m); - } else { - includeMigration = true; - } - return includeMigration; - }).collect(Collectors.toList()); - } - } private SortedMap<TServerInstance,TabletServerStatus> @@ -1115,6 +844,8 @@ public class Manager extends AbstractServer implements LiveTServerSet.Listener { public void run() { final ServerContext context = getContext(); + balanceManager.setManager(this); + // ACCUMULO-4424 Put up the Thrift servers before getting the lock as a sign of process health // when a hot-standby // @@ -1203,7 +934,7 @@ public class Manager extends AbstractServer implements LiveTServerSet.Listener { MetricsInfo metricsInfo = getContext().getMetricsInfo(); ManagerMetrics 
managerMetrics = new ManagerMetrics(getConfiguration(), this); var producers = managerMetrics.getProducers(getConfiguration(), this); - producers.add(balancerMetrics); + producers.add(balanceManager.getMetrics()); final TabletGroupWatcher userTableTGW = new TabletGroupWatcher(this, this.userTabletStore, null, managerMetrics) { @@ -1319,7 +1050,7 @@ public class Manager extends AbstractServer implements LiveTServerSet.Listener { metricsInfo.init(MetricsInfo.serviceTags(getContext().getInstanceName(), getApplicationName(), sa.getAddress(), getResourceGroup())); - Threads.createCriticalThread("Migration Cleanup Thread", new MigrationCleanupThread()).start(); + balanceManager.startMigrationCleanupThread(); Threads.createCriticalThread("ScanServer Cleanup Thread", new ScanServerZKCleaner()).start(); // Don't call start the CompactionCoordinator until we have tservers and upgrade is complete. @@ -1742,22 +1473,6 @@ public class Manager extends AbstractServer implements LiveTServerSet.Listener { } } - @SuppressFBWarnings(value = "UW_UNCOND_WAIT", justification = "TODO needs triage") - public void waitForBalance() { - synchronized (balancedNotifier) { - long eventCounter; - do { - eventCounter = nextEvent.waitForEvents(0, 0); - try { - balancedNotifier.wait(); - } catch (InterruptedException e) { - log.debug(e.toString(), e); - } - } while (displayUnassigned() > 0 || numMigrations() > 0 - || eventCounter != nextEvent.waitForEvents(0, 0)); - } - } - public ManagerMonitorInfo getManagerMonitorInfo() { final ManagerMonitorInfo result = new ManagerMonitorInfo(); @@ -1826,67 +1541,6 @@ public class Manager extends AbstractServer implements LiveTServerSet.Listener { return upgradeCoordinator.getStatus() != UpgradeCoordinator.UpgradeStatus.COMPLETE; } - private void initializeBalancer() { - String configuredBalancerClass = getConfiguration().get(Property.MANAGER_TABLET_BALANCER); - try { - if (tabletBalancer == null - || 
!tabletBalancer.getClass().getName().equals(configuredBalancerClass)) { - log.debug("Attempting to initialize balancer using class {}, was {}", - configuredBalancerClass, - tabletBalancer == null ? "null" : tabletBalancer.getClass().getName()); - var localTabletBalancer = Property.createInstanceFromPropertyName(getConfiguration(), - Property.MANAGER_TABLET_BALANCER, TabletBalancer.class, new DoNothingBalancer()); - localTabletBalancer.init(balancerEnvironment); - tabletBalancer = localTabletBalancer; - log.info("tablet balancer changed to {}", localTabletBalancer.getClass().getName()); - } - } catch (Exception e) { - log.warn("Failed to create balancer {} using {} instead", configuredBalancerClass, - DoNothingBalancer.class, e); - var localTabletBalancer = new DoNothingBalancer(); - localTabletBalancer.init(balancerEnvironment); - tabletBalancer = localTabletBalancer; - } - } - - void getAssignments(SortedMap<TServerInstance,TabletServerStatus> currentStatus, - Map<String,Set<TServerInstance>> currentTServerGroups, - Map<KeyExtent,UnassignedTablet> unassigned, Map<KeyExtent,TServerInstance> assignedOut) { - AssignmentParamsImpl params = - AssignmentParamsImpl.fromThrift(currentStatus, currentTServerGroups, - unassigned.entrySet().stream().collect(HashMap::new, - (m, e) -> m.put(e.getKey(), - e.getValue().getLastLocation() == null ? 
null - : e.getValue().getLastLocation().getServerInstance()), - Map::putAll), - assignedOut); - tabletBalancer.getAssignments(params); - if (!canAssignAndBalance()) { - // remove assignment for user tables - Iterator<KeyExtent> iter = assignedOut.keySet().iterator(); - while (iter.hasNext()) { - KeyExtent ke = iter.next(); - if (!ke.isMeta()) { - iter.remove(); - log.trace("Removed assignment for {} as assignments for user tables is disabled.", ke); - } - } - } - } - - public TabletStateStore getTabletStateStore(DataLevel level) { - switch (level) { - case METADATA: - return this.metadataTabletStore; - case ROOT: - return this.rootTabletStore; - case USER: - return this.userTabletStore; - default: - throw new IllegalStateException("Unhandled DataLevel value: " + level); - } - } - @Override public void registerMetrics(MeterRegistry registry) { super.registerMetrics(registry); @@ -1903,30 +1557,4 @@ public class Manager extends AbstractServer implements LiveTServerSet.Listener { public ServiceLock getLock() { return managerLock; } - - private long numMigrations() { - long count = 0; - for (DataLevel dl : DataLevel.values()) { - try (var tabletsMetadata = getContext().getAmple().readTablets().forLevel(dl) - .fetch(TabletMetadata.ColumnType.MIGRATION).filter(new HasMigrationFilter()).build()) { - count += tabletsMetadata.stream().count(); - } - } - return count; - } - - private boolean canAssignAndBalance() { - final int threshold = - getConfiguration().getCount(Property.MANAGER_TABLET_BALANCER_TSERVER_THRESHOLD); - if (threshold == 0) { - return true; - } - final int numTServers = tserverSet.size(); - final boolean result = numTServers >= threshold; - if (!result) { - log.warn("Not assigning or balancing as number of tservers ({}) is below threshold ({})", - numTServers, threshold); - } - return result; - } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java 
b/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java index 6c8ff0ef3f..2be39fdfcf 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java @@ -549,7 +549,7 @@ public class ManagerClientServiceHandler implements ManagerClientService.Iface { @Override public void waitForBalance(TInfo tinfo) { - manager.waitForBalance(); + manager.getBalanceManager().waitForBalance(); } @Override diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java index c9ff9ba426..6e306936a6 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java @@ -533,8 +533,8 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { // This is final because nothing in this method should change the goal. All computation of the // goal should be done in TabletGoalState.compute() so that all parts of the Accumulo code // will compute a consistent goal. 
- final TabletGoalState goal = - TabletGoalState.compute(tm, state, manager.tabletBalancer, tableMgmtParams); + final TabletGoalState goal = TabletGoalState.compute(tm, state, + manager.getBalanceManager().getBalancer(), tableMgmtParams); final Set<ManagementAction> actions = mti.getActions(); @@ -969,8 +969,8 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { Map<KeyExtent,UnassignedTablet> unassigned) { if (!tLists.destinations.isEmpty()) { Map<KeyExtent,TServerInstance> assignedOut = new HashMap<>(); - manager.getAssignments(tLists.destinations, tLists.currentTServerGrouping, unassigned, - assignedOut); + manager.getBalanceManager().getAssignments(tLists.destinations, tLists.currentTServerGrouping, + unassigned, assignedOut); for (Entry<KeyExtent,TServerInstance> assignment : assignedOut.entrySet()) { if (unassigned.containsKey(assignment.getKey())) { if (assignment.getValue() != null) {