This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new 11f8a5095f Added unit test for CompactionJobPriorityQueue (#4296) 11f8a5095f is described below commit 11f8a5095fc5e96c0ad016f40a98b1ce389382a7 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Tue Feb 27 09:01:46 2024 -0500 Added unit test for CompactionJobPriorityQueue (#4296) Modified add method to return int instead of boolean and added unit test for CompactionJobPriorityQueue Fixes #3466 --- .../queue/CompactionJobPriorityQueue.java | 39 ++- .../compaction/queue/CompactionJobQueues.java | 2 +- .../queue/CompactionJobPriorityQueueTest.java | 322 +++++++++++++++++++++ 3 files changed, 350 insertions(+), 13 deletions(-) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java index 12d82ff1f9..6c28fe5ecf 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import org.apache.accumulo.core.dataImpl.KeyExtent; @@ -47,7 +48,7 @@ import com.google.common.base.Preconditions; * </p> */ public class CompactionJobPriorityQueue { - // ELASTICITY_TODO unit test this class + private final CompactorGroupId groupId; private class CjpqKey implements Comparable<CjpqKey> { @@ -59,7 +60,7 @@ public class CompactionJobPriorityQueue { CjpqKey(CompactionJob job) { this.job = job; - this.seq = nextSeq++; + this.seq = nextSeq.incrementAndGet(); } @Override @@ -102,9 +103,9 @@ public class CompactionJobPriorityQueue { // jobs in the queue when new jobs 
are queued for a tablet. private final Map<KeyExtent,List<CjpqKey>> tabletJobs; - private long nextSeq; + private final AtomicLong nextSeq = new AtomicLong(0); - private boolean closed = false; + private final AtomicBoolean closed = new AtomicBoolean(false); public CompactionJobPriorityQueue(CompactorGroupId groupId, int maxSize) { this.jobQueue = new TreeMap<>(); @@ -115,21 +116,23 @@ public class CompactionJobPriorityQueue { this.dequeuedJobs = new AtomicLong(0); } - public synchronized boolean add(TabletMetadata tabletMetadata, Collection<CompactionJob> jobs) { - if (closed) { - return false; - } - + public synchronized int add(TabletMetadata tabletMetadata, Collection<CompactionJob> jobs) { Preconditions.checkArgument(jobs.stream().allMatch(job -> job.getGroup().equals(groupId))); removePreviousSubmissions(tabletMetadata.getExtent()); List<CjpqKey> newEntries = new ArrayList<>(jobs.size()); + int jobsAdded = 0; for (CompactionJob job : jobs) { CjpqKey cjqpKey = addJobToQueue(tabletMetadata, job); if (cjqpKey != null) { newEntries.add(cjqpKey); + jobsAdded++; + } else { + // The priority for this job was lower than all other priorities and not added + // In this case we will return true even though a subset of the jobs, or none, + // were added } } @@ -137,7 +140,7 @@ public class CompactionJobPriorityQueue { checkState(tabletJobs.put(tabletMetadata.getExtent(), newEntries) == null); } - return true; + return jobsAdded; } public long getMaxSize() { @@ -178,9 +181,19 @@ public class CompactionJobPriorityQueue { return first == null ? null : first.getValue(); } + // exists for tests + synchronized CompactionJobQueues.MetaJob peek() { + var firstEntry = jobQueue.firstEntry(); + return firstEntry == null ? 
null : firstEntry.getValue(); + } + + public boolean isClosed() { + return closed.get(); + } + public synchronized boolean closeIfEmpty() { if (jobQueue.isEmpty()) { - closed = true; + closed.set(true); return true; } @@ -205,7 +218,9 @@ public class CompactionJobPriorityQueue { return null; } else { // the new job has a higher priority than the lowest job in the queue, so remove the lowest - jobQueue.pollLastEntry(); + if (jobQueue.pollLastEntry() != null) { + rejectedJobs.getAndIncrement(); + } } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java index 2d6f88e448..77395fd253 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java @@ -154,7 +154,7 @@ public class CompactionJobQueues { var pq = priorityQueues.computeIfAbsent(groupId, gid -> new CompactionJobPriorityQueue(gid, queueSize)); - while (!pq.add(tabletMetadata, jobs)) { + while (pq.add(tabletMetadata, jobs) == 0 && pq.isClosed()) { // This loop handles race condition where poll() closes empty priority queues. The queue could // be closed after its obtained from the map and before add is called. pq = priorityQueues.computeIfAbsent(groupId, diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java new file mode 100644 index 0000000000..a7bbc914cf --- /dev/null +++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.manager.compaction.queue; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.accumulo.core.client.admin.compaction.CompactableFile; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.metadata.CompactableFileImpl; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.spi.compaction.CompactionJob; +import org.apache.accumulo.core.spi.compaction.CompactorGroupId; +import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.core.util.compaction.CompactionJobPrioritizer; +import org.apache.accumulo.core.util.compaction.CompactorGroupIdImpl; +import org.apache.accumulo.manager.compaction.queue.CompactionJobQueues.MetaJob; +import org.apache.hadoop.io.Text; +import org.easymock.EasyMock; +import org.junit.jupiter.api.Test; + +public class CompactionJobPriorityQueueTest { + + private static final 
CompactorGroupId GROUP = CompactorGroupIdImpl.groupId("TEST"); + + @Test + public void testTabletFileReplacement() { + + CompactableFile file1 = EasyMock.createMock(CompactableFileImpl.class); + CompactableFile file2 = EasyMock.createMock(CompactableFileImpl.class); + CompactableFile file3 = EasyMock.createMock(CompactableFileImpl.class); + CompactableFile file4 = EasyMock.createMock(CompactableFileImpl.class); + + KeyExtent extent = new KeyExtent(TableId.of("1"), new Text("z"), new Text("a")); + TabletMetadata tm = EasyMock.createMock(TabletMetadata.class); + EasyMock.expect(tm.getExtent()).andReturn(extent).anyTimes(); + + CompactionJob cj1 = EasyMock.createMock(CompactionJob.class); + EasyMock.expect(cj1.getGroup()).andReturn(GROUP).anyTimes(); + EasyMock.expect(cj1.getPriority()).andReturn((short) 10).anyTimes(); + EasyMock.expect(cj1.getFiles()).andReturn(Set.of(file1)).anyTimes(); + + CompactionJob cj2 = EasyMock.createMock(CompactionJob.class); + EasyMock.expect(cj2.getGroup()).andReturn(GROUP).anyTimes(); + EasyMock.expect(cj2.getPriority()).andReturn((short) 5).anyTimes(); + EasyMock.expect(cj2.getFiles()).andReturn(Set.of(file2, file3, file4)).anyTimes(); + + EasyMock.replay(tm, cj1, cj2); + + CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 2); + assertEquals(1, queue.add(tm, List.of(cj1))); + + MetaJob job = queue.peek(); + assertEquals(cj1, job.getJob()); + assertEquals(Set.of(file1), job.getJob().getFiles()); + + assertEquals(10L, queue.getLowestPriority()); + assertEquals(2, queue.getMaxSize()); + assertEquals(0, queue.getDequeuedJobs()); + assertEquals(0, queue.getRejectedJobs()); + assertEquals(1, queue.getQueuedJobs()); + + // replace the files for the same tablet + assertEquals(1, queue.add(tm, List.of(cj2))); + + job = queue.peek(); + assertEquals(cj2, job.getJob()); + assertEquals(Set.of(file2, file3, file4), job.getJob().getFiles()); + assertEquals(tm, job.getTabletMetadata()); + + assertEquals(5L, 
queue.getLowestPriority()); + assertEquals(2, queue.getMaxSize()); + assertEquals(0, queue.getDequeuedJobs()); + assertEquals(0, queue.getRejectedJobs()); + assertEquals(1, queue.getQueuedJobs()); + + EasyMock.verify(tm, cj1, cj2); + + } + + @Test + public void testAddEqualToMaxSize() { + + CompactableFile file1 = EasyMock.createMock(CompactableFileImpl.class); + CompactableFile file2 = EasyMock.createMock(CompactableFileImpl.class); + CompactableFile file3 = EasyMock.createMock(CompactableFileImpl.class); + CompactableFile file4 = EasyMock.createMock(CompactableFileImpl.class); + + KeyExtent extent = new KeyExtent(TableId.of("1"), new Text("z"), new Text("a")); + TabletMetadata tm = EasyMock.createMock(TabletMetadata.class); + EasyMock.expect(tm.getExtent()).andReturn(extent).anyTimes(); + + CompactionJob cj1 = EasyMock.createMock(CompactionJob.class); + EasyMock.expect(cj1.getGroup()).andReturn(GROUP).anyTimes(); + EasyMock.expect(cj1.getPriority()).andReturn((short) 10).anyTimes(); + EasyMock.expect(cj1.getFiles()).andReturn(Set.of(file1)).anyTimes(); + + CompactionJob cj2 = EasyMock.createMock(CompactionJob.class); + EasyMock.expect(cj2.getGroup()).andReturn(GROUP).anyTimes(); + EasyMock.expect(cj2.getPriority()).andReturn((short) 5).anyTimes(); + EasyMock.expect(cj2.getFiles()).andReturn(Set.of(file2, file3, file4)).anyTimes(); + + EasyMock.replay(tm, cj1, cj2); + + CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 2); + assertEquals(2, queue.add(tm, List.of(cj1, cj2))); + + EasyMock.verify(tm, cj1, cj2); + + assertEquals(5L, queue.getLowestPriority()); + assertEquals(2, queue.getMaxSize()); + assertEquals(0, queue.getDequeuedJobs()); + assertEquals(0, queue.getRejectedJobs()); + assertEquals(2, queue.getQueuedJobs()); + MetaJob job = queue.poll(); + assertEquals(cj1, job.getJob()); + assertEquals(tm, job.getTabletMetadata()); + assertEquals(1, queue.getDequeuedJobs()); + + job = queue.poll(); + assertEquals(cj2, job.getJob()); + 
assertEquals(tm, job.getTabletMetadata()); + assertEquals(2, queue.getDequeuedJobs()); + + job = queue.poll(); + assertNull(job); + assertEquals(2, queue.getDequeuedJobs()); + + } + + @Test + public void testAddMoreThanMax() { + + CompactableFile file1 = EasyMock.createMock(CompactableFileImpl.class); + CompactableFile file2 = EasyMock.createMock(CompactableFileImpl.class); + CompactableFile file3 = EasyMock.createMock(CompactableFileImpl.class); + CompactableFile file4 = EasyMock.createMock(CompactableFileImpl.class); + CompactableFile file5 = EasyMock.createMock(CompactableFileImpl.class); + CompactableFile file6 = EasyMock.createMock(CompactableFileImpl.class); + + KeyExtent extent = new KeyExtent(TableId.of("1"), new Text("z"), new Text("a")); + TabletMetadata tm = EasyMock.createMock(TabletMetadata.class); + EasyMock.expect(tm.getExtent()).andReturn(extent).anyTimes(); + + CompactionJob cj1 = EasyMock.createMock(CompactionJob.class); + EasyMock.expect(cj1.getGroup()).andReturn(GROUP).anyTimes(); + EasyMock.expect(cj1.getPriority()).andReturn((short) 10).anyTimes(); + EasyMock.expect(cj1.getFiles()).andReturn(Set.of(file1)).anyTimes(); + + CompactionJob cj2 = EasyMock.createMock(CompactionJob.class); + EasyMock.expect(cj2.getGroup()).andReturn(GROUP).anyTimes(); + EasyMock.expect(cj2.getPriority()).andReturn((short) 5).anyTimes(); + EasyMock.expect(cj2.getFiles()).andReturn(Set.of(file2, file3, file4)).anyTimes(); + + CompactionJob cj3 = EasyMock.createMock(CompactionJob.class); + EasyMock.expect(cj3.getGroup()).andReturn(GROUP).anyTimes(); + EasyMock.expect(cj3.getPriority()).andReturn((short) 1).anyTimes(); + EasyMock.expect(cj3.getFiles()).andReturn(Set.of(file5, file6)).anyTimes(); + + EasyMock.replay(tm, cj1, cj2, cj3); + + CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 2); + assertEquals(2, queue.add(tm, List.of(cj1, cj2, cj3))); + + EasyMock.verify(tm, cj1, cj2, cj3); + + assertEquals(5L, queue.getLowestPriority()); + 
assertEquals(2, queue.getMaxSize()); + assertEquals(0, queue.getDequeuedJobs()); + assertEquals(1, queue.getRejectedJobs()); + assertEquals(2, queue.getQueuedJobs()); + + MetaJob job = queue.poll(); + assertEquals(cj1, job.getJob()); + assertEquals(tm, job.getTabletMetadata()); + assertEquals(1, queue.getDequeuedJobs()); + + job = queue.poll(); + assertEquals(cj2, job.getJob()); + assertEquals(tm, job.getTabletMetadata()); + assertEquals(2, queue.getDequeuedJobs()); + + job = queue.poll(); + assertNull(job); + assertEquals(2, queue.getDequeuedJobs()); + + } + + @Test + public void testAddAfterClose() { + + CompactableFile file1 = EasyMock.createMock(CompactableFileImpl.class); + CompactableFile file2 = EasyMock.createMock(CompactableFileImpl.class); + CompactableFile file3 = EasyMock.createMock(CompactableFileImpl.class); + CompactableFile file4 = EasyMock.createMock(CompactableFileImpl.class); + + KeyExtent extent = new KeyExtent(TableId.of("1"), new Text("z"), new Text("a")); + TabletMetadata tm = EasyMock.createMock(TabletMetadata.class); + EasyMock.expect(tm.getExtent()).andReturn(extent).anyTimes(); + + CompactionJob cj1 = EasyMock.createMock(CompactionJob.class); + EasyMock.expect(cj1.getGroup()).andReturn(GROUP).anyTimes(); + EasyMock.expect(cj1.getPriority()).andReturn((short) 10).anyTimes(); + EasyMock.expect(cj1.getFiles()).andReturn(Set.of(file1)).anyTimes(); + + CompactionJob cj2 = EasyMock.createMock(CompactionJob.class); + EasyMock.expect(cj2.getGroup()).andReturn(GROUP).anyTimes(); + EasyMock.expect(cj2.getPriority()).andReturn((short) 5).anyTimes(); + EasyMock.expect(cj2.getFiles()).andReturn(Set.of(file2, file3, file4)).anyTimes(); + + EasyMock.replay(tm, cj1, cj2); + + CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 2); + assertEquals(2, queue.add(tm, List.of(cj1, cj2))); + + assertFalse(queue.closeIfEmpty()); + + EasyMock.verify(tm, cj1, cj2); + + assertEquals(5L, queue.getLowestPriority()); + assertEquals(2, 
queue.getMaxSize()); + assertEquals(0, queue.getDequeuedJobs()); + assertEquals(0, queue.getRejectedJobs()); + assertEquals(2, queue.getQueuedJobs()); + MetaJob job = queue.poll(); + assertEquals(cj1, job.getJob()); + assertEquals(tm, job.getTabletMetadata()); + assertEquals(1, queue.getDequeuedJobs()); + + MetaJob job2 = queue.poll(); + assertEquals(cj2, job2.getJob()); + assertEquals(tm, job2.getTabletMetadata()); + assertEquals(2, queue.getDequeuedJobs()); + + assertTrue(queue.closeIfEmpty()); + + assertEquals(2, queue.add(tm, List.of(cj1, cj2))); + + } + + private static int counter = 1; + + private Pair<TabletMetadata,CompactionJob> createJob() { + + // Use an ever increasing tableId + KeyExtent extent = new KeyExtent(TableId.of("" + counter++), new Text("z"), new Text("a")); + + Set<CompactableFile> files = new HashSet<>(); + for (int i = 0; i < counter; i++) { + files.add(EasyMock.createMock(CompactableFileImpl.class)); + } + + CompactionJob job = EasyMock.createMock(CompactionJob.class); + TabletMetadata tm = EasyMock.createMock(TabletMetadata.class); + EasyMock.expect(tm.getExtent()).andReturn(extent).anyTimes(); + EasyMock.expect(job.getGroup()).andReturn(GROUP).anyTimes(); + EasyMock.expect(job.getPriority()).andReturn((short) counter).anyTimes(); + EasyMock.expect(job.getFiles()).andReturn(files).anyTimes(); + + EasyMock.replay(tm, job); + + return new Pair<>(tm, job); + } + + @Test + public void test() { + + TreeSet<CompactionJob> expected = new TreeSet<>(CompactionJobPrioritizer.JOB_COMPARATOR); + + CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 100); + + // create and add 1000 jobs + for (int x = 0; x < 1000; x++) { + Pair<TabletMetadata,CompactionJob> pair = createJob(); + queue.add(pair.getFirst(), Set.of(pair.getSecond())); + expected.add(pair.getSecond()); + } + + assertEquals(100, queue.getMaxSize()); + assertEquals(100, queue.getQueuedJobs()); + assertEquals(900, queue.getRejectedJobs()); + + // iterate over the 
expected set and make sure that the next job in the queue + // matches + int matchesSeen = 0; + for (CompactionJob expectedJob : expected) { + MetaJob queuedJob = queue.poll(); + if (queuedJob == null) { + break; + } + assertEquals(expectedJob.getPriority(), queuedJob.getJob().getPriority()); + assertEquals(expectedJob.getFiles(), queuedJob.getJob().getFiles()); + matchesSeen++; + } + + assertEquals(100, matchesSeen); + } +}