This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new f320ca36d9 Adds race condition test for CompactionJobQueues (#4310)
f320ca36d9 is described below

commit f320ca36d9b84cea97522ac4d21b327b3fe11cc6
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Tue Feb 27 14:30:40 2024 -0500

    Adds race condition test for CompactionJobQueues (#4310)
    
    Adds a unit test that concurrently calls add() and poll() on
    CompactionJobQueues. This test validates that race conditions between
    these methods are handled. Fixed a race condition found by running the
    test.
---
 .../queue/CompactionJobPriorityQueue.java          |   6 +
 .../compaction/queue/CompactionJobQueues.java      |  11 +-
 .../queue/CompactionJobPriorityQueueTest.java      |   2 +-
 .../compaction/queue/CompactionJobQueuesTest.java  | 121 +++++++++++++++++++++
 4 files changed, 135 insertions(+), 5 deletions(-)

diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
index 6c28fe5ecf..e4aa059e95 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java
@@ -116,8 +116,14 @@ public class CompactionJobPriorityQueue {
     this.dequeuedJobs = new AtomicLong(0);
   }
 
+  /**
+   * @return the number of jobs added. If the queue is closed returns -1
+   */
   public synchronized int add(TabletMetadata tabletMetadata, 
Collection<CompactionJob> jobs) {
     Preconditions.checkArgument(jobs.stream().allMatch(job -> 
job.getGroup().equals(groupId)));
+    if (closed.get()) {
+      return -1;
+    }
 
     removePreviousSubmissions(tabletMetadata.getExtent());
 
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java
index 77395fd253..b47ec92675 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java
@@ -29,6 +29,8 @@ import 
org.apache.accumulo.core.spi.compaction.CompactorGroupId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
+
 public class CompactionJobQueues {
 
   private static final Logger log = 
LoggerFactory.getLogger(CompactionJobQueues.class);
@@ -145,16 +147,17 @@ public class CompactionJobQueues {
   private void add(TabletMetadata tabletMetadata, CompactorGroupId groupId,
       Collection<CompactionJob> jobs) {
 
-    // TODO log level
-    if (log.isDebugEnabled()) {
-      log.debug("Adding jobs to queue {} {} {}", groupId, 
tabletMetadata.getExtent(),
+    if (log.isTraceEnabled()) {
+      log.trace("Adding jobs to queue {} {} {}", groupId, 
tabletMetadata.getExtent(),
           jobs.stream().map(job -> "#files:" + job.getFiles().size() + 
",prio:" + job.getPriority()
               + ",kind:" + job.getKind()).collect(Collectors.toList()));
     }
 
     var pq = priorityQueues.computeIfAbsent(groupId,
         gid -> new CompactionJobPriorityQueue(gid, queueSize));
-    while (pq.add(tabletMetadata, jobs) == 0 && pq.isClosed()) {
+    while (pq.add(tabletMetadata, jobs) < 0) {
+      // When entering this loop it is expected that the queue is closed
+      Preconditions.checkState(pq.isClosed());
       // This loop handles race condition where poll() closes empty priority 
queues. The queue could
       // be closed after its obtained from the map and before add is called.
       pq = priorityQueues.computeIfAbsent(groupId,
diff --git 
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
 
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
index a7bbc914cf..268751dc85 100644
--- 
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
+++ 
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java
@@ -258,7 +258,7 @@ public class CompactionJobPriorityQueueTest {
 
     assertTrue(queue.closeIfEmpty());
 
-    assertEquals(2, queue.add(tm, List.of(cj1, cj2)));
+    assertEquals(-1, queue.add(tm, List.of(cj1, cj2)));
 
   }
 
diff --git 
a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java
 
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java
new file mode 100644
index 0000000000..de39497640
--- /dev/null
+++ 
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.manager.compaction.queue;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Stream;
+
+import org.apache.accumulo.core.client.admin.compaction.CompactableFile;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.metadata.CompactableFileImpl;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.spi.compaction.CompactionJob;
+import org.apache.accumulo.core.spi.compaction.CompactionKind;
+import org.apache.accumulo.core.spi.compaction.CompactorGroupId;
+import org.apache.accumulo.core.util.compaction.CompactionJobImpl;
+import org.apache.accumulo.core.util.compaction.CompactorGroupIdImpl;
+import org.apache.hadoop.io.Text;
+import org.junit.jupiter.api.Test;
+
+public class CompactionJobQueuesTest {
+
+  /**
+   * Verifies that concurrent calls to add() and poll() on CompactionJobQueues do not lose jobs.
+   *
+   * <p>
+   * When a queue goes empty it is removed. This removal should be done safely and should not
+   * cause any data being concurrently added to be lost. This test starts one background thread
+   * per compactor group that repeatedly polls that group's queue while the test thread adds jobs,
+   * each with a unique extent, spread across the groups. This should cause poll() to empty a
+   * queue and delete it while another thread is adding; no loss of jobs should happen when this
+   * occurs. The test passes only if the number of jobs polled by the background threads equals
+   * the number of jobs added.
+   *
+   * <p>
+   * NOTE(review): stop is only set and the executor is only shut down on the success path. If
+   * add() or future.get() throws, the polling threads would presumably keep spinning; consider
+   * moving stop.set(true) and executor.shutdown() into a finally block.
+   */
+  @Test
+  public void testAddPollRaceCondition() throws Exception {
+
+    final int numToAdd = 100_000;
+
+    CompactionJobQueues jobQueues = new CompactionJobQueues(numToAdd + 1);
+    CompactorGroupId[] groups = Stream.of("G1", "G2", "G3")
+        .map(s -> CompactorGroupIdImpl.groupId(s)).toArray(l -> new 
CompactorGroupId[l]);
+
+    var executor = Executors.newFixedThreadPool(groups.length);
+
+    List<Future<Integer>> futures = new ArrayList<>();
+
+    AtomicBoolean stop = new AtomicBoolean(false);
+
+    // create one background thread per group that polls jobs for the group
+    for (var group : groups) {
+      var future = executor.submit(() -> {
+        int seen = 0;
+        while (!stop.get()) {
+          var job = jobQueues.poll(group);
+          if (job != null) {
+            seen++;
+          }
+        }
+
+        // After stop was set, nothing should be added to queues anymore. 
Drain anything that is
+        // present and then exit.
+        while (jobQueues.poll(group) != null) {
+          seen++;
+        }
+
+        return seen;
+      });
+      futures.add(future);
+    }
+
+    // Add jobs to queues spread across the groups. While these are being 
added the background
+    // threads should concurrently empty queues causing them to be deleted.
+    for (int i = 0; i < numToAdd; i++) {
+      // Create unique extents because re-adding the same extent will clobber 
any jobs already in the
+      // queue for that extent which could throw off the counts
+      KeyExtent extent = new KeyExtent(TableId.of("1"), new Text(i + "z"), new 
Text(i + "a"));
+      TabletMetadata tm = TabletMetadata.builder(extent).build();
+
+      Collection<CompactableFile> files = List
+          .of(new CompactableFileImpl(new 
URI("file://accumulo/tables//123/t-0/f" + i), 100, 100));
+      Collection<CompactionJob> jobs = List.of(new CompactionJobImpl((short) 
(i % 31),
+          groups[i % groups.length], files, CompactionKind.SYSTEM, 
Optional.empty()));
+
+      jobQueues.add(tm, jobs);
+    }
+
+    // Cause the background threads to exit after polling all data
+    stop.set(true);
+
+    // Count the total jobs seen by background threads
+    int totalSeen = 0;
+    for (var future : futures) {
+      totalSeen += future.get();
+    }
+
+    executor.shutdown();
+
+    // The background threads should have seen every job that was added
+    assertEquals(numToAdd, totalSeen);
+  }
+}

Reply via email to