# ignite-670 more tests, use default expiry policy from isolated updater

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/34c29ed7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/34c29ed7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/34c29ed7

Branch: refs/heads/gg-9998
Commit: 34c29ed74578bf569bf537a57f2db642e1d58d2a
Parents: f111294
Author: sboikov <sboi...@gridgain.com>
Authored: Thu Apr 2 16:24:01 2015 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Thu Apr 2 16:24:01 2015 +0300

----------------------------------------------------------------------
 .../datastreamer/DataStreamerImpl.java          |  21 ++-
 .../cache/ttl/CacheTtlAbstractSelfTest.java     | 175 +++++++++++++++----
 2 files changed, 161 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34c29ed7/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 71a9364..2b470f6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -43,6 +43,7 @@ import org.jetbrains.annotations.*;
 import org.jsr166.*;
 
 import javax.cache.*;
+import javax.cache.expiry.*;
 import java.util.*;
 import java.util.Map.*;
 import java.util.concurrent.*;
@@ -1390,6 +1391,11 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
 
                 GridCacheVersion ver = cctx.versions().next(topVer);
 
+                long ttl = CU.TTL_ETERNAL;
+                long expiryTime = CU.EXPIRE_TIME_ETERNAL;
+
+                ExpiryPolicy plc = cctx.expiry();
+
                 for (Entry<KeyCacheObject, CacheObject> e : entries) {
                     try {
                         e.getKey().finishUnmarshal(cctx.cacheObjectContext(), 
cctx.deploy().globalLoader());
@@ -1398,10 +1404,21 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
 
                         entry.unswap(false);
 
+                        if (plc != null) {
+                            ttl = CU.toTtl(plc.getExpiryForCreation());
+
+                            if (ttl == CU.TTL_ZERO)
+                                continue;
+                            else if (ttl == CU.TTL_NOT_CHANGED)
+                                ttl = 0;
+
+                            expiryTime = CU.toExpireTime(ttl);
+                        }
+
                         entry.initialValue(e.getValue(),
                             ver,
-                            CU.TTL_ETERNAL,
-                            CU.EXPIRE_TIME_ETERNAL,
+                            ttl,
+                            expiryTime,
                             false,
                             topVer,
                             GridDrType.DR_LOAD);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/34c29ed7/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/ttl/CacheTtlAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/ttl/CacheTtlAbstractSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/ttl/CacheTtlAbstractSelfTest.java
index ebe6f16..0e2e738 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/ttl/CacheTtlAbstractSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/ttl/CacheTtlAbstractSelfTest.java
@@ -21,17 +21,22 @@ import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.cache.eviction.lru.*;
 import org.apache.ignite.cache.query.*;
+import org.apache.ignite.cache.store.*;
 import org.apache.ignite.configuration.*;
+import org.apache.ignite.lang.*;
 import org.apache.ignite.spi.discovery.tcp.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
 import org.apache.ignite.testframework.junits.common.*;
 
+import javax.cache.*;
 import javax.cache.configuration.*;
 import javax.cache.expiry.*;
+import javax.cache.integration.*;
 import java.util.*;
 
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.cache.CachePeekMode.*;
 
 /**
  * TTL test.
@@ -61,17 +66,33 @@ public abstract class CacheTtlAbstractSelfTest extends GridCommonAbstractTest {
         cache.setOffHeapMaxMemory(0);
         cache.setEvictionPolicy(new LruEvictionPolicy(MAX_CACHE_SIZE));
         cache.setIndexedTypes(Integer.class, Integer.class);
+        cache.setBackups(2);
+
+        cache.setCacheStoreFactory(singletonFactory(new CacheStoreAdapter() {
+            @Override public void loadCache(IgniteBiInClosure clo, Object... 
args) {
+                for (int i = 0; i < SIZE; i++)
+                    clo.apply(i, i);
+            }
+
+            @Override public Object load(Object key) throws 
CacheLoaderException {
+                return key;
+            }
+
+            @Override public void write(Cache.Entry entry) throws 
CacheWriterException {
+                // No-op.
+            }
+
+            @Override public void delete(Object key) throws 
CacheWriterException {
+                // No-op.
+            }
+        }));
 
         cache.setExpiryPolicyFactory(
             FactoryBuilder.factoryOf(new TouchedExpiryPolicy(new 
Duration(MILLISECONDS, DEFAULT_TIME_TO_LIVE))));
 
         cfg.setCacheConfiguration(cache);
 
-        TcpDiscoverySpi disco = new TcpDiscoverySpi();
-
-        disco.setIpFinder(IP_FINDER);
-
-        cfg.setDiscoverySpi(disco);
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
 
         return cfg;
     }
@@ -109,14 +130,82 @@ public abstract class CacheTtlAbstractSelfTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testDefaultTimeToLiveLoadCache() throws Exception {
+        IgniteCache<Integer, Integer> cache = jcache(0);
+
+        cache.loadCache(null);
+
+        checkSizeBeforeLive(SIZE);
+
+        Thread.sleep(DEFAULT_TIME_TO_LIVE + 500);
+
+        checkSizeAfterLive();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDefaultTimeToLiveLoadAll() throws Exception {
+        IgniteCache<Integer, Integer> cache = jcache(0);
+
+        CompletionListenerFuture fut = new CompletionListenerFuture();
+
+        Set<Integer> keys = new HashSet<>();
+
+        for (int i = 0; i < SIZE; ++i)
+            keys.add(i);
+
+        cache.loadAll(keys, false, fut);
+
+        fut.get();
+
+        checkSizeBeforeLive(SIZE);
+
+        Thread.sleep(DEFAULT_TIME_TO_LIVE + 500);
+
+        checkSizeAfterLive();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDefaultTimeToLiveStreamerAdd() throws Exception {
+        try (IgniteDataStreamer<Integer, Integer> streamer = 
ignite(0).dataStreamer(null)) {
+            for (int i = 0; i < SIZE; i++)
+                streamer.addData(i, i);
+        }
+
+        checkSizeBeforeLive(SIZE);
+
+        Thread.sleep(DEFAULT_TIME_TO_LIVE + 500);
+
+        checkSizeAfterLive();
+
+        try (IgniteDataStreamer<Integer, Integer> streamer = 
ignite(0).dataStreamer(null)) {
+            streamer.allowOverwrite(true);
+
+            for (int i = 0; i < SIZE; i++)
+                streamer.addData(i, i);
+        }
+
+        checkSizeBeforeLive(SIZE);
+
+        Thread.sleep(DEFAULT_TIME_TO_LIVE + 500);
+
+        checkSizeAfterLive();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testDefaultTimeToLivePut() throws Exception {
         IgniteCache<Integer, Integer> cache = jcache(0);
 
-        List<Integer> keys = primaryKeys(cache, 1);
+        Integer key = 0;
 
-        cache.put(keys.get(0), 1);
+        cache.put(key, 1);
 
-        checkSizeBeforeLive(cache, 1);
+        checkSizeBeforeLive(1);
 
         Thread.sleep(DEFAULT_TIME_TO_LIVE + 500);
 
@@ -131,14 +220,12 @@ public abstract class CacheTtlAbstractSelfTest extends GridCommonAbstractTest {
 
         Map<Integer, Integer> entries = new HashMap<>();
 
-        List<Integer> keys = primaryKeys(cache, SIZE);
-
         for (int i = 0; i < SIZE; ++i)
-            entries.put(keys.get(i), i);
+            entries.put(i, i);
 
         cache.putAll(entries);
 
-        checkSizeBeforeLive(cache, SIZE);
+        checkSizeBeforeLive(SIZE);
 
         Thread.sleep(DEFAULT_TIME_TO_LIVE + 500);
 
@@ -149,54 +236,76 @@ public abstract class CacheTtlAbstractSelfTest extends GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testTimeToLiveTtl() throws Exception {
-        IgniteCache<Integer, Integer> cache = jcache(0);
-
         long time = DEFAULT_TIME_TO_LIVE + 2000;
 
-        List<Integer> keys = primaryKeys(cache, SIZE);
+        IgniteCache<Integer, Integer> cache = this.<Integer, 
Integer>jcache(0).withExpiryPolicy(
+            new TouchedExpiryPolicy(new Duration(MILLISECONDS, time)));
 
         for (int i = 0; i < SIZE; i++)
-            cache.withExpiryPolicy(new TouchedExpiryPolicy(new 
Duration(MILLISECONDS, time))).
-                put(keys.get(i), i);
+            cache.put(i, i);
 
-        checkSizeBeforeLive(cache, SIZE);
+        checkSizeBeforeLive(SIZE);
 
         Thread.sleep(DEFAULT_TIME_TO_LIVE + 500);
 
-        checkSizeBeforeLive(cache, SIZE);
+        checkSizeBeforeLive(SIZE);
 
         Thread.sleep(time - DEFAULT_TIME_TO_LIVE + 500);
 
         checkSizeAfterLive();
     }
 
+    private void checkSizeBeforeLive(int size) throws Exception {
+        checkSizeBeforeLive(size, gridCount());
+    }
+
     /**
+     * @param size Expected size.
+     * @param gridCnt Number of nodes.
      * @throws Exception If failed.
      */
-    private void checkSizeBeforeLive(IgniteCache<Integer, Integer> cache, int 
size) throws Exception {
-        if (memoryMode() == CacheMemoryMode.OFFHEAP_TIERED) {
-            assertEquals(0, cache.localSize(CachePeekMode.ONHEAP));
-            assertEquals(size, cache.localSize(CachePeekMode.OFFHEAP));
-        }
-        else {
-            assertEquals(size > MAX_CACHE_SIZE ? MAX_CACHE_SIZE : size, 
cache.localSize(CachePeekMode.ONHEAP));
-            assertEquals(size > MAX_CACHE_SIZE ? size - MAX_CACHE_SIZE : 0, 
cache.localSize(CachePeekMode.OFFHEAP));
-        }
+    private void checkSizeBeforeLive(int size, int gridCnt) throws Exception {
+        for (int i = 0; i < gridCnt; ++i) {
+            IgniteCache<Integer, Integer> cache = jcache(i);
+
+            if (memoryMode() == CacheMemoryMode.OFFHEAP_TIERED) {
+                assertEquals("Unexpected size, node: " + i, 0, 
cache.localSize(ONHEAP));
+                assertEquals("Unexpected size, node: " + i, size, 
cache.localSize(OFFHEAP));
+            }
+            else {
+                assertEquals("Unexpected size, node: " + i, size > 
MAX_CACHE_SIZE ? MAX_CACHE_SIZE : size,
+                    cache.localSize(ONHEAP));
 
-        assertFalse(cache.query(new SqlQuery<>(Integer.class, "_val >= 
0")).getAll().isEmpty());
+                assertEquals("Unexpected size, node: " + i,
+                    size > MAX_CACHE_SIZE ? size - MAX_CACHE_SIZE : 0, 
cache.localSize(OFFHEAP));
+            }
+
+            assertFalse(cache.query(new SqlQuery<>(Integer.class, "_val >= 
0")).getAll().isEmpty());
+        }
     }
 
     /**
      * @throws Exception If failed.
      */
     private void checkSizeAfterLive() throws Exception {
-        for (int i = 0; i < gridCount(); ++i) {
+        checkSizeAfterLive(gridCount());
+    }
+
+    /**
+     * @param gridCnt Number of nodes.
+     * @throws Exception If failed.
+     */
+    private void checkSizeAfterLive(int gridCnt) throws Exception {
+        for (int i = 0; i < gridCnt; ++i) {
             IgniteCache<Integer, Integer> cache = jcache(i);
 
             assertEquals(0, cache.localSize());
-            assertEquals(0, cache.localSize(CachePeekMode.OFFHEAP));
-            assertEquals(0, cache.localSize(CachePeekMode.SWAP));
+            assertEquals(0, cache.localSize(OFFHEAP));
+            assertEquals(0, cache.localSize(SWAP));
             assertEquals(0, cache.query(new SqlQuery<>(Integer.class, "_val >= 
0")).getAll().size());
+
+            for (int key = 0; key < SIZE; key++)
+                assertNull(cache.localPeek(key));
         }
     }
 }

Reply via email to