This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new 5c39aac379 Refactor Fate lock code to use concrete types (#5298) 5c39aac379 is described below commit 5c39aac379036dcacead35c8f85ac8f67e3ecbc2 Author: Christopher L. Shannon <cshan...@apache.org> AuthorDate: Sat Feb 1 11:17:19 2025 -0500 Refactor Fate lock code to use concrete types (#5298) Instead of passing around byte arrays for DistributedReadWriteLock and related classes we can use LockType and FateId to make the code easier to work with and to understand as those values are serialized as part of the lock. A new FateLockEntry type has been created which is passed around instead and that is serialized/deserialized into the correct byte array format when reading/writing to ZooKeeper. This closes #5264 --- .../fate/zookeeper/DistributedReadWriteLock.java | 131 +++++++-------------- .../accumulo/core/fate/zookeeper/FateLock.java | 95 ++++++++++++++- .../zookeeper/DistributedReadWriteLockTest.java | 37 ++++-- .../apache/accumulo/manager/tableOps/Utils.java | 5 +- 4 files changed, 159 insertions(+), 109 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java index 221494c911..1f36ee7a92 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java @@ -18,18 +18,19 @@ */ package org.apache.accumulo.core.fate.zookeeper; -import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.concurrent.TimeUnit.DAYS; import static java.util.concurrent.TimeUnit.MILLISECONDS; -import java.util.Arrays; import java.util.Iterator; import java.util.Map.Entry; import java.util.SortedMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; +import 
java.util.function.Supplier; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.zookeeper.FateLock.FateLockEntry; import org.apache.accumulo.core.util.UtilWaitThread; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,65 +45,16 @@ public class DistributedReadWriteLock implements java.util.concurrent.locks.Read READ, WRITE, } - // serializer for lock type and user data - static class ParsedLock { - public ParsedLock(LockType type, byte[] userData) { - this.type = type; - this.userData = Arrays.copyOf(userData, userData.length); - } - - public ParsedLock(byte[] lockData) { - if (lockData == null || lockData.length < 1) { - throw new IllegalArgumentException(); - } - - int split = -1; - for (int i = 0; i < lockData.length; i++) { - if (lockData[i] == ':') { - split = i; - break; - } - } - - if (split == -1) { - throw new IllegalArgumentException(); - } - - this.type = LockType.valueOf(new String(lockData, 0, split, UTF_8)); - this.userData = Arrays.copyOfRange(lockData, split + 1, lockData.length); - } - - public LockType getType() { - return type; - } - - public byte[] getUserData() { - return userData; - } - - public byte[] getLockData() { - byte[] typeBytes = type.name().getBytes(UTF_8); - byte[] result = new byte[userData.length + 1 + typeBytes.length]; - System.arraycopy(typeBytes, 0, result, 0, typeBytes.length); - result[typeBytes.length] = ':'; - System.arraycopy(userData, 0, result, typeBytes.length + 1, userData.length); - return result; - } - - private LockType type; - private byte[] userData; - } - // This kind of lock can be easily implemented by ZooKeeper // You make an entry at the bottom of the queue, readers run when there are no writers ahead of // them, // a writer only runs when they are at the top of the queue. 
public interface QueueLock { - SortedMap<Long,byte[]> getEarlierEntries(long entry); + SortedMap<Long,Supplier<FateLockEntry>> getEarlierEntries(long entry); void removeEntry(long entry); - long addEntry(byte[] data); + long addEntry(FateLockEntry entry); } private static final Logger log = LoggerFactory.getLogger(DistributedReadWriteLock.class); @@ -114,18 +66,18 @@ public class DistributedReadWriteLock implements java.util.concurrent.locks.Read static class ReadLock implements DistributedLock { final QueueLock qlock; - final byte[] userData; + final FateId fateId; long entry = -1; - ReadLock(QueueLock qlock, byte[] userData) { + ReadLock(QueueLock qlock, FateId fateId) { this.qlock = qlock; - this.userData = userData; + this.fateId = fateId; } // for recovery - ReadLock(QueueLock qlock, byte[] userData, long entry) { + ReadLock(QueueLock qlock, FateId fateId, long entry) { this.qlock = qlock; - this.userData = userData; + this.fateId = fateId; this.entry = entry; } @@ -160,22 +112,21 @@ public class DistributedReadWriteLock implements java.util.concurrent.locks.Read @Override public boolean tryLock() { if (entry == -1) { - entry = qlock.addEntry(new ParsedLock(this.getType(), this.userData).getLockData()); - log.info("Added lock entry {} userData {} lockType {}", entry, - new String(this.userData, UTF_8), getType()); + entry = qlock.addEntry(FateLockEntry.from(this.getType(), this.fateId)); + log.info("Added lock entry {} fateId {} lockType {}", entry, fateId, getType()); } - SortedMap<Long,byte[]> entries = qlock.getEarlierEntries(entry); - for (Entry<Long,byte[]> entry : entries.entrySet()) { - ParsedLock parsed = new ParsedLock(entry.getValue()); + SortedMap<Long,Supplier<FateLockEntry>> entries = qlock.getEarlierEntries(entry); + for (Entry<Long,Supplier<FateLockEntry>> entry : entries.entrySet()) { if (entry.getKey().equals(this.entry)) { return true; } - if (parsed.type == LockType.WRITE) { + FateLockEntry lockEntry = entry.getValue().get(); + if 
(lockEntry.getLockType() == LockType.WRITE) { return false; } } throw new IllegalStateException("Did not find our own lock in the queue: " + this.entry - + " userData " + new String(this.userData, UTF_8) + " lockType " + getType()); + + " fateId " + this.fateId + " lockType " + getType()); } @Override @@ -198,8 +149,7 @@ public class DistributedReadWriteLock implements java.util.concurrent.locks.Read if (entry == -1) { return; } - log.debug("Removing lock entry {} userData {} lockType {}", entry, - new String(this.userData, UTF_8), getType()); + log.debug("Removing lock entry {} fateId {} lockType {}", entry, this.fateId, getType()); qlock.removeEntry(entry); entry = -1; } @@ -212,12 +162,12 @@ public class DistributedReadWriteLock implements java.util.concurrent.locks.Read static class WriteLock extends ReadLock { - WriteLock(QueueLock qlock, byte[] userData) { - super(qlock, userData); + WriteLock(QueueLock qlock, FateId fateId) { + super(qlock, fateId); } - WriteLock(QueueLock qlock, byte[] userData, long entry) { - super(qlock, userData, entry); + WriteLock(QueueLock qlock, FateId fateId, long entry) { + super(qlock, fateId, entry); } @Override @@ -228,38 +178,37 @@ public class DistributedReadWriteLock implements java.util.concurrent.locks.Read @Override public boolean tryLock() { if (entry == -1) { - entry = qlock.addEntry(new ParsedLock(this.getType(), this.userData).getLockData()); - log.info("Added lock entry {} userData {} lockType {}", entry, - new String(this.userData, UTF_8), getType()); + entry = qlock.addEntry(FateLockEntry.from(this.getType(), this.fateId)); + log.info("Added lock entry {} fateId {} lockType {}", entry, this.fateId, getType()); } - SortedMap<Long,byte[]> entries = qlock.getEarlierEntries(entry); - Iterator<Entry<Long,byte[]>> iterator = entries.entrySet().iterator(); + SortedMap<Long,Supplier<FateLockEntry>> entries = qlock.getEarlierEntries(entry); + Iterator<Entry<Long,Supplier<FateLockEntry>>> iterator = 
entries.entrySet().iterator(); if (!iterator.hasNext()) { throw new IllegalStateException("Did not find our own lock in the queue: " + this.entry - + " userData " + new String(this.userData, UTF_8) + " lockType " + getType()); + + " fateId " + this.fateId + " lockType " + getType()); } return iterator.next().getKey().equals(entry); } } private final QueueLock qlock; - private final byte[] data; + private final FateId fateId; - public DistributedReadWriteLock(QueueLock qlock, byte[] data) { + public DistributedReadWriteLock(QueueLock qlock, FateId fateId) { this.qlock = qlock; - this.data = Arrays.copyOf(data, data.length); + this.fateId = fateId; } - public static DistributedLock recoverLock(QueueLock qlock, byte[] data) { - SortedMap<Long,byte[]> entries = qlock.getEarlierEntries(Long.MAX_VALUE); - for (Entry<Long,byte[]> entry : entries.entrySet()) { - ParsedLock parsed = new ParsedLock(entry.getValue()); - if (Arrays.equals(data, parsed.getUserData())) { - switch (parsed.getType()) { + public static DistributedLock recoverLock(QueueLock qlock, FateId fateId) { + SortedMap<Long,Supplier<FateLockEntry>> entries = qlock.getEarlierEntries(Long.MAX_VALUE); + for (Entry<Long,Supplier<FateLockEntry>> entry : entries.entrySet()) { + FateLockEntry lockEntry = entry.getValue().get(); + if (fateId.equals(lockEntry.getFateId())) { + switch (lockEntry.getLockType()) { case READ: - return new ReadLock(qlock, parsed.getUserData(), entry.getKey()); + return new ReadLock(qlock, lockEntry.getFateId(), entry.getKey()); case WRITE: - return new WriteLock(qlock, parsed.getUserData(), entry.getKey()); + return new WriteLock(qlock, lockEntry.getFateId(), entry.getKey()); } } } @@ -268,11 +217,11 @@ public class DistributedReadWriteLock implements java.util.concurrent.locks.Read @Override public DistributedLock readLock() { - return new ReadLock(qlock, data); + return new ReadLock(qlock, fateId); } @Override public DistributedLock writeLock() { - return new WriteLock(qlock, data); + 
return new WriteLock(qlock, fateId); } } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/FateLock.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/FateLock.java index 6b42014c31..74c3065c70 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/FateLock.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/FateLock.java @@ -18,14 +18,20 @@ */ package org.apache.accumulo.core.fate.zookeeper; +import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Objects.requireNonNull; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Objects; import java.util.SortedMap; import java.util.TreeMap; +import java.util.function.Supplier; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.zookeeper.DistributedReadWriteLock.LockType; import org.apache.accumulo.core.fate.zookeeper.DistributedReadWriteLock.QueueLock; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy; @@ -35,6 +41,8 @@ import org.apache.zookeeper.KeeperException.NotEmptyException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Suppliers; + /** * A persistent lock mechanism in ZooKeeper used for locking tables during FaTE operations. 
*/ @@ -59,6 +67,81 @@ public class FateLock implements QueueLock { } } + public static class FateLockEntry { + final LockType lockType; + final FateId fateId; + + private FateLockEntry(LockType lockType, FateId fateId) { + this.lockType = Objects.requireNonNull(lockType); + this.fateId = Objects.requireNonNull(fateId); + } + + private FateLockEntry(byte[] entry) { + if (entry == null || entry.length < 1) { + throw new IllegalArgumentException(); + } + + int split = -1; + for (int i = 0; i < entry.length; i++) { + if (entry[i] == ':') { + split = i; + break; + } + } + + if (split == -1) { + throw new IllegalArgumentException(); + } + + this.lockType = LockType.valueOf(new String(entry, 0, split, UTF_8)); + this.fateId = + FateId.from(new String(Arrays.copyOfRange(entry, split + 1, entry.length), UTF_8)); + } + + public LockType getLockType() { + return lockType; + } + + public FateId getFateId() { + return fateId; + } + + public byte[] serialize() { + byte[] typeBytes = lockType.name().getBytes(UTF_8); + byte[] fateIdBytes = fateId.canonical().getBytes(UTF_8); + byte[] result = new byte[fateIdBytes.length + 1 + typeBytes.length]; + System.arraycopy(typeBytes, 0, result, 0, typeBytes.length); + result[typeBytes.length] = ':'; + System.arraycopy(fateIdBytes, 0, result, typeBytes.length + 1, fateIdBytes.length); + return result; + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + + FateLockEntry lockEntry = (FateLockEntry) o; + return lockType == lockEntry.lockType && fateId.equals(lockEntry.fateId); + } + + @Override + public int hashCode() { + int result = lockType.hashCode(); + result = 31 * result + fateId.hashCode(); + return result; + } + + public static FateLockEntry from(LockType lockType, FateId fateId) { + return new FateLockEntry(lockType, fateId); + } + + public static FateLockEntry deserialize(byte[] serialized) { + return new FateLockEntry(serialized); + } + } + public static 
FateLockPath path(String path) { return new FateLockPath(path); } @@ -69,12 +152,12 @@ public class FateLock implements QueueLock { } @Override - public long addEntry(byte[] data) { + public long addEntry(FateLockEntry entry) { String newPath; try { while (true) { try { - newPath = zoo.putPersistentSequential(path + "/" + PREFIX, data); + newPath = zoo.putPersistentSequential(path + "/" + PREFIX, entry.serialize()); String[] parts = newPath.split("/"); String last = parts[parts.length - 1]; return Long.parseLong(last.substring(PREFIX.length())); @@ -89,8 +172,8 @@ public class FateLock implements QueueLock { } @Override - public SortedMap<Long,byte[]> getEarlierEntries(long entry) { - SortedMap<Long,byte[]> result = new TreeMap<>(); + public SortedMap<Long,Supplier<FateLockEntry>> getEarlierEntries(long entry) { + SortedMap<Long,Supplier<FateLockEntry>> result = new TreeMap<>(); try { List<String> children = Collections.emptyList(); try { @@ -106,7 +189,9 @@ public class FateLock implements QueueLock { long order = Long.parseLong(name.substring(PREFIX.length())); if (order <= entry) { byte[] data = zoo.getData(path + "/" + name); - result.put(order, data); + // Use a supplier so we don't need to deserialize unless the calling code cares about + // the value for that entry. 
+ result.put(order, Suppliers.memoize(() -> FateLockEntry.deserialize(data))); } } catch (KeeperException.NoNodeException ex) { // ignored diff --git a/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLockTest.java b/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLockTest.java index da30442d93..bf55b79d4a 100644 --- a/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLockTest.java +++ b/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLockTest.java @@ -18,17 +18,22 @@ */ package org.apache.accumulo.core.fate.zookeeper; -import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.jupiter.api.Assertions.assertEquals; import java.util.SortedMap; import java.util.TreeMap; +import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicIntegerArray; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; +import java.util.function.Supplier; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateInstanceType; +import org.apache.accumulo.core.fate.zookeeper.DistributedReadWriteLock.LockType; import org.apache.accumulo.core.fate.zookeeper.DistributedReadWriteLock.QueueLock; +import org.apache.accumulo.core.fate.zookeeper.FateLock.FateLockEntry; import org.junit.jupiter.api.Test; public class DistributedReadWriteLockTest { @@ -37,12 +42,12 @@ public class DistributedReadWriteLockTest { public static class MockQueueLock implements QueueLock { long next = 0L; - final SortedMap<Long,byte[]> locks = new TreeMap<>(); + final SortedMap<Long,FateLockEntry> locks = new TreeMap<>(); @Override - public synchronized SortedMap<Long,byte[]> getEarlierEntries(long entry) { - SortedMap<Long,byte[]> result = new TreeMap<>(); - result.putAll(locks.headMap(entry + 1)); + public synchronized SortedMap<Long,Supplier<FateLockEntry>> 
getEarlierEntries(long entry) { + SortedMap<Long,Supplier<FateLockEntry>> result = new TreeMap<>(); + locks.headMap(entry + 1).forEach((k, v) -> result.put(k, () -> v)); return result; } @@ -55,10 +60,10 @@ public class DistributedReadWriteLockTest { } @Override - public synchronized long addEntry(byte[] data) { + public synchronized long addEntry(FateLockEntry entry) { long result; synchronized (locks) { - locks.put(result = next++, data); + locks.put(result = next++, entry); locks.notifyAll(); } return result; @@ -67,8 +72,8 @@ public class DistributedReadWriteLockTest { // some data that is probably not going to update atomically static class SomeData { - private AtomicIntegerArray data = new AtomicIntegerArray(100); - private AtomicInteger counter = new AtomicInteger(); + private final AtomicIntegerArray data = new AtomicIntegerArray(100); + private final AtomicInteger counter = new AtomicInteger(); void read() { for (int i = 0; i < data.length(); i++) { @@ -91,7 +96,8 @@ public class DistributedReadWriteLockTest { data.read(); QueueLock qlock = new MockQueueLock(); - final ReadWriteLock locker = new DistributedReadWriteLock(qlock, "locker1".getBytes(UTF_8)); + final ReadWriteLock locker = + new DistributedReadWriteLock(qlock, FateId.from(FateInstanceType.USER, UUID.randomUUID())); final Lock readLock = locker.readLock(); final Lock writeLock = locker.writeLock(); readLock.lock(); @@ -134,4 +140,15 @@ public class DistributedReadWriteLockTest { } } + @Test + public void testFateLockEntrySerDes() { + var uuid = UUID.randomUUID(); + var entry = FateLockEntry.from(LockType.READ, FateId.from(FateInstanceType.USER, uuid)); + assertEquals(LockType.READ, entry.getLockType()); + assertEquals(FateId.from(FateInstanceType.USER, uuid), entry.getFateId()); + + byte[] serialized = entry.serialize(); + var deserialized = FateLockEntry.deserialize(serialized); + assertEquals(entry, deserialized); + } } diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/Utils.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/Utils.java index 8517503639..a8089dbb13 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/Utils.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/Utils.java @@ -221,11 +221,10 @@ public class Utils { private static Lock getLock(ServerContext context, AbstractId<?> id, FateId fateId, LockType lockType) { - byte[] lockData = fateId.canonical().getBytes(UTF_8); var fLockPath = FateLock.path(context.getZooKeeperRoot() + Constants.ZTABLE_LOCKS + "/" + id.canonical()); FateLock qlock = new FateLock(context.getZooSession().asReaderWriter(), fLockPath); - DistributedLock lock = DistributedReadWriteLock.recoverLock(qlock, lockData); + DistributedLock lock = DistributedReadWriteLock.recoverLock(qlock, fateId); if (lock != null) { // Validate the recovered lock type @@ -235,7 +234,7 @@ public class Utils { + " on object " + id + ". Expected " + lockType + " lock instead."); } } else { - DistributedReadWriteLock locker = new DistributedReadWriteLock(qlock, lockData); + DistributedReadWriteLock locker = new DistributedReadWriteLock(qlock, fateId); switch (lockType) { case WRITE: lock = locker.writeLock();