Merge branch '1.6' into 1.7 Conflicts: fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/76d9ed76 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/76d9ed76 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/76d9ed76 Branch: refs/heads/1.7 Commit: 76d9ed7691fe257cd54fca565c2f3b2f79b7269f Parents: a25ec8e 2a2bfbb Author: Josh Elser <els...@apache.org> Authored: Wed Dec 30 19:10:44 2015 -0500 Committer: Josh Elser <els...@apache.org> Committed: Wed Dec 30 19:51:40 2015 -0500 ---------------------------------------------------------------------- .../accumulo/fate/zookeeper/ZooCache.java | 319 +++++++++++-------- 1 file changed, 190 insertions(+), 129 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/76d9ed76/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java ---------------------------------------------------------------------- diff --cc fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java index f043d83,b30bb15..81985d7 --- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java +++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java @@@ -28,7 -28,12 +28,11 @@@ import java.util.ConcurrentModification import java.util.HashMap; import java.util.Iterator; import java.util.List; + import java.util.concurrent.locks.Lock; + import java.util.concurrent.locks.LockSupport; + import java.util.concurrent.locks.ReadWriteLock; + import java.util.concurrent.locks.ReentrantReadWriteLock; -import org.apache.log4j.Logger; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.Code; import org.apache.zookeeper.WatchedEvent; @@@ -44,16 -47,20 +48,20 @@@ import com.google.common.annotations.Vi * A cache for values stored in ZooKeeper. Values are kept up to date as they change. */ public class ZooCache { - private static final Logger log = Logger.getLogger(ZooCache.class); + private static final Logger log = LoggerFactory.getLogger(ZooCache.class); - private ZCacheWatcher watcher = new ZCacheWatcher(); - private Watcher externalWatcher = null; + private final ZCacheWatcher watcher = new ZCacheWatcher(); + private final Watcher externalWatcher; - private HashMap<String,byte[]> cache; - private HashMap<String,Stat> statCache; - private HashMap<String,List<String>> childrenCache; + private final ReadWriteLock cacheLock = new ReentrantReadWriteLock(false); + private final Lock cacheWriteLock = cacheLock.writeLock(); + private final Lock cacheReadLock = cacheLock.readLock(); - private ZooReader zReader; + private final HashMap<String,byte[]> cache; + private final HashMap<String,Stat> statCache; + private final HashMap<String,List<String>> childrenCache; + + private final ZooReader zReader; private ZooKeeper getZooKeeper() { return zReader.getZooKeeper(); @@@ -247,81 -267,109 +268,107 @@@ * status object to populate * @return path data, or null if non-existent */ - public synchronized byte[] get(final String zPath, Stat stat) { - ZooRunnable zr = new ZooRunnable() { + public byte[] get(final String zPath, final Stat status) { + ZooRunnable<byte[]> zr = new ZooRunnable<byte[]>() { @Override - public void run(ZooKeeper zooKeeper) throws KeeperException, InterruptedException { - - if (cache.containsKey(zPath)) - return; - - // The following call to exists() is important, since we are caching that a node does not exist. Once the node comes into existence, it will be added to - // the cache. But this notification of a node coming into existence will only be given if exists() was previously called. - // - // If the call to exists() is bypassed and only getData() is called with a special case that looks for Code.NONODE in the KeeperException, then - // non-existence can not be cached. - Stat stat = zooKeeper.exists(zPath, watcher); - - byte[] data = null; - - if (stat == null) { - if (log.isTraceEnabled()) - log.trace("zookeeper did not contain " + zPath); - } else { - try { - data = zooKeeper.getData(zPath, watcher, stat); - } catch (KeeperException.BadVersionException e1) { - throw new ConcurrentModificationException(); - } catch (KeeperException.NoNodeException e2) { - throw new ConcurrentModificationException(); + public byte[] run(ZooKeeper zooKeeper) throws KeeperException, InterruptedException { + Stat stat = null; + cacheReadLock.lock(); + try { + if (cache.containsKey(zPath)) { + stat = statCache.get(zPath); + copyStats(status, stat); + return cache.get(zPath); } - if (log.isTraceEnabled()) - log.trace("zookeeper contained " + zPath + " " + (data == null ? null : new String(data, UTF_8))); + } finally { + cacheReadLock.unlock(); } - if (log.isTraceEnabled()) - log.trace("putting " + zPath + " " + (data == null ? null : new String(data, UTF_8)) + " in cache"); - put(zPath, data, stat); - } + /* + * The following call to exists() is important, since we are caching that a node does not exist. Once the node comes into existence, it will be added to - * the cache. But this notification of a node coming into existence will only be given if exists() was previously called. - * - * If the call to exists() is bypassed and only getData() is called with a special case that looks for Code.NONODE in the KeeperException, then - * non-existence can not be cached. ++ * the cache. But this notification of a node coming into existence will only be given if exists() was previously called. If the call to exists() is ++ * bypassed and only getData() is called with a special case that looks for Code.NONODE in the KeeperException, then non-existence can not be cached. + */ + cacheWriteLock.lock(); + try { + stat = zooKeeper.exists(zPath, watcher); + byte[] data = null; + if (stat == null) { + if (log.isTraceEnabled()) { + log.trace("zookeeper did not contain " + zPath); + } + } else { + try { + data = zooKeeper.getData(zPath, watcher, stat); + } catch (KeeperException.BadVersionException e1) { + throw new ConcurrentModificationException(); + } catch (KeeperException.NoNodeException e2) { + throw new ConcurrentModificationException(); + } + if (log.isTraceEnabled()) { + log.trace("zookeeper contained " + zPath + " " + (data == null ? null : new String(data, UTF_8))); + } + } + put(zPath, data, stat); + copyStats(status, stat); + statCache.put(zPath, stat); + return data; + } finally { + cacheWriteLock.unlock(); + } + } }; - retry(zr); + return zr.retry(); + } - if (stat != null) { - Stat cstat = statCache.get(zPath); - if (cstat != null) { - try { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - DataOutputStream dos = new DataOutputStream(baos); - cstat.write(dos); - dos.close(); - - ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); - DataInputStream dis = new DataInputStream(bais); - stat.readFields(dis); - - dis.close(); - } catch (IOException e) { - throw new RuntimeException(e); - } + /** + * Helper method to copy stats from the cached stat into userStat + * + * @param userStat + * user Stat object + * @param cachedStat + * cached statistic, that is or will be cached + */ + protected void copyStats(Stat userStat, Stat cachedStat) { + if (userStat != null && cachedStat != null) { + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos); + cachedStat.write(dos); + dos.close(); + + ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); + DataInputStream dis = new DataInputStream(bais); + userStat.readFields(dis); + + dis.close(); + } catch (IOException e) { + throw new RuntimeException(e); } } - - return cache.get(zPath); } - private synchronized void put(String zPath, byte[] data, Stat stat) { - cache.put(zPath, data); - statCache.put(zPath, stat); + private void put(String zPath, byte[] data, Stat stat) { + cacheWriteLock.lock(); + try { + cache.put(zPath, data); + statCache.put(zPath, stat); + } finally { + cacheWriteLock.unlock(); + } } - private synchronized void remove(String zPath) { - if (log.isTraceEnabled()) - log.trace("removing " + zPath + " from cache"); - cache.remove(zPath); - childrenCache.remove(zPath); - statCache.remove(zPath); + private void remove(String zPath) { + cacheWriteLock.lock(); + try { + cache.remove(zPath); + childrenCache.remove(zPath); + statCache.remove(zPath); + } finally { + cacheWriteLock.unlock(); + } } /**