This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new ff93f66712 Validate recovered lock type (#4125) ff93f66712 is described below commit ff93f66712ff3500cb114684e4f3cd9823c079a8 Author: rsingh433 <74160026+rsingh...@users.noreply.github.com> AuthorDate: Tue Jan 9 16:28:42 2024 -0500 Validate recovered lock type (#4125) * Reveal lock type for DistributedReadWriteLock * Make lock type visible to callers * Update Util to make sure it checks the correct type * Update exception to include table/namespace id and use transaction id as a long * implemented format fate ids to easily find all messages --------- Co-authored-by: Christopher Tubbs <ctubb...@apache.org> --- .../fate/zookeeper/DistributedReadWriteLock.java | 33 +++++++++++++--------- .../apache/accumulo/manager/tableOps/Utils.java | 15 ++++++++-- 2 files changed, 32 insertions(+), 16 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java index 852a40b317..84b3c1148c 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java @@ -40,7 +40,7 @@ import org.slf4j.LoggerFactory; */ public class DistributedReadWriteLock implements java.util.concurrent.locks.ReadWriteLock { - static enum LockType { + public static enum LockType { READ, WRITE, } @@ -107,7 +107,11 @@ public class DistributedReadWriteLock implements java.util.concurrent.locks.Read private static final Logger log = LoggerFactory.getLogger(DistributedReadWriteLock.class); - static class ReadLock implements Lock { + public static interface DistributedLock extends Lock { + LockType getType(); + } + + static class ReadLock implements DistributedLock { QueueLock qlock; byte[] userData; @@ -125,7 +129,8 @@ public class DistributedReadWriteLock implements java.util.concurrent.locks.Read this.entry = entry; } - protected LockType lockType() { + @Override + public LockType getType() { return LockType.READ; } @@ -154,9 +159,9 @@ public class DistributedReadWriteLock implements java.util.concurrent.locks.Read @Override public boolean tryLock() { if (entry == -1) { - entry = qlock.addEntry(new ParsedLock(this.lockType(), this.userData).getLockData()); + entry = qlock.addEntry(new ParsedLock(this.getType(), this.userData).getLockData()); log.info("Added lock entry {} userData {} lockType {}", entry, - new String(this.userData, UTF_8), lockType()); + new String(this.userData, UTF_8), getType()); } SortedMap<Long,byte[]> entries = qlock.getEarlierEntries(entry); for (Entry<Long,byte[]> entry : entries.entrySet()) { @@ -169,7 +174,7 @@ public class DistributedReadWriteLock implements java.util.concurrent.locks.Read } } throw new IllegalStateException("Did not find our own lock in the queue: " + this.entry - + " userData " + new String(this.userData, UTF_8) + " lockType " + lockType()); + + " userData " + new String(this.userData, UTF_8) + " lockType " + getType()); } @Override @@ -193,7 +198,7 @@ public class DistributedReadWriteLock implements java.util.concurrent.locks.Read return; } log.debug("Removing lock entry {} userData {} lockType {}", entry, - new String(this.userData, UTF_8), lockType()); + new String(this.userData, UTF_8), getType()); qlock.removeEntry(entry); entry = -1; } @@ -215,22 +220,22 @@ public class DistributedReadWriteLock implements java.util.concurrent.locks.Read } @Override - protected LockType lockType() { + public LockType getType() { return LockType.WRITE; } @Override public boolean tryLock() { if (entry == -1) { - entry = qlock.addEntry(new ParsedLock(this.lockType(), this.userData).getLockData()); + entry = qlock.addEntry(new ParsedLock(this.getType(), this.userData).getLockData()); log.info("Added lock entry {} userData {} lockType {}", entry, - new String(this.userData, UTF_8), lockType()); + new String(this.userData, UTF_8), getType()); } SortedMap<Long,byte[]> entries = qlock.getEarlierEntries(entry); Iterator<Entry<Long,byte[]>> iterator = entries.entrySet().iterator(); if (!iterator.hasNext()) { throw new IllegalStateException("Did not find our own lock in the queue: " + this.entry - + " userData " + new String(this.userData, UTF_8) + " lockType " + lockType()); + + " userData " + new String(this.userData, UTF_8) + " lockType " + getType()); } return iterator.next().getKey().equals(entry); } @@ -244,7 +249,7 @@ public class DistributedReadWriteLock implements java.util.concurrent.locks.Read this.data = Arrays.copyOf(data, data.length); } - public static Lock recoverLock(QueueLock qlock, byte[] data) { + public static DistributedLock recoverLock(QueueLock qlock, byte[] data) { SortedMap<Long,byte[]> entries = qlock.getEarlierEntries(Long.MAX_VALUE); for (Entry<Long,byte[]> entry : entries.entrySet()) { ParsedLock parsed = new ParsedLock(entry.getValue()); @@ -261,12 +266,12 @@ public class DistributedReadWriteLock implements java.util.concurrent.locks.Read } @Override - public Lock readLock() { + public DistributedLock readLock() { return new ReadLock(qlock, data); } @Override - public Lock writeLock() { + public DistributedLock writeLock() { return new WriteLock(qlock, data); } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/Utils.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/Utils.java index def74371c2..b4d92b3a9d 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/Utils.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/Utils.java @@ -39,6 +39,8 @@ import org.apache.accumulo.core.data.NamespaceId; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.fate.FateTxId; import org.apache.accumulo.core.fate.zookeeper.DistributedReadWriteLock; +import org.apache.accumulo.core.fate.zookeeper.DistributedReadWriteLock.DistributedLock; +import org.apache.accumulo.core.fate.zookeeper.DistributedReadWriteLock.LockType; import org.apache.accumulo.core.fate.zookeeper.FateLock; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooReservation; @@ -166,8 +168,17 @@ public class Utils { var fLockPath = FateLock.path(context.getZooKeeperRoot() + Constants.ZTABLE_LOCKS + "/" + id.canonical()); FateLock qlock = new FateLock(context.getZooReaderWriter(), fLockPath); - Lock lock = DistributedReadWriteLock.recoverLock(qlock, lockData); - if (lock == null) { + DistributedLock lock = DistributedReadWriteLock.recoverLock(qlock, lockData); + if (lock != null) { + + // Validate the recovered lock type + boolean isWriteLock = lock.getType() == LockType.WRITE; + if (writeLock != isWriteLock) { + throw new IllegalStateException("Unexpected lock type " + lock.getType() + + " recovered for transaction " + FateTxId.formatTid(tid) + " on object " + id + + ". Expected " + (writeLock ? LockType.WRITE : LockType.READ) + " lock instead."); + } + } else { DistributedReadWriteLock locker = new DistributedReadWriteLock(qlock, lockData); if (writeLock) { lock = locker.writeLock();