This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new b0b9555b13 Added TabletServer in WaitForBalanceIT (#4969)
b0b9555b13 is described below

commit b0b9555b1314e2707d8c36a34320c7b38361c046
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Thu Oct 10 14:21:04 2024 -0400

    Added TabletServer in WaitForBalanceIT (#4969)
    
    WaitForBalanceIT was assuming that the tablets were not balanced
    after adding splits. In the last step of the split code, in
    DeleteOperationIds, the EventCoordinator is notified to try and get
    the new splits hosted as fast as possible. This commit adds a new
    TabletServer, which will cause a rebalance to occur. I also modified
    the isBalanced method to take into account the tservers in the
    cluster, not just the location from the metadata.
---
 .../org/apache/accumulo/test/WaitForBalanceIT.java | 33 ++++++++++++++--------
 1 file changed, 22 insertions(+), 11 deletions(-)

diff --git a/test/src/main/java/org/apache/accumulo/test/WaitForBalanceIT.java b/test/src/main/java/org/apache/accumulo/test/WaitForBalanceIT.java
index fe1b7891c3..2c08441f5a 100644
--- a/test/src/main/java/org/apache/accumulo/test/WaitForBalanceIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/WaitForBalanceIT.java
@@ -18,7 +18,7 @@
  */
 package org.apache.accumulo.test;
 
-import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.time.Duration;
@@ -33,6 +33,7 @@ import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.admin.NewTableConfiguration;
 import org.apache.accumulo.core.client.admin.TabletAvailability;
+import org.apache.accumulo.core.client.admin.servers.ServerId.Type;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.metadata.AccumuloTable;
@@ -40,6 +41,7 @@ import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily;
 import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
 import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.test.functional.ConfigurableMacBase;
 import org.apache.accumulo.test.util.Wait;
 import org.apache.hadoop.io.Text;
@@ -57,6 +59,7 @@ public class WaitForBalanceIT extends ConfigurableMacBase {
   @Test
   public void test() throws Exception {
     try (AccumuloClient c = 
Accumulo.newClient().from(getClientProperties()).build()) {
+      assertEquals(2, 
c.instanceOperations().getServers(Type.TABLET_SERVER).size());
       // ensure the metadata table is online
       try (Scanner scanner =
           c.createScanner(AccumuloTable.METADATA.tableName(), 
Authorizations.EMPTY)) {
@@ -74,14 +77,22 @@ public class WaitForBalanceIT extends ConfigurableMacBase {
         partitionKeys.add(new Text("" + i));
       }
       c.tableOperations().addSplits(tableName, partitionKeys);
-      assertFalse(isBalanced(c));
       c.instanceOperations().waitForBalance();
-      Wait.waitFor(() -> isBalanced(c));
+      assertTrue(isBalanced(c));
+
+      // Add another tserver to force a rebalance
+      
getCluster().getConfig().getClusterServerConfiguration().setNumDefaultTabletServers(3);
+      getCluster().getClusterControl().start(ServerType.TABLET_SERVER);
+      Wait.waitFor(() -> 
c.instanceOperations().getServers(Type.TABLET_SERVER).size() == 3);
+      c.instanceOperations().waitForBalance();
+      assertTrue(isBalanced(c));
     }
   }
 
   private boolean isBalanced(AccumuloClient c) throws Exception {
-    final Map<String,Integer> counts = new HashMap<>();
+    final Map<String,Integer> tserverCounts = new HashMap<>();
+    c.instanceOperations().getServers(Type.TABLET_SERVER)
+        .forEach(ts -> tserverCounts.put(ts.toHostPortString(), 0));
     int offline = 0;
     for (String tableName : new String[] {AccumuloTable.METADATA.tableName(),
         AccumuloTable.ROOT.tableName()}) {
@@ -93,17 +104,17 @@ public class WaitForBalanceIT extends ConfigurableMacBase {
         for (Entry<Key,Value> entry : s) {
           Key key = entry.getKey();
           if (key.getColumnFamily().equals(CurrentLocationColumnFamily.NAME)) {
-            location = key.getColumnQualifier().toString();
+            location = entry.getValue().toString();
           } else if (TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key)) {
             if (location == null) {
               offline++;
             } else {
-              Integer count = counts.get(location);
+              Integer count = tserverCounts.get(location);
               if (count == null) {
                 count = 0;
               }
               count = count + 1;
-              counts.put(location, count);
+              tserverCounts.put(location, count);
             }
             location = null;
           }
@@ -115,13 +126,13 @@ public class WaitForBalanceIT extends ConfigurableMacBase {
       return false;
     }
     int average = 0;
-    for (Integer i : counts.values()) {
+    for (Integer i : tserverCounts.values()) {
       average += i;
     }
-    average /= counts.size();
-    System.out.println(counts);
+    average /= tserverCounts.size();
+    System.out.println(tserverCounts);
     int tablesCount = c.tableOperations().list().size();
-    for (Entry<String,Integer> hostCount : counts.entrySet()) {
+    for (Entry<String,Integer> hostCount : tserverCounts.entrySet()) {
       if (Math.abs(average - hostCount.getValue()) > tablesCount) {
         System.out.println(
             "Average " + average + " count " + hostCount.getKey() + ": " + 
hostCount.getValue());

Reply via email to