This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new 11f8a5095f Added unit test for CompactionJobPriorityQueue (#4296)
11f8a5095f is described below

commit 11f8a5095fc5e96c0ad016f40a98b1ce389382a7
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Tue Feb 27 09:01:46 2024 -0500

    Added unit test for CompactionJobPriorityQueue (#4296)
    
    Modified add method to return into instead of boolean
    and added unit test for CompactionJobPriorityQueue
    
    Fixes #3466
---
 .../queue/CompactionJobPriorityQueue.java          |  39 ++-
 .../compaction/queue/CompactionJobQueues.java      |   2 +-
 .../queue/CompactionJobPriorityQueueTest.java      | 322 +++++++++++++++++++++
 3 files changed, 350 insertions(+), 13 deletions(-)

diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
index 12d82ff1f9..6c28fe5ecf 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.accumulo.core.dataImpl.KeyExtent;
@@ -47,7 +48,7 @@ import com.google.common.base.Preconditions;
  * </p>
  */
 public class CompactionJobPriorityQueue {
-  // ELASTICITY_TODO unit test this class
+
   private final CompactorGroupId groupId;
 
   private class CjpqKey implements Comparable<CjpqKey> {
@@ -59,7 +60,7 @@ public class CompactionJobPriorityQueue {
 
     CjpqKey(CompactionJob job) {
       this.job = job;
-      this.seq = nextSeq++;
+      this.seq = nextSeq.incrementAndGet();
     }
 
     @Override
@@ -102,9 +103,9 @@ public class CompactionJobPriorityQueue {
   // jobs in the queue when new jobs are queued for a tablet.
   private final Map<KeyExtent,List<CjpqKey>> tabletJobs;
 
-  private long nextSeq;
+  private final AtomicLong nextSeq = new AtomicLong(0);
 
-  private boolean closed = false;
+  private final AtomicBoolean closed = new AtomicBoolean(false);
 
   public CompactionJobPriorityQueue(CompactorGroupId groupId, int maxSize) {
     this.jobQueue = new TreeMap<>();
@@ -115,21 +116,23 @@ public class CompactionJobPriorityQueue {
     this.dequeuedJobs = new AtomicLong(0);
   }
 
-  public synchronized boolean add(TabletMetadata tabletMetadata, 
Collection<CompactionJob> jobs) {
-    if (closed) {
-      return false;
-    }
-
+  public synchronized int add(TabletMetadata tabletMetadata, 
Collection<CompactionJob> jobs) {
     Preconditions.checkArgument(jobs.stream().allMatch(job -> 
job.getGroup().equals(groupId)));
 
     removePreviousSubmissions(tabletMetadata.getExtent());
 
     List<CjpqKey> newEntries = new ArrayList<>(jobs.size());
 
+    int jobsAdded = 0;
     for (CompactionJob job : jobs) {
       CjpqKey cjqpKey = addJobToQueue(tabletMetadata, job);
       if (cjqpKey != null) {
         newEntries.add(cjqpKey);
+        jobsAdded++;
+      } else {
+        // The priority for this job was lower than all other priorities and 
not added
+        // In this case we will return true even though a subset of the jobs, 
or none,
+        // were added
       }
     }
 
@@ -137,7 +140,7 @@ public class CompactionJobPriorityQueue {
       checkState(tabletJobs.put(tabletMetadata.getExtent(), newEntries) == 
null);
     }
 
-    return true;
+    return jobsAdded;
   }
 
   public long getMaxSize() {
@@ -178,9 +181,19 @@ public class CompactionJobPriorityQueue {
     return first == null ? null : first.getValue();
   }
 
+  // exists for tests
+  synchronized CompactionJobQueues.MetaJob peek() {
+    var firstEntry = jobQueue.firstEntry();
+    return firstEntry == null ? null : firstEntry.getValue();
+  }
+
+  public boolean isClosed() {
+    return closed.get();
+  }
+
   public synchronized boolean closeIfEmpty() {
     if (jobQueue.isEmpty()) {
-      closed = true;
+      closed.set(true);
       return true;
     }
 
@@ -205,7 +218,9 @@ public class CompactionJobPriorityQueue {
         return null;
       } else {
         // the new job has a higher priority than the lowest job in the queue, 
so remove the lowest
-        jobQueue.pollLastEntry();
+        if (jobQueue.pollLastEntry() != null) {
+          rejectedJobs.getAndIncrement();
+        }
       }
 
     }
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java
index 2d6f88e448..77395fd253 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java
@@ -154,7 +154,7 @@ public class CompactionJobQueues {
 
     var pq = priorityQueues.computeIfAbsent(groupId,
         gid -> new CompactionJobPriorityQueue(gid, queueSize));
-    while (!pq.add(tabletMetadata, jobs)) {
+    while (pq.add(tabletMetadata, jobs) == 0 && pq.isClosed()) {
       // This loop handles race condition where poll() closes empty priority 
queues. The queue could
       // be closed after its obtained from the map and before add is called.
       pq = priorityQueues.computeIfAbsent(groupId,
diff --git 
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
 
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
new file mode 100644
index 0000000000..a7bbc914cf
--- /dev/null
+++ 
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.manager.compaction.queue;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.admin.compaction.CompactableFile;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.metadata.CompactableFileImpl;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.spi.compaction.CompactionJob;
+import org.apache.accumulo.core.spi.compaction.CompactorGroupId;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.compaction.CompactionJobPrioritizer;
+import org.apache.accumulo.core.util.compaction.CompactorGroupIdImpl;
+import 
org.apache.accumulo.manager.compaction.queue.CompactionJobQueues.MetaJob;
+import org.apache.hadoop.io.Text;
+import org.easymock.EasyMock;
+import org.junit.jupiter.api.Test;
+
+public class CompactionJobPriorityQueueTest {
+
+  private static final CompactorGroupId GROUP = 
CompactorGroupIdImpl.groupId("TEST");
+
+  @Test
+  public void testTabletFileReplacement() {
+
+    CompactableFile file1 = EasyMock.createMock(CompactableFileImpl.class);
+    CompactableFile file2 = EasyMock.createMock(CompactableFileImpl.class);
+    CompactableFile file3 = EasyMock.createMock(CompactableFileImpl.class);
+    CompactableFile file4 = EasyMock.createMock(CompactableFileImpl.class);
+
+    KeyExtent extent = new KeyExtent(TableId.of("1"), new Text("z"), new 
Text("a"));
+    TabletMetadata tm = EasyMock.createMock(TabletMetadata.class);
+    EasyMock.expect(tm.getExtent()).andReturn(extent).anyTimes();
+
+    CompactionJob cj1 = EasyMock.createMock(CompactionJob.class);
+    EasyMock.expect(cj1.getGroup()).andReturn(GROUP).anyTimes();
+    EasyMock.expect(cj1.getPriority()).andReturn((short) 10).anyTimes();
+    EasyMock.expect(cj1.getFiles()).andReturn(Set.of(file1)).anyTimes();
+
+    CompactionJob cj2 = EasyMock.createMock(CompactionJob.class);
+    EasyMock.expect(cj2.getGroup()).andReturn(GROUP).anyTimes();
+    EasyMock.expect(cj2.getPriority()).andReturn((short) 5).anyTimes();
+    EasyMock.expect(cj2.getFiles()).andReturn(Set.of(file2, file3, 
file4)).anyTimes();
+
+    EasyMock.replay(tm, cj1, cj2);
+
+    CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 
2);
+    assertEquals(1, queue.add(tm, List.of(cj1)));
+
+    MetaJob job = queue.peek();
+    assertEquals(cj1, job.getJob());
+    assertEquals(Set.of(file1), job.getJob().getFiles());
+
+    assertEquals(10L, queue.getLowestPriority());
+    assertEquals(2, queue.getMaxSize());
+    assertEquals(0, queue.getDequeuedJobs());
+    assertEquals(0, queue.getRejectedJobs());
+    assertEquals(1, queue.getQueuedJobs());
+
+    // replace the files for the same tablet
+    assertEquals(1, queue.add(tm, List.of(cj2)));
+
+    job = queue.peek();
+    assertEquals(cj2, job.getJob());
+    assertEquals(Set.of(file2, file3, file4), job.getJob().getFiles());
+    assertEquals(tm, job.getTabletMetadata());
+
+    assertEquals(5L, queue.getLowestPriority());
+    assertEquals(2, queue.getMaxSize());
+    assertEquals(0, queue.getDequeuedJobs());
+    assertEquals(0, queue.getRejectedJobs());
+    assertEquals(1, queue.getQueuedJobs());
+
+    EasyMock.verify(tm, cj1, cj2);
+
+  }
+
+  @Test
+  public void testAddEqualToMaxSize() {
+
+    CompactableFile file1 = EasyMock.createMock(CompactableFileImpl.class);
+    CompactableFile file2 = EasyMock.createMock(CompactableFileImpl.class);
+    CompactableFile file3 = EasyMock.createMock(CompactableFileImpl.class);
+    CompactableFile file4 = EasyMock.createMock(CompactableFileImpl.class);
+
+    KeyExtent extent = new KeyExtent(TableId.of("1"), new Text("z"), new 
Text("a"));
+    TabletMetadata tm = EasyMock.createMock(TabletMetadata.class);
+    EasyMock.expect(tm.getExtent()).andReturn(extent).anyTimes();
+
+    CompactionJob cj1 = EasyMock.createMock(CompactionJob.class);
+    EasyMock.expect(cj1.getGroup()).andReturn(GROUP).anyTimes();
+    EasyMock.expect(cj1.getPriority()).andReturn((short) 10).anyTimes();
+    EasyMock.expect(cj1.getFiles()).andReturn(Set.of(file1)).anyTimes();
+
+    CompactionJob cj2 = EasyMock.createMock(CompactionJob.class);
+    EasyMock.expect(cj2.getGroup()).andReturn(GROUP).anyTimes();
+    EasyMock.expect(cj2.getPriority()).andReturn((short) 5).anyTimes();
+    EasyMock.expect(cj2.getFiles()).andReturn(Set.of(file2, file3, 
file4)).anyTimes();
+
+    EasyMock.replay(tm, cj1, cj2);
+
+    CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 
2);
+    assertEquals(2, queue.add(tm, List.of(cj1, cj2)));
+
+    EasyMock.verify(tm, cj1, cj2);
+
+    assertEquals(5L, queue.getLowestPriority());
+    assertEquals(2, queue.getMaxSize());
+    assertEquals(0, queue.getDequeuedJobs());
+    assertEquals(0, queue.getRejectedJobs());
+    assertEquals(2, queue.getQueuedJobs());
+    MetaJob job = queue.poll();
+    assertEquals(cj1, job.getJob());
+    assertEquals(tm, job.getTabletMetadata());
+    assertEquals(1, queue.getDequeuedJobs());
+
+    job = queue.poll();
+    assertEquals(cj2, job.getJob());
+    assertEquals(tm, job.getTabletMetadata());
+    assertEquals(2, queue.getDequeuedJobs());
+
+    job = queue.poll();
+    assertNull(job);
+    assertEquals(2, queue.getDequeuedJobs());
+
+  }
+
+  @Test
+  public void testAddMoreThanMax() {
+
+    CompactableFile file1 = EasyMock.createMock(CompactableFileImpl.class);
+    CompactableFile file2 = EasyMock.createMock(CompactableFileImpl.class);
+    CompactableFile file3 = EasyMock.createMock(CompactableFileImpl.class);
+    CompactableFile file4 = EasyMock.createMock(CompactableFileImpl.class);
+    CompactableFile file5 = EasyMock.createMock(CompactableFileImpl.class);
+    CompactableFile file6 = EasyMock.createMock(CompactableFileImpl.class);
+
+    KeyExtent extent = new KeyExtent(TableId.of("1"), new Text("z"), new 
Text("a"));
+    TabletMetadata tm = EasyMock.createMock(TabletMetadata.class);
+    EasyMock.expect(tm.getExtent()).andReturn(extent).anyTimes();
+
+    CompactionJob cj1 = EasyMock.createMock(CompactionJob.class);
+    EasyMock.expect(cj1.getGroup()).andReturn(GROUP).anyTimes();
+    EasyMock.expect(cj1.getPriority()).andReturn((short) 10).anyTimes();
+    EasyMock.expect(cj1.getFiles()).andReturn(Set.of(file1)).anyTimes();
+
+    CompactionJob cj2 = EasyMock.createMock(CompactionJob.class);
+    EasyMock.expect(cj2.getGroup()).andReturn(GROUP).anyTimes();
+    EasyMock.expect(cj2.getPriority()).andReturn((short) 5).anyTimes();
+    EasyMock.expect(cj2.getFiles()).andReturn(Set.of(file2, file3, 
file4)).anyTimes();
+
+    CompactionJob cj3 = EasyMock.createMock(CompactionJob.class);
+    EasyMock.expect(cj3.getGroup()).andReturn(GROUP).anyTimes();
+    EasyMock.expect(cj3.getPriority()).andReturn((short) 1).anyTimes();
+    EasyMock.expect(cj3.getFiles()).andReturn(Set.of(file5, file6)).anyTimes();
+
+    EasyMock.replay(tm, cj1, cj2, cj3);
+
+    CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 
2);
+    assertEquals(2, queue.add(tm, List.of(cj1, cj2, cj3)));
+
+    EasyMock.verify(tm, cj1, cj2, cj3);
+
+    assertEquals(5L, queue.getLowestPriority());
+    assertEquals(2, queue.getMaxSize());
+    assertEquals(0, queue.getDequeuedJobs());
+    assertEquals(1, queue.getRejectedJobs());
+    assertEquals(2, queue.getQueuedJobs());
+
+    MetaJob job = queue.poll();
+    assertEquals(cj1, job.getJob());
+    assertEquals(tm, job.getTabletMetadata());
+    assertEquals(1, queue.getDequeuedJobs());
+
+    job = queue.poll();
+    assertEquals(cj2, job.getJob());
+    assertEquals(tm, job.getTabletMetadata());
+    assertEquals(2, queue.getDequeuedJobs());
+
+    job = queue.poll();
+    assertNull(job);
+    assertEquals(2, queue.getDequeuedJobs());
+
+  }
+
+  @Test
+  public void testAddAfterClose() {
+
+    CompactableFile file1 = EasyMock.createMock(CompactableFileImpl.class);
+    CompactableFile file2 = EasyMock.createMock(CompactableFileImpl.class);
+    CompactableFile file3 = EasyMock.createMock(CompactableFileImpl.class);
+    CompactableFile file4 = EasyMock.createMock(CompactableFileImpl.class);
+
+    KeyExtent extent = new KeyExtent(TableId.of("1"), new Text("z"), new 
Text("a"));
+    TabletMetadata tm = EasyMock.createMock(TabletMetadata.class);
+    EasyMock.expect(tm.getExtent()).andReturn(extent).anyTimes();
+
+    CompactionJob cj1 = EasyMock.createMock(CompactionJob.class);
+    EasyMock.expect(cj1.getGroup()).andReturn(GROUP).anyTimes();
+    EasyMock.expect(cj1.getPriority()).andReturn((short) 10).anyTimes();
+    EasyMock.expect(cj1.getFiles()).andReturn(Set.of(file1)).anyTimes();
+
+    CompactionJob cj2 = EasyMock.createMock(CompactionJob.class);
+    EasyMock.expect(cj2.getGroup()).andReturn(GROUP).anyTimes();
+    EasyMock.expect(cj2.getPriority()).andReturn((short) 5).anyTimes();
+    EasyMock.expect(cj2.getFiles()).andReturn(Set.of(file2, file3, 
file4)).anyTimes();
+
+    EasyMock.replay(tm, cj1, cj2);
+
+    CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 
2);
+    assertEquals(2, queue.add(tm, List.of(cj1, cj2)));
+
+    assertFalse(queue.closeIfEmpty());
+
+    EasyMock.verify(tm, cj1, cj2);
+
+    assertEquals(5L, queue.getLowestPriority());
+    assertEquals(2, queue.getMaxSize());
+    assertEquals(0, queue.getDequeuedJobs());
+    assertEquals(0, queue.getRejectedJobs());
+    assertEquals(2, queue.getQueuedJobs());
+    MetaJob job = queue.poll();
+    assertEquals(cj1, job.getJob());
+    assertEquals(tm, job.getTabletMetadata());
+    assertEquals(1, queue.getDequeuedJobs());
+
+    MetaJob job2 = queue.poll();
+    assertEquals(cj2, job2.getJob());
+    assertEquals(tm, job2.getTabletMetadata());
+    assertEquals(2, queue.getDequeuedJobs());
+
+    assertTrue(queue.closeIfEmpty());
+
+    assertEquals(2, queue.add(tm, List.of(cj1, cj2)));
+
+  }
+
+  private static int counter = 1;
+
+  private Pair<TabletMetadata,CompactionJob> createJob() {
+
+    // Use an ever increasing tableId
+    KeyExtent extent = new KeyExtent(TableId.of("" + counter++), new 
Text("z"), new Text("a"));
+
+    Set<CompactableFile> files = new HashSet<>();
+    for (int i = 0; i < counter; i++) {
+      files.add(EasyMock.createMock(CompactableFileImpl.class));
+    }
+
+    CompactionJob job = EasyMock.createMock(CompactionJob.class);
+    TabletMetadata tm = EasyMock.createMock(TabletMetadata.class);
+    EasyMock.expect(tm.getExtent()).andReturn(extent).anyTimes();
+    EasyMock.expect(job.getGroup()).andReturn(GROUP).anyTimes();
+    EasyMock.expect(job.getPriority()).andReturn((short) counter).anyTimes();
+    EasyMock.expect(job.getFiles()).andReturn(files).anyTimes();
+
+    EasyMock.replay(tm, job);
+
+    return new Pair<>(tm, job);
+  }
+
+  @Test
+  public void test() {
+
+    TreeSet<CompactionJob> expected = new 
TreeSet<>(CompactionJobPrioritizer.JOB_COMPARATOR);
+
+    CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 
100);
+
+    // create and add 1000 jobs
+    for (int x = 0; x < 1000; x++) {
+      Pair<TabletMetadata,CompactionJob> pair = createJob();
+      queue.add(pair.getFirst(), Set.of(pair.getSecond()));
+      expected.add(pair.getSecond());
+    }
+
+    assertEquals(100, queue.getMaxSize());
+    assertEquals(100, queue.getQueuedJobs());
+    assertEquals(900, queue.getRejectedJobs());
+
+    // iterate over the expected set and make sure that they next job in the 
queue
+    // matches
+    int matchesSeen = 0;
+    for (CompactionJob expectedJob : expected) {
+      MetaJob queuedJob = queue.poll();
+      if (queuedJob == null) {
+        break;
+      }
+      assertEquals(expectedJob.getPriority(), 
queuedJob.getJob().getPriority());
+      assertEquals(expectedJob.getFiles(), queuedJob.getJob().getFiles());
+      matchesSeen++;
+    }
+
+    assertEquals(100, matchesSeen);
+  }
+}

Reply via email to