Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-238 [created] f636bb159


# ignite-238


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f636bb15
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f636bb15
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f636bb15

Branch: refs/heads/ignite-238
Commit: f636bb1593afe78e1af8674097ad7ccb8904f4f8
Parents: 210bd2e
Author: sboikov <sboi...@gridgain.com>
Authored: Thu Feb 12 15:18:42 2015 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Thu Feb 12 16:53:40 2015 +0300

----------------------------------------------------------------------
 .../apache/ignite/cache/CacheProjection.java    |   4 +-
 .../processors/cache/GridCacheAdapter.java      |  28 +++--
 .../processors/cache/GridCacheEntryEx.java      |   7 +-
 .../processors/cache/GridCacheMapEntry.java     |  36 ++++--
 .../cache/GridCacheProjectionImpl.java          |   8 +-
 .../processors/cache/GridCacheProxyImpl.java    |   8 +-
 .../cache/IgniteCacheExpiryPolicy.java          |   6 +
 .../processors/cache/IgniteCacheProxy.java      |   2 +-
 .../dht/atomic/GridDhtAtomicCache.java          |   5 +
 .../cache/query/GridCacheQueryManager.java      | 109 +++++++++++++++----
 .../processors/cache/GridCacheTestEntryEx.java  |   8 +-
 .../IgniteCacheExpiryPolicyAbstractTest.java    |  20 ++++
 12 files changed, 194 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f636bb15/modules/core/src/main/java/org/apache/ignite/cache/CacheProjection.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/CacheProjection.java b/modules/core/src/main/java/org/apache/ignite/cache/CacheProjection.java
index b6e4dc1..f156b3f 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/CacheProjection.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/CacheProjection.java
@@ -476,10 +476,12 @@ public interface CacheProjection<K, V> extends Iterable<Cache.Entry<K, V>> {
     /**
      * @param key Key.
      * @param peekModes Peek modes.
+     * @param plc Expiry policy if TTL should be updated.
      * @return Value.
      * @throws IgniteCheckedException If failed.
      */
-    @Nullable public V localPeek(K key, CachePeekMode[] peekModes) throws 
IgniteCheckedException;
+    @Nullable public V localPeek(K key, CachePeekMode[] peekModes, @Nullable 
IgniteCacheExpiryPolicy plc)
+        throws IgniteCheckedException;
 
     /**
      * @param peekModes Peek modes.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f636bb15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 6f75e32..0cd8886 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -712,7 +712,11 @@ public abstract class GridCacheAdapter<K, V> implements 
GridCache<K, V>,
 
     /** {@inheritDoc} */
     @SuppressWarnings("ForLoopReplaceableByForEach")
-    @Nullable @Override public V localPeek(K key, CachePeekMode[] peekModes) 
throws IgniteCheckedException {
+    @Nullable @Override public V localPeek(K key,
+        CachePeekMode[] peekModes,
+        @Nullable IgniteCacheExpiryPolicy plc)
+        throws IgniteCheckedException
+    {
         A.notNull(key, "key");
 
         if (keyCheck)
@@ -783,7 +787,7 @@ public abstract class GridCacheAdapter<K, V> implements 
GridCache<K, V>,
                         (ctx.isNear() ? ctx.near().dht().peekEx(key) : 
peekEx(key));
 
                     if (e != null) {
-                        val = e.peek(modes.heap, modes.offheap, modes.swap, 
topVer);
+                        val = e.peek(modes.heap, modes.offheap, modes.swap, 
topVer, plc);
 
                         modes.offheap = false;
                         modes.swap = false;
@@ -799,7 +803,7 @@ public abstract class GridCacheAdapter<K, V> implements 
GridCache<K, V>,
                 }
             }
             else
-                val = localCachePeek0(key, modes.heap, modes.offheap, 
modes.swap);
+                val = localCachePeek0(key, modes.heap, modes.offheap, 
modes.swap, plc);
 
             if (ctx.portableEnabled())
                 val = (V)ctx.unwrapPortableIfNeeded(val, ctx.keepPortable());
@@ -819,11 +823,16 @@ public abstract class GridCacheAdapter<K, V> implements 
GridCache<K, V>,
      * @param heap Read heap flag.
      * @param offheap Read offheap flag.
      * @param swap Read swap flag.
+     * @param plc Optional expiry policy.
      * @return Value.
      * @throws GridCacheEntryRemovedException If entry removed.
      * @throws IgniteCheckedException If failed.
      */
-    @Nullable private V localCachePeek0(K key, boolean heap, boolean offheap, 
boolean swap)
+    @Nullable private V localCachePeek0(K key,
+        boolean heap,
+        boolean offheap,
+        boolean swap,
+        IgniteCacheExpiryPolicy plc)
         throws GridCacheEntryRemovedException, IgniteCheckedException {
         assert ctx.isLocal();
         assert heap || offheap || swap;
@@ -832,7 +841,7 @@ public abstract class GridCacheAdapter<K, V> implements 
GridCache<K, V>,
             GridCacheEntryEx<K, V> e = peekEx(key);
 
             if (e != null)
-                return e.peek(heap, offheap, swap, -1);
+                return e.peek(heap, offheap, swap, -1, plc);
         }
 
         if (offheap || swap) {
@@ -2117,9 +2126,9 @@ public abstract class GridCacheAdapter<K, V> implements 
GridCache<K, V>,
                                 filter,
                                 expiry);
 
-                            GridCacheVersion ver = entry.version();
-
                             if (val == null) {
+                                GridCacheVersion ver = entry.version();
+
                                 if (misses == null)
                                     misses = new GridLeanMap<>();
 
@@ -6080,6 +6089,11 @@ public abstract class GridCacheAdapter<K, V> implements 
GridCache<K, V>,
         }
 
         /** {@inheritDoc} */
+        @Override public synchronized boolean readyToFlush(int cnt) {
+            return (entries != null && entries.size() > cnt) || (rdrsMap != 
null && rdrsMap.size() > cnt);
+        }
+
+        /** {@inheritDoc} */
         @Override public String toString() {
             return S.toString(GetExpiryPolicy.class, this);
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f636bb15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index 145627f..ec12281 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -607,11 +607,16 @@ public interface GridCacheEntryEx<K, V> {
      * @param offheap Read from offheap flag.
      * @param swap Read from swap flag.
      * @param topVer Topology version.
+     * @param plc Expiry policy if TTL should be updated.
      * @return Value.
      * @throws GridCacheEntryRemovedException If entry has been removed.
      * @throws IgniteCheckedException If failed.
      */
-    @Nullable public V peek(boolean heap, boolean offheap, boolean swap, long 
topVer)
+    @Nullable public V peek(boolean heap,
+        boolean offheap,
+        boolean swap,
+        long topVer,
+        @Nullable IgniteCacheExpiryPolicy plc)
         throws GridCacheEntryRemovedException, IgniteCheckedException;
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f636bb15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index c20cf03..69a95a1 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -2761,14 +2761,15 @@ public abstract class GridCacheMapEntry<K, V> 
implements GridCacheEntryEx<K, V>
     @Nullable @Override public V peek(boolean heap,
         boolean offheap,
         boolean swap,
-        long topVer)
+        long topVer,
+        @Nullable IgniteCacheExpiryPolicy expiryPlc)
         throws GridCacheEntryRemovedException, IgniteCheckedException
     {
         assert heap || offheap || swap;
 
         try {
             if (heap) {
-                GridTuple<V> val = peekGlobal(false, topVer, null);
+                GridTuple<V> val = peekGlobal(false, topVer, null, expiryPlc);
 
                 if (val != null)
                     return val.get();
@@ -2851,13 +2852,13 @@ public abstract class GridCacheMapEntry<K, V> 
implements GridCacheEntryEx<K, V>
                 return peekTx(failFast, filter, tx);
 
             case GLOBAL:
-                return peekGlobal(failFast, topVer, filter);
+                return peekGlobal(failFast, topVer, filter, null);
 
             case NEAR_ONLY:
-                return peekGlobal(failFast, topVer, filter);
+                return peekGlobal(failFast, topVer, filter, null);
 
             case PARTITIONED_ONLY:
-                return peekGlobal(failFast, topVer, filter);
+                return peekGlobal(failFast, topVer, filter, null);
 
             case SMART:
                 /*
@@ -2871,7 +2872,7 @@ public abstract class GridCacheMapEntry<K, V> implements 
GridCacheEntryEx<K, V>
                  * may have enlisted into the same transaction and that's why we pass 'true'
                  * to 'e.peek(true)' method in this case.
                  */
-                return tx == null || tx.state() != ACTIVE ? 
peekGlobal(failFast, topVer, filter) :
+                return tx == null || tx.state() != ACTIVE ? 
peekGlobal(failFast, topVer, filter, null) :
                     peekTxThenGlobal(failFast, filter, tx);
 
             case SWAP:
@@ -2962,7 +2963,7 @@ public abstract class GridCacheMapEntry<K, V> implements 
GridCacheEntryEx<K, V>
 
         long topVer = tx == null ? cctx.affinity().affinityTopologyVersion() : 
tx.topologyVersion();
 
-        return peekGlobal(failFast, topVer, filter);
+        return peekGlobal(failFast, topVer, filter, null);
     }
 
     /**
@@ -2982,14 +2983,18 @@ public abstract class GridCacheMapEntry<K, V> 
implements GridCacheEntryEx<K, V>
      * @param failFast Fail fast flag.
      * @param topVer Topology version.
      * @param filter Filter.
+     * @param expiryPlc Optional expiry policy.
      * @return Peeked value.
      * @throws GridCacheFilterFailedException If filter failed.
      * @throws GridCacheEntryRemovedException If entry got removed.
      * @throws IgniteCheckedException If unexpected cache failure occurred.
      */
     @SuppressWarnings({"RedundantTypeArguments"})
-    @Nullable private GridTuple<V> peekGlobal(boolean failFast, long topVer,
-        IgnitePredicate<Cache.Entry<K, V>>[] filter)
+    @Nullable private GridTuple<V> peekGlobal(boolean failFast,
+        long topVer,
+        IgnitePredicate<Cache.Entry<K, V>>[] filter,
+        @Nullable IgniteCacheExpiryPolicy expiryPlc
+        )
         throws GridCacheEntryRemovedException, GridCacheFilterFailedException, 
IgniteCheckedException {
         if (!valid(topVer))
             return null;
@@ -3012,6 +3017,19 @@ public abstract class GridCacheMapEntry<K, V> implements 
GridCacheEntryEx<K, V>
 
                     ver = this.ver;
                     val = rawGetOrUnmarshalUnlocked(false);
+
+                    if (val != null && expiryPlc != null) {
+                        long ttl = expiryPlc.forAccess();
+
+                        if (ttl != CU.TTL_NOT_CHANGED) {
+                            updateTtl(ttl);
+
+                            expiryPlc.ttlUpdated(key(),
+                                getOrMarshalKeyBytes(),
+                                version(),
+                                hasReaders() ? 
((GridDhtCacheEntry)this).readers() : null);
+                        }
+                    }
                 }
 
                 if (!cctx.isAll(wrap(), filter))

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f636bb15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java
index 9986be7..f074c77 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java
@@ -1011,8 +1011,12 @@ public class GridCacheProjectionImpl<K, V> implements 
GridCacheProjectionEx<K, V
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public V localPeek(K key, CachePeekMode[] peekModes) 
throws IgniteCheckedException {
-        return cache.localPeek(key, peekModes);
+    @Nullable @Override public V localPeek(K key,
+        CachePeekMode[] peekModes,
+        @Nullable IgniteCacheExpiryPolicy plc)
+        throws IgniteCheckedException
+    {
+        return cache.localPeek(key, peekModes, plc);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f636bb15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
index 542fd19..653d2a6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
@@ -1220,11 +1220,15 @@ public class GridCacheProxyImpl<K, V> implements 
GridCacheProxy<K, V>, Externali
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public V localPeek(K key, CachePeekMode[] peekModes) 
throws IgniteCheckedException {
+    @Nullable @Override public V localPeek(K key,
+        CachePeekMode[] peekModes,
+        @Nullable IgniteCacheExpiryPolicy plc)
+        throws IgniteCheckedException
+    {
         GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
         try {
-            return delegate.localPeek(key, peekModes);
+            return delegate.localPeek(key, peekModes, plc);
         }
         finally {
             gate.leave(prev);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f636bb15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheExpiryPolicy.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheExpiryPolicy.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheExpiryPolicy.java
index b55d472..deb1d29 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheExpiryPolicy.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheExpiryPolicy.java
@@ -63,6 +63,12 @@ public interface IgniteCacheExpiryPolicy {
     public void reset();
 
     /**
+     * @param cnt Entries count.
+     * @return {@code True} if number of entries or readers is greater than given number.
+     */
+    public boolean readyToFlush(int cnt);
+
+    /**
      * @return Entries with TTL updated on access.
      */
     @Nullable public Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> 
entries();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f636bb15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 9ef4716..db9fbe0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -279,7 +279,7 @@ public class IgniteCacheProxy<K, V> extends 
AsyncSupportAdapter<IgniteCache<K, V
         GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
         try {
-            return delegate.localPeek(key, peekModes);
+            return delegate.localPeek(key, peekModes, null);
         }
         catch (IgniteCheckedException e) {
             throw cacheException(e);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f636bb15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index b93fed7..b07c4c7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -3028,6 +3028,11 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
         }
 
         /** {@inheritDoc} */
+        @Override public boolean readyToFlush(int cnt) {
+            return (entries != null && entries.size() > cnt) || (rdrsMap != 
null && rdrsMap.size() > cnt);
+        }
+
+        /** {@inheritDoc} */
         @Override public String toString() {
             return S.toString(UpdateExpiryPolicy.class, this);
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f636bb15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 985dba9..7fb9d63 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -44,6 +44,7 @@ import org.jdk8.backport.*;
 import org.jetbrains.annotations.*;
 
 import javax.cache.*;
+import javax.cache.expiry.*;
 import java.io.*;
 import java.sql.*;
 import java.util.*;
@@ -755,9 +756,17 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
 
         injectResources(keyValFilter);
 
-        GridIterator<IgniteBiTuple<K, V>> heapIt = new 
GridIteratorAdapter<IgniteBiTuple<K, V>>() {
+        final CachePeekMode[] peekModes = {CachePeekMode.ONHEAP};
+
+        final GridDhtCacheAdapter dht = cctx.isLocal() ? null : (cctx.isNear() 
? cctx.near().dht() : cctx.dht());
+
+        final ExpiryPolicy plc = cctx.expiry();
+
+        final GridCloseableIteratorAdapter<IgniteBiTuple<K, V>> heapIt = new 
GridCloseableIteratorAdapter<IgniteBiTuple<K, V>>() {
             private IgniteBiTuple<K, V> next;
 
+            private IgniteCacheExpiryPolicy expiryPlc = 
cctx.cache().accessExpiryPolicy(plc);
+
             private Iterator<K> iter = qry.includeBackups() || 
cctx.isReplicated() ?
                 prj.keySet().iterator() : prj.primaryKeySet().iterator();
 
@@ -765,11 +774,11 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
                 advance();
             }
 
-            @Override public boolean hasNextX() {
+            @Override public boolean onHasNext() {
                 return next != null;
             }
 
-            @Override public IgniteBiTuple<K, V> nextX() {
+            @Override public IgniteBiTuple<K, V> onNext() {
                 if (next == null)
                     throw new NoSuchElementException();
 
@@ -780,10 +789,6 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
                 return next0;
             }
 
-            @Override public void removeX() {
-                throw new UnsupportedOperationException();
-            }
-
             private void advance() {
                 IgniteBiTuple<K, V> next0 = null;
 
@@ -792,7 +797,23 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
 
                     K key = iter.next();
 
-                    V val = prj.peek(key);
+                    V val;
+
+                    try {
+                        val = prj.localPeek(key, peekModes, expiryPlc);
+                    }
+                    catch (IgniteCheckedException e) {
+                        if (log.isDebugEnabled())
+                            log.debug("Failed to peek value: " + e);
+
+                        val = null;
+                    }
+
+                    if (dht != null && expiryPlc != null && 
expiryPlc.readyToFlush(100)) {
+                        dht.sendTtlUpdateRequest(expiryPlc);
+
+                        expiryPlc = cctx.cache().accessExpiryPolicy(plc);
+                    }
 
                     if (val != null) {
                         next0 = F.t(key, val);
@@ -807,6 +828,21 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
                 next = next0 != null ?
                     new IgniteBiTuple<>(next0.getKey(), next0.getValue()) :
                     null;
+
+                if (next == null)
+                    sendTtlUpdate();
+            }
+
+            @Override protected void onClose() throws IgniteCheckedException {
+                sendTtlUpdate();
+            }
+
+            private void sendTtlUpdate() {
+                if (dht != null && expiryPlc != null) {
+                    dht.sendTtlUpdateRequest(expiryPlc);
+
+                    expiryPlc = null;
+                }
             }
 
             private boolean checkPredicate(Map.Entry<K, V> e) {
@@ -850,6 +886,10 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
             @Override protected void onRemove() {
                 it.remove();
             }
+
+            @Override protected void onClose() throws IgniteCheckedException {
+                heapIt.close();
+            }
         };
     }
 
@@ -998,6 +1038,10 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
 
             boolean rmvRes = true;
 
+            final boolean statsEnabled = cctx.config().isStatisticsEnabled();
+
+            final boolean readEvt = 
cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
+
             try {
                 // Preparing query closures.
                 IgnitePredicate<Cache.Entry<Object, Object>> prjFilter = 
qryInfo.projectionPredicate();
@@ -1054,6 +1098,8 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
                 boolean metaSent = false;
 
                 while (!Thread.currentThread().isInterrupted() && 
it.hasNext()) {
+                    long start = statsEnabled ? System.nanoTime() : 0L;
+
                     Object row = it.next();
 
                     // Query is cancelled.
@@ -1063,7 +1109,15 @@ public abstract class GridCacheQueryManager<K, V> 
extends GridCacheManagerAdapte
                         break;
                     }
 
-                    if 
(cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ)) {
+                    if (statsEnabled) {
+                        CacheMetricsImpl metrics = cctx.cache().metrics0();
+
+                        metrics.onRead(true);
+
+                        metrics.addGetTimeNanos(System.nanoTime() - start);
+                    }
+
+                    if (readEvt) {
                         cctx.gridEvents().record(new CacheQueryReadEvent<K, V>(
                             cctx.localNode(),
                             "SQL fields query result set row read.",
@@ -1211,7 +1265,13 @@ public abstract class GridCacheQueryManager<K, V> 
extends GridCacheManagerAdapte
 
                 long topVer = cctx.affinity().affinityTopologyVersion();
 
+                final boolean statsEnabled = 
cctx.config().isStatisticsEnabled();
+
+                final boolean readEvt = 
cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
+
                 while (!Thread.currentThread().isInterrupted() && 
iter.hasNext()) {
+                    long start = statsEnabled ? System.nanoTime() : 0L;
+
                     IgniteBiTuple<K, V> row = iter.next();
 
                     // Query is cancelled.
@@ -1248,9 +1308,17 @@ public abstract class GridCacheQueryManager<K, V> 
extends GridCacheManagerAdapte
                         continue;
                     }
 
-                    switch (type) {
-                        case SQL:
-                            if 
(cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ)) {
+                    if (statsEnabled) {
+                        CacheMetricsImpl metrics = cctx.cache().metrics0();
+
+                        metrics.onRead(true);
+
+                        metrics.addGetTimeNanos(System.nanoTime() - start);
+                    }
+
+                    if (readEvt) {
+                        switch (type) {
+                            case SQL:
                                 cctx.gridEvents().record(new 
CacheQueryReadEvent<>(
                                     cctx.localNode(),
                                     "SQL query entry read.",
@@ -1268,12 +1336,10 @@ public abstract class GridCacheQueryManager<K, V> 
extends GridCacheManagerAdapte
                                     val,
                                     null,
                                     null));
-                            }
 
-                            break;
+                                break;
 
-                        case TEXT:
-                            if 
(cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ)) {
+                            case TEXT:
                                 cctx.gridEvents().record(new 
CacheQueryReadEvent<>(
                                     cctx.localNode(),
                                     "Full text query entry read.",
@@ -1291,12 +1357,10 @@ public abstract class GridCacheQueryManager<K, V> 
extends GridCacheManagerAdapte
                                     val,
                                     null,
                                     null));
-                            }
 
-                            break;
+                                break;
 
-                        case SCAN:
-                            if 
(cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ)) {
+                            case SCAN:
                                 cctx.gridEvents().record(new 
CacheQueryReadEvent<>(
                                     cctx.localNode(),
                                     "Scan query entry read.",
@@ -1314,9 +1378,9 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
                                     val,
                                     null,
                                     null));
-                            }
 
-                            break;
+                                break;
+                        }
                     }
 
                     Map.Entry<K, V> entry = F.t(key, val);
@@ -1387,6 +1451,7 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
 
     /**
      * @param qryInfo Info.
+     * @param taskName Task name.
      * @return Iterator.
      * @throws IgniteCheckedException In case of error.
      */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f636bb15/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
index 7b6e335..ffe3cc9 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
@@ -836,8 +836,12 @@ public class GridCacheTestEntryEx<K, V> extends 
GridMetadataAwareAdapter impleme
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public V peek(boolean heap, boolean offheap, boolean 
swap, long topVer)
-        throws GridCacheEntryRemovedException, IgniteCheckedException {
+    @Nullable @Override public V peek(boolean heap,
+        boolean offheap,
+        boolean swap,
+        long topVer,
+        @Nullable IgniteCacheExpiryPolicy plc)
+    {
         return null;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f636bb15/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
index 59e8bc4..6616958 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
@@ -30,6 +30,7 @@ import org.apache.ignite.testframework.*;
 import org.apache.ignite.transactions.*;
 import org.jetbrains.annotations.*;
 
+import javax.cache.*;
 import javax.cache.configuration.*;
 import javax.cache.expiry.*;
 import javax.cache.processor.*;
@@ -307,6 +308,25 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest 
extends IgniteCacheAbs
                 txGetAll(txMode);
             }
         }
+
+        IgniteCache<Integer, Integer> cache = jcache(0);
+
+        Collection<Integer> putKeys = keys();
+
+        for (final Integer key : putKeys)
+            cache.put(key, key);
+
+        Iterator<Cache.Entry<Integer, Integer>> it = cache.iterator();
+
+        List<Integer> itKeys = new ArrayList<>();
+
+        while (it.hasNext())
+            itKeys.add(it.next().getKey());
+
+        assertTrue(itKeys.size() >= putKeys.size());
+
+        for (Integer key : itKeys)
+            checkTtl(key, 62_000L, true);
     }
 
     /**

Reply via email to