This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 30083c3ca7 simplifies concurrency in ZooSession and ZooCache (#5303)
30083c3ca7 is described below

commit 30083c3ca761596ba028aabcc7eeb97c475bde7e
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Fri Feb 7 11:18:01 2025 -0500

    simplifies concurrency in ZooSession and ZooCache (#5303)
    
    Made a few changes to how ZooCache and ZooSession handle concurrency in
    order to make it easier to reason about the code.
    
    ZooCache and ZooSession had code that used Atomics to do computations in
    addition to synchronized blocks.  This added complexity without an
    apparent benefit.  Stopped doing the computations in both places and moved
    all of them into the synchronized block.
    
    ZooSession has a counter that was used by ZooCache to know when
    zookeeper changed.  ZooSession updated this counter independently from
    updating the zookeeper reference, and ZooCache read the reference and
    counter at different times.  Reorganized the code so that the counter and
    the zookeeper reference are always written together by ZooSession and
    read together by ZooCache.
    
    ZooSession.close() could run concurrently with the code that created
    new ZooKeeper objects in ZooSession.  Made the close method synchronized
    and moved the check of the closed variable into the synchronized
    reconnect method.
---
 .../apache/accumulo/core/zookeeper/ZooCache.java   | 27 +++++--
 .../apache/accumulo/core/zookeeper/ZooSession.java | 89 +++++++++++++++-------
 .../accumulo/core/zookeeper/ZooCacheTest.java      |  8 +-
 3 files changed, 85 insertions(+), 39 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooCache.java 
b/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooCache.java
index e5f52a32d2..ff30277797 100644
--- a/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooCache.java
+++ b/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooCache.java
@@ -178,7 +178,8 @@ public class ZooCache {
   // visible for tests that use a Ticker
   public ZooCache(ZooSession zk, Set<String> pathsToWatch, Ticker ticker) {
     this.zk = requireNonNull(zk);
-    this.zkClientTracker.set(this.getZKClientObjectVersion());
+    // this initial value is meant to indicate watchers were never setup
+    this.zkClientTracker.set(-1);
     this.cache = 
Caches.getInstance().createNewBuilder(Caches.CacheName.ZOO_CACHE, false)
         
.ticker(requireNonNull(ticker)).expireAfterAccess(CACHE_DURATION).build();
     // The concurrent map returned by Caffeine will only allow one thread to 
run at a time for a
@@ -196,7 +197,11 @@ public class ZooCache {
 
   // visible for tests
   long getZKClientObjectVersion() {
-    return zk.getConnectionCounter();
+    long counter = zk.getConnectionCounter();
+    // -1 is used to signify ZK has not been setup in this code and this code 
assume ZooSession will
+    // always return something >= 0.
+    Preconditions.checkState(counter >= 0);
+    return counter;
   }
 
   /**
@@ -206,16 +211,20 @@ public class ZooCache {
     final long currentCount = getZKClientObjectVersion();
     final long oldCount = zkClientTracker.get();
     if (oldCount != currentCount) {
-      if (zkClientTracker.compareAndSet(oldCount, currentCount)) {
-        setupWatchers();
-      }
-      return true;
+      return setupWatchers();
     }
     return false;
   }
 
   // Called on construction and when ZooKeeper connection changes
-  synchronized void setupWatchers() {
+  synchronized boolean setupWatchers() {
+
+    final long currentCount = getZKClientObjectVersion();
+    final long oldCount = zkClientTracker.get();
+
+    if (currentCount == oldCount) {
+      return false;
+    }
 
     for (String left : watchedPaths) {
       for (String right : watchedPaths) {
@@ -227,10 +236,12 @@ public class ZooCache {
     }
 
     try {
-      zk.addPersistentRecursiveWatchers(watchedPaths, watcher);
+      long zkId = zk.addPersistentRecursiveWatchers(watchedPaths, watcher);
+      zkClientTracker.set(zkId);
       clear();
       log.trace("{} Reinitialized persistent watchers and cleared cache {}", 
cacheId,
           zkClientTracker.get());
+      return true;
     } catch (KeeperException | InterruptedException e) {
       throw new RuntimeException("Error setting up persistent recursive 
watcher", e);
     }
diff --git 
a/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java 
b/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java
index 850d6481d2..41b501a9ae 100644
--- a/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java
+++ b/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java
@@ -28,10 +28,9 @@ import java.net.UnknownHostException;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
@@ -55,6 +54,8 @@ import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
+
 /**
  * A ZooKeeper client facade that maintains a ZooKeeper delegate instance. If 
the delegate instance
  * loses its session, it is replaced with a new instance to establish a new 
session. Any Watchers
@@ -115,10 +116,20 @@ public class ZooSession implements AutoCloseable {
     zoo.addAuthInfo("digest", ("accumulo:" + 
requireNonNull(secret)).getBytes(UTF_8));
   }
 
-  private final AtomicBoolean closed = new AtomicBoolean();
-  private final AtomicLong connectCounter;
+  private static class ZookeeperAndCounter {
+    final ZooKeeper zookeeper;
+    final long connectionCount;
+
+    private ZookeeperAndCounter(ZooKeeper zookeeper, long connectionCount) {
+      Preconditions.checkArgument(connectionCount >= 0);
+      this.zookeeper = Objects.requireNonNull(zookeeper);
+      this.connectionCount = connectionCount;
+    }
+  }
+
+  private boolean closed = false;
   private final String connectString;
-  private final AtomicReference<ZooKeeper> delegate = new AtomicReference<>();
+  private final AtomicReference<ZookeeperAndCounter> delegate = new 
AtomicReference<>();
   private final String instanceSecret;
   private final String sessionName;
   private final int timeout;
@@ -155,24 +166,32 @@ public class ZooSession implements AutoCloseable {
     // information for logging which instance of ZooSession this is
     this.sessionName =
         String.format("%s[%s_%s]", getClass().getSimpleName(), clientName, 
UUID.randomUUID());
-    this.connectCounter = new AtomicLong(); // incremented when we need to 
create a new delegate
     this.zrw = new ZooReaderWriter(this);
   }
 
   private ZooKeeper verifyConnected() {
-    if (closed.get()) {
-      throw new IllegalStateException(sessionName + " was closed");
+    var zkac = delegate.get();
+    if (zkac != null && zkac.zookeeper.getState().isAlive()) {
+      return zkac.zookeeper;
+    } else {
+      return reconnect().zookeeper;
     }
-    return delegate.updateAndGet(zk -> (zk != null && zk.getState().isAlive()) 
? zk : reconnect());
   }
 
-  private synchronized ZooKeeper reconnect() {
-    ZooKeeper zk;
-    if ((zk = delegate.get()) != null && zk.getState().isAlive()) {
-      return zk;
+  private synchronized ZookeeperAndCounter reconnect() {
+    if (closed) {
+      throw new IllegalStateException(sessionName + " was closed");
+    }
+
+    ZookeeperAndCounter zkac;
+    if ((zkac = delegate.get()) != null && 
zkac.zookeeper.getState().isAlive()) {
+      return zkac;
     }
-    zk = null;
-    var reconnectName = String.format("%s#%s", sessionName, 
connectCounter.getAndIncrement());
+
+    final long nextCounter = (zkac == null ? 0 : zkac.connectionCount) + 1;
+    zkac = null;
+
+    var reconnectName = String.format("%s#%s", sessionName, nextCounter);
     log.debug("{} (re-)connecting to {} with timeout {}{}", reconnectName, 
connectString, timeout,
         instanceSecret == null ? "" : " with auth");
     final int TIME_BETWEEN_CONNECT_CHECKS_MS = 100;
@@ -182,6 +201,8 @@ public class ZooSession implements AutoCloseable {
 
     long startTime = System.nanoTime();
 
+    ZooKeeper zk = null;
+
     while (tryAgain) {
       try {
         zk = new ZooKeeper(connectString, timeout, new 
ZooSessionWatcher(reconnectName));
@@ -236,7 +257,10 @@ public class ZooSession implements AutoCloseable {
         }
       }
     }
-    return zk;
+
+    zkac = new ZookeeperAndCounter(zk, nextCounter);
+    delegate.set(zkac);
+    return zkac;
   }
 
   public void addAuthInfo(String scheme, byte[] auth) {
@@ -294,27 +318,28 @@ public class ZooSession implements AutoCloseable {
     verifyConnected().sync(path, cb, ctx);
   }
 
-  public void addPersistentRecursiveWatchers(Set<String> paths, Watcher 
watcher)
+  public long addPersistentRecursiveWatchers(Set<String> paths, Watcher 
watcher)
       throws KeeperException, InterruptedException {
+    ZookeeperAndCounter localZkac = reconnect();
 
-    ZooKeeper localZK = verifyConnected();
     Set<String> remainingPaths = new HashSet<>(paths);
     while (true) {
       try {
         Iterator<String> remainingPathsIter = remainingPaths.iterator();
         while (remainingPathsIter.hasNext()) {
           String path = remainingPathsIter.next();
-          localZK.addWatch(path, watcher, AddWatchMode.PERSISTENT_RECURSIVE);
+          localZkac.zookeeper.addWatch(path, watcher, 
AddWatchMode.PERSISTENT_RECURSIVE);
           remainingPathsIter.remove();
         }
-        break;
+
+        return localZkac.connectionCount;
       } catch (KeeperException e) {
         log.error("Error setting persistent watcher in ZooKeeper, 
retrying...", e);
-        ZooKeeper currentZK = verifyConnected();
+        ZookeeperAndCounter currentZkac = reconnect();
         // If ZooKeeper object is different, then reset the localZK variable
         // and start over.
-        if (localZK != currentZK) {
-          localZK = currentZK;
+        if (localZkac != currentZkac) {
+          localZkac = currentZkac;
           remainingPaths = new HashSet<>(paths);
         }
       }
@@ -322,9 +347,13 @@ public class ZooSession implements AutoCloseable {
   }
 
   @Override
-  public void close() {
-    if (closed.compareAndSet(false, true)) {
-      closeZk(delegate.getAndSet(null));
+  public synchronized void close() {
+    if (!closed) {
+      var zkac = delegate.getAndSet(null);
+      if (zkac != null) {
+        closeZk(zkac.zookeeper);
+      }
+      closed = true;
     }
   }
 
@@ -348,7 +377,13 @@ public class ZooSession implements AutoCloseable {
    * @return connection counter
    */
   public long getConnectionCounter() {
-    return connectCounter.get();
+    var zkac = delegate.get();
+    if (delegate.get() == null) {
+      // If null then this is closed or in the process of opening. If closed 
reconnect will throw an
+      // exception. If in the process of opening, then reconnect will wait for 
that to finish.
+      return reconnect().connectionCount;
+    }
+    return zkac.connectionCount;
   }
 
 }
diff --git 
a/core/src/test/java/org/apache/accumulo/core/zookeeper/ZooCacheTest.java 
b/core/src/test/java/org/apache/accumulo/core/zookeeper/ZooCacheTest.java
index e4e9426c64..9f7cbc31a8 100644
--- a/core/src/test/java/org/apache/accumulo/core/zookeeper/ZooCacheTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/zookeeper/ZooCacheTest.java
@@ -60,8 +60,8 @@ public class ZooCacheTest {
     }
 
     @Override
-    protected void setupWatchers() {
-      clear();
+    protected boolean setupWatchers() {
+      return false;
     }
 
     public void executeWatcher(WatchedEvent event) {
@@ -95,7 +95,7 @@ public class ZooCacheTest {
   @SuppressWarnings("unchecked")
   public void testOverlappingPaths() throws Exception {
     expect(zk.getConnectionCounter()).andReturn(2L).times(2);
-    zk.addPersistentRecursiveWatchers(isA(Set.class), isA(Watcher.class));
+    expect(zk.addPersistentRecursiveWatchers(isA(Set.class), 
isA(Watcher.class))).andReturn(3L);
     replay(zk);
     assertThrows(IllegalArgumentException.class,
         () -> new ZooCache(zk, Set.of(root, root + "/localhost:9995")));
@@ -121,7 +121,7 @@ public class ZooCacheTest {
   @SuppressWarnings("unchecked")
   public void testUnwatchedPaths() throws Exception {
     expect(zk.getConnectionCounter()).andReturn(2L).anyTimes();
-    zk.addPersistentRecursiveWatchers(isA(Set.class), isA(Watcher.class));
+    expect(zk.addPersistentRecursiveWatchers(isA(Set.class), 
isA(Watcher.class))).andReturn(3L);
     replay(zk);
     ZooCache cache = new ZooCache(zk, Set.of(root));
     assertThrows(IllegalStateException.class, () -> 
cache.get("/some/unknown/path"));

Reply via email to