This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new e42a03198f periodically clean up canceled futures in compaction job prioq (#4727) e42a03198f is described below commit e42a03198fea366842403761701585bbfc09a430 Author: Keith Turner <ktur...@apache.org> AuthorDate: Fri Jul 5 11:14:19 2024 -0700 periodically clean up canceled futures in compaction job prioq (#4727) For the case where nothing is ever added to a compaction job prioq and futures are continually obtained and canceled, these canceled futures would keep building up in memory. This commit fixes that by periodically cleaning out canceled futures. --- .../queue/CompactionJobPriorityQueue.java | 22 +++++++++++++ .../queue/CompactionJobPriorityQueueTest.java | 36 ++++++++++++++++++++++ .../compaction/queue/CompactionJobQueuesTest.java | 16 ++++++++-- 3 files changed, 71 insertions(+), 3 deletions(-) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java index 9909ccb7f9..c91b8becf1 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java @@ -42,6 +42,7 @@ import org.apache.accumulo.core.util.compaction.CompactionJobPrioritizer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; /** @@ -59,6 +60,9 @@ public class CompactionJobPriorityQueue { private final CompactorGroupId groupId; + @VisibleForTesting + static final int FUTURE_CHECK_THRESHOLD = 10_000; + private class CjpqKey implements Comparable<CjpqKey> { private final CompactionJob job; @@ -107,6 +111,7 @@ public class CompactionJobPriorityQueue { private final AtomicLong rejectedJobs;
private final AtomicLong dequeuedJobs; private final ArrayDeque<CompletableFuture<CompactionJobQueues.MetaJob>> futures; + private long futuresAdded = 0; private static class TabletJobs { final long generation; @@ -244,9 +249,26 @@ public class CompactionJobPriorityQueue { // be completed when something does arrive. CompletableFuture<CompactionJobQueues.MetaJob> future = new CompletableFuture<>(); futures.add(future); + futuresAdded++; + // Handle the case where nothing is ever being added to this queue and futures are constantly + // being obtained and cancelled. If nothing is done these canceled futures would just keep + // building up in memory. The following code periodically checks to see if there are canceled + // futures to remove. + if (futuresAdded % FUTURE_CHECK_THRESHOLD == 0 + && futures.size() >= 2 * FUTURE_CHECK_THRESHOLD) { + futures.removeIf(CompletableFuture::isDone); + // It is not expected that the future we just created would be done, if it were it would have + // been removed. 
+ Preconditions.checkState(!future.isDone()); + } return future; } + @VisibleForTesting + synchronized int futuresSize() { + return futures.size(); + } + // exists for tests synchronized CompactionJobQueues.MetaJob peek() { var firstEntry = jobQueue.firstEntry(); diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java index ddf9a3016e..5464b90a33 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java @@ -20,11 +20,14 @@ package org.apache.accumulo.manager.compaction.queue; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.CompletableFuture; import org.apache.accumulo.core.client.admin.compaction.CompactableFile; import org.apache.accumulo.core.data.TableId; @@ -264,4 +267,37 @@ public class CompactionJobPriorityQueueTest { assertEquals(100, matchesSeen); } + + /** + * Test to ensure that canceled futures do not build up in memory. 
+ */ + @Test + public void testAsyncCancelCleanup() { + CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 100); + + List<CompletableFuture<MetaJob>> futures = new ArrayList<>(); + + int maxFuturesSize = 0; + + // Add 11 below so that cadence of clearing differs from the internal check cadence + final int CANCEL_THRESHOLD = CompactionJobPriorityQueue.FUTURE_CHECK_THRESHOLD / 10 + 11; + final int ITERATIONS = CompactionJobPriorityQueue.FUTURE_CHECK_THRESHOLD * 20; + + for (int x = 0; x < ITERATIONS; x++) { + futures.add(queue.getAsync()); + + maxFuturesSize = Math.max(maxFuturesSize, queue.futuresSize()); + + if (futures.size() >= CANCEL_THRESHOLD) { + futures.forEach(f -> f.cancel(true)); + futures.clear(); + } + } + + maxFuturesSize = Math.max(maxFuturesSize, queue.futuresSize()); + + assertTrue(maxFuturesSize + < 2 * (CompactionJobPriorityQueue.FUTURE_CHECK_THRESHOLD + CANCEL_THRESHOLD)); + assertTrue(maxFuturesSize > 2 * CompactionJobPriorityQueue.FUTURE_CHECK_THRESHOLD); + } } diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java index a9f360b4cd..09ae416091 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java @@ -31,6 +31,7 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Stream; @@ -45,6 +46,7 @@ import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.spi.compaction.CompactionJob; import org.apache.accumulo.core.spi.compaction.CompactionKind; import 
org.apache.accumulo.core.spi.compaction.CompactorGroupId; +import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.core.util.compaction.CompactionJobImpl; import org.apache.hadoop.io.Text; import org.junit.jupiter.api.Test; @@ -376,15 +378,23 @@ public class CompactionJobQueuesTest { assertEquals(extent4, future3.get().getTabletMetadata().getExtent()); assertEquals(extent3, future4.get().getTabletMetadata().getExtent()); - // test cancelling a future + // test cancelling a future and having a future timeout var future5 = jobQueues.getAsync(cg1); assertFalse(future5.isDone()); future5.cancel(false); var future6 = jobQueues.getAsync(cg1); assertFalse(future6.isDone()); - // since future5 was canceled, this addition should go to future6 + future6.orTimeout(10, TimeUnit.MILLISECONDS); + // sleep for 20 millis, this should cause future6 to be timed out + UtilWaitThread.sleep(20); + var future7 = jobQueues.getAsync(cg1); + assertFalse(future7.isDone()); + // since future5 was canceled and future6 timed out, this addition should go to future7 jobQueues.add(tm1, List.of(newJob((short) 1, 5, cg1))); + assertTrue(future7.isDone()); + assertEquals(extent1, future7.get().getTabletMetadata().getExtent()); + assertTrue(future5.isDone()); + assertTrue(future6.isCompletedExceptionally()); assertTrue(future6.isDone()); - assertEquals(extent1, future6.get().getTabletMetadata().getExtent()); } }