This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 5c39aac379 Refactor Fate lock code to use concrete types (#5298)
5c39aac379 is described below

commit 5c39aac379036dcacead35c8f85ac8f67e3ecbc2
Author: Christopher L. Shannon <cshan...@apache.org>
AuthorDate: Sat Feb 1 11:17:19 2025 -0500

    Refactor Fate lock code to use concrete types (#5298)
    
    Instead of passing around byte arrays for DistributedReadWriteLock and
    related classes we can use LockType and FateId to make the code easier
    to work with and to understand as those values are serialized as part of
    the lock. A new FateLockEntry type has been created which is passed
    around instead and that is serialized/deserialized into the correct byte
    array format when reading/writing to ZooKeeper.
    
    This closes #5264
---
 .../fate/zookeeper/DistributedReadWriteLock.java   | 131 +++++++--------------
 .../accumulo/core/fate/zookeeper/FateLock.java     |  95 ++++++++++++++-
 .../zookeeper/DistributedReadWriteLockTest.java    |  37 ++++--
 .../apache/accumulo/manager/tableOps/Utils.java    |   5 +-
 4 files changed, 159 insertions(+), 109 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java
 
b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java
index 221494c911..1f36ee7a92 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLock.java
@@ -18,18 +18,19 @@
  */
 package org.apache.accumulo.core.fate.zookeeper;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.concurrent.TimeUnit.DAYS;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
-import java.util.Arrays;
 import java.util.Iterator;
 import java.util.Map.Entry;
 import java.util.SortedMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
+import java.util.function.Supplier;
 
+import org.apache.accumulo.core.fate.FateId;
+import org.apache.accumulo.core.fate.zookeeper.FateLock.FateLockEntry;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,65 +45,16 @@ public class DistributedReadWriteLock implements 
java.util.concurrent.locks.Read
     READ, WRITE,
   }
 
-  // serializer for lock type and user data
-  static class ParsedLock {
-    public ParsedLock(LockType type, byte[] userData) {
-      this.type = type;
-      this.userData = Arrays.copyOf(userData, userData.length);
-    }
-
-    public ParsedLock(byte[] lockData) {
-      if (lockData == null || lockData.length < 1) {
-        throw new IllegalArgumentException();
-      }
-
-      int split = -1;
-      for (int i = 0; i < lockData.length; i++) {
-        if (lockData[i] == ':') {
-          split = i;
-          break;
-        }
-      }
-
-      if (split == -1) {
-        throw new IllegalArgumentException();
-      }
-
-      this.type = LockType.valueOf(new String(lockData, 0, split, UTF_8));
-      this.userData = Arrays.copyOfRange(lockData, split + 1, lockData.length);
-    }
-
-    public LockType getType() {
-      return type;
-    }
-
-    public byte[] getUserData() {
-      return userData;
-    }
-
-    public byte[] getLockData() {
-      byte[] typeBytes = type.name().getBytes(UTF_8);
-      byte[] result = new byte[userData.length + 1 + typeBytes.length];
-      System.arraycopy(typeBytes, 0, result, 0, typeBytes.length);
-      result[typeBytes.length] = ':';
-      System.arraycopy(userData, 0, result, typeBytes.length + 1, 
userData.length);
-      return result;
-    }
-
-    private LockType type;
-    private byte[] userData;
-  }
-
   // This kind of lock can be easily implemented by ZooKeeper
   // You make an entry at the bottom of the queue, readers run when there are 
no writers ahead of
   // them,
   // a writer only runs when they are at the top of the queue.
   public interface QueueLock {
-    SortedMap<Long,byte[]> getEarlierEntries(long entry);
+    SortedMap<Long,Supplier<FateLockEntry>> getEarlierEntries(long entry);
 
     void removeEntry(long entry);
 
-    long addEntry(byte[] data);
+    long addEntry(FateLockEntry entry);
   }
 
   private static final Logger log = 
LoggerFactory.getLogger(DistributedReadWriteLock.class);
@@ -114,18 +66,18 @@ public class DistributedReadWriteLock implements 
java.util.concurrent.locks.Read
   static class ReadLock implements DistributedLock {
 
     final QueueLock qlock;
-    final byte[] userData;
+    final FateId fateId;
     long entry = -1;
 
-    ReadLock(QueueLock qlock, byte[] userData) {
+    ReadLock(QueueLock qlock, FateId fateId) {
       this.qlock = qlock;
-      this.userData = userData;
+      this.fateId = fateId;
     }
 
     // for recovery
-    ReadLock(QueueLock qlock, byte[] userData, long entry) {
+    ReadLock(QueueLock qlock, FateId fateId, long entry) {
       this.qlock = qlock;
-      this.userData = userData;
+      this.fateId = fateId;
       this.entry = entry;
     }
 
@@ -160,22 +112,21 @@ public class DistributedReadWriteLock implements 
java.util.concurrent.locks.Read
     @Override
     public boolean tryLock() {
       if (entry == -1) {
-        entry = qlock.addEntry(new ParsedLock(this.getType(), 
this.userData).getLockData());
-        log.info("Added lock entry {} userData {} lockType {}", entry,
-            new String(this.userData, UTF_8), getType());
+        entry = qlock.addEntry(FateLockEntry.from(this.getType(), 
this.fateId));
+        log.info("Added lock entry {} fateId {} lockType {}", entry, fateId, 
getType());
       }
-      SortedMap<Long,byte[]> entries = qlock.getEarlierEntries(entry);
-      for (Entry<Long,byte[]> entry : entries.entrySet()) {
-        ParsedLock parsed = new ParsedLock(entry.getValue());
+      SortedMap<Long,Supplier<FateLockEntry>> entries = 
qlock.getEarlierEntries(entry);
+      for (Entry<Long,Supplier<FateLockEntry>> entry : entries.entrySet()) {
         if (entry.getKey().equals(this.entry)) {
           return true;
         }
-        if (parsed.type == LockType.WRITE) {
+        FateLockEntry lockEntry = entry.getValue().get();
+        if (lockEntry.getLockType() == LockType.WRITE) {
           return false;
         }
       }
       throw new IllegalStateException("Did not find our own lock in the queue: 
" + this.entry
-          + " userData " + new String(this.userData, UTF_8) + " lockType " + 
getType());
+          + " fateId " + this.fateId + " lockType " + getType());
     }
 
     @Override
@@ -198,8 +149,7 @@ public class DistributedReadWriteLock implements 
java.util.concurrent.locks.Read
       if (entry == -1) {
         return;
       }
-      log.debug("Removing lock entry {} userData {} lockType {}", entry,
-          new String(this.userData, UTF_8), getType());
+      log.debug("Removing lock entry {} fateId {} lockType {}", entry, 
this.fateId, getType());
       qlock.removeEntry(entry);
       entry = -1;
     }
@@ -212,12 +162,12 @@ public class DistributedReadWriteLock implements 
java.util.concurrent.locks.Read
 
   static class WriteLock extends ReadLock {
 
-    WriteLock(QueueLock qlock, byte[] userData) {
-      super(qlock, userData);
+    WriteLock(QueueLock qlock, FateId fateId) {
+      super(qlock, fateId);
     }
 
-    WriteLock(QueueLock qlock, byte[] userData, long entry) {
-      super(qlock, userData, entry);
+    WriteLock(QueueLock qlock, FateId fateId, long entry) {
+      super(qlock, fateId, entry);
     }
 
     @Override
@@ -228,38 +178,37 @@ public class DistributedReadWriteLock implements 
java.util.concurrent.locks.Read
     @Override
     public boolean tryLock() {
       if (entry == -1) {
-        entry = qlock.addEntry(new ParsedLock(this.getType(), 
this.userData).getLockData());
-        log.info("Added lock entry {} userData {} lockType {}", entry,
-            new String(this.userData, UTF_8), getType());
+        entry = qlock.addEntry(FateLockEntry.from(this.getType(), 
this.fateId));
+        log.info("Added lock entry {} fateId {} lockType {}", entry, 
this.fateId, getType());
       }
-      SortedMap<Long,byte[]> entries = qlock.getEarlierEntries(entry);
-      Iterator<Entry<Long,byte[]>> iterator = entries.entrySet().iterator();
+      SortedMap<Long,Supplier<FateLockEntry>> entries = 
qlock.getEarlierEntries(entry);
+      Iterator<Entry<Long,Supplier<FateLockEntry>>> iterator = 
entries.entrySet().iterator();
       if (!iterator.hasNext()) {
         throw new IllegalStateException("Did not find our own lock in the 
queue: " + this.entry
-            + " userData " + new String(this.userData, UTF_8) + " lockType " + 
getType());
+            + " fateId " + this.fateId + " lockType " + getType());
       }
       return iterator.next().getKey().equals(entry);
     }
   }
 
   private final QueueLock qlock;
-  private final byte[] data;
+  private final FateId fateId;
 
-  public DistributedReadWriteLock(QueueLock qlock, byte[] data) {
+  public DistributedReadWriteLock(QueueLock qlock, FateId fateId) {
     this.qlock = qlock;
-    this.data = Arrays.copyOf(data, data.length);
+    this.fateId = fateId;
   }
 
-  public static DistributedLock recoverLock(QueueLock qlock, byte[] data) {
-    SortedMap<Long,byte[]> entries = qlock.getEarlierEntries(Long.MAX_VALUE);
-    for (Entry<Long,byte[]> entry : entries.entrySet()) {
-      ParsedLock parsed = new ParsedLock(entry.getValue());
-      if (Arrays.equals(data, parsed.getUserData())) {
-        switch (parsed.getType()) {
+  public static DistributedLock recoverLock(QueueLock qlock, FateId fateId) {
+    SortedMap<Long,Supplier<FateLockEntry>> entries = 
qlock.getEarlierEntries(Long.MAX_VALUE);
+    for (Entry<Long,Supplier<FateLockEntry>> entry : entries.entrySet()) {
+      FateLockEntry lockEntry = entry.getValue().get();
+      if (fateId.equals(lockEntry.getFateId())) {
+        switch (lockEntry.getLockType()) {
           case READ:
-            return new ReadLock(qlock, parsed.getUserData(), entry.getKey());
+            return new ReadLock(qlock, lockEntry.getFateId(), entry.getKey());
           case WRITE:
-            return new WriteLock(qlock, parsed.getUserData(), entry.getKey());
+            return new WriteLock(qlock, lockEntry.getFateId(), entry.getKey());
         }
       }
     }
@@ -268,11 +217,11 @@ public class DistributedReadWriteLock implements 
java.util.concurrent.locks.Read
 
   @Override
   public DistributedLock readLock() {
-    return new ReadLock(qlock, data);
+    return new ReadLock(qlock, fateId);
   }
 
   @Override
   public DistributedLock writeLock() {
-    return new WriteLock(qlock, data);
+    return new WriteLock(qlock, fateId);
   }
 }
diff --git 
a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/FateLock.java 
b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/FateLock.java
index 6b42014c31..74c3065c70 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/FateLock.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/FateLock.java
@@ -18,14 +18,20 @@
  */
 package org.apache.accumulo.core.fate.zookeeper;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.Objects.requireNonNull;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.function.Supplier;
 
+import org.apache.accumulo.core.fate.FateId;
+import 
org.apache.accumulo.core.fate.zookeeper.DistributedReadWriteLock.LockType;
 import 
org.apache.accumulo.core.fate.zookeeper.DistributedReadWriteLock.QueueLock;
 import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy;
@@ -35,6 +41,8 @@ import org.apache.zookeeper.KeeperException.NotEmptyException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Suppliers;
+
 /**
  * A persistent lock mechanism in ZooKeeper used for locking tables during 
FaTE operations.
  */
@@ -59,6 +67,81 @@ public class FateLock implements QueueLock {
     }
   }
 
+  public static class FateLockEntry {
+    final LockType lockType;
+    final FateId fateId;
+
+    private FateLockEntry(LockType lockType, FateId fateId) {
+      this.lockType = Objects.requireNonNull(lockType);
+      this.fateId = Objects.requireNonNull(fateId);
+    }
+
+    private FateLockEntry(byte[] entry) {
+      if (entry == null || entry.length < 1) {
+        throw new IllegalArgumentException();
+      }
+
+      int split = -1;
+      for (int i = 0; i < entry.length; i++) {
+        if (entry[i] == ':') {
+          split = i;
+          break;
+        }
+      }
+
+      if (split == -1) {
+        throw new IllegalArgumentException();
+      }
+
+      this.lockType = LockType.valueOf(new String(entry, 0, split, UTF_8));
+      this.fateId =
+          FateId.from(new String(Arrays.copyOfRange(entry, split + 1, 
entry.length), UTF_8));
+    }
+
+    public LockType getLockType() {
+      return lockType;
+    }
+
+    public FateId getFateId() {
+      return fateId;
+    }
+
+    public byte[] serialize() {
+      byte[] typeBytes = lockType.name().getBytes(UTF_8);
+      byte[] fateIdBytes = fateId.canonical().getBytes(UTF_8);
+      byte[] result = new byte[fateIdBytes.length + 1 + typeBytes.length];
+      System.arraycopy(typeBytes, 0, result, 0, typeBytes.length);
+      result[typeBytes.length] = ':';
+      System.arraycopy(fateIdBytes, 0, result, typeBytes.length + 1, 
fateIdBytes.length);
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      FateLockEntry lockEntry = (FateLockEntry) o;
+      return lockType == lockEntry.lockType && fateId.equals(lockEntry.fateId);
+    }
+
+    @Override
+    public int hashCode() {
+      int result = lockType.hashCode();
+      result = 31 * result + fateId.hashCode();
+      return result;
+    }
+
+    public static FateLockEntry from(LockType lockType, FateId fateId) {
+      return new FateLockEntry(lockType, fateId);
+    }
+
+    public static FateLockEntry deserialize(byte[] serialized) {
+      return new FateLockEntry(serialized);
+    }
+  }
+
   public static FateLockPath path(String path) {
     return new FateLockPath(path);
   }
@@ -69,12 +152,12 @@ public class FateLock implements QueueLock {
   }
 
   @Override
-  public long addEntry(byte[] data) {
+  public long addEntry(FateLockEntry entry) {
     String newPath;
     try {
       while (true) {
         try {
-          newPath = zoo.putPersistentSequential(path + "/" + PREFIX, data);
+          newPath = zoo.putPersistentSequential(path + "/" + PREFIX, 
entry.serialize());
           String[] parts = newPath.split("/");
           String last = parts[parts.length - 1];
           return Long.parseLong(last.substring(PREFIX.length()));
@@ -89,8 +172,8 @@ public class FateLock implements QueueLock {
   }
 
   @Override
-  public SortedMap<Long,byte[]> getEarlierEntries(long entry) {
-    SortedMap<Long,byte[]> result = new TreeMap<>();
+  public SortedMap<Long,Supplier<FateLockEntry>> getEarlierEntries(long entry) 
{
+    SortedMap<Long,Supplier<FateLockEntry>> result = new TreeMap<>();
     try {
       List<String> children = Collections.emptyList();
       try {
@@ -106,7 +189,9 @@ public class FateLock implements QueueLock {
           long order = Long.parseLong(name.substring(PREFIX.length()));
           if (order <= entry) {
             byte[] data = zoo.getData(path + "/" + name);
-            result.put(order, data);
+            // Use a supplier so we don't need to deserialize unless the 
calling code cares about
+            // the value for that entry.
+            result.put(order, Suppliers.memoize(() -> 
FateLockEntry.deserialize(data)));
           }
         } catch (KeeperException.NoNodeException ex) {
           // ignored
diff --git 
a/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLockTest.java
 
b/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLockTest.java
index da30442d93..bf55b79d4a 100644
--- 
a/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLockTest.java
+++ 
b/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/DistributedReadWriteLockTest.java
@@ -18,17 +18,22 @@
  */
 package org.apache.accumulo.core.fate.zookeeper;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicIntegerArray;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
+import java.util.function.Supplier;
 
+import org.apache.accumulo.core.fate.FateId;
+import org.apache.accumulo.core.fate.FateInstanceType;
+import 
org.apache.accumulo.core.fate.zookeeper.DistributedReadWriteLock.LockType;
 import 
org.apache.accumulo.core.fate.zookeeper.DistributedReadWriteLock.QueueLock;
+import org.apache.accumulo.core.fate.zookeeper.FateLock.FateLockEntry;
 import org.junit.jupiter.api.Test;
 
 public class DistributedReadWriteLockTest {
@@ -37,12 +42,12 @@ public class DistributedReadWriteLockTest {
   public static class MockQueueLock implements QueueLock {
 
     long next = 0L;
-    final SortedMap<Long,byte[]> locks = new TreeMap<>();
+    final SortedMap<Long,FateLockEntry> locks = new TreeMap<>();
 
     @Override
-    public synchronized SortedMap<Long,byte[]> getEarlierEntries(long entry) {
-      SortedMap<Long,byte[]> result = new TreeMap<>();
-      result.putAll(locks.headMap(entry + 1));
+    public synchronized SortedMap<Long,Supplier<FateLockEntry>> 
getEarlierEntries(long entry) {
+      SortedMap<Long,Supplier<FateLockEntry>> result = new TreeMap<>();
+      locks.headMap(entry + 1).forEach((k, v) -> result.put(k, () -> v));
       return result;
     }
 
@@ -55,10 +60,10 @@ public class DistributedReadWriteLockTest {
     }
 
     @Override
-    public synchronized long addEntry(byte[] data) {
+    public synchronized long addEntry(FateLockEntry entry) {
       long result;
       synchronized (locks) {
-        locks.put(result = next++, data);
+        locks.put(result = next++, entry);
         locks.notifyAll();
       }
       return result;
@@ -67,8 +72,8 @@ public class DistributedReadWriteLockTest {
 
   // some data that is probably not going to update atomically
   static class SomeData {
-    private AtomicIntegerArray data = new AtomicIntegerArray(100);
-    private AtomicInteger counter = new AtomicInteger();
+    private final AtomicIntegerArray data = new AtomicIntegerArray(100);
+    private final AtomicInteger counter = new AtomicInteger();
 
     void read() {
       for (int i = 0; i < data.length(); i++) {
@@ -91,7 +96,8 @@ public class DistributedReadWriteLockTest {
     data.read();
     QueueLock qlock = new MockQueueLock();
 
-    final ReadWriteLock locker = new DistributedReadWriteLock(qlock, 
"locker1".getBytes(UTF_8));
+    final ReadWriteLock locker =
+        new DistributedReadWriteLock(qlock, FateId.from(FateInstanceType.USER, 
UUID.randomUUID()));
     final Lock readLock = locker.readLock();
     final Lock writeLock = locker.writeLock();
     readLock.lock();
@@ -134,4 +140,15 @@ public class DistributedReadWriteLockTest {
     }
   }
 
+  @Test
+  public void testFateLockEntrySerDes() {
+    var uuid = UUID.randomUUID();
+    var entry = FateLockEntry.from(LockType.READ, 
FateId.from(FateInstanceType.USER, uuid));
+    assertEquals(LockType.READ, entry.getLockType());
+    assertEquals(FateId.from(FateInstanceType.USER, uuid), entry.getFateId());
+
+    byte[] serialized = entry.serialize();
+    var deserialized = FateLockEntry.deserialize(serialized);
+    assertEquals(entry, deserialized);
+  }
 }
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/Utils.java 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/Utils.java
index 8517503639..a8089dbb13 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/Utils.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/Utils.java
@@ -221,11 +221,10 @@ public class Utils {
 
   private static Lock getLock(ServerContext context, AbstractId<?> id, FateId 
fateId,
       LockType lockType) {
-    byte[] lockData = fateId.canonical().getBytes(UTF_8);
     var fLockPath =
         FateLock.path(context.getZooKeeperRoot() + Constants.ZTABLE_LOCKS + 
"/" + id.canonical());
     FateLock qlock = new FateLock(context.getZooSession().asReaderWriter(), 
fLockPath);
-    DistributedLock lock = DistributedReadWriteLock.recoverLock(qlock, 
lockData);
+    DistributedLock lock = DistributedReadWriteLock.recoverLock(qlock, fateId);
     if (lock != null) {
 
       // Validate the recovered lock type
@@ -235,7 +234,7 @@ public class Utils {
                 + " on object " + id + ". Expected " + lockType + " lock 
instead.");
       }
     } else {
-      DistributedReadWriteLock locker = new DistributedReadWriteLock(qlock, 
lockData);
+      DistributedReadWriteLock locker = new DistributedReadWriteLock(qlock, 
fateId);
       switch (lockType) {
         case WRITE:
           lock = locker.writeLock();

Reply via email to