ACCUMULO-4463: Make block cache implementation pluggable
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/452732c8 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/452732c8 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/452732c8 Branch: refs/heads/master Commit: 452732c8ad91c4a73a556cdc2030e0a5cbbd173b Parents: 3f1b0f3 Author: Dave Marion <dlmar...@apache.org> Authored: Mon May 22 13:08:41 2017 -0400 Committer: Dave Marion <dlmar...@apache.org> Committed: Mon May 22 13:08:41 2017 -0400 ---------------------------------------------------------------------- core/src/main/findbugs/exclude-filter.xml | 4 +- .../core/client/rfile/RFileScanner.java | 57 +- .../org/apache/accumulo/core/conf/Property.java | 4 +- .../core/file/blockfile/cache/BlockCache.java | 6 + .../cache/BlockCacheConfiguration.java | 91 +++ .../file/blockfile/cache/BlockCacheManager.java | 107 +++ .../core/file/blockfile/cache/CacheType.java | 24 + .../core/file/blockfile/cache/CachedBlock.java | 2 +- .../file/blockfile/cache/LruBlockCache.java | 703 ------------------- .../file/blockfile/cache/TinyLfuBlockCache.java | 141 ---- .../file/blockfile/cache/lru/LruBlockCache.java | 625 +++++++++++++++++ .../cache/lru/LruBlockCacheConfiguration.java | 212 ++++++ .../cache/lru/LruBlockCacheManager.java | 49 ++ .../cache/tinylfu/TinyLfuBlockCache.java | 150 ++++ .../tinylfu/TinyLfuBlockCacheConfiguration.java | 32 + .../cache/tinylfu/TinyLfuBlockCacheManager.java | 37 + .../accumulo/core/summary/SummaryReader.java | 5 + .../blockfile/cache/BlockCacheFactoryTest.java | 53 ++ .../cache/BlockConfigurationHelperTest.java | 32 + .../file/blockfile/cache/TestLruBlockCache.java | 194 ++--- .../accumulo/core/file/rfile/RFileTest.java | 25 +- .../tserver/TabletServerResourceManager.java | 42 +- 22 files changed, 1625 insertions(+), 970 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/accumulo/blob/452732c8/core/src/main/findbugs/exclude-filter.xml ---------------------------------------------------------------------- diff --git a/core/src/main/findbugs/exclude-filter.xml b/core/src/main/findbugs/exclude-filter.xml index 95aca41..56d3659 100644 --- a/core/src/main/findbugs/exclude-filter.xml +++ b/core/src/main/findbugs/exclude-filter.xml @@ -67,12 +67,12 @@ </Match> <Match> <!-- bad practice to start a thread in constructor; we should be careful using this class --> - <Class name="org.apache.accumulo.core.file.blockfile.cache.LruBlockCache" /> + <Class name="org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCache" /> <Bug code="SC" pattern="SC_START_IN_CTOR" /> </Match> <Match> <!-- locking is confusing, but probably correct --> - <Class name="org.apache.accumulo.core.file.blockfile.cache.LruBlockCache$EvictionThread" /> + <Class name="org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCache$EvictionThread" /> <Or> <Bug code="NN" pattern="NN_NAKED_NOTIFY" /> <Bug code="UW" pattern="UW_UNCOND_WAIT" /> http://git-wip-us.apache.org/repos/asf/accumulo/blob/452732c8/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java index c95531b..3e6239d 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java @@ -43,8 +43,9 @@ import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.file.blockfile.cache.BlockCache; +import org.apache.accumulo.core.file.blockfile.cache.BlockCacheManager; import 
org.apache.accumulo.core.file.blockfile.cache.CacheEntry; -import org.apache.accumulo.core.file.blockfile.cache.LruBlockCache; +import org.apache.accumulo.core.file.blockfile.cache.CacheType; import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile; import org.apache.accumulo.core.file.rfile.RFile; import org.apache.accumulo.core.file.rfile.RFile.Reader; @@ -67,6 +68,7 @@ class RFileScanner extends ScannerOptions implements Scanner { private static final Range EMPTY_RANGE = new Range(); private Range range; + private BlockCacheManager blockCacheManager = null; private BlockCache dataCache = null; private BlockCache indexCache = null; private Opts opts; @@ -109,6 +111,11 @@ class RFileScanner extends ScannerOptions implements Scanner { } @Override + public long getMaxHeapSize() { + return getMaxSize(); + } + + @Override public long getMaxSize() { return Integer.MAX_VALUE; } @@ -135,15 +142,34 @@ class RFileScanner extends ScannerOptions implements Scanner { } this.opts = opts; - if (opts.indexCacheSize > 0) { - this.indexCache = new LruBlockCache(opts.indexCacheSize, CACHE_BLOCK_SIZE); - } else { + + if (opts.indexCacheSize > 0 || opts.dataCacheSize > 0) { + ConfigurationCopy cc = new ConfigurationCopy(DefaultConfiguration.getInstance()); + if (null != opts.tableConfig) { + opts.tableConfig.forEach(cc::set); + } + + try { + blockCacheManager = BlockCacheManager.getClientInstance(cc); + if (opts.indexCacheSize > 0) { + cc.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(opts.indexCacheSize)); + } + if (opts.dataCacheSize > 0) { + cc.set(Property.TSERV_DATACACHE_SIZE, Long.toString(opts.dataCacheSize)); + } + blockCacheManager.start(cc); + this.indexCache = blockCacheManager.getBlockCache(CacheType.INDEX); + this.dataCache = blockCacheManager.getBlockCache(CacheType.DATA); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + if (null == indexCache) { this.indexCache = new NoopCache(); } - - if 
(opts.dataCacheSize > 0) { - this.dataCache = new LruBlockCache(opts.dataCacheSize, CACHE_BLOCK_SIZE); - } else { + if (null == this.dataCache) { this.dataCache = new NoopCache(); } } @@ -326,14 +352,6 @@ class RFileScanner extends ScannerOptions implements Scanner { @Override public void close() { - if (dataCache instanceof LruBlockCache) { - ((LruBlockCache) dataCache).shutdown(); - } - - if (indexCache instanceof LruBlockCache) { - ((LruBlockCache) indexCache).shutdown(); - } - try { for (RFileSource source : opts.in.getSources()) { source.getInputStream().close(); @@ -341,5 +359,12 @@ class RFileScanner extends ScannerOptions implements Scanner { } catch (IOException e) { throw new RuntimeException(e); } + try { + if (null != this.blockCacheManager) { + this.blockCacheManager.stop(); + } + } catch (Exception e1) { + throw new RuntimeException(e1); + } } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/452732c8/core/src/main/java/org/apache/accumulo/core/conf/Property.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index 5480867..a506e07 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -245,7 +245,9 @@ public enum Property { TSERV_PREFIX("tserver.", null, PropertyType.PREFIX, "Properties in this category affect the behavior of the tablet servers"), TSERV_CLIENT_TIMEOUT("tserver.client.timeout", "3s", PropertyType.TIMEDURATION, "Time to wait for clients to continue scans before closing a session."), TSERV_DEFAULT_BLOCKSIZE("tserver.default.blocksize", "1M", PropertyType.BYTES, "Specifies a default blocksize for the tserver caches"), - TSERV_CACHE_POLICY("tserver.cache.policy", "LRU", PropertyType.STRING, "Specifies the eviction policy of the file data caches (LRU or TinyLFU)."), + 
TSERV_CACHE_MANAGER_IMPL("tserver.cache.manager.class", "org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCacheManager", PropertyType.STRING, + "Specifies the class name of the block cache factory implementation. Alternative implementation is " + + "org.apache.accumulo.core.file.blockfile.cache.tinylfu.TinyLfuBlockCacheManager"), TSERV_DATACACHE_SIZE("tserver.cache.data.size", "10%", PropertyType.MEMORY, "Specifies the size of the cache for file data blocks."), TSERV_INDEXCACHE_SIZE("tserver.cache.index.size", "25%", PropertyType.MEMORY, "Specifies the size of the cache for file indices."), TSERV_SUMMARYCACHE_SIZE("tserver.cache.summary.size", "10%", PropertyType.MEMORY, "Specifies the size of the cache for summary data on each tablet server."), http://git-wip-us.apache.org/repos/asf/accumulo/blob/452732c8/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java index 82f8b1e..b27c918 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java @@ -21,6 +21,7 @@ package org.apache.accumulo.core.file.blockfile.cache; * Block cache interface. */ public interface BlockCache { + /** * Add block to cache. * @@ -53,6 +54,11 @@ public interface BlockCache { CacheEntry getBlock(String blockName); /** + * Get the maximum amount of on heap memory this cache will use. + */ + long getMaxHeapSize(); + + /** * Get the maximum size of this cache. 
* * @return max size in bytes http://git-wip-us.apache.org/repos/asf/accumulo/blob/452732c8/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheConfiguration.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheConfiguration.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheConfiguration.java new file mode 100644 index 0000000..efab628 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheConfiguration.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.core.file.blockfile.cache; + +import java.util.Map; +import java.util.Optional; + +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; + +public class BlockCacheConfiguration { + + public static final String CACHE_PROPERTY_BASE = Property.GENERAL_ARBITRARY_PROP_PREFIX + "cache.block."; + + /** Maximum allowable size of cache (block put if size > max, evict) */ + private final long maxSize; + + /** Approximate block size */ + private final long blockSize; + + private final Map<String,String> genProps; + + private final String prefix; + + private final String defaultPrefix; + + public BlockCacheConfiguration(AccumuloConfiguration conf, CacheType type, String implName) { + defaultPrefix = getDefaultPrefix(implName); + prefix = getPrefix(type, implName); + genProps = conf.getAllPropertiesWithPrefix(Property.GENERAL_ARBITRARY_PROP_PREFIX); + + switch (type) { + case INDEX: + this.maxSize = conf.getAsBytes(Property.TSERV_INDEXCACHE_SIZE); + break; + case DATA: + this.maxSize = conf.getAsBytes(Property.TSERV_DATACACHE_SIZE); + break; + case SUMMARY: + this.maxSize = conf.getAsBytes(Property.TSERV_SUMMARYCACHE_SIZE); + break; + default: + throw new IllegalArgumentException("Unknown block cache type"); + } + this.blockSize = conf.getAsBytes(Property.TSERV_DEFAULT_BLOCKSIZE); + } + + public long getMaxSize() { + return this.maxSize; + } + + public long getBlockSize() { + return this.blockSize; + } + + protected Optional<String> get(String suffix) { + String val = genProps.get(prefix + suffix); + if (val == null) { + val = genProps.get(defaultPrefix + suffix); + } + return Optional.ofNullable(val); + } + + public static String getDefaultPrefix(String implName) { + return CACHE_PROPERTY_BASE + implName + ".default."; + } + + public static String getPrefix(CacheType type, String implName) { + return CACHE_PROPERTY_BASE + implName + "." 
+ type.name().toLowerCase() + "."; + } + + @Override + public String toString() { + return "maxSize: " + getMaxSize() + ", blockSize: " + getBlockSize(); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/452732c8/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheManager.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheManager.java new file mode 100644 index 0000000..be77ee2 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheManager.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.core.file.blockfile.cache; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class BlockCacheManager { + + private static final Logger LOG = LoggerFactory.getLogger(BlockCacheManager.class); + + private final Map<CacheType,BlockCache> caches = new HashMap<>(); + + /** + * Initialize the caches for each CacheType based on the configuration + * + * @param conf + * accumulo configuration + */ + public void start(AccumuloConfiguration conf) { + for (CacheType type : CacheType.values()) { + BlockCache cache = this.createCache(conf, type); + this.caches.put(type, cache); + } + } + + /** + * Stop caches and release resources + */ + public void stop() { + this.caches.clear(); + } + + /** + * Get the block cache of the given type + * + * @param type + * block cache type + * @return BlockCache or null if not enabled + */ + public BlockCache getBlockCache(CacheType type) { + return caches.get(type); + } + + /** + * Create a block cache using the supplied configuration + * + * @param conf + * cache configuration + * @return configured block cache + */ + protected abstract BlockCache createCache(AccumuloConfiguration conf, CacheType type); + + /** + * Get the BlockCacheManager implementation specified by the property 'tserver.cache.manager.class' using the AccumuloVFSClassLoader + * + * @param conf + * accumulo configuration + * @return block cache manager instance + * @throws Exception + * error loading block cache manager implementation class + */ + public static synchronized BlockCacheManager getInstance(AccumuloConfiguration conf) throws Exception { + String impl = conf.get(Property.TSERV_CACHE_MANAGER_IMPL); + Class<? 
extends BlockCacheManager> clazz = AccumuloVFSClassLoader.loadClass(impl, BlockCacheManager.class); + LOG.info("Created new block cache manager of type: {}", clazz.getSimpleName()); + return (BlockCacheManager) clazz.newInstance(); + } + + /** + * Get the BlockCacheManager implementation specified by the property 'tserver.cache.manager.class' + * + * @param conf + * accumulo configuration + * @return block cache manager instance + * @throws Exception + * error loading block cache manager implementation class + */ + public static synchronized BlockCacheManager getClientInstance(AccumuloConfiguration conf) throws Exception { + String impl = conf.get(Property.TSERV_CACHE_MANAGER_IMPL); + Class<? extends BlockCacheManager> clazz = Class.forName(impl).asSubclass(BlockCacheManager.class); + LOG.info("Created new block cache factory of type: {}", clazz.getSimpleName()); + return (BlockCacheManager) clazz.newInstance(); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/452732c8/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CacheType.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CacheType.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CacheType.java new file mode 100644 index 0000000..9e26c05 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CacheType.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.file.blockfile.cache; + +public enum CacheType { + + DATA, INDEX, SUMMARY; + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/452732c8/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CachedBlock.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CachedBlock.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CachedBlock.java index c67b4c7..44cea6b 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CachedBlock.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CachedBlock.java @@ -31,7 +31,7 @@ public class CachedBlock implements HeapSize, Comparable<CachedBlock>, CacheEntr public final static long PER_BLOCK_OVERHEAD = ClassSize.align(ClassSize.OBJECT + (3 * ClassSize.REFERENCE) + (2 * SizeConstants.SIZEOF_LONG) + ClassSize.STRING + ClassSize.BYTE_BUFFER); - static enum BlockPriority { + public static enum BlockPriority { /** * Accessed a single time (used for scan-resistance) */ http://git-wip-us.apache.org/repos/asf/accumulo/blob/452732c8/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java deleted file mode 100644 index cbdaca5..0000000 
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java +++ /dev/null @@ -1,703 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.core.file.blockfile.cache; - -import java.lang.ref.WeakReference; -import java.util.Objects; -import java.util.PriorityQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.ReentrantLock; - -import org.apache.accumulo.core.util.NamingThreadFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A block cache implementation that is memory-aware using {@link HeapSize}, memory-bound using an LRU eviction algorithm, and concurrent: backed by a - * {@link ConcurrentHashMap} and with a non-blocking eviction thread giving constant-time {@link #cacheBlock} and {@link #getBlock} operations. - * - * <p> - * Contains three levels of block priority to allow for scan-resistance and in-memory families. 
A block is added with an inMemory flag if necessary, otherwise a - * block becomes a single access priority. Once a blocked is accessed again, it changes to multiple access. This is used to prevent scans from thrashing the - * cache, adding a least-frequently-used element to the eviction algorithm. - * - * <p> - * Each priority is given its own chunk of the total cache to ensure fairness during eviction. Each priority will retain close to its maximum size, however, if - * any priority is not using its entire chunk the others are able to grow beyond their chunk size. - * - * <p> - * Instantiated at a minimum with the total size and average block size. All sizes are in bytes. The block size is not especially important as this cache is - * fully dynamic in its sizing of blocks. It is only used for pre-allocating data structures and in initial heap estimation of the map. - * - * <p> - * The detailed constructor defines the sizes for the three priorities (they should total to the maximum size defined). It also sets the levels that trigger and - * control the eviction thread. - * - * <p> - * The acceptable size is the cache size level which triggers the eviction process to start. It evicts enough blocks to get the size below the minimum size - * specified. - * - * <p> - * Eviction happens in a separate thread and involves a single full-scan of the map. It determines how many bytes must be freed to reach the minimum size, and - * then while scanning determines the fewest least-recently-used blocks necessary from each of the three priorities (would be 3 times bytes to free). It then - * uses the priority chunk sizes to evict fairly according to the relative sizes and usage. 
- */ -public class LruBlockCache implements BlockCache, HeapSize { - - private static final Logger log = LoggerFactory.getLogger(LruBlockCache.class); - - /** Default Configuration Parameters */ - - /** Backing Concurrent Map Configuration */ - static final float DEFAULT_LOAD_FACTOR = 0.75f; - static final int DEFAULT_CONCURRENCY_LEVEL = 16; - - /** Eviction thresholds */ - static final float DEFAULT_MIN_FACTOR = 0.75f; - static final float DEFAULT_ACCEPTABLE_FACTOR = 0.85f; - - /** Priority buckets */ - static final float DEFAULT_SINGLE_FACTOR = 0.25f; - static final float DEFAULT_MULTI_FACTOR = 0.50f; - static final float DEFAULT_MEMORY_FACTOR = 0.25f; - - /** Statistics thread */ - static final int statThreadPeriod = 60; - - /** Concurrent map (the cache) */ - private final ConcurrentHashMap<String,CachedBlock> map; - - /** Eviction lock (locked when eviction in process) */ - private final ReentrantLock evictionLock = new ReentrantLock(true); - - /** Volatile boolean to track if we are in an eviction process or not */ - private volatile boolean evictionInProgress = false; - - /** Eviction thread */ - private final EvictionThread evictionThread; - - /** Statistics thread schedule pool (for heavy debugging, could remove) */ - private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1, new NamingThreadFactory("LRUBlockCacheStats")); - - /** Current size of cache */ - private final AtomicLong size; - - /** Current number of cached elements */ - private final AtomicLong elements; - - /** Cache access count (sequential ID) */ - private final AtomicLong count; - - /** Cache statistics */ - private final CacheStats stats; - - /** Maximum allowable size of cache (block put if size > max, evict) */ - private long maxSize; - - /** Approximate block size */ - private long blockSize; - - /** Acceptable size of cache (no evictions if size < acceptable) */ - private float acceptableFactor; - - /** Minimum threshold of cache (when evicting, 
evict until size < min) */ - private float minFactor; - - /** Single access bucket size */ - private float singleFactor; - - /** Multiple access bucket size */ - private float multiFactor; - - /** In-memory bucket size */ - private float memoryFactor; - - /** Overhead of the structure itself */ - private long overhead; - - /** - * Default constructor. Specify maximum size and expected average block size (approximation is fine). - * - * <p> - * All other factors will be calculated based on defaults specified in this class. - * - * @param maxSize - * maximum size of cache, in bytes - * @param blockSize - * approximate size of each block, in bytes - */ - public LruBlockCache(long maxSize, long blockSize) { - this(maxSize, blockSize, true); - } - - /** - * Constructor used for testing. Allows disabling of the eviction thread. - */ - public LruBlockCache(long maxSize, long blockSize, boolean evictionThread) { - this(maxSize, blockSize, evictionThread, (int) Math.ceil(1.2 * maxSize / blockSize), DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL, DEFAULT_MIN_FACTOR, - DEFAULT_ACCEPTABLE_FACTOR, DEFAULT_SINGLE_FACTOR, DEFAULT_MULTI_FACTOR, DEFAULT_MEMORY_FACTOR); - } - - /** - * Configurable constructor. Use this constructor if not using defaults. 
- * - * @param maxSize - * maximum size of this cache, in bytes - * @param blockSize - * expected average size of blocks, in bytes - * @param evictionThread - * whether to run evictions in a bg thread or not - * @param mapInitialSize - * initial size of backing ConcurrentHashMap - * @param mapLoadFactor - * initial load factor of backing ConcurrentHashMap - * @param mapConcurrencyLevel - * initial concurrency factor for backing CHM - * @param minFactor - * percentage of total size that eviction will evict until - * @param acceptableFactor - * percentage of total size that triggers eviction - * @param singleFactor - * percentage of total size for single-access blocks - * @param multiFactor - * percentage of total size for multiple-access blocks - * @param memoryFactor - * percentage of total size for in-memory blocks - */ - public LruBlockCache(long maxSize, long blockSize, boolean evictionThread, int mapInitialSize, float mapLoadFactor, int mapConcurrencyLevel, float minFactor, - float acceptableFactor, float singleFactor, float multiFactor, float memoryFactor) { - if (singleFactor + multiFactor + memoryFactor != 1) { - throw new IllegalArgumentException("Single, multi, and memory factors " + " should total 1.0"); - } - if (minFactor >= acceptableFactor) { - throw new IllegalArgumentException("minFactor must be smaller than acceptableFactor"); - } - if (minFactor >= 1.0f || acceptableFactor >= 1.0f) { - throw new IllegalArgumentException("all factors must be < 1"); - } - this.maxSize = maxSize; - this.blockSize = blockSize; - map = new ConcurrentHashMap<>(mapInitialSize, mapLoadFactor, mapConcurrencyLevel); - this.minFactor = minFactor; - this.acceptableFactor = acceptableFactor; - this.singleFactor = singleFactor; - this.multiFactor = multiFactor; - this.memoryFactor = memoryFactor; - this.stats = new CacheStats(); - this.count = new AtomicLong(0); - this.elements = new AtomicLong(0); - this.overhead = calculateOverhead(maxSize, blockSize, mapConcurrencyLevel); - 
this.size = new AtomicLong(this.overhead); - - if (evictionThread) { - this.evictionThread = new EvictionThread(this); - this.evictionThread.start(); - while (!this.evictionThread.running()) { - try { - Thread.sleep(10); - } catch (InterruptedException ex) { - throw new RuntimeException(ex); - } - } - } else { - this.evictionThread = null; - } - this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS); - } - - public void setMaxSize(long maxSize) { - this.maxSize = maxSize; - if (this.size.get() > acceptableSize() && !evictionInProgress) { - runEviction(); - } - } - - // BlockCache implementation - - /** - * Cache the block with the specified name and buffer. - * <p> - * It is assumed this will NEVER be called on an already cached block. If that is done, it is assumed that you are reinserting the same exact block due to a - * race condition and will update the buffer but not modify the size of the cache. - * - * @param blockName - * block name - * @param buf - * block buffer - * @param inMemory - * if block is in-memory - */ - @Override - public CacheEntry cacheBlock(String blockName, byte buf[], boolean inMemory) { - CachedBlock cb = map.get(blockName); - if (cb != null) { - stats.duplicateReads(); - cb.access(count.incrementAndGet()); - } else { - cb = new CachedBlock(blockName, buf, count.incrementAndGet(), inMemory); - CachedBlock currCb = map.putIfAbsent(blockName, cb); - if (currCb != null) { - stats.duplicateReads(); - cb = currCb; - cb.access(count.incrementAndGet()); - } else { - // Actually added block to cache - long newSize = size.addAndGet(cb.heapSize()); - elements.incrementAndGet(); - if (newSize > acceptableSize() && !evictionInProgress) { - runEviction(); - } - } - } - - return cb; - } - - /** - * Cache the block with the specified name and buffer. - * <p> - * It is assumed this will NEVER be called on an already cached block. 
If that is done, it is assumed that you are reinserting the same exact block due to a - * race condition and will update the buffer but not modify the size of the cache. - * - * @param blockName - * block name - * @param buf - * block buffer - */ - @Override - public CacheEntry cacheBlock(String blockName, byte buf[]) { - return cacheBlock(blockName, buf, false); - } - - /** - * Get the buffer of the block with the specified name. - * - * @param blockName - * block name - * @return buffer of specified block name, or null if not in cache - */ - @Override - public CachedBlock getBlock(String blockName) { - CachedBlock cb = map.get(blockName); - if (cb == null) { - stats.miss(); - return null; - } - stats.hit(); - cb.access(count.incrementAndGet()); - return cb; - } - - protected long evictBlock(CachedBlock block) { - map.remove(block.getName()); - size.addAndGet(-1 * block.heapSize()); - elements.decrementAndGet(); - stats.evicted(); - return block.heapSize(); - } - - /** - * Multi-threaded call to run the eviction process. - */ - private void runEviction() { - if (evictionThread == null) { - evict(); - } else { - evictionThread.evict(); - } - } - - /** - * Eviction method. - */ - void evict() { - - // Ensure only one eviction at a time - if (!evictionLock.tryLock()) - return; - - try { - evictionInProgress = true; - - long bytesToFree = size.get() - minSize(); - - log.trace("Block cache LRU eviction started. 
Attempting to free {} bytes", bytesToFree); - - if (bytesToFree <= 0) - return; - - // Instantiate priority buckets - BlockBucket bucketSingle = new BlockBucket(bytesToFree, blockSize, singleSize()); - BlockBucket bucketMulti = new BlockBucket(bytesToFree, blockSize, multiSize()); - BlockBucket bucketMemory = new BlockBucket(bytesToFree, blockSize, memorySize()); - - // Scan entire map putting into appropriate buckets - for (CachedBlock cachedBlock : map.values()) { - switch (cachedBlock.getPriority()) { - case SINGLE: { - bucketSingle.add(cachedBlock); - break; - } - case MULTI: { - bucketMulti.add(cachedBlock); - break; - } - case MEMORY: { - bucketMemory.add(cachedBlock); - break; - } - } - } - - PriorityQueue<BlockBucket> bucketQueue = new PriorityQueue<>(3); - - bucketQueue.add(bucketSingle); - bucketQueue.add(bucketMulti); - bucketQueue.add(bucketMemory); - - int remainingBuckets = 3; - long bytesFreed = 0; - - BlockBucket bucket; - while ((bucket = bucketQueue.poll()) != null) { - long overflow = bucket.overflow(); - if (overflow > 0) { - long bucketBytesToFree = Math.min(overflow, (long) Math.ceil((bytesToFree - bytesFreed) / (double) remainingBuckets)); - bytesFreed += bucket.free(bucketBytesToFree); - } - remainingBuckets--; - } - - float singleMB = ((float) bucketSingle.totalSize()) / ((float) (1024 * 1024)); - float multiMB = ((float) bucketMulti.totalSize()) / ((float) (1024 * 1024)); - float memoryMB = ((float) bucketMemory.totalSize()) / ((float) (1024 * 1024)); - - log.trace("Block cache LRU eviction completed. Freed {} bytes. Priority Sizes: Single={}MB ({}), Multi={}MB ({}), Memory={}MB ({})", bytesFreed, - singleMB, bucketSingle.totalSize(), multiMB, bucketMulti.totalSize(), memoryMB, bucketMemory.totalSize()); - - } finally { - stats.evict(); - evictionInProgress = false; - evictionLock.unlock(); - } - } - - /** - * Used to group blocks into priority buckets. There will be a BlockBucket for each priority (single, multi, memory). 
Once bucketed, the eviction algorithm - * takes the appropriate number of elements out of each according to configuration parameters and their relatives sizes. - */ - private class BlockBucket implements Comparable<BlockBucket> { - - private CachedBlockQueue queue; - private long totalSize = 0; - private long bucketSize; - - public BlockBucket(long bytesToFree, long blockSize, long bucketSize) { - this.bucketSize = bucketSize; - queue = new CachedBlockQueue(bytesToFree, blockSize); - totalSize = 0; - } - - public void add(CachedBlock block) { - totalSize += block.heapSize(); - queue.add(block); - } - - public long free(long toFree) { - CachedBlock[] blocks = queue.get(); - long freedBytes = 0; - for (int i = 0; i < blocks.length; i++) { - freedBytes += evictBlock(blocks[i]); - if (freedBytes >= toFree) { - return freedBytes; - } - } - return freedBytes; - } - - public long overflow() { - return totalSize - bucketSize; - } - - public long totalSize() { - return totalSize; - } - - @Override - public int compareTo(BlockBucket that) { - if (this.overflow() == that.overflow()) - return 0; - return this.overflow() > that.overflow() ? 1 : -1; - } - - @Override - public int hashCode() { - return Objects.hashCode(overflow()); - } - - @Override - public boolean equals(Object that) { - if (that instanceof BlockBucket) - return compareTo((BlockBucket) that) == 0; - return false; - } - } - - @Override - public long getMaxSize() { - return this.maxSize; - } - - /** - * Get the current size of this cache. - * - * @return current size in bytes - */ - public long getCurrentSize() { - return this.size.get(); - } - - /** - * Get the current size of this cache. 
- * - * @return current size in bytes - */ - public long getFreeSize() { - return getMaxSize() - getCurrentSize(); - } - - /** - * Get the size of this cache (number of cached blocks) - * - * @return number of cached blocks - */ - public long size() { - return this.elements.get(); - } - - /** - * Get the number of eviction runs that have occurred - */ - public long getEvictionCount() { - return this.stats.getEvictionCount(); - } - - /** - * Get the number of blocks that have been evicted during the lifetime of this cache. - */ - public long getEvictedCount() { - return this.stats.getEvictedCount(); - } - - /** - * Eviction thread. Sits in waiting state until an eviction is triggered when the cache size grows above the acceptable level. - * - * <p> - * Thread is triggered into action by {@link LruBlockCache#runEviction()} - */ - private static class EvictionThread extends Thread { - private WeakReference<LruBlockCache> cache; - private boolean running = false; - - public EvictionThread(LruBlockCache cache) { - super("LruBlockCache.EvictionThread"); - setDaemon(true); - this.cache = new WeakReference<>(cache); - } - - public synchronized boolean running() { - return running; - } - - @Override - public void run() { - while (true) { - synchronized (this) { - running = true; - try { - this.wait(); - } catch (InterruptedException e) {} - } - LruBlockCache cache = this.cache.get(); - if (cache == null) - break; - cache.evict(); - } - } - - public void evict() { - synchronized (this) { - this.notify(); - } - } - } - - /* - * Statistics thread. Periodically prints the cache statistics to the log. 
- */ - private static class StatisticsThread extends Thread { - LruBlockCache lru; - - public StatisticsThread(LruBlockCache lru) { - super("LruBlockCache.StatisticsThread"); - setDaemon(true); - this.lru = lru; - } - - @Override - public void run() { - lru.logStats(); - } - } - - public void logStats() { - // Log size - long totalSize = heapSize(); - long freeSize = maxSize - totalSize; - float sizeMB = ((float) totalSize) / ((float) (1024 * 1024)); - float freeMB = ((float) freeSize) / ((float) (1024 * 1024)); - float maxMB = ((float) maxSize) / ((float) (1024 * 1024)); - log.debug("Cache Stats: Sizes: Total={}MB ({}), Free={}MB ({}), Max={}MB ({}), Counts: Blocks={}, Access={}, Hit={}, Miss={}, Evictions={}, Evicted={}," - + "Ratios: Hit Ratio={}%, Miss Ratio={}%, Evicted/Run={}, Duplicate Reads={}", sizeMB, totalSize, freeMB, freeSize, maxMB, maxSize, size(), - stats.requestCount(), stats.hitCount(), stats.getMissCount(), stats.getEvictionCount(), stats.getEvictedCount(), stats.getHitRatio() * 100, - stats.getMissRatio() * 100, stats.evictedPerEviction(), stats.getDuplicateReads()); - } - - /** - * Get counter statistics for this cache. - * - * <p> - * Includes: total accesses, hits, misses, evicted blocks, and runs of the eviction processes. 
- */ - public CacheStats getStats() { - return this.stats; - } - - public static class CacheStats implements BlockCache.Stats { - private final AtomicLong accessCount = new AtomicLong(0); - private final AtomicLong hitCount = new AtomicLong(0); - private final AtomicLong missCount = new AtomicLong(0); - private final AtomicLong evictionCount = new AtomicLong(0); - private final AtomicLong evictedCount = new AtomicLong(0); - private final AtomicLong duplicateReads = new AtomicLong(0); - - public void miss() { - missCount.incrementAndGet(); - accessCount.incrementAndGet(); - } - - public void hit() { - hitCount.incrementAndGet(); - accessCount.incrementAndGet(); - } - - public void evict() { - evictionCount.incrementAndGet(); - } - - public void duplicateReads() { - duplicateReads.incrementAndGet(); - } - - public void evicted() { - evictedCount.incrementAndGet(); - } - - @Override - public long requestCount() { - return accessCount.get(); - } - - public long getMissCount() { - return missCount.get(); - } - - @Override - public long hitCount() { - return hitCount.get(); - } - - public long getEvictionCount() { - return evictionCount.get(); - } - - public long getDuplicateReads() { - return duplicateReads.get(); - } - - public long getEvictedCount() { - return evictedCount.get(); - } - - public double getHitRatio() { - return ((float) hitCount() / (float) requestCount()); - } - - public double getMissRatio() { - return ((float) getMissCount() / (float) requestCount()); - } - - public double evictedPerEviction() { - return (float) getEvictedCount() / (float) getEvictionCount(); - } - } - - public final static long CACHE_FIXED_OVERHEAD = ClassSize.align((3 * SizeConstants.SIZEOF_LONG) + (8 * ClassSize.REFERENCE) - + (5 * SizeConstants.SIZEOF_FLOAT) + SizeConstants.SIZEOF_BOOLEAN + ClassSize.OBJECT); - - // HeapSize implementation - @Override - public long heapSize() { - return getCurrentSize(); - } - - public static long calculateOverhead(long maxSize, long blockSize, 
int concurrency) { - return CACHE_FIXED_OVERHEAD + ClassSize.CONCURRENT_HASHMAP + ((int) Math.ceil(maxSize * 1.2 / blockSize) * ClassSize.CONCURRENT_HASHMAP_ENTRY) - + (concurrency * ClassSize.CONCURRENT_HASHMAP_SEGMENT); - } - - // Simple calculators of sizes given factors and maxSize - - private long acceptableSize() { - return (long) Math.floor(this.maxSize * this.acceptableFactor); - } - - private long minSize() { - return (long) Math.floor(this.maxSize * this.minFactor); - } - - private long singleSize() { - return (long) Math.floor(this.maxSize * this.singleFactor * this.minFactor); - } - - private long multiSize() { - return (long) Math.floor(this.maxSize * this.multiFactor * this.minFactor); - } - - private long memorySize() { - return (long) Math.floor(this.maxSize * this.memoryFactor * this.minFactor); - } - - public void shutdown() { - this.scheduleThreadPool.shutdown(); - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/452732c8/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/TinyLfuBlockCache.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/TinyLfuBlockCache.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/TinyLfuBlockCache.java deleted file mode 100644 index bab52af..0000000 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/TinyLfuBlockCache.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.core.file.blockfile.cache; - -import static java.util.Objects.requireNonNull; - -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.github.benmanes.caffeine.cache.Cache; -import com.github.benmanes.caffeine.cache.Caffeine; -import com.github.benmanes.caffeine.cache.Policy; -import com.github.benmanes.caffeine.cache.stats.CacheStats; -import com.google.common.util.concurrent.ThreadFactoryBuilder; - -/** - * A block cache that is memory bounded using the W-TinyLFU eviction algorithm. This implementation delegates to a Caffeine cache to provide concurrent O(1) - * read and write operations. 
- * <ul> - * <li>W-TinyLFU: http://arxiv.org/pdf/1512.00727.pdf</li> - * <li>Caffeine: https://github.com/ben-manes/caffeine</li> - * <li>Cache design: http://highscalability.com/blog/2016/1/25/design-of-a-modern-cache.html</li> - * </ul> - */ -public final class TinyLfuBlockCache implements BlockCache { - private static final Logger log = LoggerFactory.getLogger(TinyLfuBlockCache.class); - private static final int STATS_PERIOD_SEC = 60; - - private final Cache<String,Block> cache; - private final Policy.Eviction<String,Block> policy; - private final ScheduledExecutorService statsExecutor; - - public TinyLfuBlockCache(long maxSize, long blockSize) { - cache = Caffeine.newBuilder().initialCapacity((int) Math.ceil(1.2 * maxSize / blockSize)).weigher((String blockName, Block block) -> { - int keyWeight = ClassSize.align(blockName.length()) + ClassSize.STRING; - return keyWeight + block.weight(); - }).maximumWeight(maxSize).recordStats().build(); - policy = cache.policy().eviction().get(); - - statsExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("TinyLfuBlockCacheStatsExecutor").setDaemon(true) - .build()); - statsExecutor.scheduleAtFixedRate(this::logStats, STATS_PERIOD_SEC, STATS_PERIOD_SEC, TimeUnit.SECONDS); - } - - @Override - public long getMaxSize() { - return policy.getMaximum(); - } - - @Override - public CacheEntry getBlock(String blockName) { - return cache.getIfPresent(blockName); - } - - @Override - public CacheEntry cacheBlock(String blockName, byte[] buffer) { - return cache.asMap().compute(blockName, (key, block) -> { - if (block == null) { - return new Block(buffer); - } - block.buffer = buffer; - return block; - }); - } - - @Override - public CacheEntry cacheBlock(String blockName, byte[] buffer, /* ignored */boolean inMemory) { - return cacheBlock(blockName, buffer); - } - - @Override - public BlockCache.Stats getStats() { - CacheStats stats = cache.stats(); - return new BlockCache.Stats() { - @Override 
- public long hitCount() { - return stats.hitCount(); - } - - @Override - public long requestCount() { - return stats.requestCount(); - } - }; - } - - private void logStats() { - double maxMB = ((double) policy.getMaximum()) / ((double) (1024 * 1024)); - double sizeMB = ((double) policy.weightedSize().getAsLong()) / ((double) (1024 * 1024)); - double freeMB = maxMB - sizeMB; - log.debug("Cache Size={}MB, Free={}MB, Max={}MB, Blocks={}", sizeMB, freeMB, maxMB, cache.estimatedSize()); - log.debug(cache.stats().toString()); - } - - private static final class Block implements CacheEntry { - private volatile byte[] buffer; - private volatile Object index; - - Block(byte[] buffer) { - this.buffer = requireNonNull(buffer); - } - - @Override - public byte[] getBuffer() { - return buffer; - } - - @Override - public Object getIndex() { - return index; - } - - @Override - public void setIndex(Object index) { - this.index = index; - } - - int weight() { - return ClassSize.align(buffer.length) + SizeConstants.SIZEOF_LONG + ClassSize.REFERENCE + ClassSize.OBJECT + ClassSize.ARRAY; - } - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/452732c8/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCache.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCache.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCache.java new file mode 100644 index 0000000..fa8d824 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCache.java @@ -0,0 +1,625 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.file.blockfile.cache.lru; + +import java.lang.ref.WeakReference; +import java.util.Objects; +import java.util.PriorityQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.accumulo.core.file.blockfile.cache.BlockCache; +import org.apache.accumulo.core.file.blockfile.cache.CacheEntry; +import org.apache.accumulo.core.file.blockfile.cache.CachedBlock; +import org.apache.accumulo.core.file.blockfile.cache.CachedBlockQueue; +import org.apache.accumulo.core.file.blockfile.cache.ClassSize; +import org.apache.accumulo.core.file.blockfile.cache.HeapSize; +import org.apache.accumulo.core.file.blockfile.cache.SizeConstants; +import org.apache.accumulo.core.util.NamingThreadFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A block cache implementation that is memory-aware using {@link HeapSize}, memory-bound using an LRU eviction algorithm, and concurrent: backed by a + * {@link ConcurrentHashMap} and with a non-blocking eviction thread giving constant-time {@link #cacheBlock} and {@link #getBlock} operations. 
+ * + * <p> + * Contains three levels of block priority to allow for scan-resistance and in-memory families. A block is added with an inMemory flag if necessary, otherwise a + * block becomes a single access priority. Once a block is accessed again, it changes to multiple access. This is used to prevent scans from thrashing the + * cache, adding a least-frequently-used element to the eviction algorithm. + * + * <p> + * Each priority is given its own chunk of the total cache to ensure fairness during eviction. Each priority will retain close to its maximum size, however, if + * any priority is not using its entire chunk the others are able to grow beyond their chunk size. + * + * <p> + * Instantiated at a minimum with the total size and average block size. All sizes are in bytes. The block size is not especially important as this cache is + * fully dynamic in its sizing of blocks. It is only used for pre-allocating data structures and in initial heap estimation of the map. + * + * <p> + * The detailed constructor defines the sizes for the three priorities (they should total to the maximum size defined). It also sets the levels that trigger and + * control the eviction thread. + * + * <p> + * The acceptable size is the cache size level which triggers the eviction process to start. It evicts enough blocks to get the size below the minimum size + * specified. + * + * <p> + * Eviction happens in a separate thread and involves a single full-scan of the map. It determines how many bytes must be freed to reach the minimum size, and + * then while scanning determines the fewest least-recently-used blocks necessary from each of the three priorities (would be 3 times bytes to free). It then + * uses the priority chunk sizes to evict fairly according to the relative sizes and usage.
+ */ +public class LruBlockCache implements BlockCache, HeapSize { + + private static final Logger log = LoggerFactory.getLogger(LruBlockCache.class); + + /** Statistics thread */ + static final int statThreadPeriod = 60; + + /** Concurrent map (the cache) */ + private final ConcurrentHashMap<String,CachedBlock> map; + + /** Eviction lock (locked when eviction in process) */ + private final ReentrantLock evictionLock = new ReentrantLock(true); + + /** Volatile boolean to track if we are in an eviction process or not */ + private volatile boolean evictionInProgress = false; + + /** Eviction thread */ + private final EvictionThread evictionThread; + + /** Statistics thread schedule pool (for heavy debugging, could remove) */ + private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1, new NamingThreadFactory("LRUBlockCacheStats")); + + /** Current size of cache */ + private final AtomicLong size; + + /** Current number of cached elements */ + private final AtomicLong elements; + + /** Cache access count (sequential ID) */ + private final AtomicLong count; + + /** Cache statistics */ + private final CacheStats stats; + + /** Overhead of the structure itself */ + private final long overhead; + + private final LruBlockCacheConfiguration conf; + + /** + * Default constructor. Specify maximum size and expected average block size (approximation is fine). + * + * <p> + * All other factors will be calculated based on defaults specified in this class. 
+ * + * @param conf + * block cache configuration + */ + public LruBlockCache(final LruBlockCacheConfiguration conf) { + this.conf = conf; + + int mapInitialSize = (int) Math.ceil(1.2 * conf.getMaxSize() / conf.getBlockSize()); + + map = new ConcurrentHashMap<>(mapInitialSize, conf.getMapLoadFactor(), conf.getMapConcurrencyLevel()); + this.stats = new CacheStats(); + this.count = new AtomicLong(0); + this.elements = new AtomicLong(0); + this.overhead = calculateOverhead(conf.getMaxSize(), conf.getBlockSize(), conf.getMapConcurrencyLevel()); + this.size = new AtomicLong(this.overhead); + + if (conf.isUseEvictionThread()) { + this.evictionThread = new EvictionThread(this); + this.evictionThread.start(); + while (!this.evictionThread.running()) { + try { + Thread.sleep(10); + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + } + } else { + this.evictionThread = null; + } + this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS); + } + + public long getOverhead() { + return overhead; + } + + // BlockCache implementation + + /** + * Cache the block with the specified name and buffer. + * <p> + * It is assumed this will NEVER be called on an already cached block. If that is done, it is assumed that you are reinserting the same exact block due to a + * race condition and will update the buffer but not modify the size of the cache. 
+ * + * @param blockName + * block name + * @param buf + * block buffer + * @param inMemory + * if block is in-memory + */ + @Override + public CacheEntry cacheBlock(String blockName, byte buf[], boolean inMemory) { + CachedBlock cb = map.get(blockName); + if (cb != null) { + stats.duplicateReads(); + cb.access(count.incrementAndGet()); + } else { + cb = new CachedBlock(blockName, buf, count.incrementAndGet(), inMemory); + CachedBlock currCb = map.putIfAbsent(blockName, cb); + if (currCb != null) { + stats.duplicateReads(); + cb = currCb; + cb.access(count.incrementAndGet()); + } else { + // Actually added block to cache + long newSize = size.addAndGet(cb.heapSize()); + elements.incrementAndGet(); + if (newSize > acceptableSize() && !evictionInProgress) { + runEviction(); + } + } + } + + return cb; + } + + /** + * Cache the block with the specified name and buffer. + * <p> + * It is assumed this will NEVER be called on an already cached block. If that is done, it is assumed that you are reinserting the same exact block due to a + * race condition and will update the buffer but not modify the size of the cache. + * + * @param blockName + * block name + * @param buf + * block buffer + */ + @Override + public CacheEntry cacheBlock(String blockName, byte buf[]) { + return cacheBlock(blockName, buf, false); + } + + /** + * Get the buffer of the block with the specified name. + * + * @param blockName + * block name + * @return buffer of specified block name, or null if not in cache + */ + @Override + public CachedBlock getBlock(String blockName) { + CachedBlock cb = map.get(blockName); + if (cb == null) { + stats.miss(); + return null; + } + stats.hit(); + cb.access(count.incrementAndGet()); + return cb; + } + + protected long evictBlock(CachedBlock block) { + map.remove(block.getName()); + size.addAndGet(-1 * block.heapSize()); + elements.decrementAndGet(); + stats.evicted(); + return block.heapSize(); + } + + /** + * Multi-threaded call to run the eviction process. 
+ */ + private void runEviction() { + if (evictionThread == null) { + evict(); + } else { + evictionThread.evict(); + } + } + + /** + * Eviction method. + */ + void evict() { + + // Ensure only one eviction at a time + if (!evictionLock.tryLock()) + return; + + try { + evictionInProgress = true; + + long bytesToFree = size.get() - minSize(); + + log.trace("Block cache LRU eviction started. Attempting to free {} bytes", bytesToFree); + + if (bytesToFree <= 0) + return; + + // Instantiate priority buckets + BlockBucket bucketSingle = new BlockBucket(bytesToFree, conf.getBlockSize(), singleSize()); + BlockBucket bucketMulti = new BlockBucket(bytesToFree, conf.getBlockSize(), multiSize()); + BlockBucket bucketMemory = new BlockBucket(bytesToFree, conf.getBlockSize(), memorySize()); + + // Scan entire map putting into appropriate buckets + for (CachedBlock cachedBlock : map.values()) { + switch (cachedBlock.getPriority()) { + case SINGLE: { + bucketSingle.add(cachedBlock); + break; + } + case MULTI: { + bucketMulti.add(cachedBlock); + break; + } + case MEMORY: { + bucketMemory.add(cachedBlock); + break; + } + } + } + + PriorityQueue<BlockBucket> bucketQueue = new PriorityQueue<>(3); + + bucketQueue.add(bucketSingle); + bucketQueue.add(bucketMulti); + bucketQueue.add(bucketMemory); + + int remainingBuckets = 3; + long bytesFreed = 0; + + BlockBucket bucket; + while ((bucket = bucketQueue.poll()) != null) { + long overflow = bucket.overflow(); + if (overflow > 0) { + long bucketBytesToFree = Math.min(overflow, (long) Math.ceil((bytesToFree - bytesFreed) / (double) remainingBuckets)); + bytesFreed += bucket.free(bucketBytesToFree); + } + remainingBuckets--; + } + + float singleMB = ((float) bucketSingle.totalSize()) / ((float) (1024 * 1024)); + float multiMB = ((float) bucketMulti.totalSize()) / ((float) (1024 * 1024)); + float memoryMB = ((float) bucketMemory.totalSize()) / ((float) (1024 * 1024)); + + log.trace("Block cache LRU eviction completed. Freed {} bytes. 
Priority Sizes: Single={}MB ({}), Multi={}MB ({}), Memory={}MB ({})", bytesFreed, + singleMB, bucketSingle.totalSize(), multiMB, bucketMulti.totalSize(), memoryMB, bucketMemory.totalSize()); + + } finally { + stats.evict(); + evictionInProgress = false; + evictionLock.unlock(); + } + } + + /** + * Used to group blocks into priority buckets. There will be a BlockBucket for each priority (single, multi, memory). Once bucketed, the eviction algorithm + * takes the appropriate number of elements out of each according to configuration parameters and their relatives sizes. + */ + private class BlockBucket implements Comparable<BlockBucket> { + + private CachedBlockQueue queue; + private long totalSize = 0; + private long bucketSize; + + public BlockBucket(long bytesToFree, long blockSize, long bucketSize) { + this.bucketSize = bucketSize; + queue = new CachedBlockQueue(bytesToFree, blockSize); + totalSize = 0; + } + + public void add(CachedBlock block) { + totalSize += block.heapSize(); + queue.add(block); + } + + public long free(long toFree) { + CachedBlock[] blocks = queue.get(); + long freedBytes = 0; + for (int i = 0; i < blocks.length; i++) { + freedBytes += evictBlock(blocks[i]); + if (freedBytes >= toFree) { + return freedBytes; + } + } + return freedBytes; + } + + public long overflow() { + return totalSize - bucketSize; + } + + public long totalSize() { + return totalSize; + } + + @Override + public int compareTo(BlockBucket that) { + if (this.overflow() == that.overflow()) + return 0; + return this.overflow() > that.overflow() ? 
1 : -1; + } + + @Override + public int hashCode() { + return Objects.hashCode(overflow()); + } + + @Override + public boolean equals(Object that) { + if (that instanceof BlockBucket) + return compareTo((BlockBucket) that) == 0; + return false; + } + } + + @Override + public long getMaxHeapSize() { + return getMaxSize(); + } + + @Override + public long getMaxSize() { + return this.conf.getMaxSize(); + } + + /** + * Get the current size of this cache. + * + * @return current size in bytes + */ + public long getCurrentSize() { + return this.size.get(); + } + + /** + * Get the current size of this cache. + * + * @return current size in bytes + */ + public long getFreeSize() { + return getMaxSize() - getCurrentSize(); + } + + /** + * Get the size of this cache (number of cached blocks) + * + * @return number of cached blocks + */ + public long size() { + return this.elements.get(); + } + + /** + * Get the number of eviction runs that have occurred + */ + public long getEvictionCount() { + return this.stats.getEvictionCount(); + } + + /** + * Get the number of blocks that have been evicted during the lifetime of this cache. + */ + public long getEvictedCount() { + return this.stats.getEvictedCount(); + } + + /** + * Eviction thread. Sits in waiting state until an eviction is triggered when the cache size grows above the acceptable level. 
+ * + * <p> + * Thread is triggered into action by {@link LruBlockCache#runEviction()} + */ + private static class EvictionThread extends Thread { + private WeakReference<LruBlockCache> cache; + private boolean running = false; + + public EvictionThread(LruBlockCache cache) { + super("LruBlockCache.EvictionThread"); + setDaemon(true); + this.cache = new WeakReference<>(cache); + } + + public synchronized boolean running() { + return running; + } + + @Override + public void run() { + while (true) { + synchronized (this) { + running = true; + try { + this.wait(); + } catch (InterruptedException e) {} + } + LruBlockCache cache = this.cache.get(); + if (cache == null) + break; + cache.evict(); + } + } + + public void evict() { + synchronized (this) { + this.notify(); + } + } + } + + /* + * Statistics thread. Periodically prints the cache statistics to the log. + */ + private static class StatisticsThread extends Thread { + LruBlockCache lru; + + public StatisticsThread(LruBlockCache lru) { + super("LruBlockCache.StatisticsThread"); + setDaemon(true); + this.lru = lru; + } + + @Override + public void run() { + lru.logStats(); + } + } + + public void logStats() { + // Log size + long totalSize = heapSize(); + long freeSize = this.conf.getMaxSize() - totalSize; + float sizeMB = ((float) totalSize) / ((float) (1024 * 1024)); + float freeMB = ((float) freeSize) / ((float) (1024 * 1024)); + float maxMB = ((float) this.conf.getMaxSize()) / ((float) (1024 * 1024)); + log.debug("Cache Stats: Sizes: Total={}MB ({}), Free={}MB ({}), Max={}MB ({}), Counts: Blocks={}, Access={}, Hit={}, Miss={}, Evictions={}, Evicted={}," + + "Ratios: Hit Ratio={}%, Miss Ratio={}%, Evicted/Run={}, Duplicate Reads={}", sizeMB, totalSize, freeMB, freeSize, maxMB, this.conf.getMaxSize(), + size(), stats.requestCount(), stats.hitCount(), stats.getMissCount(), stats.getEvictionCount(), stats.getEvictedCount(), stats.getHitRatio() * 100, + stats.getMissRatio() * 100, stats.evictedPerEviction(), 
stats.getDuplicateReads()); + } + + /** + * Get counter statistics for this cache. + * + * <p> + * Includes: total accesses, hits, misses, evicted blocks, and runs of the eviction processes. + */ + public CacheStats getStats() { + return this.stats; + } + + public static class CacheStats implements BlockCache.Stats { + private final AtomicLong accessCount = new AtomicLong(0); + private final AtomicLong hitCount = new AtomicLong(0); + private final AtomicLong missCount = new AtomicLong(0); + private final AtomicLong evictionCount = new AtomicLong(0); + private final AtomicLong evictedCount = new AtomicLong(0); + private final AtomicLong duplicateReads = new AtomicLong(0); + + public void miss() { + missCount.incrementAndGet(); + accessCount.incrementAndGet(); + } + + public void hit() { + hitCount.incrementAndGet(); + accessCount.incrementAndGet(); + } + + public void evict() { + evictionCount.incrementAndGet(); + } + + public void duplicateReads() { + duplicateReads.incrementAndGet(); + } + + public void evicted() { + evictedCount.incrementAndGet(); + } + + @Override + public long requestCount() { + return accessCount.get(); + } + + public long getMissCount() { + return missCount.get(); + } + + @Override + public long hitCount() { + return hitCount.get(); + } + + public long getEvictionCount() { + return evictionCount.get(); + } + + public long getDuplicateReads() { + return duplicateReads.get(); + } + + public long getEvictedCount() { + return evictedCount.get(); + } + + public double getHitRatio() { + return ((float) hitCount() / (float) requestCount()); + } + + public double getMissRatio() { + return ((float) getMissCount() / (float) requestCount()); + } + + public double evictedPerEviction() { + return (float) getEvictedCount() / (float) getEvictionCount(); + } + } + + public final static long CACHE_FIXED_OVERHEAD = ClassSize.align((3 * SizeConstants.SIZEOF_LONG) + (8 * ClassSize.REFERENCE) + + (5 * SizeConstants.SIZEOF_FLOAT) + SizeConstants.SIZEOF_BOOLEAN + 
ClassSize.OBJECT); + + // HeapSize implementation + @Override + public long heapSize() { + return getCurrentSize(); + } + + public static long calculateOverhead(long maxSize, long blockSize, int concurrency) { + return CACHE_FIXED_OVERHEAD + ClassSize.CONCURRENT_HASHMAP + ((int) Math.ceil(maxSize * 1.2 / blockSize) * ClassSize.CONCURRENT_HASHMAP_ENTRY) + + (concurrency * ClassSize.CONCURRENT_HASHMAP_SEGMENT); + } + + // Simple calculators of sizes given factors and maxSize + + private long acceptableSize() { + return (long) Math.floor(this.conf.getMaxSize() * this.conf.getAcceptableFactor()); + } + + private long minSize() { + return (long) Math.floor(this.conf.getMaxSize() * this.conf.getMinFactor()); + } + + private long singleSize() { + return (long) Math.floor(this.conf.getMaxSize() * this.conf.getSingleFactor() * this.conf.getMinFactor()); + } + + private long multiSize() { + return (long) Math.floor(this.conf.getMaxSize() * this.conf.getMultiFactor() * this.conf.getMinFactor()); + } + + private long memorySize() { + return (long) Math.floor(this.conf.getMaxSize() * this.conf.getMemoryFactor() * this.conf.getMinFactor()); + } + + public void shutdown() { + this.scheduleThreadPool.shutdown(); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/452732c8/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheConfiguration.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheConfiguration.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheConfiguration.java new file mode 100644 index 0000000..49790cb --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheConfiguration.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.file.blockfile.cache.lru; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.file.blockfile.cache.BlockCacheConfiguration; +import org.apache.accumulo.core.file.blockfile.cache.CacheType; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +
/**
 * Settings of the LRU block cache. Values are looked up through the parent configuration under the {@link #PROPERTY_PREFIX} prefix; a value that is missing or
 * not greater than zero falls back to the matching {@code DEFAULT_*} constant. All fields are final, and the factor combination is validated in the
 * constructor.
 */
public final class LruBlockCacheConfiguration extends BlockCacheConfiguration {

  public static final String PROPERTY_PREFIX = "lru";

  /** Default Configuration Parameters */

  /** Backing Concurrent Map Configuration */
  public static final Float DEFAULT_LOAD_FACTOR = 0.75f;
  public static final Integer DEFAULT_CONCURRENCY_LEVEL = 16;

  /** Eviction thresholds */
  public static final Float DEFAULT_MIN_FACTOR = 0.75f;
  public static final Float DEFAULT_ACCEPTABLE_FACTOR = 0.85f;

  /** Priority buckets */
  public static final Float DEFAULT_SINGLE_FACTOR = 0.25f;
  public static final Float DEFAULT_MULTI_FACTOR = 0.50f;
  public static final Float DEFAULT_MEMORY_FACTOR = 0.25f;

  // property names
  public static final String ACCEPTABLE_FACTOR_PROPERTY = "acceptable.factor";
  public static final String MIN_FACTOR_PROPERTY = "min.factor";
  public static final String SINGLE_FACTOR_PROPERTY = "single.factor";
  public static final String MULTI_FACTOR_PROPERTY = "multi.factor";
  public static final String MEMORY_FACTOR_PROPERTY = "memory.factor";
  public static final String MAP_LOAD_PROPERTY = "map.load";
  public static final String MAP_CONCURRENCY_PROPERTY = "map.concurrency";
  public static final String EVICTION_THREAD_PROPERTY = "eviction.thread";

  // Slack allowed when checking that the three bucket factors add up to 1; a few float ulps around 1.0.
  private static final float FACTOR_SUM_TOLERANCE = 1e-6f;

  /** Acceptable size of cache (no evictions if size < acceptable) */
  private final float acceptableFactor;

  /** Minimum threshold of cache (when evicting, evict until size < min) */
  private final float minFactor;

  /** Single access bucket size */
  private final float singleFactor;

  /** Multiple access bucket size */
  private final float multiFactor;

  /** In-memory bucket size */
  private final float memoryFactor;

  /** Load factor of the backing concurrent map */
  private final float mapLoadFactor;

  /** Concurrency level of the backing concurrent map */
  private final int mapConcurrencyLevel;

  /** Whether the eviction thread is used (defaults to true when the property is not set) */
  private final boolean useEvictionThread;

  /**
   * Reads the LRU settings for one cache type and validates them.
   *
   * @param conf
   *          configuration to read the properties from
   * @param type
   *          cache type the settings apply to
   * @throws IllegalArgumentException
   *           if the single, multi and memory factors do not total 1.0, if the min factor is not smaller than the acceptable factor, or if the min or
   *           acceptable factor is not below 1
   */
  public LruBlockCacheConfiguration(AccumuloConfiguration conf, CacheType type) {
    super(conf, type, PROPERTY_PREFIX);

    this.acceptableFactor = positiveFloat(ACCEPTABLE_FACTOR_PROPERTY, DEFAULT_ACCEPTABLE_FACTOR);
    this.minFactor = positiveFloat(MIN_FACTOR_PROPERTY, DEFAULT_MIN_FACTOR);
    this.singleFactor = positiveFloat(SINGLE_FACTOR_PROPERTY, DEFAULT_SINGLE_FACTOR);
    this.multiFactor = positiveFloat(MULTI_FACTOR_PROPERTY, DEFAULT_MULTI_FACTOR);
    this.memoryFactor = positiveFloat(MEMORY_FACTOR_PROPERTY, DEFAULT_MEMORY_FACTOR);
    this.mapLoadFactor = positiveFloat(MAP_LOAD_PROPERTY, DEFAULT_LOAD_FACTOR);
    this.mapConcurrencyLevel = get(MAP_CONCURRENCY_PROPERTY).map(Integer::valueOf).filter(i -> i > 0).orElse(DEFAULT_CONCURRENCY_LEVEL);
    this.useEvictionThread = get(EVICTION_THREAD_PROPERTY).map(Boolean::valueOf).orElse(true);

    // Compare with a tolerance: factors such as 0.1 or 0.7 are not exactly representable as floats, so an
    // exact "!= 1" test can reject combinations that total 1.0 in decimal.
    float bucketTotal = this.singleFactor + this.multiFactor + this.memoryFactor;
    if (Math.abs(bucketTotal - 1.0f) > FACTOR_SUM_TOLERANCE) {
      throw new IllegalArgumentException("Single, multi, and memory factors should total 1.0");
    }
    if (this.minFactor >= this.acceptableFactor) {
      throw new IllegalArgumentException("minFactor must be smaller than acceptableFactor");
    }
    if (this.minFactor >= 1.0f || this.acceptableFactor >= 1.0f) {
      throw new IllegalArgumentException("all factors must be < 1");
    }
  }

  // Parses the named property as a float; absent values and values that are not greater than zero yield the default.
  private float positiveFloat(String property, float defaultValue) {
    return get(property).map(Float::valueOf).filter(f -> f > 0).orElse(defaultValue);
  }

  public float getAcceptableFactor() {
    return acceptableFactor;
  }

  public float getMinFactor() {
    return minFactor;
  }

  public float getSingleFactor() {
    return singleFactor;
  }

  public float getMultiFactor() {
    return multiFactor;
  }

  public float getMemoryFactor() {
    return memoryFactor;
  }

  public float getMapLoadFactor() {
    return mapLoadFactor;
  }

  public int getMapConcurrencyLevel() {
    return mapConcurrencyLevel;
  }

  public boolean isUseEvictionThread() {
    return useEvictionThread;
  }

  /**
   * Collects LRU cache settings as string properties keyed by the prefix given at construction plus the property name. Each numeric setter rejects values
   * that are not greater than zero with an {@link IllegalArgumentException}.
   */
  public static class Builder {
    private final Map<String,String> props = new HashMap<>();
    private final String prefix;

    private Builder(String prefix) {
      this.prefix = prefix;
    }

    private void set(String prop, float val) {
      props.put(prefix + prop, Float.toString(val));
    }

    public Builder acceptableFactor(float af) {
      Preconditions.checkArgument(af > 0);
      set(ACCEPTABLE_FACTOR_PROPERTY, af);
      return this;
    }

    public Builder minFactor(float mf) {
      Preconditions.checkArgument(mf > 0);
      set(MIN_FACTOR_PROPERTY, mf);
      return this;
    }

    public Builder singleFactor(float sf) {
      Preconditions.checkArgument(sf > 0);
      set(SINGLE_FACTOR_PROPERTY, sf);
      return this;
    }

    public Builder multiFactor(float mf) {
      Preconditions.checkArgument(mf > 0);
      set(MULTI_FACTOR_PROPERTY, mf);
      return this;
    }

    public Builder memoryFactor(float mf) {
      Preconditions.checkArgument(mf > 0);
      set(MEMORY_FACTOR_PROPERTY, mf);
      return this;
    }

    public Builder mapLoadFactor(float mlf) {
      Preconditions.checkArgument(mlf > 0);
      set(MAP_LOAD_PROPERTY, mlf);
      return this;
    }

    public Builder mapConcurrencyLevel(int mcl) {
      Preconditions.checkArgument(mcl > 0);
      props.put(prefix + MAP_CONCURRENCY_PROPERTY, mcl + "");
      return this;
    }

    public Builder useEvictionThread(boolean uet) {
      props.put(prefix + EVICTION_THREAD_PROPERTY, uet + "");
      return this;
    }

    /**
     * @return an immutable copy of the properties set so far
     */
    public Map<String,String> buildMap() {
      return ImmutableMap.copyOf(props);
    }
  }

  /**
   * @param ct
   *          cache type whose property prefix the builder should use
   * @return a new builder for LRU properties of the given cache type
   */
  public static Builder builder(CacheType ct) {
    return new Builder(getPrefix(ct, PROPERTY_PREFIX));
  }

  @Override
  public String toString() {
    return super.toString() + ", acceptableFactor: " + this.getAcceptableFactor() + ", minFactor: " + this.getMinFactor() + ", singleFactor: "
        + this.getSingleFactor() + ", multiFactor: " + this.getMultiFactor() + ", memoryFactor: " + this.getMemoryFactor() + ", mapLoadFactor: "
        + this.getMapLoadFactor() + ", mapConcurrencyLevel: " + this.getMapConcurrencyLevel() + ", useEvictionThread: " + this.isUseEvictionThread();
  }

}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/452732c8/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheManager.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheManager.java new file mode 100644 index 0000000..8a1e430 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheManager.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.file.blockfile.cache.lru; + +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.file.blockfile.cache.BlockCache; +import org.apache.accumulo.core.file.blockfile.cache.BlockCacheManager; +import org.apache.accumulo.core.file.blockfile.cache.CacheType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +
/**
 * Block cache manager that backs every cache type with an {@link LruBlockCache}.
 */
public class LruBlockCacheManager extends BlockCacheManager {

  private static final Logger LOG = LoggerFactory.getLogger(LruBlockCacheManager.class);

  /**
   * Builds the LRU configuration for the given cache type, logs it at info level and creates the cache from it.
   */
  @Override
  protected BlockCache createCache(AccumuloConfiguration conf, CacheType type) {
    final LruBlockCacheConfiguration lruConf = new LruBlockCacheConfiguration(conf, type);
    LOG.info("Creating {} cache with configuration {}", type, lruConf);
    return new LruBlockCache(lruConf);
  }

  /**
   * Shuts down the cache registered for each cache type, skipping types without one, and then delegates to the parent's stop.
   */
  @Override
  public void stop() {
    for (CacheType cacheType : CacheType.values()) {
      final LruBlockCache lru = (LruBlockCache) this.getBlockCache(cacheType);
      if (lru == null) {
        continue;
      }
      lru.shutdown();
    }
    super.stop();
  }

}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/452732c8/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCache.java ----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCache.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCache.java new file mode 100644 index 0000000..db4e789 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCache.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.core.file.blockfile.cache.tinylfu; + +import static java.util.Objects.requireNonNull; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.file.blockfile.cache.BlockCache; +import org.apache.accumulo.core.file.blockfile.cache.CacheEntry; +import org.apache.accumulo.core.file.blockfile.cache.ClassSize; +import org.apache.accumulo.core.file.blockfile.cache.SizeConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.Policy; +import com.github.benmanes.caffeine.cache.stats.CacheStats; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +
/**
 * A block cache that is memory bounded using the W-TinyLFU eviction algorithm. This implementation delegates to a Caffeine cache to provide concurrent O(1)
 * read and write operations.
 * <ul>
 * <li>W-TinyLFU: http://arxiv.org/pdf/1512.00727.pdf</li>
 * <li>Caffeine: https://github.com/ben-manes/caffeine</li>
 * <li>Cache design: http://highscalability.com/blog/2016/1/25/design-of-a-modern-cache.html</li>
 * </ul>
 */
public final class TinyLfuBlockCache implements BlockCache {
  private static final Logger log = LoggerFactory.getLogger(TinyLfuBlockCache.class);
  private static final int STATS_PERIOD_SEC = 60;

  private final Cache<String,Block> cache;
  private final Policy.Eviction<String,Block> policy;
  private final ScheduledExecutorService statsExecutor;

  /**
   * Creates the cache and starts a daemon executor that logs statistics every {@value #STATS_PERIOD_SEC} seconds.
   *
   * @param conf
   *          supplies the maximum weight (max size) and the block size used to pick the initial capacity
   */
  public TinyLfuBlockCache(TinyLfuBlockCacheConfiguration conf) {
    // Room for 1.2 times the number of blocks that fit into the maximum size.
    int expectedBlocks = (int) Math.ceil(1.2 * conf.getMaxSize() / conf.getBlockSize());
    cache = Caffeine.newBuilder().initialCapacity(expectedBlocks).weigher((String blockName, Block block) -> {
      int keyWeight = ClassSize.align(blockName.length()) + ClassSize.STRING;
      return keyWeight + block.weight();
    }).maximumWeight(conf.getMaxSize()).recordStats().build();
    policy = cache.policy().eviction().get();
    statsExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("TinyLfuBlockCacheStatsExecutor").setDaemon(true)
        .build());
    statsExecutor.scheduleAtFixedRate(this::logStats, STATS_PERIOD_SEC, STATS_PERIOD_SEC, TimeUnit.SECONDS);
  }

  @Override
  public long getMaxHeapSize() {
    return getMaxSize();
  }

  @Override
  public long getMaxSize() {
    return policy.getMaximum();
  }

  @Override
  public CacheEntry getBlock(String blockName) {
    return cache.getIfPresent(blockName);
  }

  /**
   * Stores the buffer under the given name, replacing the buffer of an entry that is already cached under that name.
   *
   * @throws NullPointerException
   *           if {@code buffer} is null
   */
  @Override
  public CacheEntry cacheBlock(String blockName, byte[] buffer) {
    // Fail before touching the cache: the insert path rejects null in the Block constructor, but the update
    // path used to assign the buffer unchecked and could leave a cached entry holding null.
    requireNonNull(buffer);
    return cache.asMap().compute(blockName, (key, block) -> {
      if (block == null) {
        return new Block(buffer);
      }
      block.buffer = buffer;
      return block;
    });
  }

  @Override
  public CacheEntry cacheBlock(String blockName, byte[] buffer, /* ignored */boolean inMemory) {
    return cacheBlock(blockName, buffer);
  }

  /**
   * @return hit and request counts taken from a snapshot of the Caffeine statistics at the time of the call
   */
  @Override
  public BlockCache.Stats getStats() {
    CacheStats stats = cache.stats();
    return new BlockCache.Stats() {
      @Override
      public long hitCount() {
        return stats.hitCount();
      }

      @Override
      public long requestCount() {
        return stats.requestCount();
      }
    };
  }

  // Invoked by statsExecutor; skips all work when debug logging is off.
  private void logStats() {
    if (!log.isDebugEnabled()) {
      return;
    }
    double maxMB = ((double) policy.getMaximum()) / ((double) (1024 * 1024));
    double sizeMB = ((double) policy.weightedSize().getAsLong()) / ((double) (1024 * 1024));
    double freeMB = maxMB - sizeMB;
    log.debug("Cache Size={}MB, Free={}MB, Max={}MB, Blocks={}", sizeMB, freeMB, maxMB, cache.estimatedSize());
    log.debug("{}", cache.stats());
  }

  /**
   * Cache entry holding the block bytes and an optional index object. Both fields are volatile.
   */
  private static final class Block implements CacheEntry {
    private volatile byte[] buffer;
    private volatile Object index;

    Block(byte[] buffer) {
      this.buffer = requireNonNull(buffer);
    }

    @Override
    public byte[] getBuffer() {
      return buffer;
    }

    @Override
    public Object getIndex() {
      return index;
    }

    @Override
    public void setIndex(Object index) {
      this.index = index;
    }

    // Weight reported to the cache weigher: aligned buffer length plus fixed per-entry overhead constants.
    int weight() {
      return ClassSize.align(buffer.length) + SizeConstants.SIZEOF_LONG + ClassSize.REFERENCE + ClassSize.OBJECT + ClassSize.ARRAY;
    }
  }
}