This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new 714cad4a0a Resets compaction queue max size when config changes (#5278) 714cad4a0a is described below commit 714cad4a0ac60ec4ca3d40fc13c77bde85bf2337 Author: Keith Turner <ktur...@apache.org> AuthorDate: Fri Jan 24 14:28:02 2025 -0500 Resets compaction queue max size when config changes (#5278) Updates the manager to reset the max size on compaction queues when config changes. In the case where the current data size of the queue exceeds the new configured size the lowest priority jobs are dropped until the size is acceptable. --- .../coordinator/CompactionCoordinator.java | 13 ++++++ .../queue/CompactionJobPriorityQueue.java | 20 +++++++++ .../compaction/queue/CompactionJobQueues.java | 7 ++- .../compaction/CompactionCoordinatorTest.java | 3 ++ .../queue/CompactionJobPriorityQueueTest.java | 52 ++++++++++++++++++++++ .../compaction/queue/CompactionJobQueuesTest.java | 30 +++++++++++++ 6 files changed, 124 insertions(+), 1 deletion(-) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java index d8366a8792..ab8e1bbf44 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java @@ -294,10 +294,23 @@ public class CompactionCoordinator ThreadPools.watchNonCriticalScheduledTask(future); } + protected void startConfigMonitor(ScheduledThreadPoolExecutor schedExecutor) { + ScheduledFuture<?> future = + schedExecutor.scheduleWithFixedDelay(this::checkForConfigChanges, 0, 1, TimeUnit.MINUTES); + ThreadPools.watchNonCriticalScheduledTask(future); + } + + private void checkForConfigChanges() { + long jobQueueMaxSize = + ctx.getConfiguration().getAsBytes(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE); + jobQueues.resetMaxSize(jobQueueMaxSize); + } + @Override public void run() { this.coordinatorStartTime = System.currentTimeMillis(); + startConfigMonitor(schedExecutor); startCompactorZKCleaner(schedExecutor); // On a re-start of the coordinator it's possible that external compactions are in-progress. diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java index 9a36f07f56..51d33a8057 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java @@ -360,6 +360,26 @@ public class CompactionJobPriorityQueue { } } + public synchronized void resetMaxSize(long size) { + Preconditions.checkArgument(size > 0); + long oldSize = maxSize.getAndSet(size); + if (oldSize != size) { + // remove the lowest priority jobs if the current queue data size exceeds the new max size + long removed = 0; + while (jobQueue.dataSize() > maxSize.get()) { + var last = jobQueue.pollLastEntry(); + if (last == null) { + break; + } else { + rejectedJobs.getAndIncrement(); + removed++; + } + } + log.debug("Adjusted max size for compaction queue {} from {} to {} removing {} jobs.", + groupId, oldSize, size, removed); + } + } + public CompactionJobPriorityQueueStats getJobQueueStats() { return jobQueueStats.get(); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java index 2e2dc3cef9..e342b5b965 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java @@ -48,7 +48,7 @@ public class CompactionJobQueues { private final ConcurrentHashMap<CompactorGroupId,CompactionJobPriorityQueue> priorityQueues = new ConcurrentHashMap<>(); - private final long queueSize; + private volatile long queueSize; private final Map<DataLevel,AtomicLong> currentGenerations; @@ -193,4 +193,9 @@ public class CompactionJobQueues { pq.add(tabletMetadata, jobs, currentGenerations.get(DataLevel.of(tabletMetadata.getTableId())).get()); } + + public void resetMaxSize(long size) { + this.queueSize = size; + priorityQueues.values().forEach(cjpq -> cjpq.resetMaxSize(this.queueSize)); + } } diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java index 931a0b6e7a..95ebdb8b47 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java @@ -152,6 +152,9 @@ public class CompactionCoordinatorTest { this.shutdown.countDown(); } + @Override + protected void startConfigMonitor(ScheduledThreadPoolExecutor schedExecutor) {} + @Override public void compactionCompleted(TInfo tinfo, TCredentials credentials, String externalCompactionId, TKeyExtent textent, TCompactionStats stats) diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java index 6617ad5cef..d7b067f049 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java @@ -20,6 +20,7 @@ package org.apache.accumulo.manager.compaction.queue; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.ArrayList; @@ -338,4 +339,55 @@ public class CompactionJobPriorityQueueTest { < 2 * (CompactionJobPriorityQueue.FUTURE_CHECK_THRESHOLD + CANCEL_THRESHOLD)); assertTrue(maxFuturesSize > 2 * CompactionJobPriorityQueue.FUTURE_CHECK_THRESHOLD); } + + @Test + public void testResetMaxSize() { + TreeSet<CompactionJob> expected = new TreeSet<>(CompactionJobPrioritizer.JOB_COMPARATOR); + + // create a queue with a weigher that gives each job a data size of 10 + CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 1000, mj -> 10); + + // create and add 200 jobs, because of the queue size 100 should be dropped. + for (int x = 0; x < 200; x++) { + Pair<TabletMetadata,CompactionJob> pair = createJob(); + queue.add(pair.getFirst(), Set.of(pair.getSecond()), 1L); + expected.add(pair.getSecond()); + } + + assertEquals(1000, queue.getMaxSize()); + assertEquals(0, queue.getDequeuedJobs()); + assertEquals(100, queue.getQueuedJobs()); + assertEquals(100, queue.getRejectedJobs()); + + // reset the max size, this should cause the 50 lowest priority jobs to be dropped from the + // queue + queue.resetMaxSize(500); + + assertEquals(500, queue.getMaxSize()); + assertEquals(0, queue.getDequeuedJobs()); + assertEquals(50, queue.getQueuedJobs()); + assertEquals(150, queue.getRejectedJobs()); + + // ensure what is left in the queue is the 50 highest priority jobs + int matchesSeen = 0; + for (CompactionJob expectedJob : expected) { + MetaJob queuedJob = queue.poll(); + if (queuedJob == null) { + break; + } + assertEquals(expectedJob.getPriority(), queuedJob.getJob().getPriority()); + assertEquals(expectedJob.getFiles(), queuedJob.getJob().getFiles()); + matchesSeen++; + } + + assertEquals(50, matchesSeen); + + assertEquals(500, queue.getMaxSize()); + assertEquals(50, queue.getDequeuedJobs()); + assertEquals(0, queue.getQueuedJobs()); + assertEquals(150, queue.getRejectedJobs()); + + // try setting an illegal value + assertThrows(IllegalArgumentException.class, () -> queue.resetMaxSize(-100)); + } } diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java index f63e56cc49..9d3592c893 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java @@ -31,6 +31,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -410,4 +411,33 @@ public class CompactionJobQueuesTest { assertTrue(future6.isCompletedExceptionally()); assertTrue(future6.isDone()); } + + @Test + public void testResetSize() throws Exception { + CompactionJobQueues jobQueues = new CompactionJobQueues(1000000); + + var tid = TableId.of("1"); + var extent1 = new KeyExtent(tid, new Text("z"), new Text("q")); + + var tm1 = TabletMetadata.builder(extent1).build(); + + var cg1 = CompactorGroupId.of("CG1"); + var cg2 = CompactorGroupId.of("CG2"); + + jobQueues.add(tm1, List.of(newJob((short) 1, 5, cg1))); + + assertEquals(Set.of(cg1), jobQueues.getQueueIds()); + assertEquals(1000000, jobQueues.getQueueMaxSize(cg1)); + + jobQueues.resetMaxSize(500000); + + assertEquals(Set.of(cg1), jobQueues.getQueueIds()); + assertEquals(500000, jobQueues.getQueueMaxSize(cg1)); + + // create a new queue and ensure it uses the updated max size + jobQueues.add(tm1, List.of(newJob((short) 1, 5, cg2))); + assertEquals(Set.of(cg1, cg2), jobQueues.getQueueIds()); + assertEquals(500000, jobQueues.getQueueMaxSize(cg1)); + assertEquals(500000, jobQueues.getQueueMaxSize(cg2)); + } }