This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 2a12b13429 Used data size instead of entry size of compaction queue (#5252)
2a12b13429 is described below

commit 2a12b1342957d8af5ee7211b1bafbc25af820e9c
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Thu Jan 16 17:59:10 2025 -0500

    Used data size instead of entry size of compaction queue (#5252)
    
    Modified the compaction queue limit to use the data size of the
    compaction jobs instead of the number of compaction jobs for limiting
    the number of compaction jobs buffered.
    
    fixes #5186
---
 .../org/apache/accumulo/core/conf/Property.java    |  16 ++-
 .../miniclusterImpl/MiniAccumuloConfigImpl.java    |   4 +-
 .../coordinator/CompactionCoordinator.java         |  17 +--
 .../queue/CompactionJobPriorityQueue.java          |  21 ++--
 .../compaction/queue/CompactionJobQueues.java      |  12 +-
 .../compaction/queue/SizeTrackingTreeMap.java      | 129 +++++++++++++++++++++
 .../queue/CompactionJobPriorityQueueTest.java      |  17 ++-
 .../compaction/queue/CompactionJobQueuesTest.java  |   8 +-
 .../compaction/queue/SizeTrackingTreeMapTest.java  | 113 ++++++++++++++++++
 .../CompactionPriorityQueueMetricsIT.java          |   4 +-
 10 files changed, 285 insertions(+), 56 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java 
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index c5bcdf4d45..555785f580 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -465,14 +465,12 @@ public enum Property {
       "The number of threads used to seed fate split task, the actual split 
work is done by fate"
           + " threads.",
       "4.0.0"),
-
-  MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_INITIAL_SIZE(
-      "manager.compaction.major.service.queue.initial.size", "10000", 
PropertyType.COUNT,
-      "The initial size of each resource groups compaction job priority 
queue.", "4.0.0"),
-  MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE_FACTOR(
-      "manager.compaction.major.service.queue.size.factor", "3.0", 
PropertyType.FRACTION,
-      "The dynamic resizing of the compaction job priority queue is based on"
-          + " the number of compactors for the group multiplied by this 
factor.",
+  
MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE("manager.compaction.major.service.queue.size",
+      "1M", PropertyType.MEMORY,
+      "The data size of each resource groups compaction job priority queue.  
The memory size of "
+          + "each compaction job is estimated and the sum of these sizes per 
resource group will not "
+          + "exceed this setting. When the size is exceeded the lowest 
priority jobs are dropped as "
+          + "needed.",
       "4.0.0"),
   SPLIT_PREFIX("split.", null, PropertyType.PREFIX,
       "System wide properties related to splitting tablets.", "3.1.0"),
@@ -1460,7 +1458,7 @@ public enum Property {
       RPC_MAX_MESSAGE_SIZE,
 
       // compaction coordiantor properties
-      MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_INITIAL_SIZE,
+      MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE,
 
       // block cache options
       GENERAL_CACHE_MANAGER_IMPL, TSERV_DATACACHE_SIZE, TSERV_INDEXCACHE_SIZE,
diff --git 
a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java
 
b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java
index 5b81fac468..b5c6667519 100644
--- 
a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java
+++ 
b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java
@@ -188,8 +188,8 @@ public class MiniAccumuloConfigImpl {
 
       mergeProp(Property.COMPACTOR_PORTSEARCH.getKey(), "true");
 
-      
mergeProp(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_INITIAL_SIZE.getKey(),
-          
Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_INITIAL_SIZE.getDefaultValue());
+      
mergeProp(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE.getKey(),
+          
Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE.getDefaultValue());
       mergeProp(Property.COMPACTION_SERVICE_DEFAULT_PLANNER.getKey(),
           Property.COMPACTION_SERVICE_DEFAULT_PLANNER.getDefaultValue());
 
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
index 4dc42c1137..f1e6f54a5c 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@ -194,7 +194,7 @@ public class CompactionCoordinator
   private final Manager manager;
 
   private final LoadingCache<String,Integer> compactorCounts;
-  private final int jobQueueInitialSize;
+  private final long jobQueueInitialSize;
 
   private volatile long coordinatorStartTime;
 
@@ -208,8 +208,8 @@ public class CompactionCoordinator
     this.security = security;
     this.manager = Objects.requireNonNull(manager);
 
-    this.jobQueueInitialSize = ctx.getConfiguration()
-        
.getCount(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_INITIAL_SIZE);
+    this.jobQueueInitialSize =
+        
ctx.getConfiguration().getAsBytes(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE);
 
     this.jobQueues = new CompactionJobQueues(jobQueueInitialSize);
 
@@ -1121,8 +1121,6 @@ public class CompactionCoordinator
     final String compactorQueuesPath = this.ctx.getZooKeeperRoot() + 
Constants.ZCOMPACTORS;
 
     final var zoorw = this.ctx.getZooSession().asReaderWriter();
-    final double queueSizeFactor = ctx.getConfiguration()
-        
.getFraction(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE_FACTOR);
 
     try {
       var groups = zoorw.getChildren(compactorQueuesPath);
@@ -1139,7 +1137,6 @@ public class CompactionCoordinator
           CompactionJobPriorityQueue queue = getJobQueues().getQueue(cgid);
           if (queue != null) {
             queue.clearIfInactive(Duration.ofMinutes(10));
-            queue.setMaxSize(this.jobQueueInitialSize);
           }
         } else {
           int aliveCompactorsForGroup = 0;
@@ -1152,16 +1149,8 @@ public class CompactionCoordinator
               aliveCompactorsForGroup++;
             }
           }
-          CompactionJobPriorityQueue queue = getJobQueues().getQueue(cgid);
-          if (queue != null) {
-            queue.setMaxSize(Math.min(
-                Math.max(1, (int) (aliveCompactorsForGroup * 
queueSizeFactor)), Integer.MAX_VALUE));
-          }
-
         }
-
       }
-
     } catch (KeeperException | RuntimeException e) {
       LOG.warn("Failed to clean up compactors", e);
     } catch (InterruptedException e) {
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
index 1f9738dac7..f183b50b86 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
@@ -31,11 +31,9 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
-import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -116,8 +114,8 @@ public class CompactionJobPriorityQueue {
   // behavior is not supported with a PriorityQueue. Second a PriorityQueue does not support
   // efficiently removing entries from anywhere in the queue. Efficient removal is needed for the
   // case where tablets decided to issues different compaction jobs than what is currently queued.
-  private final TreeMap<CjpqKey,CompactionJobQueues.MetaJob> jobQueue;
-  private final AtomicInteger maxSize;
+  private final SizeTrackingTreeMap<CjpqKey,CompactionJobQueues.MetaJob> 
jobQueue;
+  private final AtomicLong maxSize;
   private final AtomicLong rejectedJobs;
   private final AtomicLong dequeuedJobs;
   private final ArrayDeque<CompletableFuture<CompactionJobQueues.MetaJob>> 
futures;
@@ -142,9 +140,10 @@ public class CompactionJobPriorityQueue {
 
   private final AtomicLong nextSeq = new AtomicLong(0);
 
-  public CompactionJobPriorityQueue(CompactorGroupId groupId, int maxSize) {
-    this.jobQueue = new TreeMap<>();
-    this.maxSize = new AtomicInteger(maxSize);
+  public CompactionJobPriorityQueue(CompactorGroupId groupId, long maxSize,
+      SizeTrackingTreeMap.Weigher<CompactionJobQueues.MetaJob> weigher) {
+    this.jobQueue = new SizeTrackingTreeMap<>(weigher);
+    this.maxSize = new AtomicLong(maxSize);
     this.tabletJobs = new HashMap<>();
     this.groupId = groupId;
     this.rejectedJobs = new AtomicLong(0);
@@ -230,11 +229,11 @@ public class CompactionJobPriorityQueue {
     return jobsAdded;
   }
 
-  public synchronized int getMaxSize() {
+  public synchronized long getMaxSize() {
     return maxSize.get();
   }
 
-  public synchronized void setMaxSize(int maxSize) {
+  public synchronized void setMaxSize(long maxSize) {
     Preconditions.checkArgument(maxSize > 0,
         "Maximum size of the Compaction job priority queue must be greater 
than 0");
     this.maxSize.set(maxSize);
@@ -249,7 +248,7 @@ public class CompactionJobPriorityQueue {
   }
 
   public synchronized long getQueuedJobs() {
-    return jobQueue.size();
+    return jobQueue.entrySize();
   }
 
   public synchronized long getLowestPriority() {
@@ -332,7 +331,7 @@ public class CompactionJobPriorityQueue {
   }
 
   private CjpqKey addJobToQueue(TabletMetadata tabletMetadata, CompactionJob 
job) {
-    if (jobQueue.size() >= maxSize.get()) {
+    if (jobQueue.dataSize() >= maxSize.get()) {
       var lastEntry = jobQueue.lastKey();
       if (job.getPriority() <= lastEntry.job.getPriority()) {
         // the queue is full and this job has a lower or same priority than 
the lowest job in the
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java
index b9fe1ed424..2e2dc3cef9 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java
@@ -48,18 +48,20 @@ public class CompactionJobQueues {
   private final ConcurrentHashMap<CompactorGroupId,CompactionJobPriorityQueue> 
priorityQueues =
       new ConcurrentHashMap<>();
 
-  private final int queueSize;
+  private final long queueSize;
 
   private final Map<DataLevel,AtomicLong> currentGenerations;
 
-  public CompactionJobQueues(int queueSize) {
+  private SizeTrackingTreeMap.Weigher<MetaJob> weigher =
+      val -> val.getTabletMetadata().toString().length() + 
val.getJob().toString().length();
+
+  public CompactionJobQueues(long queueSize) {
     this.queueSize = queueSize;
     Map<DataLevel,AtomicLong> cg = new EnumMap<>(DataLevel.class);
     for (var level : DataLevel.values()) {
       cg.put(level, new AtomicLong());
     }
     currentGenerations = Collections.unmodifiableMap(cg);
-
   }
 
   public void beginFullScan(DataLevel level) {
@@ -164,7 +166,7 @@ public class CompactionJobQueues {
    */
   public CompletableFuture<MetaJob> getAsync(CompactorGroupId groupId) {
     var pq = priorityQueues.computeIfAbsent(groupId,
-        gid -> new CompactionJobPriorityQueue(gid, queueSize));
+        gid -> new CompactionJobPriorityQueue(gid, queueSize, weigher));
     return pq.getAsync();
   }
 
@@ -187,7 +189,7 @@ public class CompactionJobQueues {
     }
 
     var pq = priorityQueues.computeIfAbsent(groupId,
-        gid -> new CompactionJobPriorityQueue(gid, queueSize));
+        gid -> new CompactionJobPriorityQueue(gid, queueSize, weigher));
     pq.add(tabletMetadata, jobs,
         
currentGenerations.get(DataLevel.of(tabletMetadata.getTableId())).get());
   }
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/SizeTrackingTreeMap.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/SizeTrackingTreeMap.java
new file mode 100644
index 0000000000..306a56fb64
--- /dev/null
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/SizeTrackingTreeMap.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.manager.compaction.queue;
+
+import java.util.AbstractMap;
+import java.util.Map;
+import java.util.TreeMap;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class wraps a treemap and tracks the data size of everything added and removed from the
+ * treemap.
+ */
+class SizeTrackingTreeMap<K,V> {
+
+  private static class ValueWrapper<V2> {
+    final V2 val;
+    final long computedSize;
+
+    private ValueWrapper(V2 val, long computedSize) {
+      this.val = val;
+      this.computedSize = computedSize;
+    }
+  }
+
+  private final TreeMap<K,ValueWrapper<V>> map = new TreeMap<>();
+  private long dataSize = 0;
+  private Weigher<V> weigher;
+
+  private Map.Entry<K,V> unwrap(Map.Entry<K,ValueWrapper<V>> wrapperEntry) {
+    if (wrapperEntry == null) {
+      return null;
+    }
+    return new AbstractMap.SimpleImmutableEntry<>(wrapperEntry.getKey(),
+        wrapperEntry.getValue().val);
+  }
+
+  private void incrementDataSize(ValueWrapper<V> val) {
+    Preconditions.checkState(dataSize >= 0);
+    dataSize += val.computedSize;
+  }
+
+  private void decrementDataSize(Map.Entry<K,ValueWrapper<V>> entry) {
+    if (entry != null) {
+      decrementDataSize(entry.getValue());
+    }
+  }
+
+  private void decrementDataSize(ValueWrapper<V> val) {
+    if (val != null) {
+      Preconditions.checkState(dataSize >= val.computedSize);
+      dataSize -= val.computedSize;
+    }
+  }
+
+  interface Weigher<V2> {
+    long weigh(V2 val);
+  }
+
+  public SizeTrackingTreeMap(Weigher<V> weigher) {
+    this.weigher = weigher;
+  }
+
+  public boolean isEmpty() {
+    return map.isEmpty();
+  }
+
+  public long dataSize() {
+    return dataSize;
+  }
+
+  public int entrySize() {
+    return map.size();
+  }
+
+  public K lastKey() {
+    return map.lastKey();
+  }
+
+  public Map.Entry<K,V> firstEntry() {
+    return unwrap(map.firstEntry());
+  }
+
+  public void remove(K key) {
+    var prev = map.remove(key);
+    decrementDataSize(prev);
+  }
+
+  public Map.Entry<K,V> pollFirstEntry() {
+    var first = map.pollFirstEntry();
+    decrementDataSize(first);
+    return unwrap(first);
+  }
+
+  public Map.Entry<K,V> pollLastEntry() {
+    var last = map.pollLastEntry();
+    decrementDataSize(last);
+    return unwrap(last);
+  }
+
+  public void put(K key, V val) {
+    var wrapped = new ValueWrapper<>(val, weigher.weigh(val));
+    var prev = map.put(key, wrapped);
+    decrementDataSize(prev);
+    incrementDataSize(wrapped);
+  }
+
+  public void clear() {
+    map.clear();
+    dataSize = 0;
+  }
+}
diff --git 
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
 
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
index 37213cdc48..01c1879875 100644
--- 
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
+++ 
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
@@ -74,7 +74,7 @@ public class CompactionJobPriorityQueueTest {
 
     EasyMock.replay(tm, cj1, cj2);
 
-    CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 
2);
+    CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 
2, mj -> 1);
     assertEquals(1, queue.add(tm, List.of(cj1), 1L));
 
     MetaJob job = queue.peek();
@@ -129,7 +129,7 @@ public class CompactionJobPriorityQueueTest {
 
     EasyMock.replay(tm, cj1, cj2);
 
-    CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 
2);
+    CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 
2, mj -> 1);
     assertEquals(2, queue.add(tm, List.of(cj1, cj2), 1L));
 
     EasyMock.verify(tm, cj1, cj2);
@@ -186,7 +186,7 @@ public class CompactionJobPriorityQueueTest {
 
     EasyMock.replay(tm, cj1, cj2, cj3);
 
-    CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 
2);
+    CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 
2, mj -> 1);
     assertEquals(2, queue.add(tm, List.of(cj1, cj2, cj3), 1L));
 
     EasyMock.verify(tm, cj1, cj2, cj3);
@@ -247,7 +247,7 @@ public class CompactionJobPriorityQueueTest {
 
     TreeSet<CompactionJob> expected = new 
TreeSet<>(CompactionJobPrioritizer.JOB_COMPARATOR);
 
-    CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 
100);
+    CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 
1000, mj -> 10);
 
     // create and add 1000 jobs
     for (int x = 0; x < 1000; x++) {
@@ -256,7 +256,7 @@ public class CompactionJobPriorityQueueTest {
       expected.add(pair.getSecond());
     }
 
-    assertEquals(100, queue.getMaxSize());
+    assertEquals(1000, queue.getMaxSize());
     assertEquals(100, queue.getQueuedJobs());
     assertEquals(900, queue.getRejectedJobs());
     // There should be 1000 total job ages even though 900 were rejected
@@ -268,7 +268,7 @@ public class CompactionJobPriorityQueueTest {
     assertTrue(stats.getMaxAge().toMillis() > 0);
     assertTrue(stats.getAvgAge().toMillis() > 0);
 
-    // iterate over the expected set and make sure that they next job in the queue
+    // iterate over the expected set and make sure that the next job in the queue
     // matches
     int matchesSeen = 0;
     for (CompactionJob expectedJob : expected) {
@@ -312,7 +312,7 @@ public class CompactionJobPriorityQueueTest {
    */
   @Test
   public void testAsyncCancelCleanup() {
-    CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 
100);
+    CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 
100, mj -> 1);
 
     List<CompletableFuture<MetaJob>> futures = new ArrayList<>();
 
@@ -342,7 +342,7 @@ public class CompactionJobPriorityQueueTest {
 
   @Test
   public void testChangeMaxSize() {
-    CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 
100);
+    CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 
100, mj -> 1);
     assertEquals(100, queue.getMaxSize());
     queue.setMaxSize(50);
     assertEquals(50, queue.getMaxSize());
@@ -351,5 +351,4 @@ public class CompactionJobPriorityQueueTest {
     // Make sure previous value was not changed after invalid setting
     assertEquals(50, queue.getMaxSize());
   }
-
 }
diff --git 
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java
 
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java
index 3d8933fa38..f63e56cc49 100644
--- 
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java
+++ 
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java
@@ -81,7 +81,7 @@ public class CompactionJobQueuesTest {
     var cg2 = CompactorGroupId.of("CG2");
     var cg3 = CompactorGroupId.of("CG3");
 
-    CompactionJobQueues jobQueues = new CompactionJobQueues(100);
+    CompactionJobQueues jobQueues = new CompactionJobQueues(1000000);
 
     jobQueues.beginFullScan(DataLevel.USER);
 
@@ -247,7 +247,7 @@ public class CompactionJobQueuesTest {
 
     var cg1 = CompactorGroupId.of("CG1");
 
-    CompactionJobQueues jobQueues = new CompactionJobQueues(100);
+    CompactionJobQueues jobQueues = new CompactionJobQueues(1000000);
 
     jobQueues.add(tm1, List.of(newJob((short) 1, 5, cg1)));
     jobQueues.add(tm2, List.of(newJob((short) 2, 6, cg1)));
@@ -283,7 +283,7 @@ public class CompactionJobQueuesTest {
 
     final int numToAdd = 100_000;
 
-    CompactionJobQueues jobQueues = new CompactionJobQueues(numToAdd + 1);
+    CompactionJobQueues jobQueues = new CompactionJobQueues(10000000);
     CompactorGroupId[] groups =
         Stream.of("G1", "G2", 
"G3").map(CompactorGroupId::of).toArray(CompactorGroupId[]::new);
 
@@ -342,7 +342,7 @@ public class CompactionJobQueuesTest {
 
   @Test
   public void testGetAsync() throws Exception {
-    CompactionJobQueues jobQueues = new CompactionJobQueues(100);
+    CompactionJobQueues jobQueues = new CompactionJobQueues(1000000);
 
     var tid = TableId.of("1");
     var extent1 = new KeyExtent(tid, new Text("z"), new Text("q"));
diff --git 
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/SizeTrackingTreeMapTest.java
 
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/SizeTrackingTreeMapTest.java
new file mode 100644
index 0000000000..384363228f
--- /dev/null
+++ 
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/SizeTrackingTreeMapTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.manager.compaction.queue;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.TreeMap;
+
+import org.junit.jupiter.api.Test;
+
+public class SizeTrackingTreeMapTest {
+  @Test
+  public void testSizeTracking() {
+    List<String> computeSizeCalls = new ArrayList<>();
+    var stmap = new SizeTrackingTreeMap<Integer,String>(val -> {
+      computeSizeCalls.add(val);
+      return val.length();
+    });
+
+    TreeMap<Integer,String> expected = new TreeMap<>();
+
+    check(expected, stmap);
+    assertEquals(List.of(), computeSizeCalls);
+
+    stmap.put(3, "1234567890");
+    expected.put(3, "1234567890");
+    check(expected, stmap);
+    assertEquals(List.of("1234567890"), computeSizeCalls);
+
+    stmap.put(4, "12345");
+    expected.put(4, "12345");
+    check(expected, stmap);
+    assertEquals(List.of("1234567890", "12345"), computeSizeCalls);
+
+    // remove a key that does not exist
+    stmap.remove(2);
+    expected.remove(2);
+    check(expected, stmap);
+    assertEquals(List.of("1234567890", "12345"), computeSizeCalls);
+
+    // remove a key that does exist
+    stmap.remove(3);
+    expected.remove(3);
+    check(expected, stmap);
+    assertEquals(List.of("1234567890", "12345"), computeSizeCalls);
+
+    // update an existing key, should decrement the old size and increment the new size
+    stmap.put(4, "123");
+    expected.put(4, "123");
+    check(expected, stmap);
+    assertEquals(List.of("1234567890", "12345", "123"), computeSizeCalls);
+
+    stmap.put(7, "123456789012345");
+    expected.put(7, "123456789012345");
+    check(expected, stmap);
+    assertEquals(List.of("1234567890", "12345", "123", "123456789012345"), 
computeSizeCalls);
+
+    stmap.put(11, "1234567");
+    expected.put(11, "1234567");
+    check(expected, stmap);
+    assertEquals(List.of("1234567890", "12345", "123", "123456789012345", 
"1234567"),
+        computeSizeCalls);
+
+    assertEquals(expected.pollFirstEntry(), stmap.pollFirstEntry());
+    check(expected, stmap);
+    assertEquals(List.of("1234567890", "12345", "123", "123456789012345", 
"1234567"),
+        computeSizeCalls);
+
+    assertEquals(expected.pollLastEntry(), stmap.pollLastEntry());
+    check(expected, stmap);
+    assertEquals(List.of("1234567890", "12345", "123", "123456789012345", 
"1234567"),
+        computeSizeCalls);
+
+    expected.clear();
+    stmap.clear();
+    check(expected, stmap);
+    assertEquals(List.of("1234567890", "12345", "123", "123456789012345", 
"1234567"),
+        computeSizeCalls);
+  }
+
+  private void check(TreeMap<Integer,String> expected, 
SizeTrackingTreeMap<Integer,String> stmap) {
+    long expectedDataSize = 
expected.values().stream().mapToLong(String::length).sum();
+    assertEquals(expectedDataSize, stmap.dataSize());
+    assertEquals(expected.size(), stmap.entrySize());
+    assertEquals(expected.isEmpty(), stmap.isEmpty());
+    assertEquals(expected.firstEntry(), stmap.firstEntry());
+    if (expected.isEmpty()) {
+      assertThrows(NoSuchElementException.class, stmap::lastKey);
+    } else {
+      assertEquals(expected.lastKey(), stmap.lastKey());
+    }
+  }
+}
diff --git 
a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java
 
b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java
index bdee45f935..2a03256595 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java
@@ -109,7 +109,7 @@ public class CompactionPriorityQueueMetricsIT extends 
SharedMiniClusterBase {
   public static final String QUEUE1 = "METRICSQ1";
   public static final String QUEUE1_METRIC_LABEL = 
MetricsUtil.formatString(QUEUE1);
   public static final String QUEUE1_SERVICE = "Q1";
-  public static final int QUEUE1_SIZE = 6;
+  public static final int QUEUE1_SIZE = 10 * 1024;
 
   // Metrics collector Thread
   final LinkedBlockingQueue<TestStatsDSink.Metric> queueMetrics = new 
LinkedBlockingQueue<>();
@@ -202,7 +202,7 @@ public class CompactionPriorityQueueMetricsIT extends 
SharedMiniClusterBase {
           Property.COMPACTION_SERVICE_PREFIX.getKey() + QUEUE1_SERVICE + 
".planner.opts.groups",
           "[{'group':'" + QUEUE1 + "'}]");
 
-      
cfg.setProperty(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_INITIAL_SIZE,
 "6");
+      cfg.setProperty(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE, 
"10K");
       cfg.getClusterServerConfiguration().addCompactorResourceGroup(QUEUE1, 0);
 
       // This test waits for dead compactors to be absent in zookeeper. The 
following setting will

Reply via email to