This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new f13ad00e06 Adds completable futures to compaction queue (#4726)
f13ad00e06 is described below

commit f13ad00e06c88eb76bfb1d966f837025d8ddfbca
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Fri Jul 5 07:37:00 2024 -0700

    Adds completable futures to compaction queue (#4726)
    
    Adds completable futures to the queue of compaction jobs.  This allows for
    asynchronous notification when something is added to the queue.
    
    The compaction queues code used to drop queues that became empty.  The
    concept of a queue being empty became more complex with this change: a
    queue would only be considered empty when it had no pending futures and no
    queued jobs.  This more complex notion of empty would have made the code
    for dropping empty queues more complex.  Instead of increasing the
    complexity of that code, this change stops removing empty queues.  This
    means that if a compaction group is used and then no longer used, it will
    leave a small empty data structure sitting around in a map for the
    lifetime of the process.  That is unlikely to cause memory issues, so the
    increased complexity was judged not worthwhile.
---
 .../queue/CompactionJobPriorityQueue.java          | 55 ++++++++++++----------
 .../compaction/queue/CompactionJobQueues.java      | 40 +++++++---------
 .../queue/CompactionJobPriorityQueueTest.java      | 54 ---------------------
 .../compaction/queue/CompactionJobQueuesTest.java  | 55 ++++++++++++++++++++++
 4 files changed, 103 insertions(+), 101 deletions(-)

diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
index 4dfd6868ad..9909ccb7f9 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
@@ -20,6 +20,7 @@ package org.apache.accumulo.manager.compaction.queue;
 
 import static com.google.common.base.Preconditions.checkState;
 
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -29,7 +30,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.TreeMap;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.accumulo.core.dataImpl.KeyExtent;
@@ -105,6 +106,7 @@ public class CompactionJobPriorityQueue {
   private final int maxSize;
   private final AtomicLong rejectedJobs;
   private final AtomicLong dequeuedJobs;
+  private final ArrayDeque<CompletableFuture<CompactionJobQueues.MetaJob>> 
futures;
 
   private static class TabletJobs {
     final long generation;
@@ -122,8 +124,6 @@ public class CompactionJobPriorityQueue {
 
   private final AtomicLong nextSeq = new AtomicLong(0);
 
-  private final AtomicBoolean closed = new AtomicBoolean(false);
-
   public CompactionJobPriorityQueue(CompactorGroupId groupId, int maxSize) {
     this.jobQueue = new TreeMap<>();
     this.maxSize = maxSize;
@@ -131,13 +131,10 @@ public class CompactionJobPriorityQueue {
     this.groupId = groupId;
     this.rejectedJobs = new AtomicLong(0);
     this.dequeuedJobs = new AtomicLong(0);
+    this.futures = new ArrayDeque<>();
   }
 
   public synchronized void removeOlderGenerations(Ample.DataLevel level, long 
currGeneration) {
-    if (closed.get()) {
-      return;
-    }
-
     List<KeyExtent> removals = new ArrayList<>();
 
     tabletJobs.forEach((extent, jobs) -> {
@@ -160,16 +157,26 @@ public class CompactionJobPriorityQueue {
   public synchronized int add(TabletMetadata tabletMetadata, 
Collection<CompactionJob> jobs,
       long generation) {
     Preconditions.checkArgument(jobs.stream().allMatch(job -> 
job.getGroup().equals(groupId)));
-    if (closed.get()) {
-      return -1;
-    }
 
     removePreviousSubmissions(tabletMetadata.getExtent());
 
     HashSet<CjpqKey> newEntries = new HashSet<>(jobs.size());
 
     int jobsAdded = 0;
-    for (CompactionJob job : jobs) {
+    outer: for (CompactionJob job : jobs) {
+      var future = futures.poll();
+      while (future != null) {
+        // its expected that if futures are present then the queue is empty, 
if this is not true
+        // then there is a bug
+        Preconditions.checkState(jobQueue.isEmpty());
+        if (future.complete(new CompactionJobQueues.MetaJob(job, 
tabletMetadata))) {
+          // successfully completed a future with this job, so do not need to 
queue the job
+          jobsAdded++;
+          continue outer;
+        } // else the future was canceled or timed out so could not complete it
+        future = futures.poll();
+      }
+
       CjpqKey cjqpKey = addJobToQueue(tabletMetadata, job);
       if (cjqpKey != null) {
         checkState(newEntries.add(cjqpKey));
@@ -227,25 +234,25 @@ public class CompactionJobPriorityQueue {
     return first == null ? null : first.getValue();
   }
 
+  public synchronized CompletableFuture<CompactionJobQueues.MetaJob> 
getAsync() {
+    var job = jobQueue.pollFirstEntry();
+    if (job != null) {
+      return CompletableFuture.completedFuture(job.getValue());
+    }
+
+    // There is currently nothing in the queue, so create an uncompleted 
future and queue it up to
+    // be completed when something does arrive.
+    CompletableFuture<CompactionJobQueues.MetaJob> future = new 
CompletableFuture<>();
+    futures.add(future);
+    return future;
+  }
+
   // exists for tests
   synchronized CompactionJobQueues.MetaJob peek() {
     var firstEntry = jobQueue.firstEntry();
     return firstEntry == null ? null : firstEntry.getValue();
   }
 
-  public boolean isClosed() {
-    return closed.get();
-  }
-
-  public synchronized boolean closeIfEmpty() {
-    if (jobQueue.isEmpty()) {
-      closed.set(true);
-      return true;
-    }
-
-    return false;
-  }
-
   private void removePreviousSubmissions(KeyExtent extent) {
     TabletJobs prevJobs = tabletJobs.get(extent);
     if (prevJobs != null) {
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java
index 8c46227357..b9fe1ed424 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java
@@ -22,6 +22,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumMap;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentHashMap.KeySetView;
 import java.util.concurrent.atomic.AtomicLong;
@@ -34,8 +35,6 @@ import 
org.apache.accumulo.core.spi.compaction.CompactorGroupId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
-
 public class CompactionJobQueues {
 
   private static final Logger log = 
LoggerFactory.getLogger(CompactionJobQueues.class);
@@ -157,23 +156,25 @@ public class CompactionJobQueues {
     }
   }
 
+  /**
+   * Asynchronously get a compaction job from the queue. If the queue 
currently has jobs then a
+   * completed future will be returned containing the highest priority job in 
the queue. If the
+   * queue is currently empty, then an uncompleted future will be returned and 
later when something
+   * is added to the queue the future will be completed.
+   */
+  public CompletableFuture<MetaJob> getAsync(CompactorGroupId groupId) {
+    var pq = priorityQueues.computeIfAbsent(groupId,
+        gid -> new CompactionJobPriorityQueue(gid, queueSize));
+    return pq.getAsync();
+  }
+
   public MetaJob poll(CompactorGroupId groupId) {
     var prioQ = priorityQueues.get(groupId);
     if (prioQ == null) {
       return null;
     }
-    MetaJob mj = prioQ.poll();
-
-    if (mj == null) {
-      priorityQueues.computeIfPresent(groupId, (eid, pq) -> {
-        if (pq.closeIfEmpty()) {
-          return null;
-        } else {
-          return pq;
-        }
-      });
-    }
-    return mj;
+
+    return prioQ.poll();
   }
 
   private void add(TabletMetadata tabletMetadata, CompactorGroupId groupId,
@@ -187,14 +188,7 @@ public class CompactionJobQueues {
 
     var pq = priorityQueues.computeIfAbsent(groupId,
         gid -> new CompactionJobPriorityQueue(gid, queueSize));
-    while (pq.add(tabletMetadata, jobs,
-        
currentGenerations.get(DataLevel.of(tabletMetadata.getTableId())).get()) < 0) {
-      // When entering this loop its expected the queue is closed
-      Preconditions.checkState(pq.isClosed());
-      // This loop handles race condition where poll() closes empty priority 
queues. The queue could
-      // be closed after its obtained from the map and before add is called.
-      pq = priorityQueues.computeIfAbsent(groupId,
-          gid -> new CompactionJobPriorityQueue(gid, queueSize));
-    }
+    pq.add(tabletMetadata, jobs,
+        
currentGenerations.get(DataLevel.of(tabletMetadata.getTableId())).get());
   }
 }
diff --git 
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
 
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
index 2e090c32a2..ddf9a3016e 100644
--- 
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
+++ 
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
@@ -19,9 +19,7 @@
 package org.apache.accumulo.manager.compaction.queue;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.HashSet;
 import java.util.List;
@@ -209,58 +207,6 @@ public class CompactionJobPriorityQueueTest {
 
   }
 
-  @Test
-  public void testAddAfterClose() {
-
-    CompactableFile file1 = EasyMock.createMock(CompactableFileImpl.class);
-    CompactableFile file2 = EasyMock.createMock(CompactableFileImpl.class);
-    CompactableFile file3 = EasyMock.createMock(CompactableFileImpl.class);
-    CompactableFile file4 = EasyMock.createMock(CompactableFileImpl.class);
-
-    KeyExtent extent = new KeyExtent(TableId.of("1"), new Text("z"), new 
Text("a"));
-    TabletMetadata tm = EasyMock.createMock(TabletMetadata.class);
-    EasyMock.expect(tm.getExtent()).andReturn(extent).anyTimes();
-
-    CompactionJob cj1 = EasyMock.createMock(CompactionJob.class);
-    EasyMock.expect(cj1.getGroup()).andReturn(GROUP).anyTimes();
-    EasyMock.expect(cj1.getPriority()).andReturn((short) 10).anyTimes();
-    EasyMock.expect(cj1.getFiles()).andReturn(Set.of(file1)).anyTimes();
-
-    CompactionJob cj2 = EasyMock.createMock(CompactionJob.class);
-    EasyMock.expect(cj2.getGroup()).andReturn(GROUP).anyTimes();
-    EasyMock.expect(cj2.getPriority()).andReturn((short) 5).anyTimes();
-    EasyMock.expect(cj2.getFiles()).andReturn(Set.of(file2, file3, 
file4)).anyTimes();
-
-    EasyMock.replay(tm, cj1, cj2);
-
-    CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 
2);
-    assertEquals(2, queue.add(tm, List.of(cj1, cj2), 1L));
-
-    assertFalse(queue.closeIfEmpty());
-
-    EasyMock.verify(tm, cj1, cj2);
-
-    assertEquals(5L, queue.getLowestPriority());
-    assertEquals(2, queue.getMaxSize());
-    assertEquals(0, queue.getDequeuedJobs());
-    assertEquals(0, queue.getRejectedJobs());
-    assertEquals(2, queue.getQueuedJobs());
-    MetaJob job = queue.poll();
-    assertEquals(cj1, job.getJob());
-    assertEquals(tm, job.getTabletMetadata());
-    assertEquals(1, queue.getDequeuedJobs());
-
-    MetaJob job2 = queue.poll();
-    assertEquals(cj2, job2.getJob());
-    assertEquals(tm, job2.getTabletMetadata());
-    assertEquals(2, queue.getDequeuedJobs());
-
-    assertTrue(queue.closeIfEmpty());
-
-    assertEquals(-1, queue.add(tm, List.of(cj1, cj2), 1L));
-
-  }
-
   private static int counter = 1;
 
   private Pair<TabletMetadata,CompactionJob> createJob() {
diff --git 
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java
 
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java
index 73aa404295..a9f360b4cd 100644
--- 
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java
+++ 
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java
@@ -19,7 +19,9 @@
 package org.apache.accumulo.manager.compaction.queue;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -332,4 +334,57 @@ public class CompactionJobQueuesTest {
     // The background threads should have seen every job that was added
     assertEquals(numToAdd, totalSeen);
   }
+
+  @Test
+  public void testGetAsync() throws Exception {
+    CompactionJobQueues jobQueues = new CompactionJobQueues(100);
+
+    var tid = TableId.of("1");
+    var extent1 = new KeyExtent(tid, new Text("z"), new Text("q"));
+    var extent2 = new KeyExtent(tid, new Text("q"), new Text("l"));
+    var extent3 = new KeyExtent(tid, new Text("l"), new Text("c"));
+    var extent4 = new KeyExtent(tid, new Text("c"), new Text("a"));
+
+    var tm1 = TabletMetadata.builder(extent1).build();
+    var tm2 = TabletMetadata.builder(extent2).build();
+    var tm3 = TabletMetadata.builder(extent3).build();
+    var tm4 = TabletMetadata.builder(extent4).build();
+
+    var cg1 = CompactorGroupId.of("CG1");
+
+    var future1 = jobQueues.getAsync(cg1);
+    var future2 = jobQueues.getAsync(cg1);
+
+    assertFalse(future1.isDone());
+    assertFalse(future2.isDone());
+
+    jobQueues.add(tm1, List.of(newJob((short) 1, 5, cg1)));
+    jobQueues.add(tm2, List.of(newJob((short) 2, 6, cg1)));
+    jobQueues.add(tm3, List.of(newJob((short) 3, 7, cg1)));
+    jobQueues.add(tm4, List.of(newJob((short) 4, 8, cg1)));
+
+    var future3 = jobQueues.getAsync(cg1);
+    var future4 = jobQueues.getAsync(cg1);
+
+    assertTrue(future1.isDone());
+    assertTrue(future2.isDone());
+    assertTrue(future3.isDone());
+    assertTrue(future4.isDone());
+
+    assertEquals(extent1, future1.get().getTabletMetadata().getExtent());
+    assertEquals(extent2, future2.get().getTabletMetadata().getExtent());
+    assertEquals(extent4, future3.get().getTabletMetadata().getExtent());
+    assertEquals(extent3, future4.get().getTabletMetadata().getExtent());
+
+    // test cancelling a future
+    var future5 = jobQueues.getAsync(cg1);
+    assertFalse(future5.isDone());
+    future5.cancel(false);
+    var future6 = jobQueues.getAsync(cg1);
+    assertFalse(future6.isDone());
+    // since future5 was canceled, this addition should go to future6
+    jobQueues.add(tm1, List.of(newJob((short) 1, 5, cg1)));
+    assertTrue(future6.isDone());
+    assertEquals(extent1, future6.get().getTabletMetadata().getExtent());
+  }
 }

Reply via email to