This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new e42a03198f periodically clean up canceled futures in compaction job prioq (#4727)
e42a03198f is described below

commit e42a03198fea366842403761701585bbfc09a430
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Fri Jul 5 11:14:19 2024 -0700

    periodically clean up canceled futures in compaction job prioq (#4727)
    
    When nothing is ever added to a compaction job priority queue while
    futures are continually obtained and canceled, the canceled futures
    would keep building up in memory.  This commit fixes that by
    periodically removing the canceled futures.
---
 .../queue/CompactionJobPriorityQueue.java          | 22 +++++++++++++
 .../queue/CompactionJobPriorityQueueTest.java      | 36 ++++++++++++++++++++++
 .../compaction/queue/CompactionJobQueuesTest.java  | 16 ++++++++--
 3 files changed, 71 insertions(+), 3 deletions(-)

diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
index 9909ccb7f9..c91b8becf1 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
@@ -42,6 +42,7 @@ import org.apache.accumulo.core.util.compaction.CompactionJobPrioritizer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 /**
@@ -59,6 +60,9 @@ public class CompactionJobPriorityQueue {
 
   private final CompactorGroupId groupId;
 
+  @VisibleForTesting
+  static final int FUTURE_CHECK_THRESHOLD = 10_000;
+
   private class CjpqKey implements Comparable<CjpqKey> {
     private final CompactionJob job;
 
@@ -107,6 +111,7 @@ public class CompactionJobPriorityQueue {
   private final AtomicLong rejectedJobs;
   private final AtomicLong dequeuedJobs;
   private final ArrayDeque<CompletableFuture<CompactionJobQueues.MetaJob>> 
futures;
+  private long futuresAdded = 0;
 
   private static class TabletJobs {
     final long generation;
@@ -244,9 +249,26 @@ public class CompactionJobPriorityQueue {
     // be completed when something does arrive.
     CompletableFuture<CompactionJobQueues.MetaJob> future = new 
CompletableFuture<>();
     futures.add(future);
+    futuresAdded++;
+    // Handle the case where nothing is ever being added to this queue and futures are constantly
+    // being obtained and canceled. If nothing is done these canceled futures would just keep
+    // building up in memory. The following code periodically checks to see if there are canceled
+    // futures to remove.
+    if (futuresAdded % FUTURE_CHECK_THRESHOLD == 0
+        && futures.size() >= 2 * FUTURE_CHECK_THRESHOLD) {
+      futures.removeIf(CompletableFuture::isDone);
+      // It is not expected that the future we just created would be done, if it were it would have
+      // been removed.
+      Preconditions.checkState(!future.isDone());
+    }
     return future;
   }
 
+  @VisibleForTesting
+  synchronized int futuresSize() {
+    return futures.size();
+  }
+
   // exists for tests
   synchronized CompactionJobQueues.MetaJob peek() {
     var firstEntry = jobQueue.firstEntry();
diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
index ddf9a3016e..5464b90a33 100644
--- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
+++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
@@ -20,11 +20,14 @@ package org.apache.accumulo.manager.compaction.queue;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.concurrent.CompletableFuture;
 
 import org.apache.accumulo.core.client.admin.compaction.CompactableFile;
 import org.apache.accumulo.core.data.TableId;
@@ -264,4 +267,37 @@ public class CompactionJobPriorityQueueTest {
 
     assertEquals(100, matchesSeen);
   }
+
+  /**
+   * Test to ensure that canceled futures do not build up in memory.
+   */
+  @Test
+  public void testAsyncCancelCleanup() {
+    CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 
100);
+
+    List<CompletableFuture<MetaJob>> futures = new ArrayList<>();
+
+    int maxFuturesSize = 0;
+
+    // Add 11 below so that cadence of clearing differs from the internal check cadence
+    final int CANCEL_THRESHOLD = 
CompactionJobPriorityQueue.FUTURE_CHECK_THRESHOLD / 10 + 11;
+    final int ITERATIONS = CompactionJobPriorityQueue.FUTURE_CHECK_THRESHOLD * 
20;
+
+    for (int x = 0; x < ITERATIONS; x++) {
+      futures.add(queue.getAsync());
+
+      maxFuturesSize = Math.max(maxFuturesSize, queue.futuresSize());
+
+      if (futures.size() >= CANCEL_THRESHOLD) {
+        futures.forEach(f -> f.cancel(true));
+        futures.clear();
+      }
+    }
+
+    maxFuturesSize = Math.max(maxFuturesSize, queue.futuresSize());
+
+    assertTrue(maxFuturesSize
+        < 2 * (CompactionJobPriorityQueue.FUTURE_CHECK_THRESHOLD + 
CANCEL_THRESHOLD));
+    assertTrue(maxFuturesSize > 2 * 
CompactionJobPriorityQueue.FUTURE_CHECK_THRESHOLD);
+  }
 }
diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java
index a9f360b4cd..09ae416091 100644
--- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java
+++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java
@@ -31,6 +31,7 @@ import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Stream;
 
@@ -45,6 +46,7 @@ import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.core.spi.compaction.CompactionJob;
 import org.apache.accumulo.core.spi.compaction.CompactionKind;
 import org.apache.accumulo.core.spi.compaction.CompactorGroupId;
+import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.core.util.compaction.CompactionJobImpl;
 import org.apache.hadoop.io.Text;
 import org.junit.jupiter.api.Test;
@@ -376,15 +378,23 @@ public class CompactionJobQueuesTest {
     assertEquals(extent4, future3.get().getTabletMetadata().getExtent());
     assertEquals(extent3, future4.get().getTabletMetadata().getExtent());
 
-    // test cancelling a future
+    // test cancelling a future and having a future timeout
     var future5 = jobQueues.getAsync(cg1);
     assertFalse(future5.isDone());
     future5.cancel(false);
     var future6 = jobQueues.getAsync(cg1);
     assertFalse(future6.isDone());
-    // since future5 was canceled, this addition should go to future6
+    future6.orTimeout(10, TimeUnit.MILLISECONDS);
+    // sleep for 20 millis, this should cause future6 to be timed out
+    UtilWaitThread.sleep(20);
+    var future7 = jobQueues.getAsync(cg1);
+    assertFalse(future7.isDone());
+    // since future5 was canceled and future6 timed out, this addition should go to future7
     jobQueues.add(tm1, List.of(newJob((short) 1, 5, cg1)));
+    assertTrue(future7.isDone());
+    assertEquals(extent1, future7.get().getTabletMetadata().getExtent());
+    assertTrue(future5.isDone());
+    assertTrue(future6.isCompletedExceptionally());
     assertTrue(future6.isDone());
-    assertEquals(extent1, future6.get().getTabletMetadata().getExtent());
   }
 }

Reply via email to