This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new 36df8cee36 removed queued jobs for a tablet if no new jobs are seen 
(#4394)
36df8cee36 is described below

commit 36df8cee360081ca445f615cbc354e9e2ff2e6fc
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Mon Mar 18 13:51:20 2024 -0400

    removed queued jobs for a tablet if no new jobs are seen (#4394)
    
    fixes #3528
---
 .../accumulo/manager/TabletGroupWatcher.java       |   2 +
 .../queue/CompactionJobPriorityQueue.java          |  56 ++++-
 .../compaction/queue/CompactionJobQueues.java      |  35 +++-
 .../compaction/CompactionCoordinatorTest.java      |   1 +
 .../queue/CompactionJobPriorityQueueTest.java      |  14 +-
 .../compaction/queue/CompactionJobQueuesTest.java  | 229 ++++++++++++++++++++-
 6 files changed, 314 insertions(+), 23 deletions(-)

diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
index 8779568916..3e8f1fdc5b 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
@@ -678,7 +678,9 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
         eventHandler.clearNeedsFullScan();
 
         iter = store.iterator(tableMgmtParams);
+        
manager.getCompactionCoordinator().getJobQueues().beginFullScan(store.getLevel());
         var tabletMgmtStats = manageTablets(iter, tableMgmtParams, 
currentTServers, true);
+        
manager.getCompactionCoordinator().getJobQueues().endFullScan(store.getLevel());
 
         // If currently looking for volume replacements, determine if the next 
round needs to look.
         if (lookForTabletsNeedingVolReplacement) {
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
index e4aa059e95..4dfd6868ad 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
@@ -23,18 +23,23 @@ import static 
com.google.common.base.Preconditions.checkState;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.metadata.schema.Ample;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.core.spi.compaction.CompactionJob;
 import org.apache.accumulo.core.spi.compaction.CompactorGroupId;
 import org.apache.accumulo.core.util.compaction.CompactionJobPrioritizer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 
@@ -49,6 +54,8 @@ import com.google.common.base.Preconditions;
  */
 public class CompactionJobPriorityQueue {
 
+  private static final Logger log = 
LoggerFactory.getLogger(CompactionJobPriorityQueue.class);
+
   private final CompactorGroupId groupId;
 
   private class CjpqKey implements Comparable<CjpqKey> {
@@ -99,9 +106,19 @@ public class CompactionJobPriorityQueue {
   private final AtomicLong rejectedJobs;
   private final AtomicLong dequeuedJobs;
 
+  private static class TabletJobs {
+    final long generation;
+    final HashSet<CjpqKey> jobs;
+
+    private TabletJobs(long generation, HashSet<CjpqKey> jobs) {
+      this.generation = generation;
+      this.jobs = jobs;
+    }
+  }
+
   // This map tracks what jobs a tablet currently has in the queue. Its used 
to efficiently remove
   // jobs in the queue when new jobs are queued for a tablet.
-  private final Map<KeyExtent,List<CjpqKey>> tabletJobs;
+  private final Map<KeyExtent,TabletJobs> tabletJobs;
 
   private final AtomicLong nextSeq = new AtomicLong(0);
 
@@ -116,10 +133,32 @@ public class CompactionJobPriorityQueue {
     this.dequeuedJobs = new AtomicLong(0);
   }
 
+  public synchronized void removeOlderGenerations(Ample.DataLevel level, long 
currGeneration) {
+    if (closed.get()) {
+      return;
+    }
+
+    List<KeyExtent> removals = new ArrayList<>();
+
+    tabletJobs.forEach((extent, jobs) -> {
+      if (Ample.DataLevel.of(extent.tableId()) == level && jobs.generation < 
currGeneration) {
+        removals.add(extent);
+      }
+    });
+
+    if (!removals.isEmpty()) {
+      log.trace("Removed {} queued tablets that no longer need compaction for 
{} {}",
+          removals.size(), groupId, level);
+    }
+
+    removals.forEach(this::removePreviousSubmissions);
+  }
+
   /**
    * @return the number of jobs added. If the queue is closed returns -1
    */
-  public synchronized int add(TabletMetadata tabletMetadata, 
Collection<CompactionJob> jobs) {
+  public synchronized int add(TabletMetadata tabletMetadata, 
Collection<CompactionJob> jobs,
+      long generation) {
     Preconditions.checkArgument(jobs.stream().allMatch(job -> 
job.getGroup().equals(groupId)));
     if (closed.get()) {
       return -1;
@@ -127,13 +166,13 @@ public class CompactionJobPriorityQueue {
 
     removePreviousSubmissions(tabletMetadata.getExtent());
 
-    List<CjpqKey> newEntries = new ArrayList<>(jobs.size());
+    HashSet<CjpqKey> newEntries = new HashSet<>(jobs.size());
 
     int jobsAdded = 0;
     for (CompactionJob job : jobs) {
       CjpqKey cjqpKey = addJobToQueue(tabletMetadata, job);
       if (cjqpKey != null) {
-        newEntries.add(cjqpKey);
+        checkState(newEntries.add(cjqpKey));
         jobsAdded++;
       } else {
         // The priority for this job was lower than all other priorities and 
not added
@@ -143,7 +182,8 @@ public class CompactionJobPriorityQueue {
     }
 
     if (!newEntries.isEmpty()) {
-      checkState(tabletJobs.put(tabletMetadata.getExtent(), newEntries) == 
null);
+      checkState(tabletJobs.put(tabletMetadata.getExtent(), new 
TabletJobs(generation, newEntries))
+          == null);
     }
 
     return jobsAdded;
@@ -178,7 +218,7 @@ public class CompactionJobPriorityQueue {
     if (first != null) {
       dequeuedJobs.getAndIncrement();
       var extent = first.getValue().getTabletMetadata().getExtent();
-      List<CjpqKey> jobs = tabletJobs.get(extent);
+      Set<CjpqKey> jobs = tabletJobs.get(extent).jobs;
       checkState(jobs.remove(first.getKey()));
       if (jobs.isEmpty()) {
         tabletJobs.remove(extent);
@@ -207,9 +247,9 @@ public class CompactionJobPriorityQueue {
   }
 
   private void removePreviousSubmissions(KeyExtent extent) {
-    List<CjpqKey> prevJobs = tabletJobs.get(extent);
+    TabletJobs prevJobs = tabletJobs.get(extent);
     if (prevJobs != null) {
-      prevJobs.forEach(jobQueue::remove);
+      prevJobs.jobs.forEach(jobQueue::remove);
       tabletJobs.remove(extent);
     }
   }
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java
index b47ec92675..8c46227357 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java
@@ -19,10 +19,15 @@
 package org.apache.accumulo.manager.compaction.queue;
 
 import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentHashMap.KeySetView;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
+import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.core.spi.compaction.CompactionJob;
 import org.apache.accumulo.core.spi.compaction.CompactorGroupId;
@@ -46,8 +51,35 @@ public class CompactionJobQueues {
 
   private final int queueSize;
 
+  private final Map<DataLevel,AtomicLong> currentGenerations;
+
   public CompactionJobQueues(int queueSize) {
     this.queueSize = queueSize;
+    Map<DataLevel,AtomicLong> cg = new EnumMap<>(DataLevel.class);
+    for (var level : DataLevel.values()) {
+      cg.put(level, new AtomicLong());
+    }
+    currentGenerations = Collections.unmodifiableMap(cg);
+
+  }
+
+  public void beginFullScan(DataLevel level) {
+    currentGenerations.get(level).incrementAndGet();
+  }
+
+  /**
+   * The purpose of this method is to remove any tablets that were added 
before beginFullScan() was
+   * called. This handles tablets that were queued for 
compaction for a while
+   * and because of some change no longer need to compact. If a full scan of 
the metadata table does
+   * not find any new work for a tablet, then any previously queued work for 
that tablet should be
+   * discarded.
+   *
+   * @param level full metadata scans are done independently per DataLevel, so 
tracking what
+   *        needs to be removed must be done per DataLevel
+   */
+  public void endFullScan(DataLevel level) {
+    priorityQueues.values()
+        .forEach(pq -> pq.removeOlderGenerations(level, 
currentGenerations.get(level).get()));
   }
 
   public void add(TabletMetadata tabletMetadata, Collection<CompactionJob> 
jobs) {
@@ -155,7 +187,8 @@ public class CompactionJobQueues {
 
     var pq = priorityQueues.computeIfAbsent(groupId,
         gid -> new CompactionJobPriorityQueue(gid, queueSize));
-    while (pq.add(tabletMetadata, jobs) < 0) {
+    while (pq.add(tabletMetadata, jobs,
+        
currentGenerations.get(DataLevel.of(tabletMetadata.getTableId())).get()) < 0) {
       // When entering this loop its expected the queue is closed
       Preconditions.checkState(pq.isClosed());
       // This loop handles race condition where poll() closes empty priority 
queues. The queue could
diff --git 
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
 
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
index 20f5cdc17e..bb75b8c65c 100644
--- 
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
+++ 
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
@@ -294,6 +294,7 @@ public class CompactionCoordinatorTest {
     TabletMetadata tm = EasyMock.createNiceMock(TabletMetadata.class);
     expect(tm.getExtent()).andReturn(ke).anyTimes();
     expect(tm.getFiles()).andReturn(Collections.emptySet()).anyTimes();
+    expect(tm.getTableId()).andReturn(ke.tableId()).anyTimes();
 
     EasyMock.replay(tconf, context, creds, tm, security);
 
diff --git 
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
 
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
index f511f35078..2e090c32a2 100644
--- 
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
+++ 
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
@@ -71,7 +71,7 @@ public class CompactionJobPriorityQueueTest {
     EasyMock.replay(tm, cj1, cj2);
 
     CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 
2);
-    assertEquals(1, queue.add(tm, List.of(cj1)));
+    assertEquals(1, queue.add(tm, List.of(cj1), 1L));
 
     MetaJob job = queue.peek();
     assertEquals(cj1, job.getJob());
@@ -84,7 +84,7 @@ public class CompactionJobPriorityQueueTest {
     assertEquals(1, queue.getQueuedJobs());
 
     // replace the files for the same tablet
-    assertEquals(1, queue.add(tm, List.of(cj2)));
+    assertEquals(1, queue.add(tm, List.of(cj2), 1L));
 
     job = queue.peek();
     assertEquals(cj2, job.getJob());
@@ -126,7 +126,7 @@ public class CompactionJobPriorityQueueTest {
     EasyMock.replay(tm, cj1, cj2);
 
     CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 
2);
-    assertEquals(2, queue.add(tm, List.of(cj1, cj2)));
+    assertEquals(2, queue.add(tm, List.of(cj1, cj2), 1L));
 
     EasyMock.verify(tm, cj1, cj2);
 
@@ -183,7 +183,7 @@ public class CompactionJobPriorityQueueTest {
     EasyMock.replay(tm, cj1, cj2, cj3);
 
     CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 
2);
-    assertEquals(2, queue.add(tm, List.of(cj1, cj2, cj3)));
+    assertEquals(2, queue.add(tm, List.of(cj1, cj2, cj3), 1L));
 
     EasyMock.verify(tm, cj1, cj2, cj3);
 
@@ -234,7 +234,7 @@ public class CompactionJobPriorityQueueTest {
     EasyMock.replay(tm, cj1, cj2);
 
     CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 
2);
-    assertEquals(2, queue.add(tm, List.of(cj1, cj2)));
+    assertEquals(2, queue.add(tm, List.of(cj1, cj2), 1L));
 
     assertFalse(queue.closeIfEmpty());
 
@@ -257,7 +257,7 @@ public class CompactionJobPriorityQueueTest {
 
     assertTrue(queue.closeIfEmpty());
 
-    assertEquals(-1, queue.add(tm, List.of(cj1, cj2)));
+    assertEquals(-1, queue.add(tm, List.of(cj1, cj2), 1L));
 
   }
 
@@ -295,7 +295,7 @@ public class CompactionJobPriorityQueueTest {
     // create and add 1000 jobs
     for (int x = 0; x < 1000; x++) {
       Pair<TabletMetadata,CompactionJob> pair = createJob();
-      queue.add(pair.getFirst(), Set.of(pair.getSecond()));
+      queue.add(pair.getFirst(), Set.of(pair.getSecond()), 1L);
       expected.add(pair.getSecond());
     }
 
diff --git 
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java
 
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java
index dfd956fc91..73aa404295 100644
--- 
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java
+++ 
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java
@@ -19,8 +19,10 @@
 package org.apache.accumulo.manager.compaction.queue;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
 
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -33,7 +35,10 @@ import java.util.stream.Stream;
 import org.apache.accumulo.core.client.admin.compaction.CompactableFile;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.metadata.AccumuloTable;
 import org.apache.accumulo.core.metadata.CompactableFileImpl;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.core.spi.compaction.CompactionJob;
 import org.apache.accumulo.core.spi.compaction.CompactionKind;
@@ -44,6 +49,222 @@ import org.junit.jupiter.api.Test;
 
 public class CompactionJobQueuesTest {
 
+  private CompactionJob newJob(short prio, int file, CompactorGroupId cgi)
+      throws URISyntaxException {
+    Collection<CompactableFile> files = List
+        .of(new CompactableFileImpl(new 
URI("file://accumulo/tables//123/t-0/f" + file), 100, 100));
+    return new CompactionJobImpl(prio, cgi, files, CompactionKind.SYSTEM, 
Optional.empty());
+  }
+
+  @Test
+  public void testFullScanHandling() throws Exception {
+
+    var tid = TableId.of("1");
+    var extent1 = new KeyExtent(tid, new Text("z"), new Text("q"));
+    var extent2 = new KeyExtent(tid, new Text("q"), new Text("l"));
+    var extent3 = new KeyExtent(tid, new Text("l"), new Text("c"));
+    var extent4 = new KeyExtent(tid, new Text("c"), new Text("a"));
+
+    var tm1 = TabletMetadata.builder(extent1).build();
+    var tm2 = TabletMetadata.builder(extent2).build();
+    var tm3 = TabletMetadata.builder(extent3).build();
+    var tm4 = TabletMetadata.builder(extent4).build();
+
+    var cg1 = CompactorGroupId.of("CG1");
+    var cg2 = CompactorGroupId.of("CG2");
+    var cg3 = CompactorGroupId.of("CG3");
+
+    CompactionJobQueues jobQueues = new CompactionJobQueues(100);
+
+    jobQueues.beginFullScan(DataLevel.USER);
+
+    jobQueues.add(tm1, List.of(newJob((short) 1, 5, cg1)));
+    jobQueues.add(tm2, List.of(newJob((short) 2, 6, cg1)));
+    jobQueues.add(tm3, List.of(newJob((short) 3, 7, cg1)));
+    jobQueues.add(tm4, List.of(newJob((short) 4, 8, cg1)));
+
+    jobQueues.add(tm1, List.of(newJob((short) 4, 1, cg2)));
+    jobQueues.add(tm2, List.of(newJob((short) 3, 2, cg2)));
+    jobQueues.add(tm3, List.of(newJob((short) 2, 3, cg2)));
+    jobQueues.add(tm4, List.of(newJob((short) 1, 4, cg2)));
+
+    jobQueues.endFullScan(DataLevel.USER);
+
+    assertEquals(4, jobQueues.getQueuedJobs(cg1));
+    assertEquals(4, jobQueues.getQueuedJobs(cg2));
+    assertEquals(0, jobQueues.getQueuedJobs(cg3));
+
+    assertEquals(extent4, jobQueues.poll(cg1).getTabletMetadata().getExtent());
+    assertEquals(extent1, jobQueues.poll(cg2).getTabletMetadata().getExtent());
+
+    assertEquals(3, jobQueues.getQueuedJobs(cg1));
+    assertEquals(3, jobQueues.getQueuedJobs(cg2));
+    assertEquals(0, jobQueues.getQueuedJobs(cg3));
+
+    jobQueues.beginFullScan(DataLevel.USER);
+
+    // should still be able to poll and get things added in the last full scan
+    assertEquals(extent3, jobQueues.poll(cg1).getTabletMetadata().getExtent());
+    assertEquals(2, jobQueues.getQueuedJobs(cg1));
+    assertEquals(3, jobQueues.getQueuedJobs(cg2));
+
+    // add something new during the full scan
+    jobQueues.add(tm1, List.of(newJob((short) -7, 9, cg2)));
+    assertEquals(2, jobQueues.getQueuedJobs(cg1));
+    assertEquals(4, jobQueues.getQueuedJobs(cg2));
+
+    // should still be able to poll and get things added in the last full scan
+    assertEquals(extent2, jobQueues.poll(cg2).getTabletMetadata().getExtent());
+    assertEquals(2, jobQueues.getQueuedJobs(cg1));
+    assertEquals(3, jobQueues.getQueuedJobs(cg2));
+
+    // this should remove anything that was added before begin full scan was 
called
+    jobQueues.endFullScan(DataLevel.USER);
+
+    assertEquals(0, jobQueues.getQueuedJobs(cg1));
+    assertEquals(1, jobQueues.getQueuedJobs(cg2));
+    assertEquals(0, jobQueues.getQueuedJobs(cg3));
+
+    assertNull(jobQueues.poll(cg1));
+    assertEquals(extent1, jobQueues.poll(cg2).getTabletMetadata().getExtent());
+
+    assertEquals(0, jobQueues.getQueuedJobs(cg1));
+    assertEquals(0, jobQueues.getQueuedJobs(cg2));
+    assertEquals(0, jobQueues.getQueuedJobs(cg3));
+
+    // add some things outside of a begin/end full scan calls
+    jobQueues.add(tm1, List.of(newJob((short) 1, 5, cg1)));
+    jobQueues.add(tm2, List.of(newJob((short) 2, 6, cg1)));
+
+    jobQueues.add(tm1, List.of(newJob((short) 4, 1, cg2)));
+    jobQueues.add(tm2, List.of(newJob((short) 3, 2, cg2)));
+
+    jobQueues.beginFullScan(DataLevel.USER);
+
+    assertEquals(2, jobQueues.getQueuedJobs(cg1));
+    assertEquals(2, jobQueues.getQueuedJobs(cg2));
+    assertEquals(0, jobQueues.getQueuedJobs(cg3));
+
+    // add some things inside the begin/end full scan calls
+    jobQueues.add(tm3, List.of(newJob((short) 3, 7, cg1)));
+    jobQueues.add(tm4, List.of(newJob((short) 4, 8, cg1)));
+
+    jobQueues.add(tm3, List.of(newJob((short) 2, 3, cg2)));
+    jobQueues.add(tm4, List.of(newJob((short) 1, 4, cg2)));
+
+    assertEquals(4, jobQueues.getQueuedJobs(cg1));
+    assertEquals(4, jobQueues.getQueuedJobs(cg2));
+    assertEquals(0, jobQueues.getQueuedJobs(cg3));
+
+    // poll inside the full scan calls
+    assertEquals(extent4, jobQueues.poll(cg1).getTabletMetadata().getExtent());
+    assertEquals(extent1, jobQueues.poll(cg2).getTabletMetadata().getExtent());
+
+    assertEquals(3, jobQueues.getQueuedJobs(cg1));
+    assertEquals(3, jobQueues.getQueuedJobs(cg2));
+    assertEquals(0, jobQueues.getQueuedJobs(cg3));
+
+    // should remove any tablets added before the full scan started
+    jobQueues.endFullScan(DataLevel.USER);
+
+    assertEquals(1, jobQueues.getQueuedJobs(cg1));
+    assertEquals(2, jobQueues.getQueuedJobs(cg2));
+    assertEquals(0, jobQueues.getQueuedJobs(cg3));
+
+    assertEquals(extent3, jobQueues.poll(cg1).getTabletMetadata().getExtent());
+    assertEquals(extent3, jobQueues.poll(cg2).getTabletMetadata().getExtent());
+    assertEquals(extent4, jobQueues.poll(cg2).getTabletMetadata().getExtent());
+
+    assertNull(jobQueues.poll(cg1));
+    assertNull(jobQueues.poll(cg2));
+    assertNull(jobQueues.poll(cg3));
+
+    assertEquals(0, jobQueues.getQueuedJobs(cg1));
+    assertEquals(0, jobQueues.getQueuedJobs(cg2));
+    assertEquals(0, jobQueues.getQueuedJobs(cg3));
+
+    // add jobs outside of begin/end full scan
+    jobQueues.add(tm1, List.of(newJob((short) 1, 5, cg1)));
+    jobQueues.add(tm2, List.of(newJob((short) 2, 6, cg1)));
+    jobQueues.add(tm3, List.of(newJob((short) 3, 7, cg1)));
+    jobQueues.add(tm4, List.of(newJob((short) 4, 8, cg1)));
+
+    jobQueues.add(tm1, List.of(newJob((short) 4, 1, cg2)));
+    jobQueues.add(tm2, List.of(newJob((short) 3, 2, cg2)));
+    jobQueues.add(tm3, List.of(newJob((short) 2, 3, cg2)));
+    jobQueues.add(tm4, List.of(newJob((short) 1, 4, cg2)));
+
+    jobQueues.beginFullScan(DataLevel.USER);
+
+    // re-add some of the tablets added before the beginFullScan; this should 
prevent those tablets
+    // from being removed by endFullScan
+    jobQueues.add(tm4, List.of(newJob((short) 5, 5, cg2)));
+    jobQueues.add(tm1, List.of(newJob((short) -7, 5, cg2)));
+
+    assertEquals(4, jobQueues.getQueuedJobs(cg1));
+    assertEquals(4, jobQueues.getQueuedJobs(cg2));
+    assertEquals(0, jobQueues.getQueuedJobs(cg3));
+
+    // should remove all jobs added before begin full scan
+    jobQueues.endFullScan(DataLevel.USER);
+
+    assertEquals(0, jobQueues.getQueuedJobs(cg1));
+    assertEquals(2, jobQueues.getQueuedJobs(cg2));
+    assertEquals(0, jobQueues.getQueuedJobs(cg3));
+
+    // make sure we see what was added last for the tablets
+    assertEquals(5, jobQueues.poll(cg2).getJob().getPriority());
+    assertEquals(-7, jobQueues.poll(cg2).getJob().getPriority());
+
+    assertEquals(0, jobQueues.getQueuedJobs(cg1));
+    assertEquals(0, jobQueues.getQueuedJobs(cg2));
+    assertEquals(0, jobQueues.getQueuedJobs(cg3));
+
+    assertNull(jobQueues.poll(cg1));
+    assertNull(jobQueues.poll(cg2));
+    assertNull(jobQueues.poll(cg3));
+  }
+
+  @Test
+  public void testFullScanLevels() throws Exception {
+    var tid = TableId.of("1");
+    var extent1 = new KeyExtent(tid, new Text("z"), new Text("q"));
+    var extent2 = new KeyExtent(tid, new Text("q"), new Text("l"));
+    var meta = new KeyExtent(AccumuloTable.METADATA.tableId(), new Text("l"), 
new Text("c"));
+    var root = RootTable.EXTENT;
+
+    var tm1 = TabletMetadata.builder(extent1).build();
+    var tm2 = TabletMetadata.builder(extent2).build();
+    var tmm = TabletMetadata.builder(meta).build();
+    var tmr = TabletMetadata.builder(root).build();
+
+    var cg1 = CompactorGroupId.of("CG1");
+
+    CompactionJobQueues jobQueues = new CompactionJobQueues(100);
+
+    jobQueues.add(tm1, List.of(newJob((short) 1, 5, cg1)));
+    jobQueues.add(tm2, List.of(newJob((short) 2, 6, cg1)));
+    jobQueues.add(tmm, List.of(newJob((short) 3, 7, cg1)));
+    jobQueues.add(tmr, List.of(newJob((short) 4, 8, cg1)));
+
+    // verify that a begin and end full scan will only drop tablets in its 
level
+
+    jobQueues.beginFullScan(DataLevel.ROOT);
+    assertEquals(4, jobQueues.getQueuedJobs(cg1));
+    jobQueues.endFullScan(DataLevel.ROOT);
+    assertEquals(3, jobQueues.getQueuedJobs(cg1));
+
+    jobQueues.beginFullScan(DataLevel.USER);
+    assertEquals(3, jobQueues.getQueuedJobs(cg1));
+    jobQueues.endFullScan(DataLevel.USER);
+    assertEquals(1, jobQueues.getQueuedJobs(cg1));
+
+    jobQueues.beginFullScan(DataLevel.METADATA);
+    assertEquals(1, jobQueues.getQueuedJobs(cg1));
+    jobQueues.endFullScan(DataLevel.METADATA);
+    assertEquals(0, jobQueues.getQueuedJobs(cg1));
+  }
+
   /**
    * When a queue goes empty it is removed. This removal should be done safely 
and should not cause
    * any data being concurrently added to be lost. The way this test adds and 
removes data in
@@ -94,13 +315,7 @@ public class CompactionJobQueuesTest {
       // queue for that extent which could throw off the counts
       KeyExtent extent = new KeyExtent(TableId.of("1"), new Text(i + "z"), new 
Text(i + "a"));
       TabletMetadata tm = TabletMetadata.builder(extent).build();
-
-      Collection<CompactableFile> files = List
-          .of(new CompactableFileImpl(new 
URI("file://accumulo/tables//123/t-0/f" + i), 100, 100));
-      Collection<CompactionJob> jobs = List.of(new CompactionJobImpl((short) 
(i % 31),
-          groups[i % groups.length], files, CompactionKind.SYSTEM, 
Optional.empty()));
-
-      jobQueues.add(tm, jobs);
+      jobQueues.add(tm, List.of(newJob((short) (i % 31), i, groups[i % 
groups.length])));
     }
 
     // Cause the background threads to exit after polling all data

Reply via email to