This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new aa50792502 Removes getAsync from compaction jobs queues (#6259)
aa50792502 is described below

commit aa507925021794de20a85c6eee16d01b23e2c88e
Author: Keith Turner <[email protected]>
AuthorDate: Wed Apr 1 19:42:16 2026 -0400

    Removes getAsync from compaction jobs queues (#6259)
    
    The compaction job queues had getAsync methods that were not used.  They
    were causing complexity w/ #6217, so this removes them.  They can be added
    back later if needed.
---
 .../queue/CompactionJobPriorityQueue.java          | 55 +--------------
 .../compaction/queue/CompactionJobQueues.java      | 13 ----
 .../queue/CompactionJobPriorityQueueTest.java      | 35 ----------
 .../compaction/queue/CompactionJobQueuesTest.java  | 79 ----------------------
 4 files changed, 1 insertion(+), 181 deletions(-)

diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
index f0b0bec8c6..2171de3c3d 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
@@ -21,7 +21,6 @@ package org.apache.accumulo.manager.compaction.queue;
 import static com.google.common.base.Preconditions.checkState;
 
 import java.time.Duration;
-import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -31,7 +30,6 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -129,8 +127,6 @@ public class CompactionJobPriorityQueue {
   private final AtomicLong maxSize;
   private final AtomicLong rejectedJobs;
   private final AtomicLong dequeuedJobs;
-  private final ArrayDeque<CompletableFuture<CompactionJob>> futures;
-  private long futuresAdded = 0;
   private final Map<KeyExtent,Timer> jobAges;
   private final Supplier<CompactionJobPriorityQueueStats> jobQueueStats;
   private final AtomicReference<Optional<io.micrometer.core.instrument.Timer>> jobQueueTimer;
@@ -159,7 +155,6 @@ public class CompactionJobPriorityQueue {
     this.groupId = groupId;
     this.rejectedJobs = new AtomicLong(0);
     this.dequeuedJobs = new AtomicLong(0);
-    this.futures = new ArrayDeque<>();
     this.jobAges = new ConcurrentHashMap<>();
     this.jobQueueStats = Suppliers.memoizeWithExpiration(
         () -> new CompactionJobPriorityQueueStats(jobAges), 5, TimeUnit.SECONDS);
@@ -198,25 +193,7 @@ public class CompactionJobPriorityQueue {
     HashSet<CjpqKey> newEntries = new HashSet<>(jobs.size());
 
     int jobsAdded = 0;
-    outer: for (CompactionJob job : jobs) {
-      var future = futures.poll();
-      while (future != null) {
-        // its expected that if futures are present then the queue is empty, if this is not true
-        // then there is a bug
-        Preconditions.checkState(jobQueue.isEmpty());
-        // Queue should be empty so jobAges should be empty
-        Preconditions.checkState(jobAges.isEmpty());
-        if (future.complete(job)) {
-          // successfully completed a future with this job, so do not need to queue the job
-          jobsAdded++;
-          // Record a time of 0 as job as we were able to complete immediately and there
-          // were no jobs waiting
-          jobQueueTimer.get().ifPresent(jqt -> jqt.record(Duration.ZERO));
-          continue outer;
-        } // else the future was canceled or timed out so could not complete it
-        future = futures.poll();
-      }
-
+    for (CompactionJob job : jobs) {
       CjpqKey cjqpKey = addJobToQueue(extent, job);
       if (cjqpKey != null) {
         checkState(newEntries.add(cjqpKey));
@@ -289,36 +266,6 @@ public class CompactionJobPriorityQueue {
     return first == null ? null : first.getValue().job;
   }
 
-  public synchronized CompletableFuture<CompactionJob> getAsync() {
-    var job = poll();
-    if (job != null) {
-      return CompletableFuture.completedFuture(job);
-    }
-
-    // There is currently nothing in the queue, so create an uncompleted future and queue it up to
-    // be completed when something does arrive.
-    CompletableFuture<CompactionJob> future = new CompletableFuture<>();
-    futures.add(future);
-    futuresAdded++;
-    // Handle the case where nothing is ever being added to this queue and futures are constantly
-    // being obtained and cancelled. If nothing is done these canceled futures would just keep
-    // building up in memory. The following code periodically checks to see if there are canceled
-    // futures to remove.
-    if (futuresAdded % FUTURE_CHECK_THRESHOLD == 0
-        && futures.size() >= 2 * FUTURE_CHECK_THRESHOLD) {
-      futures.removeIf(CompletableFuture::isDone);
-      // It is not expected that the future we just created would be done, if it were it would have
-      // been removed.
-      Preconditions.checkState(!future.isDone());
-    }
-    return future;
-  }
-
-  @VisibleForTesting
-  synchronized int futuresSize() {
-    return futures.size();
-  }
-
   // exists for tests
   synchronized CompactionJob peek() {
     var firstEntry = jobQueue.firstEntry();
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java
index e87c231bed..983befa047 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java
@@ -22,7 +22,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumMap;
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentHashMap.KeySetView;
 import java.util.concurrent.atomic.AtomicLong;
@@ -120,18 +119,6 @@ public class CompactionJobQueues {
     return count;
   }
 
-  /**
-   * Asynchronously get a compaction job from the queue. If the queue currently has jobs then a
-   * completed future will be returned containing the highest priority job in the queue. If the
-   * queue is currently empty, then an uncompleted future will be returned and later when something
-   * is added to the queue the future will be completed.
-   */
-  public CompletableFuture<CompactionJob> getAsync(ResourceGroupId groupId) {
-    var pq = priorityQueues.computeIfAbsent(groupId,
-        gid -> new CompactionJobPriorityQueue(gid, queueSize, ResolvedCompactionJob.WEIGHER));
-    return pq.getAsync();
-  }
-
   public CompactionJob poll(ResourceGroupId groupId) {
     var prioQ = priorityQueues.get(groupId);
     if (prioQ == null) {
diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
index 7fc20955f5..5b49e81432 100644
--- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
+++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
@@ -23,12 +23,10 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
-import java.util.concurrent.CompletableFuture;
 
 import org.apache.accumulo.core.client.admin.compaction.CompactableFile;
 import org.apache.accumulo.core.data.ResourceGroupId;
@@ -293,39 +291,6 @@ public class CompactionJobPriorityQueueTest {
     assertEquals(0, stats.getAvgAge().toMillis());
   }
 
-  /**
-   * Test to ensure that canceled futures do not build up in memory.
-   */
-  @Test
-  public void testAsyncCancelCleanup() {
-    CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 100, mj -> 1);
-
-    List<CompletableFuture<CompactionJob>> futures = new ArrayList<>();
-
-    int maxFuturesSize = 0;
-
-    // Add 11 below so that cadence of clearing differs from the internal check cadence
-    final int CANCEL_THRESHOLD = CompactionJobPriorityQueue.FUTURE_CHECK_THRESHOLD / 10 + 11;
-    final int ITERATIONS = CompactionJobPriorityQueue.FUTURE_CHECK_THRESHOLD * 20;
-
-    for (int x = 0; x < ITERATIONS; x++) {
-      futures.add(queue.getAsync());
-
-      maxFuturesSize = Math.max(maxFuturesSize, queue.futuresSize());
-
-      if (futures.size() >= CANCEL_THRESHOLD) {
-        futures.forEach(f -> f.cancel(true));
-        futures.clear();
-      }
-    }
-
-    maxFuturesSize = Math.max(maxFuturesSize, queue.futuresSize());
-
-    assertTrue(maxFuturesSize
-        < 2 * (CompactionJobPriorityQueue.FUTURE_CHECK_THRESHOLD + CANCEL_THRESHOLD));
-    assertTrue(maxFuturesSize > 2 * CompactionJobPriorityQueue.FUTURE_CHECK_THRESHOLD);
-  }
-
   @Test
   public void testResetMaxSize() {
     TreeSet<CompactionJob> expected = new TreeSet<>(CompactionJobPrioritizer.JOB_COMPARATOR);
diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java
index d3c1acd469..d34cc2666d 100644
--- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java
+++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java
@@ -19,11 +19,7 @@
 package org.apache.accumulo.manager.compaction.queue;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -31,11 +27,8 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Stream;
 
@@ -328,78 +321,6 @@ public class CompactionJobQueuesTest {
     assertEquals(numToAdd, totalSeen);
   }
 
-  @Test
-  public void testGetAsync() throws Exception {
-    CompactionJobQueues jobQueues = new CompactionJobQueues(1000000);
-
-    var tid = TableId.of("1");
-    var extent1 = new KeyExtent(tid, new Text("z"), new Text("q"));
-    var extent2 = new KeyExtent(tid, new Text("q"), new Text("l"));
-    var extent3 = new KeyExtent(tid, new Text("l"), new Text("c"));
-    var extent4 = new KeyExtent(tid, new Text("c"), new Text("a"));
-
-    var cg1 = ResourceGroupId.of("CG1");
-
-    var job1 = newJob((short) 1, 5, cg1);
-    var job2 = newJob((short) 2, 6, cg1);
-    var job3 = newJob((short) 3, 7, cg1);
-    var job4 = newJob((short) 4, 8, cg1);
-
-    var future1 = jobQueues.getAsync(cg1);
-    var future2 = jobQueues.getAsync(cg1);
-
-    assertFalse(future1.isDone());
-    assertFalse(future2.isDone());
-
-    jobQueues.add(extent1, List.of(job1));
-    jobQueues.add(extent2, List.of(job2));
-    // Futures were immediately completed so nothing should be queued
-    assertTrue(jobQueues.getQueue(cg1).getJobAges().isEmpty());
-
-    jobQueues.add(extent3, List.of(job3));
-    jobQueues.add(extent4, List.of(job4));
-    // No futures available, so jobAges should exist for 2 tablets
-    assertEquals(2, jobQueues.getQueue(cg1).getJobAges().size());
-
-    var future3 = jobQueues.getAsync(cg1);
-    var future4 = jobQueues.getAsync(cg1);
-
-    // Should be back to 0 size after futures complete
-    assertTrue(jobQueues.getQueue(cg1).getJobAges().isEmpty());
-
-    assertTrue(future1.isDone());
-    assertTrue(future2.isDone());
-    assertTrue(future3.isDone());
-    assertTrue(future4.isDone());
-
-    assertEquals(job1, future1.get());
-    assertEquals(job2, future2.get());
-    assertEquals(job4, future3.get());
-    assertEquals(job3, future4.get());
-
-    // test cancelling a future and having a future timeout
-    var future5 = jobQueues.getAsync(cg1);
-    assertFalse(future5.isDone());
-    future5.cancel(false);
-    var future6 = jobQueues.getAsync(cg1);
-    assertFalse(future6.isDone());
-    future6.orTimeout(10, TimeUnit.MILLISECONDS);
-    // Wait for future6 to timeout to make sure future7 will
-    // receive the job when added to the queue
-    var ex = assertThrows(ExecutionException.class, future6::get);
-    assertInstanceOf(TimeoutException.class, ex.getCause());
-    var future7 = jobQueues.getAsync(cg1);
-    assertFalse(future7.isDone());
-    // since future5 was canceled and future6 timed out, this addition should go to future7
-    var job5 = newJob((short) 1, 5, cg1);
-    jobQueues.add(extent1, List.of(job5));
-    assertTrue(future7.isDone());
-    assertEquals(job5, future7.get());
-    assertTrue(future5.isDone());
-    assertTrue(future6.isCompletedExceptionally());
-    assertTrue(future6.isDone());
-  }
-
   @Test
   public void testResetSize() throws Exception {
     CompactionJobQueues jobQueues = new CompactionJobQueues(1000000);

Reply via email to