Repository: accumulo Updated Branches: refs/heads/master c9391894e -> 9d94d30e1
ACCUMULO-4177 TinyLFU-based block cache Signed-off-by: Josh Elser <els...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/9d94d30e Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/9d94d30e Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/9d94d30e Branch: refs/heads/master Commit: 9d94d30e16a5768fd6441ef603ea7afe2e7d37f6 Parents: c939189 Author: Ben Manes <ben.ma...@gmail.com> Authored: Fri May 6 19:35:41 2016 -0700 Committer: Josh Elser <els...@apache.org> Committed: Fri Sep 16 16:26:48 2016 -0400 ---------------------------------------------------------------------- assemble/pom.xml | 4 + assemble/src/main/assemblies/component.xml | 1 + core/pom.xml | 4 + .../core/client/rfile/RFileScanner.java | 15 ++ .../org/apache/accumulo/core/conf/Property.java | 1 + .../core/file/blockfile/cache/BlockCache.java | 25 ++++ .../file/blockfile/cache/LruBlockCache.java | 14 +- .../file/blockfile/cache/TinyLfuBlockCache.java | 141 +++++++++++++++++++ pom.xml | 5 + .../apache/accumulo/tserver/TabletServer.java | 8 +- .../tserver/TabletServerResourceManager.java | 22 ++- 11 files changed, 224 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d94d30e/assemble/pom.xml ---------------------------------------------------------------------- diff --git a/assemble/pom.xml b/assemble/pom.xml index d0f14a9..1f2a899 100644 --- a/assemble/pom.xml +++ b/assemble/pom.xml @@ -33,6 +33,10 @@ <optional>true</optional> </dependency> <dependency> + <groupId>com.github.ben-manes.caffeine</groupId> + <artifactId>caffeine</artifactId> + </dependency> + <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> <optional>true</optional> http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d94d30e/assemble/src/main/assemblies/component.xml ---------------------------------------------------------------------- diff --git a/assemble/src/main/assemblies/component.xml b/assemble/src/main/assemblies/component.xml index 6fc6656..2156e40 100644 --- a/assemble/src/main/assemblies/component.xml +++ b/assemble/src/main/assemblies/component.xml @@ -31,6 +31,7 @@ version listing for packaged artifacts --> <include>${groupId}:${artifactId}-*</include> <include>com.beust:jcommander</include> + <include>com.github.ben-manes.caffeine:caffeine</include> <include>com.google.code.gson:gson</include> <include>com.google.guava:guava</include> <include>com.google.protobuf:protobuf-java</include> http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d94d30e/core/pom.xml ---------------------------------------------------------------------- diff --git a/core/pom.xml b/core/pom.xml index 44afddb..2e858b8 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -31,6 +31,10 @@ <artifactId>jcommander</artifactId> </dependency> <dependency> + <groupId>com.github.ben-manes.caffeine</groupId> + <artifactId>caffeine</artifactId> + </dependency> + <dependency> <groupId>com.google.auto.service</groupId> <artifactId>auto-service</artifactId> <optional>true</optional> http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d94d30e/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java index 4dfba68..186471d 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java @@ -112,6 +112,21 @@ class RFileScanner extends ScannerOptions implements Scanner { public long getMaxSize() { return Integer.MAX_VALUE; } + + @Override + public Stats getStats() { + return new BlockCache.Stats() { + @Override + public long hitCount() { + return 0L; + } + + @Override + public long requestCount() { + return 0L; + } + }; + } } RFileScanner(Opts opts) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d94d30e/core/src/main/java/org/apache/accumulo/core/conf/Property.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index ede1c6f..860e57b 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -243,6 +243,7 @@ public enum Property { TSERV_PREFIX("tserver.", null, PropertyType.PREFIX, "Properties in this category affect the behavior of the tablet servers"), TSERV_CLIENT_TIMEOUT("tserver.client.timeout", "3s", PropertyType.TIMEDURATION, "Time to wait for clients to continue scans before closing a session."), TSERV_DEFAULT_BLOCKSIZE("tserver.default.blocksize", "1M", PropertyType.MEMORY, "Specifies a default blocksize for the tserver caches"), + TSERV_CACHE_POLICY("tserver.cache.policy", "LRU", PropertyType.STRING, "Specifies the eviction policy of the file data caches (LRU or TinyLFU)."), TSERV_DATACACHE_SIZE("tserver.cache.data.size", "128M", PropertyType.MEMORY, "Specifies the size of the cache for file data blocks."), TSERV_INDEXCACHE_SIZE("tserver.cache.index.size", "512M", PropertyType.MEMORY, "Specifies the size of the cache for file indices."), TSERV_PORTSEARCH("tserver.port.search", "false", PropertyType.BOOLEAN, "if the ports above are in use, search higher ports until one is available"), http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d94d30e/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java index 094782d..82f8b1e 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java @@ -58,4 +58,29 @@ public interface BlockCache { * @return max size in bytes */ long getMaxSize(); + + /** + * Get the statistics of this cache. + * + * @return statistics + */ + Stats getStats(); + + /** Cache statistics. */ + interface Stats { + + /** + * Returns the number of lookups that have returned a cached value. + * + * @return the number of lookups that have returned a cached value + */ + long hitCount(); + + /** + * Returns the number of times the lookup methods have returned either a cached or uncached value. + * + * @return the number of lookups + */ + long requestCount(); + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d94d30e/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java index 1beaccb..cbdaca5 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java @@ -578,7 +578,7 @@ public class LruBlockCache implements BlockCache, HeapSize { float maxMB = ((float) maxSize) / ((float) (1024 * 1024)); log.debug("Cache Stats: Sizes: Total={}MB ({}), Free={}MB ({}), Max={}MB ({}), Counts: Blocks={}, Access={}, Hit={}, Miss={}, Evictions={}, Evicted={}," + "Ratios: Hit Ratio={}%, Miss Ratio={}%, Evicted/Run={}, Duplicate Reads={}", sizeMB, totalSize, freeMB, freeSize, maxMB, maxSize, size(), - stats.getRequestCount(), stats.getHitCount(), stats.getMissCount(), stats.getEvictionCount(), stats.getEvictedCount(), stats.getHitRatio() * 100, + stats.requestCount(), stats.hitCount(), stats.getMissCount(), stats.getEvictionCount(), stats.getEvictedCount(), stats.getHitRatio() * 100, stats.getMissRatio() * 100, stats.evictedPerEviction(), stats.getDuplicateReads()); } @@ -592,7 +592,7 @@ public class LruBlockCache implements BlockCache, HeapSize { return this.stats; } - public static class CacheStats { + public static class CacheStats implements BlockCache.Stats { private final AtomicLong accessCount = new AtomicLong(0); private final AtomicLong hitCount = new AtomicLong(0); private final AtomicLong missCount = new AtomicLong(0); @@ -622,7 +622,8 @@ public class LruBlockCache implements BlockCache, HeapSize { evictedCount.incrementAndGet(); } - public long getRequestCount() { + @Override + public long requestCount() { return accessCount.get(); } @@ -630,7 +631,8 @@ public class LruBlockCache implements BlockCache, HeapSize { return missCount.get(); } - public long getHitCount() { + @Override + public long hitCount() { return hitCount.get(); } @@ -647,11 +649,11 @@ public class LruBlockCache implements BlockCache, HeapSize { } public double getHitRatio() { - return ((float) getHitCount() / (float) getRequestCount()); + return ((float) hitCount() / (float) requestCount()); } public double getMissRatio() { - return ((float) getMissCount() / (float) getRequestCount()); + return ((float) getMissCount() / (float) requestCount()); } public double evictedPerEviction() { http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d94d30e/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/TinyLfuBlockCache.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/TinyLfuBlockCache.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/TinyLfuBlockCache.java new file mode 100644 index 0000000..bab52af --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/TinyLfuBlockCache.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.file.blockfile.cache; + +import static java.util.Objects.requireNonNull; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.Policy; +import com.github.benmanes.caffeine.cache.stats.CacheStats; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * A block cache that is memory bounded using the W-TinyLFU eviction algorithm. This implementation delegates to a Caffeine cache to provide concurrent O(1) + * read and write operations. + * <ul> + * <li>W-TinyLFU: http://arxiv.org/pdf/1512.00727.pdf</li> + * <li>Caffeine: https://github.com/ben-manes/caffeine</li> + * <li>Cache design: http://highscalability.com/blog/2016/1/25/design-of-a-modern-cache.html</li> + * </ul> + */ +public final class TinyLfuBlockCache implements BlockCache { + private static final Logger log = LoggerFactory.getLogger(TinyLfuBlockCache.class); + private static final int STATS_PERIOD_SEC = 60; + + private final Cache<String,Block> cache; + private final Policy.Eviction<String,Block> policy; + private final ScheduledExecutorService statsExecutor; + + public TinyLfuBlockCache(long maxSize, long blockSize) { + cache = Caffeine.newBuilder().initialCapacity((int) Math.ceil(1.2 * maxSize / blockSize)).weigher((String blockName, Block block) -> { + int keyWeight = ClassSize.align(blockName.length()) + ClassSize.STRING; + return keyWeight + block.weight(); + }).maximumWeight(maxSize).recordStats().build(); + policy = cache.policy().eviction().get(); + + statsExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("TinyLfuBlockCacheStatsExecutor").setDaemon(true) + .build()); + statsExecutor.scheduleAtFixedRate(this::logStats, STATS_PERIOD_SEC, STATS_PERIOD_SEC, TimeUnit.SECONDS); + } + + @Override + public long getMaxSize() { + return policy.getMaximum(); + } + + @Override + public CacheEntry getBlock(String blockName) { + return cache.getIfPresent(blockName); + } + + @Override + public CacheEntry cacheBlock(String blockName, byte[] buffer) { + return cache.asMap().compute(blockName, (key, block) -> { + if (block == null) { + return new Block(buffer); + } + block.buffer = buffer; + return block; + }); + } + + @Override + public CacheEntry cacheBlock(String blockName, byte[] buffer, /* ignored */boolean inMemory) { + return cacheBlock(blockName, buffer); + } + + @Override + public BlockCache.Stats getStats() { + CacheStats stats = cache.stats(); + return new BlockCache.Stats() { + @Override + public long hitCount() { + return stats.hitCount(); + } + + @Override + public long requestCount() { + return stats.requestCount(); + } + }; + } + + private void logStats() { + double maxMB = ((double) policy.getMaximum()) / ((double) (1024 * 1024)); + double sizeMB = ((double) policy.weightedSize().getAsLong()) / ((double) (1024 * 1024)); + double freeMB = maxMB - sizeMB; + log.debug("Cache Size={}MB, Free={}MB, Max={}MB, Blocks={}", sizeMB, freeMB, maxMB, cache.estimatedSize()); + log.debug(cache.stats().toString()); + } + + private static final class Block implements CacheEntry { + private volatile byte[] buffer; + private volatile Object index; + + Block(byte[] buffer) { + this.buffer = requireNonNull(buffer); + } + + @Override + public byte[] getBuffer() { + return buffer; + } + + @Override + public Object getIndex() { + return index; + } + + @Override + public void setIndex(Object index) { + this.index = index; + } + + int weight() { + return ClassSize.align(buffer.length) + SizeConstants.SIZEOF_LONG + ClassSize.REFERENCE + ClassSize.OBJECT + ClassSize.ARRAY; + } + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d94d30e/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 54e4a72..fbe9070 100644 --- a/pom.xml +++ b/pom.xml @@ -164,6 +164,11 @@ <version>1.48</version> </dependency> <dependency> + <groupId>com.github.ben-manes.caffeine</groupId> + <artifactId>caffeine</artifactId> + <version>2.3.3</version> + </dependency> + <dependency> <groupId>com.google.auto.service</groupId> <artifactId>auto-service</artifactId> <version>1.0-rc2</version> http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d94d30e/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 7751681..450c1c6 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -2893,10 +2893,10 @@ public class TabletServer extends AccumuloServerContext implements Runnable { result.name = getClientAddressString(); result.holdTime = resourceManager.holdTime(); result.lookups = seekCount.get(); - result.indexCacheHits = resourceManager.getIndexCache().getStats().getHitCount(); - result.indexCacheRequest = resourceManager.getIndexCache().getStats().getRequestCount(); - result.dataCacheHits = resourceManager.getDataCache().getStats().getHitCount(); - result.dataCacheRequest = resourceManager.getDataCache().getStats().getRequestCount(); + result.indexCacheHits = resourceManager.getIndexCache().getStats().hitCount(); + result.indexCacheRequest = resourceManager.getIndexCache().getStats().requestCount(); + result.dataCacheHits = resourceManager.getDataCache().getStats().hitCount(); + result.dataCacheRequest = resourceManager.getDataCache().getStats().requestCount(); result.logSorts = logSorter.getLogSorts(); result.flushs = flushCounter.get(); result.syncs = syncCounter.get(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d94d30e/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java index 089bd12..3cd7bfa 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java @@ -38,7 +38,9 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.impl.KeyExtent; +import org.apache.accumulo.core.file.blockfile.cache.BlockCache; import org.apache.accumulo.core.file.blockfile.cache.LruBlockCache; +import org.apache.accumulo.core.file.blockfile.cache.TinyLfuBlockCache; import org.apache.accumulo.core.metadata.schema.DataFileValue; import org.apache.accumulo.core.util.Daemon; import org.apache.accumulo.core.util.NamingThreadFactory; @@ -96,8 +98,8 @@ public class TabletServerResourceManager { private final MemoryManagementFramework memMgmt; - private final LruBlockCache _dCache; - private final LruBlockCache _iCache; + private final BlockCache _dCache; + private final BlockCache _iCache; private final TabletServer tserver; private final ServerConfigurationFactory conf; @@ -163,8 +165,16 @@ public class TabletServerResourceManager { long iCacheSize = acuConf.getMemoryInBytes(Property.TSERV_INDEXCACHE_SIZE); long totalQueueSize = acuConf.getMemoryInBytes(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX); - _iCache = new LruBlockCache(iCacheSize, blockSize); - _dCache = new LruBlockCache(dCacheSize, blockSize); + String policy = acuConf.get(Property.TSERV_CACHE_POLICY); + if (policy.equalsIgnoreCase("LRU")) { + _iCache = new LruBlockCache(iCacheSize, blockSize); + _dCache = new LruBlockCache(dCacheSize, blockSize); + } else if (policy.equalsIgnoreCase("TinyLFU")) { + _iCache = new TinyLfuBlockCache(iCacheSize, blockSize); + _dCache = new TinyLfuBlockCache(dCacheSize, blockSize); + } else { + throw new IllegalArgumentException("Unknown Block cache policy " + policy); + } Runtime runtime = Runtime.getRuntime(); if (usingNativeMap) { @@ -742,11 +752,11 @@ public class TabletServerResourceManager { } } - public LruBlockCache getIndexCache() { + public BlockCache getIndexCache() { return _iCache; } - public LruBlockCache getDataCache() { + public BlockCache getDataCache() { return _dCache; }