Repository: accumulo Updated Branches: refs/heads/master 9abe28d6a -> 6d2cd6596
ACCUMULO-3349 fixing concurrency of LRU block cache Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/6d2cd659 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/6d2cd659 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/6d2cd659 Branch: refs/heads/master Commit: 6d2cd6596fc148e7eb951ea42a45fd720445aad5 Parents: 9abe28d Author: John Vines <vi...@apache.org> Authored: Mon Apr 13 10:39:52 2015 -0400 Committer: John Vines <vi...@apache.org> Committed: Mon Apr 13 10:39:52 2015 -0400 ---------------------------------------------------------------------- .../file/blockfile/cache/LruBlockCache.java | 24 ++++++++++++-------- 1 file changed, 15 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/6d2cd659/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java index d4b875c..2bd1a38 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java @@ -198,7 +198,7 @@ public class LruBlockCache implements BlockCache, HeapSize { } this.maxSize = maxSize; this.blockSize = blockSize; - map = new ConcurrentHashMap<String,CachedBlock>(mapInitialSize, mapLoadFactor, mapConcurrencyLevel); + map = new ConcurrentHashMap<>(mapInitialSize, mapLoadFactor, mapConcurrencyLevel); this.minFactor = minFactor; this.acceptableFactor = acceptableFactor; this.singleFactor = singleFactor; @@ -254,14 +254,20 @@ public class LruBlockCache implements BlockCache, HeapSize { if (cb != null) { stats.duplicateReads(); cb.access(count.incrementAndGet()); - } else { cb = new CachedBlock(blockName, buf, count.incrementAndGet(), inMemory); - long newSize = size.addAndGet(cb.heapSize()); - map.put(blockName, cb); - elements.incrementAndGet(); - if (newSize > acceptableSize() && !evictionInProgress) { - runEviction(); + CachedBlock currCb = map.putIfAbsent(blockName, cb); + if (currCb != null) { + stats.duplicateReads(); + cb = currCb; + cb.access(count.incrementAndGet()); + } else { + // Actually added block to cache + long newSize = size.addAndGet(cb.heapSize()); + elements.incrementAndGet(); + if (newSize > acceptableSize() && !evictionInProgress) { + runEviction(); + } } } @@ -364,7 +370,7 @@ public class LruBlockCache implements BlockCache, HeapSize { } } - PriorityQueue<BlockBucket> bucketQueue = new PriorityQueue<BlockBucket>(3); + PriorityQueue<BlockBucket> bucketQueue = new PriorityQueue<>(3); bucketQueue.add(bucketSingle); bucketQueue.add(bucketMulti); @@ -517,7 +523,7 @@ public class LruBlockCache implements BlockCache, HeapSize { public EvictionThread(LruBlockCache cache) { super("LruBlockCache.EvictionThread"); setDaemon(true); - this.cache = new WeakReference<LruBlockCache>(cache); + this.cache = new WeakReference<>(cache); } public synchronized boolean running() {