This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new f320ca36d9 Adds race condition test for CompactionJobQueues (#4310) f320ca36d9 is described below commit f320ca36d9b84cea97522ac4d21b327b3fe11cc6 Author: Keith Turner <ktur...@apache.org> AuthorDate: Tue Feb 27 14:30:40 2024 -0500 Adds race condition test for CompactionJobQueues (#4310) Adds a unit test that concurrently calls add() and poll() on CompactionJobQueues. This test validates that race conditions between these methods are handled. Fixed a race condition found by running the test. --- .../queue/CompactionJobPriorityQueue.java | 6 + .../compaction/queue/CompactionJobQueues.java | 11 +- .../queue/CompactionJobPriorityQueueTest.java | 2 +- .../compaction/queue/CompactionJobQueuesTest.java | 121 +++++++++++++++++++++ 4 files changed, 135 insertions(+), 5 deletions(-) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java index 6c28fe5ecf..e4aa059e95 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java @@ -116,8 +116,14 @@ public class CompactionJobPriorityQueue { this.dequeuedJobs = new AtomicLong(0); } + /** + * @return the number of jobs added. 
If the queue is closed, returns -1 + */ public synchronized int add(TabletMetadata tabletMetadata, Collection<CompactionJob> jobs) { Preconditions.checkArgument(jobs.stream().allMatch(job -> job.getGroup().equals(groupId))); + if (closed.get()) { + return -1; + } removePreviousSubmissions(tabletMetadata.getExtent()); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java index 77395fd253..b47ec92675 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java @@ -29,6 +29,8 @@ import org.apache.accumulo.core.spi.compaction.CompactorGroupId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; + public class CompactionJobQueues { private static final Logger log = LoggerFactory.getLogger(CompactionJobQueues.class); @@ -145,16 +147,17 @@ public class CompactionJobQueues { private void add(TabletMetadata tabletMetadata, CompactorGroupId groupId, Collection<CompactionJob> jobs) { - // TODO log level - if (log.isDebugEnabled()) { - log.debug("Adding jobs to queue {} {} {}", groupId, tabletMetadata.getExtent(), + if (log.isTraceEnabled()) { + log.trace("Adding jobs to queue {} {} {}", groupId, tabletMetadata.getExtent(), jobs.stream().map(job -> "#files:" + job.getFiles().size() + ",prio:" + job.getPriority() + ",kind:" + job.getKind()).collect(Collectors.toList())); } var pq = priorityQueues.computeIfAbsent(groupId, gid -> new CompactionJobPriorityQueue(gid, queueSize)); - while (pq.add(tabletMetadata, jobs) == 0 && pq.isClosed()) { + while (pq.add(tabletMetadata, jobs) < 0) { + // When entering this loop it is expected the queue is closed + Preconditions.checkState(pq.isClosed()); // This loop handles race condition 
where poll() closes empty priority queues. The queue could // be closed after its obtained from the map and before add is called. pq = priorityQueues.computeIfAbsent(groupId, diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java index a7bbc914cf..268751dc85 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java @@ -258,7 +258,7 @@ public class CompactionJobPriorityQueueTest { assertTrue(queue.closeIfEmpty()); - assertEquals(2, queue.add(tm, List.of(cj1, cj2))); + assertEquals(-1, queue.add(tm, List.of(cj1, cj2))); } diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java new file mode 100644 index 0000000000..de39497640 --- /dev/null +++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.manager.compaction.queue; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.net.URI; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Stream; + +import org.apache.accumulo.core.client.admin.compaction.CompactableFile; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.metadata.CompactableFileImpl; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.spi.compaction.CompactionJob; +import org.apache.accumulo.core.spi.compaction.CompactionKind; +import org.apache.accumulo.core.spi.compaction.CompactorGroupId; +import org.apache.accumulo.core.util.compaction.CompactionJobImpl; +import org.apache.accumulo.core.util.compaction.CompactorGroupIdImpl; +import org.apache.hadoop.io.Text; +import org.junit.jupiter.api.Test; + +public class CompactionJobQueuesTest { + + /** + * When a queue goes empty it is removed. This removal should be done safely and should not cause + * any data being concurrently added to be lost. The way this test adds and removes data in + * multiple threads it should cause poll() to empty a queue and delete it while another thread is + * adding, no loss of jobs should happen when this occurs. 
+ */ + @Test + public void testAddPollRaceCondition() throws Exception { + + final int numToAdd = 100_000; + + CompactionJobQueues jobQueues = new CompactionJobQueues(numToAdd + 1); + CompactorGroupId[] groups = Stream.of("G1", "G2", "G3") + .map(s -> CompactorGroupIdImpl.groupId(s)).toArray(l -> new CompactorGroupId[l]); + + var executor = Executors.newFixedThreadPool(groups.length); + + List<Future<Integer>> futures = new ArrayList<>(); + + AtomicBoolean stop = new AtomicBoolean(false); + + // create a background thread per a group that polls jobs for the group + for (var group : groups) { + var future = executor.submit(() -> { + int seen = 0; + while (!stop.get()) { + var job = jobQueues.poll(group); + if (job != null) { + seen++; + } + } + + // After stop was set, nothing should be added to queues anymore. Drain anything that is + // present and then exit. + while (jobQueues.poll(group) != null) { + seen++; + } + + return seen; + }); + futures.add(future); + } + + // Add jobs to queues spread across the groups. While these are being added the background + // threads should concurrently empty queues causing them to be deleted. 
+ for (int i = 0; i < numToAdd; i++) { + // Create unique extents because re-adding the same extent will clobber any jobs already in the + // queue for that extent which could throw off the counts + KeyExtent extent = new KeyExtent(TableId.of("1"), new Text(i + "z"), new Text(i + "a")); + TabletMetadata tm = TabletMetadata.builder(extent).build(); + + Collection<CompactableFile> files = List + .of(new CompactableFileImpl(new URI("file://accumulo/tables//123/t-0/f" + i), 100, 100)); + Collection<CompactionJob> jobs = List.of(new CompactionJobImpl((short) (i % 31), + groups[i % groups.length], files, CompactionKind.SYSTEM, Optional.empty())); + + jobQueues.add(tm, jobs); + } + + // Cause the background threads to exit after polling all data + stop.set(true); + + // Count the total jobs seen by background threads + int totalSeen = 0; + for (var future : futures) { + totalSeen += future.get(); + } + + executor.shutdown(); + + // The background threads should have seen every job that was added + assertEquals(numToAdd, totalSeen); + } +}