This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new 30083c3ca7 simplifies concurrency in ZooSession and ZooCache (#5303) 30083c3ca7 is described below commit 30083c3ca761596ba028aabcc7eeb97c475bde7e Author: Keith Turner <ktur...@apache.org> AuthorDate: Fri Feb 7 11:18:01 2025 -0500 simplifies concurrency in ZooSession and ZooCache (#5303) Made a few changes to how ZooCache and ZooSession handle concurrency in order to make the code easier to reason about. ZooCache and ZooSession had code that used Atomics to do computations in addition to synchronized blocks. This added complexity without an apparent benefit. Stopped doing the computations in both places and moved all computations into the synchronized blocks. ZooSession has a counter that was used by ZooCache to know when the ZooKeeper object changed. ZooSession updated this counter independently from updating the ZooKeeper reference, and ZooCache read the reference and counter at different times. Reorganized the code so that the counter and the ZooKeeper reference are read and written together by ZooSession and ZooCache. ZooSession.close() could run concurrently with the code that created new ZooKeeper objects in ZooSession. Made the close method synchronized and tweaked how the closed variable is checked. 
--- .../apache/accumulo/core/zookeeper/ZooCache.java | 27 +++++-- .../apache/accumulo/core/zookeeper/ZooSession.java | 89 +++++++++++++++------- .../accumulo/core/zookeeper/ZooCacheTest.java | 8 +- 3 files changed, 85 insertions(+), 39 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooCache.java b/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooCache.java index e5f52a32d2..ff30277797 100644 --- a/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooCache.java +++ b/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooCache.java @@ -178,7 +178,8 @@ public class ZooCache { // visible for tests that use a Ticker public ZooCache(ZooSession zk, Set<String> pathsToWatch, Ticker ticker) { this.zk = requireNonNull(zk); - this.zkClientTracker.set(this.getZKClientObjectVersion()); + // this initial value is meant to indicate watchers were never setup + this.zkClientTracker.set(-1); this.cache = Caches.getInstance().createNewBuilder(Caches.CacheName.ZOO_CACHE, false) .ticker(requireNonNull(ticker)).expireAfterAccess(CACHE_DURATION).build(); // The concurrent map returned by Caffeine will only allow one thread to run at a time for a @@ -196,7 +197,11 @@ public class ZooCache { // visible for tests long getZKClientObjectVersion() { - return zk.getConnectionCounter(); + long counter = zk.getConnectionCounter(); + // -1 is used to signify ZK has not been setup in this code and this code assume ZooSession will + // always return something >= 0. 
+ Preconditions.checkState(counter >= 0); + return counter; } /** @@ -206,16 +211,20 @@ public class ZooCache { final long currentCount = getZKClientObjectVersion(); final long oldCount = zkClientTracker.get(); if (oldCount != currentCount) { - if (zkClientTracker.compareAndSet(oldCount, currentCount)) { - setupWatchers(); - } - return true; + return setupWatchers(); } return false; } // Called on construction and when ZooKeeper connection changes - synchronized void setupWatchers() { + synchronized boolean setupWatchers() { + + final long currentCount = getZKClientObjectVersion(); + final long oldCount = zkClientTracker.get(); + + if (currentCount == oldCount) { + return false; + } for (String left : watchedPaths) { for (String right : watchedPaths) { @@ -227,10 +236,12 @@ public class ZooCache { } try { - zk.addPersistentRecursiveWatchers(watchedPaths, watcher); + long zkId = zk.addPersistentRecursiveWatchers(watchedPaths, watcher); + zkClientTracker.set(zkId); clear(); log.trace("{} Reinitialized persistent watchers and cleared cache {}", cacheId, zkClientTracker.get()); + return true; } catch (KeeperException | InterruptedException e) { throw new RuntimeException("Error setting up persistent recursive watcher", e); } diff --git a/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java b/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java index 850d6481d2..41b501a9ae 100644 --- a/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java +++ b/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java @@ -28,10 +28,9 @@ import java.net.UnknownHostException; import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Objects; import java.util.Set; import java.util.UUID; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import org.apache.accumulo.core.conf.AccumuloConfiguration; 
@@ -55,6 +54,8 @@ import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; + /** * A ZooKeeper client facade that maintains a ZooKeeper delegate instance. If the delegate instance * loses its session, it is replaced with a new instance to establish a new session. Any Watchers @@ -115,10 +116,20 @@ public class ZooSession implements AutoCloseable { zoo.addAuthInfo("digest", ("accumulo:" + requireNonNull(secret)).getBytes(UTF_8)); } - private final AtomicBoolean closed = new AtomicBoolean(); - private final AtomicLong connectCounter; + private static class ZookeeperAndCounter { + final ZooKeeper zookeeper; + final long connectionCount; + + private ZookeeperAndCounter(ZooKeeper zookeeper, long connectionCount) { + Preconditions.checkArgument(connectionCount >= 0); + this.zookeeper = Objects.requireNonNull(zookeeper); + this.connectionCount = connectionCount; + } + } + + private boolean closed = false; private final String connectString; - private final AtomicReference<ZooKeeper> delegate = new AtomicReference<>(); + private final AtomicReference<ZookeeperAndCounter> delegate = new AtomicReference<>(); private final String instanceSecret; private final String sessionName; private final int timeout; @@ -155,24 +166,32 @@ public class ZooSession implements AutoCloseable { // information for logging which instance of ZooSession this is this.sessionName = String.format("%s[%s_%s]", getClass().getSimpleName(), clientName, UUID.randomUUID()); - this.connectCounter = new AtomicLong(); // incremented when we need to create a new delegate this.zrw = new ZooReaderWriter(this); } private ZooKeeper verifyConnected() { - if (closed.get()) { - throw new IllegalStateException(sessionName + " was closed"); + var zkac = delegate.get(); + if (zkac != null && zkac.zookeeper.getState().isAlive()) { + return zkac.zookeeper; + } else { + return reconnect().zookeeper; } - return delegate.updateAndGet(zk -> 
(zk != null && zk.getState().isAlive()) ? zk : reconnect()); } - private synchronized ZooKeeper reconnect() { - ZooKeeper zk; - if ((zk = delegate.get()) != null && zk.getState().isAlive()) { - return zk; + private synchronized ZookeeperAndCounter reconnect() { + if (closed) { + throw new IllegalStateException(sessionName + " was closed"); + } + + ZookeeperAndCounter zkac; + if ((zkac = delegate.get()) != null && zkac.zookeeper.getState().isAlive()) { + return zkac; } - zk = null; - var reconnectName = String.format("%s#%s", sessionName, connectCounter.getAndIncrement()); + + final long nextCounter = (zkac == null ? 0 : zkac.connectionCount) + 1; + zkac = null; + + var reconnectName = String.format("%s#%s", sessionName, nextCounter); log.debug("{} (re-)connecting to {} with timeout {}{}", reconnectName, connectString, timeout, instanceSecret == null ? "" : " with auth"); final int TIME_BETWEEN_CONNECT_CHECKS_MS = 100; @@ -182,6 +201,8 @@ public class ZooSession implements AutoCloseable { long startTime = System.nanoTime(); + ZooKeeper zk = null; + while (tryAgain) { try { zk = new ZooKeeper(connectString, timeout, new ZooSessionWatcher(reconnectName)); @@ -236,7 +257,10 @@ public class ZooSession implements AutoCloseable { } } } - return zk; + + zkac = new ZookeeperAndCounter(zk, nextCounter); + delegate.set(zkac); + return zkac; } public void addAuthInfo(String scheme, byte[] auth) { @@ -294,27 +318,28 @@ public class ZooSession implements AutoCloseable { verifyConnected().sync(path, cb, ctx); } - public void addPersistentRecursiveWatchers(Set<String> paths, Watcher watcher) + public long addPersistentRecursiveWatchers(Set<String> paths, Watcher watcher) throws KeeperException, InterruptedException { + ZookeeperAndCounter localZkac = reconnect(); - ZooKeeper localZK = verifyConnected(); Set<String> remainingPaths = new HashSet<>(paths); while (true) { try { Iterator<String> remainingPathsIter = remainingPaths.iterator(); while (remainingPathsIter.hasNext()) { 
String path = remainingPathsIter.next(); - localZK.addWatch(path, watcher, AddWatchMode.PERSISTENT_RECURSIVE); + localZkac.zookeeper.addWatch(path, watcher, AddWatchMode.PERSISTENT_RECURSIVE); remainingPathsIter.remove(); } - break; + + return localZkac.connectionCount; } catch (KeeperException e) { log.error("Error setting persistent watcher in ZooKeeper, retrying...", e); - ZooKeeper currentZK = verifyConnected(); + ZookeeperAndCounter currentZkac = reconnect(); // If ZooKeeper object is different, then reset the localZK variable // and start over. - if (localZK != currentZK) { - localZK = currentZK; + if (localZkac != currentZkac) { + localZkac = currentZkac; remainingPaths = new HashSet<>(paths); } } @@ -322,9 +347,13 @@ public class ZooSession implements AutoCloseable { } @Override - public void close() { - if (closed.compareAndSet(false, true)) { - closeZk(delegate.getAndSet(null)); + public synchronized void close() { + if (!closed) { + var zkac = delegate.getAndSet(null); + if (zkac != null) { + closeZk(zkac.zookeeper); + } + closed = true; } } @@ -348,7 +377,13 @@ public class ZooSession implements AutoCloseable { * @return connection counter */ public long getConnectionCounter() { - return connectCounter.get(); + var zkac = delegate.get(); + if (delegate.get() == null) { + // If null then this is closed or in the process of opening. If closed reconnect will throw an + // exception. If in the process of opening, then reconnect will wait for that to finish. 
+ return reconnect().connectionCount; + } + return zkac.connectionCount; } } diff --git a/core/src/test/java/org/apache/accumulo/core/zookeeper/ZooCacheTest.java b/core/src/test/java/org/apache/accumulo/core/zookeeper/ZooCacheTest.java index e4e9426c64..9f7cbc31a8 100644 --- a/core/src/test/java/org/apache/accumulo/core/zookeeper/ZooCacheTest.java +++ b/core/src/test/java/org/apache/accumulo/core/zookeeper/ZooCacheTest.java @@ -60,8 +60,8 @@ public class ZooCacheTest { } @Override - protected void setupWatchers() { - clear(); + protected boolean setupWatchers() { + return false; } public void executeWatcher(WatchedEvent event) { @@ -95,7 +95,7 @@ public class ZooCacheTest { @SuppressWarnings("unchecked") public void testOverlappingPaths() throws Exception { expect(zk.getConnectionCounter()).andReturn(2L).times(2); - zk.addPersistentRecursiveWatchers(isA(Set.class), isA(Watcher.class)); + expect(zk.addPersistentRecursiveWatchers(isA(Set.class), isA(Watcher.class))).andReturn(3L); replay(zk); assertThrows(IllegalArgumentException.class, () -> new ZooCache(zk, Set.of(root, root + "/localhost:9995"))); @@ -121,7 +121,7 @@ public class ZooCacheTest { @SuppressWarnings("unchecked") public void testUnwatchedPaths() throws Exception { expect(zk.getConnectionCounter()).andReturn(2L).anyTimes(); - zk.addPersistentRecursiveWatchers(isA(Set.class), isA(Watcher.class)); + expect(zk.addPersistentRecursiveWatchers(isA(Set.class), isA(Watcher.class))).andReturn(3L); replay(zk); ZooCache cache = new ZooCache(zk, Set.of(root)); assertThrows(IllegalStateException.class, () -> cache.get("/some/unknown/path"));