This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit eea8ce48e2d5d7bd5c6b69e05c6824f697d8de0a
Merge: 637dd0fd3f 9d4d68b2a3
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Fri Apr 5 15:09:17 2024 +0000

    Merge branch 'main' into elasticity

 .../apache/accumulo/core/clientImpl/InstanceOperationsImpl.java   | 2 +-
 .../accumulo/core/util/compaction/ExternalCompactionUtil.java     | 7 ++++---
 .../src/main/java/org/apache/accumulo/monitor/Monitor.java        | 2 +-
 .../monitor/rest/compactions/external/CoordinatorInfo.java        | 4 ++--
 .../monitor/rest/compactions/external/ExternalCompactionInfo.java | 8 ++++----
 .../org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java  | 6 ++++--
 6 files changed, 16 insertions(+), 13 deletions(-)

diff --cc 
core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
index 86181247a8,0046af7dc6..48895192bd
--- 
a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
@@@ -105,26 -106,26 +106,26 @@@ public class ExternalCompactionUtil 
    }
  
    /**
 -   * @return map of queue names to compactor addresses
 +   * @return map of group names to compactor addresses
     */
-   public static Map<String,List<HostAndPort>> getCompactorAddrs(ClientContext 
context) {
+   public static Map<String,Set<HostAndPort>> getCompactorAddrs(ClientContext 
context) {
      try {
-       final Map<String,List<HostAndPort>> groupsAndAddresses = new 
HashMap<>();
 -      final Map<String,Set<HostAndPort>> queuesAndAddresses = new HashMap<>();
 -      final String compactorQueuesPath = context.getZooKeeperRoot() + 
Constants.ZCOMPACTORS;
++      final Map<String,Set<HostAndPort>> groupsAndAddresses = new HashMap<>();
 +      final String compactorGroupsPath = context.getZooKeeperRoot() + 
Constants.ZCOMPACTORS;
        ZooReader zooReader = context.getZooReader();
 -      List<String> queues = zooReader.getChildren(compactorQueuesPath);
 -      for (String queue : queues) {
 -        queuesAndAddresses.putIfAbsent(queue, new HashSet<>());
 +      List<String> groups = zooReader.getChildren(compactorGroupsPath);
 +      for (String group : groups) {
          try {
 -          List<String> compactors = zooReader.getChildren(compactorQueuesPath 
+ "/" + queue);
 +          List<String> compactors = zooReader.getChildren(compactorGroupsPath 
+ "/" + group);
            for (String compactor : compactors) {
              // compactor is the address, we are checking to see if there is a 
child node which
              // represents the compactor's lock as a check that it's alive.
              List<String> children =
 -                zooReader.getChildren(compactorQueuesPath + "/" + queue + "/" 
+ compactor);
 +                zooReader.getChildren(compactorGroupsPath + "/" + group + "/" 
+ compactor);
              if (!children.isEmpty()) {
                LOG.trace("Found live compactor {} ", compactor);
-               groupsAndAddresses.putIfAbsent(group, new ArrayList<>());
 -              
queuesAndAddresses.get(queue).add(HostAndPort.fromString(compactor));
++              groupsAndAddresses.putIfAbsent(group, new HashSet<>());
 +              
groupsAndAddresses.get(group).add(HostAndPort.fromString(compactor));
              }
            }
          } catch (NoNodeException e) {
diff --cc 
server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CoordinatorInfo.java
index d17ce19f18,8724f758bb..eccda4569e
--- 
a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CoordinatorInfo.java
+++ 
b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CoordinatorInfo.java
@@@ -33,9 -33,9 +33,9 @@@ public class CoordinatorInfo 
  
    public CoordinatorInfo(Optional<HostAndPort> serverOpt, 
ExternalCompactionInfo ecInfo) {
      server = serverOpt.map(HostAndPort::toString).orElse("none");
 -    var queueToCompactors = ecInfo.getCompactors();
 -    numQueues = queueToCompactors.size();
 -    numCompactors = 
queueToCompactors.values().stream().mapToInt(Set::size).sum();
 +    var groupToCompactors = ecInfo.getCompactors();
 +    numQueues = groupToCompactors.size();
-     numCompactors = 
groupToCompactors.values().stream().mapToInt(List::size).sum();
++    numCompactors = 
groupToCompactors.values().stream().mapToInt(Set::size).sum();
      lastContact = System.currentTimeMillis() - ecInfo.getFetchedTimeMillis();
    }
  }
diff --cc 
test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java
index 03f7442c91,771d74d588..637a71eca8
--- 
a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java
@@@ -22,10 -22,9 +22,12 @@@ import static org.apache.accumulo.test.
  import static org.junit.jupiter.api.Assertions.assertEquals;
  import static org.junit.jupiter.api.Assertions.assertNull;
  import static org.junit.jupiter.api.Assertions.assertTrue;
 +import static org.junit.jupiter.api.Assertions.fail;
  
++import java.util.ArrayList;
  import java.util.List;
  import java.util.Map;
++import java.util.Set;
  import java.util.concurrent.atomic.AtomicReference;
  import java.util.concurrent.atomic.DoubleAdder;
  
@@@ -133,37 -121,6 +135,37 @@@ public class MemoryStarvedMajCIT extend
      String table = getUniqueNames(1)[0];
      try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
  
 +      ClientContext ctx = (ClientContext) client;
 +
 +      // Kill the normal compactors and wait until their addresses in ZK are 
cleared
 +      
getCluster().getConfig().getClusterServerConfiguration().getCompactorConfiguration().keySet()
 +          .forEach(resourceGroup -> {
 +            List<Process> procs = 
getCluster().getClusterControl().getCompactors(resourceGroup);
 +            for (int i = 0; i < procs.size(); i++) {
 +              LOG.info("Stopping compactor process: {}", procs.get(i).pid());
 +              try {
 +                procs.get(i).destroyForcibly().waitFor();
 +              } catch (InterruptedException e) {
 +                fail("Interrupted trying to stop compactor process");
 +              }
 +            }
 +            
getCluster().getClusterControl().getCompactors(resourceGroup).clear();
 +          });
 +      Wait.waitFor(() -> ExternalCompactionUtil.getCompactorAddrs(ctx).size() 
== 0, 60_000);
 +
 +      // Start the Compactors that will consume and free memory when we need 
it to
 +      getCluster().getClusterControl().start(ServerType.COMPACTOR, null, 1,
 +          MemoryConsumingCompactor.class);
 +      Wait.waitFor(() -> ExternalCompactionUtil.getCompactorAddrs(ctx).size() 
== 1, 60_000);
 +      Wait.waitFor(() -> ExternalCompactionUtil.getCompactorAddrs(ctx)
 +          .get(Constants.DEFAULT_RESOURCE_GROUP_NAME).size() == 1, 60_000);
 +
-       Map<String,List<HostAndPort>> groupedCompactors =
++      Map<String,Set<HostAndPort>> groupedCompactors =
 +          ExternalCompactionUtil.getCompactorAddrs(ctx);
 +      List<HostAndPort> compactorAddresses =
-           groupedCompactors.get(Constants.DEFAULT_RESOURCE_GROUP_NAME);
++          new 
ArrayList<>(groupedCompactors.get(Constants.DEFAULT_RESOURCE_GROUP_NAME));
 +      HostAndPort compactorAddr = compactorAddresses.get(0);
 +
        TableOperations to = client.tableOperations();
        to.create(table);
  

Reply via email to