This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new 3213646e1c speeds up fate lock acquisition (#5262) 3213646e1c is described below commit 3213646e1ca7086914d354a4b65c9856d6c99623 Author: Keith Turner <ktur...@apache.org> AuthorDate: Sat Feb 1 16:36:45 2025 -0500 speeds up fate lock acquisition (#5262) Stores the lock data for fate locks in the zookeeper node name instead of the zookeeper data for the node. Ran some local performance test with hundreds of fate operations and saw lock times go from 750ms to 15ms. fixes #5181 Co-authored-by: Christopher Tubbs <ctubb...@apache.org> --- .../org/apache/accumulo/core/fate/AdminUtil.java | 24 +-- .../fate/zookeeper/DistributedReadWriteLock.java | 34 ++-- .../accumulo/core/fate/zookeeper/FateLock.java | 172 +++++++++++---------- .../zookeeper/DistributedReadWriteLockTest.java | 14 +- .../accumulo/core/fate/zookeeper/FateLockTest.java | 76 +++++++++ .../compaction/ExternalCompactionTestUtils.java | 15 +- .../test/compaction/ExternalCompaction_1_IT.java | 66 ++++++++ 7 files changed, 284 insertions(+), 117 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java index a5b1c5645a..9577ab7ff2 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java @@ -33,12 +33,14 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.SortedSet; import java.util.stream.Stream; import org.apache.accumulo.core.fate.FateStore.FateTxStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore.FateIdStatus; import org.apache.accumulo.core.fate.ReadOnlyFateStore.ReadOnlyFateTxStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; +import org.apache.accumulo.core.fate.zookeeper.DistributedReadWriteLock.LockType; import org.apache.accumulo.core.fate.zookeeper.FateLock; import org.apache.accumulo.core.fate.zookeeper.FateLock.FateLockPath; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy; @@ -275,24 +277,21 @@ public class AdminUtil<T> { List<String> lockedIds = zr.getChildren(lockPath.toString()); for (String id : lockedIds) { - try { - FateLockPath fLockPath = FateLock.path(lockPath + "/" + id); - List<String> lockNodes = - FateLock.validateAndSort(fLockPath, zr.getChildren(fLockPath.toString())); + SortedSet<FateLock.NodeName> lockNodes = + FateLock.validateAndWarn(fLockPath, zr.getChildren(fLockPath.toString())); int pos = 0; boolean sawWriteLock = false; - for (String node : lockNodes) { + for (FateLock.NodeName node : lockNodes) { try { - byte[] data = zr.getData(lockPath + "/" + id + "/" + node); - // Example data: "READ:<FateId>". FateId contains ':' hence the limit of 2 - String[] lda = new String(data, UTF_8).split(":", 2); - FateId fateId = FateId.from(lda[1]); + FateLock.FateLockEntry fateLockEntry = node.fateLockEntry.get(); + var fateId = fateLockEntry.getFateId(); + var lockType = fateLockEntry.getLockType(); - if (lda[0].charAt(0) == 'W') { + if (lockType == LockType.WRITE) { sawWriteLock = true; } @@ -300,13 +299,14 @@ public class AdminUtil<T> { if (pos == 0) { locks = heldLocks; - } else if (lda[0].charAt(0) == 'R' && !sawWriteLock) { + } else if (lockType == LockType.READ && !sawWriteLock) { locks = heldLocks; } else { locks = waitingLocks; } - locks.computeIfAbsent(fateId, k -> new ArrayList<>()).add(lda[0].charAt(0) + ":" + id); + locks.computeIfAbsent(fateId, k -> new ArrayList<>()) + .add(lockType.name().charAt(0) + ":" + id); } catch (Exception e) { log.error("{}", e.getMessage(), e); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java index 1f36ee7a92..1b52a477d7 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java @@ -27,6 +27,7 @@ import java.util.SortedMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; +import java.util.function.BiPredicate; import java.util.function.Supplier; import org.apache.accumulo.core.fate.FateId; @@ -50,9 +51,10 @@ public class DistributedReadWriteLock implements java.util.concurrent.locks.Read // them, // a writer only runs when they are at the top of the queue. public interface QueueLock { - SortedMap<Long,Supplier<FateLockEntry>> getEarlierEntries(long entry); + SortedMap<Long,Supplier<FateLockEntry>> + getEntries(BiPredicate<Long,Supplier<FateLockEntry>> predicate); - void removeEntry(long entry); + void removeEntry(FateLockEntry data, long seq); long addEntry(FateLockEntry entry); } @@ -115,7 +117,9 @@ public class DistributedReadWriteLock implements java.util.concurrent.locks.Read entry = qlock.addEntry(FateLockEntry.from(this.getType(), this.fateId)); log.info("Added lock entry {} fateId {} lockType {}", entry, fateId, getType()); } - SortedMap<Long,Supplier<FateLockEntry>> entries = qlock.getEarlierEntries(entry); + + SortedMap<Long,Supplier<FateLockEntry>> entries = + qlock.getEntries((seq, lockData) -> seq <= entry); for (Entry<Long,Supplier<FateLockEntry>> entry : entries.entrySet()) { if (entry.getKey().equals(this.entry)) { return true; @@ -150,7 +154,7 @@ public class DistributedReadWriteLock implements java.util.concurrent.locks.Read return; } log.debug("Removing lock entry {} fateId {} lockType {}", entry, this.fateId, getType()); - qlock.removeEntry(entry); + qlock.removeEntry(FateLockEntry.from(this.getType(), this.fateId), entry); entry = -1; } @@ -181,7 +185,8 @@ public class DistributedReadWriteLock implements java.util.concurrent.locks.Read entry = qlock.addEntry(FateLockEntry.from(this.getType(), this.fateId)); log.info("Added lock entry {} fateId {} lockType {}", entry, this.fateId, getType()); } - SortedMap<Long,Supplier<FateLockEntry>> entries = qlock.getEarlierEntries(entry); + SortedMap<Long,Supplier<FateLockEntry>> entries = + qlock.getEntries((seq, locData) -> seq <= entry); Iterator<Entry<Long,Supplier<FateLockEntry>>> iterator = entries.entrySet().iterator(); if (!iterator.hasNext()) { throw new IllegalStateException("Did not find our own lock in the queue: " + this.entry @@ -200,19 +205,26 @@ public class DistributedReadWriteLock implements java.util.concurrent.locks.Read } public static DistributedLock recoverLock(QueueLock qlock, FateId fateId) { - SortedMap<Long,Supplier<FateLockEntry>> entries = qlock.getEarlierEntries(Long.MAX_VALUE); - for (Entry<Long,Supplier<FateLockEntry>> entry : entries.entrySet()) { - FateLockEntry lockEntry = entry.getValue().get(); - if (fateId.equals(lockEntry.getFateId())) { + SortedMap<Long,Supplier<FateLockEntry>> entries = + qlock.getEntries((seq, lockData) -> lockData.get().fateId.equals(fateId)); + + switch (entries.size()) { + case 0: + return null; + case 1: + var entry = entries.entrySet().iterator().next(); + FateLockEntry lockEntry = entry.getValue().get(); switch (lockEntry.getLockType()) { case READ: return new ReadLock(qlock, lockEntry.getFateId(), entry.getKey()); case WRITE: return new WriteLock(qlock, lockEntry.getFateId(), entry.getKey()); + default: + throw new IllegalStateException("Unknown lock type " + lockEntry.getLockType()); } - } + default: + throw new IllegalStateException("Found more than one lock node " + entries); } - return null; } @Override diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/FateLock.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/FateLock.java index 74c3065c70..66b8191f2e 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/FateLock.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/FateLock.java @@ -18,16 +18,16 @@ */ package org.apache.accumulo.core.fate.zookeeper; -import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Objects.requireNonNull; -import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.SortedMap; +import java.util.SortedSet; import java.util.TreeMap; +import java.util.TreeSet; +import java.util.function.BiPredicate; import java.util.function.Supplier; import org.apache.accumulo.core.fate.FateId; @@ -41,6 +41,7 @@ import org.apache.zookeeper.KeeperException.NotEmptyException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; import com.google.common.base.Suppliers; /** @@ -49,7 +50,7 @@ import com.google.common.base.Suppliers; public class FateLock implements QueueLock { private static final Logger log = LoggerFactory.getLogger(FateLock.class); - private static final String PREFIX = "flock#"; + static final String PREFIX = "flock#"; private final ZooReaderWriter zoo; private final FateLockPath path; @@ -67,7 +68,7 @@ public class FateLock implements QueueLock { } } - public static class FateLockEntry { + public static class FateLockEntry implements Comparable<FateLockEntry> { final LockType lockType; final FateId fateId; @@ -76,26 +77,10 @@ public class FateLock implements QueueLock { this.fateId = Objects.requireNonNull(fateId); } - private FateLockEntry(byte[] entry) { - if (entry == null || entry.length < 1) { - throw new IllegalArgumentException(); - } - - int split = -1; - for (int i = 0; i < entry.length; i++) { - if (entry[i] == ':') { - split = i; - break; - } - } - - if (split == -1) { - throw new IllegalArgumentException(); - } - - this.lockType = LockType.valueOf(new String(entry, 0, split, UTF_8)); - this.fateId = - FateId.from(new String(Arrays.copyOfRange(entry, split + 1, entry.length), UTF_8)); + private FateLockEntry(String entry) { + var fields = entry.split(":", 2); + this.lockType = LockType.valueOf(fields[0]); + this.fateId = FateId.from(fields[1]); } public LockType getLockType() { @@ -106,14 +91,8 @@ public class FateLock implements QueueLock { return fateId; } - public byte[] serialize() { - byte[] typeBytes = lockType.name().getBytes(UTF_8); - byte[] fateIdBytes = fateId.canonical().getBytes(UTF_8); - byte[] result = new byte[fateIdBytes.length + 1 + typeBytes.length]; - System.arraycopy(typeBytes, 0, result, 0, typeBytes.length); - result[typeBytes.length] = ':'; - System.arraycopy(fateIdBytes, 0, result, typeBytes.length + 1, fateIdBytes.length); - return result; + public String serialize() { + return lockType.name() + ":" + fateId.canonical(); } @Override @@ -137,9 +116,18 @@ public class FateLock implements QueueLock { return new FateLockEntry(lockType, fateId); } - public static FateLockEntry deserialize(byte[] serialized) { + public static FateLockEntry deserialize(String serialized) { return new FateLockEntry(serialized); } + + @Override + public int compareTo(FateLockEntry o) { + int cmp = lockType.compareTo(o.lockType); + if (cmp == 0) { + cmp = fateId.compareTo(o.fateId); + } + return cmp; + } } public static FateLockPath path(String path) { @@ -151,16 +139,59 @@ public class FateLock implements QueueLock { this.path = requireNonNull(path); } + public static class NodeName implements Comparable<NodeName> { + public final long sequence; + public final Supplier<FateLockEntry> fateLockEntry; + + NodeName(String nodeName) { + int len = nodeName.length(); + Preconditions.checkArgument(nodeName.startsWith(PREFIX) && nodeName.charAt(len - 11) == '#', + "Illegal node name %s", nodeName); + sequence = Long.parseUnsignedLong(nodeName.substring(len - 10), 10); + // Use a supplier so we don't need to deserialize unless the calling code cares about + // the value for that entry. + fateLockEntry = Suppliers + .memoize(() -> FateLockEntry.deserialize(nodeName.substring(PREFIX.length(), len - 11))); + } + + @Override + public int compareTo(NodeName o) { + int cmp = Long.compare(sequence, o.sequence); + if (cmp == 0) { + cmp = fateLockEntry.get().compareTo(o.fateLockEntry.get()); + } + return cmp; + } + + @Override + public boolean equals(Object o) { + if (o instanceof NodeName) { + return this.compareTo((NodeName) o) == 0; + } + return false; + } + + @Override + public int hashCode() { + return Objects.hash(sequence, fateLockEntry.get()); + } + } + @Override public long addEntry(FateLockEntry entry) { + + String dataString = entry.serialize(); + Preconditions.checkState(!dataString.contains("#")); + String newPath; try { while (true) { try { - newPath = zoo.putPersistentSequential(path + "/" + PREFIX, entry.serialize()); + newPath = + zoo.putPersistentSequential(path + "/" + PREFIX + dataString + "#", new byte[0]); String[] parts = newPath.split("/"); String last = parts[parts.length - 1]; - return Long.parseLong(last.substring(PREFIX.length())); + return new NodeName(last).sequence; } catch (NoNodeException nne) { // the parent does not exist so try to create it zoo.putPersistentData(path.toString(), new byte[] {}, NodeExistsPolicy.SKIP); @@ -172,7 +203,8 @@ public class FateLock implements QueueLock { } @Override - public SortedMap<Long,Supplier<FateLockEntry>> getEarlierEntries(long entry) { + public SortedMap<Long,Supplier<FateLockEntry>> + getEntries(BiPredicate<Long,Supplier<FateLockEntry>> predicate) { SortedMap<Long,Supplier<FateLockEntry>> result = new TreeMap<>(); try { List<String> children = Collections.emptyList(); @@ -184,17 +216,9 @@ public class FateLock implements QueueLock { } for (String name : children) { - // this try catch must be done inside the loop because some subset of the children may exist - try { - long order = Long.parseLong(name.substring(PREFIX.length())); - if (order <= entry) { - byte[] data = zoo.getData(path + "/" + name); - // Use a supplier so we don't need to deserialize unless the calling code cares about - // the value for that entry. - result.put(order, Suppliers.memoize(() -> FateLockEntry.deserialize(data))); - } - } catch (KeeperException.NoNodeException ex) { - // ignored + var parsed = new NodeName(name); + if (predicate.test(parsed.sequence, parsed.fateLockEntry)) { + Preconditions.checkState(result.put(parsed.sequence, parsed.fateLockEntry) == null); } } } catch (KeeperException | InterruptedException ex) { @@ -204,9 +228,12 @@ public class FateLock implements QueueLock { } @Override - public void removeEntry(long entry) { + public void removeEntry(FateLockEntry data, long entry) { + String dataString = data.serialize(); + Preconditions.checkState(!dataString.contains("#")); try { - zoo.recursiveDelete(path + String.format("/%s%010d", PREFIX, entry), NodeMissingPolicy.SKIP); + zoo.recursiveDelete(path + String.format("/%s%s#%010d", PREFIX, dataString, entry), + NodeMissingPolicy.SKIP); try { // try to delete the parent if it has no children zoo.delete(path.toString()); @@ -221,50 +248,25 @@ public class FateLock implements QueueLock { /** * Validate and sort child nodes at this lock path by the lock prefix */ - public static List<String> validateAndSort(FateLockPath path, List<String> children) { + public static SortedSet<NodeName> validateAndWarn(FateLockPath path, List<String> children) { log.trace("validating and sorting children at path {}", path); - List<String> validChildren = new ArrayList<>(); + + SortedSet<NodeName> validChildren = new TreeSet<>(); + if (children == null || children.isEmpty()) { return validChildren; } + children.forEach(c -> { log.trace("Validating {}", c); - if (c.startsWith(PREFIX)) { - int idx = c.indexOf('#'); - String sequenceNum = c.substring(idx + 1); - if (sequenceNum.length() == 10) { - try { - log.trace("Testing number format of {}", sequenceNum); - Integer.parseInt(sequenceNum); - validChildren.add(c); - } catch (NumberFormatException e) { - log.warn("Fate lock found with invalid sequence number format: {} (not a number)", c); - } - } else { - log.warn("Fate lock found with invalid sequence number format: {} (not 10 characters)", - c); - } - } else { - log.warn("Fate lock found with invalid lock format: {} (does not start with {})", c, - PREFIX); + try { + var fateLockNode = new NodeName(c); + validChildren.add(fateLockNode); + } catch (RuntimeException e) { + log.warn("Illegal fate lock node {}", c, e); } }); - if (validChildren.size() > 1) { - validChildren.sort((o1, o2) -> { - // Lock should be of the form: - // lock-sequenceNumber - // Example: - // flock#0000000000 - - // Lock length - sequenceNumber length - // 16 - 10 - int secondHashIdx = 6; - return Integer.valueOf(o1.substring(secondHashIdx)) - .compareTo(Integer.valueOf(o2.substring(secondHashIdx))); - }); - } - log.trace("Children nodes (size: {}): {}", validChildren.size(), validChildren); return validChildren; } } diff --git a/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLockTest.java b/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLockTest.java index bf55b79d4a..229e8ec07d 100644 --- a/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLockTest.java +++ b/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLockTest.java @@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicIntegerArray; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; +import java.util.function.BiPredicate; import java.util.function.Supplier; import org.apache.accumulo.core.fate.FateId; @@ -45,14 +46,19 @@ public class DistributedReadWriteLockTest { final SortedMap<Long,FateLockEntry> locks = new TreeMap<>(); @Override - public synchronized SortedMap<Long,Supplier<FateLockEntry>> getEarlierEntries(long entry) { + public synchronized SortedMap<Long,Supplier<FateLockEntry>> + getEntries(BiPredicate<Long,Supplier<FateLockEntry>> predicate) { SortedMap<Long,Supplier<FateLockEntry>> result = new TreeMap<>(); - locks.headMap(entry + 1).forEach((k, v) -> result.put(k, () -> v)); + locks.forEach((seq, lockData) -> { + if (predicate.test(seq, () -> lockData)) { + result.put(seq, () -> lockData); + } + }); return result; } @Override - public synchronized void removeEntry(long entry) { + public synchronized void removeEntry(FateLockEntry data, long entry) { synchronized (locks) { locks.remove(entry); locks.notifyAll(); @@ -147,7 +153,7 @@ public class DistributedReadWriteLockTest { assertEquals(LockType.READ, entry.getLockType()); assertEquals(FateId.from(FateInstanceType.USER, uuid), entry.getFateId()); - byte[] serialized = entry.serialize(); + String serialized = entry.serialize(); var deserialized = FateLockEntry.deserialize(serialized); assertEquals(entry, deserialized); } diff --git a/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/FateLockTest.java b/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/FateLockTest.java new file mode 100644 index 0000000000..b4648c3f90 --- /dev/null +++ b/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/FateLockTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.fate.zookeeper; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.fail; + +import java.util.UUID; + +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateInstanceType; +import org.junit.jupiter.api.Test; + +public class FateLockTest { + + @Test + public void testParsing() { + var fateId = FateId.from(FateInstanceType.USER, UUID.randomUUID()); + // ZooKeeper docs state that sequence numbers are formatted using %010d + String lockData = "WRITE:" + fateId.canonical(); + var lockNode = + new FateLock.NodeName(FateLock.PREFIX + lockData + "#" + String.format("%010d", 40)); + assertEquals(40, lockNode.sequence); + assertEquals(lockData, lockNode.fateLockEntry.get().serialize()); + + assertThrows(IllegalArgumentException.class, + () -> new FateLock.NodeName(lockData + "#" + String.format("%010d", 40))); + assertThrows(IllegalArgumentException.class, + () -> new FateLock.NodeName(FateLock.PREFIX + lockData + "#" + String.format("%d", 40))); + assertThrows(IllegalArgumentException.class, + () -> new FateLock.NodeName(FateLock.PREFIX + lockData + "#" + String.format("%09d", 40))); + assertThrows(IllegalArgumentException.class, + () -> new FateLock.NodeName(FateLock.PREFIX + lockData + "#" + String.format("%011d", 40))); + assertThrows(IllegalArgumentException.class, + () -> new FateLock.NodeName(FateLock.PREFIX + lockData + "#abc")); + assertThrows(IllegalArgumentException.class, + () -> new FateLock.NodeName(FateLock.PREFIX + lockData + String.format("%010d", 40))); + + // ZooKeeper docs state that sequence numbers can roll and become negative. The FateLock code + // does not support this, so make sure it fails if this happens. + for (int i : new int[] {Integer.MIN_VALUE, Integer.MIN_VALUE / 2, Integer.MIN_VALUE / 10, + Integer.MIN_VALUE / 1000, -40}) { + var seq = String.format("%010d", i); + if (seq.length() == 10) { + assertThrows(NumberFormatException.class, + () -> new FateLock.NodeName(FateLock.PREFIX + lockData + "#" + seq)); + } else if (seq.length() == 11) { + assertThrows(IllegalArgumentException.class, + () -> new FateLock.NodeName(FateLock.PREFIX + lockData + "#" + seq)); + } else { + fail("Unexpected length " + seq.length()); + } + } + + // Test a negative number that is not formatted w/ %010d + assertThrows(IllegalArgumentException.class, + () -> new FateLock.NodeName(FateLock.PREFIX + lockData + "#" + String.format("%d", -40))); + } +} diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java index b3ed3c26fa..46271c2caf 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionTestUtils.java @@ -99,15 +99,20 @@ public class ExternalCompactionTestUtils { return String.format("r:%04d", r); } - public static void compact(final AccumuloClient client, String table1, int modulus, - String expectedQueue, boolean wait) - throws AccumuloSecurityException, TableNotFoundException, AccumuloException { + public static void addCompactionIterators(CompactionConfig config, int modulus, + String expectedQueue) { IteratorSetting iterSetting = new IteratorSetting(100, TestFilter.class); // make sure iterator options make it to compactor process iterSetting.addOption("expectedQ", expectedQueue); iterSetting.addOption("modulus", modulus + ""); - CompactionConfig config = - new CompactionConfig().setIterators(List.of(iterSetting)).setWait(wait); + config.setIterators(List.of(iterSetting)); + } + + public static void compact(final AccumuloClient client, String table1, int modulus, + String expectedQueue, boolean wait) + throws AccumuloSecurityException, TableNotFoundException, AccumuloException { + CompactionConfig config = new CompactionConfig().setWait(wait); + addCompactionIterators(config, modulus, expectedQueue); client.tableOperations().compact(table1, config); } diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java index 99002079d4..fc1020bcfa 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java @@ -25,6 +25,7 @@ import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GR import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP6; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP8; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.MAX_DATA; +import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.addCompactionIterators; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.assertNoCompactionMetadata; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.compact; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.createTable; @@ -58,12 +59,14 @@ import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.BatchScanner; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.admin.CompactionConfig; import org.apache.accumulo.core.client.admin.NewTableConfiguration; import org.apache.accumulo.core.client.admin.PluginConfig; +import org.apache.accumulo.core.client.admin.TabletInformation; import org.apache.accumulo.core.client.admin.compaction.CompactableFile; import org.apache.accumulo.core.client.admin.compaction.CompactionSelector; import org.apache.accumulo.core.client.admin.compaction.CompressionConfigurer; @@ -72,6 +75,7 @@ import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.data.TabletId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.fate.Fate; import org.apache.accumulo.core.fate.FateId; @@ -514,6 +518,68 @@ public class ExternalCompaction_1_IT extends SharedMiniClusterBase { compact(client, table1, 3, GROUP4, true); verify(client, table1, 3); + + List<TabletId> tabletIds; + // start a compaction on each tablet + try (var tablets = client.tableOperations().getTabletInformation(table1, new Range())) { + tabletIds = tablets.map(TabletInformation::getTabletId).collect(Collectors.toList()); + } + // compact the even tablet with a modulus filter of 2 + List<Range> evenRanges = new ArrayList<>(); + for (int i = 0; i < tabletIds.size(); i += 2) { + var tabletId = tabletIds.get(i); + CompactionConfig compactionConfig = new CompactionConfig() + .setStartRow(tabletId.getPrevEndRow()).setEndRow(tabletId.getEndRow()).setWait(false); + addCompactionIterators(compactionConfig, 2, GROUP4); + client.tableOperations().compact(table1, compactionConfig); + evenRanges.add(tabletId.toRange()); + } + + // compact the odd tablets with a modulus filter of 5 + List<Range> oddRanges = new ArrayList<>(); + for (int i = 1; i < tabletIds.size(); i += 2) { + var tabletId = tabletIds.get(i); + CompactionConfig compactionConfig = new CompactionConfig() + .setStartRow(tabletId.getPrevEndRow()).setEndRow(tabletId.getEndRow()).setWait(false); + addCompactionIterators(compactionConfig, 5, GROUP4); + client.tableOperations().compact(table1, compactionConfig); + oddRanges.add(tabletId.toRange()); + } + + Wait.waitFor(() -> { + try (BatchScanner scanner = client.createBatchScanner(table1)) { + scanner.setRanges(evenRanges); + // filtered out data that was divisible by 3 and then 2 by compactions, so should end up + // w/ only data divisible by 6 + int matching = 0; + int nonMatching = 0; + for (var entry : scanner) { + int val = Integer.parseInt(entry.getValue().toString()); + if (val % 6 == 0) { + matching++; + } else { + nonMatching++; + } + } + boolean evenDone = matching > 0 && nonMatching == 0; + // filtered out data that was divisible by 3 and then 5 by compactions, so should end up + // w/ only data divisible by 15 + scanner.setRanges(oddRanges); + matching = 0; + nonMatching = 0; + for (var entry : scanner) { + int val = Integer.parseInt(entry.getValue().toString()); + if (val % 15 == 0) { + matching++; + } else { + nonMatching++; + } + } + boolean oddDone = matching > 0 && nonMatching == 0; + return evenDone && oddDone; + } + }); + } }