This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new ff93f66712 Validate recovered lock type (#4125)
ff93f66712 is described below

commit ff93f66712ff3500cb114684e4f3cd9823c079a8
Author: rsingh433 <74160026+rsingh...@users.noreply.github.com>
AuthorDate: Tue Jan 9 16:28:42 2024 -0500

    Validate recovered lock type (#4125)
    
    * Reveal lock type for DistributedReadWriteLock
    * Make lock type visible to callers
    * Update Util to make sure it checks the correct type
    * Update exception to include table/namespace id and use transaction id
      as a long
    * implemented format fate ids to easily find all messages
    
    ---------
    
    Co-authored-by: Christopher Tubbs <ctubb...@apache.org>
---
 .../fate/zookeeper/DistributedReadWriteLock.java   | 33 +++++++++++++---------
 .../apache/accumulo/manager/tableOps/Utils.java    | 15 ++++++++--
 2 files changed, 32 insertions(+), 16 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java
 
b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java
index 852a40b317..84b3c1148c 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java
@@ -40,7 +40,7 @@ import org.slf4j.LoggerFactory;
  */
 public class DistributedReadWriteLock implements 
java.util.concurrent.locks.ReadWriteLock {
 
-  static enum LockType {
+  public static enum LockType {
     READ, WRITE,
   }
 
@@ -107,7 +107,11 @@ public class DistributedReadWriteLock implements 
java.util.concurrent.locks.Read
 
   private static final Logger log = 
LoggerFactory.getLogger(DistributedReadWriteLock.class);
 
-  static class ReadLock implements Lock {
+  public static interface DistributedLock extends Lock {
+    LockType getType();
+  }
+
+  static class ReadLock implements DistributedLock {
 
     QueueLock qlock;
     byte[] userData;
@@ -125,7 +129,8 @@ public class DistributedReadWriteLock implements 
java.util.concurrent.locks.Read
       this.entry = entry;
     }
 
-    protected LockType lockType() {
+    @Override
+    public LockType getType() {
       return LockType.READ;
     }
 
@@ -154,9 +159,9 @@ public class DistributedReadWriteLock implements 
java.util.concurrent.locks.Read
     @Override
     public boolean tryLock() {
       if (entry == -1) {
-        entry = qlock.addEntry(new ParsedLock(this.lockType(), 
this.userData).getLockData());
+        entry = qlock.addEntry(new ParsedLock(this.getType(), 
this.userData).getLockData());
         log.info("Added lock entry {} userData {} lockType {}", entry,
-            new String(this.userData, UTF_8), lockType());
+            new String(this.userData, UTF_8), getType());
       }
       SortedMap<Long,byte[]> entries = qlock.getEarlierEntries(entry);
       for (Entry<Long,byte[]> entry : entries.entrySet()) {
@@ -169,7 +174,7 @@ public class DistributedReadWriteLock implements 
java.util.concurrent.locks.Read
         }
       }
       throw new IllegalStateException("Did not find our own lock in the queue: 
" + this.entry
-          + " userData " + new String(this.userData, UTF_8) + " lockType " + 
lockType());
+          + " userData " + new String(this.userData, UTF_8) + " lockType " + 
getType());
     }
 
     @Override
@@ -193,7 +198,7 @@ public class DistributedReadWriteLock implements 
java.util.concurrent.locks.Read
         return;
       }
       log.debug("Removing lock entry {} userData {} lockType {}", entry,
-          new String(this.userData, UTF_8), lockType());
+          new String(this.userData, UTF_8), getType());
       qlock.removeEntry(entry);
       entry = -1;
     }
@@ -215,22 +220,22 @@ public class DistributedReadWriteLock implements 
java.util.concurrent.locks.Read
     }
 
     @Override
-    protected LockType lockType() {
+    public LockType getType() {
       return LockType.WRITE;
     }
 
     @Override
     public boolean tryLock() {
       if (entry == -1) {
-        entry = qlock.addEntry(new ParsedLock(this.lockType(), 
this.userData).getLockData());
+        entry = qlock.addEntry(new ParsedLock(this.getType(), 
this.userData).getLockData());
         log.info("Added lock entry {} userData {} lockType {}", entry,
-            new String(this.userData, UTF_8), lockType());
+            new String(this.userData, UTF_8), getType());
       }
       SortedMap<Long,byte[]> entries = qlock.getEarlierEntries(entry);
       Iterator<Entry<Long,byte[]>> iterator = entries.entrySet().iterator();
       if (!iterator.hasNext()) {
         throw new IllegalStateException("Did not find our own lock in the 
queue: " + this.entry
-            + " userData " + new String(this.userData, UTF_8) + " lockType " + 
lockType());
+            + " userData " + new String(this.userData, UTF_8) + " lockType " + 
getType());
       }
       return iterator.next().getKey().equals(entry);
     }
@@ -244,7 +249,7 @@ public class DistributedReadWriteLock implements 
java.util.concurrent.locks.Read
     this.data = Arrays.copyOf(data, data.length);
   }
 
-  public static Lock recoverLock(QueueLock qlock, byte[] data) {
+  public static DistributedLock recoverLock(QueueLock qlock, byte[] data) {
     SortedMap<Long,byte[]> entries = qlock.getEarlierEntries(Long.MAX_VALUE);
     for (Entry<Long,byte[]> entry : entries.entrySet()) {
       ParsedLock parsed = new ParsedLock(entry.getValue());
@@ -261,12 +266,12 @@ public class DistributedReadWriteLock implements 
java.util.concurrent.locks.Read
   }
 
   @Override
-  public Lock readLock() {
+  public DistributedLock readLock() {
     return new ReadLock(qlock, data);
   }
 
   @Override
-  public Lock writeLock() {
+  public DistributedLock writeLock() {
     return new WriteLock(qlock, data);
   }
 }
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/Utils.java 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/Utils.java
index def74371c2..b4d92b3a9d 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/Utils.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/Utils.java
@@ -39,6 +39,8 @@ import org.apache.accumulo.core.data.NamespaceId;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.fate.FateTxId;
 import org.apache.accumulo.core.fate.zookeeper.DistributedReadWriteLock;
+import 
org.apache.accumulo.core.fate.zookeeper.DistributedReadWriteLock.DistributedLock;
+import 
org.apache.accumulo.core.fate.zookeeper.DistributedReadWriteLock.LockType;
 import org.apache.accumulo.core.fate.zookeeper.FateLock;
 import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.core.fate.zookeeper.ZooReservation;
@@ -166,8 +168,17 @@ public class Utils {
     var fLockPath =
         FateLock.path(context.getZooKeeperRoot() + Constants.ZTABLE_LOCKS + 
"/" + id.canonical());
     FateLock qlock = new FateLock(context.getZooReaderWriter(), fLockPath);
-    Lock lock = DistributedReadWriteLock.recoverLock(qlock, lockData);
-    if (lock == null) {
+    DistributedLock lock = DistributedReadWriteLock.recoverLock(qlock, 
lockData);
+    if (lock != null) {
+
+      // Validate the recovered lock type
+      boolean isWriteLock = lock.getType() == LockType.WRITE;
+      if (writeLock != isWriteLock) {
+        throw new IllegalStateException("Unexpected lock type " + 
lock.getType()
+            + " recovered for transaction " + FateTxId.formatTid(tid) + " on 
object " + id
+            + ". Expected " + (writeLock ? LockType.WRITE : LockType.READ) + " 
lock instead.");
+      }
+    } else {
       DistributedReadWriteLock locker = new DistributedReadWriteLock(qlock, 
lockData);
       if (writeLock) {
         lock = locker.writeLock();

Reply via email to