ignite-396 Batch eviction support added to FifoEvictionPolicy
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8242c151 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8242c151 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8242c151 Branch: refs/heads/ignite-639 Commit: 8242c1512657dbbb0efe9a3e0238c88d2aac447a Parents: e6a6b64 Author: Andrey Gura <ag...@gridgain.com> Authored: Mon Apr 6 15:18:54 2015 +0300 Committer: Andrey Gura <ag...@gridgain.com> Committed: Mon Apr 6 21:17:21 2015 +0300 ---------------------------------------------------------------------- .../cache/eviction/fifo/FifoEvictionPolicy.java | 58 ++- .../eviction/fifo/FifoEvictionPolicyMBean.java | 16 + .../eviction/GridCacheEvictionAbstractTest.java | 5 +- ...ridCacheFifoBatchEvictionPolicySelfTest.java | 385 +++++++++++++++++++ 4 files changed, 454 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8242c151/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/FifoEvictionPolicy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/FifoEvictionPolicy.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/FifoEvictionPolicy.java index d32c746..ca3cc1b 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/FifoEvictionPolicy.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/FifoEvictionPolicy.java @@ -27,8 +27,12 @@ import java.io.*; import java.util.*; /** - * Eviction policy based on {@code First In First Out (FIFO)} algorithm. This - * implementation is very efficient since it does not create any additional + * Eviction policy based on {@code First In First Out (FIFO)} algorithm and supports batch eviction. 
+ * <p> + * The eviction starts when the cache size becomes {@code batchSize} elements greater than the maximum size. + * {@code batchSize} elements will be evicted in this case. The default {@code batchSize} value is {@code 1}. + * <p> + * This implementation is very efficient since it does not create any additional * table-like data structures. The {@code FIFO} ordering information is * maintained by attaching ordering metadata to cache entries. */ @@ -39,6 +43,9 @@ public class FifoEvictionPolicy<K, V> implements EvictionPolicy<K, V>, FifoEvict /** Maximum size. */ private volatile int max = CacheConfiguration.DFLT_CACHE_SIZE; + /** Batch size. */ + private volatile int batchSize = 1; + /** FIFO queue. */ private final ConcurrentLinkedDeque8<EvictableEntry<K, V>> queue = new ConcurrentLinkedDeque8<>(); @@ -62,6 +69,20 @@ public class FifoEvictionPolicy<K, V> implements EvictionPolicy<K, V>, FifoEvict } /** + * Constructs FIFO eviction policy with maximum size and given batch size. Empty entries are allowed. + * + * @param max Maximum allowed size of cache before entry will start getting evicted. + * @param batchSize Maximum size of batch. + */ + public FifoEvictionPolicy(int max, int batchSize) { + A.ensure(max > 0, "max > 0"); + A.ensure(batchSize > 0, "batchSize > 0"); + + this.max = max; + this.batchSize = batchSize; + } + + /** * Gets maximum allowed size of cache before entry will start getting evicted. * * @return Maximum allowed size of cache before entry will start getting evicted. 
@@ -82,6 +103,18 @@ public class FifoEvictionPolicy<K, V> implements EvictionPolicy<K, V>, FifoEvict } /** {@inheritDoc} */ + @Override public int getBatchSize() { + return batchSize; + } + + /** {@inheritDoc} */ + @Override public void setBatchSize(int batchSize) { + A.ensure(batchSize > 0, "batchSize > 0"); + + this.batchSize = batchSize; + } + + /** {@inheritDoc} */ @Override public int getCurrentSize() { return queue.size(); } @@ -158,18 +191,23 @@ public class FifoEvictionPolicy<K, V> implements EvictionPolicy<K, V>, FifoEvict private void shrink() { int max = this.max; + int batchSize = this.batchSize; + int startSize = queue.sizex(); - for (int i = 0; i < startSize && queue.sizex() > max; i++) { - EvictableEntry<K, V> entry = queue.poll(); + // Shrink only if queue is full. + if (startSize >= max + batchSize) { + for (int i = max; i < startSize && queue.sizex() > max; i++) { + EvictableEntry<K, V> entry = queue.poll(); - if (entry == null) - break; + if (entry == null) + break; - if (!entry.evict()) { - entry.removeMeta(); + if (!entry.evict()) { + entry.removeMeta(); - touch(entry); + touch(entry); + } } } } @@ -177,11 +215,13 @@ public class FifoEvictionPolicy<K, V> implements EvictionPolicy<K, V>, FifoEvict /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { out.writeInt(max); + out.writeInt(batchSize); } /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { max = in.readInt(); + batchSize = in.readInt(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8242c151/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/FifoEvictionPolicyMBean.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/FifoEvictionPolicyMBean.java 
b/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/FifoEvictionPolicyMBean.java index 4827e95..63a413e 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/FifoEvictionPolicyMBean.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/FifoEvictionPolicyMBean.java @@ -41,6 +41,22 @@ public interface FifoEvictionPolicyMBean { public void setMaxSize(int max); /** + * Gets batch size. + * + * @return batch size. + */ + @MXBeanDescription("Batch size.") + public int getBatchSize(); + + /** + * Sets batch size. + * + * @param batchSize Batch size. + */ + @MXBeanDescription("Set batch size.") + public void setBatchSize(int batchSize); + + /** * Gets current queue size. * * @return Current queue size. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8242c151/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/GridCacheEvictionAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/GridCacheEvictionAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/GridCacheEvictionAbstractTest.java index 2081bfd..e0dab7d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/GridCacheEvictionAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/GridCacheEvictionAbstractTest.java @@ -63,6 +63,9 @@ public abstract class GridCacheEvictionAbstractTest<T extends EvictionPolicy<?, /** Evict near sync. */ protected boolean evictNearSync = true; + /** Policy batch size. */ + protected int plcBatchSize = 0; + /** Policy max. 
*/ protected int plcMax = 10; @@ -338,7 +341,7 @@ public abstract class GridCacheEvictionAbstractTest<T extends EvictionPolicy<?, int actual = colocated(i).size(); assertTrue("Cache size is greater then policy size [expected=" + endSize + ", actual=" + actual + ']', - actual <= endSize); + actual <= endSize + plcBatchSize); } checkPolicies(endPlcSize); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8242c151/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/fifo/GridCacheFifoBatchEvictionPolicySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/fifo/GridCacheFifoBatchEvictionPolicySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/fifo/GridCacheFifoBatchEvictionPolicySelfTest.java new file mode 100644 index 0000000..7a9f4f4 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/fifo/GridCacheFifoBatchEvictionPolicySelfTest.java @@ -0,0 +1,385 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.ignite.internal.processors.cache.eviction.fifo;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.eviction.*;
+import org.apache.ignite.cache.eviction.fifo.*;
+import org.apache.ignite.internal.processors.cache.eviction.*;
+
+import java.util.*;
+
+import static org.apache.ignite.cache.CacheMode.*;
+
+/**
+ * FIFO batch eviction test.
+ * <p>
+ * Exercises {@link FifoEvictionPolicy} with a batch size of {@code 2}: the tests below expect the
+ * policy queue to be allowed to grow past the maximum size and to be cut back to the maximum size
+ * in one step once {@code maxSize + batchSize} entries are queued.
+ */
+public class GridCacheFifoBatchEvictionPolicySelfTest extends
+    GridCacheEvictionAbstractTest<FifoEvictionPolicy<String, String>> {
+    /** Batch size of every policy created by this test. */
+    private static final int BATCH_SIZE = 2;
+
+    /**
+     * Walks a policy with {@code maxSize = 3} and {@code batchSize = 2} through adds, repeated
+     * touches and removals, checking the queue content, the queue size and the evicted flag of
+     * every entry after each step.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPolicy() throws Exception {
+        try {
+            startGrid();
+
+            MockEntry e1 = new MockEntry("1", "1");
+            MockEntry e2 = new MockEntry("2", "2");
+            MockEntry e3 = new MockEntry("3", "3");
+            MockEntry e4 = new MockEntry("4", "4");
+            MockEntry e5 = new MockEntry("5", "5");
+
+            FifoEvictionPolicy<String, String> p = policy();
+
+            p.setMaxSize(3);
+
+            p.setBatchSize(2);
+
+            p.onEntryAccessed(false, e1);
+
+            check(p.queue(), e1);
+
+            p.onEntryAccessed(false, e2);
+
+            check(p.queue(), e1, e2);
+
+            p.onEntryAccessed(false, e3);
+
+            check(p.queue(), e1, e2, e3);
+
+            // One entry over the maximum: still below maxSize + batchSize, so nothing is evicted.
+            p.onEntryAccessed(false, e4);
+
+            check(p.queue(), e1, e2, e3, e4);
+
+            assertFalse(e1.isEvicted());
+            assertFalse(e2.isEvicted());
+            assertFalse(e3.isEvicted());
+            assertFalse(e4.isEvicted());
+
+            assertEquals(4, p.getCurrentSize());
+
+            p.onEntryAccessed(false, e5);
+
+            // Batch evicted.
+            check(p.queue(), e3, e4, e5);
+
+            assertEquals(3, p.getCurrentSize());
+
+            assertTrue(e1.isEvicted());
+            assertTrue(e2.isEvicted());
+            assertFalse(e3.isEvicted());
+            assertFalse(e4.isEvicted());
+            assertFalse(e5.isEvicted());
+
+            p.onEntryAccessed(false, e1 = new MockEntry("1", "1"));
+
+            check(p.queue(), e3, e4, e5, e1);
+
+            assertEquals(4, p.getCurrentSize());
+
+            assertFalse(e3.isEvicted());
+            assertFalse(e4.isEvicted());
+            assertFalse(e5.isEvicted());
+            assertFalse(e1.isEvicted());
+
+            // Touching entries that are already queued must not reorder or evict anything.
+            p.onEntryAccessed(false, e5);
+
+            check(p.queue(), e3, e4, e5, e1);
+
+            assertFalse(e3.isEvicted());
+            assertFalse(e4.isEvicted());
+            assertFalse(e5.isEvicted());
+            assertFalse(e1.isEvicted());
+
+            p.onEntryAccessed(false, e1);
+
+            assertEquals(4, p.getCurrentSize());
+
+            check(p.queue(), e3, e4, e5, e1);
+
+            assertFalse(e3.isEvicted());
+            assertFalse(e4.isEvicted());
+            assertFalse(e5.isEvicted());
+            assertFalse(e1.isEvicted());
+
+            // Removals (first argument is true) shrink the queue without marking entries evicted.
+            p.onEntryAccessed(true, e1);
+
+            assertEquals(3, p.getCurrentSize());
+
+            assertFalse(e3.isEvicted());
+            assertFalse(e4.isEvicted());
+            assertFalse(e5.isEvicted());
+
+            p.onEntryAccessed(true, e4);
+
+            assertEquals(2, p.getCurrentSize());
+
+            assertFalse(e3.isEvicted());
+            assertFalse(e5.isEvicted());
+
+            p.onEntryAccessed(true, e5);
+
+            assertEquals(1, p.getCurrentSize());
+
+            assertFalse(e3.isEvicted());
+
+            p.onEntryAccessed(true, e3);
+
+            assertEquals(0, p.getCurrentSize());
+
+            assertFalse(e3.isEvicted());
+
+            info(p);
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Adds exactly {@code max + batchSize} distinct entries and checks that the queue has been
+     * cut back to {@code max} entries afterwards.
+     *
+     * @throws Exception If failed.
+     */
+    public void testMemory() throws Exception {
+        try {
+            startGrid();
+
+            FifoEvictionPolicy<String, String> p = policy();
+
+            int max = 10;
+
+            int batchSize = 2;
+
+            p.setMaxSize(max);
+            p.setBatchSize(batchSize);
+
+            int cnt = max + batchSize;
+
+            for (int i = 0; i < cnt; i++)
+                p.onEntryAccessed(false, new MockEntry(Integer.toString(i), Integer.toString(i)));
+
+            info(p);
+
+            assertEquals(cnt - batchSize, p.getCurrentSize());
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Feeds the policy a long random sequence of accesses and removals over a small key set and
+     * checks that the queue ends up smaller than {@code max + batchSize}.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRandom() throws Exception {
+        try {
+            startGrid();
+
+            FifoEvictionPolicy<String, String> p = policy();
+
+            int max = 10;
+
+            int batchSize = 2;
+
+            p.setMaxSize(max);
+
+            p.setBatchSize(batchSize);
+
+            Random rand = new Random();
+
+            int keys = 31;
+
+            MockEntry[] fifos = new MockEntry[keys];
+
+            for (int i = 0; i < fifos.length; i++)
+                fifos[i] = new MockEntry(Integer.toString(i));
+
+            int runs = 5000000;
+
+            for (int i = 0; i < runs; i++) {
+                boolean rmv = rand.nextBoolean();
+
+                int j = rand.nextInt(fifos.length);
+
+                MockEntry e = entry(fifos, j);
+
+                // A removed entry is replaced by a fresh one so the same key can be added again.
+                if (rmv)
+                    fifos[j] = new MockEntry(Integer.toString(j));
+
+                p.onEntryAccessed(rmv, e);
+            }
+
+            info(p);
+
+            int curSize = p.getCurrentSize();
+
+            assert curSize < max + batchSize :
+                "curSize < max + batchSize [curSize=" + curSize + ", max=" + max + ", batchSize=" + batchSize + ']';
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Adds five entries constructed from a key only and checks after each step that previously
+     * added entries have not been evicted.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAllowEmptyEntries() throws Exception {
+        try {
+            startGrid();
+
+            MockEntry e1 = new MockEntry("1");
+
+            MockEntry e2 = new MockEntry("2");
+
+            MockEntry e3 = new MockEntry("3");
+
+            MockEntry e4 = new MockEntry("4");
+
+            MockEntry e5 = new MockEntry("5");
+
+            FifoEvictionPolicy<String, String> p = policy();
+
+            p.setBatchSize(2);
+
+            p.onEntryAccessed(false, e1);
+
+            assertFalse(e1.isEvicted());
+
+            p.onEntryAccessed(false, e2);
+
+            assertFalse(e1.isEvicted());
+            assertFalse(e2.isEvicted());
+
+            p.onEntryAccessed(false, e3);
+
+            assertFalse(e1.isEvicted());
+            assertFalse(e3.isEvicted());
+
+            p.onEntryAccessed(false, e4);
+
+            assertFalse(e1.isEvicted());
+            assertFalse(e3.isEvicted());
+            assertFalse(e4.isEvicted());
+
+            p.onEntryAccessed(false, e5);
+
+            assertFalse(e1.isEvicted());
+            assertFalse(e3.isEvicted());
+            assertFalse(e5.isEvicted());
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Puts 500 keys into a {@code LOCAL} cache, then reads the most recent ones, and checks in
+     * both phases that the observed cache size never drops below {@code plcMax}.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPut() throws Exception {
+        mode = LOCAL;
+        syncCommit = true;
+        plcMax = 10;
+
+        Ignite ignite = startGrid();
+
+        try {
+            IgniteCache<Object, Object> cache = ignite.cache(null);
+
+            int cnt = 500;
+
+            int min = Integer.MAX_VALUE;
+
+            int minIdx = 0;
+
+            for (int i = 0; i < cnt; i++) {
+                cache.put(i, i);
+
+                int cacheSize = cache.size();
+
+                // Track the minimum only once more than plcMax keys have been put.
+                if (i > plcMax && cacheSize < min) {
+                    min = cacheSize;
+                    minIdx = i;
+                }
+            }
+
+            // Batch evicted.
+            assert min >= plcMax : "Min cache size is too small: " + min;
+
+            info("Min cache size [min=" + min + ", idx=" + minIdx + ']');
+            info("Current cache size " + cache.size());
+            info("Current cache key size " + cache.size());
+
+            min = Integer.MAX_VALUE;
+
+            minIdx = 0;
+
+            // Touch.
+            for (int i = cnt; --i > cnt - plcMax;) {
+                cache.get(i);
+
+                int cacheSize = cache.size();
+
+                if (cacheSize < min) {
+                    min = cacheSize;
+                    minIdx = i;
+                }
+            }
+
+            info("----");
+            info("Min cache size [min=" + min + ", idx=" + minIdx + ']');
+            info("Current cache size " + cache.size());
+            info("Current cache key size " + cache.size());
+
+            // Batch evicted.
+            assert min >= plcMax : "Min cache size is too small: " + min;
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void testPartitionedNearDisabled() throws Exception {
+        // Sets the slack the inherited test adds to its cache size check to the policy batch size.
+        plcBatchSize = BATCH_SIZE;
+
+        super.testPartitionedNearDisabled();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected FifoEvictionPolicy<String, String> createPolicy(int plcMax) {
+        // Honour the requested maximum instead of a fixed value so inherited tests that change
+        // plcMax run against a policy that is actually configured with it.
+        return new FifoEvictionPolicy<>(plcMax, BATCH_SIZE);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected FifoEvictionPolicy<String, String> createNearPolicy(int nearMax) {
+        return new FifoEvictionPolicy<>(nearMax, BATCH_SIZE);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void checkNearPolicies(int endNearPlcSize) {
+        for (int i = 0; i < gridCnt; i++)
+            for (EvictableEntry<String, String> e : nearPolicy(i).queue())
+                assert !e.isCached() : "Invalid near policy size: " + nearPolicy(i).queue();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void checkPolicies(int plcMax) {
+        // The queue may legitimately exceed plcMax by up to the batch size between evictions.
+        for (int i = 0; i < gridCnt; i++)
+            assert policy(i).queue().size() <= plcMax + policy(i).getBatchSize() :
+                "Invalid policy queue size [size=" + policy(i).queue().size() + ", plcMax=" + plcMax +
+                    ", batchSize=" + policy(i).getBatchSize() + ']';
+    }
+
+}