This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push:
new 36df8cee36 removed queued jobs for a tablet if no new jobs are seen
(#4394)
36df8cee36 is described below
commit 36df8cee360081ca445f615cbc354e9e2ff2e6fc
Author: Keith Turner <[email protected]>
AuthorDate: Mon Mar 18 13:51:20 2024 -0400
removed queued jobs for a tablet if no new jobs are seen (#4394)
fixes #3528
---
.../accumulo/manager/TabletGroupWatcher.java | 2 +
.../queue/CompactionJobPriorityQueue.java | 56 ++++-
.../compaction/queue/CompactionJobQueues.java | 35 +++-
.../compaction/CompactionCoordinatorTest.java | 1 +
.../queue/CompactionJobPriorityQueueTest.java | 14 +-
.../compaction/queue/CompactionJobQueuesTest.java | 229 ++++++++++++++++++++-
6 files changed, 314 insertions(+), 23 deletions(-)
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
index 8779568916..3e8f1fdc5b 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
@@ -678,7 +678,9 @@ abstract class TabletGroupWatcher extends
AccumuloDaemonThread {
eventHandler.clearNeedsFullScan();
iter = store.iterator(tableMgmtParams);
+
manager.getCompactionCoordinator().getJobQueues().beginFullScan(store.getLevel());
var tabletMgmtStats = manageTablets(iter, tableMgmtParams,
currentTServers, true);
+
manager.getCompactionCoordinator().getJobQueues().endFullScan(store.getLevel());
// If currently looking for volume replacements, determine if the next
round needs to look.
if (lookForTabletsNeedingVolReplacement) {
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
index e4aa059e95..4dfd6868ad 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
@@ -23,18 +23,23 @@ import static
com.google.common.base.Preconditions.checkState;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.spi.compaction.CompactionJob;
import org.apache.accumulo.core.spi.compaction.CompactorGroupId;
import org.apache.accumulo.core.util.compaction.CompactionJobPrioritizer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
@@ -49,6 +54,8 @@ import com.google.common.base.Preconditions;
*/
public class CompactionJobPriorityQueue {
+ private static final Logger log =
LoggerFactory.getLogger(CompactionJobPriorityQueue.class);
+
private final CompactorGroupId groupId;
private class CjpqKey implements Comparable<CjpqKey> {
@@ -99,9 +106,19 @@ public class CompactionJobPriorityQueue {
private final AtomicLong rejectedJobs;
private final AtomicLong dequeuedJobs;
+ private static class TabletJobs {
+ final long generation;
+ final HashSet<CjpqKey> jobs;
+
+ private TabletJobs(long generation, HashSet<CjpqKey> jobs) {
+ this.generation = generation;
+ this.jobs = jobs;
+ }
+ }
+
// This map tracks what jobs a tablet currently has in the queue. Its used
to efficiently remove
// jobs in the queue when new jobs are queued for a tablet.
- private final Map<KeyExtent,List<CjpqKey>> tabletJobs;
+ private final Map<KeyExtent,TabletJobs> tabletJobs;
private final AtomicLong nextSeq = new AtomicLong(0);
@@ -116,10 +133,32 @@ public class CompactionJobPriorityQueue {
this.dequeuedJobs = new AtomicLong(0);
}
+ public synchronized void removeOlderGenerations(Ample.DataLevel level, long
currGeneration) {
+ if (closed.get()) {
+ return;
+ }
+
+ List<KeyExtent> removals = new ArrayList<>();
+
+ tabletJobs.forEach((extent, jobs) -> {
+ if (Ample.DataLevel.of(extent.tableId()) == level && jobs.generation <
currGeneration) {
+ removals.add(extent);
+ }
+ });
+
+ if (!removals.isEmpty()) {
+ log.trace("Removed {} queued tablets that no longer need compaction for
{} {}",
+ removals.size(), groupId, level);
+ }
+
+ removals.forEach(this::removePreviousSubmissions);
+ }
+
/**
* @return the number of jobs added. If the queue is closed returns -1
*/
- public synchronized int add(TabletMetadata tabletMetadata,
Collection<CompactionJob> jobs) {
+ public synchronized int add(TabletMetadata tabletMetadata,
Collection<CompactionJob> jobs,
+ long generation) {
Preconditions.checkArgument(jobs.stream().allMatch(job ->
job.getGroup().equals(groupId)));
if (closed.get()) {
return -1;
@@ -127,13 +166,13 @@ public class CompactionJobPriorityQueue {
removePreviousSubmissions(tabletMetadata.getExtent());
- List<CjpqKey> newEntries = new ArrayList<>(jobs.size());
+ HashSet<CjpqKey> newEntries = new HashSet<>(jobs.size());
int jobsAdded = 0;
for (CompactionJob job : jobs) {
CjpqKey cjqpKey = addJobToQueue(tabletMetadata, job);
if (cjqpKey != null) {
- newEntries.add(cjqpKey);
+ checkState(newEntries.add(cjqpKey));
jobsAdded++;
} else {
// The priority for this job was lower than all other priorities and
not added
@@ -143,7 +182,8 @@ public class CompactionJobPriorityQueue {
}
if (!newEntries.isEmpty()) {
- checkState(tabletJobs.put(tabletMetadata.getExtent(), newEntries) ==
null);
+ checkState(tabletJobs.put(tabletMetadata.getExtent(), new
TabletJobs(generation, newEntries))
+ == null);
}
return jobsAdded;
@@ -178,7 +218,7 @@ public class CompactionJobPriorityQueue {
if (first != null) {
dequeuedJobs.getAndIncrement();
var extent = first.getValue().getTabletMetadata().getExtent();
- List<CjpqKey> jobs = tabletJobs.get(extent);
+ Set<CjpqKey> jobs = tabletJobs.get(extent).jobs;
checkState(jobs.remove(first.getKey()));
if (jobs.isEmpty()) {
tabletJobs.remove(extent);
@@ -207,9 +247,9 @@ public class CompactionJobPriorityQueue {
}
private void removePreviousSubmissions(KeyExtent extent) {
- List<CjpqKey> prevJobs = tabletJobs.get(extent);
+ TabletJobs prevJobs = tabletJobs.get(extent);
if (prevJobs != null) {
- prevJobs.forEach(jobQueue::remove);
+ prevJobs.jobs.forEach(jobQueue::remove);
tabletJobs.remove(extent);
}
}
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java
index b47ec92675..8c46227357 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java
@@ -19,10 +19,15 @@
package org.apache.accumulo.manager.compaction.queue;
import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentHashMap.KeySetView;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
+import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.spi.compaction.CompactionJob;
import org.apache.accumulo.core.spi.compaction.CompactorGroupId;
@@ -46,8 +51,35 @@ public class CompactionJobQueues {
private final int queueSize;
+ private final Map<DataLevel,AtomicLong> currentGenerations;
+
public CompactionJobQueues(int queueSize) {
this.queueSize = queueSize;
+ Map<DataLevel,AtomicLong> cg = new EnumMap<>(DataLevel.class);
+ for (var level : DataLevel.values()) {
+ cg.put(level, new AtomicLong());
+ }
+ currentGenerations = Collections.unmodifiableMap(cg);
+
+ }
+
+ public void beginFullScan(DataLevel level) {
+ currentGenerations.get(level).incrementAndGet();
+ }
+
+ /**
+ * The purpose of this method is to remove any tablets that were added
before beginFullScan() was
+ * called. The purpose of this is to handle tablets that were queued for
compaction for a while
+ * and because of some change no longer need to compact. If a full scan of
the metadata table does
+ * not find any new work for tablet, then any previously queued work for
that tablet should be
+ * discarded.
+ *
+ * @param level full metadata scans are done independently per DataLevel, so
the tracking what
+ * needs to be removed must be done per DataLevel
+ */
+ public void endFullScan(DataLevel level) {
+ priorityQueues.values()
+ .forEach(pq -> pq.removeOlderGenerations(level,
currentGenerations.get(level).get()));
}
public void add(TabletMetadata tabletMetadata, Collection<CompactionJob>
jobs) {
@@ -155,7 +187,8 @@ public class CompactionJobQueues {
var pq = priorityQueues.computeIfAbsent(groupId,
gid -> new CompactionJobPriorityQueue(gid, queueSize));
- while (pq.add(tabletMetadata, jobs) < 0) {
+ while (pq.add(tabletMetadata, jobs,
+
currentGenerations.get(DataLevel.of(tabletMetadata.getTableId())).get()) < 0) {
// When entering this loop its expected the queue is closed
Preconditions.checkState(pq.isClosed());
// This loop handles race condition where poll() closes empty priority
queues. The queue could
diff --git
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
index 20f5cdc17e..bb75b8c65c 100644
---
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
+++
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
@@ -294,6 +294,7 @@ public class CompactionCoordinatorTest {
TabletMetadata tm = EasyMock.createNiceMock(TabletMetadata.class);
expect(tm.getExtent()).andReturn(ke).anyTimes();
expect(tm.getFiles()).andReturn(Collections.emptySet()).anyTimes();
+ expect(tm.getTableId()).andReturn(ke.tableId()).anyTimes();
EasyMock.replay(tconf, context, creds, tm, security);
diff --git
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
index f511f35078..2e090c32a2 100644
---
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
+++
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
@@ -71,7 +71,7 @@ public class CompactionJobPriorityQueueTest {
EasyMock.replay(tm, cj1, cj2);
CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP,
2);
- assertEquals(1, queue.add(tm, List.of(cj1)));
+ assertEquals(1, queue.add(tm, List.of(cj1), 1L));
MetaJob job = queue.peek();
assertEquals(cj1, job.getJob());
@@ -84,7 +84,7 @@ public class CompactionJobPriorityQueueTest {
assertEquals(1, queue.getQueuedJobs());
// replace the files for the same tablet
- assertEquals(1, queue.add(tm, List.of(cj2)));
+ assertEquals(1, queue.add(tm, List.of(cj2), 1L));
job = queue.peek();
assertEquals(cj2, job.getJob());
@@ -126,7 +126,7 @@ public class CompactionJobPriorityQueueTest {
EasyMock.replay(tm, cj1, cj2);
CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP,
2);
- assertEquals(2, queue.add(tm, List.of(cj1, cj2)));
+ assertEquals(2, queue.add(tm, List.of(cj1, cj2), 1L));
EasyMock.verify(tm, cj1, cj2);
@@ -183,7 +183,7 @@ public class CompactionJobPriorityQueueTest {
EasyMock.replay(tm, cj1, cj2, cj3);
CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP,
2);
- assertEquals(2, queue.add(tm, List.of(cj1, cj2, cj3)));
+ assertEquals(2, queue.add(tm, List.of(cj1, cj2, cj3), 1L));
EasyMock.verify(tm, cj1, cj2, cj3);
@@ -234,7 +234,7 @@ public class CompactionJobPriorityQueueTest {
EasyMock.replay(tm, cj1, cj2);
CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP,
2);
- assertEquals(2, queue.add(tm, List.of(cj1, cj2)));
+ assertEquals(2, queue.add(tm, List.of(cj1, cj2), 1L));
assertFalse(queue.closeIfEmpty());
@@ -257,7 +257,7 @@ public class CompactionJobPriorityQueueTest {
assertTrue(queue.closeIfEmpty());
- assertEquals(-1, queue.add(tm, List.of(cj1, cj2)));
+ assertEquals(-1, queue.add(tm, List.of(cj1, cj2), 1L));
}
@@ -295,7 +295,7 @@ public class CompactionJobPriorityQueueTest {
// create and add 1000 jobs
for (int x = 0; x < 1000; x++) {
Pair<TabletMetadata,CompactionJob> pair = createJob();
- queue.add(pair.getFirst(), Set.of(pair.getSecond()));
+ queue.add(pair.getFirst(), Set.of(pair.getSecond()), 1L);
expected.add(pair.getSecond());
}
diff --git
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java
index dfd956fc91..73aa404295 100644
---
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java
+++
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java
@@ -19,8 +19,10 @@
package org.apache.accumulo.manager.compaction.queue;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
import java.net.URI;
+import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@ -33,7 +35,10 @@ import java.util.stream.Stream;
import org.apache.accumulo.core.client.admin.compaction.CompactableFile;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.metadata.AccumuloTable;
import org.apache.accumulo.core.metadata.CompactableFileImpl;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.spi.compaction.CompactionJob;
import org.apache.accumulo.core.spi.compaction.CompactionKind;
@@ -44,6 +49,222 @@ import org.junit.jupiter.api.Test;
public class CompactionJobQueuesTest {
+ private CompactionJob newJob(short prio, int file, CompactorGroupId cgi)
+ throws URISyntaxException {
+ Collection<CompactableFile> files = List
+ .of(new CompactableFileImpl(new
URI("file://accumulo/tables//123/t-0/f" + file), 100, 100));
+ return new CompactionJobImpl(prio, cgi, files, CompactionKind.SYSTEM,
Optional.empty());
+ }
+
+ @Test
+ public void testFullScanHandling() throws Exception {
+
+ var tid = TableId.of("1");
+ var extent1 = new KeyExtent(tid, new Text("z"), new Text("q"));
+ var extent2 = new KeyExtent(tid, new Text("q"), new Text("l"));
+ var extent3 = new KeyExtent(tid, new Text("l"), new Text("c"));
+ var extent4 = new KeyExtent(tid, new Text("c"), new Text("a"));
+
+ var tm1 = TabletMetadata.builder(extent1).build();
+ var tm2 = TabletMetadata.builder(extent2).build();
+ var tm3 = TabletMetadata.builder(extent3).build();
+ var tm4 = TabletMetadata.builder(extent4).build();
+
+ var cg1 = CompactorGroupId.of("CG1");
+ var cg2 = CompactorGroupId.of("CG2");
+ var cg3 = CompactorGroupId.of("CG3");
+
+ CompactionJobQueues jobQueues = new CompactionJobQueues(100);
+
+ jobQueues.beginFullScan(DataLevel.USER);
+
+ jobQueues.add(tm1, List.of(newJob((short) 1, 5, cg1)));
+ jobQueues.add(tm2, List.of(newJob((short) 2, 6, cg1)));
+ jobQueues.add(tm3, List.of(newJob((short) 3, 7, cg1)));
+ jobQueues.add(tm4, List.of(newJob((short) 4, 8, cg1)));
+
+ jobQueues.add(tm1, List.of(newJob((short) 4, 1, cg2)));
+ jobQueues.add(tm2, List.of(newJob((short) 3, 2, cg2)));
+ jobQueues.add(tm3, List.of(newJob((short) 2, 3, cg2)));
+ jobQueues.add(tm4, List.of(newJob((short) 1, 4, cg2)));
+
+ jobQueues.endFullScan(DataLevel.USER);
+
+ assertEquals(4, jobQueues.getQueuedJobs(cg1));
+ assertEquals(4, jobQueues.getQueuedJobs(cg2));
+ assertEquals(0, jobQueues.getQueuedJobs(cg3));
+
+ assertEquals(extent4, jobQueues.poll(cg1).getTabletMetadata().getExtent());
+ assertEquals(extent1, jobQueues.poll(cg2).getTabletMetadata().getExtent());
+
+ assertEquals(3, jobQueues.getQueuedJobs(cg1));
+ assertEquals(3, jobQueues.getQueuedJobs(cg2));
+ assertEquals(0, jobQueues.getQueuedJobs(cg3));
+
+ jobQueues.beginFullScan(DataLevel.USER);
+
+ // should still be able to poll and get things added in the last full scan
+ assertEquals(extent3, jobQueues.poll(cg1).getTabletMetadata().getExtent());
+ assertEquals(2, jobQueues.getQueuedJobs(cg1));
+ assertEquals(3, jobQueues.getQueuedJobs(cg2));
+
+ // add something new during the full scan
+ jobQueues.add(tm1, List.of(newJob((short) -7, 9, cg2)));
+ assertEquals(2, jobQueues.getQueuedJobs(cg1));
+ assertEquals(4, jobQueues.getQueuedJobs(cg2));
+
+ // should still be able to poll and get things added in the last full scan
+ assertEquals(extent2, jobQueues.poll(cg2).getTabletMetadata().getExtent());
+ assertEquals(2, jobQueues.getQueuedJobs(cg1));
+ assertEquals(3, jobQueues.getQueuedJobs(cg2));
+
+ // this should remove anything that was added before begin full scan was
called
+ jobQueues.endFullScan(DataLevel.USER);
+
+ assertEquals(0, jobQueues.getQueuedJobs(cg1));
+ assertEquals(1, jobQueues.getQueuedJobs(cg2));
+ assertEquals(0, jobQueues.getQueuedJobs(cg3));
+
+ assertNull(jobQueues.poll(cg1));
+ assertEquals(extent1, jobQueues.poll(cg2).getTabletMetadata().getExtent());
+
+ assertEquals(0, jobQueues.getQueuedJobs(cg1));
+ assertEquals(0, jobQueues.getQueuedJobs(cg2));
+ assertEquals(0, jobQueues.getQueuedJobs(cg3));
+
+ // add some things outside of a begin/end full scan calls
+ jobQueues.add(tm1, List.of(newJob((short) 1, 5, cg1)));
+ jobQueues.add(tm2, List.of(newJob((short) 2, 6, cg1)));
+
+ jobQueues.add(tm1, List.of(newJob((short) 4, 1, cg2)));
+ jobQueues.add(tm2, List.of(newJob((short) 3, 2, cg2)));
+
+ jobQueues.beginFullScan(DataLevel.USER);
+
+ assertEquals(2, jobQueues.getQueuedJobs(cg1));
+ assertEquals(2, jobQueues.getQueuedJobs(cg2));
+ assertEquals(0, jobQueues.getQueuedJobs(cg3));
+
+ // add some things inside the begin/end full scan calls
+ jobQueues.add(tm3, List.of(newJob((short) 3, 7, cg1)));
+ jobQueues.add(tm4, List.of(newJob((short) 4, 8, cg1)));
+
+ jobQueues.add(tm3, List.of(newJob((short) 2, 3, cg2)));
+ jobQueues.add(tm4, List.of(newJob((short) 1, 4, cg2)));
+
+ assertEquals(4, jobQueues.getQueuedJobs(cg1));
+ assertEquals(4, jobQueues.getQueuedJobs(cg2));
+ assertEquals(0, jobQueues.getQueuedJobs(cg3));
+
+ // poll inside the full scan calls
+ assertEquals(extent4, jobQueues.poll(cg1).getTabletMetadata().getExtent());
+ assertEquals(extent1, jobQueues.poll(cg2).getTabletMetadata().getExtent());
+
+ assertEquals(3, jobQueues.getQueuedJobs(cg1));
+ assertEquals(3, jobQueues.getQueuedJobs(cg2));
+ assertEquals(0, jobQueues.getQueuedJobs(cg3));
+
+ // should remove any tablets added before the full scan started
+ jobQueues.endFullScan(DataLevel.USER);
+
+ assertEquals(1, jobQueues.getQueuedJobs(cg1));
+ assertEquals(2, jobQueues.getQueuedJobs(cg2));
+ assertEquals(0, jobQueues.getQueuedJobs(cg3));
+
+ assertEquals(extent3, jobQueues.poll(cg1).getTabletMetadata().getExtent());
+ assertEquals(extent3, jobQueues.poll(cg2).getTabletMetadata().getExtent());
+ assertEquals(extent4, jobQueues.poll(cg2).getTabletMetadata().getExtent());
+
+ assertNull(jobQueues.poll(cg1));
+ assertNull(jobQueues.poll(cg2));
+ assertNull(jobQueues.poll(cg3));
+
+ assertEquals(0, jobQueues.getQueuedJobs(cg1));
+ assertEquals(0, jobQueues.getQueuedJobs(cg2));
+ assertEquals(0, jobQueues.getQueuedJobs(cg3));
+
+ // add jobs outside of begin/end full scan
+ jobQueues.add(tm1, List.of(newJob((short) 1, 5, cg1)));
+ jobQueues.add(tm2, List.of(newJob((short) 2, 6, cg1)));
+ jobQueues.add(tm3, List.of(newJob((short) 3, 7, cg1)));
+ jobQueues.add(tm4, List.of(newJob((short) 4, 8, cg1)));
+
+ jobQueues.add(tm1, List.of(newJob((short) 4, 1, cg2)));
+ jobQueues.add(tm2, List.of(newJob((short) 3, 2, cg2)));
+ jobQueues.add(tm3, List.of(newJob((short) 2, 3, cg2)));
+ jobQueues.add(tm4, List.of(newJob((short) 1, 4, cg2)));
+
+ jobQueues.beginFullScan(DataLevel.USER);
+
+ // readd some of the tablets added before the beginFullScan, this should
prevent those tablets
+ // from being removed by endFullScan
+ jobQueues.add(tm4, List.of(newJob((short) 5, 5, cg2)));
+ jobQueues.add(tm1, List.of(newJob((short) -7, 5, cg2)));
+
+ assertEquals(4, jobQueues.getQueuedJobs(cg1));
+ assertEquals(4, jobQueues.getQueuedJobs(cg2));
+ assertEquals(0, jobQueues.getQueuedJobs(cg3));
+
+ // should remove all jobs added before begin full scan
+ jobQueues.endFullScan(DataLevel.USER);
+
+ assertEquals(0, jobQueues.getQueuedJobs(cg1));
+ assertEquals(2, jobQueues.getQueuedJobs(cg2));
+ assertEquals(0, jobQueues.getQueuedJobs(cg3));
+
+ // make sure we see what was added last for the tablets
+ assertEquals(5, jobQueues.poll(cg2).getJob().getPriority());
+ assertEquals(-7, jobQueues.poll(cg2).getJob().getPriority());
+
+ assertEquals(0, jobQueues.getQueuedJobs(cg1));
+ assertEquals(0, jobQueues.getQueuedJobs(cg2));
+ assertEquals(0, jobQueues.getQueuedJobs(cg3));
+
+ assertNull(jobQueues.poll(cg1));
+ assertNull(jobQueues.poll(cg2));
+ assertNull(jobQueues.poll(cg3));
+ }
+
+ @Test
+ public void testFullScanLevels() throws Exception {
+ var tid = TableId.of("1");
+ var extent1 = new KeyExtent(tid, new Text("z"), new Text("q"));
+ var extent2 = new KeyExtent(tid, new Text("q"), new Text("l"));
+ var meta = new KeyExtent(AccumuloTable.METADATA.tableId(), new Text("l"),
new Text("c"));
+ var root = RootTable.EXTENT;
+
+ var tm1 = TabletMetadata.builder(extent1).build();
+ var tm2 = TabletMetadata.builder(extent2).build();
+ var tmm = TabletMetadata.builder(meta).build();
+ var tmr = TabletMetadata.builder(root).build();
+
+ var cg1 = CompactorGroupId.of("CG1");
+
+ CompactionJobQueues jobQueues = new CompactionJobQueues(100);
+
+ jobQueues.add(tm1, List.of(newJob((short) 1, 5, cg1)));
+ jobQueues.add(tm2, List.of(newJob((short) 2, 6, cg1)));
+ jobQueues.add(tmm, List.of(newJob((short) 3, 7, cg1)));
+ jobQueues.add(tmr, List.of(newJob((short) 4, 8, cg1)));
+
+ // verify that a begin and end full scan will only drop tablets in its
level
+
+ jobQueues.beginFullScan(DataLevel.ROOT);
+ assertEquals(4, jobQueues.getQueuedJobs(cg1));
+ jobQueues.endFullScan(DataLevel.ROOT);
+ assertEquals(3, jobQueues.getQueuedJobs(cg1));
+
+ jobQueues.beginFullScan(DataLevel.USER);
+ assertEquals(3, jobQueues.getQueuedJobs(cg1));
+ jobQueues.endFullScan(DataLevel.USER);
+ assertEquals(1, jobQueues.getQueuedJobs(cg1));
+
+ jobQueues.beginFullScan(DataLevel.METADATA);
+ assertEquals(1, jobQueues.getQueuedJobs(cg1));
+ jobQueues.endFullScan(DataLevel.METADATA);
+ assertEquals(0, jobQueues.getQueuedJobs(cg1));
+ }
+
/**
* When a queue goes empty it is removed. This removal should be done safely
and should not cause
* any data being concurrently added to be lost. The way this test adds and
removes data in
@@ -94,13 +315,7 @@ public class CompactionJobQueuesTest {
// queue for that extent which could throw off the counts
KeyExtent extent = new KeyExtent(TableId.of("1"), new Text(i + "z"), new
Text(i + "a"));
TabletMetadata tm = TabletMetadata.builder(extent).build();
-
- Collection<CompactableFile> files = List
- .of(new CompactableFileImpl(new
URI("file://accumulo/tables//123/t-0/f" + i), 100, 100));
- Collection<CompactionJob> jobs = List.of(new CompactionJobImpl((short)
(i % 31),
- groups[i % groups.length], files, CompactionKind.SYSTEM,
Optional.empty()));
-
- jobQueues.add(tm, jobs);
+ jobQueues.add(tm, List.of(newJob((short) (i % 31), i, groups[i %
groups.length])));
}
// Cause the background threads to exit after polling all data