Repository: accumulo Updated Branches: refs/heads/master 10cafac94 -> dd420f631
ACCUMULO-3508 added read-write lock to ZooCache Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/dd420f63 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/dd420f63 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/dd420f63 Branch: refs/heads/master Commit: dd420f6312322d13b814a177fc87fabb0576dc04 Parents: 10cafac Author: Eric C. Newton <eric.new...@gmail.com> Authored: Wed Oct 14 15:12:23 2015 -0400 Committer: Eric C. Newton <eric.new...@gmail.com> Committed: Wed Oct 14 15:13:42 2015 -0400 ---------------------------------------------------------------------- .../accumulo/fate/zookeeper/ZooCache.java | 325 +++++++++++-------- 1 file changed, 195 insertions(+), 130 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/dd420f63/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java ---------------------------------------------------------------------- diff --git a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java index f043d83..8c9f80d 100644 --- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java +++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java @@ -28,6 +28,10 @@ import java.util.ConcurrentModificationException; import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.LockSupport; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.Code; @@ -46,14 +50,18 @@ import com.google.common.annotations.VisibleForTesting; public class ZooCache { private static final Logger log = LoggerFactory.getLogger(ZooCache.class); - private ZCacheWatcher watcher = new ZCacheWatcher(); - private Watcher externalWatcher = null; + private final ZCacheWatcher watcher = new ZCacheWatcher(); + private final Watcher externalWatcher; - private HashMap<String,byte[]> cache; - private HashMap<String,Stat> statCache; - private HashMap<String,List<String>> childrenCache; + private final ReadWriteLock cacheLock = new ReentrantReadWriteLock(false); + private final Lock cacheWriteLock = cacheLock.writeLock(); + private final Lock cacheReadLock = cacheLock.readLock(); - private ZooReader zReader; + private final HashMap<String,byte[]> cache; + private final HashMap<String,Stat> statCache; + private final HashMap<String,List<String>> childrenCache; + + private final ZooReader zReader; private ZooKeeper getZooKeeper() { return zReader.getZooKeeper(); @@ -63,8 +71,9 @@ public class ZooCache { @Override public void process(WatchedEvent event) { - if (log.isTraceEnabled()) + if (log.isTraceEnabled()) { log.trace("{}", event); + } switch (event.getType()) { case NodeDataChanged: @@ -89,10 +98,12 @@ public class ZooCache { break; default: log.warn("Unhandled: " + event); + break; } break; default: log.warn("Unhandled: " + event); + break; } if (externalWatcher != null) { @@ -143,50 +154,50 @@ public class ZooCache { this.externalWatcher = watcher; } - private interface ZooRunnable { + private abstract class ZooRunnable<T> { /** * Runs an operation against ZooKeeper, automatically retrying in the face of KeeperExceptions */ - void run(ZooKeeper zooKeeper) throws KeeperException, InterruptedException; - } + abstract T run(ZooKeeper zooKeeper) throws KeeperException, InterruptedException; - private synchronized void retry(ZooRunnable op) { + public T retry() { - int sleepTime = 100; + int sleepTime = 100; - while (true) { + while (true) { - ZooKeeper zooKeeper = getZooKeeper(); + ZooKeeper zooKeeper = getZooKeeper(); - try { - op.run(zooKeeper); - return; - - } catch (KeeperException e) { - final Code code = e.code(); - if (code == Code.NONODE) { - log.error("Looked up non-existent node in cache " + e.getPath(), e); - } else if (code == Code.CONNECTIONLOSS || code == Code.OPERATIONTIMEOUT || code == Code.SESSIONEXPIRED) { - log.warn("Saw (possibly) transient exception communicating with ZooKeeper, will retry", e); - } else { - log.warn("Zookeeper error, will retry", e); + try { + return run(zooKeeper); + } catch (KeeperException e) { + final Code code = e.code(); + if (code == Code.NONODE) { + log.error("Looked up non-existent node in cache " + e.getPath(), e); + } else if (code == Code.CONNECTIONLOSS || code == Code.OPERATIONTIMEOUT || code == Code.SESSIONEXPIRED) { + log.warn("Saw (possibly) transient exception communicating with ZooKeeper, will retry", e); + } else { + log.warn("Zookeeper error, will retry", e); + } + } catch (InterruptedException e) { + log.info("Zookeeper error, will retry", e); + } catch (ConcurrentModificationException e) { + log.debug("Zookeeper was modified, will retry"); } - } catch (InterruptedException e) { - log.info("Zookeeper error, will retry", e); - } catch (ConcurrentModificationException e) { - log.debug("Zookeeper was modified, will retry"); - } - try { - // do not hold lock while sleeping - wait(sleepTime); - } catch (InterruptedException e) { - log.debug("Wait in retry() was interrupted.", e); + try { + // do not hold lock while sleeping + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + log.debug("Wait in retry() was interrupted.", e); + } + LockSupport.parkNanos(sleepTime); + if (sleepTime < 10_000) { + sleepTime = (int) (sleepTime + sleepTime * Math.random()); + } } - if (sleepTime < 10000) - sleepTime = (int) (sleepTime + sleepTime * Math.random()); - } + } /** @@ -198,29 +209,41 @@ public class ZooCache { */ public synchronized List<String> getChildren(final String zPath) { - ZooRunnable zr = new ZooRunnable() { + ZooRunnable<List<String>> zr = new ZooRunnable<List<String>>() { @Override - public void run(ZooKeeper zooKeeper) throws KeeperException, InterruptedException { - - if (childrenCache.containsKey(zPath)) - return; + public List<String> run(ZooKeeper zooKeeper) throws KeeperException, InterruptedException { + try { + cacheReadLock.lock(); + if (childrenCache.containsKey(zPath)) { + return childrenCache.get(zPath); + } + } finally { + cacheReadLock.unlock(); + } + cacheWriteLock.lock(); try { + if (childrenCache.containsKey(zPath)) { + return childrenCache.get(zPath); + } List<String> children = zooKeeper.getChildren(zPath, watcher); childrenCache.put(zPath, children); + return children; } catch (KeeperException ke) { if (ke.code() != Code.NONODE) { throw ke; } + } finally { + cacheWriteLock.unlock(); } + return null; } }; - retry(zr); + List<String> children = zr.retry(); - List<String> children = childrenCache.get(zPath); if (children == null) { return null; } @@ -234,7 +257,7 @@ public class ZooCache { * path to get * @return path data, or null if non-existent */ - public synchronized byte[] get(final String zPath) { + public byte[] get(final String zPath) { return get(zPath, null); } @@ -243,94 +266,127 @@ public class ZooCache { * * @param zPath * path to get - * @param stat + * @param status * status object to populate * @return path data, or null if non-existent */ - public synchronized byte[] get(final String zPath, Stat stat) { - ZooRunnable zr = new ZooRunnable() { + public byte[] get(final String zPath, final Stat status) { + ZooRunnable<byte[]> zr = new ZooRunnable<byte[]>() { @Override - public void run(ZooKeeper zooKeeper) throws KeeperException, InterruptedException { - - if (cache.containsKey(zPath)) - return; - - // The following call to exists() is important, since we are caching that a node does not exist. Once the node comes into existence, it will be added to - // the cache. But this notification of a node coming into existence will only be given if exists() was previously called. - // - // If the call to exists() is bypassed and only getData() is called with a special case that looks for Code.NONODE in the KeeperException, then - // non-existence can not be cached. - Stat stat = zooKeeper.exists(zPath, watcher); - - byte[] data = null; - - if (stat == null) { - if (log.isTraceEnabled()) - log.trace("zookeeper did not contain " + zPath); - } else { - try { - data = zooKeeper.getData(zPath, watcher, stat); - } catch (KeeperException.BadVersionException e1) { - throw new ConcurrentModificationException(); - } catch (KeeperException.NoNodeException e2) { - throw new ConcurrentModificationException(); + public byte[] run(ZooKeeper zooKeeper) throws KeeperException, InterruptedException { + Stat stat = null; + cacheReadLock.lock(); + try { + if (cache.containsKey(zPath)) { + stat = statCache.get(zPath); + copyStats(status, stat); + return cache.get(zPath); } - if (log.isTraceEnabled()) - log.trace("zookeeper contained " + zPath + " " + (data == null ? null : new String(data, UTF_8))); + } finally { + cacheReadLock.unlock(); } - if (log.isTraceEnabled()) - log.trace("putting " + zPath + " " + (data == null ? null : new String(data, UTF_8)) + " in cache"); - put(zPath, data, stat); - } + /* + * The following call to exists() is important, since we are caching that a node does not exist. Once the node comes into existence, it will be added to + * the cache. But this notification of a node coming into existence will only be given if exists() was previously called. + * + * If the call to exists() is bypassed and only getData() is called with a special case that looks for Code.NONODE in the KeeperException, then + * non-existence can not be cached. + */ + cacheWriteLock.lock(); + try { + stat = zooKeeper.exists(zPath, watcher); + byte[] data = null; + if (stat == null) { + if (log.isTraceEnabled()) { + log.trace("zookeeper did not contain " + zPath); + } + } else { + try { + data = zooKeeper.getData(zPath, watcher, stat); + } catch (KeeperException.BadVersionException e1) { + throw new ConcurrentModificationException(); + } catch (KeeperException.NoNodeException e2) { + throw new ConcurrentModificationException(); + } + if (log.isTraceEnabled()) { + log.trace("zookeeper contained " + zPath + " " + (data == null ? null : new String(data, UTF_8))); + } + } + put(zPath, data, stat); + copyStats(status, stat); + statCache.put(zPath, stat); + return data; + } finally { + cacheWriteLock.unlock(); + } + } }; - retry(zr); + return zr.retry(); + } - if (stat != null) { - Stat cstat = statCache.get(zPath); - if (cstat != null) { - try { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - DataOutputStream dos = new DataOutputStream(baos); - cstat.write(dos); - dos.close(); - - ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); - DataInputStream dis = new DataInputStream(bais); - stat.readFields(dis); - - dis.close(); - } catch (IOException e) { - throw new RuntimeException(e); - } + /** + * Helper method to copy stats from the cached stat into userStat + * + * @param userStat + * user Stat object + * @param cachedStat + * cached statistic, that is or will be cached + */ + protected void copyStats(Stat userStat, Stat cachedStat) { + if (userStat != null && cachedStat != null) { + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos); + cachedStat.write(dos); + dos.close(); + + ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); + DataInputStream dis = new DataInputStream(bais); + userStat.readFields(dis); + + dis.close(); + } catch (IOException e) { + throw new RuntimeException(e); } } - - return cache.get(zPath); } - private synchronized void put(String zPath, byte[] data, Stat stat) { - cache.put(zPath, data); - statCache.put(zPath, stat); + private void put(String zPath, byte[] data, Stat stat) { + cacheWriteLock.lock(); + try { + cache.put(zPath, data); + statCache.put(zPath, stat); + } finally { + cacheWriteLock.unlock(); + } } - private synchronized void remove(String zPath) { - if (log.isTraceEnabled()) - log.trace("removing " + zPath + " from cache"); - cache.remove(zPath); - childrenCache.remove(zPath); - statCache.remove(zPath); + private void remove(String zPath) { + cacheWriteLock.lock(); + try { + cache.remove(zPath); + childrenCache.remove(zPath); + statCache.remove(zPath); + } finally { + cacheWriteLock.unlock(); + } } /** * Clears this cache. */ public synchronized void clear() { - cache.clear(); - childrenCache.clear(); - statCache.clear(); + cacheWriteLock.lock(); + try { + cache.clear(); + childrenCache.clear(); + statCache.clear(); + } finally { + cacheWriteLock.unlock(); + } } /** @@ -353,8 +409,13 @@ public class ZooCache { * @return true if children are cached */ @VisibleForTesting - synchronized boolean childrenCached(String zPath) { - return childrenCache.containsKey(zPath); + boolean childrenCached(String zPath) { + cacheReadLock.lock(); + try { + return childrenCache.containsKey(zPath); + } finally { + cacheReadLock.unlock(); + } } /** @@ -363,24 +424,28 @@ public class ZooCache { * @param zPath * path of top node */ - public synchronized void clear(String zPath) { - - for (Iterator<String> i = cache.keySet().iterator(); i.hasNext();) { - String path = i.next(); - if (path.startsWith(zPath)) - i.remove(); - } + public void clear(String zPath) { + cacheWriteLock.lock(); + try { + for (Iterator<String> i = cache.keySet().iterator(); i.hasNext();) { + String path = i.next(); + if (path.startsWith(zPath)) + i.remove(); + } - for (Iterator<String> i = childrenCache.keySet().iterator(); i.hasNext();) { - String path = i.next(); - if (path.startsWith(zPath)) - i.remove(); - } + for (Iterator<String> i = childrenCache.keySet().iterator(); i.hasNext();) { + String path = i.next(); + if (path.startsWith(zPath)) + i.remove(); + } - for (Iterator<String> i = statCache.keySet().iterator(); i.hasNext();) { - String path = i.next(); - if (path.startsWith(zPath)) - i.remove(); + for (Iterator<String> i = statCache.keySet().iterator(); i.hasNext();) { + String path = i.next(); + if (path.startsWith(zPath)) + i.remove(); + } + } finally { + cacheWriteLock.unlock(); } }