This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new b0b9555b13 Added TabletServer in WaitForBalanceIT (#4969) b0b9555b13 is described below commit b0b9555b1314e2707d8c36a34320c7b38361c046 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Thu Oct 10 14:21:04 2024 -0400 Added TabletServer in WaitForBalanceIT (#4969) WaitForBalanceIT was assuming that the tablets were not balanced after adding splits. In the last step of the split code, in DeleteOperationIds, the EventCoordinator is notified to try and get the new splits hosted as fast as possible. This commit adds a new TabletServer which will cause a balancing to occur. I also modified the isBalanced method to take into account the tservers in the cluster, not just the location from the metadata. --- .../org/apache/accumulo/test/WaitForBalanceIT.java | 33 ++++++++++++++-------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/test/src/main/java/org/apache/accumulo/test/WaitForBalanceIT.java b/test/src/main/java/org/apache/accumulo/test/WaitForBalanceIT.java index fe1b7891c3..2c08441f5a 100644 --- a/test/src/main/java/org/apache/accumulo/test/WaitForBalanceIT.java +++ b/test/src/main/java/org/apache/accumulo/test/WaitForBalanceIT.java @@ -18,7 +18,7 @@ */ package org.apache.accumulo.test; -import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import java.time.Duration; @@ -33,6 +33,7 @@ import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.admin.NewTableConfiguration; import org.apache.accumulo.core.client.admin.TabletAvailability; +import org.apache.accumulo.core.client.admin.servers.ServerId.Type; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.metadata.AccumuloTable; @@ -40,6 +41,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily; import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.test.functional.ConfigurableMacBase; import org.apache.accumulo.test.util.Wait; import org.apache.hadoop.io.Text; @@ -57,6 +59,7 @@ public class WaitForBalanceIT extends ConfigurableMacBase { @Test public void test() throws Exception { try (AccumuloClient c = Accumulo.newClient().from(getClientProperties()).build()) { + assertEquals(2, c.instanceOperations().getServers(Type.TABLET_SERVER).size()); // ensure the metadata table is online try (Scanner scanner = c.createScanner(AccumuloTable.METADATA.tableName(), Authorizations.EMPTY)) { @@ -74,14 +77,22 @@ public class WaitForBalanceIT extends ConfigurableMacBase { partitionKeys.add(new Text("" + i)); } c.tableOperations().addSplits(tableName, partitionKeys); - assertFalse(isBalanced(c)); c.instanceOperations().waitForBalance(); - Wait.waitFor(() -> isBalanced(c)); + assertTrue(isBalanced(c)); + + // Add another tserver to force a rebalance + getCluster().getConfig().getClusterServerConfiguration().setNumDefaultTabletServers(3); + getCluster().getClusterControl().start(ServerType.TABLET_SERVER); + Wait.waitFor(() -> c.instanceOperations().getServers(Type.TABLET_SERVER).size() == 3); + c.instanceOperations().waitForBalance(); + assertTrue(isBalanced(c)); } } private boolean isBalanced(AccumuloClient c) throws Exception { - final Map<String,Integer> counts = new HashMap<>(); + final Map<String,Integer> tserverCounts = new HashMap<>(); + c.instanceOperations().getServers(Type.TABLET_SERVER) + .forEach(ts -> tserverCounts.put(ts.toHostPortString(), 0)); int offline = 0; for (String tableName : new String[] {AccumuloTable.METADATA.tableName(), AccumuloTable.ROOT.tableName()}) { @@ -93,17 +104,17 @@ public class WaitForBalanceIT extends ConfigurableMacBase { for (Entry<Key,Value> entry : s) { Key key = entry.getKey(); if (key.getColumnFamily().equals(CurrentLocationColumnFamily.NAME)) { - location = key.getColumnQualifier().toString(); + location = entry.getValue().toString(); } else if (TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key)) { if (location == null) { offline++; } else { - Integer count = counts.get(location); + Integer count = tserverCounts.get(location); if (count == null) { count = 0; } count = count + 1; - counts.put(location, count); + tserverCounts.put(location, count); } location = null; } @@ -115,13 +126,13 @@ public class WaitForBalanceIT extends ConfigurableMacBase { return false; } int average = 0; - for (Integer i : counts.values()) { + for (Integer i : tserverCounts.values()) { average += i; } - average /= counts.size(); - System.out.println(counts); + average /= tserverCounts.size(); + System.out.println(tserverCounts); int tablesCount = c.tableOperations().list().size(); - for (Entry<String,Integer> hostCount : counts.entrySet()) { + for (Entry<String,Integer> hostCount : tserverCounts.entrySet()) { if (Math.abs(average - hostCount.getValue()) > tablesCount) { System.out.println( "Average " + average + " count " + hostCount.getKey() + ": " + hostCount.getValue());