This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit eea8ce48e2d5d7bd5c6b69e05c6824f697d8de0a Merge: 637dd0fd3f 9d4d68b2a3 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Fri Apr 5 15:09:17 2024 +0000 Merge branch 'main' into elasticity .../apache/accumulo/core/clientImpl/InstanceOperationsImpl.java | 2 +- .../accumulo/core/util/compaction/ExternalCompactionUtil.java | 7 ++++--- .../src/main/java/org/apache/accumulo/monitor/Monitor.java | 2 +- .../monitor/rest/compactions/external/CoordinatorInfo.java | 4 ++-- .../monitor/rest/compactions/external/ExternalCompactionInfo.java | 8 ++++---- .../org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java | 6 ++++-- 6 files changed, 16 insertions(+), 13 deletions(-) diff --cc core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java index 86181247a8,0046af7dc6..48895192bd --- a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java @@@ -105,26 -106,26 +106,26 @@@ public class ExternalCompactionUtil } /** - * @return map of queue names to compactor addresses + * @return map of group names to compactor addresses */ - public static Map<String,List<HostAndPort>> getCompactorAddrs(ClientContext context) { + public static Map<String,Set<HostAndPort>> getCompactorAddrs(ClientContext context) { try { - final Map<String,List<HostAndPort>> groupsAndAddresses = new HashMap<>(); - final Map<String,Set<HostAndPort>> queuesAndAddresses = new HashMap<>(); - final String compactorQueuesPath = context.getZooKeeperRoot() + Constants.ZCOMPACTORS; ++ final Map<String,Set<HostAndPort>> groupsAndAddresses = new HashMap<>(); + final String compactorGroupsPath = context.getZooKeeperRoot() + Constants.ZCOMPACTORS; ZooReader zooReader = context.getZooReader(); - List<String> queues = zooReader.getChildren(compactorQueuesPath); - for (String queue : queues) { - queuesAndAddresses.putIfAbsent(queue, new HashSet<>()); + List<String> groups = zooReader.getChildren(compactorGroupsPath); + for (String group : groups) { try { - List<String> compactors = zooReader.getChildren(compactorQueuesPath + "/" + queue); + List<String> compactors = zooReader.getChildren(compactorGroupsPath + "/" + group); for (String compactor : compactors) { // compactor is the address, we are checking to see if there is a child node which // represents the compactor's lock as a check that it's alive. List<String> children = - zooReader.getChildren(compactorQueuesPath + "/" + queue + "/" + compactor); + zooReader.getChildren(compactorGroupsPath + "/" + group + "/" + compactor); if (!children.isEmpty()) { LOG.trace("Found live compactor {} ", compactor); - groupsAndAddresses.putIfAbsent(group, new ArrayList<>()); - queuesAndAddresses.get(queue).add(HostAndPort.fromString(compactor)); ++ groupsAndAddresses.putIfAbsent(group, new HashSet<>()); + groupsAndAddresses.get(group).add(HostAndPort.fromString(compactor)); } } } catch (NoNodeException e) { diff --cc server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CoordinatorInfo.java index d17ce19f18,8724f758bb..eccda4569e --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CoordinatorInfo.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CoordinatorInfo.java @@@ -33,9 -33,9 +33,9 @@@ public class CoordinatorInfo public CoordinatorInfo(Optional<HostAndPort> serverOpt, ExternalCompactionInfo ecInfo) { server = serverOpt.map(HostAndPort::toString).orElse("none"); - var queueToCompactors = ecInfo.getCompactors(); - numQueues = queueToCompactors.size(); - numCompactors = queueToCompactors.values().stream().mapToInt(Set::size).sum(); + var groupToCompactors = ecInfo.getCompactors(); + numQueues = groupToCompactors.size(); - numCompactors = groupToCompactors.values().stream().mapToInt(List::size).sum(); ++ numCompactors = groupToCompactors.values().stream().mapToInt(Set::size).sum(); lastContact = System.currentTimeMillis() - ecInfo.getFetchedTimeMillis(); } } diff --cc test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java index 03f7442c91,771d74d588..637a71eca8 --- a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java @@@ -22,10 -22,9 +22,12 @@@ import static org.apache.accumulo.test. import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; ++import java.util.ArrayList; import java.util.List; import java.util.Map; ++import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.DoubleAdder; @@@ -133,37 -121,6 +135,37 @@@ public class MemoryStarvedMajCIT extend String table = getUniqueNames(1)[0]; try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + ClientContext ctx = (ClientContext) client; + + // Kill the normal compactors and wait until their addresses in ZK are cleared + getCluster().getConfig().getClusterServerConfiguration().getCompactorConfiguration().keySet() + .forEach(resourceGroup -> { + List<Process> procs = getCluster().getClusterControl().getCompactors(resourceGroup); + for (int i = 0; i < procs.size(); i++) { + LOG.info("Stopping compactor process: {}", procs.get(i).pid()); + try { + procs.get(i).destroyForcibly().waitFor(); + } catch (InterruptedException e) { + fail("Interrupted trying to stop compactor process"); + } + } + getCluster().getClusterControl().getCompactors(resourceGroup).clear(); + }); + Wait.waitFor(() -> ExternalCompactionUtil.getCompactorAddrs(ctx).size() == 0, 60_000); + + // Start the Compactors that will consume and free memory when we need it to + getCluster().getClusterControl().start(ServerType.COMPACTOR, null, 1, + MemoryConsumingCompactor.class); + Wait.waitFor(() -> ExternalCompactionUtil.getCompactorAddrs(ctx).size() == 1, 60_000); + Wait.waitFor(() -> ExternalCompactionUtil.getCompactorAddrs(ctx) + .get(Constants.DEFAULT_RESOURCE_GROUP_NAME).size() == 1, 60_000); + - Map<String,List<HostAndPort>> groupedCompactors = ++ Map<String,Set<HostAndPort>> groupedCompactors = + ExternalCompactionUtil.getCompactorAddrs(ctx); + List<HostAndPort> compactorAddresses = - groupedCompactors.get(Constants.DEFAULT_RESOURCE_GROUP_NAME); ++ new ArrayList<>(groupedCompactors.get(Constants.DEFAULT_RESOURCE_GROUP_NAME)); + HostAndPort compactorAddr = compactorAddresses.get(0); + TableOperations to = client.tableOperations(); to.create(table);