This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new 36df8cee36 removed queued jobs for a tablet if no new jobs are seen (#4394) 36df8cee36 is described below commit 36df8cee360081ca445f615cbc354e9e2ff2e6fc Author: Keith Turner <ktur...@apache.org> AuthorDate: Mon Mar 18 13:51:20 2024 -0400 removed queued jobs for a tablet if no new jobs are seen (#4394) fixes #3528 --- .../accumulo/manager/TabletGroupWatcher.java | 2 + .../queue/CompactionJobPriorityQueue.java | 56 ++++- .../compaction/queue/CompactionJobQueues.java | 35 +++- .../compaction/CompactionCoordinatorTest.java | 1 + .../queue/CompactionJobPriorityQueueTest.java | 14 +- .../compaction/queue/CompactionJobQueuesTest.java | 229 ++++++++++++++++++++- 6 files changed, 314 insertions(+), 23 deletions(-) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java index 8779568916..3e8f1fdc5b 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java @@ -678,7 +678,9 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { eventHandler.clearNeedsFullScan(); iter = store.iterator(tableMgmtParams); + manager.getCompactionCoordinator().getJobQueues().beginFullScan(store.getLevel()); var tabletMgmtStats = manageTablets(iter, tableMgmtParams, currentTServers, true); + manager.getCompactionCoordinator().getJobQueues().endFullScan(store.getLevel()); // If currently looking for volume replacements, determine if the next round needs to look. if (lookForTabletsNeedingVolReplacement) { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java index e4aa059e95..4dfd6868ad 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java @@ -23,18 +23,23 @@ import static com.google.common.base.Preconditions.checkState; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.TreeMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.spi.compaction.CompactionJob; import org.apache.accumulo.core.spi.compaction.CompactorGroupId; import org.apache.accumulo.core.util.compaction.CompactionJobPrioritizer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; @@ -49,6 +54,8 @@ import com.google.common.base.Preconditions; */ public class CompactionJobPriorityQueue { + private static final Logger log = LoggerFactory.getLogger(CompactionJobPriorityQueue.class); + private final CompactorGroupId groupId; private class CjpqKey implements Comparable<CjpqKey> { @@ -99,9 +106,19 @@ public class CompactionJobPriorityQueue { private final AtomicLong rejectedJobs; private final AtomicLong dequeuedJobs; + private static class TabletJobs { + final long generation; + final HashSet<CjpqKey> jobs; + + private TabletJobs(long generation, HashSet<CjpqKey> jobs) { + this.generation = generation; + this.jobs = jobs; + } + } + // This map tracks what jobs a tablet currently has in the queue. Its used to efficiently remove // jobs in the queue when new jobs are queued for a tablet. - private final Map<KeyExtent,List<CjpqKey>> tabletJobs; + private final Map<KeyExtent,TabletJobs> tabletJobs; private final AtomicLong nextSeq = new AtomicLong(0); @@ -116,10 +133,32 @@ public class CompactionJobPriorityQueue { this.dequeuedJobs = new AtomicLong(0); } + public synchronized void removeOlderGenerations(Ample.DataLevel level, long currGeneration) { + if (closed.get()) { + return; + } + + List<KeyExtent> removals = new ArrayList<>(); + + tabletJobs.forEach((extent, jobs) -> { + if (Ample.DataLevel.of(extent.tableId()) == level && jobs.generation < currGeneration) { + removals.add(extent); + } + }); + + if (!removals.isEmpty()) { + log.trace("Removed {} queued tablets that no longer need compaction for {} {}", + removals.size(), groupId, level); + } + + removals.forEach(this::removePreviousSubmissions); + } + /** * @return the number of jobs added. If the queue is closed returns -1 */ - public synchronized int add(TabletMetadata tabletMetadata, Collection<CompactionJob> jobs) { + public synchronized int add(TabletMetadata tabletMetadata, Collection<CompactionJob> jobs, + long generation) { Preconditions.checkArgument(jobs.stream().allMatch(job -> job.getGroup().equals(groupId))); if (closed.get()) { return -1; @@ -127,13 +166,13 @@ public class CompactionJobPriorityQueue { removePreviousSubmissions(tabletMetadata.getExtent()); - List<CjpqKey> newEntries = new ArrayList<>(jobs.size()); + HashSet<CjpqKey> newEntries = new HashSet<>(jobs.size()); int jobsAdded = 0; for (CompactionJob job : jobs) { CjpqKey cjqpKey = addJobToQueue(tabletMetadata, job); if (cjqpKey != null) { - newEntries.add(cjqpKey); + checkState(newEntries.add(cjqpKey)); jobsAdded++; } else { // The priority for this job was lower than all other priorities and not added @@ -143,7 +182,8 @@ public class CompactionJobPriorityQueue { } if (!newEntries.isEmpty()) { - checkState(tabletJobs.put(tabletMetadata.getExtent(), newEntries) == null); + checkState(tabletJobs.put(tabletMetadata.getExtent(), new TabletJobs(generation, newEntries)) + == null); } return jobsAdded; @@ -178,7 +218,7 @@ public class CompactionJobPriorityQueue { if (first != null) { dequeuedJobs.getAndIncrement(); var extent = first.getValue().getTabletMetadata().getExtent(); - List<CjpqKey> jobs = tabletJobs.get(extent); + Set<CjpqKey> jobs = tabletJobs.get(extent).jobs; checkState(jobs.remove(first.getKey())); if (jobs.isEmpty()) { tabletJobs.remove(extent); @@ -207,9 +247,9 @@ public class CompactionJobPriorityQueue { } private void removePreviousSubmissions(KeyExtent extent) { - List<CjpqKey> prevJobs = tabletJobs.get(extent); + TabletJobs prevJobs = tabletJobs.get(extent); if (prevJobs != null) { - prevJobs.forEach(jobQueue::remove); + prevJobs.jobs.forEach(jobQueue::remove); tabletJobs.remove(extent); } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java index b47ec92675..8c46227357 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java @@ -19,10 +19,15 @@ package org.apache.accumulo.manager.compaction.queue; import java.util.Collection; +import java.util.Collections; +import java.util.EnumMap; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap.KeySetView; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; +import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.spi.compaction.CompactionJob; import org.apache.accumulo.core.spi.compaction.CompactorGroupId; @@ -46,8 +51,35 @@ public class CompactionJobQueues { private final int queueSize; + private final Map<DataLevel,AtomicLong> currentGenerations; + public CompactionJobQueues(int queueSize) { this.queueSize = queueSize; + Map<DataLevel,AtomicLong> cg = new EnumMap<>(DataLevel.class); + for (var level : DataLevel.values()) { + cg.put(level, new AtomicLong()); + } + currentGenerations = Collections.unmodifiableMap(cg); + + } + + public void beginFullScan(DataLevel level) { + currentGenerations.get(level).incrementAndGet(); + } + + /** + * The purpose of this method is to remove any tablets that were added before beginFullScan() was + * called. The purpose of this is to handle tablets that were queued for compaction for a while + * and because of some change no longer need to compact. If a full scan of the metadata table does + * not find any new work for tablet, then any previously queued work for that tablet should be + * discarded. + * + * @param level full metadata scans are done independently per DataLevel, so the tracking what + * needs to be removed must be done per DataLevel + */ + public void endFullScan(DataLevel level) { + priorityQueues.values() + .forEach(pq -> pq.removeOlderGenerations(level, currentGenerations.get(level).get())); } public void add(TabletMetadata tabletMetadata, Collection<CompactionJob> jobs) { @@ -155,7 +187,8 @@ public class CompactionJobQueues { var pq = priorityQueues.computeIfAbsent(groupId, gid -> new CompactionJobPriorityQueue(gid, queueSize)); - while (pq.add(tabletMetadata, jobs) < 0) { + while (pq.add(tabletMetadata, jobs, + currentGenerations.get(DataLevel.of(tabletMetadata.getTableId())).get()) < 0) { // When entering this loop its expected the queue is closed Preconditions.checkState(pq.isClosed()); // This loop handles race condition where poll() closes empty priority queues. The queue could diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java index 20f5cdc17e..bb75b8c65c 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java @@ -294,6 +294,7 @@ public class CompactionCoordinatorTest { TabletMetadata tm = EasyMock.createNiceMock(TabletMetadata.class); expect(tm.getExtent()).andReturn(ke).anyTimes(); expect(tm.getFiles()).andReturn(Collections.emptySet()).anyTimes(); + expect(tm.getTableId()).andReturn(ke.tableId()).anyTimes(); EasyMock.replay(tconf, context, creds, tm, security); diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java index f511f35078..2e090c32a2 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java @@ -71,7 +71,7 @@ public class CompactionJobPriorityQueueTest { EasyMock.replay(tm, cj1, cj2); CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 2); - assertEquals(1, queue.add(tm, List.of(cj1))); + assertEquals(1, queue.add(tm, List.of(cj1), 1L)); MetaJob job = queue.peek(); assertEquals(cj1, job.getJob()); @@ -84,7 +84,7 @@ public class CompactionJobPriorityQueueTest { assertEquals(1, queue.getQueuedJobs()); // replace the files for the same tablet - assertEquals(1, queue.add(tm, List.of(cj2))); + assertEquals(1, queue.add(tm, List.of(cj2), 1L)); job = queue.peek(); assertEquals(cj2, job.getJob()); @@ -126,7 +126,7 @@ public class CompactionJobPriorityQueueTest { EasyMock.replay(tm, cj1, cj2); CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 2); - assertEquals(2, queue.add(tm, List.of(cj1, cj2))); + assertEquals(2, queue.add(tm, List.of(cj1, cj2), 1L)); EasyMock.verify(tm, cj1, cj2); @@ -183,7 +183,7 @@ public class CompactionJobPriorityQueueTest { EasyMock.replay(tm, cj1, cj2, cj3); CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 2); - assertEquals(2, queue.add(tm, List.of(cj1, cj2, cj3))); + assertEquals(2, queue.add(tm, List.of(cj1, cj2, cj3), 1L)); EasyMock.verify(tm, cj1, cj2, cj3); @@ -234,7 +234,7 @@ public class CompactionJobPriorityQueueTest { EasyMock.replay(tm, cj1, cj2); CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 2); - assertEquals(2, queue.add(tm, List.of(cj1, cj2))); + assertEquals(2, queue.add(tm, List.of(cj1, cj2), 1L)); assertFalse(queue.closeIfEmpty()); @@ -257,7 +257,7 @@ public class CompactionJobPriorityQueueTest { assertTrue(queue.closeIfEmpty()); - assertEquals(-1, queue.add(tm, List.of(cj1, cj2))); + assertEquals(-1, queue.add(tm, List.of(cj1, cj2), 1L)); } @@ -295,7 +295,7 @@ public class CompactionJobPriorityQueueTest { // create and add 1000 jobs for (int x = 0; x < 1000; x++) { Pair<TabletMetadata,CompactionJob> pair = createJob(); - queue.add(pair.getFirst(), Set.of(pair.getSecond())); + queue.add(pair.getFirst(), Set.of(pair.getSecond()), 1L); expected.add(pair.getSecond()); } diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java index dfd956fc91..73aa404295 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java @@ -19,8 +19,10 @@ package org.apache.accumulo.manager.compaction.queue; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; import java.net.URI; +import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -33,7 +35,10 @@ import java.util.stream.Stream; import org.apache.accumulo.core.client.admin.compaction.CompactableFile; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.core.metadata.CompactableFileImpl; +import org.apache.accumulo.core.metadata.RootTable; +import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.spi.compaction.CompactionJob; import org.apache.accumulo.core.spi.compaction.CompactionKind; @@ -44,6 +49,222 @@ import org.junit.jupiter.api.Test; public class CompactionJobQueuesTest { + private CompactionJob newJob(short prio, int file, CompactorGroupId cgi) + throws URISyntaxException { + Collection<CompactableFile> files = List + .of(new CompactableFileImpl(new URI("file://accumulo/tables//123/t-0/f" + file), 100, 100)); + return new CompactionJobImpl(prio, cgi, files, CompactionKind.SYSTEM, Optional.empty()); + } + + @Test + public void testFullScanHandling() throws Exception { + + var tid = TableId.of("1"); + var extent1 = new KeyExtent(tid, new Text("z"), new Text("q")); + var extent2 = new KeyExtent(tid, new Text("q"), new Text("l")); + var extent3 = new KeyExtent(tid, new Text("l"), new Text("c")); + var extent4 = new KeyExtent(tid, new Text("c"), new Text("a")); + + var tm1 = TabletMetadata.builder(extent1).build(); + var tm2 = TabletMetadata.builder(extent2).build(); + var tm3 = TabletMetadata.builder(extent3).build(); + var tm4 = TabletMetadata.builder(extent4).build(); + + var cg1 = CompactorGroupId.of("CG1"); + var cg2 = CompactorGroupId.of("CG2"); + var cg3 = CompactorGroupId.of("CG3"); + + CompactionJobQueues jobQueues = new CompactionJobQueues(100); + + jobQueues.beginFullScan(DataLevel.USER); + + jobQueues.add(tm1, List.of(newJob((short) 1, 5, cg1))); + jobQueues.add(tm2, List.of(newJob((short) 2, 6, cg1))); + jobQueues.add(tm3, List.of(newJob((short) 3, 7, cg1))); + jobQueues.add(tm4, List.of(newJob((short) 4, 8, cg1))); + + jobQueues.add(tm1, List.of(newJob((short) 4, 1, cg2))); + jobQueues.add(tm2, List.of(newJob((short) 3, 2, cg2))); + jobQueues.add(tm3, List.of(newJob((short) 2, 3, cg2))); + jobQueues.add(tm4, List.of(newJob((short) 1, 4, cg2))); + + jobQueues.endFullScan(DataLevel.USER); + + assertEquals(4, jobQueues.getQueuedJobs(cg1)); + assertEquals(4, jobQueues.getQueuedJobs(cg2)); + assertEquals(0, jobQueues.getQueuedJobs(cg3)); + + assertEquals(extent4, jobQueues.poll(cg1).getTabletMetadata().getExtent()); + assertEquals(extent1, jobQueues.poll(cg2).getTabletMetadata().getExtent()); + + assertEquals(3, jobQueues.getQueuedJobs(cg1)); + assertEquals(3, jobQueues.getQueuedJobs(cg2)); + assertEquals(0, jobQueues.getQueuedJobs(cg3)); + + jobQueues.beginFullScan(DataLevel.USER); + + // should still be able to poll and get things added in the last full scan + assertEquals(extent3, jobQueues.poll(cg1).getTabletMetadata().getExtent()); + assertEquals(2, jobQueues.getQueuedJobs(cg1)); + assertEquals(3, jobQueues.getQueuedJobs(cg2)); + + // add something new during the full scan + jobQueues.add(tm1, List.of(newJob((short) -7, 9, cg2))); + assertEquals(2, jobQueues.getQueuedJobs(cg1)); + assertEquals(4, jobQueues.getQueuedJobs(cg2)); + + // should still be able to poll and get things added in the last full scan + assertEquals(extent2, jobQueues.poll(cg2).getTabletMetadata().getExtent()); + assertEquals(2, jobQueues.getQueuedJobs(cg1)); + assertEquals(3, jobQueues.getQueuedJobs(cg2)); + + // this should remove anything that was added before begin full scan was called + jobQueues.endFullScan(DataLevel.USER); + + assertEquals(0, jobQueues.getQueuedJobs(cg1)); + assertEquals(1, jobQueues.getQueuedJobs(cg2)); + assertEquals(0, jobQueues.getQueuedJobs(cg3)); + + assertNull(jobQueues.poll(cg1)); + assertEquals(extent1, jobQueues.poll(cg2).getTabletMetadata().getExtent()); + + assertEquals(0, jobQueues.getQueuedJobs(cg1)); + assertEquals(0, jobQueues.getQueuedJobs(cg2)); + assertEquals(0, jobQueues.getQueuedJobs(cg3)); + + // add some things outside of a begin/end full scan calls + jobQueues.add(tm1, List.of(newJob((short) 1, 5, cg1))); + jobQueues.add(tm2, List.of(newJob((short) 2, 6, cg1))); + + jobQueues.add(tm1, List.of(newJob((short) 4, 1, cg2))); + jobQueues.add(tm2, List.of(newJob((short) 3, 2, cg2))); + + jobQueues.beginFullScan(DataLevel.USER); + + assertEquals(2, jobQueues.getQueuedJobs(cg1)); + assertEquals(2, jobQueues.getQueuedJobs(cg2)); + assertEquals(0, jobQueues.getQueuedJobs(cg3)); + + // add some things inside the begin/end full scan calls + jobQueues.add(tm3, List.of(newJob((short) 3, 7, cg1))); + jobQueues.add(tm4, List.of(newJob((short) 4, 8, cg1))); + + jobQueues.add(tm3, List.of(newJob((short) 2, 3, cg2))); + jobQueues.add(tm4, List.of(newJob((short) 1, 4, cg2))); + + assertEquals(4, jobQueues.getQueuedJobs(cg1)); + assertEquals(4, jobQueues.getQueuedJobs(cg2)); + assertEquals(0, jobQueues.getQueuedJobs(cg3)); + + // poll inside the full scan calls + assertEquals(extent4, jobQueues.poll(cg1).getTabletMetadata().getExtent()); + assertEquals(extent1, jobQueues.poll(cg2).getTabletMetadata().getExtent()); + + assertEquals(3, jobQueues.getQueuedJobs(cg1)); + assertEquals(3, jobQueues.getQueuedJobs(cg2)); + assertEquals(0, jobQueues.getQueuedJobs(cg3)); + + // should remove any tablets added before the full scan started + jobQueues.endFullScan(DataLevel.USER); + + assertEquals(1, jobQueues.getQueuedJobs(cg1)); + assertEquals(2, jobQueues.getQueuedJobs(cg2)); + assertEquals(0, jobQueues.getQueuedJobs(cg3)); + + assertEquals(extent3, jobQueues.poll(cg1).getTabletMetadata().getExtent()); + assertEquals(extent3, jobQueues.poll(cg2).getTabletMetadata().getExtent()); + assertEquals(extent4, jobQueues.poll(cg2).getTabletMetadata().getExtent()); + + assertNull(jobQueues.poll(cg1)); + assertNull(jobQueues.poll(cg2)); + assertNull(jobQueues.poll(cg3)); + + assertEquals(0, jobQueues.getQueuedJobs(cg1)); + assertEquals(0, jobQueues.getQueuedJobs(cg2)); + assertEquals(0, jobQueues.getQueuedJobs(cg3)); + + // add jobs outside of begin/end full scan + jobQueues.add(tm1, List.of(newJob((short) 1, 5, cg1))); + jobQueues.add(tm2, List.of(newJob((short) 2, 6, cg1))); + jobQueues.add(tm3, List.of(newJob((short) 3, 7, cg1))); + jobQueues.add(tm4, List.of(newJob((short) 4, 8, cg1))); + + jobQueues.add(tm1, List.of(newJob((short) 4, 1, cg2))); + jobQueues.add(tm2, List.of(newJob((short) 3, 2, cg2))); + jobQueues.add(tm3, List.of(newJob((short) 2, 3, cg2))); + jobQueues.add(tm4, List.of(newJob((short) 1, 4, cg2))); + + jobQueues.beginFullScan(DataLevel.USER); + + // readd some of the tablets added before the beginFullScan, this should prevent those tablets + // from being removed by endFullScan + jobQueues.add(tm4, List.of(newJob((short) 5, 5, cg2))); + jobQueues.add(tm1, List.of(newJob((short) -7, 5, cg2))); + + assertEquals(4, jobQueues.getQueuedJobs(cg1)); + assertEquals(4, jobQueues.getQueuedJobs(cg2)); + assertEquals(0, jobQueues.getQueuedJobs(cg3)); + + // should remove all jobs added before begin full scan + jobQueues.endFullScan(DataLevel.USER); + + assertEquals(0, jobQueues.getQueuedJobs(cg1)); + assertEquals(2, jobQueues.getQueuedJobs(cg2)); + assertEquals(0, jobQueues.getQueuedJobs(cg3)); + + // make sure we see what was added last for the tablets + assertEquals(5, jobQueues.poll(cg2).getJob().getPriority()); + assertEquals(-7, jobQueues.poll(cg2).getJob().getPriority()); + + assertEquals(0, jobQueues.getQueuedJobs(cg1)); + assertEquals(0, jobQueues.getQueuedJobs(cg2)); + assertEquals(0, jobQueues.getQueuedJobs(cg3)); + + assertNull(jobQueues.poll(cg1)); + assertNull(jobQueues.poll(cg2)); + assertNull(jobQueues.poll(cg3)); + } + + @Test + public void testFullScanLevels() throws Exception { + var tid = TableId.of("1"); + var extent1 = new KeyExtent(tid, new Text("z"), new Text("q")); + var extent2 = new KeyExtent(tid, new Text("q"), new Text("l")); + var meta = new KeyExtent(AccumuloTable.METADATA.tableId(), new Text("l"), new Text("c")); + var root = RootTable.EXTENT; + + var tm1 = TabletMetadata.builder(extent1).build(); + var tm2 = TabletMetadata.builder(extent2).build(); + var tmm = TabletMetadata.builder(meta).build(); + var tmr = TabletMetadata.builder(root).build(); + + var cg1 = CompactorGroupId.of("CG1"); + + CompactionJobQueues jobQueues = new CompactionJobQueues(100); + + jobQueues.add(tm1, List.of(newJob((short) 1, 5, cg1))); + jobQueues.add(tm2, List.of(newJob((short) 2, 6, cg1))); + jobQueues.add(tmm, List.of(newJob((short) 3, 7, cg1))); + jobQueues.add(tmr, List.of(newJob((short) 4, 8, cg1))); + + // verify that a begin and end full scan will only drop tablets in its level + + jobQueues.beginFullScan(DataLevel.ROOT); + assertEquals(4, jobQueues.getQueuedJobs(cg1)); + jobQueues.endFullScan(DataLevel.ROOT); + assertEquals(3, jobQueues.getQueuedJobs(cg1)); + + jobQueues.beginFullScan(DataLevel.USER); + assertEquals(3, jobQueues.getQueuedJobs(cg1)); + jobQueues.endFullScan(DataLevel.USER); + assertEquals(1, jobQueues.getQueuedJobs(cg1)); + + jobQueues.beginFullScan(DataLevel.METADATA); + assertEquals(1, jobQueues.getQueuedJobs(cg1)); + jobQueues.endFullScan(DataLevel.METADATA); + assertEquals(0, jobQueues.getQueuedJobs(cg1)); + } + /** * When a queue goes empty it is removed. This removal should be done safely and should not cause * any data being concurrently added to be lost. The way this test adds and removes data in @@ -94,13 +315,7 @@ public class CompactionJobQueuesTest { // queue for that extent which could throw off the counts KeyExtent extent = new KeyExtent(TableId.of("1"), new Text(i + "z"), new Text(i + "a")); TabletMetadata tm = TabletMetadata.builder(extent).build(); - - Collection<CompactableFile> files = List - .of(new CompactableFileImpl(new URI("file://accumulo/tables//123/t-0/f" + i), 100, 100)); - Collection<CompactionJob> jobs = List.of(new CompactionJobImpl((short) (i % 31), - groups[i % groups.length], files, CompactionKind.SYSTEM, Optional.empty())); - - jobQueues.add(tm, jobs); + jobQueues.add(tm, List.of(newJob((short) (i % 31), i, groups[i % groups.length]))); } // Cause the background threads to exit after polling all data