This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 714cad4a0a Resets compaction queue max size when config changes (#5278)
714cad4a0a is described below

commit 714cad4a0ac60ec4ca3d40fc13c77bde85bf2337
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Fri Jan 24 14:28:02 2025 -0500

    Resets compaction queue max size when config changes (#5278)
    
    Updates the manager to reset the max size on compaction queues when the
    configuration changes.  If the current data size of a queue exceeds the
    newly configured max size, the lowest priority jobs are dropped until
    the queue fits within the new limit.
---
 .../coordinator/CompactionCoordinator.java         | 13 ++++++
 .../queue/CompactionJobPriorityQueue.java          | 20 +++++++++
 .../compaction/queue/CompactionJobQueues.java      |  7 ++-
 .../compaction/CompactionCoordinatorTest.java      |  3 ++
 .../queue/CompactionJobPriorityQueueTest.java      | 52 ++++++++++++++++++++++
 .../compaction/queue/CompactionJobQueuesTest.java  | 30 +++++++++++++
 6 files changed, 124 insertions(+), 1 deletion(-)

diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
index d8366a8792..ab8e1bbf44 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@ -294,10 +294,23 @@ public class CompactionCoordinator
     ThreadPools.watchNonCriticalScheduledTask(future);
   }
 
+  protected void startConfigMonitor(ScheduledThreadPoolExecutor schedExecutor) 
{
+    ScheduledFuture<?> future =
+        schedExecutor.scheduleWithFixedDelay(this::checkForConfigChanges, 0, 
1, TimeUnit.MINUTES);
+    ThreadPools.watchNonCriticalScheduledTask(future);
+  }
+
+  private void checkForConfigChanges() {
+    long jobQueueMaxSize =
+        
ctx.getConfiguration().getAsBytes(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE);
+    jobQueues.resetMaxSize(jobQueueMaxSize);
+  }
+
   @Override
   public void run() {
 
     this.coordinatorStartTime = System.currentTimeMillis();
+    startConfigMonitor(schedExecutor);
     startCompactorZKCleaner(schedExecutor);
 
     // On a re-start of the coordinator it's possible that external 
compactions are in-progress.
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
index 9a36f07f56..51d33a8057 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
@@ -360,6 +360,26 @@ public class CompactionJobPriorityQueue {
     }
   }
 
+  public synchronized void resetMaxSize(long size) {
+    Preconditions.checkArgument(size > 0);
+    long oldSize = maxSize.getAndSet(size);
+    if (oldSize != size) {
+      // remove the lowest priority jobs if the current queue data size 
exceeds the new max size
+      long removed = 0;
+      while (jobQueue.dataSize() > maxSize.get()) {
+        var last = jobQueue.pollLastEntry();
+        if (last == null) {
+          break;
+        } else {
+          rejectedJobs.getAndIncrement();
+          removed++;
+        }
+      }
+      log.debug("Adjusted max size for compaction queue {} from {} to {} 
removing {} jobs.",
+          groupId, oldSize, size, removed);
+    }
+  }
+
   public CompactionJobPriorityQueueStats getJobQueueStats() {
     return jobQueueStats.get();
   }
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java
index 2e2dc3cef9..e342b5b965 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java
@@ -48,7 +48,7 @@ public class CompactionJobQueues {
   private final ConcurrentHashMap<CompactorGroupId,CompactionJobPriorityQueue> 
priorityQueues =
       new ConcurrentHashMap<>();
 
-  private final long queueSize;
+  private volatile long queueSize;
 
   private final Map<DataLevel,AtomicLong> currentGenerations;
 
@@ -193,4 +193,9 @@ public class CompactionJobQueues {
     pq.add(tabletMetadata, jobs,
         
currentGenerations.get(DataLevel.of(tabletMetadata.getTableId())).get());
   }
+
+  public void resetMaxSize(long size) {
+    this.queueSize = size;
+    priorityQueues.values().forEach(cjpq -> cjpq.resetMaxSize(this.queueSize));
+  }
 }
diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
index 931a0b6e7a..95ebdb8b47 100644
--- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
+++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
@@ -152,6 +152,9 @@ public class CompactionCoordinatorTest {
       this.shutdown.countDown();
     }
 
+    @Override
+    protected void startConfigMonitor(ScheduledThreadPoolExecutor 
schedExecutor) {}
+
     @Override
     public void compactionCompleted(TInfo tinfo, TCredentials credentials,
         String externalCompactionId, TKeyExtent textent, TCompactionStats 
stats)
diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
index 6617ad5cef..d7b067f049 100644
--- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
+++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
@@ -20,6 +20,7 @@ package org.apache.accumulo.manager.compaction.queue;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.ArrayList;
@@ -338,4 +339,55 @@ public class CompactionJobPriorityQueueTest {
         < 2 * (CompactionJobPriorityQueue.FUTURE_CHECK_THRESHOLD + 
CANCEL_THRESHOLD));
     assertTrue(maxFuturesSize > 2 * 
CompactionJobPriorityQueue.FUTURE_CHECK_THRESHOLD);
   }
+
+  @Test
+  public void testResetMaxSize() {
+    TreeSet<CompactionJob> expected = new 
TreeSet<>(CompactionJobPrioritizer.JOB_COMPARATOR);
+
+    // create a queue with a weigher that gives each job a data size of 10
+    CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 
1000, mj -> 10);
+
+    // create and add 200 jobs, because of the queue size 100 should be 
dropped.
+    for (int x = 0; x < 200; x++) {
+      Pair<TabletMetadata,CompactionJob> pair = createJob();
+      queue.add(pair.getFirst(), Set.of(pair.getSecond()), 1L);
+      expected.add(pair.getSecond());
+    }
+
+    assertEquals(1000, queue.getMaxSize());
+    assertEquals(0, queue.getDequeuedJobs());
+    assertEquals(100, queue.getQueuedJobs());
+    assertEquals(100, queue.getRejectedJobs());
+
+    // reset the max size, this should cause the 50 lowest priority jobs to be 
dropped from the
+    // queue
+    queue.resetMaxSize(500);
+
+    assertEquals(500, queue.getMaxSize());
+    assertEquals(0, queue.getDequeuedJobs());
+    assertEquals(50, queue.getQueuedJobs());
+    assertEquals(150, queue.getRejectedJobs());
+
+    // ensure what is left in the queue is the 50 highest priority jobs
+    int matchesSeen = 0;
+    for (CompactionJob expectedJob : expected) {
+      MetaJob queuedJob = queue.poll();
+      if (queuedJob == null) {
+        break;
+      }
+      assertEquals(expectedJob.getPriority(), 
queuedJob.getJob().getPriority());
+      assertEquals(expectedJob.getFiles(), queuedJob.getJob().getFiles());
+      matchesSeen++;
+    }
+
+    assertEquals(50, matchesSeen);
+
+    assertEquals(500, queue.getMaxSize());
+    assertEquals(50, queue.getDequeuedJobs());
+    assertEquals(0, queue.getQueuedJobs());
+    assertEquals(150, queue.getRejectedJobs());
+
+    // try setting an illegal value
+    assertThrows(IllegalArgumentException.class, () -> 
queue.resetMaxSize(-100));
+  }
 }
diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java
index f63e56cc49..9d3592c893 100644
--- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java
+++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java
@@ -31,6 +31,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -410,4 +411,33 @@ public class CompactionJobQueuesTest {
     assertTrue(future6.isCompletedExceptionally());
     assertTrue(future6.isDone());
   }
+
+  @Test
+  public void testResetSize() throws Exception {
+    CompactionJobQueues jobQueues = new CompactionJobQueues(1000000);
+
+    var tid = TableId.of("1");
+    var extent1 = new KeyExtent(tid, new Text("z"), new Text("q"));
+
+    var tm1 = TabletMetadata.builder(extent1).build();
+
+    var cg1 = CompactorGroupId.of("CG1");
+    var cg2 = CompactorGroupId.of("CG2");
+
+    jobQueues.add(tm1, List.of(newJob((short) 1, 5, cg1)));
+
+    assertEquals(Set.of(cg1), jobQueues.getQueueIds());
+    assertEquals(1000000, jobQueues.getQueueMaxSize(cg1));
+
+    jobQueues.resetMaxSize(500000);
+
+    assertEquals(Set.of(cg1), jobQueues.getQueueIds());
+    assertEquals(500000, jobQueues.getQueueMaxSize(cg1));
+
+    // create a new queue and ensure it uses the updated max size
+    jobQueues.add(tm1, List.of(newJob((short) 1, 5, cg2)));
+    assertEquals(Set.of(cg1, cg2), jobQueues.getQueueIds());
+    assertEquals(500000, jobQueues.getQueueMaxSize(cg1));
+    assertEquals(500000, jobQueues.getQueueMaxSize(cg2));
+  }
 }

Reply via email to