New test, changes associated with it

Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/d2017f96
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/d2017f96
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/d2017f96

Branch: refs/heads/IGNITE
Commit: d2017f96775a6fb845051f010c4be34c9501e932
Parents: c4a18a6
Author: Dave Marion <dlmar...@apache.org>
Authored: Fri May 19 13:08:28 2017 -0400
Committer: Dave Marion <dlmar...@apache.org>
Committed: Fri May 19 13:08:28 2017 -0400

----------------------------------------------------------------------
 .../cache/tiered/TieredBlockCache.java          | 31 +++++++++-
 .../tiered/TieredBlockCacheConfiguration.java   | 10 +--
 .../cache/tiered/TieredBlockCacheManager.java   | 39 +++++++++++-
 .../blockfile/cache/TestTieredBlockCache.java   | 64 +++++++++++++++++---
 4 files changed, 128 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/d2017f96/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCache.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCache.java
 
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCache.java
index 6031666..f8ef434 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCache.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCache.java
@@ -2,12 +2,16 @@ package org.apache.accumulo.core.file.blockfile.cache.tiered;
 
 import static java.util.Objects.requireNonNull;
 
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
 import org.apache.accumulo.core.file.blockfile.cache.CacheEntry;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheMetrics;
+import org.apache.ignite.cache.CachePeekMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,22 +43,41 @@ public class TieredBlockCache implements BlockCache {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(TieredBlockCache.class);
        private final IgniteCache<String, Block> cache;
+       private final CacheMetrics metrics;
        private final TieredBlockCacheConfiguration conf;
        private final AtomicLong hitCount = new AtomicLong(0);
        private final AtomicLong requestCount = new AtomicLong(0);
-
+       private final ScheduledFuture<?> future;
        
        public TieredBlockCache(TieredBlockCacheConfiguration conf, Ignite 
ignite) {
                this.conf = conf;
                this.cache = ignite.getOrCreateCache(conf.getConfiguration());
+               metrics = cache.localMxBean();
                LOG.info("Created {} cache with configuration {}", 
                                conf.getConfiguration().getName(), 
conf.getConfiguration());
+               this.future = 
TieredBlockCacheManager.SCHEDULER.scheduleAtFixedRate(new Runnable() {
+                       @Override
+                       public void run() {
+                               LOG.info(cache.localMetrics().toString());
+                               LOG.info(cache.getName() + " entries, on-heap: 
" + getOnHeapEntryCount() + ", off-heap: " + getOffHeapEntryCount());
+                       }
+               }, TieredBlockCacheManager.STAT_INTERVAL, 
TieredBlockCacheManager.STAT_INTERVAL, TimeUnit.SECONDS);
        }
        
        public void stop() {
+               this.future.cancel(false);
                this.cache.close();
+               this.cache.destroy();
        }
        
+       public long getOnHeapEntryCount() {
+               return this.cache.sizeLong(CachePeekMode.ONHEAP);
+       }
+
+       public long getOffHeapEntryCount() {
+               return this.cache.sizeLong(CachePeekMode.OFFHEAP);
+       }
+
        @Override
        public CacheEntry cacheBlock(String blockName, byte[] buf, boolean 
inMemory) {
                return cacheBlock(blockName, buf);
@@ -62,7 +85,7 @@ public class TieredBlockCache implements BlockCache {
        
        @Override
        public CacheEntry cacheBlock(String blockName, byte[] buf) {
-               return this.cache.getAndPutIfAbsent(blockName, new Block(buf));
+               return this.cache.getAndPut(blockName, new Block(buf));
        }
 
        @Override
@@ -79,6 +102,10 @@ public class TieredBlockCache implements BlockCache {
        public long getMaxSize() {
                return this.conf.getMaxSize();
        }
+       
+       public CacheMetrics getCacheMetrics() {
+               return this.metrics;
+       }
 
        @Override
        public Stats getStats() {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d2017f96/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheConfiguration.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheConfiguration.java
 
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheConfiguration.java
index 5f2cde5..6b813e4 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheConfiguration.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheConfiguration.java
@@ -22,8 +22,12 @@ public class TieredBlockCacheConfiguration extends 
BlockCacheConfiguration {
          configuration.setName(type.name());
          configuration.setCacheMode(CacheMode.LOCAL);
          configuration.setOnheapCacheEnabled(true);
-         configuration.setEvictionPolicy(new LruEvictionPolicy<String, 
Block>((int) this.getMaxSize()));
+         LruEvictionPolicy<String, Block> ePolicy = new LruEvictionPolicy<>();
+         ePolicy.setMaxSize((int) (0.75 * this.getMaxSize()));
+         ePolicy.setMaxMemorySize(this.getMaxSize());
+         configuration.setEvictionPolicy(ePolicy);
          
configuration.setExpiryPolicyFactory(AccessedExpiryPolicy.factoryOf(Duration.ONE_HOUR));
+         configuration.setStatisticsEnabled(true);
        }
 
        public CacheConfiguration<String, Block> getConfiguration() {
@@ -34,7 +38,5 @@ public class TieredBlockCacheConfiguration extends 
BlockCacheConfiguration {
        public String toString() {
          return this.configuration.toString();
        }
-       
-       
-       
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d2017f96/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheManager.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheManager.java
 
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheManager.java
index 5ea8a80..d3f1170 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheManager.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheManager.java
@@ -1,11 +1,16 @@
 package org.apache.accumulo.core.file.blockfile.cache.tiered;
 
 import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
 import org.apache.accumulo.core.file.blockfile.cache.BlockCacheManager;
 import org.apache.accumulo.core.file.blockfile.cache.CacheType;
+import org.apache.accumulo.core.util.NamingThreadFactory;
+import org.apache.commons.lang.builder.ToStringBuilder;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.Ignition;
 import org.apache.ignite.configuration.DataPageEvictionMode;
@@ -19,23 +24,32 @@ public class TieredBlockCacheManager extends 
BlockCacheManager {
        
        private static final Logger LOG = 
LoggerFactory.getLogger(TieredBlockCacheManager.class);
        
-       public static final String PROPERTY_PREFIX = "tiered";
+       static final ScheduledExecutorService SCHEDULER = 
Executors.newScheduledThreadPool(1, new 
NamingThreadFactory("TieredBlockCacheStats"));
+       static final int STAT_INTERVAL = 60;
        
+       public static final String PROPERTY_PREFIX = "tiered";
        private static final String TIERED_PROPERTY_BASE = 
BlockCacheManager.CACHE_PROPERTY_BASE + PROPERTY_PREFIX + ".";
+       
        public static final String OFF_HEAP_MAX_SIZE_PROPERTY = 
TIERED_PROPERTY_BASE + "off-heap.max.size";
        public static final String OFF_HEAP_BLOCK_SIZE_PROPERTY = 
TIERED_PROPERTY_BASE + "off-heap.block.size";
        
+       private static final long OFF_HEAP_MIN_SIZE = 10 * 1024 * 1024;
        private static final long OFF_HEAP_MAX_SIZE_DEFAULT = 512 * 1024 * 1024;
        private static final int OFF_HEAP_BLOCK_SIZE_DEFAULT = 16 * 1024;
+       private static final String OFF_HEAP_CONFIG_NAME = "OFF_HEAP_MEMORY";
 
        private Ignite IGNITE;
        
        @Override
        public void start(AccumuloConfiguration conf) {
                
-               final long offHeapMaxSize = 
Optional.ofNullable(conf.get(OFF_HEAP_MAX_SIZE_PROPERTY)).map(Long::valueOf).filter(f
 -> f > 0).orElse(OFF_HEAP_MAX_SIZE_DEFAULT);
+               long offHeapMaxSize = 
Optional.ofNullable(conf.get(OFF_HEAP_MAX_SIZE_PROPERTY)).map(Long::valueOf).filter(f
 -> f > 0).orElse(OFF_HEAP_MAX_SIZE_DEFAULT);
                final int offHeapBlockSize = 
Optional.ofNullable(conf.get(OFF_HEAP_BLOCK_SIZE_PROPERTY)).map(Integer::valueOf).filter(f
 -> f > 0).orElse(OFF_HEAP_BLOCK_SIZE_DEFAULT);
 
+               if (offHeapMaxSize < OFF_HEAP_MIN_SIZE) {
+                       LOG.warn("Off heap max size setting too low, overriding 
to minimum of 10MB");
+                       offHeapMaxSize = OFF_HEAP_MIN_SIZE;
+               }
                // Ignite configuration.
                IgniteConfiguration cfg = new IgniteConfiguration();
                cfg.setDaemon(true);
@@ -45,18 +59,38 @@ public class TieredBlockCacheManager extends 
BlockCacheManager {
                memCfg.setPageSize(offHeapBlockSize);
                
                MemoryPolicyConfiguration plCfg = new 
MemoryPolicyConfiguration();
+               plCfg.setName(OFF_HEAP_CONFIG_NAME);
                plCfg.setInitialSize(offHeapMaxSize);
                plCfg.setMaxSize(offHeapMaxSize);
                plCfg.setPageEvictionMode(DataPageEvictionMode.RANDOM_2_LRU);
                plCfg.setEvictionThreshold(0.9);
+               plCfg.setEmptyPagesPoolSize((int)(offHeapMaxSize / 
offHeapBlockSize / 10) - 1);
+               plCfg.setMetricsEnabled(true);
 
                memCfg.setMemoryPolicies(plCfg); //apply custom memory policy
+               memCfg.setDefaultMemoryPolicyName(OFF_HEAP_CONFIG_NAME);
                
                cfg.setMemoryConfiguration(memCfg); // apply off-heap memory 
configuration
                LOG.info("Starting Ignite with configuration {}", 
cfg.toString());
                IGNITE = Ignition.start(cfg);
 
                super.start(conf);
+               
+               SCHEDULER.scheduleAtFixedRate(new Runnable() {
+                       @Override
+                       public void run() {
+                               IGNITE.memoryMetrics().forEach(m -> {
+                                       ToStringBuilder builder = new 
ToStringBuilder(m);
+                                       builder.append("memory region name", 
m.getName());
+                                       builder.append(" page allocation rate", 
m.getAllocationRate());
+                                       builder.append(" page eviction rate", 
m.getEvictionRate());
+                                       builder.append(" total allocated 
pages", m.getTotalAllocatedPages());
+                                       builder.append(" page free space %", 
m.getPagesFillFactor());
+                                       builder.append(" large entry 
fragmentation %", m.getLargeEntriesPagesPercentage());
+                                       LOG.info(builder.toString());
+                               });
+                       }
+               }, STAT_INTERVAL, STAT_INTERVAL, TimeUnit.SECONDS);
        }
 
        @Override
@@ -67,6 +101,7 @@ public class TieredBlockCacheManager extends 
BlockCacheManager {
                        cache.stop();
                  }
                }
+               SCHEDULER.shutdownNow();
                IGNITE.close();
                super.stop();
        }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d2017f96/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestTieredBlockCache.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestTieredBlockCache.java
 
b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestTieredBlockCache.java
index 3149438..4642224 100644
--- 
a/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestTieredBlockCache.java
+++ 
b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestTieredBlockCache.java
@@ -1,8 +1,5 @@
 package org.apache.accumulo.core.file.blockfile.cache;
 
-import java.nio.charset.StandardCharsets;
-import java.util.Random;
-
 import org.apache.accumulo.core.conf.ConfigurationCopy;
 import org.apache.accumulo.core.conf.DefaultConfiguration;
 import org.apache.accumulo.core.conf.Property;
@@ -13,7 +10,7 @@ import org.junit.Test;
 
 public class TestTieredBlockCache {
        
-       private static final long BLOCKSIZE = 1024;
+       private static final int BLOCKSIZE = 1024;
        private static final long MAXSIZE = 1024*100;
        
        private static class Holder {
@@ -59,15 +56,66 @@ public class TestTieredBlockCache {
              Assert.assertTrue(ce != null);
              Assert.assertEquals(ce.getBuffer().length, h.getBuf().length);
            }
-
            manager.stop();
        }
        
-    private Holder[] generateRandomBlocks(int numBlocks, long maxSize) {
+       /**
+        * Caches 1024 blocks of 1K each in a data cache limited to 2K and checks,
+        * through the entry counts and the Ignite cache metrics, that one entry is
+        * reported on-heap and the remaining 1023 are reported off-heap.
+        */
+       @Test
+       public void testOffHeapBlockMigration() throws Exception {
+           DefaultConfiguration dc = new DefaultConfiguration();
+           ConfigurationCopy cc = new ConfigurationCopy(dc);
+           cc.set(Property.TSERV_CACHE_FACTORY_IMPL, 
TieredBlockCacheManager.class.getName());
+           cc.set("general.custom.cache.block.tiered.off-heap.max.size", 
Long.toString(10*1024*1024));
+           // The key has to end in "off-heap.block.size"; an unknown key is ignored
+           // and the manager falls back to its default off-heap block size.
+           cc.set("general.custom.cache.block.tiered.off-heap.block.size", 
Long.toString(BLOCKSIZE));
+           BlockCacheManager manager = BlockCacheManager.getInstance(cc);
+           cc.set(Property.TSERV_DEFAULT_BLOCKSIZE, Long.toString(BLOCKSIZE));
+           cc.set(Property.TSERV_DATACACHE_SIZE, "2048");
+           cc.set(Property.TSERV_INDEXCACHE_SIZE, "2048");
+           cc.set(Property.TSERV_SUMMARYCACHE_SIZE, "2048");
+           manager.start(cc);
+           TieredBlockCache cache = (TieredBlockCache) 
manager.getBlockCache(CacheType.DATA);
+           
+           // With this configuration we have an on-heap cache with a max size 
of 2K with 1K blocks
+           // and an off heap cache with a max size of 10MB with 1K blocks
+
+           for (Holder h : generateRandomBlocks(1, 1024)) {
+               cache.cacheBlock(h.getName(), h.getBuf());
+           }
+           Assert.assertEquals(1, cache.getCacheMetrics().getCachePuts());
+           Assert.assertEquals(0, cache.getCacheMetrics().getOffHeapPuts());
+           
+           for (Holder h : generateRandomBlocks(1023, 1024)) {
+               cache.cacheBlock(h.getName(), h.getBuf());
+           }
+           Assert.assertEquals(1, cache.getOnHeapEntryCount());
+           Assert.assertEquals(1023, cache.getOffHeapEntryCount());
+           
+           Assert.assertEquals(1, cache.getCacheMetrics().getSize());
+           Assert.assertEquals(1023, 
cache.getCacheMetrics().getOffHeapEntriesCount());
+           Assert.assertEquals(1024, cache.getCacheMetrics().getCachePuts());
+           Assert.assertEquals(0, cache.getCacheMetrics().getOffHeapPuts());
+           
+           manager.stop();
+
+       }
+
+       /**
+        * Builds blocks named "block" + index, each with its own buffer filled with the byte '0'.
+        * @param numBlocks
+        *           number of blocks to create
+        * @param blockSize
+        *           number of bytes in each block
+        * @return array of numBlocks holders; the contents are deterministic, not random
+        */
+    private Holder[] generateRandomBlocks(int numBlocks, int blockSize) {
+      byte[] b = new byte[blockSize];
+      for (int x = 0; x < blockSize; x++) {
+       b[x] = '0';
+      }
       Holder[] blocks = new Holder[numBlocks];
-      Random r = new Random();
       for (int i = 0; i < numBlocks; i++) {
-        blocks[i] = new Holder("block" + i, Integer.toString(r.nextInt((int) 
maxSize) + 1).getBytes(StandardCharsets.UTF_8));
+       byte[] buf = new byte[blockSize];
+       System.arraycopy(b, 0, buf, 0, blockSize);
+        blocks[i] = new Holder("block" + i, buf);
       }
       return blocks;
     }

Reply via email to