This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 9839e4d42fae9bd20b1529ca0e5e2fbbbbed4122
Merge: 23e17129de 05c2f45042
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Wed Mar 13 19:52:45 2024 +0000

    Merge branch '2.1'

 .../coordinator/CompactionCoordinator.java         | 32 ++++++++++++++++++++--
 .../accumulo/coordinator/QueueSummaries.java       |  8 ++++++
 2 files changed, 37 insertions(+), 3 deletions(-)

diff --cc 
server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index d5ecbf9ddd,b0ec498a9e..685080c5b1
--- 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++ 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@@ -22,8 -23,7 +22,9 @@@ import static com.google.common.util.co
  
  import java.lang.reflect.InvocationTargetException;
  import java.net.UnknownHostException;
 +import java.util.ArrayList;
+ import java.util.HashSet;
 +import java.util.Iterator;
  import java.util.List;
  import java.util.Map;
  import java.util.Set;
@@@ -321,35 -325,58 +325,57 @@@ public class CompactionCoordinator exte
      LOG.info("Shutting down");
    }
  
+   private Map<String,List<HostAndPort>> getIdleCompactors() {
+ 
+     Map<String,List<HostAndPort>> allCompactors =
+         ExternalCompactionUtil.getCompactorAddrs(getContext());
+ 
+     Set<String> emptyQueues = new HashSet<>();
+ 
+     // Remove all of the compactors that are running a compaction
+     RUNNING_CACHE.values().forEach(rc -> {
+       List<HostAndPort> busyCompactors = allCompactors.get(rc.getQueueName());
+       if (busyCompactors != null
+           && 
busyCompactors.remove(HostAndPort.fromString(rc.getCompactorAddress()))) {
+         if (busyCompactors.isEmpty()) {
+           emptyQueues.add(rc.getQueueName());
+         }
+       }
+     });
+     // Remove entries with empty queues
+     emptyQueues.forEach(e -> allCompactors.remove(e));
+     return allCompactors;
+   }
+ 
    private void updateSummaries() {
 -    ExecutorService executor = 
ThreadPools.getServerThreadPools().createFixedThreadPool(10,
 -        "Compaction Summary Gatherer", false);
 -    try {
 -      Set<String> queuesSeen = new ConcurrentSkipListSet<>();
  
 -      tserverSet.getCurrentServers().forEach(tsi -> {
 -        executor.execute(() -> updateSummaries(tsi, queuesSeen));
 -      });
 +    final ArrayList<Future<?>> tasks = new ArrayList<>();
 +    Set<String> queuesSeen = new ConcurrentSkipListSet<>();
  
 -      executor.shutdown();
 +    tserverSet.getCurrentServers().forEach(tsi -> {
 +      tasks.add(summariesExecutor.submit(() -> updateSummaries(tsi, 
queuesSeen)));
 +    });
  
 -      try {
 -        while (!executor.awaitTermination(1, TimeUnit.MINUTES)) {}
 -      } catch (InterruptedException e) {
 -        Thread.currentThread().interrupt();
 -        throw new RuntimeException(e);
 +    // Wait for all tasks to complete
 +    while (!tasks.isEmpty()) {
 +      Iterator<Future<?>> iter = tasks.iterator();
 +      while (iter.hasNext()) {
 +        Future<?> f = iter.next();
 +        if (f.isDone()) {
 +          iter.remove();
 +        }
        }
 +      Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
 +    }
  
 -      // remove any queues that were seen in the past, but were not seen in 
the latest gathering of
 -      // summaries
 -      TIME_COMPACTOR_LAST_CHECKED.keySet().retainAll(queuesSeen);
 +    // remove any queues that were seen in the past, but were not seen in the 
latest gathering of
 +    // summaries
 +    TIME_COMPACTOR_LAST_CHECKED.keySet().retainAll(queuesSeen);
  
 -      // add any queues that were never seen before
 -      queuesSeen.forEach(q -> {
 -        TIME_COMPACTOR_LAST_CHECKED.computeIfAbsent(q, k -> 
System.currentTimeMillis());
 -      });
 -    } finally {
 -      executor.shutdownNow();
 -    }
 +    // add any queues that were never seen before
 +    queuesSeen.forEach(q -> {
 +      TIME_COMPACTOR_LAST_CHECKED.computeIfAbsent(q, k -> 
System.currentTimeMillis());
 +    });
    }
  
    private void updateSummaries(TServerInstance tsi, Set<String> queuesSeen) {

Reply via email to