This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new aa50792502 Removes getAsync from compaction jobs queues (#6259)
aa50792502 is described below
commit aa507925021794de20a85c6eee16d01b23e2c88e
Author: Keith Turner <[email protected]>
AuthorDate: Wed Apr 1 19:42:16 2026 -0400
Removes getAsync from compaction jobs queues (#6259)
Compaction job queues had getAsync methods that were not used. They
were causing complexity with #6217, so this commit removes them. They
can be added back later if needed.
---
.../queue/CompactionJobPriorityQueue.java | 55 +--------------
.../compaction/queue/CompactionJobQueues.java | 13 ----
.../queue/CompactionJobPriorityQueueTest.java | 35 ----------
.../compaction/queue/CompactionJobQueuesTest.java | 79 ----------------------
4 files changed, 1 insertion(+), 181 deletions(-)
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
index f0b0bec8c6..2171de3c3d 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
@@ -21,7 +21,6 @@ package org.apache.accumulo.manager.compaction.queue;
import static com.google.common.base.Preconditions.checkState;
import java.time.Duration;
-import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -31,7 +30,6 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -129,8 +127,6 @@ public class CompactionJobPriorityQueue {
private final AtomicLong maxSize;
private final AtomicLong rejectedJobs;
private final AtomicLong dequeuedJobs;
- private final ArrayDeque<CompletableFuture<CompactionJob>> futures;
- private long futuresAdded = 0;
private final Map<KeyExtent,Timer> jobAges;
private final Supplier<CompactionJobPriorityQueueStats> jobQueueStats;
private final AtomicReference<Optional<io.micrometer.core.instrument.Timer>>
jobQueueTimer;
@@ -159,7 +155,6 @@ public class CompactionJobPriorityQueue {
this.groupId = groupId;
this.rejectedJobs = new AtomicLong(0);
this.dequeuedJobs = new AtomicLong(0);
- this.futures = new ArrayDeque<>();
this.jobAges = new ConcurrentHashMap<>();
this.jobQueueStats = Suppliers.memoizeWithExpiration(
() -> new CompactionJobPriorityQueueStats(jobAges), 5,
TimeUnit.SECONDS);
@@ -198,25 +193,7 @@ public class CompactionJobPriorityQueue {
HashSet<CjpqKey> newEntries = new HashSet<>(jobs.size());
int jobsAdded = 0;
- outer: for (CompactionJob job : jobs) {
- var future = futures.poll();
- while (future != null) {
- // its expected that if futures are present then the queue is empty,
if this is not true
- // then there is a bug
- Preconditions.checkState(jobQueue.isEmpty());
- // Queue should be empty so jobAges should be empty
- Preconditions.checkState(jobAges.isEmpty());
- if (future.complete(job)) {
- // successfully completed a future with this job, so do not need to
queue the job
- jobsAdded++;
- // Record a time of 0 as job as we were able to complete immediately
and there
- // were no jobs waiting
- jobQueueTimer.get().ifPresent(jqt -> jqt.record(Duration.ZERO));
- continue outer;
- } // else the future was canceled or timed out so could not complete it
- future = futures.poll();
- }
-
+ for (CompactionJob job : jobs) {
CjpqKey cjqpKey = addJobToQueue(extent, job);
if (cjqpKey != null) {
checkState(newEntries.add(cjqpKey));
@@ -289,36 +266,6 @@ public class CompactionJobPriorityQueue {
return first == null ? null : first.getValue().job;
}
- public synchronized CompletableFuture<CompactionJob> getAsync() {
- var job = poll();
- if (job != null) {
- return CompletableFuture.completedFuture(job);
- }
-
- // There is currently nothing in the queue, so create an uncompleted
future and queue it up to
- // be completed when something does arrive.
- CompletableFuture<CompactionJob> future = new CompletableFuture<>();
- futures.add(future);
- futuresAdded++;
- // Handle the case where nothing is ever being added to this queue and
futures are constantly
- // being obtained and cancelled. If nothing is done these canceled futures
would just keep
- // building up in memory. The following code periodically checks to see if
there are canceled
- // futures to remove.
- if (futuresAdded % FUTURE_CHECK_THRESHOLD == 0
- && futures.size() >= 2 * FUTURE_CHECK_THRESHOLD) {
- futures.removeIf(CompletableFuture::isDone);
- // It is not expected that the future we just created would be done, if
it were it would have
- // been removed.
- Preconditions.checkState(!future.isDone());
- }
- return future;
- }
-
- @VisibleForTesting
- synchronized int futuresSize() {
- return futures.size();
- }
-
// exists for tests
synchronized CompactionJob peek() {
var firstEntry = jobQueue.firstEntry();
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java
index e87c231bed..983befa047 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java
@@ -22,7 +22,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.EnumMap;
import java.util.Map;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentHashMap.KeySetView;
import java.util.concurrent.atomic.AtomicLong;
@@ -120,18 +119,6 @@ public class CompactionJobQueues {
return count;
}
- /**
- * Asynchronously get a compaction job from the queue. If the queue
currently has jobs then a
- * completed future will be returned containing the highest priority job in
the queue. If the
- * queue is currently empty, then an uncompleted future will be returned and
later when something
- * is added to the queue the future will be completed.
- */
- public CompletableFuture<CompactionJob> getAsync(ResourceGroupId groupId) {
- var pq = priorityQueues.computeIfAbsent(groupId,
- gid -> new CompactionJobPriorityQueue(gid, queueSize,
ResolvedCompactionJob.WEIGHER));
- return pq.getAsync();
- }
-
public CompactionJob poll(ResourceGroupId groupId) {
var prioQ = priorityQueues.get(groupId);
if (prioQ == null) {
diff --git
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
index 7fc20955f5..5b49e81432 100644
---
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
+++
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
@@ -23,12 +23,10 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
-import java.util.concurrent.CompletableFuture;
import org.apache.accumulo.core.client.admin.compaction.CompactableFile;
import org.apache.accumulo.core.data.ResourceGroupId;
@@ -293,39 +291,6 @@ public class CompactionJobPriorityQueueTest {
assertEquals(0, stats.getAvgAge().toMillis());
}
- /**
- * Test to ensure that canceled futures do not build up in memory.
- */
- @Test
- public void testAsyncCancelCleanup() {
- CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP,
100, mj -> 1);
-
- List<CompletableFuture<CompactionJob>> futures = new ArrayList<>();
-
- int maxFuturesSize = 0;
-
- // Add 11 below so that cadence of clearing differs from the internal
check cadence
- final int CANCEL_THRESHOLD =
CompactionJobPriorityQueue.FUTURE_CHECK_THRESHOLD / 10 + 11;
- final int ITERATIONS = CompactionJobPriorityQueue.FUTURE_CHECK_THRESHOLD *
20;
-
- for (int x = 0; x < ITERATIONS; x++) {
- futures.add(queue.getAsync());
-
- maxFuturesSize = Math.max(maxFuturesSize, queue.futuresSize());
-
- if (futures.size() >= CANCEL_THRESHOLD) {
- futures.forEach(f -> f.cancel(true));
- futures.clear();
- }
- }
-
- maxFuturesSize = Math.max(maxFuturesSize, queue.futuresSize());
-
- assertTrue(maxFuturesSize
- < 2 * (CompactionJobPriorityQueue.FUTURE_CHECK_THRESHOLD +
CANCEL_THRESHOLD));
- assertTrue(maxFuturesSize > 2 *
CompactionJobPriorityQueue.FUTURE_CHECK_THRESHOLD);
- }
-
@Test
public void testResetMaxSize() {
TreeSet<CompactionJob> expected = new
TreeSet<>(CompactionJobPrioritizer.JOB_COMPARATOR);
diff --git
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java
index d3c1acd469..d34cc2666d 100644
---
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java
+++
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java
@@ -19,11 +19,7 @@
package org.apache.accumulo.manager.compaction.queue;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
import java.net.URI;
import java.net.URISyntaxException;
@@ -31,11 +27,8 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
@@ -328,78 +321,6 @@ public class CompactionJobQueuesTest {
assertEquals(numToAdd, totalSeen);
}
- @Test
- public void testGetAsync() throws Exception {
- CompactionJobQueues jobQueues = new CompactionJobQueues(1000000);
-
- var tid = TableId.of("1");
- var extent1 = new KeyExtent(tid, new Text("z"), new Text("q"));
- var extent2 = new KeyExtent(tid, new Text("q"), new Text("l"));
- var extent3 = new KeyExtent(tid, new Text("l"), new Text("c"));
- var extent4 = new KeyExtent(tid, new Text("c"), new Text("a"));
-
- var cg1 = ResourceGroupId.of("CG1");
-
- var job1 = newJob((short) 1, 5, cg1);
- var job2 = newJob((short) 2, 6, cg1);
- var job3 = newJob((short) 3, 7, cg1);
- var job4 = newJob((short) 4, 8, cg1);
-
- var future1 = jobQueues.getAsync(cg1);
- var future2 = jobQueues.getAsync(cg1);
-
- assertFalse(future1.isDone());
- assertFalse(future2.isDone());
-
- jobQueues.add(extent1, List.of(job1));
- jobQueues.add(extent2, List.of(job2));
- // Futures were immediately completed so nothing should be queued
- assertTrue(jobQueues.getQueue(cg1).getJobAges().isEmpty());
-
- jobQueues.add(extent3, List.of(job3));
- jobQueues.add(extent4, List.of(job4));
- // No futures available, so jobAges should exist for 2 tablets
- assertEquals(2, jobQueues.getQueue(cg1).getJobAges().size());
-
- var future3 = jobQueues.getAsync(cg1);
- var future4 = jobQueues.getAsync(cg1);
-
- // Should be back to 0 size after futures complete
- assertTrue(jobQueues.getQueue(cg1).getJobAges().isEmpty());
-
- assertTrue(future1.isDone());
- assertTrue(future2.isDone());
- assertTrue(future3.isDone());
- assertTrue(future4.isDone());
-
- assertEquals(job1, future1.get());
- assertEquals(job2, future2.get());
- assertEquals(job4, future3.get());
- assertEquals(job3, future4.get());
-
- // test cancelling a future and having a future timeout
- var future5 = jobQueues.getAsync(cg1);
- assertFalse(future5.isDone());
- future5.cancel(false);
- var future6 = jobQueues.getAsync(cg1);
- assertFalse(future6.isDone());
- future6.orTimeout(10, TimeUnit.MILLISECONDS);
- // Wait for future6 to timeout to make sure future7 will
- // receive the job when added to the queue
- var ex = assertThrows(ExecutionException.class, future6::get);
- assertInstanceOf(TimeoutException.class, ex.getCause());
- var future7 = jobQueues.getAsync(cg1);
- assertFalse(future7.isDone());
- // since future5 was canceled and future6 timed out, this addition should
go to future7
- var job5 = newJob((short) 1, 5, cg1);
- jobQueues.add(extent1, List.of(job5));
- assertTrue(future7.isDone());
- assertEquals(job5, future7.get());
- assertTrue(future5.isDone());
- assertTrue(future6.isCompletedExceptionally());
- assertTrue(future6.isDone());
- }
-
@Test
public void testResetSize() throws Exception {
CompactionJobQueues jobQueues = new CompactionJobQueues(1000000);