New test, changes associated with it
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/d2017f96 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/d2017f96 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/d2017f96 Branch: refs/heads/IGNITE Commit: d2017f96775a6fb845051f010c4be34c9501e932 Parents: c4a18a6 Author: Dave Marion <dlmar...@apache.org> Authored: Fri May 19 13:08:28 2017 -0400 Committer: Dave Marion <dlmar...@apache.org> Committed: Fri May 19 13:08:28 2017 -0400 ---------------------------------------------------------------------- .../cache/tiered/TieredBlockCache.java | 31 +++++++++- .../tiered/TieredBlockCacheConfiguration.java | 10 +-- .../cache/tiered/TieredBlockCacheManager.java | 39 +++++++++++- .../blockfile/cache/TestTieredBlockCache.java | 64 +++++++++++++++++--- 4 files changed, 128 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/d2017f96/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCache.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCache.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCache.java index 6031666..f8ef434 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCache.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCache.java @@ -2,12 +2,16 @@ package org.apache.accumulo.core.file.blockfile.cache.tiered; import static java.util.Objects.requireNonNull; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.accumulo.core.file.blockfile.cache.BlockCache; import org.apache.accumulo.core.file.blockfile.cache.CacheEntry; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheMetrics; +import org.apache.ignite.cache.CachePeekMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,22 +43,41 @@ public class TieredBlockCache implements BlockCache { private static final Logger LOG = LoggerFactory.getLogger(TieredBlockCache.class); private final IgniteCache<String, Block> cache; + private final CacheMetrics metrics; private final TieredBlockCacheConfiguration conf; private final AtomicLong hitCount = new AtomicLong(0); private final AtomicLong requestCount = new AtomicLong(0); - + private final ScheduledFuture<?> future; public TieredBlockCache(TieredBlockCacheConfiguration conf, Ignite ignite) { this.conf = conf; this.cache = ignite.getOrCreateCache(conf.getConfiguration()); + metrics = cache.localMxBean(); LOG.info("Created {} cache with configuration {}", conf.getConfiguration().getName(), conf.getConfiguration()); + this.future = TieredBlockCacheManager.SCHEDULER.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + LOG.info(cache.localMetrics().toString()); + LOG.info(cache.getName() + " entries, on-heap: " + getOnHeapEntryCount() + ", off-heap: " + getOffHeapEntryCount()); + } + }, TieredBlockCacheManager.STAT_INTERVAL, TieredBlockCacheManager.STAT_INTERVAL, TimeUnit.SECONDS); } public void stop() { + this.future.cancel(false); this.cache.close(); + this.cache.destroy(); } + public long getOnHeapEntryCount() { + return this.cache.sizeLong(CachePeekMode.ONHEAP); + } + + public long getOffHeapEntryCount() { + return this.cache.sizeLong(CachePeekMode.OFFHEAP); + } + @Override public CacheEntry cacheBlock(String blockName, byte[] buf, boolean inMemory) { return cacheBlock(blockName, buf); @@ -62,7 +85,7 @@ public class TieredBlockCache implements BlockCache { @Override public CacheEntry cacheBlock(String blockName, byte[] buf) { - return this.cache.getAndPutIfAbsent(blockName, new Block(buf)); + return this.cache.getAndPut(blockName, new Block(buf)); } @Override @@ -79,6 +102,10 @@ public class TieredBlockCache implements BlockCache { public long getMaxSize() { return this.conf.getMaxSize(); } + + public CacheMetrics getCacheMetrics() { + return this.metrics; + } @Override public Stats getStats() { http://git-wip-us.apache.org/repos/asf/accumulo/blob/d2017f96/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheConfiguration.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheConfiguration.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheConfiguration.java index 5f2cde5..6b813e4 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheConfiguration.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheConfiguration.java @@ -22,8 +22,12 @@ public class TieredBlockCacheConfiguration extends BlockCacheConfiguration { configuration.setName(type.name()); configuration.setCacheMode(CacheMode.LOCAL); configuration.setOnheapCacheEnabled(true); - configuration.setEvictionPolicy(new LruEvictionPolicy<String, Block>((int) this.getMaxSize())); + LruEvictionPolicy<String, Block> ePolicy = new LruEvictionPolicy<>(); + ePolicy.setMaxSize((int) (0.75 * this.getMaxSize())); + ePolicy.setMaxMemorySize(this.getMaxSize()); + configuration.setEvictionPolicy(ePolicy); configuration.setExpiryPolicyFactory(AccessedExpiryPolicy.factoryOf(Duration.ONE_HOUR)); + configuration.setStatisticsEnabled(true); } public CacheConfiguration<String, Block> getConfiguration() { @@ -34,7 +38,5 @@ public class TieredBlockCacheConfiguration extends BlockCacheConfiguration { public String toString() { return this.configuration.toString(); } - - - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/d2017f96/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheManager.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheManager.java index 5ea8a80..d3f1170 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheManager.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheManager.java @@ -1,11 +1,16 @@ package org.apache.accumulo.core.file.blockfile.cache.tiered; import java.util.Optional; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.file.blockfile.cache.BlockCache; import org.apache.accumulo.core.file.blockfile.cache.BlockCacheManager; import org.apache.accumulo.core.file.blockfile.cache.CacheType; +import org.apache.accumulo.core.util.NamingThreadFactory; +import org.apache.commons.lang.builder.ToStringBuilder; import org.apache.ignite.Ignite; import org.apache.ignite.Ignition; import org.apache.ignite.configuration.DataPageEvictionMode; @@ -19,23 +24,32 @@ public class TieredBlockCacheManager extends BlockCacheManager { private static final Logger LOG = LoggerFactory.getLogger(TieredBlockCacheManager.class); - public static final String PROPERTY_PREFIX = "tiered"; + static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(1, new NamingThreadFactory("TieredBlockCacheStats")); + static final int STAT_INTERVAL = 60; + public static final String PROPERTY_PREFIX = "tiered"; private static final String TIERED_PROPERTY_BASE = BlockCacheManager.CACHE_PROPERTY_BASE + PROPERTY_PREFIX + "."; + public static final String OFF_HEAP_MAX_SIZE_PROPERTY = TIERED_PROPERTY_BASE + "off-heap.max.size"; public static final String OFF_HEAP_BLOCK_SIZE_PROPERTY = TIERED_PROPERTY_BASE + "off-heap.block.size"; + private static final long OFF_HEAP_MIN_SIZE = 10 * 1024 * 1024; private static final long OFF_HEAP_MAX_SIZE_DEFAULT = 512 * 1024 * 1024; private static final int OFF_HEAP_BLOCK_SIZE_DEFAULT = 16 * 1024; + private static final String OFF_HEAP_CONFIG_NAME = "OFF_HEAP_MEMORY"; private Ignite IGNITE; @Override public void start(AccumuloConfiguration conf) { - final long offHeapMaxSize = Optional.ofNullable(conf.get(OFF_HEAP_MAX_SIZE_PROPERTY)).map(Long::valueOf).filter(f -> f > 0).orElse(OFF_HEAP_MAX_SIZE_DEFAULT); + long offHeapMaxSize = Optional.ofNullable(conf.get(OFF_HEAP_MAX_SIZE_PROPERTY)).map(Long::valueOf).filter(f -> f > 0).orElse(OFF_HEAP_MAX_SIZE_DEFAULT); final int offHeapBlockSize = Optional.ofNullable(conf.get(OFF_HEAP_BLOCK_SIZE_PROPERTY)).map(Integer::valueOf).filter(f -> f > 0).orElse(OFF_HEAP_BLOCK_SIZE_DEFAULT); + if (offHeapMaxSize < OFF_HEAP_MIN_SIZE) { + LOG.warn("Off heap max size setting too low, overriding to minimum of 10MB"); + offHeapMaxSize = OFF_HEAP_MIN_SIZE; + } // Ignite configuration. IgniteConfiguration cfg = new IgniteConfiguration(); cfg.setDaemon(true); @@ -45,18 +59,38 @@ public class TieredBlockCacheManager extends BlockCacheManager { memCfg.setPageSize(offHeapBlockSize); MemoryPolicyConfiguration plCfg = new MemoryPolicyConfiguration(); + plCfg.setName(OFF_HEAP_CONFIG_NAME); plCfg.setInitialSize(offHeapMaxSize); plCfg.setMaxSize(offHeapMaxSize); plCfg.setPageEvictionMode(DataPageEvictionMode.RANDOM_2_LRU); plCfg.setEvictionThreshold(0.9); + plCfg.setEmptyPagesPoolSize((int)(offHeapMaxSize / offHeapBlockSize / 10) - 1); + plCfg.setMetricsEnabled(true); memCfg.setMemoryPolicies(plCfg); //apply custom memory policy + memCfg.setDefaultMemoryPolicyName(OFF_HEAP_CONFIG_NAME); cfg.setMemoryConfiguration(memCfg); // apply off-heap memory configuration LOG.info("Starting Ignite with configuration {}", cfg.toString()); IGNITE = Ignition.start(cfg); super.start(conf); + + SCHEDULER.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + IGNITE.memoryMetrics().forEach(m -> { + ToStringBuilder builder = new ToStringBuilder(m); + builder.append("memory region name", m.getName()); + builder.append(" page allocation rate", m.getAllocationRate()); + builder.append(" page eviction rate", m.getEvictionRate()); + builder.append(" total allocated pages", m.getTotalAllocatedPages()); + builder.append(" page free space %", m.getPagesFillFactor()); + builder.append(" large entry fragmentation %", m.getLargeEntriesPagesPercentage()); + LOG.info(builder.toString()); + }); + } + }, STAT_INTERVAL, STAT_INTERVAL, TimeUnit.SECONDS); } @Override @@ -67,6 +101,7 @@ public class TieredBlockCacheManager extends BlockCacheManager { cache.stop(); } } + SCHEDULER.shutdownNow(); IGNITE.close(); super.stop(); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/d2017f96/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestTieredBlockCache.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestTieredBlockCache.java b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestTieredBlockCache.java index 3149438..4642224 100644 --- a/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestTieredBlockCache.java +++ b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestTieredBlockCache.java @@ -1,8 +1,5 @@ package org.apache.accumulo.core.file.blockfile.cache; -import java.nio.charset.StandardCharsets; -import java.util.Random; - import org.apache.accumulo.core.conf.ConfigurationCopy; import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.conf.Property; @@ -13,7 +10,7 @@ import org.junit.Test; public class TestTieredBlockCache { - private static final long BLOCKSIZE = 1024; + private static final int BLOCKSIZE = 1024; private static final long MAXSIZE = 1024*100; private static class Holder { @@ -59,15 +56,66 @@ public class TestTieredBlockCache { Assert.assertTrue(ce != null); Assert.assertEquals(ce.getBuffer().length, h.getBuf().length); } - manager.stop(); } - private Holder[] generateRandomBlocks(int numBlocks, long maxSize) { + @Test + public void testOffHeapBlockMigration() throws Exception { + DefaultConfiguration dc = new DefaultConfiguration(); + ConfigurationCopy cc = new ConfigurationCopy(dc); + cc.set(Property.TSERV_CACHE_FACTORY_IMPL, TieredBlockCacheManager.class.getName()); + cc.set("general.custom.cache.block.tiered.off-heap.max.size", Long.toString(10*1024*1024)); + cc.set("general.custom.cache.block.tiered.off-heap.block.sizee", Long.toString(BLOCKSIZE)); + BlockCacheManager manager = BlockCacheManager.getInstance(cc); + cc.set(Property.TSERV_DEFAULT_BLOCKSIZE, Long.toString(BLOCKSIZE)); + cc.set(Property.TSERV_DATACACHE_SIZE, "2048"); + cc.set(Property.TSERV_INDEXCACHE_SIZE, "2048"); + cc.set(Property.TSERV_SUMMARYCACHE_SIZE, "2048"); + manager.start(cc); + TieredBlockCache cache = (TieredBlockCache) manager.getBlockCache(CacheType.DATA); + + // With this configuration we have an on-heap cache with a max size of 2K with 1K blocks + // and an off heap cache with a max size of 10MB with 1K blocks + + for (Holder h : generateRandomBlocks(1, 1024)) { + cache.cacheBlock(h.getName(), h.getBuf()); + } + Assert.assertEquals(1, cache.getCacheMetrics().getCachePuts()); + Assert.assertEquals(0, cache.getCacheMetrics().getOffHeapPuts()); + + for (Holder h : generateRandomBlocks(1023, 1024)) { + cache.cacheBlock(h.getName(), h.getBuf()); + } + Assert.assertEquals(1, cache.getOnHeapEntryCount()); + Assert.assertEquals(1023, cache.getOffHeapEntryCount()); + + Assert.assertEquals(1, cache.getCacheMetrics().getSize()); + Assert.assertEquals(1023, cache.getCacheMetrics().getOffHeapEntriesCount()); + Assert.assertEquals(1024, cache.getCacheMetrics().getCachePuts()); + Assert.assertEquals(0, cache.getCacheMetrics().getOffHeapPuts()); + + manager.stop(); + + } + + /** + * + * @param numBlocks + * number of blocks to create + * @param blockSize + * number of bytes in each block + * @return + */ + private Holder[] generateRandomBlocks(int numBlocks, int blockSize) { + byte[] b = new byte[blockSize]; + for (int x = 0; x < blockSize; x++) { + b[x] = '0'; + } Holder[] blocks = new Holder[numBlocks]; - Random r = new Random(); for (int i = 0; i < numBlocks; i++) { - blocks[i] = new Holder("block" + i, Integer.toString(r.nextInt((int) maxSize) + 1).getBytes(StandardCharsets.UTF_8)); + byte[] buf = new byte[blockSize]; + System.arraycopy(b, 0, buf, 0, blockSize); + blocks[i] = new Holder("block" + i, buf); } return blocks; }