This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 2a12b13429 Used data size instead of entry size of compaction queue
(#5252)
2a12b13429 is described below
commit 2a12b1342957d8af5ee7211b1bafbc25af820e9c
Author: Keith Turner <[email protected]>
AuthorDate: Thu Jan 16 17:59:10 2025 -0500
Used data size instead of entry size of compaction queue (#5252)
Modified the compaction queue limit to use the data size of the
compaction jobs instead of the number of compaction jobs for limiting
the number of compaction jobs buffered.
fixes #5186
---
.../org/apache/accumulo/core/conf/Property.java | 16 ++-
.../miniclusterImpl/MiniAccumuloConfigImpl.java | 4 +-
.../coordinator/CompactionCoordinator.java | 17 +--
.../queue/CompactionJobPriorityQueue.java | 21 ++--
.../compaction/queue/CompactionJobQueues.java | 12 +-
.../compaction/queue/SizeTrackingTreeMap.java | 129 +++++++++++++++++++++
.../queue/CompactionJobPriorityQueueTest.java | 17 ++-
.../compaction/queue/CompactionJobQueuesTest.java | 8 +-
.../compaction/queue/SizeTrackingTreeMapTest.java | 113 ++++++++++++++++++
.../CompactionPriorityQueueMetricsIT.java | 4 +-
10 files changed, 285 insertions(+), 56 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index c5bcdf4d45..555785f580 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -465,14 +465,12 @@ public enum Property {
"The number of threads used to seed fate split task, the actual split
work is done by fate"
+ " threads.",
"4.0.0"),
-
- MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_INITIAL_SIZE(
- "manager.compaction.major.service.queue.initial.size", "10000",
PropertyType.COUNT,
- "The initial size of each resource groups compaction job priority
queue.", "4.0.0"),
- MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE_FACTOR(
- "manager.compaction.major.service.queue.size.factor", "3.0",
PropertyType.FRACTION,
- "The dynamic resizing of the compaction job priority queue is based on"
- + " the number of compactors for the group multiplied by this
factor.",
+
MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE("manager.compaction.major.service.queue.size",
+ "1M", PropertyType.MEMORY,
+ "The data size of each resource groups compaction job priority queue.
The memory size of "
+ + "each compaction job is estimated and the sum of these sizes per
resource group will not "
+ + "exceed this setting. When the size is exceeded the lowest
priority jobs are dropped as "
+ + "needed.",
"4.0.0"),
SPLIT_PREFIX("split.", null, PropertyType.PREFIX,
"System wide properties related to splitting tablets.", "3.1.0"),
@@ -1460,7 +1458,7 @@ public enum Property {
RPC_MAX_MESSAGE_SIZE,
// compaction coordiantor properties
- MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_INITIAL_SIZE,
+ MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE,
// block cache options
GENERAL_CACHE_MANAGER_IMPL, TSERV_DATACACHE_SIZE, TSERV_INDEXCACHE_SIZE,
diff --git
a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java
b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java
index 5b81fac468..b5c6667519 100644
---
a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java
+++
b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java
@@ -188,8 +188,8 @@ public class MiniAccumuloConfigImpl {
mergeProp(Property.COMPACTOR_PORTSEARCH.getKey(), "true");
-
mergeProp(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_INITIAL_SIZE.getKey(),
-
Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_INITIAL_SIZE.getDefaultValue());
+
mergeProp(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE.getKey(),
+
Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE.getDefaultValue());
mergeProp(Property.COMPACTION_SERVICE_DEFAULT_PLANNER.getKey(),
Property.COMPACTION_SERVICE_DEFAULT_PLANNER.getDefaultValue());
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
index 4dc42c1137..f1e6f54a5c 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@ -194,7 +194,7 @@ public class CompactionCoordinator
private final Manager manager;
private final LoadingCache<String,Integer> compactorCounts;
- private final int jobQueueInitialSize;
+ private final long jobQueueInitialSize;
private volatile long coordinatorStartTime;
@@ -208,8 +208,8 @@ public class CompactionCoordinator
this.security = security;
this.manager = Objects.requireNonNull(manager);
- this.jobQueueInitialSize = ctx.getConfiguration()
-
.getCount(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_INITIAL_SIZE);
+ this.jobQueueInitialSize =
+
ctx.getConfiguration().getAsBytes(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE);
this.jobQueues = new CompactionJobQueues(jobQueueInitialSize);
@@ -1121,8 +1121,6 @@ public class CompactionCoordinator
final String compactorQueuesPath = this.ctx.getZooKeeperRoot() +
Constants.ZCOMPACTORS;
final var zoorw = this.ctx.getZooSession().asReaderWriter();
- final double queueSizeFactor = ctx.getConfiguration()
-
.getFraction(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE_FACTOR);
try {
var groups = zoorw.getChildren(compactorQueuesPath);
@@ -1139,7 +1137,6 @@ public class CompactionCoordinator
CompactionJobPriorityQueue queue = getJobQueues().getQueue(cgid);
if (queue != null) {
queue.clearIfInactive(Duration.ofMinutes(10));
- queue.setMaxSize(this.jobQueueInitialSize);
}
} else {
int aliveCompactorsForGroup = 0;
@@ -1152,16 +1149,8 @@ public class CompactionCoordinator
aliveCompactorsForGroup++;
}
}
- CompactionJobPriorityQueue queue = getJobQueues().getQueue(cgid);
- if (queue != null) {
- queue.setMaxSize(Math.min(
- Math.max(1, (int) (aliveCompactorsForGroup *
queueSizeFactor)), Integer.MAX_VALUE));
- }
-
}
-
}
-
} catch (KeeperException | RuntimeException e) {
LOG.warn("Failed to clean up compactors", e);
} catch (InterruptedException e) {
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
index 1f9738dac7..f183b50b86 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
@@ -31,11 +31,9 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
-import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -116,8 +114,8 @@ public class CompactionJobPriorityQueue {
// behavior is not supported with a PriorityQueue. Second a PriorityQueue
does not support
// efficiently removing entries from anywhere in the queue. Efficient
removal is needed for the
// case where tablets decided to issues different compaction jobs than what
is currently queued.
- private final TreeMap<CjpqKey,CompactionJobQueues.MetaJob> jobQueue;
- private final AtomicInteger maxSize;
+ private final SizeTrackingTreeMap<CjpqKey,CompactionJobQueues.MetaJob>
jobQueue;
+ private final AtomicLong maxSize;
private final AtomicLong rejectedJobs;
private final AtomicLong dequeuedJobs;
private final ArrayDeque<CompletableFuture<CompactionJobQueues.MetaJob>>
futures;
@@ -142,9 +140,10 @@ public class CompactionJobPriorityQueue {
private final AtomicLong nextSeq = new AtomicLong(0);
- public CompactionJobPriorityQueue(CompactorGroupId groupId, int maxSize) {
- this.jobQueue = new TreeMap<>();
- this.maxSize = new AtomicInteger(maxSize);
+ public CompactionJobPriorityQueue(CompactorGroupId groupId, long maxSize,
+ SizeTrackingTreeMap.Weigher<CompactionJobQueues.MetaJob> weigher) {
+ this.jobQueue = new SizeTrackingTreeMap<>(weigher);
+ this.maxSize = new AtomicLong(maxSize);
this.tabletJobs = new HashMap<>();
this.groupId = groupId;
this.rejectedJobs = new AtomicLong(0);
@@ -230,11 +229,11 @@ public class CompactionJobPriorityQueue {
return jobsAdded;
}
- public synchronized int getMaxSize() {
+ public synchronized long getMaxSize() {
return maxSize.get();
}
- public synchronized void setMaxSize(int maxSize) {
+ public synchronized void setMaxSize(long maxSize) {
Preconditions.checkArgument(maxSize > 0,
"Maximum size of the Compaction job priority queue must be greater
than 0");
this.maxSize.set(maxSize);
@@ -249,7 +248,7 @@ public class CompactionJobPriorityQueue {
}
public synchronized long getQueuedJobs() {
- return jobQueue.size();
+ return jobQueue.entrySize();
}
public synchronized long getLowestPriority() {
@@ -332,7 +331,7 @@ public class CompactionJobPriorityQueue {
}
private CjpqKey addJobToQueue(TabletMetadata tabletMetadata, CompactionJob
job) {
- if (jobQueue.size() >= maxSize.get()) {
+ if (jobQueue.dataSize() >= maxSize.get()) {
var lastEntry = jobQueue.lastKey();
if (job.getPriority() <= lastEntry.job.getPriority()) {
// the queue is full and this job has a lower or same priority than
the lowest job in the
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java
index b9fe1ed424..2e2dc3cef9 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java
@@ -48,18 +48,20 @@ public class CompactionJobQueues {
private final ConcurrentHashMap<CompactorGroupId,CompactionJobPriorityQueue>
priorityQueues =
new ConcurrentHashMap<>();
- private final int queueSize;
+ private final long queueSize;
private final Map<DataLevel,AtomicLong> currentGenerations;
- public CompactionJobQueues(int queueSize) {
+ private SizeTrackingTreeMap.Weigher<MetaJob> weigher =
+ val -> val.getTabletMetadata().toString().length() +
val.getJob().toString().length();
+
+ public CompactionJobQueues(long queueSize) {
this.queueSize = queueSize;
Map<DataLevel,AtomicLong> cg = new EnumMap<>(DataLevel.class);
for (var level : DataLevel.values()) {
cg.put(level, new AtomicLong());
}
currentGenerations = Collections.unmodifiableMap(cg);
-
}
public void beginFullScan(DataLevel level) {
@@ -164,7 +166,7 @@ public class CompactionJobQueues {
*/
public CompletableFuture<MetaJob> getAsync(CompactorGroupId groupId) {
var pq = priorityQueues.computeIfAbsent(groupId,
- gid -> new CompactionJobPriorityQueue(gid, queueSize));
+ gid -> new CompactionJobPriorityQueue(gid, queueSize, weigher));
return pq.getAsync();
}
@@ -187,7 +189,7 @@ public class CompactionJobQueues {
}
var pq = priorityQueues.computeIfAbsent(groupId,
- gid -> new CompactionJobPriorityQueue(gid, queueSize));
+ gid -> new CompactionJobPriorityQueue(gid, queueSize, weigher));
pq.add(tabletMetadata, jobs,
currentGenerations.get(DataLevel.of(tabletMetadata.getTableId())).get());
}
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/SizeTrackingTreeMap.java
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/SizeTrackingTreeMap.java
new file mode 100644
index 0000000000..306a56fb64
--- /dev/null
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/SizeTrackingTreeMap.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.manager.compaction.queue;
+
+import java.util.AbstractMap;
+import java.util.Map;
+import java.util.TreeMap;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class wraps a treemap and tracks the data size of everything added and
removed from the
+ * treemap.
+ */
+class SizeTrackingTreeMap<K,V> {
+
+ private static class ValueWrapper<V2> {
+ final V2 val;
+ final long computedSize;
+
+ private ValueWrapper(V2 val, long computedSize) {
+ this.val = val;
+ this.computedSize = computedSize;
+ }
+ }
+
+ private final TreeMap<K,ValueWrapper<V>> map = new TreeMap<>();
+ private long dataSize = 0;
+ private Weigher<V> weigher;
+
+ private Map.Entry<K,V> unwrap(Map.Entry<K,ValueWrapper<V>> wrapperEntry) {
+ if (wrapperEntry == null) {
+ return null;
+ }
+ return new AbstractMap.SimpleImmutableEntry<>(wrapperEntry.getKey(),
+ wrapperEntry.getValue().val);
+ }
+
+ private void incrementDataSize(ValueWrapper<V> val) {
+ Preconditions.checkState(dataSize >= 0);
+ dataSize += val.computedSize;
+ }
+
+ private void decrementDataSize(Map.Entry<K,ValueWrapper<V>> entry) {
+ if (entry != null) {
+ decrementDataSize(entry.getValue());
+ }
+ }
+
+ private void decrementDataSize(ValueWrapper<V> val) {
+ if (val != null) {
+ Preconditions.checkState(dataSize >= val.computedSize);
+ dataSize -= val.computedSize;
+ }
+ }
+
+ interface Weigher<V2> {
+ long weigh(V2 val);
+ }
+
+ public SizeTrackingTreeMap(Weigher<V> weigher) {
+ this.weigher = weigher;
+ }
+
+ public boolean isEmpty() {
+ return map.isEmpty();
+ }
+
+ public long dataSize() {
+ return dataSize;
+ }
+
+ public int entrySize() {
+ return map.size();
+ }
+
+ public K lastKey() {
+ return map.lastKey();
+ }
+
+ public Map.Entry<K,V> firstEntry() {
+ return unwrap(map.firstEntry());
+ }
+
+ public void remove(K key) {
+ var prev = map.remove(key);
+ decrementDataSize(prev);
+ }
+
+ public Map.Entry<K,V> pollFirstEntry() {
+ var first = map.pollFirstEntry();
+ decrementDataSize(first);
+ return unwrap(first);
+ }
+
+ public Map.Entry<K,V> pollLastEntry() {
+ var last = map.pollLastEntry();
+ decrementDataSize(last);
+ return unwrap(last);
+ }
+
+ public void put(K key, V val) {
+ var wrapped = new ValueWrapper<>(val, weigher.weigh(val));
+ var prev = map.put(key, wrapped);
+ decrementDataSize(prev);
+ incrementDataSize(wrapped);
+ }
+
+ public void clear() {
+ map.clear();
+ dataSize = 0;
+ }
+}
diff --git
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
index 37213cdc48..01c1879875 100644
---
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
+++
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
@@ -74,7 +74,7 @@ public class CompactionJobPriorityQueueTest {
EasyMock.replay(tm, cj1, cj2);
- CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP,
2);
+ CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP,
2, mj -> 1);
assertEquals(1, queue.add(tm, List.of(cj1), 1L));
MetaJob job = queue.peek();
@@ -129,7 +129,7 @@ public class CompactionJobPriorityQueueTest {
EasyMock.replay(tm, cj1, cj2);
- CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP,
2);
+ CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP,
2, mj -> 1);
assertEquals(2, queue.add(tm, List.of(cj1, cj2), 1L));
EasyMock.verify(tm, cj1, cj2);
@@ -186,7 +186,7 @@ public class CompactionJobPriorityQueueTest {
EasyMock.replay(tm, cj1, cj2, cj3);
- CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP,
2);
+ CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP,
2, mj -> 1);
assertEquals(2, queue.add(tm, List.of(cj1, cj2, cj3), 1L));
EasyMock.verify(tm, cj1, cj2, cj3);
@@ -247,7 +247,7 @@ public class CompactionJobPriorityQueueTest {
TreeSet<CompactionJob> expected = new
TreeSet<>(CompactionJobPrioritizer.JOB_COMPARATOR);
- CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP,
100);
+ CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP,
1000, mj -> 10);
// create and add 1000 jobs
for (int x = 0; x < 1000; x++) {
@@ -256,7 +256,7 @@ public class CompactionJobPriorityQueueTest {
expected.add(pair.getSecond());
}
- assertEquals(100, queue.getMaxSize());
+ assertEquals(1000, queue.getMaxSize());
assertEquals(100, queue.getQueuedJobs());
assertEquals(900, queue.getRejectedJobs());
// There should be 1000 total job ages even though 900 were rejected
@@ -268,7 +268,7 @@ public class CompactionJobPriorityQueueTest {
assertTrue(stats.getMaxAge().toMillis() > 0);
assertTrue(stats.getAvgAge().toMillis() > 0);
- // iterate over the expected set and make sure that they next job in the
queue
+ // iterate over the expected set and make sure that the next job in the
queue
// matches
int matchesSeen = 0;
for (CompactionJob expectedJob : expected) {
@@ -312,7 +312,7 @@ public class CompactionJobPriorityQueueTest {
*/
@Test
public void testAsyncCancelCleanup() {
- CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP,
100);
+ CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP,
100, mj -> 1);
List<CompletableFuture<MetaJob>> futures = new ArrayList<>();
@@ -342,7 +342,7 @@ public class CompactionJobPriorityQueueTest {
@Test
public void testChangeMaxSize() {
- CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP,
100);
+ CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP,
100, mj -> 1);
assertEquals(100, queue.getMaxSize());
queue.setMaxSize(50);
assertEquals(50, queue.getMaxSize());
@@ -351,5 +351,4 @@ public class CompactionJobPriorityQueueTest {
// Make sure previous value was not changed after invalid setting
assertEquals(50, queue.getMaxSize());
}
-
}
diff --git
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java
index 3d8933fa38..f63e56cc49 100644
---
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java
+++
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java
@@ -81,7 +81,7 @@ public class CompactionJobQueuesTest {
var cg2 = CompactorGroupId.of("CG2");
var cg3 = CompactorGroupId.of("CG3");
- CompactionJobQueues jobQueues = new CompactionJobQueues(100);
+ CompactionJobQueues jobQueues = new CompactionJobQueues(1000000);
jobQueues.beginFullScan(DataLevel.USER);
@@ -247,7 +247,7 @@ public class CompactionJobQueuesTest {
var cg1 = CompactorGroupId.of("CG1");
- CompactionJobQueues jobQueues = new CompactionJobQueues(100);
+ CompactionJobQueues jobQueues = new CompactionJobQueues(1000000);
jobQueues.add(tm1, List.of(newJob((short) 1, 5, cg1)));
jobQueues.add(tm2, List.of(newJob((short) 2, 6, cg1)));
@@ -283,7 +283,7 @@ public class CompactionJobQueuesTest {
final int numToAdd = 100_000;
- CompactionJobQueues jobQueues = new CompactionJobQueues(numToAdd + 1);
+ CompactionJobQueues jobQueues = new CompactionJobQueues(10000000);
CompactorGroupId[] groups =
Stream.of("G1", "G2",
"G3").map(CompactorGroupId::of).toArray(CompactorGroupId[]::new);
@@ -342,7 +342,7 @@ public class CompactionJobQueuesTest {
@Test
public void testGetAsync() throws Exception {
- CompactionJobQueues jobQueues = new CompactionJobQueues(100);
+ CompactionJobQueues jobQueues = new CompactionJobQueues(1000000);
var tid = TableId.of("1");
var extent1 = new KeyExtent(tid, new Text("z"), new Text("q"));
diff --git
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/SizeTrackingTreeMapTest.java
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/SizeTrackingTreeMapTest.java
new file mode 100644
index 0000000000..384363228f
--- /dev/null
+++
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/SizeTrackingTreeMapTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.manager.compaction.queue;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.TreeMap;
+
+import org.junit.jupiter.api.Test;
+
+public class SizeTrackingTreeMapTest {
+ @Test
+ public void testSizeTracking() {
+ List<String> computeSizeCalls = new ArrayList<>();
+ var stmap = new SizeTrackingTreeMap<Integer,String>(val -> {
+ computeSizeCalls.add(val);
+ return val.length();
+ });
+
+ TreeMap<Integer,String> expected = new TreeMap<>();
+
+ check(expected, stmap);
+ assertEquals(List.of(), computeSizeCalls);
+
+ stmap.put(3, "1234567890");
+ expected.put(3, "1234567890");
+ check(expected, stmap);
+ assertEquals(List.of("1234567890"), computeSizeCalls);
+
+ stmap.put(4, "12345");
+ expected.put(4, "12345");
+ check(expected, stmap);
+ assertEquals(List.of("1234567890", "12345"), computeSizeCalls);
+
+ // remove a key that does not exist
+ stmap.remove(2);
+ expected.remove(2);
+ check(expected, stmap);
+ assertEquals(List.of("1234567890", "12345"), computeSizeCalls);
+
+ // remove a key that does exist
+ stmap.remove(3);
+ expected.remove(3);
+ check(expected, stmap);
+ assertEquals(List.of("1234567890", "12345"), computeSizeCalls);
+
+ // update an existing key, should decrement the old size and increment the
new size
+ stmap.put(4, "123");
+ expected.put(4, "123");
+ check(expected, stmap);
+ assertEquals(List.of("1234567890", "12345", "123"), computeSizeCalls);
+
+ stmap.put(7, "123456789012345");
+ expected.put(7, "123456789012345");
+ check(expected, stmap);
+ assertEquals(List.of("1234567890", "12345", "123", "123456789012345"),
computeSizeCalls);
+
+ stmap.put(11, "1234567");
+ expected.put(11, "1234567");
+ check(expected, stmap);
+ assertEquals(List.of("1234567890", "12345", "123", "123456789012345",
"1234567"),
+ computeSizeCalls);
+
+ assertEquals(expected.pollFirstEntry(), stmap.pollFirstEntry());
+ check(expected, stmap);
+ assertEquals(List.of("1234567890", "12345", "123", "123456789012345",
"1234567"),
+ computeSizeCalls);
+
+ assertEquals(expected.pollLastEntry(), stmap.pollLastEntry());
+ check(expected, stmap);
+ assertEquals(List.of("1234567890", "12345", "123", "123456789012345",
"1234567"),
+ computeSizeCalls);
+
+ expected.clear();
+ stmap.clear();
+ check(expected, stmap);
+ assertEquals(List.of("1234567890", "12345", "123", "123456789012345",
"1234567"),
+ computeSizeCalls);
+ }
+
+ private void check(TreeMap<Integer,String> expected,
SizeTrackingTreeMap<Integer,String> stmap) {
+ long expectedDataSize =
expected.values().stream().mapToLong(String::length).sum();
+ assertEquals(expectedDataSize, stmap.dataSize());
+ assertEquals(expected.size(), stmap.entrySize());
+ assertEquals(expected.isEmpty(), stmap.isEmpty());
+ assertEquals(expected.firstEntry(), stmap.firstEntry());
+ if (expected.isEmpty()) {
+ assertThrows(NoSuchElementException.class, stmap::lastKey);
+ } else {
+ assertEquals(expected.lastKey(), stmap.lastKey());
+ }
+ }
+}
diff --git
a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java
b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java
index bdee45f935..2a03256595 100644
---
a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java
@@ -109,7 +109,7 @@ public class CompactionPriorityQueueMetricsIT extends
SharedMiniClusterBase {
public static final String QUEUE1 = "METRICSQ1";
public static final String QUEUE1_METRIC_LABEL =
MetricsUtil.formatString(QUEUE1);
public static final String QUEUE1_SERVICE = "Q1";
- public static final int QUEUE1_SIZE = 6;
+ public static final int QUEUE1_SIZE = 10 * 1024;
// Metrics collector Thread
final LinkedBlockingQueue<TestStatsDSink.Metric> queueMetrics = new
LinkedBlockingQueue<>();
@@ -202,7 +202,7 @@ public class CompactionPriorityQueueMetricsIT extends
SharedMiniClusterBase {
Property.COMPACTION_SERVICE_PREFIX.getKey() + QUEUE1_SERVICE +
".planner.opts.groups",
"[{'group':'" + QUEUE1 + "'}]");
-
cfg.setProperty(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_INITIAL_SIZE,
"6");
+ cfg.setProperty(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE,
"10K");
cfg.getClusterServerConfiguration().addCompactorResourceGroup(QUEUE1, 0);
// This test waits for dead compactors to be absent in zookeeper. The
following setting will