This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new 940b12a265 Use per-table merge lock in Manager instead of single lock
(#6378)
940b12a265 is described below
commit 940b12a265b9343f4b132b8177b1f632985e502f
Author: Dave Marion <[email protected]>
AuthorDate: Fri May 29 09:16:57 2026 -0400
Use per-table merge lock in Manager instead of single lock (#6378)
This change introduces a per-table merge lock instead of
a single lock to protect reading/writing to ZooKeeper.
Closes #6374
---
server/manager/pom.xml | 4 +++
.../java/org/apache/accumulo/manager/Manager.java | 33 ++++++++++++++++------
2 files changed, 29 insertions(+), 8 deletions(-)
diff --git a/server/manager/pom.xml b/server/manager/pom.xml
index b2280f88f4..0da212e024 100644
--- a/server/manager/pom.xml
+++ b/server/manager/pom.xml
@@ -31,6 +31,10 @@
<name>Apache Accumulo Manager Server</name>
<description>The manager server for Apache Accumulo for load balancing and
other system-wide operations.</description>
<dependencies>
+ <dependency>
+ <groupId>com.github.ben-manes.caffeine</groupId>
+ <artifactId>caffeine</artifactId>
+ </dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 0b3ea6ab40..01399b8d1f 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -52,6 +52,7 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.accumulo.core.Constants;
@@ -157,6 +158,9 @@ import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.Scheduler;
import com.google.common.collect.ImmutableSortedMap;
import com.google.common.util.concurrent.RateLimiter;
@@ -201,8 +205,11 @@ public class Manager extends AbstractServer implements
LiveTServerSet.Listener,
Collections.synchronizedMap(new HashMap<>());
final Set<TServerInstance> serversToShutdown =
Collections.synchronizedSet(new HashSet<>());
final Migrations migrations = new Migrations();
+
+ private final LoadingCache<String,ReentrantLock> mergeLocks =
Caffeine.newBuilder().weakValues()
+ .scheduler(Scheduler.systemScheduler()).build(k -> new ReentrantLock());
+
final EventCoordinator nextEvent = new EventCoordinator();
- private final Object mergeLock = new Object();
private Thread replicationWorkThread;
private Thread replicationAssignerThread;
RecoveryManager recoveryManager = null;
@@ -477,7 +484,9 @@ public class Manager extends AbstractServer implements
LiveTServerSet.Listener,
public MergeInfo getMergeInfo(TableId tableId) {
ServerContext context = getContext();
- synchronized (mergeLock) {
+ final ReentrantLock l = mergeLocks.get(tableId.canonical());
+ l.lock();
+ try {
try {
String path = getZooKeeperRoot() + Constants.ZTABLES + "/" + tableId +
"/merge";
if (!context.getZooReaderWriter().exists(path)) {
@@ -496,15 +505,19 @@ public class Manager extends AbstractServer implements
LiveTServerSet.Listener,
log.warn("Unexpected error reading merge state", ex);
return new MergeInfo();
}
+ } finally {
+ l.unlock();
}
}
public void setMergeState(MergeInfo info, MergeState state)
throws KeeperException, InterruptedException {
ServerContext context = getContext();
- synchronized (mergeLock) {
- String path =
- getZooKeeperRoot() + Constants.ZTABLES + "/" +
info.getExtent().tableId() + "/merge";
+ final TableId tid = info.getExtent().tableId();
+ final ReentrantLock l = mergeLocks.get(tid.canonical());
+ l.lock();
+ try {
+ String path = getZooKeeperRoot() + Constants.ZTABLES + "/" + tid +
"/merge";
info.setState(state);
if (state.equals(MergeState.NONE)) {
context.getZooReaderWriter().recursiveDelete(path,
NodeMissingPolicy.SKIP);
@@ -519,16 +532,20 @@ public class Manager extends AbstractServer implements
LiveTServerSet.Listener,
state.equals(MergeState.STARTED) ? ZooUtil.NodeExistsPolicy.FAIL
: ZooUtil.NodeExistsPolicy.OVERWRITE);
}
- mergeLock.notifyAll();
+ } finally {
+ l.unlock();
}
nextEvent.event("Merge state of %s set to %s", info.getExtent(), state);
}
public void clearMergeState(TableId tableId) throws KeeperException,
InterruptedException {
- synchronized (mergeLock) {
+ final ReentrantLock l = mergeLocks.get(tableId.canonical());
+ l.lock();
+ try {
String path = getZooKeeperRoot() + Constants.ZTABLES + "/" + tableId +
"/merge";
getContext().getZooReaderWriter().recursiveDelete(path,
NodeMissingPolicy.SKIP);
- mergeLock.notifyAll();
+ } finally {
+ l.unlock();
}
nextEvent.event("Merge state of %s cleared", tableId);
}