Repository: accumulo
Updated Branches:
  refs/heads/master c9391894e -> 9d94d30e1


ACCUMULO-4177 TinyLFU-based block cache

Signed-off-by: Josh Elser <els...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/9d94d30e
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/9d94d30e
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/9d94d30e

Branch: refs/heads/master
Commit: 9d94d30e16a5768fd6441ef603ea7afe2e7d37f6
Parents: c939189
Author: Ben Manes <ben.ma...@gmail.com>
Authored: Fri May 6 19:35:41 2016 -0700
Committer: Josh Elser <els...@apache.org>
Committed: Fri Sep 16 16:26:48 2016 -0400

----------------------------------------------------------------------
 assemble/pom.xml                                |   4 +
 assemble/src/main/assemblies/component.xml      |   1 +
 core/pom.xml                                    |   4 +
 .../core/client/rfile/RFileScanner.java         |  15 ++
 .../org/apache/accumulo/core/conf/Property.java |   1 +
 .../core/file/blockfile/cache/BlockCache.java   |  25 ++++
 .../file/blockfile/cache/LruBlockCache.java     |  14 +-
 .../file/blockfile/cache/TinyLfuBlockCache.java | 141 +++++++++++++++++++
 pom.xml                                         |   5 +
 .../apache/accumulo/tserver/TabletServer.java   |   8 +-
 .../tserver/TabletServerResourceManager.java    |  22 ++-
 11 files changed, 224 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d94d30e/assemble/pom.xml
----------------------------------------------------------------------
diff --git a/assemble/pom.xml b/assemble/pom.xml
index d0f14a9..1f2a899 100644
--- a/assemble/pom.xml
+++ b/assemble/pom.xml
@@ -33,6 +33,10 @@
       <optional>true</optional>
     </dependency>
     <dependency>
+      <groupId>com.github.ben-manes.caffeine</groupId>
+      <artifactId>caffeine</artifactId>
+    </dependency>
+    <dependency>
       <groupId>com.google.code.gson</groupId>
       <artifactId>gson</artifactId>
       <optional>true</optional>

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d94d30e/assemble/src/main/assemblies/component.xml
----------------------------------------------------------------------
diff --git a/assemble/src/main/assemblies/component.xml b/assemble/src/main/assemblies/component.xml
index 6fc6656..2156e40 100644
--- a/assemble/src/main/assemblies/component.xml
+++ b/assemble/src/main/assemblies/component.xml
@@ -31,6 +31,7 @@
         version listing for packaged artifacts -->
         <include>${groupId}:${artifactId}-*</include>
         <include>com.beust:jcommander</include>
+        <include>com.github.ben-manes.caffeine:caffeine</include>
         <include>com.google.code.gson:gson</include>
         <include>com.google.guava:guava</include>
         <include>com.google.protobuf:protobuf-java</include>

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d94d30e/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
index 44afddb..2e858b8 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -31,6 +31,10 @@
       <artifactId>jcommander</artifactId>
     </dependency>
     <dependency>
+      <groupId>com.github.ben-manes.caffeine</groupId>
+      <artifactId>caffeine</artifactId>
+    </dependency>
+    <dependency>
       <groupId>com.google.auto.service</groupId>
       <artifactId>auto-service</artifactId>
       <optional>true</optional>

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d94d30e/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
index 4dfba68..186471d 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
@@ -112,6 +112,21 @@ class RFileScanner extends ScannerOptions implements Scanner {
     public long getMaxSize() {
       return Integer.MAX_VALUE;
     }
+
+    @Override
+    public Stats getStats() {
+      return new BlockCache.Stats() {
+        @Override
+        public long hitCount() {
+          return 0L;
+        }
+
+        @Override
+        public long requestCount() {
+          return 0L;
+        }
+      };
+    }
   }
 
   RFileScanner(Opts opts) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d94d30e/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index ede1c6f..860e57b 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -243,6 +243,7 @@ public enum Property {
   TSERV_PREFIX("tserver.", null, PropertyType.PREFIX, "Properties in this 
category affect the behavior of the tablet servers"),
   TSERV_CLIENT_TIMEOUT("tserver.client.timeout", "3s", 
PropertyType.TIMEDURATION, "Time to wait for clients to continue scans before 
closing a session."),
   TSERV_DEFAULT_BLOCKSIZE("tserver.default.blocksize", "1M", 
PropertyType.MEMORY, "Specifies a default blocksize for the tserver caches"),
+  TSERV_CACHE_POLICY("tserver.cache.policy", "LRU", PropertyType.STRING, 
"Specifies the eviction policy of the file data caches (LRU or TinyLFU)."),
   TSERV_DATACACHE_SIZE("tserver.cache.data.size", "128M", PropertyType.MEMORY, 
"Specifies the size of the cache for file data blocks."),
   TSERV_INDEXCACHE_SIZE("tserver.cache.index.size", "512M", 
PropertyType.MEMORY, "Specifies the size of the cache for file indices."),
   TSERV_PORTSEARCH("tserver.port.search", "false", PropertyType.BOOLEAN, "if 
the ports above are in use, search higher ports until one is available"),

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d94d30e/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java
index 094782d..82f8b1e 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java
@@ -58,4 +58,29 @@ public interface BlockCache {
    * @return max size in bytes
    */
   long getMaxSize();
+
+  /**
+   * Get the statistics of this cache.
+   *
+   * @return statistics
+   */
+  Stats getStats();
+
+  /** Cache statistics. */
+  interface Stats {
+
+    /**
+     * Returns the number of lookups that have returned a cached value.
+     *
+     * @return the number of lookups that have returned a cached value
+     */
+    long hitCount();
+
+    /**
+     * Returns the number of times the lookup methods have returned either a 
cached or uncached value.
+     *
+     * @return the number of lookups
+     */
+    long requestCount();
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d94d30e/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java
index 1beaccb..cbdaca5 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java
@@ -578,7 +578,7 @@ public class LruBlockCache implements BlockCache, HeapSize {
     float maxMB = ((float) maxSize) / ((float) (1024 * 1024));
     log.debug("Cache Stats: Sizes: Total={}MB ({}), Free={}MB ({}), Max={}MB 
({}), Counts: Blocks={}, Access={}, Hit={}, Miss={}, Evictions={}, Evicted={},"
         + "Ratios: Hit Ratio={}%, Miss Ratio={}%, Evicted/Run={}, Duplicate 
Reads={}", sizeMB, totalSize, freeMB, freeSize, maxMB, maxSize, size(),
-        stats.getRequestCount(), stats.getHitCount(), stats.getMissCount(), 
stats.getEvictionCount(), stats.getEvictedCount(), stats.getHitRatio() * 100,
+        stats.requestCount(), stats.hitCount(), stats.getMissCount(), 
stats.getEvictionCount(), stats.getEvictedCount(), stats.getHitRatio() * 100,
         stats.getMissRatio() * 100, stats.evictedPerEviction(), 
stats.getDuplicateReads());
   }
 
@@ -592,7 +592,7 @@ public class LruBlockCache implements BlockCache, HeapSize {
     return this.stats;
   }
 
-  public static class CacheStats {
+  public static class CacheStats implements BlockCache.Stats {
     private final AtomicLong accessCount = new AtomicLong(0);
     private final AtomicLong hitCount = new AtomicLong(0);
     private final AtomicLong missCount = new AtomicLong(0);
@@ -622,7 +622,8 @@ public class LruBlockCache implements BlockCache, HeapSize {
       evictedCount.incrementAndGet();
     }
 
-    public long getRequestCount() {
+    @Override
+    public long requestCount() {
       return accessCount.get();
     }
 
@@ -630,7 +631,8 @@ public class LruBlockCache implements BlockCache, HeapSize {
       return missCount.get();
     }
 
-    public long getHitCount() {
+    @Override
+    public long hitCount() {
       return hitCount.get();
     }
 
@@ -647,11 +649,11 @@ public class LruBlockCache implements BlockCache, HeapSize {
     }
 
     public double getHitRatio() {
-      return ((float) getHitCount() / (float) getRequestCount());
+      return ((float) hitCount() / (float) requestCount());
     }
 
     public double getMissRatio() {
-      return ((float) getMissCount() / (float) getRequestCount());
+      return ((float) getMissCount() / (float) requestCount());
     }
 
     public double evictedPerEviction() {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d94d30e/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/TinyLfuBlockCache.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/TinyLfuBlockCache.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/TinyLfuBlockCache.java
new file mode 100644
index 0000000..bab52af
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/TinyLfuBlockCache.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.blockfile.cache;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.Policy;
+import com.github.benmanes.caffeine.cache.stats.CacheStats;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * A block cache that is memory bounded using the W-TinyLFU eviction 
algorithm. This implementation delegates to a Caffeine cache to provide 
concurrent O(1)
+ * read and write operations.
+ * <ul>
+ * <li>W-TinyLFU: http://arxiv.org/pdf/1512.00727.pdf</li>
+ * <li>Caffeine: https://github.com/ben-manes/caffeine</li>
+ * <li>Cache design: 
http://highscalability.com/blog/2016/1/25/design-of-a-modern-cache.html</li>
+ * </ul>
+ */
+public final class TinyLfuBlockCache implements BlockCache {
+  private static final Logger log = 
LoggerFactory.getLogger(TinyLfuBlockCache.class);
+  private static final int STATS_PERIOD_SEC = 60;
+
+  private final Cache<String,Block> cache;
+  private final Policy.Eviction<String,Block> policy;
+  private final ScheduledExecutorService statsExecutor;
+
+  public TinyLfuBlockCache(long maxSize, long blockSize) {
+    cache = Caffeine.newBuilder().initialCapacity((int) Math.ceil(1.2 * 
maxSize / blockSize)).weigher((String blockName, Block block) -> {
+      int keyWeight = ClassSize.align(blockName.length()) + ClassSize.STRING;
+      return keyWeight + block.weight();
+    }).maximumWeight(maxSize).recordStats().build();
+    policy = cache.policy().eviction().get();
+
+    statsExecutor = Executors.newSingleThreadScheduledExecutor(new 
ThreadFactoryBuilder().setNameFormat("TinyLfuBlockCacheStatsExecutor").setDaemon(true)
+        .build());
+    statsExecutor.scheduleAtFixedRate(this::logStats, STATS_PERIOD_SEC, 
STATS_PERIOD_SEC, TimeUnit.SECONDS);
+  }
+
+  @Override
+  public long getMaxSize() {
+    return policy.getMaximum();
+  }
+
+  @Override
+  public CacheEntry getBlock(String blockName) {
+    return cache.getIfPresent(blockName);
+  }
+
+  @Override
+  public CacheEntry cacheBlock(String blockName, byte[] buffer) {
+    return cache.asMap().compute(blockName, (key, block) -> {
+      if (block == null) {
+        return new Block(buffer);
+      }
+      block.buffer = buffer;
+      return block;
+    });
+  }
+
+  @Override
+  public CacheEntry cacheBlock(String blockName, byte[] buffer, /* ignored 
*/boolean inMemory) {
+    return cacheBlock(blockName, buffer);
+  }
+
+  @Override
+  public BlockCache.Stats getStats() {
+    CacheStats stats = cache.stats();
+    return new BlockCache.Stats() {
+      @Override
+      public long hitCount() {
+        return stats.hitCount();
+      }
+
+      @Override
+      public long requestCount() {
+        return stats.requestCount();
+      }
+    };
+  }
+
+  private void logStats() {
+    double maxMB = ((double) policy.getMaximum()) / ((double) (1024 * 1024));
+    double sizeMB = ((double) policy.weightedSize().getAsLong()) / ((double) 
(1024 * 1024));
+    double freeMB = maxMB - sizeMB;
+    log.debug("Cache Size={}MB, Free={}MB, Max={}MB, Blocks={}", sizeMB, 
freeMB, maxMB, cache.estimatedSize());
+    log.debug(cache.stats().toString());
+  }
+
+  private static final class Block implements CacheEntry {
+    private volatile byte[] buffer;
+    private volatile Object index;
+
+    Block(byte[] buffer) {
+      this.buffer = requireNonNull(buffer);
+    }
+
+    @Override
+    public byte[] getBuffer() {
+      return buffer;
+    }
+
+    @Override
+    public Object getIndex() {
+      return index;
+    }
+
+    @Override
+    public void setIndex(Object index) {
+      this.index = index;
+    }
+
+    int weight() {
+      return ClassSize.align(buffer.length) + SizeConstants.SIZEOF_LONG + 
ClassSize.REFERENCE + ClassSize.OBJECT + ClassSize.ARRAY;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d94d30e/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 54e4a72..fbe9070 100644
--- a/pom.xml
+++ b/pom.xml
@@ -164,6 +164,11 @@
         <version>1.48</version>
       </dependency>
       <dependency>
+        <groupId>com.github.ben-manes.caffeine</groupId>
+        <artifactId>caffeine</artifactId>
+        <version>2.3.3</version>
+      </dependency>
+      <dependency>
         <groupId>com.google.auto.service</groupId>
         <artifactId>auto-service</artifactId>
         <version>1.0-rc2</version>

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d94d30e/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 7751681..450c1c6 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -2893,10 +2893,10 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
     result.name = getClientAddressString();
     result.holdTime = resourceManager.holdTime();
     result.lookups = seekCount.get();
-    result.indexCacheHits = 
resourceManager.getIndexCache().getStats().getHitCount();
-    result.indexCacheRequest = 
resourceManager.getIndexCache().getStats().getRequestCount();
-    result.dataCacheHits = 
resourceManager.getDataCache().getStats().getHitCount();
-    result.dataCacheRequest = 
resourceManager.getDataCache().getStats().getRequestCount();
+    result.indexCacheHits = 
resourceManager.getIndexCache().getStats().hitCount();
+    result.indexCacheRequest = 
resourceManager.getIndexCache().getStats().requestCount();
+    result.dataCacheHits = 
resourceManager.getDataCache().getStats().hitCount();
+    result.dataCacheRequest = 
resourceManager.getDataCache().getStats().requestCount();
     result.logSorts = logSorter.getLogSorts();
     result.flushs = flushCounter.get();
     result.syncs = syncCounter.get();

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d94d30e/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
index 089bd12..3cd7bfa 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
@@ -38,7 +38,9 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.impl.KeyExtent;
+import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
 import org.apache.accumulo.core.file.blockfile.cache.LruBlockCache;
+import org.apache.accumulo.core.file.blockfile.cache.TinyLfuBlockCache;
 import org.apache.accumulo.core.metadata.schema.DataFileValue;
 import org.apache.accumulo.core.util.Daemon;
 import org.apache.accumulo.core.util.NamingThreadFactory;
@@ -96,8 +98,8 @@ public class TabletServerResourceManager {
 
   private final MemoryManagementFramework memMgmt;
 
-  private final LruBlockCache _dCache;
-  private final LruBlockCache _iCache;
+  private final BlockCache _dCache;
+  private final BlockCache _iCache;
   private final TabletServer tserver;
   private final ServerConfigurationFactory conf;
 
@@ -163,8 +165,16 @@ public class TabletServerResourceManager {
     long iCacheSize = acuConf.getMemoryInBytes(Property.TSERV_INDEXCACHE_SIZE);
     long totalQueueSize = 
acuConf.getMemoryInBytes(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX);
 
-    _iCache = new LruBlockCache(iCacheSize, blockSize);
-    _dCache = new LruBlockCache(dCacheSize, blockSize);
+    String policy = acuConf.get(Property.TSERV_CACHE_POLICY);
+    if (policy.equalsIgnoreCase("LRU")) {
+      _iCache = new LruBlockCache(iCacheSize, blockSize);
+      _dCache = new LruBlockCache(dCacheSize, blockSize);
+    } else if (policy.equalsIgnoreCase("TinyLFU")) {
+      _iCache = new TinyLfuBlockCache(iCacheSize, blockSize);
+      _dCache = new TinyLfuBlockCache(dCacheSize, blockSize);
+    } else {
+      throw new IllegalArgumentException("Unknown Block cache policy " + 
policy);
+    }
 
     Runtime runtime = Runtime.getRuntime();
     if (usingNativeMap) {
@@ -742,11 +752,11 @@ public class TabletServerResourceManager {
     }
   }
 
-  public LruBlockCache getIndexCache() {
+  public BlockCache getIndexCache() {
     return _iCache;
   }
 
-  public LruBlockCache getDataCache() {
+  public BlockCache getDataCache() {
     return _dCache;
   }
 

Reply via email to