Repository: accumulo
Updated Branches:
  refs/heads/master 10cafac94 -> dd420f631


ACCUMULO-3508 added read-write lock to ZooCache


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/dd420f63
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/dd420f63
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/dd420f63

Branch: refs/heads/master
Commit: dd420f6312322d13b814a177fc87fabb0576dc04
Parents: 10cafac
Author: Eric C. Newton <eric.new...@gmail.com>
Authored: Wed Oct 14 15:12:23 2015 -0400
Committer: Eric C. Newton <eric.new...@gmail.com>
Committed: Wed Oct 14 15:13:42 2015 -0400

----------------------------------------------------------------------
 .../accumulo/fate/zookeeper/ZooCache.java       | 325 +++++++++++--------
 1 file changed, 195 insertions(+), 130 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/dd420f63/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java
----------------------------------------------------------------------
diff --git 
a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java 
b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java
index f043d83..8c9f80d 100644
--- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java
+++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java
@@ -28,6 +28,10 @@ import java.util.ConcurrentModificationException;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.LockSupport;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.Code;
@@ -46,14 +50,18 @@ import com.google.common.annotations.VisibleForTesting;
 public class ZooCache {
   private static final Logger log = LoggerFactory.getLogger(ZooCache.class);
 
-  private ZCacheWatcher watcher = new ZCacheWatcher();
-  private Watcher externalWatcher = null;
+  private final ZCacheWatcher watcher = new ZCacheWatcher();
+  private final Watcher externalWatcher;
 
-  private HashMap<String,byte[]> cache;
-  private HashMap<String,Stat> statCache;
-  private HashMap<String,List<String>> childrenCache;
+  private final ReadWriteLock cacheLock = new ReentrantReadWriteLock(false);
+  private final Lock cacheWriteLock = cacheLock.writeLock();
+  private final Lock cacheReadLock = cacheLock.readLock();
 
-  private ZooReader zReader;
+  private final HashMap<String,byte[]> cache;
+  private final HashMap<String,Stat> statCache;
+  private final HashMap<String,List<String>> childrenCache;
+
+  private final ZooReader zReader;
 
   private ZooKeeper getZooKeeper() {
     return zReader.getZooKeeper();
@@ -63,8 +71,9 @@ public class ZooCache {
     @Override
     public void process(WatchedEvent event) {
 
-      if (log.isTraceEnabled())
+      if (log.isTraceEnabled()) {
         log.trace("{}", event);
+      }
 
       switch (event.getType()) {
         case NodeDataChanged:
@@ -89,10 +98,12 @@ public class ZooCache {
               break;
             default:
               log.warn("Unhandled: " + event);
+              break;
           }
           break;
         default:
           log.warn("Unhandled: " + event);
+          break;
       }
 
       if (externalWatcher != null) {
@@ -143,50 +154,50 @@ public class ZooCache {
     this.externalWatcher = watcher;
   }
 
-  private interface ZooRunnable {
+  private abstract class ZooRunnable<T> {
     /**
      * Runs an operation against ZooKeeper, automatically retrying in the face 
of KeeperExceptions
      */
-    void run(ZooKeeper zooKeeper) throws KeeperException, InterruptedException;
-  }
+    abstract T run(ZooKeeper zooKeeper) throws KeeperException, 
InterruptedException;
 
-  private synchronized void retry(ZooRunnable op) {
+    public T retry() {
 
-    int sleepTime = 100;
+      int sleepTime = 100;
 
-    while (true) {
+      while (true) {
 
-      ZooKeeper zooKeeper = getZooKeeper();
+        ZooKeeper zooKeeper = getZooKeeper();
 
-      try {
-        op.run(zooKeeper);
-        return;
-
-      } catch (KeeperException e) {
-        final Code code = e.code();
-        if (code == Code.NONODE) {
-          log.error("Looked up non-existent node in cache " + e.getPath(), e);
-        } else if (code == Code.CONNECTIONLOSS || code == 
Code.OPERATIONTIMEOUT || code == Code.SESSIONEXPIRED) {
-          log.warn("Saw (possibly) transient exception communicating with 
ZooKeeper, will retry", e);
-        } else {
-          log.warn("Zookeeper error, will retry", e);
+        try {
+          return run(zooKeeper);
+        } catch (KeeperException e) {
+          final Code code = e.code();
+          if (code == Code.NONODE) {
+            log.error("Looked up non-existent node in cache " + e.getPath(), 
e);
+          } else if (code == Code.CONNECTIONLOSS || code == 
Code.OPERATIONTIMEOUT || code == Code.SESSIONEXPIRED) {
+            log.warn("Saw (possibly) transient exception communicating with 
ZooKeeper, will retry", e);
+          } else {
+            log.warn("Zookeeper error, will retry", e);
+          }
+        } catch (InterruptedException e) {
+          log.info("Zookeeper error, will retry", e);
+        } catch (ConcurrentModificationException e) {
+          log.debug("Zookeeper was modified, will retry");
         }
-      } catch (InterruptedException e) {
-        log.info("Zookeeper error, will retry", e);
-      } catch (ConcurrentModificationException e) {
-        log.debug("Zookeeper was modified, will retry");
-      }
 
-      try {
-        // do not hold lock while sleeping
-        wait(sleepTime);
-      } catch (InterruptedException e) {
-        log.debug("Wait in retry() was interrupted.", e);
+        try {
+          // do not hold lock while sleeping
+          Thread.sleep(sleepTime);
+        } catch (InterruptedException e) {
+          log.debug("Wait in retry() was interrupted.", e);
+        }
+        LockSupport.parkNanos(sleepTime);
+        if (sleepTime < 10_000) {
+          sleepTime = (int) (sleepTime + sleepTime * Math.random());
+        }
       }
-      if (sleepTime < 10000)
-        sleepTime = (int) (sleepTime + sleepTime * Math.random());
-
     }
+
   }
 
   /**
@@ -198,29 +209,41 @@ public class ZooCache {
    */
   public synchronized List<String> getChildren(final String zPath) {
 
-    ZooRunnable zr = new ZooRunnable() {
+    ZooRunnable<List<String>> zr = new ZooRunnable<List<String>>() {
 
       @Override
-      public void run(ZooKeeper zooKeeper) throws KeeperException, 
InterruptedException {
-
-        if (childrenCache.containsKey(zPath))
-          return;
+      public List<String> run(ZooKeeper zooKeeper) throws KeeperException, 
InterruptedException {
+        try {
+          cacheReadLock.lock();
+          if (childrenCache.containsKey(zPath)) {
+            return childrenCache.get(zPath);
+          }
+        } finally {
+          cacheReadLock.unlock();
+        }
 
+        cacheWriteLock.lock();
         try {
+          if (childrenCache.containsKey(zPath)) {
+            return childrenCache.get(zPath);
+          }
           List<String> children = zooKeeper.getChildren(zPath, watcher);
           childrenCache.put(zPath, children);
+          return children;
         } catch (KeeperException ke) {
           if (ke.code() != Code.NONODE) {
             throw ke;
           }
+        } finally {
+          cacheWriteLock.unlock();
         }
+        return null;
       }
 
     };
 
-    retry(zr);
+    List<String> children = zr.retry();
 
-    List<String> children = childrenCache.get(zPath);
     if (children == null) {
       return null;
     }
@@ -234,7 +257,7 @@ public class ZooCache {
    *          path to get
    * @return path data, or null if non-existent
    */
-  public synchronized byte[] get(final String zPath) {
+  public byte[] get(final String zPath) {
     return get(zPath, null);
   }
 
@@ -243,94 +266,127 @@ public class ZooCache {
    *
    * @param zPath
    *          path to get
-   * @param stat
+   * @param status
    *          status object to populate
    * @return path data, or null if non-existent
    */
-  public synchronized byte[] get(final String zPath, Stat stat) {
-    ZooRunnable zr = new ZooRunnable() {
+  public byte[] get(final String zPath, final Stat status) {
+    ZooRunnable<byte[]> zr = new ZooRunnable<byte[]>() {
 
       @Override
-      public void run(ZooKeeper zooKeeper) throws KeeperException, 
InterruptedException {
-
-        if (cache.containsKey(zPath))
-          return;
-
-        // The following call to exists() is important, since we are caching 
that a node does not exist. Once the node comes into existence, it will be 
added to
-        // the cache. But this notification of a node coming into existence 
will only be given if exists() was previously called.
-        //
-        // If the call to exists() is bypassed and only getData() is called 
with a special case that looks for Code.NONODE in the KeeperException, then
-        // non-existence can not be cached.
-        Stat stat = zooKeeper.exists(zPath, watcher);
-
-        byte[] data = null;
-
-        if (stat == null) {
-          if (log.isTraceEnabled())
-            log.trace("zookeeper did not contain " + zPath);
-        } else {
-          try {
-            data = zooKeeper.getData(zPath, watcher, stat);
-          } catch (KeeperException.BadVersionException e1) {
-            throw new ConcurrentModificationException();
-          } catch (KeeperException.NoNodeException e2) {
-            throw new ConcurrentModificationException();
+      public byte[] run(ZooKeeper zooKeeper) throws KeeperException, 
InterruptedException {
+        Stat stat = null;
+        cacheReadLock.lock();
+        try {
+          if (cache.containsKey(zPath)) {
+            stat = statCache.get(zPath);
+            copyStats(status, stat);
+            return cache.get(zPath);
           }
-          if (log.isTraceEnabled())
-            log.trace("zookeeper contained " + zPath + " " + (data == null ? 
null : new String(data, UTF_8)));
+        } finally {
+          cacheReadLock.unlock();
         }
-        if (log.isTraceEnabled())
-          log.trace("putting " + zPath + " " + (data == null ? null : new 
String(data, UTF_8)) + " in cache");
-        put(zPath, data, stat);
-      }
 
+        /*
+         * The following call to exists() is important, since we are caching 
that a node does not exist. Once the node comes into existence, it will be 
added to
+         * the cache. But this notification of a node coming into existence 
will only be given if exists() was previously called.
+         *
+         * If the call to exists() is bypassed and only getData() is called 
with a special case that looks for Code.NONODE in the KeeperException, then
+         * non-existence can not be cached.
+         */
+        cacheWriteLock.lock();
+        try {
+          stat = zooKeeper.exists(zPath, watcher);
+          byte[] data = null;
+          if (stat == null) {
+            if (log.isTraceEnabled()) {
+              log.trace("zookeeper did not contain " + zPath);
+            }
+          } else {
+            try {
+              data = zooKeeper.getData(zPath, watcher, stat);
+            } catch (KeeperException.BadVersionException e1) {
+              throw new ConcurrentModificationException();
+            } catch (KeeperException.NoNodeException e2) {
+              throw new ConcurrentModificationException();
+            }
+            if (log.isTraceEnabled()) {
+              log.trace("zookeeper contained " + zPath + " " + (data == null ? 
null : new String(data, UTF_8)));
+            }
+          }
+          put(zPath, data, stat);
+          copyStats(status, stat);
+          statCache.put(zPath, stat);
+          return data;
+        } finally {
+          cacheWriteLock.unlock();
+        }
+      }
     };
 
-    retry(zr);
+    return zr.retry();
+  }
 
-    if (stat != null) {
-      Stat cstat = statCache.get(zPath);
-      if (cstat != null) {
-        try {
-          ByteArrayOutputStream baos = new ByteArrayOutputStream();
-          DataOutputStream dos = new DataOutputStream(baos);
-          cstat.write(dos);
-          dos.close();
-
-          ByteArrayInputStream bais = new 
ByteArrayInputStream(baos.toByteArray());
-          DataInputStream dis = new DataInputStream(bais);
-          stat.readFields(dis);
-
-          dis.close();
-        } catch (IOException e) {
-          throw new RuntimeException(e);
-        }
+  /**
+   * Helper method to copy stats from the cached stat into userStat
+   *
+   * @param userStat
+   *          user Stat object
+   * @param cachedStat
+   *          cached statistic, that is or will be cached
+   */
+  protected void copyStats(Stat userStat, Stat cachedStat) {
+    if (userStat != null && cachedStat != null) {
+      try {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(baos);
+        cachedStat.write(dos);
+        dos.close();
+
+        ByteArrayInputStream bais = new 
ByteArrayInputStream(baos.toByteArray());
+        DataInputStream dis = new DataInputStream(bais);
+        userStat.readFields(dis);
+
+        dis.close();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
       }
     }
-
-    return cache.get(zPath);
   }
 
-  private synchronized void put(String zPath, byte[] data, Stat stat) {
-    cache.put(zPath, data);
-    statCache.put(zPath, stat);
+  private void put(String zPath, byte[] data, Stat stat) {
+    cacheWriteLock.lock();
+    try {
+      cache.put(zPath, data);
+      statCache.put(zPath, stat);
+    } finally {
+      cacheWriteLock.unlock();
+    }
   }
 
-  private synchronized void remove(String zPath) {
-    if (log.isTraceEnabled())
-      log.trace("removing " + zPath + " from cache");
-    cache.remove(zPath);
-    childrenCache.remove(zPath);
-    statCache.remove(zPath);
+  private void remove(String zPath) {
+    cacheWriteLock.lock();
+    try {
+      cache.remove(zPath);
+      childrenCache.remove(zPath);
+      statCache.remove(zPath);
+    } finally {
+      cacheWriteLock.unlock();
+    }
   }
 
   /**
    * Clears this cache.
    */
   public synchronized void clear() {
-    cache.clear();
-    childrenCache.clear();
-    statCache.clear();
+    cacheWriteLock.lock();
+    try {
+      cache.clear();
+      childrenCache.clear();
+      statCache.clear();
+    } finally {
+      cacheWriteLock.unlock();
+    }
   }
 
   /**
@@ -353,8 +409,13 @@ public class ZooCache {
    * @return true if children are cached
    */
   @VisibleForTesting
-  synchronized boolean childrenCached(String zPath) {
-    return childrenCache.containsKey(zPath);
+  boolean childrenCached(String zPath) {
+    cacheReadLock.lock();
+    try {
+      return childrenCache.containsKey(zPath);
+    } finally {
+      cacheReadLock.unlock();
+    }
   }
 
   /**
@@ -363,24 +424,28 @@ public class ZooCache {
    * @param zPath
    *          path of top node
    */
-  public synchronized void clear(String zPath) {
-
-    for (Iterator<String> i = cache.keySet().iterator(); i.hasNext();) {
-      String path = i.next();
-      if (path.startsWith(zPath))
-        i.remove();
-    }
+  public void clear(String zPath) {
+    cacheWriteLock.lock();
+    try {
+      for (Iterator<String> i = cache.keySet().iterator(); i.hasNext();) {
+        String path = i.next();
+        if (path.startsWith(zPath))
+          i.remove();
+      }
 
-    for (Iterator<String> i = childrenCache.keySet().iterator(); i.hasNext();) 
{
-      String path = i.next();
-      if (path.startsWith(zPath))
-        i.remove();
-    }
+      for (Iterator<String> i = childrenCache.keySet().iterator(); 
i.hasNext();) {
+        String path = i.next();
+        if (path.startsWith(zPath))
+          i.remove();
+      }
 
-    for (Iterator<String> i = statCache.keySet().iterator(); i.hasNext();) {
-      String path = i.next();
-      if (path.startsWith(zPath))
-        i.remove();
+      for (Iterator<String> i = statCache.keySet().iterator(); i.hasNext();) {
+        String path = i.next();
+        if (path.startsWith(zPath))
+          i.remove();
+      }
+    } finally {
+      cacheWriteLock.unlock();
     }
   }
 

Reply via email to