# IGNITE-262 - Fix for internal continuous queries
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/7326485b Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/7326485b Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/7326485b Branch: refs/heads/sprint-1 Commit: 7326485ba5854b2596ff9b3d54b727ad841a6fff Parents: 66fd52c Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Sun Feb 15 17:01:40 2015 -0800 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Sun Feb 15 17:01:40 2015 -0800 ---------------------------------------------------------------------- .../processors/cache/IgniteCacheProxy.java | 4 -- .../continuous/CacheContinuousQueryManager.java | 48 ++++++++++---------- 2 files changed, 25 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7326485b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index 4d901d0..79f3994 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -1232,10 +1232,6 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } } - - - - /** {@inheritDoc} */ @Override protected IgniteCache<K, V> createAsyncInstance() { return new IgniteCacheProxy<>(ctx, delegate, prj, true); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7326485b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index 60b22e0..f0b668d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -399,29 +399,31 @@ public class CacheContinuousQueryManager<K, V> extends GridCacheManagerAdapter<K UUID id = cctx.kernalContext().continuous().startRoutine(hnd, bufSize, timeInterval, autoUnsubscribe, grp.predicate()).get(); - final Iterator<Cache.Entry<K, V>> it = cctx.cache().igniteIterator(); - - locLsnr.onUpdated(new Iterable<CacheEntryEvent<? extends K, ? extends V>>() { - @Override public Iterator<CacheEntryEvent<? extends K, ? extends V>> iterator() { - return new Iterator<CacheEntryEvent<? extends K, ? extends V>>() { - @Override public boolean hasNext() { - return it.hasNext(); - } - - @Override public CacheEntryEvent<? extends K, ? extends V> next() { - Cache.Entry<K, V> e = it.next(); - - return new CacheContinuousQueryEvent<>( - cctx.kernalContext().cache().jcache(cctx.name()), CREATED, - new CacheContinuousQueryEntry<>(e.getKey(), e.getValue(), null, null, null)); - } - - @Override public void remove() { - throw new UnsupportedOperationException(); - } - }; - } - }); + if (notifyExisting) { + final Iterator<Cache.Entry<K, V>> it = cctx.cache().entrySetx().iterator(); + + locLsnr.onUpdated(new Iterable<CacheEntryEvent<? extends K, ? extends V>>() { + @Override public Iterator<CacheEntryEvent<? extends K, ? extends V>> iterator() { + return new Iterator<CacheEntryEvent<? extends K, ? extends V>>() { + @Override public boolean hasNext() { + return it.hasNext(); + } + + @Override public CacheEntryEvent<? extends K, ? extends V> next() { + Cache.Entry<K, V> e = it.next(); + + return new CacheContinuousQueryEvent<>( + cctx.kernalContext().cache().jcache(cctx.name()), CREATED, + new CacheContinuousQueryEntry<>(e.getKey(), e.getValue(), null, null, null)); + } + + @Override public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }); + } return id; }