Repository: incubator-ignite Updated Branches: refs/heads/ignite-238 [created] f636bb159
# ignite-238 Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f636bb15 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f636bb15 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f636bb15 Branch: refs/heads/ignite-238 Commit: f636bb1593afe78e1af8674097ad7ccb8904f4f8 Parents: 210bd2e Author: sboikov <sboi...@gridgain.com> Authored: Thu Feb 12 15:18:42 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Thu Feb 12 16:53:40 2015 +0300 ---------------------------------------------------------------------- .../apache/ignite/cache/CacheProjection.java | 4 +- .../processors/cache/GridCacheAdapter.java | 28 +++-- .../processors/cache/GridCacheEntryEx.java | 7 +- .../processors/cache/GridCacheMapEntry.java | 36 ++++-- .../cache/GridCacheProjectionImpl.java | 8 +- .../processors/cache/GridCacheProxyImpl.java | 8 +- .../cache/IgniteCacheExpiryPolicy.java | 6 + .../processors/cache/IgniteCacheProxy.java | 2 +- .../dht/atomic/GridDhtAtomicCache.java | 5 + .../cache/query/GridCacheQueryManager.java | 109 +++++++++++++++---- .../processors/cache/GridCacheTestEntryEx.java | 8 +- .../IgniteCacheExpiryPolicyAbstractTest.java | 20 ++++ 12 files changed, 194 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f636bb15/modules/core/src/main/java/org/apache/ignite/cache/CacheProjection.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/CacheProjection.java b/modules/core/src/main/java/org/apache/ignite/cache/CacheProjection.java index b6e4dc1..f156b3f 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/CacheProjection.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/CacheProjection.java @@ -476,10 +476,12 @@ public interface CacheProjection<K, V> extends Iterable<Cache.Entry<K, V>> { /** * @param key Key. * @param peekModes Peek modes. + * @param plc Expiry policy if TTL should be updated. * @return Value. * @throws IgniteCheckedException If failed. */ - @Nullable public V localPeek(K key, CachePeekMode[] peekModes) throws IgniteCheckedException; + @Nullable public V localPeek(K key, CachePeekMode[] peekModes, @Nullable IgniteCacheExpiryPolicy plc) + throws IgniteCheckedException; /** * @param peekModes Peek modes. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f636bb15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 6f75e32..0cd8886 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -712,7 +712,11 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, /** {@inheritDoc} */ @SuppressWarnings("ForLoopReplaceableByForEach") - @Nullable @Override public V localPeek(K key, CachePeekMode[] peekModes) throws IgniteCheckedException { + @Nullable @Override public V localPeek(K key, + CachePeekMode[] peekModes, + @Nullable IgniteCacheExpiryPolicy plc) + throws IgniteCheckedException + { A.notNull(key, "key"); if (keyCheck) @@ -783,7 +787,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, (ctx.isNear() ? ctx.near().dht().peekEx(key) : peekEx(key)); if (e != null) { - val = e.peek(modes.heap, modes.offheap, modes.swap, topVer); + val = e.peek(modes.heap, modes.offheap, modes.swap, topVer, plc); modes.offheap = false; modes.swap = false; @@ -799,7 +803,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, } } else - val = localCachePeek0(key, modes.heap, modes.offheap, modes.swap); + val = localCachePeek0(key, modes.heap, modes.offheap, modes.swap, plc); if (ctx.portableEnabled()) val = (V)ctx.unwrapPortableIfNeeded(val, ctx.keepPortable()); @@ -819,11 +823,16 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, * @param heap Read heap flag. * @param offheap Read offheap flag. * @param swap Read swap flag. + * @param plc Optional expiry policy. * @return Value. * @throws GridCacheEntryRemovedException If entry removed. * @throws IgniteCheckedException If failed. */ - @Nullable private V localCachePeek0(K key, boolean heap, boolean offheap, boolean swap) + @Nullable private V localCachePeek0(K key, + boolean heap, + boolean offheap, + boolean swap, + IgniteCacheExpiryPolicy plc) throws GridCacheEntryRemovedException, IgniteCheckedException { assert ctx.isLocal(); assert heap || offheap || swap; @@ -832,7 +841,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, GridCacheEntryEx<K, V> e = peekEx(key); if (e != null) - return e.peek(heap, offheap, swap, -1); + return e.peek(heap, offheap, swap, -1, plc); } if (offheap || swap) { @@ -2117,9 +2126,9 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, filter, expiry); - GridCacheVersion ver = entry.version(); - if (val == null) { + GridCacheVersion ver = entry.version(); + if (misses == null) misses = new GridLeanMap<>(); @@ -6080,6 +6089,11 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, } /** {@inheritDoc} */ + @Override public synchronized boolean readyToFlush(int cnt) { + return (entries != null && entries.size() > cnt) || (rdrsMap != null && rdrsMap.size() > cnt); + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GetExpiryPolicy.class, this); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f636bb15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java index 145627f..ec12281 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java @@ -607,11 +607,16 @@ public interface GridCacheEntryEx<K, V> { * @param offheap Read from offheap flag. * @param swap Read from swap flag. * @param topVer Topology version. + * @param plc Expiry policy if TTL should be updated. * @return Value. * @throws GridCacheEntryRemovedException If entry has been removed. * @throws IgniteCheckedException If failed. */ - @Nullable public V peek(boolean heap, boolean offheap, boolean swap, long topVer) + @Nullable public V peek(boolean heap, + boolean offheap, + boolean swap, + long topVer, + @Nullable IgniteCacheExpiryPolicy plc) throws GridCacheEntryRemovedException, IgniteCheckedException; /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f636bb15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index c20cf03..69a95a1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -2761,14 +2761,15 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> @Nullable @Override public V peek(boolean heap, boolean offheap, boolean swap, - long topVer) + long topVer, + @Nullable IgniteCacheExpiryPolicy expiryPlc) throws GridCacheEntryRemovedException, IgniteCheckedException { assert heap || offheap || swap; try { if (heap) { - GridTuple<V> val = peekGlobal(false, topVer, null); + GridTuple<V> val = peekGlobal(false, topVer, null, expiryPlc); if (val != null) return val.get(); @@ -2851,13 +2852,13 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> return peekTx(failFast, filter, tx); case GLOBAL: - return peekGlobal(failFast, topVer, filter); + return peekGlobal(failFast, topVer, filter, null); case NEAR_ONLY: - return peekGlobal(failFast, topVer, filter); + return peekGlobal(failFast, topVer, filter, null); case PARTITIONED_ONLY: - return peekGlobal(failFast, topVer, filter); + return peekGlobal(failFast, topVer, filter, null); case SMART: /* @@ -2871,7 +2872,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> * may have enlisted into the same transaction and that's why we pass 'true' * to 'e.peek(true)' method in this case. */ - return tx == null || tx.state() != ACTIVE ? peekGlobal(failFast, topVer, filter) : + return tx == null || tx.state() != ACTIVE ? peekGlobal(failFast, topVer, filter, null) : peekTxThenGlobal(failFast, filter, tx); case SWAP: @@ -2962,7 +2963,7 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> long topVer = tx == null ? cctx.affinity().affinityTopologyVersion() : tx.topologyVersion(); - return peekGlobal(failFast, topVer, filter); + return peekGlobal(failFast, topVer, filter, null); } /** @@ -2982,14 +2983,18 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> * @param failFast Fail fast flag. * @param topVer Topology version. * @param filter Filter. + * @param expiryPlc Optional expiry policy. * @return Peeked value. * @throws GridCacheFilterFailedException If filter failed. * @throws GridCacheEntryRemovedException If entry got removed. * @throws IgniteCheckedException If unexpected cache failure occurred. */ @SuppressWarnings({"RedundantTypeArguments"}) - @Nullable private GridTuple<V> peekGlobal(boolean failFast, long topVer, - IgnitePredicate<Cache.Entry<K, V>>[] filter) + @Nullable private GridTuple<V> peekGlobal(boolean failFast, + long topVer, + IgnitePredicate<Cache.Entry<K, V>>[] filter, + @Nullable IgniteCacheExpiryPolicy expiryPlc + ) throws GridCacheEntryRemovedException, GridCacheFilterFailedException, IgniteCheckedException { if (!valid(topVer)) return null; @@ -3012,6 +3017,19 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> ver = this.ver; val = rawGetOrUnmarshalUnlocked(false); + + if (val != null && expiryPlc != null) { + long ttl = expiryPlc.forAccess(); + + if (ttl != CU.TTL_NOT_CHANGED) { + updateTtl(ttl); + + expiryPlc.ttlUpdated(key(), + getOrMarshalKeyBytes(), + version(), + hasReaders() ? ((GridDhtCacheEntry)this).readers() : null); + } + } } if (!cctx.isAll(wrap(), filter)) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f636bb15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java index 9986be7..f074c77 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java @@ -1011,8 +1011,12 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V } /** {@inheritDoc} */ - @Nullable @Override public V localPeek(K key, CachePeekMode[] peekModes) throws IgniteCheckedException { - return cache.localPeek(key, peekModes); + @Nullable @Override public V localPeek(K key, + CachePeekMode[] peekModes, + @Nullable IgniteCacheExpiryPolicy plc) + throws IgniteCheckedException + { + return cache.localPeek(key, peekModes, plc); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f636bb15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java index 542fd19..653d2a6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java @@ -1220,11 +1220,15 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K, V>, Externali } /** {@inheritDoc} */ - @Nullable @Override public V localPeek(K key, CachePeekMode[] peekModes) throws IgniteCheckedException { + @Nullable @Override public V localPeek(K key, + CachePeekMode[] peekModes, + @Nullable IgniteCacheExpiryPolicy plc) + throws IgniteCheckedException + { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - return delegate.localPeek(key, peekModes); + return delegate.localPeek(key, peekModes, plc); } finally { gate.leave(prev); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f636bb15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheExpiryPolicy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheExpiryPolicy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheExpiryPolicy.java index b55d472..deb1d29 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheExpiryPolicy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheExpiryPolicy.java @@ -63,6 +63,12 @@ public interface IgniteCacheExpiryPolicy { public void reset(); /** + * @param cnt Entries count. + * @return {@code True} if number of entries or readers is greater than given number. + */ + public boolean readyToFlush(int cnt); + + /** * @return Entries with TTL updated on access. */ @Nullable public Map<Object, IgniteBiTuple<byte[], GridCacheVersion>> entries(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f636bb15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index 9ef4716..db9fbe0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -279,7 +279,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - return delegate.localPeek(key, peekModes); + return delegate.localPeek(key, peekModes, null); } catch (IgniteCheckedException e) { throw cacheException(e); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f636bb15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index b93fed7..b07c4c7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -3028,6 +3028,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** {@inheritDoc} */ + @Override public boolean readyToFlush(int cnt) { + return (entries != null && entries.size() > cnt) || (rdrsMap != null && rdrsMap.size() > cnt); + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(UpdateExpiryPolicy.class, this); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f636bb15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index 985dba9..7fb9d63 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -44,6 +44,7 @@ import org.jdk8.backport.*; import org.jetbrains.annotations.*; import javax.cache.*; +import javax.cache.expiry.*; import java.io.*; import java.sql.*; import java.util.*; @@ -755,9 +756,17 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte injectResources(keyValFilter); - GridIterator<IgniteBiTuple<K, V>> heapIt = new GridIteratorAdapter<IgniteBiTuple<K, V>>() { + final CachePeekMode[] peekModes = {CachePeekMode.ONHEAP}; + + final GridDhtCacheAdapter dht = cctx.isLocal() ? null : (cctx.isNear() ? cctx.near().dht() : cctx.dht()); + + final ExpiryPolicy plc = cctx.expiry(); + + final GridCloseableIteratorAdapter<IgniteBiTuple<K, V>> heapIt = new GridCloseableIteratorAdapter<IgniteBiTuple<K, V>>() { private IgniteBiTuple<K, V> next; + private IgniteCacheExpiryPolicy expiryPlc = cctx.cache().accessExpiryPolicy(plc); + private Iterator<K> iter = qry.includeBackups() || cctx.isReplicated() ? prj.keySet().iterator() : prj.primaryKeySet().iterator(); @@ -765,11 +774,11 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte advance(); } - @Override public boolean hasNextX() { + @Override public boolean onHasNext() { return next != null; } - @Override public IgniteBiTuple<K, V> nextX() { + @Override public IgniteBiTuple<K, V> onNext() { if (next == null) throw new NoSuchElementException(); @@ -780,10 +789,6 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte return next0; } - @Override public void removeX() { - throw new UnsupportedOperationException(); - } - private void advance() { IgniteBiTuple<K, V> next0 = null; @@ -792,7 +797,23 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte K key = iter.next(); - V val = prj.peek(key); + V val; + + try { + val = prj.localPeek(key, peekModes, expiryPlc); + } + catch (IgniteCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to peek value: " + e); + + val = null; + } + + if (dht != null && expiryPlc != null && expiryPlc.readyToFlush(100)) { + dht.sendTtlUpdateRequest(expiryPlc); + + expiryPlc = cctx.cache().accessExpiryPolicy(plc); + } if (val != null) { next0 = F.t(key, val); @@ -807,6 +828,21 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte next = next0 != null ? new IgniteBiTuple<>(next0.getKey(), next0.getValue()) : null; + + if (next == null) + sendTtlUpdate(); + } + + @Override protected void onClose() throws IgniteCheckedException { + sendTtlUpdate(); + } + + private void sendTtlUpdate() { + if (dht != null && expiryPlc != null) { + dht.sendTtlUpdateRequest(expiryPlc); + + expiryPlc = null; + } } private boolean checkPredicate(Map.Entry<K, V> e) { @@ -850,6 +886,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte @Override protected void onRemove() { it.remove(); } + + @Override protected void onClose() throws IgniteCheckedException { + heapIt.close(); + } }; } @@ -998,6 +1038,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte boolean rmvRes = true; + final boolean statsEnabled = cctx.config().isStatisticsEnabled(); + + final boolean readEvt = cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ); + try { // Preparing query closures. IgnitePredicate<Cache.Entry<Object, Object>> prjFilter = qryInfo.projectionPredicate(); @@ -1054,6 +1098,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte boolean metaSent = false; while (!Thread.currentThread().isInterrupted() && it.hasNext()) { + long start = statsEnabled ? System.nanoTime() : 0L; + Object row = it.next(); // Query is cancelled. @@ -1063,7 +1109,15 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte break; } - if (cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ)) { + if (statsEnabled) { + CacheMetricsImpl metrics = cctx.cache().metrics0(); + + metrics.onRead(true); + + metrics.addGetTimeNanos(System.nanoTime() - start); + } + + if (readEvt) { cctx.gridEvents().record(new CacheQueryReadEvent<K, V>( cctx.localNode(), "SQL fields query result set row read.", @@ -1211,7 +1265,13 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte long topVer = cctx.affinity().affinityTopologyVersion(); + final boolean statsEnabled = cctx.config().isStatisticsEnabled(); + + final boolean readEvt = cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ); + while (!Thread.currentThread().isInterrupted() && iter.hasNext()) { + long start = statsEnabled ? System.nanoTime() : 0L; + IgniteBiTuple<K, V> row = iter.next(); // Query is cancelled. @@ -1248,9 +1308,17 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte continue; } - switch (type) { - case SQL: - if (cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ)) { + if (statsEnabled) { + CacheMetricsImpl metrics = cctx.cache().metrics0(); + + metrics.onRead(true); + + metrics.addGetTimeNanos(System.nanoTime() - start); + } + + if (readEvt) { + switch (type) { + case SQL: cctx.gridEvents().record(new CacheQueryReadEvent<>( cctx.localNode(), "SQL query entry read.", @@ -1268,12 +1336,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte val, null, null)); - } - break; + break; - case TEXT: - if (cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ)) { + case TEXT: cctx.gridEvents().record(new CacheQueryReadEvent<>( cctx.localNode(), "Full text query entry read.", @@ -1291,12 +1357,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte val, null, null)); - } - break; + break; - case SCAN: - if (cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ)) { + case SCAN: cctx.gridEvents().record(new CacheQueryReadEvent<>( cctx.localNode(), "Scan query entry read.", @@ -1314,9 +1378,9 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte val, null, null)); - } - break; + break; + } } Map.Entry<K, V> entry = F.t(key, val); @@ -1387,6 +1451,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte /** * @param qryInfo Info. + * @param taskName Task name. * @return Iterator. * @throws IgniteCheckedException In case of error. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f636bb15/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java index 7b6e335..ffe3cc9 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java @@ -836,8 +836,12 @@ public class GridCacheTestEntryEx<K, V> extends GridMetadataAwareAdapter impleme } /** {@inheritDoc} */ - @Nullable @Override public V peek(boolean heap, boolean offheap, boolean swap, long topVer) - throws GridCacheEntryRemovedException, IgniteCheckedException { + @Nullable @Override public V peek(boolean heap, + boolean offheap, + boolean swap, + long topVer, + @Nullable IgniteCacheExpiryPolicy plc) + { return null; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f636bb15/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java index 59e8bc4..6616958 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java @@ -30,6 +30,7 @@ import org.apache.ignite.testframework.*; import org.apache.ignite.transactions.*; import org.jetbrains.annotations.*; +import javax.cache.*; import javax.cache.configuration.*; import javax.cache.expiry.*; import javax.cache.processor.*; @@ -307,6 +308,25 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs txGetAll(txMode); } } + + IgniteCache<Integer, Integer> cache = jcache(0); + + Collection<Integer> putKeys = keys(); + + for (final Integer key : putKeys) + cache.put(key, key); + + Iterator<Cache.Entry<Integer, Integer>> it = cache.iterator(); + + List<Integer> itKeys = new ArrayList<>(); + + while (it.hasNext()) + itKeys.add(it.next().getKey()); + + assertTrue(itKeys.size() >= putKeys.size()); + + for (Integer key : itKeys) + checkTtl(key, 62_000L, true); } /**