This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new d26a913ddd improves zoocache performance (#5040)
d26a913ddd is described below

commit d26a913ddda447da4c50ea28499ec152fe3c7df5
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Fri Nov 8 12:16:31 2024 -0500

    improves zoocache performance (#5040)
    
    ZooCache currently has zero contention among threads for reads of cached
    data.  It does this with snapshots of data.  However, it has two problems
    for updates.  First, there is a global lock for writes.  Second, writes are
    expensive because the snapshots for reads must be recomputed, which can
    lead to O(N^2) behavior for a large series of rapid small updates.
    
    This change removes the global update lock and the snapshots and
    replaces them with a ConcurrentHashMap.  It also replaces the three maps
    that existed with a single ConcurrentHashMap that has a complex value
    that represents the data in the three maps.  This single complex value
    allows removal of the global lock which existed to keep the three maps in
    sync for a given path.  Now for any path all of its data is stored in a
    single value in the map and can be safely updated with the compute
    function on ConcurrentHashMap, which only allows one thread to update a
    path at a time.
    
    This change should maintain read performance similar to that of the
    snapshots because ConcurrentHashMap does not block reads for writes.  If
    there is a concurrent compute operation happening when a read happens
    then the read will return the previously computed value.  This is the same
    behavior that ZooCache used to have.  So hopefully this change has
    similar read performance as before and much better update performance.
---
 .../accumulo/core/fate/zookeeper/ZooCache.java     | 352 ++++++++++-----------
 1 file changed, 170 insertions(+), 182 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java 
b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java
index 676c21e948..e0f00994d5 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java
@@ -21,16 +21,12 @@ package org.apache.accumulo.core.fate.zookeeper;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
 
-import java.util.Collections;
 import java.util.ConcurrentModificationException;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.locks.Lock;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.LockSupport;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Predicate;
 
 import org.apache.accumulo.core.lock.ServiceLock;
@@ -57,13 +53,49 @@ public class ZooCache {
   private final ZCacheWatcher watcher = new ZCacheWatcher();
   private final Watcher externalWatcher;
 
-  private final ReadWriteLock cacheLock = new ReentrantReadWriteLock(false);
-  private final Lock cacheWriteLock = cacheLock.writeLock();
-  private final Lock cacheReadLock = cacheLock.readLock();
+  private static class ZcNode {
+    final byte[] data;
+    final ZcStat stat;
+    final boolean dataSet;
+    final List<String> children;
+    final boolean childrenSet;
+
+    private ZcNode(ZcNode other, List<String> children) {
+      this.data = other != null ? other.data : null;
+      this.stat = other != null ? other.stat : null;
+      this.dataSet = other != null ? other.dataSet : false;
+      this.children = children;
+      this.childrenSet = true;
+    }
+
+    public ZcNode(byte[] data, ZcStat zstat, ZcNode zcn) {
+      this.data = data;
+      this.stat = zstat;
+      this.dataSet = true;
+      this.children = zcn != null ? zcn.children : null;
+      this.childrenSet = zcn != null ? zcn.childrenSet : false;
+    }
+
+    byte[] getData() {
+      Preconditions.checkState(dataSet);
+      return data;
+    }
 
-  private final HashMap<String,byte[]> cache;
-  private final HashMap<String,ZcStat> statCache;
-  private final HashMap<String,List<String>> childrenCache;
+    ZcStat getStat() {
+      Preconditions.checkState(dataSet);
+      return stat;
+    }
+
+    List<String> getChildren() {
+      Preconditions.checkState(childrenSet);
+      return children;
+    }
+  }
+
+  // ConcurrentHashMap will only allow one thread to run at a time for a given 
key and this
+  // implementation relies on that. Not all concurrent map implementations 
have this behavior for
+  // their compute functions.
+  private final ConcurrentHashMap<String,ZcNode> nodeCache;
 
   private final ZooReader zReader;
 
@@ -99,46 +131,7 @@ public class ZooCache {
     }
   }
 
-  private static class ImmutableCacheCopies {
-    final Map<String,byte[]> cache;
-    final Map<String,ZcStat> statCache;
-    final Map<String,List<String>> childrenCache;
-    final long updateCount;
-
-    ImmutableCacheCopies(long updateCount) {
-      this.updateCount = updateCount;
-      cache = Collections.emptyMap();
-      statCache = Collections.emptyMap();
-      childrenCache = Collections.emptyMap();
-    }
-
-    ImmutableCacheCopies(long updateCount, Map<String,byte[]> cache, 
Map<String,ZcStat> statCache,
-        Map<String,List<String>> childrenCache) {
-      this.updateCount = updateCount;
-      this.cache = Collections.unmodifiableMap(new HashMap<>(cache));
-      this.statCache = Collections.unmodifiableMap(new HashMap<>(statCache));
-      this.childrenCache = Collections.unmodifiableMap(new 
HashMap<>(childrenCache));
-    }
-
-    ImmutableCacheCopies(long updateCount, ImmutableCacheCopies prev,
-        Map<String,List<String>> childrenCache) {
-      this.updateCount = updateCount;
-      this.cache = prev.cache;
-      this.statCache = prev.statCache;
-      this.childrenCache = Collections.unmodifiableMap(new 
HashMap<>(childrenCache));
-    }
-
-    ImmutableCacheCopies(long updateCount, Map<String,byte[]> cache, 
Map<String,ZcStat> statCache,
-        ImmutableCacheCopies prev) {
-      this.updateCount = updateCount;
-      this.cache = Collections.unmodifiableMap(new HashMap<>(cache));
-      this.statCache = Collections.unmodifiableMap(new HashMap<>(statCache));
-      this.childrenCache = prev.childrenCache;
-    }
-  }
-
-  private volatile ImmutableCacheCopies immutableCache = new 
ImmutableCacheCopies(0);
-  private long updateCount = 0;
+  private final AtomicLong updateCount = new AtomicLong(0);
 
   /**
    * Returns a ZooKeeper session. Calls should be made within run of 
ZooRunnable after caches are
@@ -211,9 +204,7 @@ public class ZooCache {
    */
   public ZooCache(ZooReader reader, Watcher watcher) {
     this.zReader = reader;
-    this.cache = new HashMap<>();
-    this.statCache = new HashMap<>();
-    this.childrenCache = new HashMap<>();
+    nodeCache = new ConcurrentHashMap<>();
     this.externalWatcher = watcher;
   }
 
@@ -248,10 +239,16 @@ public class ZooCache {
 
         try {
           return run();
-        } catch (KeeperException e) {
-          final Code code = e.code();
+        } catch (KeeperException | ZcException e) {
+          KeeperException ke;
+          if (e instanceof ZcException) {
+            ke = ((ZcException) e).getZKException();
+          } else {
+            ke = ((KeeperException) e);
+          }
+          final Code code = ke.code();
           if (code == Code.NONODE) {
-            log.error("Looked up non-existent node in cache " + e.getPath(), 
e);
+            log.error("Looked up non-existent node in cache " + ke.getPath(), 
e);
           } else if (code == Code.CONNECTIONLOSS || code == 
Code.OPERATIONTIMEOUT
               || code == Code.SESSIONEXPIRED) {
             log.warn("Saw (possibly) transient exception communicating with 
ZooKeeper, will retry",
@@ -259,7 +256,7 @@ public class ZooCache {
           } else {
             log.warn("Zookeeper error, will retry", e);
           }
-        } catch (InterruptedException e) {
+        } catch (InterruptedException | ZcInterruptedException e) {
           log.info("Zookeeper error, will retry", e);
         } catch (ConcurrentModificationException e) {
           log.debug("Zookeeper was modified, will retry");
@@ -280,6 +277,26 @@ public class ZooCache {
 
   }
 
+  private static class ZcException extends RuntimeException {
+    private static final long serialVersionUID = 1;
+
+    private ZcException(KeeperException e) {
+      super(e);
+    }
+
+    public KeeperException getZKException() {
+      return (KeeperException) getCause();
+    }
+  }
+
+  private static class ZcInterruptedException extends RuntimeException {
+    private static final long serialVersionUID = 1;
+
+    private ZcInterruptedException(InterruptedException e) {
+      super(e);
+    }
+  }
+
   /**
    * Gets the children of the given node. A watch is established by this call.
    *
@@ -294,41 +311,43 @@ public class ZooCache {
       @Override
       public List<String> run() throws KeeperException, InterruptedException {
 
-        // only read volatile once for consistency
-        ImmutableCacheCopies lic = immutableCache;
-        var cachedChildren = lic.childrenCache.get(zPath);
-        // null is cached as a value, that is the reason for the secondary 
containsKey check
-        if (cachedChildren != null || lic.childrenCache.containsKey(zPath)) {
-          return cachedChildren;
+        var zcNode = nodeCache.get(zPath);
+        if (zcNode != null && zcNode.childrenSet) {
+          return zcNode.getChildren();
         }
 
-        cacheWriteLock.lock();
         try {
-          List<String> children = childrenCache.get(zPath);
-          // null is cached as a value, that is the reason for the secondary 
containsKey check
-          if (children != null || lic.childrenCache.containsKey(zPath)) {
-            return children;
-          }
-
-          final ZooKeeper zooKeeper = getZooKeeper();
+          zcNode = nodeCache.compute(zPath, (zp, zcn) -> {
+            // recheck the children now that lock is held on key
+            if (zcn != null && zcn.childrenSet) {
+              return zcn;
+            }
 
-          children = zooKeeper.getChildren(zPath, watcher);
-          if (children != null) {
-            children = List.copyOf(children);
-          }
-          childrenCache.put(zPath, children);
-          immutableCache = new ImmutableCacheCopies(++updateCount, 
immutableCache, childrenCache);
-          return children;
-        } catch (KeeperException ke) {
-          if (ke.code() != Code.NONODE) {
-            throw ke;
+            try {
+              final ZooKeeper zooKeeper = getZooKeeper();
+              List<String> children;
+              children = zooKeeper.getChildren(zPath, watcher);
+              if (children != null) {
+                children = List.copyOf(children);
+              }
+              return new ZcNode(zcn, children);
+            } catch (KeeperException e) {
+              throw new ZcException(e);
+            } catch (InterruptedException e) {
+              throw new ZcInterruptedException(e);
+            }
+          });
+          // increment this after compute call completes when the change is 
visible
+          updateCount.incrementAndGet();
+          return zcNode.getChildren();
+        } catch (ZcException zce) {
+          if (zce.getZKException().code() == Code.NONODE) {
+            return null;
+          } else {
+            throw zce;
           }
-        } finally {
-          cacheWriteLock.unlock();
         }
-        return null;
       }
-
     };
 
     return zr.retry();
@@ -359,54 +378,67 @@ public class ZooCache {
 
       @Override
       public byte[] run() throws KeeperException, InterruptedException {
-        ZcStat zstat = null;
 
-        // only read volatile once so following code works with a consistent 
snapshot
-        ImmutableCacheCopies lic = immutableCache;
-        byte[] val = lic.cache.get(zPath);
-        if (val != null || lic.cache.containsKey(zPath)) {
+        var zcNode = nodeCache.get(zPath);
+        if (zcNode != null && zcNode.dataSet) {
           if (status != null) {
-            zstat = lic.statCache.get(zPath);
-            copyStats(status, zstat);
+            copyStats(status, zcNode.getStat());
           }
-          return val;
+          return zcNode.getData();
         }
 
-        /*
-         * The following call to exists() is important, since we are caching 
that a node does not
-         * exist. Once the node comes into existence, it will be added to the 
cache. But this
-         * notification of a node coming into existence will only be given if 
exists() was
-         * previously called. If the call to exists() is bypassed and only 
getData() is called with
-         * a special case that looks for Code.NONODE in the KeeperException, 
then non-existence can
-         * not be cached.
-         */
-        cacheWriteLock.lock();
-        try {
-          final ZooKeeper zooKeeper = getZooKeeper();
-          Stat stat = zooKeeper.exists(zPath, watcher);
-          byte[] data = null;
-          if (stat == null) {
-            if (log.isTraceEnabled()) {
-              log.trace("zookeeper did not contain {}", zPath);
-            }
-          } else {
-            try {
-              data = zooKeeper.getData(zPath, watcher, stat);
-              zstat = new ZcStat(stat);
-            } catch (KeeperException.BadVersionException | 
KeeperException.NoNodeException e1) {
-              throw new ConcurrentModificationException();
-            }
-            if (log.isTraceEnabled()) {
-              log.trace("zookeeper contained {} {}", zPath,
-                  (data == null ? null : new String(data, UTF_8)));
+        zcNode = nodeCache.compute(zPath, (zp, zcn) -> {
+          // recheck the now that lock is held on key, it may be present now. 
Could have been
+          // computed while waiting for lock.
+          if (zcn != null && zcn.dataSet) {
+            return zcn;
+          }
+          /*
+           * The following call to exists() is important, since we are caching 
that a node does not
+           * exist. Once the node comes into existence, it will be added to 
the cache. But this
+           * notification of a node coming into existence will only be given 
if exists() was
+           * previously called. If the call to exists() is bypassed and only 
getData() is called
+           * with a special case that looks for Code.NONODE in the 
KeeperException, then
+           * non-existence can not be cached.
+           */
+          try {
+            final ZooKeeper zooKeeper = getZooKeeper();
+            Stat stat = zooKeeper.exists(zPath, watcher);
+            byte[] data = null;
+            ZcStat zstat = null;
+            if (stat == null) {
+              if (log.isTraceEnabled()) {
+                log.trace("zookeeper did not contain {}", zPath);
+              }
+            } else {
+              try {
+                data = zooKeeper.getData(zPath, watcher, stat);
+                zstat = new ZcStat(stat);
+              } catch (KeeperException.BadVersionException | 
KeeperException.NoNodeException e1) {
+                throw new ConcurrentModificationException();
+              } catch (InterruptedException e) {
+                throw new ZcInterruptedException(e);
+              }
+              if (log.isTraceEnabled()) {
+                log.trace("zookeeper contained {} {}", zPath,
+                    (data == null ? null : new String(data, UTF_8)));
+              }
             }
+            return new ZcNode(data, zstat, zcn);
+          } catch (KeeperException ke) {
+            throw new ZcException(ke);
+          } catch (InterruptedException e) {
+            throw new ZcInterruptedException(e);
           }
-          put(zPath, data, zstat);
-          copyStats(status, zstat);
-          return data;
-        } finally {
-          cacheWriteLock.unlock();
+
+        });
+
+        // update this after the compute call completes when the change is 
visible
+        updateCount.incrementAndGet();
+        if (status != null) {
+          copyStats(status, zcNode.getStat());
         }
+        return zcNode.getData();
       }
     };
 
@@ -426,29 +458,9 @@ public class ZooCache {
     }
   }
 
-  private void put(String zPath, byte[] data, ZcStat stat) {
-    cacheWriteLock.lock();
-    try {
-      cache.put(zPath, data);
-      statCache.put(zPath, stat);
-
-      immutableCache = new ImmutableCacheCopies(++updateCount, cache, 
statCache, immutableCache);
-    } finally {
-      cacheWriteLock.unlock();
-    }
-  }
-
   private void remove(String zPath) {
-    cacheWriteLock.lock();
-    try {
-      cache.remove(zPath);
-      childrenCache.remove(zPath);
-      statCache.remove(zPath);
-
-      immutableCache = new ImmutableCacheCopies(++updateCount, cache, 
statCache, childrenCache);
-    } finally {
-      cacheWriteLock.unlock();
-    }
+    nodeCache.remove(zPath);
+    updateCount.incrementAndGet();
   }
 
   /**
@@ -456,16 +468,8 @@ public class ZooCache {
    */
   public void clear() {
     Preconditions.checkState(!closed);
-    cacheWriteLock.lock();
-    try {
-      cache.clear();
-      childrenCache.clear();
-      statCache.clear();
-
-      immutableCache = new ImmutableCacheCopies(++updateCount);
-    } finally {
-      cacheWriteLock.unlock();
-    }
+    nodeCache.clear();
+    updateCount.incrementAndGet();
   }
 
   public void close() {
@@ -478,7 +482,7 @@ public class ZooCache {
    */
   public long getUpdateCount() {
     Preconditions.checkState(!closed);
-    return immutableCache.updateCount;
+    return updateCount.get();
   }
 
   /**
@@ -489,12 +493,8 @@ public class ZooCache {
    */
   @VisibleForTesting
   boolean dataCached(String zPath) {
-    cacheReadLock.lock();
-    try {
-      return immutableCache.cache.containsKey(zPath) && 
cache.containsKey(zPath);
-    } finally {
-      cacheReadLock.unlock();
-    }
+    var zcn = nodeCache.get(zPath);
+    return zcn != null && zcn.dataSet;
   }
 
   /**
@@ -505,12 +505,8 @@ public class ZooCache {
    */
   @VisibleForTesting
   boolean childrenCached(String zPath) {
-    cacheReadLock.lock();
-    try {
-      return immutableCache.childrenCache.containsKey(zPath) && 
childrenCache.containsKey(zPath);
-    } finally {
-      cacheReadLock.unlock();
-    }
+    var zcn = nodeCache.get(zPath);
+    return zcn != null && zcn.childrenSet;
   }
 
   /**
@@ -518,16 +514,8 @@ public class ZooCache {
    */
   public void clear(Predicate<String> pathPredicate) {
     Preconditions.checkState(!closed);
-    cacheWriteLock.lock();
-    try {
-      cache.keySet().removeIf(pathPredicate);
-      childrenCache.keySet().removeIf(pathPredicate);
-      statCache.keySet().removeIf(pathPredicate);
-
-      immutableCache = new ImmutableCacheCopies(++updateCount, cache, 
statCache, childrenCache);
-    } finally {
-      cacheWriteLock.unlock();
-    }
+    nodeCache.keySet().removeIf(pathPredicate);
+    updateCount.incrementAndGet();
   }
 
   /**

Reply via email to