This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch 3.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 1abcad44bf43a5664415df488f9c8170284647fc Merge: e4d83cca90 bd9e67637c Author: Keith Turner <ktur...@apache.org> AuthorDate: Fri Feb 28 15:54:32 2025 +0000 Merge branch '2.1' into 3.1 .../core/manager/balancer/BalanceParamsImpl.java | 22 ++++- .../accumulo/core/rpc/TraceProtocolFactory.java | 43 ++++---- .../core/spi/balancer/BalancerEnvironment.java | 5 + .../spi/balancer/HostRegexTableLoadBalancer.java | 6 +- .../core/spi/balancer/TableLoadBalancer.java | 14 +-- .../accumulo/core/spi/balancer/TabletBalancer.java | 10 ++ .../core/spi/balancer/GroupBalancerTest.java | 9 +- ...tRegexTableLoadBalancerReconfigurationTest.java | 4 +- .../balancer/HostRegexTableLoadBalancerTest.java | 14 +-- .../core/spi/balancer/SimpleLoadBalancerTest.java | 4 +- .../core/spi/balancer/TableLoadBalancerTest.java | 6 +- .../java/org/apache/accumulo/manager/Manager.java | 110 ++++++++++++--------- .../accumulo/manager/tableOps/TraceRepo.java | 25 ++++- .../java/org/apache/accumulo/test/BalanceIT.java | 96 +++++++++++++++++- .../accumulo/test/ChaoticLoadBalancerTest.java | 2 +- 15 files changed, 263 insertions(+), 107 deletions(-) diff --cc core/src/test/java/org/apache/accumulo/core/spi/balancer/GroupBalancerTest.java index 189af6ef6f,f469b8fd2a..d65c0786f4 --- a/core/src/test/java/org/apache/accumulo/core/spi/balancer/GroupBalancerTest.java +++ b/core/src/test/java/org/apache/accumulo/core/spi/balancer/GroupBalancerTest.java @@@ -118,11 -121,11 +119,11 @@@ public class GroupBalancerTest for (TabletServerId tsi : tservers) { current.put(tsi, new TServerStatusImpl( - new org.apache.accumulo.core.master.thrift.TabletServerStatus())); + new org.apache.accumulo.core.manager.thrift.TabletServerStatus())); } - balancer - .balance(new BalanceParamsImpl(current, migrations, migrationsOut, DataLevel.of(tid))); + balancer.balance(new BalanceParamsImpl(current, migrations, migrationsOut, + DataLevel.of(tid), tablesToBalance)); assertTrue(migrationsOut.size() <= (maxMigrations + 5), "Max Migration exceeded " + maxMigrations + " " + migrationsOut.size()); diff --cc server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 1fa3de8c88,67a6656164..fc826d50de --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@@ -98,8 -97,9 +98,8 @@@ import org.apache.accumulo.core.metadat import org.apache.accumulo.core.metrics.MetricsInfo; import org.apache.accumulo.core.metrics.MetricsProducer; import org.apache.accumulo.core.process.thrift.ServerProcessService; -import org.apache.accumulo.core.replication.thrift.ReplicationCoordinator; import org.apache.accumulo.core.spi.balancer.BalancerEnvironment; - import org.apache.accumulo.core.spi.balancer.SimpleLoadBalancer; + import org.apache.accumulo.core.spi.balancer.TableLoadBalancer; import org.apache.accumulo.core.spi.balancer.TabletBalancer; import org.apache.accumulo.core.spi.balancer.data.TServerStatus; import org.apache.accumulo.core.spi.balancer.data.TabletMigration; @@@ -987,23 -999,22 +987,23 @@@ public class Manager extends AbstractSe final Map<String,TableInfo> newTableMap = new HashMap<>(dl == DataLevel.USER ? oldTableMap.size() : 1); if (dl == DataLevel.ROOT) { - if (oldTableMap.containsKey(AccumuloTable.ROOT.tableName())) { - newTableMap.put(AccumuloTable.ROOT.tableName(), - oldTableMap.get(AccumuloTable.ROOT.tableName())); - if (oldTableMap.containsKey(RootTable.ID.canonical())) { - newTableMap.put(RootTable.ID.canonical(), oldTableMap.get(RootTable.ID.canonical())); ++ if (oldTableMap.containsKey(AccumuloTable.ROOT.tableId().canonical())) { ++ newTableMap.put(AccumuloTable.ROOT.tableId().canonical(), ++ oldTableMap.get(AccumuloTable.ROOT.tableId().canonical())); } } else if (dl == DataLevel.METADATA) { - if (oldTableMap.containsKey(AccumuloTable.METADATA.tableName())) { - newTableMap.put(AccumuloTable.METADATA.tableName(), - oldTableMap.get(AccumuloTable.METADATA.tableName())); - if (oldTableMap.containsKey(MetadataTable.ID.canonical())) { - newTableMap.put(MetadataTable.ID.canonical(), - oldTableMap.get(MetadataTable.ID.canonical())); ++ if (oldTableMap.containsKey(AccumuloTable.METADATA.tableId().canonical())) { ++ newTableMap.put(AccumuloTable.METADATA.tableId().canonical(), ++ oldTableMap.get(AccumuloTable.METADATA.tableId().canonical())); } } else if (dl == DataLevel.USER) { - if (!oldTableMap.containsKey(AccumuloTable.METADATA.tableName()) - && !oldTableMap.containsKey(AccumuloTable.ROOT.tableName())) { - if (!oldTableMap.containsKey(MetadataTable.ID.canonical()) - && !oldTableMap.containsKey(RootTable.ID.canonical())) { ++ if (!oldTableMap.containsKey(AccumuloTable.METADATA.tableId().canonical()) ++ && !oldTableMap.containsKey(AccumuloTable.ROOT.tableId().canonical())) { newTableMap.putAll(oldTableMap); } else { oldTableMap.forEach((table, info) -> { - if (!table.equals(AccumuloTable.ROOT.tableName()) - && !table.equals(AccumuloTable.METADATA.tableName())) { - if (!table.equals(RootTable.ID.canonical()) - && !table.equals(MetadataTable.ID.canonical())) { ++ if (!table.equals(AccumuloTable.ROOT.tableId().canonical()) ++ && !table.equals(AccumuloTable.METADATA.tableId().canonical())) { newTableMap.put(table, info); } }); @@@ -1017,6 -1028,23 +1017,26 @@@ return tserverStatusForLevel; } + private Map<String,TableId> getTablesForLevel(DataLevel dataLevel) { + switch (dataLevel) { + case ROOT: - return Map.of(RootTable.NAME, RootTable.ID); ++ return Map.of(AccumuloTable.ROOT.tableName(), AccumuloTable.ROOT.tableId()); + case METADATA: - return Map.of(MetadataTable.NAME, MetadataTable.ID); ++ return Map.of(AccumuloTable.METADATA.tableName(), AccumuloTable.METADATA.tableId()); + case USER: { + Map<String,TableId> userTables = new HashMap<>(getContext().getTableNameToIdMap()); - userTables.remove(RootTable.NAME); - userTables.remove(MetadataTable.NAME); ++ for (var accumuloTable : AccumuloTable.values()) { ++ if (DataLevel.of(accumuloTable.tableId()) != DataLevel.USER) { ++ userTables.remove(accumuloTable.tableName()); ++ } ++ } + return Collections.unmodifiableMap(userTables); + } + default: + throw new IllegalArgumentException("Unknown data level " + dataLevel); + } + } + private long balanceTablets() { final int tabletsNotHosted = notHosted(); diff --cc server/manager/src/main/java/org/apache/accumulo/manager/tableOps/TraceRepo.java index 752bc90d44,fa2c7e193f..2ad5b273c9 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/TraceRepo.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/TraceRepo.java @@@ -18,13 -18,14 +18,14 @@@ */ package org.apache.accumulo.manager.tableOps; +import static org.apache.accumulo.core.util.LazySingletons.GSON; + +import org.apache.accumulo.core.clientImpl.thrift.TInfo; + import org.apache.accumulo.core.fate.FateTxId; import org.apache.accumulo.core.fate.Repo; import org.apache.accumulo.core.trace.TraceUtil; -import org.apache.accumulo.core.trace.thrift.TInfo; import org.apache.accumulo.manager.Manager; -import com.google.gson.Gson; - import io.opentelemetry.api.trace.Span; import io.opentelemetry.context.Scope; diff --cc test/src/main/java/org/apache/accumulo/test/BalanceIT.java index 0164463903,a282388ce7..4b35082f53 --- a/test/src/main/java/org/apache/accumulo/test/BalanceIT.java +++ b/test/src/main/java/org/apache/accumulo/test/BalanceIT.java @@@ -25,9 -33,14 +33,14 @@@ import java.util.stream.IntStream import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; + import org.apache.accumulo.core.client.admin.NewTableConfiguration; + import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.conf.Property; - import org.apache.accumulo.harness.AccumuloClusterHarness; -import org.apache.accumulo.core.metadata.MetadataTable; ++import org.apache.accumulo.core.metadata.AccumuloTable; + import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; + import org.apache.accumulo.test.functional.ConfigurableMacBase; + import org.apache.accumulo.test.util.Wait; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.junit.jupiter.api.Test; @@@ -72,4 -85,77 +85,77 @@@ public class BalanceIT extends Configur c.instanceOperations().waitForBalance(); } } + + @Test + public void testBalanceMetadata() throws Exception { + String tableName = getUniqueNames(1)[0]; + try (AccumuloClient c = Accumulo.newClient().from(getClientProperties()).build()) { + SortedSet<Text> splits = new TreeSet<>(); + for (int i = 0; i < 10; i++) { + splits.add(new Text("" + i)); + } + c.tableOperations().create(tableName, new NewTableConfiguration().withSplits(splits)); + + var metaSplits = IntStream.range(1, 100).mapToObj(i -> Integer.toString(i, 36)).map(Text::new) + .collect(Collectors.toCollection(TreeSet::new)); - c.tableOperations().addSplits(MetadataTable.NAME, metaSplits); ++ c.tableOperations().addSplits(AccumuloTable.METADATA.tableName(), metaSplits); + - var locCounts = countLocations(c, MetadataTable.NAME); ++ var locCounts = countLocations(c, AccumuloTable.METADATA.tableName()); + + c.instanceOperations().waitForBalance(); + - locCounts = countLocations(c, MetadataTable.NAME); ++ locCounts = countLocations(c, AccumuloTable.METADATA.tableName()); + var stats = locCounts.values().stream().mapToInt(i -> i).summaryStatistics(); + assertTrue(stats.getMax() <= 51, locCounts.toString()); + assertTrue(stats.getMin() >= 50, locCounts.toString()); + assertEquals(2, stats.getCount(), locCounts.toString()); + + assertEquals(2, getCluster().getConfig().getNumTservers()); + getCluster().getConfig().setNumTservers(4); + getCluster().getClusterControl().start(ServerType.TABLET_SERVER); + getCluster().getClusterControl().start(ServerType.TABLET_SERVER); + + Wait.waitFor(() -> { - var lc = countLocations(c, MetadataTable.NAME); ++ var lc = countLocations(c, AccumuloTable.METADATA.tableName()); + log.info("locations:{}", lc); + return lc.size() == 4; + }); + + c.instanceOperations().waitForBalance(); + - locCounts = countLocations(c, MetadataTable.NAME); ++ locCounts = countLocations(c, AccumuloTable.METADATA.tableName()); + stats = locCounts.values().stream().mapToInt(i -> i).summaryStatistics(); + assertTrue(stats.getMax() <= 26, locCounts.toString()); + assertTrue(stats.getMin() >= 25, locCounts.toString()); + assertEquals(4, stats.getCount(), locCounts.toString()); + + // The user table should eventually balance + Wait.waitFor(() -> { + var lc = countLocations(c, tableName); + log.info("locations:{}", lc); + return lc.size() == 4; + }); + + locCounts = countLocations(c, tableName); + stats = locCounts.values().stream().mapToInt(i -> i).summaryStatistics(); + assertTrue(stats.getMax() <= 3, locCounts.toString()); + assertTrue(stats.getMin() >= 2, locCounts.toString()); + assertEquals(4, stats.getCount(), locCounts.toString()); + } + } + + private Map<String,Integer> countLocations(AccumuloClient client, String tableName) + throws Exception { + var ctx = ((ClientContext) client); + var ample = ctx.getAmple(); + try (var tabletsMeta = + ample.readTablets().forTable(ctx.getTableId(tableName)).fetch(LOCATION, PREV_ROW).build()) { + Map<String,Integer> locCounts = new HashMap<>(); + for (var tabletMeta : tabletsMeta) { + var loc = tabletMeta.getLocation(); + locCounts.merge(loc == null ? " none" : loc.toString(), 1, Integer::sum); + } + return locCounts; + } + } }