ACCUMULO-4463: Make block cache implementation pluggable

Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/452732c8
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/452732c8
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/452732c8

Branch: refs/heads/master
Commit: 452732c8ad91c4a73a556cdc2030e0a5cbbd173b
Parents: 3f1b0f3
Author: Dave Marion <dlmar...@apache.org>
Authored: Mon May 22 13:08:41 2017 -0400
Committer: Dave Marion <dlmar...@apache.org>
Committed: Mon May 22 13:08:41 2017 -0400

----------------------------------------------------------------------
 core/src/main/findbugs/exclude-filter.xml       |   4 +-
 .../core/client/rfile/RFileScanner.java         |  57 +-
 .../org/apache/accumulo/core/conf/Property.java |   4 +-
 .../core/file/blockfile/cache/BlockCache.java   |   6 +
 .../cache/BlockCacheConfiguration.java          |  91 +++
 .../file/blockfile/cache/BlockCacheManager.java | 107 +++
 .../core/file/blockfile/cache/CacheType.java    |  24 +
 .../core/file/blockfile/cache/CachedBlock.java  |   2 +-
 .../file/blockfile/cache/LruBlockCache.java     | 703 -------------------
 .../file/blockfile/cache/TinyLfuBlockCache.java | 141 ----
 .../file/blockfile/cache/lru/LruBlockCache.java | 625 +++++++++++++++++
 .../cache/lru/LruBlockCacheConfiguration.java   | 212 ++++++
 .../cache/lru/LruBlockCacheManager.java         |  49 ++
 .../cache/tinylfu/TinyLfuBlockCache.java        | 150 ++++
 .../tinylfu/TinyLfuBlockCacheConfiguration.java |  32 +
 .../cache/tinylfu/TinyLfuBlockCacheManager.java |  37 +
 .../accumulo/core/summary/SummaryReader.java    |   5 +
 .../blockfile/cache/BlockCacheFactoryTest.java  |  53 ++
 .../cache/BlockConfigurationHelperTest.java     |  32 +
 .../file/blockfile/cache/TestLruBlockCache.java | 194 ++---
 .../accumulo/core/file/rfile/RFileTest.java     |  25 +-
 .../tserver/TabletServerResourceManager.java    |  42 +-
 22 files changed, 1625 insertions(+), 970 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/452732c8/core/src/main/findbugs/exclude-filter.xml
----------------------------------------------------------------------
diff --git a/core/src/main/findbugs/exclude-filter.xml 
b/core/src/main/findbugs/exclude-filter.xml
index 95aca41..56d3659 100644
--- a/core/src/main/findbugs/exclude-filter.xml
+++ b/core/src/main/findbugs/exclude-filter.xml
@@ -67,12 +67,12 @@
   </Match>
   <Match>
     <!-- bad practice to start a thread in constructor; we should be careful 
using this class -->
-    <Class name="org.apache.accumulo.core.file.blockfile.cache.LruBlockCache" 
/>
+    <Class 
name="org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCache" />
     <Bug code="SC" pattern="SC_START_IN_CTOR" />
   </Match>
   <Match>
     <!-- locking is confusing, but probably correct -->
-    <Class 
name="org.apache.accumulo.core.file.blockfile.cache.LruBlockCache$EvictionThread"
 />
+    <Class 
name="org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCache$EvictionThread"
 />
     <Or>
       <Bug code="NN" pattern="NN_NAKED_NOTIFY" />
       <Bug code="UW" pattern="UW_UNCOND_WAIT" />

http://git-wip-us.apache.org/repos/asf/accumulo/blob/452732c8/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java 
b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
index c95531b..3e6239d 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
@@ -43,8 +43,9 @@ import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
+import org.apache.accumulo.core.file.blockfile.cache.BlockCacheManager;
 import org.apache.accumulo.core.file.blockfile.cache.CacheEntry;
-import org.apache.accumulo.core.file.blockfile.cache.LruBlockCache;
+import org.apache.accumulo.core.file.blockfile.cache.CacheType;
 import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
 import org.apache.accumulo.core.file.rfile.RFile;
 import org.apache.accumulo.core.file.rfile.RFile.Reader;
@@ -67,6 +68,7 @@ class RFileScanner extends ScannerOptions implements Scanner {
   private static final Range EMPTY_RANGE = new Range();
 
   private Range range;
+  private BlockCacheManager blockCacheManager = null;
   private BlockCache dataCache = null;
   private BlockCache indexCache = null;
   private Opts opts;
@@ -109,6 +111,11 @@ class RFileScanner extends ScannerOptions implements 
Scanner {
     }
 
     @Override
+    public long getMaxHeapSize() {
+      return getMaxSize();
+    }
+
+    @Override
     public long getMaxSize() {
       return Integer.MAX_VALUE;
     }
@@ -135,15 +142,34 @@ class RFileScanner extends ScannerOptions implements 
Scanner {
     }
 
     this.opts = opts;
-    if (opts.indexCacheSize > 0) {
-      this.indexCache = new LruBlockCache(opts.indexCacheSize, 
CACHE_BLOCK_SIZE);
-    } else {
+
+    if (opts.indexCacheSize > 0 || opts.dataCacheSize > 0) {
+      ConfigurationCopy cc = new 
ConfigurationCopy(DefaultConfiguration.getInstance());
+      if (null != opts.tableConfig) {
+        opts.tableConfig.forEach(cc::set);
+      }
+
+      try {
+        blockCacheManager = BlockCacheManager.getClientInstance(cc);
+        if (opts.indexCacheSize > 0) {
+          cc.set(Property.TSERV_INDEXCACHE_SIZE, 
Long.toString(opts.indexCacheSize));
+        }
+        if (opts.dataCacheSize > 0) {
+          cc.set(Property.TSERV_DATACACHE_SIZE, 
Long.toString(opts.dataCacheSize));
+        }
+        blockCacheManager.start(cc);
+        this.indexCache = blockCacheManager.getBlockCache(CacheType.INDEX);
+        this.dataCache = blockCacheManager.getBlockCache(CacheType.DATA);
+      } catch (RuntimeException e) {
+        throw e;
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+    if (null == indexCache) {
       this.indexCache = new NoopCache();
     }
-
-    if (opts.dataCacheSize > 0) {
-      this.dataCache = new LruBlockCache(opts.dataCacheSize, CACHE_BLOCK_SIZE);
-    } else {
+    if (null == this.dataCache) {
       this.dataCache = new NoopCache();
     }
   }
@@ -326,14 +352,6 @@ class RFileScanner extends ScannerOptions implements 
Scanner {
 
   @Override
   public void close() {
-    if (dataCache instanceof LruBlockCache) {
-      ((LruBlockCache) dataCache).shutdown();
-    }
-
-    if (indexCache instanceof LruBlockCache) {
-      ((LruBlockCache) indexCache).shutdown();
-    }
-
     try {
       for (RFileSource source : opts.in.getSources()) {
         source.getInputStream().close();
@@ -341,5 +359,12 @@ class RFileScanner extends ScannerOptions implements 
Scanner {
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
+    try {
+      if (null != this.blockCacheManager) {
+        this.blockCacheManager.stop();
+      }
+    } catch (Exception e1) {
+      throw new RuntimeException(e1);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/452732c8/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java 
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 5480867..a506e07 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -245,7 +245,9 @@ public enum Property {
   TSERV_PREFIX("tserver.", null, PropertyType.PREFIX, "Properties in this 
category affect the behavior of the tablet servers"),
   TSERV_CLIENT_TIMEOUT("tserver.client.timeout", "3s", 
PropertyType.TIMEDURATION, "Time to wait for clients to continue scans before 
closing a session."),
   TSERV_DEFAULT_BLOCKSIZE("tserver.default.blocksize", "1M", 
PropertyType.BYTES, "Specifies a default blocksize for the tserver caches"),
-  TSERV_CACHE_POLICY("tserver.cache.policy", "LRU", PropertyType.STRING, 
"Specifies the eviction policy of the file data caches (LRU or TinyLFU)."),
+  TSERV_CACHE_MANAGER_IMPL("tserver.cache.manager.class", 
"org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCacheManager", 
PropertyType.STRING,
+      "Specifies the class name of the block cache factory implementation. 
Alternative implementation is "
+          + 
"org.apache.accumulo.core.file.blockfile.cache.tinylfu.TinyLfuBlockCacheManager"),
   TSERV_DATACACHE_SIZE("tserver.cache.data.size", "10%", PropertyType.MEMORY, 
"Specifies the size of the cache for file data blocks."),
   TSERV_INDEXCACHE_SIZE("tserver.cache.index.size", "25%", 
PropertyType.MEMORY, "Specifies the size of the cache for file indices."),
   TSERV_SUMMARYCACHE_SIZE("tserver.cache.summary.size", "10%", 
PropertyType.MEMORY, "Specifies the size of the cache for summary data on each 
tablet server."),

http://git-wip-us.apache.org/repos/asf/accumulo/blob/452732c8/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java
 
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java
index 82f8b1e..b27c918 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java
@@ -21,6 +21,7 @@ package org.apache.accumulo.core.file.blockfile.cache;
  * Block cache interface.
  */
 public interface BlockCache {
+
   /**
    * Add block to cache.
    *
@@ -53,6 +54,11 @@ public interface BlockCache {
   CacheEntry getBlock(String blockName);
 
   /**
+   * Get the maximum amount of on heap memory this cache will use.
+   */
+  long getMaxHeapSize();
+
+  /**
    * Get the maximum size of this cache.
    *
    * @return max size in bytes

http://git-wip-us.apache.org/repos/asf/accumulo/blob/452732c8/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheConfiguration.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheConfiguration.java
 
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheConfiguration.java
new file mode 100644
index 0000000..efab628
--- /dev/null
+++ 
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheConfiguration.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.blockfile.cache;
+
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+
+public class BlockCacheConfiguration {
+
+  public static final String CACHE_PROPERTY_BASE = 
Property.GENERAL_ARBITRARY_PROP_PREFIX + "cache.block.";
+
+  /** Maximum allowable size of cache (block put if size > max, evict) */
+  private final long maxSize;
+
+  /** Approximate block size */
+  private final long blockSize;
+
+  private final Map<String,String> genProps;
+
+  private final String prefix;
+
+  private final String defaultPrefix;
+
+  public BlockCacheConfiguration(AccumuloConfiguration conf, CacheType type, 
String implName) {
+    defaultPrefix = getDefaultPrefix(implName);
+    prefix = getPrefix(type, implName);
+    genProps = 
conf.getAllPropertiesWithPrefix(Property.GENERAL_ARBITRARY_PROP_PREFIX);
+
+    switch (type) {
+      case INDEX:
+        this.maxSize = conf.getAsBytes(Property.TSERV_INDEXCACHE_SIZE);
+        break;
+      case DATA:
+        this.maxSize = conf.getAsBytes(Property.TSERV_DATACACHE_SIZE);
+        break;
+      case SUMMARY:
+        this.maxSize = conf.getAsBytes(Property.TSERV_SUMMARYCACHE_SIZE);
+        break;
+      default:
+        throw new IllegalArgumentException("Unknown block cache type");
+    }
+    this.blockSize = conf.getAsBytes(Property.TSERV_DEFAULT_BLOCKSIZE);
+  }
+
+  public long getMaxSize() {
+    return this.maxSize;
+  }
+
+  public long getBlockSize() {
+    return this.blockSize;
+  }
+
+  protected Optional<String> get(String suffix) {
+    String val = genProps.get(prefix + suffix);
+    if (val == null) {
+      val = genProps.get(defaultPrefix + suffix);
+    }
+    return Optional.ofNullable(val);
+  }
+
+  public static String getDefaultPrefix(String implName) {
+    return CACHE_PROPERTY_BASE + implName + ".default.";
+  }
+
+  public static String getPrefix(CacheType type, String implName) {
+    return CACHE_PROPERTY_BASE + implName + "." + type.name().toLowerCase() + 
".";
+  }
+
+  @Override
+  public String toString() {
+    return "maxSize: " + getMaxSize() + ", blockSize: " + getBlockSize();
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/452732c8/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheManager.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheManager.java
 
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheManager.java
new file mode 100644
index 0000000..be77ee2
--- /dev/null
+++ 
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheManager.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.blockfile.cache;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class BlockCacheManager {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(BlockCacheManager.class);
+
+  private final Map<CacheType,BlockCache> caches = new HashMap<>();
+
+  /**
+   * Initialize the caches for each CacheType based on the configuration
+   *
+   * @param conf
+   *          accumulo configuration
+   */
+  public void start(AccumuloConfiguration conf) {
+    for (CacheType type : CacheType.values()) {
+      BlockCache cache = this.createCache(conf, type);
+      this.caches.put(type, cache);
+    }
+  }
+
+  /**
+   * Stop caches and release resources
+   */
+  public void stop() {
+    this.caches.clear();
+  }
+
+  /**
+   * Get the block cache of the given type
+   *
+   * @param type
+   *          block cache type
+   * @return BlockCache or null if not enabled
+   */
+  public BlockCache getBlockCache(CacheType type) {
+    return caches.get(type);
+  }
+
+  /**
+   * Create a block cache using the supplied configuration
+   *
+   * @param conf
+   *          cache configuration
+   * @return configured block cache
+   */
+  protected abstract BlockCache createCache(AccumuloConfiguration conf, 
CacheType type);
+
+  /**
+   * Get the BlockCacheFactory specified by the property 
'tserver.cache.factory.class' using the AccumuloVFSClassLoader
+   *
+   * @param conf
+   *          accumulo configuration
+   * @return block cache manager instance
+   * @throws Exception
+   *           error loading block cache manager implementation class
+   */
+  public static synchronized BlockCacheManager 
getInstance(AccumuloConfiguration conf) throws Exception {
+    String impl = conf.get(Property.TSERV_CACHE_MANAGER_IMPL);
+    Class<? extends BlockCacheManager> clazz = 
AccumuloVFSClassLoader.loadClass(impl, BlockCacheManager.class);
+    LOG.info("Created new block cache manager of type: {}", 
clazz.getSimpleName());
+    return (BlockCacheManager) clazz.newInstance();
+  }
+
+  /**
+   * Get the BlockCacheFactory specified by the property 
'tserver.cache.factory.class'
+   *
+   * @param conf
+   *          accumulo configuration
+   * @return block cache manager instance
+   * @throws Exception
+   *           error loading block cache manager implementation class
+   */
+  public static synchronized BlockCacheManager 
getClientInstance(AccumuloConfiguration conf) throws Exception {
+    String impl = conf.get(Property.TSERV_CACHE_MANAGER_IMPL);
+    Class<? extends BlockCacheManager> clazz = 
Class.forName(impl).asSubclass(BlockCacheManager.class);
+    LOG.info("Created new block cache factory of type: {}", 
clazz.getSimpleName());
+    return (BlockCacheManager) clazz.newInstance();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/452732c8/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CacheType.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CacheType.java
 
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CacheType.java
new file mode 100644
index 0000000..9e26c05
--- /dev/null
+++ 
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CacheType.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.blockfile.cache;
+
+public enum CacheType {
+
+  DATA, INDEX, SUMMARY;
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/452732c8/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CachedBlock.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CachedBlock.java
 
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CachedBlock.java
index c67b4c7..44cea6b 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CachedBlock.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CachedBlock.java
@@ -31,7 +31,7 @@ public class CachedBlock implements HeapSize, 
Comparable<CachedBlock>, CacheEntr
   public final static long PER_BLOCK_OVERHEAD = 
ClassSize.align(ClassSize.OBJECT + (3 * ClassSize.REFERENCE) + (2 * 
SizeConstants.SIZEOF_LONG)
       + ClassSize.STRING + ClassSize.BYTE_BUFFER);
 
-  static enum BlockPriority {
+  public static enum BlockPriority {
     /**
      * Accessed a single time (used for scan-resistance)
      */

http://git-wip-us.apache.org/repos/asf/accumulo/blob/452732c8/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java
 
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java
deleted file mode 100644
index cbdaca5..0000000
--- 
a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java
+++ /dev/null
@@ -1,703 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.file.blockfile.cache;
-
-import java.lang.ref.WeakReference;
-import java.util.Objects;
-import java.util.PriorityQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.accumulo.core.util.NamingThreadFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A block cache implementation that is memory-aware using {@link HeapSize}, 
memory-bound using an LRU eviction algorithm, and concurrent: backed by a
- * {@link ConcurrentHashMap} and with a non-blocking eviction thread giving 
constant-time {@link #cacheBlock} and {@link #getBlock} operations.
- *
- * <p>
- * Contains three levels of block priority to allow for scan-resistance and 
in-memory families. A block is added with an inMemory flag if necessary, 
otherwise a
- * block becomes a single access priority. Once a blocked is accessed again, 
it changes to multiple access. This is used to prevent scans from thrashing the
- * cache, adding a least-frequently-used element to the eviction algorithm.
- *
- * <p>
- * Each priority is given its own chunk of the total cache to ensure fairness 
during eviction. Each priority will retain close to its maximum size, however, 
if
- * any priority is not using its entire chunk the others are able to grow 
beyond their chunk size.
- *
- * <p>
- * Instantiated at a minimum with the total size and average block size. All 
sizes are in bytes. The block size is not especially important as this cache is
- * fully dynamic in its sizing of blocks. It is only used for pre-allocating 
data structures and in initial heap estimation of the map.
- *
- * <p>
- * The detailed constructor defines the sizes for the three priorities (they 
should total to the maximum size defined). It also sets the levels that trigger 
and
- * control the eviction thread.
- *
- * <p>
- * The acceptable size is the cache size level which triggers the eviction 
process to start. It evicts enough blocks to get the size below the minimum size
- * specified.
- *
- * <p>
- * Eviction happens in a separate thread and involves a single full-scan of 
the map. It determines how many bytes must be freed to reach the minimum size, 
and
- * then while scanning determines the fewest least-recently-used blocks 
necessary from each of the three priorities (would be 3 times bytes to free). 
It then
- * uses the priority chunk sizes to evict fairly according to the relative 
sizes and usage.
- */
-public class LruBlockCache implements BlockCache, HeapSize {
-
-  private static final Logger log = 
LoggerFactory.getLogger(LruBlockCache.class);
-
-  /** Default Configuration Parameters */
-
-  /** Backing Concurrent Map Configuration */
-  static final float DEFAULT_LOAD_FACTOR = 0.75f;
-  static final int DEFAULT_CONCURRENCY_LEVEL = 16;
-
-  /** Eviction thresholds */
-  static final float DEFAULT_MIN_FACTOR = 0.75f;
-  static final float DEFAULT_ACCEPTABLE_FACTOR = 0.85f;
-
-  /** Priority buckets */
-  static final float DEFAULT_SINGLE_FACTOR = 0.25f;
-  static final float DEFAULT_MULTI_FACTOR = 0.50f;
-  static final float DEFAULT_MEMORY_FACTOR = 0.25f;
-
-  /** Statistics thread */
-  static final int statThreadPeriod = 60;
-
-  /** Concurrent map (the cache) */
-  private final ConcurrentHashMap<String,CachedBlock> map;
-
-  /** Eviction lock (locked when eviction in process) */
-  private final ReentrantLock evictionLock = new ReentrantLock(true);
-
-  /** Volatile boolean to track if we are in an eviction process or not */
-  private volatile boolean evictionInProgress = false;
-
-  /** Eviction thread */
-  private final EvictionThread evictionThread;
-
-  /** Statistics thread schedule pool (for heavy debugging, could remove) */
-  private final ScheduledExecutorService scheduleThreadPool = 
Executors.newScheduledThreadPool(1, new 
NamingThreadFactory("LRUBlockCacheStats"));
-
-  /** Current size of cache */
-  private final AtomicLong size;
-
-  /** Current number of cached elements */
-  private final AtomicLong elements;
-
-  /** Cache access count (sequential ID) */
-  private final AtomicLong count;
-
-  /** Cache statistics */
-  private final CacheStats stats;
-
-  /** Maximum allowable size of cache (block put if size > max, evict) */
-  private long maxSize;
-
-  /** Approximate block size */
-  private long blockSize;
-
-  /** Acceptable size of cache (no evictions if size < acceptable) */
-  private float acceptableFactor;
-
-  /** Minimum threshold of cache (when evicting, evict until size < min) */
-  private float minFactor;
-
-  /** Single access bucket size */
-  private float singleFactor;
-
-  /** Multiple access bucket size */
-  private float multiFactor;
-
-  /** In-memory bucket size */
-  private float memoryFactor;
-
-  /** Overhead of the structure itself */
-  private long overhead;
-
-  /**
-   * Default constructor. Specify maximum size and expected average block size 
(approximation is fine).
-   *
-   * <p>
-   * All other factors will be calculated based on defaults specified in this 
class.
-   *
-   * @param maxSize
-   *          maximum size of cache, in bytes
-   * @param blockSize
-   *          approximate size of each block, in bytes
-   */
-  public LruBlockCache(long maxSize, long blockSize) {
-    this(maxSize, blockSize, true);
-  }
-
-  /**
-   * Constructor used for testing. Allows disabling of the eviction thread.
-   */
-  public LruBlockCache(long maxSize, long blockSize, boolean evictionThread) {
-    this(maxSize, blockSize, evictionThread, (int) Math.ceil(1.2 * maxSize / 
blockSize), DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL, DEFAULT_MIN_FACTOR,
-        DEFAULT_ACCEPTABLE_FACTOR, DEFAULT_SINGLE_FACTOR, 
DEFAULT_MULTI_FACTOR, DEFAULT_MEMORY_FACTOR);
-  }
-
-  /**
-   * Configurable constructor. Use this constructor if not using defaults.
-   *
-   * @param maxSize
-   *          maximum size of this cache, in bytes
-   * @param blockSize
-   *          expected average size of blocks, in bytes
-   * @param evictionThread
-   *          whether to run evictions in a bg thread or not
-   * @param mapInitialSize
-   *          initial size of backing ConcurrentHashMap
-   * @param mapLoadFactor
-   *          initial load factor of backing ConcurrentHashMap
-   * @param mapConcurrencyLevel
-   *          initial concurrency factor for backing CHM
-   * @param minFactor
-   *          percentage of total size that eviction will evict until
-   * @param acceptableFactor
-   *          percentage of total size that triggers eviction
-   * @param singleFactor
-   *          percentage of total size for single-access blocks
-   * @param multiFactor
-   *          percentage of total size for multiple-access blocks
-   * @param memoryFactor
-   *          percentage of total size for in-memory blocks
-   */
-  public LruBlockCache(long maxSize, long blockSize, boolean evictionThread, 
int mapInitialSize, float mapLoadFactor, int mapConcurrencyLevel, float 
minFactor,
-      float acceptableFactor, float singleFactor, float multiFactor, float 
memoryFactor) {
-    if (singleFactor + multiFactor + memoryFactor != 1) {
-      throw new IllegalArgumentException("Single, multi, and memory factors " 
+ " should total 1.0");
-    }
-    if (minFactor >= acceptableFactor) {
-      throw new IllegalArgumentException("minFactor must be smaller than 
acceptableFactor");
-    }
-    if (minFactor >= 1.0f || acceptableFactor >= 1.0f) {
-      throw new IllegalArgumentException("all factors must be < 1");
-    }
-    this.maxSize = maxSize;
-    this.blockSize = blockSize;
-    map = new ConcurrentHashMap<>(mapInitialSize, mapLoadFactor, 
mapConcurrencyLevel);
-    this.minFactor = minFactor;
-    this.acceptableFactor = acceptableFactor;
-    this.singleFactor = singleFactor;
-    this.multiFactor = multiFactor;
-    this.memoryFactor = memoryFactor;
-    this.stats = new CacheStats();
-    this.count = new AtomicLong(0);
-    this.elements = new AtomicLong(0);
-    this.overhead = calculateOverhead(maxSize, blockSize, mapConcurrencyLevel);
-    this.size = new AtomicLong(this.overhead);
-
-    if (evictionThread) {
-      this.evictionThread = new EvictionThread(this);
-      this.evictionThread.start();
-      while (!this.evictionThread.running()) {
-        try {
-          Thread.sleep(10);
-        } catch (InterruptedException ex) {
-          throw new RuntimeException(ex);
-        }
-      }
-    } else {
-      this.evictionThread = null;
-    }
-    this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), 
statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS);
-  }
-
-  public void setMaxSize(long maxSize) {
-    this.maxSize = maxSize;
-    if (this.size.get() > acceptableSize() && !evictionInProgress) {
-      runEviction();
-    }
-  }
-
-  // BlockCache implementation
-
-  /**
-   * Cache the block with the specified name and buffer.
-   * <p>
-   * It is assumed this will NEVER be called on an already cached block. If 
that is done, it is assumed that you are reinserting the same exact block due 
to a
-   * race condition and will update the buffer but not modify the size of the 
cache.
-   *
-   * @param blockName
-   *          block name
-   * @param buf
-   *          block buffer
-   * @param inMemory
-   *          if block is in-memory
-   */
-  @Override
-  public CacheEntry cacheBlock(String blockName, byte buf[], boolean inMemory) 
{
-    CachedBlock cb = map.get(blockName);
-    if (cb != null) {
-      stats.duplicateReads();
-      cb.access(count.incrementAndGet());
-    } else {
-      cb = new CachedBlock(blockName, buf, count.incrementAndGet(), inMemory);
-      CachedBlock currCb = map.putIfAbsent(blockName, cb);
-      if (currCb != null) {
-        stats.duplicateReads();
-        cb = currCb;
-        cb.access(count.incrementAndGet());
-      } else {
-        // Actually added block to cache
-        long newSize = size.addAndGet(cb.heapSize());
-        elements.incrementAndGet();
-        if (newSize > acceptableSize() && !evictionInProgress) {
-          runEviction();
-        }
-      }
-    }
-
-    return cb;
-  }
-
-  /**
-   * Cache the block with the specified name and buffer.
-   * <p>
-   * It is assumed this will NEVER be called on an already cached block. If 
that is done, it is assumed that you are reinserting the same exact block due 
to a
-   * race condition and will update the buffer but not modify the size of the 
cache.
-   *
-   * @param blockName
-   *          block name
-   * @param buf
-   *          block buffer
-   */
-  @Override
-  public CacheEntry cacheBlock(String blockName, byte buf[]) {
-    return cacheBlock(blockName, buf, false);
-  }
-
-  /**
-   * Get the buffer of the block with the specified name.
-   *
-   * @param blockName
-   *          block name
-   * @return buffer of specified block name, or null if not in cache
-   */
-  @Override
-  public CachedBlock getBlock(String blockName) {
-    CachedBlock cb = map.get(blockName);
-    if (cb == null) {
-      stats.miss();
-      return null;
-    }
-    stats.hit();
-    cb.access(count.incrementAndGet());
-    return cb;
-  }
-
-  protected long evictBlock(CachedBlock block) {
-    map.remove(block.getName());
-    size.addAndGet(-1 * block.heapSize());
-    elements.decrementAndGet();
-    stats.evicted();
-    return block.heapSize();
-  }
-
-  /**
-   * Multi-threaded call to run the eviction process.
-   */
-  private void runEviction() {
-    if (evictionThread == null) {
-      evict();
-    } else {
-      evictionThread.evict();
-    }
-  }
-
-  /**
-   * Eviction method.
-   */
-  void evict() {
-
-    // Ensure only one eviction at a time
-    if (!evictionLock.tryLock())
-      return;
-
-    try {
-      evictionInProgress = true;
-
-      long bytesToFree = size.get() - minSize();
-
-      log.trace("Block cache LRU eviction started.  Attempting to free {} 
bytes", bytesToFree);
-
-      if (bytesToFree <= 0)
-        return;
-
-      // Instantiate priority buckets
-      BlockBucket bucketSingle = new BlockBucket(bytesToFree, blockSize, 
singleSize());
-      BlockBucket bucketMulti = new BlockBucket(bytesToFree, blockSize, 
multiSize());
-      BlockBucket bucketMemory = new BlockBucket(bytesToFree, blockSize, 
memorySize());
-
-      // Scan entire map putting into appropriate buckets
-      for (CachedBlock cachedBlock : map.values()) {
-        switch (cachedBlock.getPriority()) {
-          case SINGLE: {
-            bucketSingle.add(cachedBlock);
-            break;
-          }
-          case MULTI: {
-            bucketMulti.add(cachedBlock);
-            break;
-          }
-          case MEMORY: {
-            bucketMemory.add(cachedBlock);
-            break;
-          }
-        }
-      }
-
-      PriorityQueue<BlockBucket> bucketQueue = new PriorityQueue<>(3);
-
-      bucketQueue.add(bucketSingle);
-      bucketQueue.add(bucketMulti);
-      bucketQueue.add(bucketMemory);
-
-      int remainingBuckets = 3;
-      long bytesFreed = 0;
-
-      BlockBucket bucket;
-      while ((bucket = bucketQueue.poll()) != null) {
-        long overflow = bucket.overflow();
-        if (overflow > 0) {
-          long bucketBytesToFree = Math.min(overflow, (long) 
Math.ceil((bytesToFree - bytesFreed) / (double) remainingBuckets));
-          bytesFreed += bucket.free(bucketBytesToFree);
-        }
-        remainingBuckets--;
-      }
-
-      float singleMB = ((float) bucketSingle.totalSize()) / ((float) (1024 * 
1024));
-      float multiMB = ((float) bucketMulti.totalSize()) / ((float) (1024 * 
1024));
-      float memoryMB = ((float) bucketMemory.totalSize()) / ((float) (1024 * 
1024));
-
-      log.trace("Block cache LRU eviction completed. Freed {} bytes. Priority 
Sizes: Single={}MB ({}), Multi={}MB ({}), Memory={}MB ({})", bytesFreed,
-          singleMB, bucketSingle.totalSize(), multiMB, 
bucketMulti.totalSize(), memoryMB, bucketMemory.totalSize());
-
-    } finally {
-      stats.evict();
-      evictionInProgress = false;
-      evictionLock.unlock();
-    }
-  }
-
-  /**
-   * Used to group blocks into priority buckets. There will be a BlockBucket 
for each priority (single, multi, memory). Once bucketed, the eviction algorithm
-   * takes the appropriate number of elements out of each according to 
configuration parameters and their relatives sizes.
-   */
-  private class BlockBucket implements Comparable<BlockBucket> {
-
-    private CachedBlockQueue queue;
-    private long totalSize = 0;
-    private long bucketSize;
-
-    public BlockBucket(long bytesToFree, long blockSize, long bucketSize) {
-      this.bucketSize = bucketSize;
-      queue = new CachedBlockQueue(bytesToFree, blockSize);
-      totalSize = 0;
-    }
-
-    public void add(CachedBlock block) {
-      totalSize += block.heapSize();
-      queue.add(block);
-    }
-
-    public long free(long toFree) {
-      CachedBlock[] blocks = queue.get();
-      long freedBytes = 0;
-      for (int i = 0; i < blocks.length; i++) {
-        freedBytes += evictBlock(blocks[i]);
-        if (freedBytes >= toFree) {
-          return freedBytes;
-        }
-      }
-      return freedBytes;
-    }
-
-    public long overflow() {
-      return totalSize - bucketSize;
-    }
-
-    public long totalSize() {
-      return totalSize;
-    }
-
-    @Override
-    public int compareTo(BlockBucket that) {
-      if (this.overflow() == that.overflow())
-        return 0;
-      return this.overflow() > that.overflow() ? 1 : -1;
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hashCode(overflow());
-    }
-
-    @Override
-    public boolean equals(Object that) {
-      if (that instanceof BlockBucket)
-        return compareTo((BlockBucket) that) == 0;
-      return false;
-    }
-  }
-
-  @Override
-  public long getMaxSize() {
-    return this.maxSize;
-  }
-
-  /**
-   * Get the current size of this cache.
-   *
-   * @return current size in bytes
-   */
-  public long getCurrentSize() {
-    return this.size.get();
-  }
-
-  /**
-   * Get the current size of this cache.
-   *
-   * @return current size in bytes
-   */
-  public long getFreeSize() {
-    return getMaxSize() - getCurrentSize();
-  }
-
-  /**
-   * Get the size of this cache (number of cached blocks)
-   *
-   * @return number of cached blocks
-   */
-  public long size() {
-    return this.elements.get();
-  }
-
-  /**
-   * Get the number of eviction runs that have occurred
-   */
-  public long getEvictionCount() {
-    return this.stats.getEvictionCount();
-  }
-
-  /**
-   * Get the number of blocks that have been evicted during the lifetime of 
this cache.
-   */
-  public long getEvictedCount() {
-    return this.stats.getEvictedCount();
-  }
-
-  /**
-   * Eviction thread. Sits in waiting state until an eviction is triggered 
when the cache size grows above the acceptable level.
-   *
-   * <p>
-   * Thread is triggered into action by {@link LruBlockCache#runEviction()}
-   */
-  private static class EvictionThread extends Thread {
-    private WeakReference<LruBlockCache> cache;
-    private boolean running = false;
-
-    public EvictionThread(LruBlockCache cache) {
-      super("LruBlockCache.EvictionThread");
-      setDaemon(true);
-      this.cache = new WeakReference<>(cache);
-    }
-
-    public synchronized boolean running() {
-      return running;
-    }
-
-    @Override
-    public void run() {
-      while (true) {
-        synchronized (this) {
-          running = true;
-          try {
-            this.wait();
-          } catch (InterruptedException e) {}
-        }
-        LruBlockCache cache = this.cache.get();
-        if (cache == null)
-          break;
-        cache.evict();
-      }
-    }
-
-    public void evict() {
-      synchronized (this) {
-        this.notify();
-      }
-    }
-  }
-
-  /*
-   * Statistics thread. Periodically prints the cache statistics to the log.
-   */
-  private static class StatisticsThread extends Thread {
-    LruBlockCache lru;
-
-    public StatisticsThread(LruBlockCache lru) {
-      super("LruBlockCache.StatisticsThread");
-      setDaemon(true);
-      this.lru = lru;
-    }
-
-    @Override
-    public void run() {
-      lru.logStats();
-    }
-  }
-
-  public void logStats() {
-    // Log size
-    long totalSize = heapSize();
-    long freeSize = maxSize - totalSize;
-    float sizeMB = ((float) totalSize) / ((float) (1024 * 1024));
-    float freeMB = ((float) freeSize) / ((float) (1024 * 1024));
-    float maxMB = ((float) maxSize) / ((float) (1024 * 1024));
-    log.debug("Cache Stats: Sizes: Total={}MB ({}), Free={}MB ({}), Max={}MB 
({}), Counts: Blocks={}, Access={}, Hit={}, Miss={}, Evictions={}, Evicted={},"
-        + "Ratios: Hit Ratio={}%, Miss Ratio={}%, Evicted/Run={}, Duplicate 
Reads={}", sizeMB, totalSize, freeMB, freeSize, maxMB, maxSize, size(),
-        stats.requestCount(), stats.hitCount(), stats.getMissCount(), 
stats.getEvictionCount(), stats.getEvictedCount(), stats.getHitRatio() * 100,
-        stats.getMissRatio() * 100, stats.evictedPerEviction(), 
stats.getDuplicateReads());
-  }
-
-  /**
-   * Get counter statistics for this cache.
-   *
-   * <p>
-   * Includes: total accesses, hits, misses, evicted blocks, and runs of the 
eviction processes.
-   */
-  public CacheStats getStats() {
-    return this.stats;
-  }
-
-  public static class CacheStats implements BlockCache.Stats {
-    private final AtomicLong accessCount = new AtomicLong(0);
-    private final AtomicLong hitCount = new AtomicLong(0);
-    private final AtomicLong missCount = new AtomicLong(0);
-    private final AtomicLong evictionCount = new AtomicLong(0);
-    private final AtomicLong evictedCount = new AtomicLong(0);
-    private final AtomicLong duplicateReads = new AtomicLong(0);
-
-    public void miss() {
-      missCount.incrementAndGet();
-      accessCount.incrementAndGet();
-    }
-
-    public void hit() {
-      hitCount.incrementAndGet();
-      accessCount.incrementAndGet();
-    }
-
-    public void evict() {
-      evictionCount.incrementAndGet();
-    }
-
-    public void duplicateReads() {
-      duplicateReads.incrementAndGet();
-    }
-
-    public void evicted() {
-      evictedCount.incrementAndGet();
-    }
-
-    @Override
-    public long requestCount() {
-      return accessCount.get();
-    }
-
-    public long getMissCount() {
-      return missCount.get();
-    }
-
-    @Override
-    public long hitCount() {
-      return hitCount.get();
-    }
-
-    public long getEvictionCount() {
-      return evictionCount.get();
-    }
-
-    public long getDuplicateReads() {
-      return duplicateReads.get();
-    }
-
-    public long getEvictedCount() {
-      return evictedCount.get();
-    }
-
-    public double getHitRatio() {
-      return ((float) hitCount() / (float) requestCount());
-    }
-
-    public double getMissRatio() {
-      return ((float) getMissCount() / (float) requestCount());
-    }
-
-    public double evictedPerEviction() {
-      return (float) getEvictedCount() / (float) getEvictionCount();
-    }
-  }
-
-  public final static long CACHE_FIXED_OVERHEAD = ClassSize.align((3 * 
SizeConstants.SIZEOF_LONG) + (8 * ClassSize.REFERENCE)
-      + (5 * SizeConstants.SIZEOF_FLOAT) + SizeConstants.SIZEOF_BOOLEAN + 
ClassSize.OBJECT);
-
-  // HeapSize implementation
-  @Override
-  public long heapSize() {
-    return getCurrentSize();
-  }
-
-  public static long calculateOverhead(long maxSize, long blockSize, int 
concurrency) {
-    return CACHE_FIXED_OVERHEAD + ClassSize.CONCURRENT_HASHMAP + ((int) 
Math.ceil(maxSize * 1.2 / blockSize) * ClassSize.CONCURRENT_HASHMAP_ENTRY)
-        + (concurrency * ClassSize.CONCURRENT_HASHMAP_SEGMENT);
-  }
-
-  // Simple calculators of sizes given factors and maxSize
-
-  private long acceptableSize() {
-    return (long) Math.floor(this.maxSize * this.acceptableFactor);
-  }
-
-  private long minSize() {
-    return (long) Math.floor(this.maxSize * this.minFactor);
-  }
-
-  private long singleSize() {
-    return (long) Math.floor(this.maxSize * this.singleFactor * 
this.minFactor);
-  }
-
-  private long multiSize() {
-    return (long) Math.floor(this.maxSize * this.multiFactor * this.minFactor);
-  }
-
-  private long memorySize() {
-    return (long) Math.floor(this.maxSize * this.memoryFactor * 
this.minFactor);
-  }
-
-  public void shutdown() {
-    this.scheduleThreadPool.shutdown();
-  }
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/452732c8/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/TinyLfuBlockCache.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/TinyLfuBlockCache.java
 
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/TinyLfuBlockCache.java
deleted file mode 100644
index bab52af..0000000
--- 
a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/TinyLfuBlockCache.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.file.blockfile.cache;
-
-import static java.util.Objects.requireNonNull;
-
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
-import com.github.benmanes.caffeine.cache.Policy;
-import com.github.benmanes.caffeine.cache.stats.CacheStats;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-/**
- * A block cache that is memory bounded using the W-TinyLFU eviction 
algorithm. This implementation delegates to a Caffeine cache to provide 
concurrent O(1)
- * read and write operations.
- * <ul>
- * <li>W-TinyLFU: http://arxiv.org/pdf/1512.00727.pdf</li>
- * <li>Caffeine: https://github.com/ben-manes/caffeine</li>
- * <li>Cache design: 
http://highscalability.com/blog/2016/1/25/design-of-a-modern-cache.html</li>
- * </ul>
- */
-public final class TinyLfuBlockCache implements BlockCache {
-  private static final Logger log = 
LoggerFactory.getLogger(TinyLfuBlockCache.class);
-  private static final int STATS_PERIOD_SEC = 60;
-
-  private final Cache<String,Block> cache;
-  private final Policy.Eviction<String,Block> policy;
-  private final ScheduledExecutorService statsExecutor;
-
-  public TinyLfuBlockCache(long maxSize, long blockSize) {
-    cache = Caffeine.newBuilder().initialCapacity((int) Math.ceil(1.2 * 
maxSize / blockSize)).weigher((String blockName, Block block) -> {
-      int keyWeight = ClassSize.align(blockName.length()) + ClassSize.STRING;
-      return keyWeight + block.weight();
-    }).maximumWeight(maxSize).recordStats().build();
-    policy = cache.policy().eviction().get();
-
-    statsExecutor = Executors.newSingleThreadScheduledExecutor(new 
ThreadFactoryBuilder().setNameFormat("TinyLfuBlockCacheStatsExecutor").setDaemon(true)
-        .build());
-    statsExecutor.scheduleAtFixedRate(this::logStats, STATS_PERIOD_SEC, 
STATS_PERIOD_SEC, TimeUnit.SECONDS);
-  }
-
-  @Override
-  public long getMaxSize() {
-    return policy.getMaximum();
-  }
-
-  @Override
-  public CacheEntry getBlock(String blockName) {
-    return cache.getIfPresent(blockName);
-  }
-
-  @Override
-  public CacheEntry cacheBlock(String blockName, byte[] buffer) {
-    return cache.asMap().compute(blockName, (key, block) -> {
-      if (block == null) {
-        return new Block(buffer);
-      }
-      block.buffer = buffer;
-      return block;
-    });
-  }
-
-  @Override
-  public CacheEntry cacheBlock(String blockName, byte[] buffer, /* ignored 
*/boolean inMemory) {
-    return cacheBlock(blockName, buffer);
-  }
-
-  @Override
-  public BlockCache.Stats getStats() {
-    CacheStats stats = cache.stats();
-    return new BlockCache.Stats() {
-      @Override
-      public long hitCount() {
-        return stats.hitCount();
-      }
-
-      @Override
-      public long requestCount() {
-        return stats.requestCount();
-      }
-    };
-  }
-
-  private void logStats() {
-    double maxMB = ((double) policy.getMaximum()) / ((double) (1024 * 1024));
-    double sizeMB = ((double) policy.weightedSize().getAsLong()) / ((double) 
(1024 * 1024));
-    double freeMB = maxMB - sizeMB;
-    log.debug("Cache Size={}MB, Free={}MB, Max={}MB, Blocks={}", sizeMB, 
freeMB, maxMB, cache.estimatedSize());
-    log.debug(cache.stats().toString());
-  }
-
-  private static final class Block implements CacheEntry {
-    private volatile byte[] buffer;
-    private volatile Object index;
-
-    Block(byte[] buffer) {
-      this.buffer = requireNonNull(buffer);
-    }
-
-    @Override
-    public byte[] getBuffer() {
-      return buffer;
-    }
-
-    @Override
-    public Object getIndex() {
-      return index;
-    }
-
-    @Override
-    public void setIndex(Object index) {
-      this.index = index;
-    }
-
-    int weight() {
-      return ClassSize.align(buffer.length) + SizeConstants.SIZEOF_LONG + 
ClassSize.REFERENCE + ClassSize.OBJECT + ClassSize.ARRAY;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/452732c8/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCache.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCache.java
 
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCache.java
new file mode 100644
index 0000000..fa8d824
--- /dev/null
+++ 
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCache.java
@@ -0,0 +1,625 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.blockfile.cache.lru;
+
+import java.lang.ref.WeakReference;
+import java.util.Objects;
+import java.util.PriorityQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
+import org.apache.accumulo.core.file.blockfile.cache.CacheEntry;
+import org.apache.accumulo.core.file.blockfile.cache.CachedBlock;
+import org.apache.accumulo.core.file.blockfile.cache.CachedBlockQueue;
+import org.apache.accumulo.core.file.blockfile.cache.ClassSize;
+import org.apache.accumulo.core.file.blockfile.cache.HeapSize;
+import org.apache.accumulo.core.file.blockfile.cache.SizeConstants;
+import org.apache.accumulo.core.util.NamingThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A block cache implementation that is memory-aware using {@link HeapSize}, 
memory-bound using an LRU eviction algorithm, and concurrent: backed by a
+ * {@link ConcurrentHashMap} and with a non-blocking eviction thread giving 
constant-time {@link #cacheBlock} and {@link #getBlock} operations.
+ *
+ * <p>
+ * Contains three levels of block priority to allow for scan-resistance and 
in-memory families. A block is added with an inMemory flag if necessary, 
otherwise a
+ * block becomes a single access priority. Once a blocked is accessed again, 
it changes to multiple access. This is used to prevent scans from thrashing the
+ * cache, adding a least-frequently-used element to the eviction algorithm.
+ *
+ * <p>
+ * Each priority is given its own chunk of the total cache to ensure fairness 
during eviction. Each priority will retain close to its maximum size, however, 
if
+ * any priority is not using its entire chunk the others are able to grow 
beyond their chunk size.
+ *
+ * <p>
+ * Instantiated at a minimum with the total size and average block size. All 
sizes are in bytes. The block size is not especially important as this cache is
+ * fully dynamic in its sizing of blocks. It is only used for pre-allocating 
data structures and in initial heap estimation of the map.
+ *
+ * <p>
+ * The detailed constructor defines the sizes for the three priorities (they 
should total to the maximum size defined). It also sets the levels that trigger 
and
+ * control the eviction thread.
+ *
+ * <p>
+ * The acceptable size is the cache size level which triggers the eviction 
process to start. It evicts enough blocks to get the size below the minimum size
+ * specified.
+ *
+ * <p>
+ * Eviction happens in a separate thread and involves a single full-scan of 
the map. It determines how many bytes must be freed to reach the minimum size, 
and
+ * then while scanning determines the fewest least-recently-used blocks 
necessary from each of the three priorities (would be 3 times bytes to free). 
It then
+ * uses the priority chunk sizes to evict fairly according to the relative 
sizes and usage.
+ */
+public class LruBlockCache implements BlockCache, HeapSize {
+
+  private static final Logger log = 
LoggerFactory.getLogger(LruBlockCache.class);
+
+  /** Statistics thread */
+  static final int statThreadPeriod = 60;
+
+  /** Concurrent map (the cache) */
+  private final ConcurrentHashMap<String,CachedBlock> map;
+
+  /** Eviction lock (locked when eviction in process) */
+  private final ReentrantLock evictionLock = new ReentrantLock(true);
+
+  /** Volatile boolean to track if we are in an eviction process or not */
+  private volatile boolean evictionInProgress = false;
+
+  /** Eviction thread */
+  private final EvictionThread evictionThread;
+
+  /** Statistics thread schedule pool (for heavy debugging, could remove) */
+  private final ScheduledExecutorService scheduleThreadPool = 
Executors.newScheduledThreadPool(1, new 
NamingThreadFactory("LRUBlockCacheStats"));
+
+  /** Current size of cache */
+  private final AtomicLong size;
+
+  /** Current number of cached elements */
+  private final AtomicLong elements;
+
+  /** Cache access count (sequential ID) */
+  private final AtomicLong count;
+
+  /** Cache statistics */
+  private final CacheStats stats;
+
+  /** Overhead of the structure itself */
+  private final long overhead;
+
+  private final LruBlockCacheConfiguration conf;
+
+  /**
+   * Default constructor. Specify maximum size and expected average block size 
(approximation is fine).
+   *
+   * <p>
+   * All other factors will be calculated based on defaults specified in this 
class.
+   *
+   * @param conf
+   *          block cache configuration
+   */
+  public LruBlockCache(final LruBlockCacheConfiguration conf) {
+    this.conf = conf;
+
+    int mapInitialSize = (int) Math.ceil(1.2 * conf.getMaxSize() / 
conf.getBlockSize());
+
+    map = new ConcurrentHashMap<>(mapInitialSize, conf.getMapLoadFactor(), 
conf.getMapConcurrencyLevel());
+    this.stats = new CacheStats();
+    this.count = new AtomicLong(0);
+    this.elements = new AtomicLong(0);
+    this.overhead = calculateOverhead(conf.getMaxSize(), conf.getBlockSize(), 
conf.getMapConcurrencyLevel());
+    this.size = new AtomicLong(this.overhead);
+
+    if (conf.isUseEvictionThread()) {
+      this.evictionThread = new EvictionThread(this);
+      this.evictionThread.start();
+      while (!this.evictionThread.running()) {
+        try {
+          Thread.sleep(10);
+        } catch (InterruptedException ex) {
+          throw new RuntimeException(ex);
+        }
+      }
+    } else {
+      this.evictionThread = null;
+    }
+    this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), 
statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS);
+  }
+
+  public long getOverhead() {
+    return overhead;
+  }
+
+  // BlockCache implementation
+
+  /**
+   * Cache the block with the specified name and buffer.
+   * <p>
+   * It is assumed this will NEVER be called on an already cached block. If 
that is done, it is assumed that you are reinserting the same exact block due 
to a
+   * race condition and will update the buffer but not modify the size of the 
cache.
+   *
+   * @param blockName
+   *          block name
+   * @param buf
+   *          block buffer
+   * @param inMemory
+   *          if block is in-memory
+   */
+  @Override
+  public CacheEntry cacheBlock(String blockName, byte buf[], boolean inMemory) 
{
+    CachedBlock cb = map.get(blockName);
+    if (cb != null) {
+      stats.duplicateReads();
+      cb.access(count.incrementAndGet());
+    } else {
+      cb = new CachedBlock(blockName, buf, count.incrementAndGet(), inMemory);
+      CachedBlock currCb = map.putIfAbsent(blockName, cb);
+      if (currCb != null) {
+        stats.duplicateReads();
+        cb = currCb;
+        cb.access(count.incrementAndGet());
+      } else {
+        // Actually added block to cache
+        long newSize = size.addAndGet(cb.heapSize());
+        elements.incrementAndGet();
+        if (newSize > acceptableSize() && !evictionInProgress) {
+          runEviction();
+        }
+      }
+    }
+
+    return cb;
+  }
+
+  /**
+   * Cache the block with the specified name and buffer.
+   * <p>
+   * It is assumed this will NEVER be called on an already cached block. If 
that is done, it is assumed that you are reinserting the same exact block due 
to a
+   * race condition and will update the buffer but not modify the size of the 
cache.
+   *
+   * @param blockName
+   *          block name
+   * @param buf
+   *          block buffer
+   */
+  @Override
+  public CacheEntry cacheBlock(String blockName, byte buf[]) {
+    return cacheBlock(blockName, buf, false);
+  }
+
+  /**
+   * Get the buffer of the block with the specified name.
+   *
+   * @param blockName
+   *          block name
+   * @return buffer of specified block name, or null if not in cache
+   */
+  @Override
+  public CachedBlock getBlock(String blockName) {
+    CachedBlock cb = map.get(blockName);
+    if (cb == null) {
+      stats.miss();
+      return null;
+    }
+    stats.hit();
+    cb.access(count.incrementAndGet());
+    return cb;
+  }
+
+  protected long evictBlock(CachedBlock block) {
+    map.remove(block.getName());
+    size.addAndGet(-1 * block.heapSize());
+    elements.decrementAndGet();
+    stats.evicted();
+    return block.heapSize();
+  }
+
+  /**
+   * Multi-threaded call to run the eviction process.
+   */
+  private void runEviction() {
+    if (evictionThread == null) {
+      evict();
+    } else {
+      evictionThread.evict();
+    }
+  }
+
+  /**
+   * Eviction method.
+   */
+  void evict() {
+
+    // Ensure only one eviction at a time
+    if (!evictionLock.tryLock())
+      return;
+
+    try {
+      evictionInProgress = true;
+
+      long bytesToFree = size.get() - minSize();
+
+      log.trace("Block cache LRU eviction started.  Attempting to free {} 
bytes", bytesToFree);
+
+      if (bytesToFree <= 0)
+        return;
+
+      // Instantiate priority buckets
+      BlockBucket bucketSingle = new BlockBucket(bytesToFree, 
conf.getBlockSize(), singleSize());
+      BlockBucket bucketMulti = new BlockBucket(bytesToFree, 
conf.getBlockSize(), multiSize());
+      BlockBucket bucketMemory = new BlockBucket(bytesToFree, 
conf.getBlockSize(), memorySize());
+
+      // Scan entire map putting into appropriate buckets
+      for (CachedBlock cachedBlock : map.values()) {
+        switch (cachedBlock.getPriority()) {
+          case SINGLE: {
+            bucketSingle.add(cachedBlock);
+            break;
+          }
+          case MULTI: {
+            bucketMulti.add(cachedBlock);
+            break;
+          }
+          case MEMORY: {
+            bucketMemory.add(cachedBlock);
+            break;
+          }
+        }
+      }
+
+      PriorityQueue<BlockBucket> bucketQueue = new PriorityQueue<>(3);
+
+      bucketQueue.add(bucketSingle);
+      bucketQueue.add(bucketMulti);
+      bucketQueue.add(bucketMemory);
+
+      int remainingBuckets = 3;
+      long bytesFreed = 0;
+
+      BlockBucket bucket;
+      while ((bucket = bucketQueue.poll()) != null) {
+        long overflow = bucket.overflow();
+        if (overflow > 0) {
+          long bucketBytesToFree = Math.min(overflow, (long) 
Math.ceil((bytesToFree - bytesFreed) / (double) remainingBuckets));
+          bytesFreed += bucket.free(bucketBytesToFree);
+        }
+        remainingBuckets--;
+      }
+
+      float singleMB = ((float) bucketSingle.totalSize()) / ((float) (1024 * 
1024));
+      float multiMB = ((float) bucketMulti.totalSize()) / ((float) (1024 * 
1024));
+      float memoryMB = ((float) bucketMemory.totalSize()) / ((float) (1024 * 
1024));
+
+      log.trace("Block cache LRU eviction completed. Freed {} bytes. Priority 
Sizes: Single={}MB ({}), Multi={}MB ({}), Memory={}MB ({})", bytesFreed,
+          singleMB, bucketSingle.totalSize(), multiMB, 
bucketMulti.totalSize(), memoryMB, bucketMemory.totalSize());
+
+    } finally {
+      stats.evict();
+      evictionInProgress = false;
+      evictionLock.unlock();
+    }
+  }
+
+  /**
+   * Used to group blocks into priority buckets. There will be a BlockBucket 
for each priority (single, multi, memory). Once bucketed, the eviction algorithm
+   * takes the appropriate number of elements out of each according to 
configuration parameters and their relatives sizes.
+   */
+  private class BlockBucket implements Comparable<BlockBucket> {
+
+    private CachedBlockQueue queue;
+    private long totalSize = 0;
+    private long bucketSize;
+
+    public BlockBucket(long bytesToFree, long blockSize, long bucketSize) {
+      this.bucketSize = bucketSize;
+      queue = new CachedBlockQueue(bytesToFree, blockSize);
+      totalSize = 0;
+    }
+
+    public void add(CachedBlock block) {
+      totalSize += block.heapSize();
+      queue.add(block);
+    }
+
+    public long free(long toFree) {
+      CachedBlock[] blocks = queue.get();
+      long freedBytes = 0;
+      for (int i = 0; i < blocks.length; i++) {
+        freedBytes += evictBlock(blocks[i]);
+        if (freedBytes >= toFree) {
+          return freedBytes;
+        }
+      }
+      return freedBytes;
+    }
+
+    public long overflow() {
+      return totalSize - bucketSize;
+    }
+
+    public long totalSize() {
+      return totalSize;
+    }
+
+    @Override
+    public int compareTo(BlockBucket that) {
+      if (this.overflow() == that.overflow())
+        return 0;
+      return this.overflow() > that.overflow() ? 1 : -1;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(overflow());
+    }
+
+    @Override
+    public boolean equals(Object that) {
+      if (that instanceof BlockBucket)
+        return compareTo((BlockBucket) that) == 0;
+      return false;
+    }
+  }
+
+  @Override
+  public long getMaxHeapSize() {
+    return getMaxSize();
+  }
+
+  @Override
+  public long getMaxSize() {
+    return this.conf.getMaxSize();
+  }
+
+  /**
+   * Get the current size of this cache.
+   *
+   * @return current size in bytes
+   */
+  public long getCurrentSize() {
+    return this.size.get();
+  }
+
+  /**
+   * Get the current size of this cache.
+   *
+   * @return current size in bytes
+   */
+  public long getFreeSize() {
+    return getMaxSize() - getCurrentSize();
+  }
+
+  /**
+   * Get the size of this cache (number of cached blocks)
+   *
+   * @return number of cached blocks
+   */
+  public long size() {
+    return this.elements.get();
+  }
+
+  /**
+   * Get the number of eviction runs that have occurred
+   */
+  public long getEvictionCount() {
+    return this.stats.getEvictionCount();
+  }
+
+  /**
+   * Get the number of blocks that have been evicted during the lifetime of 
this cache.
+   */
+  public long getEvictedCount() {
+    return this.stats.getEvictedCount();
+  }
+
+  /**
+   * Eviction thread. Sits in waiting state until an eviction is triggered 
when the cache size grows above the acceptable level.
+   *
+   * <p>
+   * Thread is triggered into action by {@link LruBlockCache#runEviction()}
+   */
+  private static class EvictionThread extends Thread {
+    private WeakReference<LruBlockCache> cache;
+    private boolean running = false;
+
+    public EvictionThread(LruBlockCache cache) {
+      super("LruBlockCache.EvictionThread");
+      setDaemon(true);
+      this.cache = new WeakReference<>(cache);
+    }
+
+    public synchronized boolean running() {
+      return running;
+    }
+
+    @Override
+    public void run() {
+      while (true) {
+        synchronized (this) {
+          running = true;
+          try {
+            this.wait();
+          } catch (InterruptedException e) {}
+        }
+        LruBlockCache cache = this.cache.get();
+        if (cache == null)
+          break;
+        cache.evict();
+      }
+    }
+
+    public void evict() {
+      synchronized (this) {
+        this.notify();
+      }
+    }
+  }
+
+  /*
+   * Statistics thread. Periodically prints the cache statistics to the log.
+   */
+  private static class StatisticsThread extends Thread {
+    LruBlockCache lru;
+
+    public StatisticsThread(LruBlockCache lru) {
+      super("LruBlockCache.StatisticsThread");
+      setDaemon(true);
+      this.lru = lru;
+    }
+
+    @Override
+    public void run() {
+      lru.logStats();
+    }
+  }
+
+  public void logStats() {
+    // Log size
+    long totalSize = heapSize();
+    long freeSize = this.conf.getMaxSize() - totalSize;
+    float sizeMB = ((float) totalSize) / ((float) (1024 * 1024));
+    float freeMB = ((float) freeSize) / ((float) (1024 * 1024));
+    float maxMB = ((float) this.conf.getMaxSize()) / ((float) (1024 * 1024));
+    log.debug("Cache Stats: Sizes: Total={}MB ({}), Free={}MB ({}), Max={}MB 
({}), Counts: Blocks={}, Access={}, Hit={}, Miss={}, Evictions={}, Evicted={},"
+        + "Ratios: Hit Ratio={}%, Miss Ratio={}%, Evicted/Run={}, Duplicate 
Reads={}", sizeMB, totalSize, freeMB, freeSize, maxMB, this.conf.getMaxSize(),
+        size(), stats.requestCount(), stats.hitCount(), stats.getMissCount(), 
stats.getEvictionCount(), stats.getEvictedCount(), stats.getHitRatio() * 100,
+        stats.getMissRatio() * 100, stats.evictedPerEviction(), 
stats.getDuplicateReads());
+  }
+
+  /**
+   * Get counter statistics for this cache.
+   *
+   * <p>
+   * Includes: total accesses, hits, misses, evicted blocks, and runs of the 
eviction processes.
+   */
+  public CacheStats getStats() {
+    return this.stats;
+  }
+
+  public static class CacheStats implements BlockCache.Stats {
+    private final AtomicLong accessCount = new AtomicLong(0);
+    private final AtomicLong hitCount = new AtomicLong(0);
+    private final AtomicLong missCount = new AtomicLong(0);
+    private final AtomicLong evictionCount = new AtomicLong(0);
+    private final AtomicLong evictedCount = new AtomicLong(0);
+    private final AtomicLong duplicateReads = new AtomicLong(0);
+
+    public void miss() {
+      missCount.incrementAndGet();
+      accessCount.incrementAndGet();
+    }
+
+    public void hit() {
+      hitCount.incrementAndGet();
+      accessCount.incrementAndGet();
+    }
+
+    public void evict() {
+      evictionCount.incrementAndGet();
+    }
+
+    public void duplicateReads() {
+      duplicateReads.incrementAndGet();
+    }
+
+    public void evicted() {
+      evictedCount.incrementAndGet();
+    }
+
+    @Override
+    public long requestCount() {
+      return accessCount.get();
+    }
+
+    public long getMissCount() {
+      return missCount.get();
+    }
+
+    @Override
+    public long hitCount() {
+      return hitCount.get();
+    }
+
+    public long getEvictionCount() {
+      return evictionCount.get();
+    }
+
+    public long getDuplicateReads() {
+      return duplicateReads.get();
+    }
+
+    public long getEvictedCount() {
+      return evictedCount.get();
+    }
+
+    public double getHitRatio() {
+      return ((float) hitCount() / (float) requestCount());
+    }
+
+    public double getMissRatio() {
+      return ((float) getMissCount() / (float) requestCount());
+    }
+
+    public double evictedPerEviction() {
+      return (float) getEvictedCount() / (float) getEvictionCount();
+    }
+  }
+
+  public final static long CACHE_FIXED_OVERHEAD = ClassSize.align((3 * 
SizeConstants.SIZEOF_LONG) + (8 * ClassSize.REFERENCE)
+      + (5 * SizeConstants.SIZEOF_FLOAT) + SizeConstants.SIZEOF_BOOLEAN + 
ClassSize.OBJECT);
+
+  // HeapSize implementation
+  @Override
+  public long heapSize() {
+    return getCurrentSize();
+  }
+
+  public static long calculateOverhead(long maxSize, long blockSize, int 
concurrency) {
+    return CACHE_FIXED_OVERHEAD + ClassSize.CONCURRENT_HASHMAP + ((int) 
Math.ceil(maxSize * 1.2 / blockSize) * ClassSize.CONCURRENT_HASHMAP_ENTRY)
+        + (concurrency * ClassSize.CONCURRENT_HASHMAP_SEGMENT);
+  }
+
+  // Simple calculators of sizes given factors and maxSize
+
+  private long acceptableSize() {
+    return (long) Math.floor(this.conf.getMaxSize() * 
this.conf.getAcceptableFactor());
+  }
+
+  private long minSize() {
+    return (long) Math.floor(this.conf.getMaxSize() * 
this.conf.getMinFactor());
+  }
+
+  private long singleSize() {
+    return (long) Math.floor(this.conf.getMaxSize() * 
this.conf.getSingleFactor() * this.conf.getMinFactor());
+  }
+
+  private long multiSize() {
+    return (long) Math.floor(this.conf.getMaxSize() * 
this.conf.getMultiFactor() * this.conf.getMinFactor());
+  }
+
+  private long memorySize() {
+    return (long) Math.floor(this.conf.getMaxSize() * 
this.conf.getMemoryFactor() * this.conf.getMinFactor());
+  }
+
+  public void shutdown() {
+    this.scheduleThreadPool.shutdown();
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/452732c8/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheConfiguration.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheConfiguration.java
 
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheConfiguration.java
new file mode 100644
index 0000000..49790cb
--- /dev/null
+++ 
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheConfiguration.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.blockfile.cache.lru;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.file.blockfile.cache.BlockCacheConfiguration;
+import org.apache.accumulo.core.file.blockfile.cache.CacheType;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+
+public final class LruBlockCacheConfiguration extends BlockCacheConfiguration {
+
+  public static final String PROPERTY_PREFIX = "lru";
+
+  /** Default Configuration Parameters */
+
+  /** Backing Concurrent Map Configuration */
+  public static final Float DEFAULT_LOAD_FACTOR = 0.75f;
+  public static final Integer DEFAULT_CONCURRENCY_LEVEL = 16;
+
+  /** Eviction thresholds */
+  public static final Float DEFAULT_MIN_FACTOR = 0.75f;
+  public static final Float DEFAULT_ACCEPTABLE_FACTOR = 0.85f;
+
+  /** Priority buckets */
+  public static final Float DEFAULT_SINGLE_FACTOR = 0.25f;
+  public static final Float DEFAULT_MULTI_FACTOR = 0.50f;
+  public static final Float DEFAULT_MEMORY_FACTOR = 0.25f;
+
+  // property names
+  public static final String ACCEPTABLE_FACTOR_PROPERTY = "acceptable.factor";
+  public static final String MIN_FACTOR_PROPERTY = "min.factor";
+  public static final String SINGLE_FACTOR_PROPERTY = "single.factor";
+  public static final String MULTI_FACTOR_PROPERTY = "multi.factor";
+  public static final String MEMORY_FACTOR_PROPERTY = "memory.factor";
+  public static final String MAP_LOAD_PROPERTY = "map.load";
+  public static final String MAP_CONCURRENCY_PROPERTY = "map.concurrency";
+  public static final String EVICTION_THREAD_PROPERTY = "eviction.thread";
+
+  /** Acceptable size of cache (no evictions if size < acceptable) */
+  private final float acceptableFactor;
+
+  /** Minimum threshold of cache (when evicting, evict until size < min) */
+  private final float minFactor;
+
+  /** Single access bucket size */
+  private final float singleFactor;
+
+  /** Multiple access bucket size */
+  private final float multiFactor;
+
+  /** In-memory bucket size */
+  private final float memoryFactor;
+
+  /** LruBlockCache cache = new LruBlockCache **/
+  private final float mapLoadFactor;
+
+  /** LruBlockCache cache = new LruBlockCache **/
+  private final int mapConcurrencyLevel;
+
+  private final boolean useEvictionThread;
+
+  public LruBlockCacheConfiguration(AccumuloConfiguration conf, CacheType 
type) {
+    super(conf, type, PROPERTY_PREFIX);
+
+    this.acceptableFactor = 
get(ACCEPTABLE_FACTOR_PROPERTY).map(Float::valueOf).filter(f -> f > 
0).orElse(DEFAULT_ACCEPTABLE_FACTOR);
+    this.minFactor = get(MIN_FACTOR_PROPERTY).map(Float::valueOf).filter(f -> 
f > 0).orElse(DEFAULT_MIN_FACTOR);
+    this.singleFactor = 
get(SINGLE_FACTOR_PROPERTY).map(Float::valueOf).filter(f -> f > 
0).orElse(DEFAULT_SINGLE_FACTOR);
+    this.multiFactor = get(MULTI_FACTOR_PROPERTY).map(Float::valueOf).filter(f 
-> f > 0).orElse(DEFAULT_MULTI_FACTOR);
+    this.memoryFactor = 
get(MEMORY_FACTOR_PROPERTY).map(Float::valueOf).filter(f -> f > 
0).orElse(DEFAULT_MEMORY_FACTOR);
+    this.mapLoadFactor = get(MAP_LOAD_PROPERTY).map(Float::valueOf).filter(f 
-> f > 0).orElse(DEFAULT_LOAD_FACTOR);
+    this.mapConcurrencyLevel = 
get(MAP_CONCURRENCY_PROPERTY).map(Integer::valueOf).filter(i -> i > 
0).orElse(DEFAULT_CONCURRENCY_LEVEL);
+    this.useEvictionThread = 
get(EVICTION_THREAD_PROPERTY).map(Boolean::valueOf).orElse(true);
+
+    if (this.getSingleFactor() + this.getMultiFactor() + 
this.getMemoryFactor() != 1) {
+      throw new IllegalArgumentException("Single, multi, and memory factors " 
+ " should total 1.0");
+    }
+    if (this.getMinFactor() >= this.getAcceptableFactor()) {
+      throw new IllegalArgumentException("minFactor must be smaller than 
acceptableFactor");
+    }
+    if (this.getMinFactor() >= 1.0f || this.getAcceptableFactor() >= 1.0f) {
+      throw new IllegalArgumentException("all factors must be < 1");
+    }
+  }
+
+  public float getAcceptableFactor() {
+    return acceptableFactor;
+  }
+
+  public float getMinFactor() {
+    return minFactor;
+  }
+
+  public float getSingleFactor() {
+    return singleFactor;
+  }
+
+  public float getMultiFactor() {
+    return multiFactor;
+  }
+
+  public float getMemoryFactor() {
+    return memoryFactor;
+  }
+
+  public float getMapLoadFactor() {
+    return mapLoadFactor;
+  }
+
+  public int getMapConcurrencyLevel() {
+    return mapConcurrencyLevel;
+  }
+
+  public boolean isUseEvictionThread() {
+    return useEvictionThread;
+  }
+
+  public static class Builder {
+    private Map<String,String> props = new HashMap<>();
+    private String prefix;
+
+    private Builder(String prefix) {
+      this.prefix = prefix;
+    }
+
+    private void set(String prop, float val) {
+      props.put(prefix + prop, Float.toString(val));
+    }
+
+    public Builder acceptableFactor(float af) {
+      Preconditions.checkArgument(af > 0);
+      set(ACCEPTABLE_FACTOR_PROPERTY, af);
+      return this;
+    }
+
+    public Builder minFactor(float mf) {
+      Preconditions.checkArgument(mf > 0);
+      set(MIN_FACTOR_PROPERTY, mf);
+      return this;
+    }
+
+    public Builder singleFactor(float sf) {
+      Preconditions.checkArgument(sf > 0);
+      set(SINGLE_FACTOR_PROPERTY, sf);
+      return this;
+    }
+
+    public Builder multiFactor(float mf) {
+      Preconditions.checkArgument(mf > 0);
+      set(MULTI_FACTOR_PROPERTY, mf);
+      return this;
+    }
+
+    public Builder memoryFactor(float mf) {
+      Preconditions.checkArgument(mf > 0);
+      set(MEMORY_FACTOR_PROPERTY, mf);
+      return this;
+    }
+
+    public Builder mapLoadFactor(float mlf) {
+      Preconditions.checkArgument(mlf > 0);
+      set(MAP_LOAD_PROPERTY, mlf);
+      return this;
+    }
+
+    public Builder mapConcurrencyLevel(int mcl) {
+      Preconditions.checkArgument(mcl > 0);
+      props.put(prefix + MAP_CONCURRENCY_PROPERTY, mcl + "");
+      return this;
+    }
+
+    public Builder useEvictionThread(boolean uet) {
+      props.put(prefix + EVICTION_THREAD_PROPERTY, uet + "");
+      return this;
+    }
+
+    public Map<String,String> buildMap() {
+      return ImmutableMap.copyOf(props);
+    }
+  }
+
+  public static Builder builder(CacheType ct) {
+    return new Builder(getPrefix(ct, PROPERTY_PREFIX));
+  }
+
+  @Override
+  public String toString() {
+    return super.toString() + ", acceptableFactor: " + 
this.getAcceptableFactor() + ", minFactor: " + this.getMinFactor() + ", 
singleFactor: "
+        + this.getSingleFactor() + ", multiFactor: " + this.getMultiFactor() + 
", memoryFactor: " + this.getMemoryFactor() + ", mapLoadFactor: "
+        + this.getMapLoadFactor() + ", mapConcurrencyLevel: " + 
this.getMapConcurrencyLevel() + ", useEvictionThread: " + 
this.isUseEvictionThread();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/452732c8/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheManager.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheManager.java
 
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheManager.java
new file mode 100644
index 0000000..8a1e430
--- /dev/null
+++ 
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheManager.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.blockfile.cache.lru;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
+import org.apache.accumulo.core.file.blockfile.cache.BlockCacheManager;
+import org.apache.accumulo.core.file.blockfile.cache.CacheType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LruBlockCacheManager extends BlockCacheManager {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(LruBlockCacheManager.class);
+
+  @Override
+  protected BlockCache createCache(AccumuloConfiguration conf, CacheType type) 
{
+    LruBlockCacheConfiguration cc = new LruBlockCacheConfiguration(conf, type);
+    LOG.info("Creating {} cache with configuration {}", type, cc);
+    return new LruBlockCache(cc);
+  }
+
+  @Override
+  public void stop() {
+    for (CacheType type : CacheType.values()) {
+      LruBlockCache cache = ((LruBlockCache) this.getBlockCache(type));
+      if (null != cache) {
+        cache.shutdown();
+      }
+    }
+    super.stop();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/452732c8/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCache.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCache.java
 
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCache.java
new file mode 100644
index 0000000..db4e789
--- /dev/null
+++ 
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCache.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.blockfile.cache.tinylfu;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
+import org.apache.accumulo.core.file.blockfile.cache.CacheEntry;
+import org.apache.accumulo.core.file.blockfile.cache.ClassSize;
+import org.apache.accumulo.core.file.blockfile.cache.SizeConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.Policy;
+import com.github.benmanes.caffeine.cache.stats.CacheStats;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * A block cache that is memory bounded using the W-TinyLFU eviction 
algorithm. This implementation delegates to a Caffeine cache to provide 
concurrent O(1)
+ * read and write operations.
+ * <ul>
+ * <li>W-TinyLFU: http://arxiv.org/pdf/1512.00727.pdf</li>
+ * <li>Caffeine: https://github.com/ben-manes/caffeine</li>
+ * <li>Cache design: 
http://highscalability.com/blog/2016/1/25/design-of-a-modern-cache.html</li>
+ * </ul>
+ */
+public final class TinyLfuBlockCache implements BlockCache {
+  private static final Logger log = 
LoggerFactory.getLogger(TinyLfuBlockCache.class);
+  private static final int STATS_PERIOD_SEC = 60;
+
+  private Cache<String,Block> cache;
+  private Policy.Eviction<String,Block> policy;
+  private ScheduledExecutorService statsExecutor;
+
+  public TinyLfuBlockCache(TinyLfuBlockCacheConfiguration conf) {
+    cache = Caffeine.newBuilder().initialCapacity((int) Math.ceil(1.2 * 
conf.getMaxSize() / conf.getBlockSize())).weigher((String blockName, Block 
block) -> {
+      int keyWeight = ClassSize.align(blockName.length()) + ClassSize.STRING;
+      return keyWeight + block.weight();
+    }).maximumWeight(conf.getMaxSize()).recordStats().build();
+    policy = cache.policy().eviction().get();
+    statsExecutor = Executors.newSingleThreadScheduledExecutor(new 
ThreadFactoryBuilder().setNameFormat("TinyLfuBlockCacheStatsExecutor").setDaemon(true)
+        .build());
+    statsExecutor.scheduleAtFixedRate(this::logStats, STATS_PERIOD_SEC, 
STATS_PERIOD_SEC, TimeUnit.SECONDS);
+
+  }
+
+  @Override
+  public long getMaxHeapSize() {
+    return getMaxSize();
+  }
+
+  @Override
+  public long getMaxSize() {
+    return policy.getMaximum();
+  }
+
+  @Override
+  public CacheEntry getBlock(String blockName) {
+    return cache.getIfPresent(blockName);
+  }
+
+  @Override
+  public CacheEntry cacheBlock(String blockName, byte[] buffer) {
+    return cache.asMap().compute(blockName, (key, block) -> {
+      if (block == null) {
+        return new Block(buffer);
+      }
+      block.buffer = buffer;
+      return block;
+    });
+  }
+
+  @Override
+  public CacheEntry cacheBlock(String blockName, byte[] buffer, /* ignored 
*/boolean inMemory) {
+    return cacheBlock(blockName, buffer);
+  }
+
+  @Override
+  public BlockCache.Stats getStats() {
+    CacheStats stats = cache.stats();
+    return new BlockCache.Stats() {
+      @Override
+      public long hitCount() {
+        return stats.hitCount();
+      }
+
+      @Override
+      public long requestCount() {
+        return stats.requestCount();
+      }
+    };
+  }
+
+  private void logStats() {
+    double maxMB = ((double) policy.getMaximum()) / ((double) (1024 * 1024));
+    double sizeMB = ((double) policy.weightedSize().getAsLong()) / ((double) 
(1024 * 1024));
+    double freeMB = maxMB - sizeMB;
+    log.debug("Cache Size={}MB, Free={}MB, Max={}MB, Blocks={}", sizeMB, 
freeMB, maxMB, cache.estimatedSize());
+    log.debug(cache.stats().toString());
+  }
+
+  private static final class Block implements CacheEntry {
+    private volatile byte[] buffer;
+    private volatile Object index;
+
+    Block(byte[] buffer) {
+      this.buffer = requireNonNull(buffer);
+    }
+
+    @Override
+    public byte[] getBuffer() {
+      return buffer;
+    }
+
+    @Override
+    public Object getIndex() {
+      return index;
+    }
+
+    @Override
+    public void setIndex(Object index) {
+      this.index = index;
+    }
+
+    int weight() {
+      return ClassSize.align(buffer.length) + SizeConstants.SIZEOF_LONG + 
ClassSize.REFERENCE + ClassSize.OBJECT + ClassSize.ARRAY;
+    }
+  }
+}

Reply via email to