This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 940b12a265 Use per-table merge lock in Manager instead of single lock (#6378)
940b12a265 is described below

commit 940b12a265b9343f4b132b8177b1f632985e502f
Author: Dave Marion <[email protected]>
AuthorDate: Fri May 29 09:16:57 2026 -0400

    Use per-table merge lock in Manager instead of single lock (#6378)
    
    This change replaces the single merge lock in the Manager with a
    per-table merge lock to protect reading and writing each table's
    merge state in ZooKeeper.
    
    Closes #6374
---
 server/manager/pom.xml                             |  4 +++
 .../java/org/apache/accumulo/manager/Manager.java  | 33 ++++++++++++++++------
 2 files changed, 29 insertions(+), 8 deletions(-)

diff --git a/server/manager/pom.xml b/server/manager/pom.xml
index b2280f88f4..0da212e024 100644
--- a/server/manager/pom.xml
+++ b/server/manager/pom.xml
@@ -31,6 +31,10 @@
   <name>Apache Accumulo Manager Server</name>
   <description>The manager server for Apache Accumulo for load balancing and 
other system-wide operations.</description>
   <dependencies>
+    <dependency>
+      <groupId>com.github.ben-manes.caffeine</groupId>
+      <artifactId>caffeine</artifactId>
+    </dependency>
     <dependency>
       <groupId>com.google.code.gson</groupId>
       <artifactId>gson</artifactId>
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java 
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 0b3ea6ab40..01399b8d1f 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -52,6 +52,7 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 
 import org.apache.accumulo.core.Constants;
@@ -157,6 +158,9 @@ import org.apache.zookeeper.Watcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.Scheduler;
 import com.google.common.collect.ImmutableSortedMap;
 import com.google.common.util.concurrent.RateLimiter;
 
@@ -201,8 +205,11 @@ public class Manager extends AbstractServer implements 
LiveTServerSet.Listener,
       Collections.synchronizedMap(new HashMap<>());
   final Set<TServerInstance> serversToShutdown = 
Collections.synchronizedSet(new HashSet<>());
   final Migrations migrations = new Migrations();
+
+  private final LoadingCache<String,ReentrantLock> mergeLocks = 
Caffeine.newBuilder().weakValues()
+      .scheduler(Scheduler.systemScheduler()).build(k -> new ReentrantLock());
+
   final EventCoordinator nextEvent = new EventCoordinator();
-  private final Object mergeLock = new Object();
   private Thread replicationWorkThread;
   private Thread replicationAssignerThread;
   RecoveryManager recoveryManager = null;
@@ -477,7 +484,9 @@ public class Manager extends AbstractServer implements 
LiveTServerSet.Listener,
 
   public MergeInfo getMergeInfo(TableId tableId) {
     ServerContext context = getContext();
-    synchronized (mergeLock) {
+    final ReentrantLock l = mergeLocks.get(tableId.canonical());
+    l.lock();
+    try {
       try {
         String path = getZooKeeperRoot() + Constants.ZTABLES + "/" + tableId + 
"/merge";
         if (!context.getZooReaderWriter().exists(path)) {
@@ -496,15 +505,19 @@ public class Manager extends AbstractServer implements 
LiveTServerSet.Listener,
         log.warn("Unexpected error reading merge state", ex);
         return new MergeInfo();
       }
+    } finally {
+      l.unlock();
     }
   }
 
   public void setMergeState(MergeInfo info, MergeState state)
       throws KeeperException, InterruptedException {
     ServerContext context = getContext();
-    synchronized (mergeLock) {
-      String path =
-          getZooKeeperRoot() + Constants.ZTABLES + "/" + 
info.getExtent().tableId() + "/merge";
+    final TableId tid = info.getExtent().tableId();
+    final ReentrantLock l = mergeLocks.get(tid.canonical());
+    l.lock();
+    try {
+      String path = getZooKeeperRoot() + Constants.ZTABLES + "/" + tid + 
"/merge";
       info.setState(state);
       if (state.equals(MergeState.NONE)) {
         context.getZooReaderWriter().recursiveDelete(path, 
NodeMissingPolicy.SKIP);
@@ -519,16 +532,20 @@ public class Manager extends AbstractServer implements 
LiveTServerSet.Listener,
             state.equals(MergeState.STARTED) ? ZooUtil.NodeExistsPolicy.FAIL
                 : ZooUtil.NodeExistsPolicy.OVERWRITE);
       }
-      mergeLock.notifyAll();
+    } finally {
+      l.unlock();
     }
     nextEvent.event("Merge state of %s set to %s", info.getExtent(), state);
   }
 
   public void clearMergeState(TableId tableId) throws KeeperException, 
InterruptedException {
-    synchronized (mergeLock) {
+    final ReentrantLock l = mergeLocks.get(tableId.canonical());
+    l.lock();
+    try {
       String path = getZooKeeperRoot() + Constants.ZTABLES + "/" + tableId + 
"/merge";
       getContext().getZooReaderWriter().recursiveDelete(path, 
NodeMissingPolicy.SKIP);
-      mergeLock.notifyAll();
+    } finally {
+      l.unlock();
     }
     nextEvent.event("Merge state of %s cleared", tableId);
   }

Reply via email to