# ignite-670 more tests, use default expiry policy from isolated updater
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/34c29ed7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/34c29ed7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/34c29ed7 Branch: refs/heads/gg-9998 Commit: 34c29ed74578bf569bf537a57f2db642e1d58d2a Parents: f111294 Author: sboikov <sboi...@gridgain.com> Authored: Thu Apr 2 16:24:01 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Thu Apr 2 16:24:01 2015 +0300 ---------------------------------------------------------------------- .../datastreamer/DataStreamerImpl.java | 21 ++- .../cache/ttl/CacheTtlAbstractSelfTest.java | 175 +++++++++++++++---- 2 files changed, 161 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34c29ed7/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java index 71a9364..2b470f6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java @@ -43,6 +43,7 @@ import org.jetbrains.annotations.*; import org.jsr166.*; import javax.cache.*; +import javax.cache.expiry.*; import java.util.*; import java.util.Map.*; import java.util.concurrent.*; @@ -1390,6 +1391,11 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed GridCacheVersion ver = cctx.versions().next(topVer); + long ttl = CU.TTL_ETERNAL; + long expiryTime = CU.EXPIRE_TIME_ETERNAL; + + ExpiryPolicy plc = cctx.expiry(); + for (Entry<KeyCacheObject, CacheObject> e : entries) { try { e.getKey().finishUnmarshal(cctx.cacheObjectContext(), cctx.deploy().globalLoader()); @@ -1398,10 +1404,21 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed entry.unswap(false); + if (plc != null) { + ttl = CU.toTtl(plc.getExpiryForCreation()); + + if (ttl == CU.TTL_ZERO) + continue; + else if (ttl == CU.TTL_NOT_CHANGED) + ttl = 0; + + expiryTime = CU.toExpireTime(ttl); + } + entry.initialValue(e.getValue(), ver, - CU.TTL_ETERNAL, - CU.EXPIRE_TIME_ETERNAL, + ttl, + expiryTime, false, topVer, GridDrType.DR_LOAD); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34c29ed7/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/ttl/CacheTtlAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/ttl/CacheTtlAbstractSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/ttl/CacheTtlAbstractSelfTest.java index ebe6f16..0e2e738 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/ttl/CacheTtlAbstractSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/ttl/CacheTtlAbstractSelfTest.java @@ -21,17 +21,22 @@ import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cache.eviction.lru.*; import org.apache.ignite.cache.query.*; +import org.apache.ignite.cache.store.*; import org.apache.ignite.configuration.*; +import org.apache.ignite.lang.*; import org.apache.ignite.spi.discovery.tcp.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; import org.apache.ignite.testframework.junits.common.*; +import javax.cache.*; import javax.cache.configuration.*; import javax.cache.expiry.*; +import javax.cache.integration.*; import java.util.*; -import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.*; +import static org.apache.ignite.cache.CachePeekMode.*; /** * TTL test. @@ -61,17 +66,33 @@ public abstract class CacheTtlAbstractSelfTest extends GridCommonAbstractTest { cache.setOffHeapMaxMemory(0); cache.setEvictionPolicy(new LruEvictionPolicy(MAX_CACHE_SIZE)); cache.setIndexedTypes(Integer.class, Integer.class); + cache.setBackups(2); + + cache.setCacheStoreFactory(singletonFactory(new CacheStoreAdapter() { + @Override public void loadCache(IgniteBiInClosure clo, Object... args) { + for (int i = 0; i < SIZE; i++) + clo.apply(i, i); + } + + @Override public Object load(Object key) throws CacheLoaderException { + return key; + } + + @Override public void write(Cache.Entry entry) throws CacheWriterException { + // No-op. + } + + @Override public void delete(Object key) throws CacheWriterException { + // No-op. + } + })); cache.setExpiryPolicyFactory( FactoryBuilder.factoryOf(new TouchedExpiryPolicy(new Duration(MILLISECONDS, DEFAULT_TIME_TO_LIVE)))); cfg.setCacheConfiguration(cache); - TcpDiscoverySpi disco = new TcpDiscoverySpi(); - - disco.setIpFinder(IP_FINDER); - - cfg.setDiscoverySpi(disco); + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); return cfg; } @@ -109,14 +130,82 @@ public abstract class CacheTtlAbstractSelfTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ + public void testDefaultTimeToLiveLoadCache() throws Exception { + IgniteCache<Integer, Integer> cache = jcache(0); + + cache.loadCache(null); + + checkSizeBeforeLive(SIZE); + + Thread.sleep(DEFAULT_TIME_TO_LIVE + 500); + + checkSizeAfterLive(); + } + + /** + * @throws Exception If failed. + */ + public void testDefaultTimeToLiveLoadAll() throws Exception { + IgniteCache<Integer, Integer> cache = jcache(0); + + CompletionListenerFuture fut = new CompletionListenerFuture(); + + Set<Integer> keys = new HashSet<>(); + + for (int i = 0; i < SIZE; ++i) + keys.add(i); + + cache.loadAll(keys, false, fut); + + fut.get(); + + checkSizeBeforeLive(SIZE); + + Thread.sleep(DEFAULT_TIME_TO_LIVE + 500); + + checkSizeAfterLive(); + } + + /** + * @throws Exception If failed. + */ + public void testDefaultTimeToLiveStreamerAdd() throws Exception { + try (IgniteDataStreamer<Integer, Integer> streamer = ignite(0).dataStreamer(null)) { + for (int i = 0; i < SIZE; i++) + streamer.addData(i, i); + } + + checkSizeBeforeLive(SIZE); + + Thread.sleep(DEFAULT_TIME_TO_LIVE + 500); + + checkSizeAfterLive(); + + try (IgniteDataStreamer<Integer, Integer> streamer = ignite(0).dataStreamer(null)) { + streamer.allowOverwrite(true); + + for (int i = 0; i < SIZE; i++) + streamer.addData(i, i); + } + + checkSizeBeforeLive(SIZE); + + Thread.sleep(DEFAULT_TIME_TO_LIVE + 500); + + checkSizeAfterLive(); + } + + /** + * @throws Exception If failed. + */ public void testDefaultTimeToLivePut() throws Exception { IgniteCache<Integer, Integer> cache = jcache(0); - List<Integer> keys = primaryKeys(cache, 1); + Integer key = 0; - cache.put(keys.get(0), 1); + cache.put(key, 1); - checkSizeBeforeLive(cache, 1); + checkSizeBeforeLive(1); Thread.sleep(DEFAULT_TIME_TO_LIVE + 500); @@ -131,14 +220,12 @@ public abstract class CacheTtlAbstractSelfTest extends GridCommonAbstractTest { Map<Integer, Integer> entries = new HashMap<>(); - List<Integer> keys = primaryKeys(cache, SIZE); - for (int i = 0; i < SIZE; ++i) - entries.put(keys.get(i), i); + entries.put(i, i); cache.putAll(entries); - checkSizeBeforeLive(cache, SIZE); + checkSizeBeforeLive(SIZE); Thread.sleep(DEFAULT_TIME_TO_LIVE + 500); @@ -149,54 +236,76 @@ public abstract class CacheTtlAbstractSelfTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testTimeToLiveTtl() throws Exception { - IgniteCache<Integer, Integer> cache = jcache(0); - long time = DEFAULT_TIME_TO_LIVE + 2000; - List<Integer> keys = primaryKeys(cache, SIZE); + IgniteCache<Integer, Integer> cache = this.<Integer, Integer>jcache(0).withExpiryPolicy( + new TouchedExpiryPolicy(new Duration(MILLISECONDS, time))); for (int i = 0; i < SIZE; i++) - cache.withExpiryPolicy(new TouchedExpiryPolicy(new Duration(MILLISECONDS, time))). - put(keys.get(i), i); + cache.put(i, i); - checkSizeBeforeLive(cache, SIZE); + checkSizeBeforeLive(SIZE); Thread.sleep(DEFAULT_TIME_TO_LIVE + 500); - checkSizeBeforeLive(cache, SIZE); + checkSizeBeforeLive(SIZE); Thread.sleep(time - DEFAULT_TIME_TO_LIVE + 500); checkSizeAfterLive(); } + private void checkSizeBeforeLive(int size) throws Exception { + checkSizeBeforeLive(size, gridCount()); + } + /** + * @param size Expected size. + * @param gridCnt Number of nodes. * @throws Exception If failed. */ - private void checkSizeBeforeLive(IgniteCache<Integer, Integer> cache, int size) throws Exception { - if (memoryMode() == CacheMemoryMode.OFFHEAP_TIERED) { - assertEquals(0, cache.localSize(CachePeekMode.ONHEAP)); - assertEquals(size, cache.localSize(CachePeekMode.OFFHEAP)); - } - else { - assertEquals(size > MAX_CACHE_SIZE ? MAX_CACHE_SIZE : size, cache.localSize(CachePeekMode.ONHEAP)); - assertEquals(size > MAX_CACHE_SIZE ? size - MAX_CACHE_SIZE : 0, cache.localSize(CachePeekMode.OFFHEAP)); - } + private void checkSizeBeforeLive(int size, int gridCnt) throws Exception { + for (int i = 0; i < gridCnt; ++i) { + IgniteCache<Integer, Integer> cache = jcache(i); + + if (memoryMode() == CacheMemoryMode.OFFHEAP_TIERED) { + assertEquals("Unexpected size, node: " + i, 0, cache.localSize(ONHEAP)); + assertEquals("Unexpected size, node: " + i, size, cache.localSize(OFFHEAP)); + } + else { + assertEquals("Unexpected size, node: " + i, size > MAX_CACHE_SIZE ? MAX_CACHE_SIZE : size, + cache.localSize(ONHEAP)); - assertFalse(cache.query(new SqlQuery<>(Integer.class, "_val >= 0")).getAll().isEmpty()); + assertEquals("Unexpected size, node: " + i, + size > MAX_CACHE_SIZE ? size - MAX_CACHE_SIZE : 0, cache.localSize(OFFHEAP)); + } + + assertFalse(cache.query(new SqlQuery<>(Integer.class, "_val >= 0")).getAll().isEmpty()); + } } /** * @throws Exception If failed. */ private void checkSizeAfterLive() throws Exception { - for (int i = 0; i < gridCount(); ++i) { + checkSizeAfterLive(gridCount()); + } + + /** + * @param gridCnt Number of nodes. + * @throws Exception If failed. + */ + private void checkSizeAfterLive(int gridCnt) throws Exception { + for (int i = 0; i < gridCnt; ++i) { IgniteCache<Integer, Integer> cache = jcache(i); assertEquals(0, cache.localSize()); - assertEquals(0, cache.localSize(CachePeekMode.OFFHEAP)); - assertEquals(0, cache.localSize(CachePeekMode.SWAP)); + assertEquals(0, cache.localSize(OFFHEAP)); + assertEquals(0, cache.localSize(SWAP)); assertEquals(0, cache.query(new SqlQuery<>(Integer.class, "_val >= 0")).getAll().size()); + + for (int key = 0; key < SIZE; key++) + assertNull(cache.localPeek(key)); } } }