This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new f13ad00e06 Adds completable futures to compaction queue (#4726) f13ad00e06 is described below commit f13ad00e06c88eb76bfb1d966f837025d8ddfbca Author: Keith Turner <ktur...@apache.org> AuthorDate: Fri Jul 5 07:37:00 2024 -0700 Adds completable futures to compaction queue (#4726) Adds completable futures to the queue of compaction jobs. This allows for async notification when something is added to the queue. Previously, the compaction queues code would drop queues that became empty. The concept of a queue being empty became more complex with this change: a queue would be considered empty only when it had no pending futures and no queued jobs. This more complex notion of emptiness would have made the code for dropping empty queues more complex. Instead of increasing the complexity of this code, this change stops removing empty queues. This means that if a compaction group is used and then no longer used, a small empty data structure will remain in the map for the lifetime of the process. That is unlikely to cause memory issues. Therefore, the increased complexity was judged not worthwhile given that it was unlikely to cause memory problems. 
--- .../queue/CompactionJobPriorityQueue.java | 55 ++++++++++++---------- .../compaction/queue/CompactionJobQueues.java | 40 +++++++--------- .../queue/CompactionJobPriorityQueueTest.java | 54 --------------------- .../compaction/queue/CompactionJobQueuesTest.java | 55 ++++++++++++++++++++++ 4 files changed, 103 insertions(+), 101 deletions(-) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java index 4dfd6868ad..9909ccb7f9 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java @@ -20,6 +20,7 @@ package org.apache.accumulo.manager.compaction.queue; import static com.google.common.base.Preconditions.checkState; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -29,7 +30,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.TreeMap; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicLong; import org.apache.accumulo.core.dataImpl.KeyExtent; @@ -105,6 +106,7 @@ public class CompactionJobPriorityQueue { private final int maxSize; private final AtomicLong rejectedJobs; private final AtomicLong dequeuedJobs; + private final ArrayDeque<CompletableFuture<CompactionJobQueues.MetaJob>> futures; private static class TabletJobs { final long generation; @@ -122,8 +124,6 @@ public class CompactionJobPriorityQueue { private final AtomicLong nextSeq = new AtomicLong(0); - private final AtomicBoolean closed = new AtomicBoolean(false); - public CompactionJobPriorityQueue(CompactorGroupId groupId, int maxSize) { this.jobQueue = new TreeMap<>(); 
this.maxSize = maxSize; @@ -131,13 +131,10 @@ public class CompactionJobPriorityQueue { this.groupId = groupId; this.rejectedJobs = new AtomicLong(0); this.dequeuedJobs = new AtomicLong(0); + this.futures = new ArrayDeque<>(); } public synchronized void removeOlderGenerations(Ample.DataLevel level, long currGeneration) { - if (closed.get()) { - return; - } - List<KeyExtent> removals = new ArrayList<>(); tabletJobs.forEach((extent, jobs) -> { @@ -160,16 +157,26 @@ public class CompactionJobPriorityQueue { public synchronized int add(TabletMetadata tabletMetadata, Collection<CompactionJob> jobs, long generation) { Preconditions.checkArgument(jobs.stream().allMatch(job -> job.getGroup().equals(groupId))); - if (closed.get()) { - return -1; - } removePreviousSubmissions(tabletMetadata.getExtent()); HashSet<CjpqKey> newEntries = new HashSet<>(jobs.size()); int jobsAdded = 0; - for (CompactionJob job : jobs) { + outer: for (CompactionJob job : jobs) { + var future = futures.poll(); + while (future != null) { + // its expected that if futures are present then the queue is empty, if this is not true + // then there is a bug + Preconditions.checkState(jobQueue.isEmpty()); + if (future.complete(new CompactionJobQueues.MetaJob(job, tabletMetadata))) { + // successfully completed a future with this job, so do not need to queue the job + jobsAdded++; + continue outer; + } // else the future was canceled or timed out so could not complete it + future = futures.poll(); + } + CjpqKey cjqpKey = addJobToQueue(tabletMetadata, job); if (cjqpKey != null) { checkState(newEntries.add(cjqpKey)); @@ -227,25 +234,25 @@ public class CompactionJobPriorityQueue { return first == null ? 
null : first.getValue(); } + public synchronized CompletableFuture<CompactionJobQueues.MetaJob> getAsync() { + var job = jobQueue.pollFirstEntry(); + if (job != null) { + return CompletableFuture.completedFuture(job.getValue()); + } + + // There is currently nothing in the queue, so create an uncompleted future and queue it up to + // be completed when something does arrive. + CompletableFuture<CompactionJobQueues.MetaJob> future = new CompletableFuture<>(); + futures.add(future); + return future; + } + // exists for tests synchronized CompactionJobQueues.MetaJob peek() { var firstEntry = jobQueue.firstEntry(); return firstEntry == null ? null : firstEntry.getValue(); } - public boolean isClosed() { - return closed.get(); - } - - public synchronized boolean closeIfEmpty() { - if (jobQueue.isEmpty()) { - closed.set(true); - return true; - } - - return false; - } - private void removePreviousSubmissions(KeyExtent extent) { TabletJobs prevJobs = tabletJobs.get(extent); if (prevJobs != null) { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java index 8c46227357..b9fe1ed424 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java @@ -22,6 +22,7 @@ import java.util.Collection; import java.util.Collections; import java.util.EnumMap; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap.KeySetView; import java.util.concurrent.atomic.AtomicLong; @@ -34,8 +35,6 @@ import org.apache.accumulo.core.spi.compaction.CompactorGroupId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Preconditions; - public class 
CompactionJobQueues { private static final Logger log = LoggerFactory.getLogger(CompactionJobQueues.class); @@ -157,23 +156,25 @@ public class CompactionJobQueues { } } + /** + * Asynchronously get a compaction job from the queue. If the queue currently has jobs then a + * completed future will be returned containing the highest priority job in the queue. If the + * queue is currently empty, then an uncompleted future will be returned and later when something + * is added to the queue the future will be completed. + */ + public CompletableFuture<MetaJob> getAsync(CompactorGroupId groupId) { + var pq = priorityQueues.computeIfAbsent(groupId, + gid -> new CompactionJobPriorityQueue(gid, queueSize)); + return pq.getAsync(); + } + public MetaJob poll(CompactorGroupId groupId) { var prioQ = priorityQueues.get(groupId); if (prioQ == null) { return null; } - MetaJob mj = prioQ.poll(); - - if (mj == null) { - priorityQueues.computeIfPresent(groupId, (eid, pq) -> { - if (pq.closeIfEmpty()) { - return null; - } else { - return pq; - } - }); - } - return mj; + + return prioQ.poll(); } private void add(TabletMetadata tabletMetadata, CompactorGroupId groupId, @@ -187,14 +188,7 @@ public class CompactionJobQueues { var pq = priorityQueues.computeIfAbsent(groupId, gid -> new CompactionJobPriorityQueue(gid, queueSize)); - while (pq.add(tabletMetadata, jobs, - currentGenerations.get(DataLevel.of(tabletMetadata.getTableId())).get()) < 0) { - // When entering this loop its expected the queue is closed - Preconditions.checkState(pq.isClosed()); - // This loop handles race condition where poll() closes empty priority queues. The queue could - // be closed after its obtained from the map and before add is called. 
- pq = priorityQueues.computeIfAbsent(groupId, - gid -> new CompactionJobPriorityQueue(gid, queueSize)); - } + pq.add(tabletMetadata, jobs, + currentGenerations.get(DataLevel.of(tabletMetadata.getTableId())).get()); } } diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java index 2e090c32a2..ddf9a3016e 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java @@ -19,9 +19,7 @@ package org.apache.accumulo.manager.compaction.queue; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.HashSet; import java.util.List; @@ -209,58 +207,6 @@ public class CompactionJobPriorityQueueTest { } - @Test - public void testAddAfterClose() { - - CompactableFile file1 = EasyMock.createMock(CompactableFileImpl.class); - CompactableFile file2 = EasyMock.createMock(CompactableFileImpl.class); - CompactableFile file3 = EasyMock.createMock(CompactableFileImpl.class); - CompactableFile file4 = EasyMock.createMock(CompactableFileImpl.class); - - KeyExtent extent = new KeyExtent(TableId.of("1"), new Text("z"), new Text("a")); - TabletMetadata tm = EasyMock.createMock(TabletMetadata.class); - EasyMock.expect(tm.getExtent()).andReturn(extent).anyTimes(); - - CompactionJob cj1 = EasyMock.createMock(CompactionJob.class); - EasyMock.expect(cj1.getGroup()).andReturn(GROUP).anyTimes(); - EasyMock.expect(cj1.getPriority()).andReturn((short) 10).anyTimes(); - EasyMock.expect(cj1.getFiles()).andReturn(Set.of(file1)).anyTimes(); - - CompactionJob 
cj2 = EasyMock.createMock(CompactionJob.class); - EasyMock.expect(cj2.getGroup()).andReturn(GROUP).anyTimes(); - EasyMock.expect(cj2.getPriority()).andReturn((short) 5).anyTimes(); - EasyMock.expect(cj2.getFiles()).andReturn(Set.of(file2, file3, file4)).anyTimes(); - - EasyMock.replay(tm, cj1, cj2); - - CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 2); - assertEquals(2, queue.add(tm, List.of(cj1, cj2), 1L)); - - assertFalse(queue.closeIfEmpty()); - - EasyMock.verify(tm, cj1, cj2); - - assertEquals(5L, queue.getLowestPriority()); - assertEquals(2, queue.getMaxSize()); - assertEquals(0, queue.getDequeuedJobs()); - assertEquals(0, queue.getRejectedJobs()); - assertEquals(2, queue.getQueuedJobs()); - MetaJob job = queue.poll(); - assertEquals(cj1, job.getJob()); - assertEquals(tm, job.getTabletMetadata()); - assertEquals(1, queue.getDequeuedJobs()); - - MetaJob job2 = queue.poll(); - assertEquals(cj2, job2.getJob()); - assertEquals(tm, job2.getTabletMetadata()); - assertEquals(2, queue.getDequeuedJobs()); - - assertTrue(queue.closeIfEmpty()); - - assertEquals(-1, queue.add(tm, List.of(cj1, cj2), 1L)); - - } - private static int counter = 1; private Pair<TabletMetadata,CompactionJob> createJob() { diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java index 73aa404295..a9f360b4cd 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java @@ -19,7 +19,9 @@ package org.apache.accumulo.manager.compaction.queue; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; +import static 
org.junit.jupiter.api.Assertions.assertTrue; import java.net.URI; import java.net.URISyntaxException; @@ -332,4 +334,57 @@ public class CompactionJobQueuesTest { // The background threads should have seen every job that was added assertEquals(numToAdd, totalSeen); } + + @Test + public void testGetAsync() throws Exception { + CompactionJobQueues jobQueues = new CompactionJobQueues(100); + + var tid = TableId.of("1"); + var extent1 = new KeyExtent(tid, new Text("z"), new Text("q")); + var extent2 = new KeyExtent(tid, new Text("q"), new Text("l")); + var extent3 = new KeyExtent(tid, new Text("l"), new Text("c")); + var extent4 = new KeyExtent(tid, new Text("c"), new Text("a")); + + var tm1 = TabletMetadata.builder(extent1).build(); + var tm2 = TabletMetadata.builder(extent2).build(); + var tm3 = TabletMetadata.builder(extent3).build(); + var tm4 = TabletMetadata.builder(extent4).build(); + + var cg1 = CompactorGroupId.of("CG1"); + + var future1 = jobQueues.getAsync(cg1); + var future2 = jobQueues.getAsync(cg1); + + assertFalse(future1.isDone()); + assertFalse(future2.isDone()); + + jobQueues.add(tm1, List.of(newJob((short) 1, 5, cg1))); + jobQueues.add(tm2, List.of(newJob((short) 2, 6, cg1))); + jobQueues.add(tm3, List.of(newJob((short) 3, 7, cg1))); + jobQueues.add(tm4, List.of(newJob((short) 4, 8, cg1))); + + var future3 = jobQueues.getAsync(cg1); + var future4 = jobQueues.getAsync(cg1); + + assertTrue(future1.isDone()); + assertTrue(future2.isDone()); + assertTrue(future3.isDone()); + assertTrue(future4.isDone()); + + assertEquals(extent1, future1.get().getTabletMetadata().getExtent()); + assertEquals(extent2, future2.get().getTabletMetadata().getExtent()); + assertEquals(extent4, future3.get().getTabletMetadata().getExtent()); + assertEquals(extent3, future4.get().getTabletMetadata().getExtent()); + + // test cancelling a future + var future5 = jobQueues.getAsync(cg1); + assertFalse(future5.isDone()); + future5.cancel(false); + var future6 = 
jobQueues.getAsync(cg1); + assertFalse(future6.isDone()); + // since future5 was canceled, this addition should go to future6 + jobQueues.add(tm1, List.of(newJob((short) 1, 5, cg1))); + assertTrue(future6.isDone()); + assertEquals(extent1, future6.get().getTabletMetadata().getExtent()); + } }