This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 9839e4d42fae9bd20b1529ca0e5e2fbbbbed4122 Merge: 23e17129de 05c2f45042 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Wed Mar 13 19:52:45 2024 +0000 Merge branch '2.1' .../coordinator/CompactionCoordinator.java | 32 ++++++++++++++++++++-- .../accumulo/coordinator/QueueSummaries.java | 8 ++++++ 2 files changed, 37 insertions(+), 3 deletions(-) diff --cc server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java index d5ecbf9ddd,b0ec498a9e..685080c5b1 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java @@@ -22,8 -23,7 +22,9 @@@ import static com.google.common.util.co import java.lang.reflect.InvocationTargetException; import java.net.UnknownHostException; +import java.util.ArrayList; + import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@@ -321,35 -325,58 +325,57 @@@ public class CompactionCoordinator exte LOG.info("Shutting down"); } + private Map<String,List<HostAndPort>> getIdleCompactors() { + + Map<String,List<HostAndPort>> allCompactors = + ExternalCompactionUtil.getCompactorAddrs(getContext()); + + Set<String> emptyQueues = new HashSet<>(); + + // Remove all of the compactors that are running a compaction + RUNNING_CACHE.values().forEach(rc -> { + List<HostAndPort> busyCompactors = allCompactors.get(rc.getQueueName()); + if (busyCompactors != null + && busyCompactors.remove(HostAndPort.fromString(rc.getCompactorAddress()))) { + if (busyCompactors.isEmpty()) { + emptyQueues.add(rc.getQueueName()); + } + } + }); + // Remove entries with empty queues + emptyQueues.forEach(e -> allCompactors.remove(e)); + return allCompactors; + } + private void updateSummaries() { - ExecutorService executor = ThreadPools.getServerThreadPools().createFixedThreadPool(10, - "Compaction Summary Gatherer", false); - try { - Set<String> queuesSeen = new ConcurrentSkipListSet<>(); - tserverSet.getCurrentServers().forEach(tsi -> { - executor.execute(() -> updateSummaries(tsi, queuesSeen)); - }); + final ArrayList<Future<?>> tasks = new ArrayList<>(); + Set<String> queuesSeen = new ConcurrentSkipListSet<>(); - executor.shutdown(); + tserverSet.getCurrentServers().forEach(tsi -> { + tasks.add(summariesExecutor.submit(() -> updateSummaries(tsi, queuesSeen))); + }); - try { - while (!executor.awaitTermination(1, TimeUnit.MINUTES)) {} - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); + // Wait for all tasks to complete + while (!tasks.isEmpty()) { + Iterator<Future<?>> iter = tasks.iterator(); + while (iter.hasNext()) { + Future<?> f = iter.next(); + if (f.isDone()) { + iter.remove(); + } } + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); + } - // remove any queues that were seen in the past, but were not seen in the latest gathering of - // summaries - TIME_COMPACTOR_LAST_CHECKED.keySet().retainAll(queuesSeen); + // remove any queues that were seen in the past, but were not seen in the latest gathering of + // summaries + TIME_COMPACTOR_LAST_CHECKED.keySet().retainAll(queuesSeen); - // add any queues that were never seen before - queuesSeen.forEach(q -> { - TIME_COMPACTOR_LAST_CHECKED.computeIfAbsent(q, k -> System.currentTimeMillis()); - }); - } finally { - executor.shutdownNow(); - } + // add any queues that were never seen before + queuesSeen.forEach(q -> { + TIME_COMPACTOR_LAST_CHECKED.computeIfAbsent(q, k -> System.currentTimeMillis()); + }); } private void updateSummaries(TServerInstance tsi, Set<String> queuesSeen) {