#IGNITE-53: Refactor out the common part from IgniteCacheProxy and 
GridCacheSetImpl


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b4c2ca14
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b4c2ca14
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b4c2ca14

Branch: refs/heads/ignite-59
Commit: b4c2ca14ddc823d2546efdf52ca3cb6b078c9865
Parents: f3c6ec1
Author: ivasilinets <ivasilin...@gridgain.com>
Authored: Thu Jan 15 15:32:57 2015 +0400
Committer: ivasilinets <ivasilin...@gridgain.com>
Committed: Thu Jan 15 15:32:57 2015 +0400

----------------------------------------------------------------------
 .../processors/cache/IgniteCacheProxy.java      | 238 ++++++++++++++-----
 .../IgniteQueryFutureStorage.java               | 192 +++++++++++++++
 2 files changed, 371 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4c2ca14/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index e0cfa5a..c41f44e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -27,6 +27,7 @@ import org.gridgain.grid.cache.*;
 import org.gridgain.grid.cache.query.*;
 import org.gridgain.grid.kernal.*;
 import org.gridgain.grid.kernal.processors.cache.*;
+import org.gridgain.grid.kernal.processors.cache.datastructures.*;
 import org.gridgain.grid.util.tostring.*;
 import org.gridgain.grid.util.typedef.*;
 import org.gridgain.grid.util.typedef.internal.*;
@@ -62,6 +63,9 @@ public class IgniteCacheProxy<K, V> extends 
IgniteAsyncSupportAdapter implements
     /** Projection. */
     private GridCacheProjectionImpl<K, V> prj;
 
+    /** Query future storage. */
+    private final IgniteQueryFutureStorage queryStorage;
+
     /**
      * @param ctx Context.
      * @param delegate Delegate.
@@ -81,6 +85,8 @@ public class IgniteCacheProxy<K, V> extends 
IgniteAsyncSupportAdapter implements
         this.delegate = delegate;
         this.prj = prj;
 
+        this.queryStorage = new IgniteQueryFutureStorage(ctx);
+
         gate = ctx.gate();
     }
 
@@ -88,6 +94,8 @@ public class IgniteCacheProxy<K, V> extends 
IgniteAsyncSupportAdapter implements
      * @return Context.
      */
     public GridCacheContext<K, V> context() {
+        onAccess();
+
         return ctx;
     }
 
@@ -95,11 +103,15 @@ public class IgniteCacheProxy<K, V> extends 
IgniteAsyncSupportAdapter implements
      * @return Ignite instance.
      */
     @Override public GridEx ignite() {
+        onAccess();
+
         return ctx.grid();
     }
 
     /** {@inheritDoc} */
     @Override public <C extends Configuration<K, V>> C 
getConfiguration(Class<C> clazz) {
+        onAccess();
+
         GridCacheConfiguration cfg = ctx.config();
 
         if (!clazz.isAssignableFrom(cfg.getClass()))
@@ -110,12 +122,16 @@ public class IgniteCacheProxy<K, V> extends 
IgniteAsyncSupportAdapter implements
 
     /** {@inheritDoc} */
     @Nullable @Override public Entry<K, V> randomEntry() {
+        onAccess();
+
         // TODO IGNITE-1.
         throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
     @Override public IgniteCache<K, V> withExpiryPolicy(ExpiryPolicy plc) {
+        onAccess();
+
         GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
         try {
@@ -130,6 +146,8 @@ public class IgniteCacheProxy<K, V> extends 
IgniteAsyncSupportAdapter implements
 
     /** {@inheritDoc} */
     @Override public void loadCache(@Nullable IgniteBiPredicate<K, V> p, 
@Nullable Object... args) {
+        onAccess();
+
         try {
             GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
@@ -151,6 +169,8 @@ public class IgniteCacheProxy<K, V> extends 
IgniteAsyncSupportAdapter implements
 
     /** {@inheritDoc} */
     @Override public void localLoadCache(@Nullable IgniteBiPredicate<K, V> p, 
@Nullable Object... args) {
+        onAccess();
+
         try {
             GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
@@ -168,6 +188,8 @@ public class IgniteCacheProxy<K, V> extends 
IgniteAsyncSupportAdapter implements
 
     /** {@inheritDoc} */
     @Nullable @Override public V getAndPutIf(K key, V val, 
IgnitePredicate<GridCacheEntry<K, V>> filter) {
+        onAccess();
+
         try {
             GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
@@ -185,6 +207,8 @@ public class IgniteCacheProxy<K, V> extends 
IgniteAsyncSupportAdapter implements
 
     /** {@inheritDoc} */
     @Override public boolean putIf(K key, V val, 
IgnitePredicate<GridCacheEntry<K, V>> filter) {
+        onAccess();
+
         try {
             GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
@@ -202,6 +226,8 @@ public class IgniteCacheProxy<K, V> extends 
IgniteAsyncSupportAdapter implements
 
     /** {@inheritDoc} */
     @Override public V getAndRemoveIf(K key, IgnitePredicate<GridCacheEntry<K, 
V>> filter) {
+        onAccess();
+
         try {
             GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
@@ -219,6 +245,8 @@ public class IgniteCacheProxy<K, V> extends 
IgniteAsyncSupportAdapter implements
 
     /** {@inheritDoc} */
     @Override public boolean removeIf(K key, IgnitePredicate<GridCacheEntry<K, 
V>> filter) {
+        onAccess();
+
         try {
             GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
@@ -236,6 +264,8 @@ public class IgniteCacheProxy<K, V> extends 
IgniteAsyncSupportAdapter implements
 
     /** {@inheritDoc} */
     @Nullable @Override public V getAndPutIfAbsent(K key, V val) throws 
CacheException {
+        onAccess();
+
         try {
             GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
@@ -253,24 +283,32 @@ public class IgniteCacheProxy<K, V> extends 
IgniteAsyncSupportAdapter implements
 
     /** {@inheritDoc} */
     @Override public void removeAll(IgnitePredicate filter) throws 
CacheException {
+        onAccess();
+
         // TODO IGNITE-1.
         throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
     @Override public Lock lock(K key) throws CacheException {
+        onAccess();
+
         // TODO IGNITE-1.
         throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
     @Override public Lock lockAll(Set<? extends K> keys) throws CacheException 
{
+        onAccess();
+
         // TODO IGNITE-1.
         throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
     @Override public boolean isLocked(K key) {
+        onAccess();
+
         GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
         try {
@@ -283,6 +321,8 @@ public class IgniteCacheProxy<K, V> extends 
IgniteAsyncSupportAdapter implements
 
     /** {@inheritDoc} */
     @Override public boolean isLockedByThread(K key) {
+        onAccess();
+
         GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
         try {
@@ -295,18 +335,24 @@ public class IgniteCacheProxy<K, V> extends 
IgniteAsyncSupportAdapter implements
 
     /** {@inheritDoc} */
     @Override public Iterable<Entry<K, V>> localEntries(CachePeekMode... 
peekModes) throws CacheException {
+        onAccess();
+
         // TODO IGNITE-1.
         throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
     @Override public Map<K, V> localPartition(int part) throws CacheException {
+        onAccess();
+
         // TODO IGNITE-1.
         throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
     @Override public void localEvict(Collection<? extends K> keys) {
+        onAccess();
+
         GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
         try {
@@ -319,6 +365,8 @@ public class IgniteCacheProxy<K, V> extends 
IgniteAsyncSupportAdapter implements
 
     /** {@inheritDoc} */
     @Nullable @Override public V localPeek(K key, CachePeekMode... peekModes) {
+        onAccess();
+
         // TODO IGNITE-1.
         if (peekModes.length != 0)
             throw new UnsupportedOperationException();
@@ -335,6 +383,8 @@ public class IgniteCacheProxy<K, V> extends 
IgniteAsyncSupportAdapter implements
 
     /** {@inheritDoc} */
     @Override public void localPromote(Set<? extends K> keys) throws 
CacheException {
+        onAccess();
+
         try {
             GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
@@ -352,12 +402,16 @@ public class IgniteCacheProxy<K, V> extends 
IgniteAsyncSupportAdapter implements
 
     /** {@inheritDoc} */
     @Override public boolean clear(Collection<? extends K> keys) {
+        onAccess();
+
         // TODO IGNITE-1.
         throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
     @Override public int size(CachePeekMode... peekModes) throws 
CacheException {
+        onAccess();
+
         // TODO IGNITE-1.
         if (peekModes.length != 0)
             throw new UnsupportedOperationException();
@@ -374,6 +428,8 @@ public class IgniteCacheProxy<K, V> extends 
IgniteAsyncSupportAdapter implements
 
     /** {@inheritDoc} */
     @Override public int localSize(CachePeekMode... peekModes) {
+        onAccess();
+
         // TODO IGNITE-1.
         GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
@@ -387,6 +443,8 @@ public class IgniteCacheProxy<K, V> extends 
IgniteAsyncSupportAdapter implements
 
     /** {@inheritDoc} */
     @Override public V get(K key) {
+        onAccess();
+
         try {
             GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
@@ -404,6 +462,8 @@ public class IgniteCacheProxy<K, V> extends 
IgniteAsyncSupportAdapter implements
 
     /** {@inheritDoc} */
     @Override public Map<K, V> getAll(Set<? extends K> keys) {
+        onAccess();
+
         try {
             GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
@@ -424,6 +484,8 @@ public class IgniteCacheProxy<K, V> extends 
IgniteAsyncSupportAdapter implements
      * @return Values map.
      */
     public Map<K, V> getAll(Collection<? extends K> keys) {
+        onAccess();
+
         try {
             GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
@@ -446,6 +508,8 @@ public class IgniteCacheProxy<K, V> extends 
IgniteAsyncSupportAdapter implements
      * @return Entry set.
      */
     public Set<GridCacheEntry<K, V>> 
entrySetx(IgnitePredicate<GridCacheEntry<K, V>>... filter) {
+        onAccess();
+
         GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
         try {
@@ -460,6 +524,8 @@ public class IgniteCacheProxy<K, V> extends 
IgniteAsyncSupportAdapter implements
      * @param filter Filter.
      */
     public void removeAll(IgnitePredicate<GridCacheEntry<K, V>>... filter) {
+        onAccess();
+
         GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
         try {
@@ -475,6 +541,8 @@ public class IgniteCacheProxy<K, V> extends 
IgniteAsyncSupportAdapter implements
 
     /** {@inheritDoc} */
     @Override public boolean containsKey(K key) {
+        onAccess();
+
         GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
         try {
@@ -489,12 +557,16 @@ public class IgniteCacheProxy<K, V> extends 
IgniteAsyncSupportAdapter implements
     @Override public void loadAll(Set<? extends K> keys,
         boolean replaceExistingValues,
         CompletionListener completionLsnr) {
+        onAccess();
+
         // TODO IGNITE-1.
         throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
     @Override public void put(K key, V val) {
+        onAccess();
+
         try {
             GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
@@ -512,6 +584,8 @@ public class IgniteCacheProxy<K, V> extends 
IgniteAsyncSupportAdapter implements
 
     /** {@inheritDoc} */
     @Override public V getAndPut(K key, V val) {
+        onAccess();
+
         try {
             GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
@@ -529,6 +603,8 @@ public class IgniteCacheProxy<K, V> extends 
IgniteAsyncSupportAdapter implements
 
     /** {@inheritDoc} */
     @Override public void putAll(Map<? extends K, ? extends V> map) {
+        onAccess();
+
         try {
             GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
@@ -546,6 +622,8 @@ public class IgniteCacheProxy<K, V> extends 
IgniteAsyncSupportAdapter implements
 
     /** {@inheritDoc} */
     @Override public boolean putIfAbsent(K key, V val) {
+        onAccess();
+
         try {
             GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
@@ -563,6 +641,8 @@ public class IgniteCacheProxy<K, V> extends 
IgniteAsyncSupportAdapter implements
 
     /** {@inheritDoc} */
     @Override public boolean remove(K key) {
+        onAccess();
+
         try {
             GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
@@ -580,6 +660,8 @@ public class IgniteCacheProxy<K, V> extends 
IgniteAsyncSupportAdapter implements
 
     /** {@inheritDoc} */
     @Override public boolean remove(K key, V oldVal) {
+        onAccess();
+
         try {
             GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
@@ -597,6 +679,8 @@ public class IgniteCacheProxy<K, V> extends 
IgniteAsyncSupportAdapter implements
 
     /** {@inheritDoc} */
     @Override public V getAndRemove(K key) {
+        onAccess();
+
         try {
             GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
@@ -614,6 +698,8 @@ public class IgniteCacheProxy<K, V> extends 
IgniteAsyncSupportAdapter implements
 
     /** {@inheritDoc} */
     @Override public boolean replace(K key, V oldVal, V newVal) {
+        onAccess();
+
         try {
             GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
@@ -631,6 +717,8 @@ public class IgniteCacheProxy<K, V> extends 
IgniteAsyncSupportAdapter implements
 
     /** {@inheritDoc} */
     @Override public boolean replace(K key, V val) {
+        onAccess();
+
         try {
             GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
@@ -648,6 +736,8 @@ public class IgniteCacheProxy<K, V> extends 
IgniteAsyncSupportAdapter implements
 
     /** {@inheritDoc} */
     @Override public V getAndReplace(K key, V val) {
+        onAccess();
+
         try {
             GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
@@ -665,6 +755,8 @@ public class IgniteCacheProxy<K, V> extends 
IgniteAsyncSupportAdapter implements
 
     /** {@inheritDoc} */
     @Override public void removeAll(Set<? extends K> keys) {
+        onAccess();
+
         try {
             GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
@@ -684,6 +776,8 @@ public class IgniteCacheProxy<K, V> extends 
IgniteAsyncSupportAdapter implements
      * @param keys Keys to remove.
      */
     public void removeAll(Collection<? extends K> keys) {
+        onAccess();
+
         try {
             GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
@@ -701,6 +795,8 @@ public class IgniteCacheProxy<K, V> extends 
IgniteAsyncSupportAdapter implements
 
     /** {@inheritDoc} */
     @Override public void removeAll() {
+        onAccess();
+
         // TODO IGNITE-1.
         GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
@@ -717,6 +813,8 @@ public class IgniteCacheProxy<K, V> extends 
IgniteAsyncSupportAdapter implements
 
     /** {@inheritDoc} */
     @Override public void clear() {
+        onAccess();
+
         // TODO IGNITE-1.
         GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
@@ -734,6 +832,8 @@ public class IgniteCacheProxy<K, V> extends 
IgniteAsyncSupportAdapter implements
     /** {@inheritDoc} */
     @Override public <T> T invoke(K key, EntryProcessor<K, V, T> 
entryProcessor, Object... args)
         throws EntryProcessorException {
+        onAccess();
+
         try {
             GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
@@ -773,6 +873,8 @@ public class IgniteCacheProxy<K, V> extends 
IgniteAsyncSupportAdapter implements
     @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? 
extends K> keys,
         EntryProcessor<K, V, T> entryProcessor,
         Object... args) {
+        onAccess();
+
         try {
             GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
@@ -792,6 +894,8 @@ public class IgniteCacheProxy<K, V> extends 
IgniteAsyncSupportAdapter implements
     @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(
         Map<? extends K, ? extends EntryProcessor<K, V, T>> map,
         Object... args) {
+        onAccess();
+
         try {
             GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
@@ -809,11 +913,15 @@ public class IgniteCacheProxy<K, V> extends 
IgniteAsyncSupportAdapter implements
 
     /** {@inheritDoc} */
     @Override public String getName() {
+        onAccess();
+
         return delegate.name();
     }
 
     /** {@inheritDoc} */
     @Override public CacheManager getCacheManager() {
+        onAccess();
+
         // TODO IGNITE-45 (Support start/close/destroy cache correctly)
         IgniteCachingProvider provider = 
(IgniteCachingProvider)Caching.getCachingProvider(
             IgniteCachingProvider.class.getName(),
@@ -827,12 +935,16 @@ public class IgniteCacheProxy<K, V> extends 
IgniteAsyncSupportAdapter implements
 
     /** {@inheritDoc} */
     @Override public void close() {
+        onAccess();
+
         // TODO IGNITE-45 (Support start/close/destroy cache correctly)
         getCacheManager().destroyCache(getName());
     }
 
     /** {@inheritDoc} */
     @Override public boolean isClosed() {
+        onAccess();
+
         // TODO IGNITE-45 (Support start/close/destroy cache correctly)
         return getCacheManager() == null;
     }
@@ -840,6 +952,8 @@ public class IgniteCacheProxy<K, V> extends 
IgniteAsyncSupportAdapter implements
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override public <T> T unwrap(Class<T> clazz) {
+        onAccess();
+
         if (clazz.equals(IgniteCache.class))
             return (T)this;
 
@@ -848,22 +962,28 @@ public class IgniteCacheProxy<K, V> extends 
IgniteAsyncSupportAdapter implements
 
     /** {@inheritDoc} */
     @Override public void 
registerCacheEntryListener(CacheEntryListenerConfiguration 
cacheEntryLsnrConfiguration) {
+        onAccess();
+
         // TODO IGNITE-1.
         throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
     @Override public void 
deregisterCacheEntryListener(CacheEntryListenerConfiguration 
cacheEntryLsnrConfiguration) {
+        onAccess();
+
         // TODO IGNITE-1.
         throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
     @Override public Iterator<Cache.Entry<K, V>> iterator() {
+        onAccess();
+
         GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
         try {
-            return new IgniteCacheIterator();
+            return new 
IgniteCacheIterator(delegate.queries().createScanQuery(null).execute(), 
queryStorage);
         }
         finally {
             gate.leave(prev);
@@ -872,42 +992,56 @@ public class IgniteCacheProxy<K, V> extends 
IgniteAsyncSupportAdapter implements
 
     /** {@inheritDoc} */
     @Override public QueryCursor<Entry<K, V>> query(QueryPredicate<K, V> 
filter) {
+        onAccess();
+
         // TODO IGNITE-1.
         throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
     @Override public <R> QueryCursor<R> query(QueryReducer<Entry<K, V>, R> 
rmtRdc, QueryPredicate<K, V> filter) {
+        onAccess();
+
         // TODO IGNITE-1.
         throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
     @Override public QueryCursor<List<?>> queryFields(QuerySqlPredicate<K, V> 
filter) {
+        onAccess();
+
         // TODO IGNITE-1.
         throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
     @Override public <R> QueryCursor<R> queryFields(QueryReducer<List<?>, R> 
rmtRdc, QuerySqlPredicate<K, V> filter) {
+        onAccess();
+
         // TODO IGNITE-1.
         throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
     @Override public QueryCursor<Entry<K, V>> localQuery(QueryPredicate<K, V> 
filter) {
+        onAccess();
+
         // TODO IGNITE-1.
         throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
     @Override public QueryCursor<List<?>> 
localQueryFields(QuerySqlPredicate<K, V> filter) {
+        onAccess();
+
         // TODO IGNITE-1.
         throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
     @Override public IgniteCache<K, V> enableAsync() {
+        onAccess();
+
         if (isAsync())
             return this;
 
@@ -917,6 +1051,8 @@ public class IgniteCacheProxy<K, V> extends 
IgniteAsyncSupportAdapter implements
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override public <K1, V1> IgniteCache<K1, V1> keepPortable() {
+        onAccess();
+
         GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
         try {
@@ -942,6 +1078,8 @@ public class IgniteCacheProxy<K, V> extends 
IgniteAsyncSupportAdapter implements
 
     /** {@inheritDoc} */
     @Override public IgniteCache<K, V> flagsOn(@Nullable GridCacheFlag... 
flags) {
+        onAccess();
+
         GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
         try {
@@ -987,6 +1125,8 @@ public class IgniteCacheProxy<K, V> extends 
IgniteAsyncSupportAdapter implements
 
     /** {@inheritDoc} */
     @Override public void writeExternal(ObjectOutput out) throws IOException {
+        onAccess();
+
         out.writeObject(ctx);
 
         out.writeObject(delegate);
@@ -997,6 +1137,8 @@ public class IgniteCacheProxy<K, V> extends 
IgniteAsyncSupportAdapter implements
     /** {@inheritDoc} */
     @SuppressWarnings({"unchecked"})
     @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
+        onAccess();
+
         ctx = (GridCacheContext<K, V>)in.readObject();
 
         delegate = (GridCacheProjectionEx<K, V>)in.readObject();
@@ -1006,6 +1148,17 @@ public class IgniteCacheProxy<K, V> extends 
IgniteAsyncSupportAdapter implements
         gate = ctx.gate();
     }
 
+    /**
+     * Notifies the query future storage, which cancels query futures of unreachable iterators.
+     */
+    private void onAccess() {
+        try {
+            queryStorage.onAccess();
+        } catch (IgniteCheckedException e) {
+            throw cacheException(e);
+        }
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(IgniteCacheProxy.class, this);
@@ -1085,90 +1238,57 @@ public class IgniteCacheProxy<K, V> extends 
IgniteAsyncSupportAdapter implements
      * Iterator over the cache.
      */
     private class IgniteCacheIterator implements Iterator<Cache.Entry<K, V>> {
-        /** Cache query future for all entries in distributed ignite cache. */
-        private GridCacheQueryFuture<Map.Entry<K, V>> fut;
-
-        /** Current element. */
-        private Map.Entry<K, V> curEntry;
 
-        /** Next element. */
-        private Map.Entry<K,V> nextEntry;
+        /** Iterator over the cache. */
+        IgniteQueryFutureStorage.Iterator<Map.Entry<K, V>> iter;
 
-        /** Init first time. */
-        private boolean firstTime = true;
+        IgniteCacheIterator(GridCacheQueryFuture<Map.Entry<K, V>> fut, 
IgniteQueryFutureStorage storage) {
+            iter = storage.iterator(fut);
+        }
 
         /** {@inheritDoc} */
         @Override public boolean hasNext() {
-            initFirstTime();
-
-            return nextEntry != null;
+            try {
+                return iter.onHasNext();
+            } catch (IgniteCheckedException e) {
+                throw cacheException(e);
+            }
         }
 
         /** {@inheritDoc} */
         @Override public Entry<K, V> next() {
-            initFirstTime();
-
-            curEntry = nextEntry;
-
-            if (curEntry == null)
-                throw new NoSuchElementException();
-
             try {
-                nextEntry = fut.next();
+                final Map.Entry<K, V> cur = iter.onNext();
+                return new Cache.Entry<K, V>() {
+                    @Override public K getKey() {
+                        return cur.getKey();
+                    }
+
+                    @Override public V getValue() {
+                        return cur.getValue();
+                    }
+
+                    @Override public <T> T unwrap(Class<T> clazz) {
+                        throw new IllegalArgumentException();
+                    }
+                };
             }
             catch (IgniteCheckedException e) {
-                curEntry = null;
-
                 throw cacheException(e);
             }
 
-            return new Cache.Entry<K, V>() {
-                @Override public K getKey() {
-                    return curEntry.getKey();
-                }
-
-                @Override public V getValue() {
-                    return curEntry.getValue();
-                }
 
-                @Override public <T> T unwrap(Class<T> clazz) {
-                    throw new IllegalArgumentException();
-                }
-            };
         }
 
         /** {@inheritDoc} */
         @Override public void remove() {
-            if (curEntry == null)
-                throw new IllegalStateException();
-
+            Map.Entry<K, V> curEntry = iter.itemToRemove();
             try {
                 delegate.removex(curEntry.getKey());
             }
             catch (IgniteCheckedException e) {
                 throw cacheException(e);
             }
-
-            curEntry = null;
-        }
-
-        /**
-         * Initialize fields at first call
-         */
-        private void initFirstTime() {
-            if (!firstTime) {
-                return;
-            }
-
-            firstTime = false;
-            fut = delegate.queries().createScanQuery(null).execute();
-
-            try {
-                nextEntry = fut.next();
-            }
-            catch (IgniteCheckedException e) {
-                throw cacheException(e);
-            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4c2ca14/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/IgniteQueryFutureStorage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/IgniteQueryFutureStorage.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/IgniteQueryFutureStorage.java
new file mode 100644
index 0000000..69ef38b
--- /dev/null
+++ 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/IgniteQueryFutureStorage.java
@@ -0,0 +1,192 @@
+package org.gridgain.grid.kernal.processors.cache.datastructures;
+
+import org.apache.ignite.*;
+import org.gridgain.grid.cache.query.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+import org.jdk8.backport.*;
+
+import java.lang.ref.*;
+import java.util.*;
+
+/**
+ * Storage for GridCacheQueryFuture.
+ */
+public class IgniteQueryFutureStorage {
+    /** Iterators weak references queue. */
+    private final ReferenceQueue<Iterator<?>> refQueue = new 
ReferenceQueue<>();
+
+    /** Iterators futures. */
+    private final Map<WeakReference<Iterator<?>>, GridCacheQueryFuture<?>> 
futs = new ConcurrentHashMap8<>();
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /**
+     * @param ctx Cache context.
+     */
+    public IgniteQueryFutureStorage(GridCacheContext ctx) {
+        log = ctx.logger(GridCacheSetImpl.class);
+    }
+
+    /**
+     * Iterator over the cache.
+     * @param fut Query to iterate
+     * @return iterator
+     */
+    public <T> Iterator<T> iterator(GridCacheQueryFuture<T> fut) {
+        Iterator<T> it = new Iterator<>(fut);
+
+        futs.put(it.weakReference(), fut);
+
+        return it;
+    }
+
+    /**
+     * Closes unreachable iterators.
+     */
+    private void checkWeakQueue() throws IgniteCheckedException {
+        for (Reference<? extends Iterator<?>> itRef = refQueue.poll(); itRef 
!= null; itRef = refQueue.poll()) {
+            WeakReference<Iterator<?>> weakRef = (WeakReference<Iterator<?>>) 
itRef;
+
+            GridCacheQueryFuture<?> fut = futs.remove(weakRef);
+
+            if (fut != null)
+                fut.cancel();
+
+        }
+    }
+
+    /**
+     * Handles the iterators weak reference queue: cancels query futures of unreachable iterators.
+     */
+    public void onAccess() throws IgniteCheckedException {
+        checkWeakQueue();
+    }
+
+    /**
+     * Cancel all cache queries
+     * @throws IgniteCheckedException
+     */
+    protected void clearQueries() throws IgniteCheckedException {
+        for (GridCacheQueryFuture<?> fut : futs.values()) {
+            try {
+                fut.cancel();
+            }
+            catch (IgniteCheckedException e) {
+                log.error("Failed to close iterator.", e);
+            }
+
+        }
+        futs.clear();
+    }
+
+    /**
+     * Iterator over the cache
+     */
+    public class Iterator<T> {
+        /** Query future. */
+        private final GridCacheQueryFuture<T> fut;
+
+        /** Weak reference. */
+        private final WeakReference<Iterator<?>> weakRef;
+
+        /** Init flag. */
+        private boolean init;
+
+        /** Next item. */
+        private T next;
+
+        /** Current item. */
+        private T cur;
+
+        /**
+         * @param fut GridCacheQueryFuture to iterate
+         */
+        Iterator(GridCacheQueryFuture<T> fut) {
+            this.fut = fut;
+            this.weakRef = new WeakReference<Iterator<?>>(this, refQueue);
+        }
+
+
+        /**
+         * @throws IgniteCheckedException If failed.
+         */
+        private void init() throws IgniteCheckedException {
+            if (!init) {
+                next = fut.next();
+
+                init = true;
+            }
+        }
+
+        /**
+         * @return Iterator weak reference.
+         */
+        WeakReference<Iterator<?>> weakReference() {
+            return weakRef;
+        }
+
+        /**
+         * Clears weak reference.
+         */
+        private void clearWeakReference() {
+            weakRef.clear();
+
+            futs.remove(weakRef);
+        }
+
+        /**
+         * The same as Iterator.next()
+         */
+        public T onNext() throws IgniteCheckedException {
+            init();
+
+            if (next == null) {
+                clearWeakReference();
+
+                throw new NoSuchElementException();
+            }
+
+            cur = next;
+
+            Map.Entry e = (Map.Entry) fut.next();
+
+            next = e != null ? (T) e.getKey() : null;
+
+            if (next == null)
+                clearWeakReference();
+
+            return cur;
+        }
+
+        /**
+         * The same as Iterator.hasNext()
+         */
+        public boolean onHasNext() throws IgniteCheckedException {
+            init();
+
+            boolean hasNext = next != null;
+
+            if (!hasNext)
+                clearWeakReference();
+
+            return hasNext;
+        }
+
+        /**
+         * @return current item to remove
+         * @throws IllegalStateException if the {@code onNext} method has not
+         *         yet been called, or the {@code itemToRemove} method has already
+         *         been called after the last call to the {@code onNext}
+         *         method
+         */
+        public T itemToRemove() {
+            if (cur == null)
+                throw new IllegalStateException();
+            T res = cur;
+            cur = null;
+            return res;
+        }
+    }
+
+}

Reply via email to