ACCUMULO-3508 added read-write lock to ZooCache

Conflicts:
        fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/2a2bfbb2
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/2a2bfbb2
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/2a2bfbb2

Branch: refs/heads/1.7
Commit: 2a2bfbb2a34f1f544723c139f9e153a2b65c5e5e
Parents: abc6966
Author: Eric C. Newton <eric.new...@gmail.com>
Authored: Wed Oct 14 15:12:23 2015 -0400
Committer: Josh Elser <els...@apache.org>
Committed: Wed Dec 30 19:07:24 2015 -0500

----------------------------------------------------------------------
 .../accumulo/fate/zookeeper/ZooCache.java       | 314 +++++++++++--------
 1 file changed, 188 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/2a2bfbb2/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java
----------------------------------------------------------------------
diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java
index e13b2cc..b30bb15 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java
@@ -28,6 +28,10 @@ import java.util.ConcurrentModificationException;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.LockSupport;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.KeeperException;
@@ -45,14 +49,18 @@ import com.google.common.annotations.VisibleForTesting;
 public class ZooCache {
   private static final Logger log = Logger.getLogger(ZooCache.class);
 
-  private ZCacheWatcher watcher = new ZCacheWatcher();
-  private Watcher externalWatcher = null;
+  private final ZCacheWatcher watcher = new ZCacheWatcher();
+  private final Watcher externalWatcher;
 
-  private HashMap<String,byte[]> cache;
-  private HashMap<String,Stat> statCache;
-  private HashMap<String,List<String>> childrenCache;
+  private final ReadWriteLock cacheLock = new ReentrantReadWriteLock(false);
+  private final Lock cacheWriteLock = cacheLock.writeLock();
+  private final Lock cacheReadLock = cacheLock.readLock();
 
-  private ZooReader zReader;
+  private final HashMap<String,byte[]> cache;
+  private final HashMap<String,Stat> statCache;
+  private final HashMap<String,List<String>> childrenCache;
+
+  private final ZooReader zReader;
 
   private ZooKeeper getZooKeeper() {
     return zReader.getZooKeeper();
@@ -88,10 +96,12 @@ public class ZooCache {
               break;
             default:
               log.warn("Unhandled: " + event);
+              break;
           }
           break;
         default:
           log.warn("Unhandled: " + event);
+          break;
       }
 
       if (externalWatcher != null) {
@@ -142,49 +152,48 @@ public class ZooCache {
     this.externalWatcher = watcher;
   }
 
-  private interface ZooRunnable {
+  private abstract class ZooRunnable<T> {
     /**
     * Runs an operation against ZooKeeper, automatically retrying in the face of KeeperExceptions
      */
-    void run(ZooKeeper zooKeeper) throws KeeperException, InterruptedException;
-  }
+    abstract T run(ZooKeeper zooKeeper) throws KeeperException, InterruptedException;
 
-  private synchronized void retry(ZooRunnable op) {
+    public T retry() {
 
-    int sleepTime = 100;
+      int sleepTime = 100;
 
-    while (true) {
+      while (true) {
 
-      ZooKeeper zooKeeper = getZooKeeper();
+        ZooKeeper zooKeeper = getZooKeeper();
 
-      try {
-        op.run(zooKeeper);
-        return;
-
-      } catch (KeeperException e) {
-        final Code code = e.code();
-        if (code == Code.NONODE) {
-          log.error("Looked up non-existent node in cache " + e.getPath(), e);
-        } else if (code == Code.CONNECTIONLOSS || code == Code.OPERATIONTIMEOUT || code == Code.SESSIONEXPIRED) {
-          log.warn("Saw (possibly) transient exception communicating with ZooKeeper, will retry", e);
-        } else {
-          log.warn("Zookeeper error, will retry", e);
+        try {
+          return run(zooKeeper);
+        } catch (KeeperException e) {
+          final Code code = e.code();
+          if (code == Code.NONODE) {
+            log.error("Looked up non-existent node in cache " + e.getPath(), e);
+          } else if (code == Code.CONNECTIONLOSS || code == Code.OPERATIONTIMEOUT || code == Code.SESSIONEXPIRED) {
+            log.warn("Saw (possibly) transient exception communicating with ZooKeeper, will retry", e);
+          } else {
+            log.warn("Zookeeper error, will retry", e);
+          }
+        } catch (InterruptedException e) {
+          log.info("Zookeeper error, will retry", e);
+        } catch (ConcurrentModificationException e) {
+          log.debug("Zookeeper was modified, will retry");
         }
-      } catch (InterruptedException e) {
-        log.info("Zookeeper error, will retry", e);
-      } catch (ConcurrentModificationException e) {
-        log.debug("Zookeeper was modified, will retry");
-      }
 
-      try {
-        // do not hold lock while sleeping
-        wait(sleepTime);
-      } catch (InterruptedException e) {
-        log.info("Interrupted waiting before retrying ZooKeeper operation", e);
+        try {
+          // do not hold lock while sleeping
+          Thread.sleep(sleepTime);
+        } catch (InterruptedException e) {
+          log.debug("Wait in retry() was interrupted.", e);
+        }
+        LockSupport.parkNanos(sleepTime);
+        if (sleepTime < 10000) {
+          sleepTime = (int) (sleepTime + sleepTime * Math.random());
+        }
       }
-      if (sleepTime < 10000)
-        sleepTime = (int) (sleepTime + sleepTime * Math.random());
-
     }
   }
 
@@ -197,29 +206,41 @@ public class ZooCache {
    */
   public synchronized List<String> getChildren(final String zPath) {
 
-    ZooRunnable zr = new ZooRunnable() {
+    ZooRunnable<List<String>> zr = new ZooRunnable<List<String>>() {
 
       @Override
-      public void run(ZooKeeper zooKeeper) throws KeeperException, InterruptedException {
-
-        if (childrenCache.containsKey(zPath))
-          return;
+      public List<String> run(ZooKeeper zooKeeper) throws KeeperException, InterruptedException {
+        try {
+          cacheReadLock.lock();
+          if (childrenCache.containsKey(zPath)) {
+            return childrenCache.get(zPath);
+          }
+        } finally {
+          cacheReadLock.unlock();
+        }
 
+        cacheWriteLock.lock();
         try {
+          if (childrenCache.containsKey(zPath)) {
+            return childrenCache.get(zPath);
+          }
           List<String> children = zooKeeper.getChildren(zPath, watcher);
           childrenCache.put(zPath, children);
+          return children;
         } catch (KeeperException ke) {
           if (ke.code() != Code.NONODE) {
             throw ke;
           }
+        } finally {
+          cacheWriteLock.unlock();
         }
+        return null;
       }
 
     };
 
-    retry(zr);
+    List<String> children = zr.retry();
 
-    List<String> children = childrenCache.get(zPath);
     if (children == null) {
       return null;
     }
@@ -233,7 +254,7 @@ public class ZooCache {
    *          path to get
    * @return path data, or null if non-existent
    */
-  public synchronized byte[] get(final String zPath) {
+  public byte[] get(final String zPath) {
     return get(zPath, null);
   }
 
@@ -242,95 +263,127 @@ public class ZooCache {
    *
    * @param zPath
    *          path to get
-   * @param stat
+   * @param status
    *          status object to populate
    * @return path data, or null if non-existent
    */
-  public synchronized byte[] get(final String zPath, Stat stat) {
-    ZooRunnable zr = new ZooRunnable() {
+  public byte[] get(final String zPath, final Stat status) {
+    ZooRunnable<byte[]> zr = new ZooRunnable<byte[]>() {
 
       @Override
-      public void run(ZooKeeper zooKeeper) throws KeeperException, InterruptedException {
-
-        if (cache.containsKey(zPath))
-          return;
+      public byte[] run(ZooKeeper zooKeeper) throws KeeperException, InterruptedException {
+        Stat stat = null;
+        cacheReadLock.lock();
+        try {
+          if (cache.containsKey(zPath)) {
+            stat = statCache.get(zPath);
+            copyStats(status, stat);
+            return cache.get(zPath);
+          }
+        } finally {
+          cacheReadLock.unlock();
+        }
 
         /*
         * The following call to exists() is important, since we are caching that a node does not exist. Once the node comes into existence, it will be added to
-         * the cache. But this notification of a node coming into existence will only be given if exists() was previously called. If the call to exists() is
-         * bypassed and only getData() is called with a special case that looks for Code.NONODE in the KeeperException, then non-existence can not be cached.
+         * the cache. But this notification of a node coming into existence will only be given if exists() was previously called.
+         * 
+         * If the call to exists() is bypassed and only getData() is called with a special case that looks for Code.NONODE in the KeeperException, then
+         * non-existence can not be cached.
          */
-
-        Stat stat = zooKeeper.exists(zPath, watcher);
-
-        byte[] data = null;
-
-        if (stat == null) {
-          if (log.isTraceEnabled())
-            log.trace("zookeeper did not contain " + zPath);
-        } else {
-          try {
-            data = zooKeeper.getData(zPath, watcher, stat);
-          } catch (KeeperException.BadVersionException e1) {
-            throw new ConcurrentModificationException();
-          } catch (KeeperException.NoNodeException e2) {
-            throw new ConcurrentModificationException();
+        cacheWriteLock.lock();
+        try {
+          stat = zooKeeper.exists(zPath, watcher);
+          byte[] data = null;
+          if (stat == null) {
+            if (log.isTraceEnabled()) {
+              log.trace("zookeeper did not contain " + zPath);
+            }
+          } else {
+            try {
+              data = zooKeeper.getData(zPath, watcher, stat);
+            } catch (KeeperException.BadVersionException e1) {
+              throw new ConcurrentModificationException();
+            } catch (KeeperException.NoNodeException e2) {
+              throw new ConcurrentModificationException();
+            }
+            if (log.isTraceEnabled()) {
+              log.trace("zookeeper contained " + zPath + " " + (data == null ? null : new String(data, UTF_8)));
+            }
           }
-          if (log.isTraceEnabled())
-            log.trace("zookeeper contained " + zPath + " " + (data == null ? null : new String(data, UTF_8)));
+          put(zPath, data, stat);
+          copyStats(status, stat);
+          statCache.put(zPath, stat);
+          return data;
+        } finally {
+          cacheWriteLock.unlock();
         }
-        if (log.isTraceEnabled())
-          log.trace("putting " + zPath + " " + (data == null ? null : new String(data, UTF_8)) + " in cache");
-        put(zPath, data, stat);
       }
-
     };
 
-    retry(zr);
+    return zr.retry();
+  }
 
-    if (stat != null) {
-      Stat cstat = statCache.get(zPath);
-      if (cstat != null) {
-        try {
-          ByteArrayOutputStream baos = new ByteArrayOutputStream();
-          DataOutputStream dos = new DataOutputStream(baos);
-          cstat.write(dos);
-          dos.close();
-
-          ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
-          DataInputStream dis = new DataInputStream(bais);
-          stat.readFields(dis);
-
-          dis.close();
-        } catch (IOException e) {
-          throw new RuntimeException(e);
-        }
+  /**
+   * Helper method to copy stats from the cached stat into userStat
+   *
+   * @param userStat
+   *          user Stat object
+   * @param cachedStat
+   *          cached statistic, that is or will be cached
+   */
+  protected void copyStats(Stat userStat, Stat cachedStat) {
+    if (userStat != null && cachedStat != null) {
+      try {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(baos);
+        cachedStat.write(dos);
+        dos.close();
+
+        ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+        DataInputStream dis = new DataInputStream(bais);
+        userStat.readFields(dis);
+
+        dis.close();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
       }
     }
-
-    return cache.get(zPath);
   }
 
-  private synchronized void put(String zPath, byte[] data, Stat stat) {
-    cache.put(zPath, data);
-    statCache.put(zPath, stat);
+  private void put(String zPath, byte[] data, Stat stat) {
+    cacheWriteLock.lock();
+    try {
+      cache.put(zPath, data);
+      statCache.put(zPath, stat);
+    } finally {
+      cacheWriteLock.unlock();
+    }
   }
 
-  private synchronized void remove(String zPath) {
-    if (log.isTraceEnabled())
-      log.trace("removing " + zPath + " from cache");
-    cache.remove(zPath);
-    childrenCache.remove(zPath);
-    statCache.remove(zPath);
+  private void remove(String zPath) {
+    cacheWriteLock.lock();
+    try {
+      cache.remove(zPath);
+      childrenCache.remove(zPath);
+      statCache.remove(zPath);
+    } finally {
+      cacheWriteLock.unlock();
+    }
   }
 
   /**
    * Clears this cache.
    */
   public synchronized void clear() {
-    cache.clear();
-    childrenCache.clear();
-    statCache.clear();
+    cacheWriteLock.lock();
+    try {
+      cache.clear();
+      childrenCache.clear();
+      statCache.clear();
+    } finally {
+      cacheWriteLock.unlock();
+    }
   }
 
   /**
@@ -353,8 +406,13 @@ public class ZooCache {
    * @return true if children are cached
    */
   @VisibleForTesting
-  synchronized boolean childrenCached(String zPath) {
-    return childrenCache.containsKey(zPath);
+  boolean childrenCached(String zPath) {
+    cacheReadLock.lock();
+    try {
+      return childrenCache.containsKey(zPath);
+    } finally {
+      cacheReadLock.unlock();
+    }
   }
 
   /**
@@ -363,24 +421,28 @@ public class ZooCache {
    * @param zPath
    *          path of top node
    */
-  public synchronized void clear(String zPath) {
-
-    for (Iterator<String> i = cache.keySet().iterator(); i.hasNext();) {
-      String path = i.next();
-      if (path.startsWith(zPath))
-        i.remove();
-    }
+  public void clear(String zPath) {
+    cacheWriteLock.lock();
+    try {
+      for (Iterator<String> i = cache.keySet().iterator(); i.hasNext();) {
+        String path = i.next();
+        if (path.startsWith(zPath))
+          i.remove();
+      }
 
-    for (Iterator<String> i = childrenCache.keySet().iterator(); i.hasNext();) {
-      String path = i.next();
-      if (path.startsWith(zPath))
-        i.remove();
-    }
+      for (Iterator<String> i = childrenCache.keySet().iterator(); i.hasNext();) {
+        String path = i.next();
+        if (path.startsWith(zPath))
+          i.remove();
+      }
 
-    for (Iterator<String> i = statCache.keySet().iterator(); i.hasNext();) {
-      String path = i.next();
-      if (path.startsWith(zPath))
-        i.remove();
+      for (Iterator<String> i = statCache.keySet().iterator(); i.hasNext();) {
+        String path = i.next();
+        if (path.startsWith(zPath))
+          i.remove();
+      }
+    } finally {
+      cacheWriteLock.unlock();
     }
   }
 

Reply via email to