#IGNITE-53: merge

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/3362a61d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/3362a61d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/3362a61d

Branch: refs/heads/sprint-1
Commit: 3362a61d64b7e54a92788fe300cde010852c1e10
Parents: 7af5d02
Author: ivasilinets <ivasilin...@gridgain.com>
Authored: Fri Jan 23 19:05:39 2015 +0300
Committer: ivasilinets <ivasilin...@gridgain.com>
Committed: Fri Jan 23 19:05:39 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/CacheEntryImpl.java        |   8 +-
 .../processors/cache/GridCacheAdapter.java      |  32 ++++
 .../processors/cache/GridCacheContext.java      |  13 ++
 .../processors/cache/GridCacheGateway.java      |   2 +
 .../processors/cache/IgniteCacheProxy.java      |  19 +-
 .../cache/datastructures/GridCacheSetImpl.java  | 174 ++----------------
 .../cache/query/GridCacheQueryAdapter.java      |  14 +-
 .../query/GridCacheQueryFutureAdapter.java      |   7 +-
 .../cache/query/GridCacheQueryManager.java      |   4 +-
 .../cache/CacheWeakQueryIteratorsHolder.java    |  22 +--
 .../cache/GridCacheAbstractFullApiSelfTest.java | 178 +++++++++++++++++++
 11 files changed, 270 insertions(+), 203 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3362a61d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryImpl.java
index 27dadbd..e609cd1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryImpl.java
@@ -49,8 +49,12 @@ public class CacheEntryImpl<K, V> implements Cache.Entry<K, V> {
     }
 
     /** {@inheritDoc} */
-    @Override public <T> T unwrap(Class<T> clazz) {
-        throw new IllegalArgumentException();
+    @SuppressWarnings("unchecked")
+    @Override public <T> T unwrap(Class<T> cls) {
+        if (!cls.equals(getClass()))
+            throw new IllegalArgumentException("Unwrapping to class is not 
supported: " + cls);
+
+        return (T)this;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3362a61d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 112483a..3c0ed75 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -46,9 +46,11 @@ import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.gridgain.grid.kernal.processors.cache.*;
 import org.jdk8.backport.*;
 import org.jetbrains.annotations.*;
 
+import javax.cache.*;
 import javax.cache.expiry.*;
 import javax.cache.processor.*;
 import java.io.*;
@@ -3615,6 +3617,36 @@ public abstract class GridCacheAdapter<K, V> implements 
GridCache<K, V>,
         return entrySet().iterator();
     }
 
+    /**
+     * @param prj Projection.
+     * @return Distributed ignite cache iterator.
+     */
+    public Iterator<Cache.Entry<K, V>> igniteIterator(final 
GridCacheProjectionImpl<K, V> prj) {
+        CacheQueryFuture<Map.Entry<K, V>> fut = queries().createScanQuery(null)
+            .keepAll(false)
+            .execute();
+
+        return ctx.itHolder().iterator(fut, new 
CacheIteratorConverter<Cache.Entry<K, V>, Map.Entry<K, V>>() {
+            @Override protected Cache.Entry<K, V> convert(Map.Entry<K, V> e) {
+                return new CacheEntryImpl<>(e.getKey(), e.getValue());
+            }
+
+            @Override protected void remove(Cache.Entry<K, V> item) {
+                GridCacheProjectionImpl<K, V> prev = ctx.gate().enter(prj);
+
+                try {
+                    GridCacheAdapter.this.removex(item.getKey());
+                }
+                catch (IgniteCheckedException e) {
+                    throw new CacheException(e);
+                }
+                finally {
+                    ctx.gate().leave(prev);
+                }
+            }
+        });
+    }
+
     /** {@inheritDoc} */
     @Nullable @Override public V promote(K key) throws IgniteCheckedException {
         return promote(key, true);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3362a61d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 567d0bb..d1dcaa3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -52,6 +52,7 @@ import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.offheap.unsafe.*;
 import org.apache.ignite.internal.util.tostring.*;
+import org.gridgain.grid.kernal.processors.cache.*;
 import org.jetbrains.annotations.*;
 
 import javax.cache.configuration.*;
@@ -186,6 +187,9 @@ public class GridCacheContext<K, V> implements 
Externalizable {
     /** Default expiry policy. */
     private ExpiryPolicy expiryPlc;
 
+    /** Cache weak query iterator holder. */
+    private CacheWeakQueryIteratorsHolder<Map.Entry<K, V>> itHolder;
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -300,6 +304,8 @@ public class GridCacheContext<K, V> implements 
Externalizable {
 
         if (expiryPlc instanceof EternalExpiryPolicy)
             expiryPlc = null;
+
+        itHolder = new CacheWeakQueryIteratorsHolder(log);
     }
 
     /**
@@ -837,6 +843,13 @@ public class GridCacheContext<K, V> implements 
Externalizable {
     }
 
     /**
+     * @return Iterators Holder.
+     */
+    public CacheWeakQueryIteratorsHolder<Map.Entry<K, V>> itHolder() {
+        return itHolder;
+    }
+
+    /**
      * @return Swap manager.
      */
     public GridCacheSwapManager<K, V> swap() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3362a61d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
index ae97afe..2de235a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
@@ -87,6 +87,8 @@ public class GridCacheGateway<K, V> {
      */
     @Nullable public GridCacheProjectionImpl<K, V> enter(@Nullable 
GridCacheProjectionImpl<K, V> prj) {
         try {
+            ctx.itHolder().checkWeakQueue();
+
             GridCacheAdapter<K, V> cache = ctx.cache();
 
             GridCachePreloader<K, V> preldr = cache != null ? 
cache.preloader() : null;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3362a61d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index b9265a5..94ee239 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -820,27 +820,10 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements
 
     /** {@inheritDoc} */
     @Override public Iterator<Cache.Entry<K, V>> iterator() {
-        // TODO IGNITE-1.
         GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
         try {
-            return F.iterator(delegate, new C1<CacheEntry<K, V>, Entry<K, 
V>>() {
-                @Override public Entry<K, V> apply(final CacheEntry<K, V> e) {
-                    return new Entry<K, V>() {
-                        @Override public K getKey() {
-                            return e.getKey();
-                        }
-
-                        @Override public V getValue() {
-                            return e.getValue();
-                        }
-
-                        @Override public <T> T unwrap(Class<T> clazz) {
-                            throw new IllegalArgumentException();
-                        }
-                    };
-                }
-            }, false);
+            return ((GridCacheAdapter)delegate).igniteIterator(prj);
         }
         finally {
             gate.leave(prev);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3362a61d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetImpl.java
index 80151ec..b84d957 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetImpl.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.processors.cache.query.*;
 import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.gridgain.grid.kernal.processors.cache.*;
 import org.jdk8.backport.*;
 import org.jetbrains.annotations.*;
 
@@ -53,9 +54,6 @@ public class GridCacheSetImpl<T> extends 
AbstractCollection<T> implements CacheS
     /** Cache. */
     private final GridCache<GridCacheSetItemKey, Boolean> cache;
 
-    /** Logger. */
-    private final IgniteLogger log;
-
     /** Set name. */
     private final String name;
 
@@ -71,12 +69,6 @@ public class GridCacheSetImpl<T> extends 
AbstractCollection<T> implements CacheS
     /** Removed flag. */
     private volatile boolean rmvd;
 
-    /** Iterators weak references queue. */
-    private final ReferenceQueue<SetIterator<?>> itRefQueue = new 
ReferenceQueue<>();
-
-    /** Iterators futures. */
-    private final Map<WeakReference<SetIterator<?>>, CacheQueryFuture<?>> 
itFuts = new ConcurrentHashMap8<>();
-
     /**
      * @param ctx Cache context.
      * @param name Set name.
@@ -91,8 +83,6 @@ public class GridCacheSetImpl<T> extends 
AbstractCollection<T> implements CacheS
 
         cache = ctx.cache();
 
-        log = ctx.logger(GridCacheSetImpl.class);
-
         hdrPart = ctx.affinity().partition(new GridCacheSetHeaderKey(name));
     }
 
@@ -348,16 +338,21 @@ public class GridCacheSetImpl<T> extends 
AbstractCollection<T> implements CacheS
 
             qry.projection(ctx.grid().forNodes(nodes));
 
-            CacheQueryFuture<T> fut = qry.execute();
+            CacheQueryFuture<Map.Entry<T, ?>> fut = qry.execute();
 
-            SetIterator<T> it = new SetIterator<>(fut);
+            CacheWeakQueryIteratorsHolder.WeakQueryFutureIterator it =
+                ctx.itHolder().iterator(fut, new CacheIteratorConverter<T, 
Map.Entry<T, ?>>() {
+                    @Override protected T convert(Map.Entry<T, ?> e) {
+                        return e.getKey();
+                    }
 
-            itFuts.put(it.weakReference(), fut);
+                    @Override protected void remove(T item) {
+                        GridCacheSetImpl.this.remove(item);
+                    }
+                });
 
             if (rmvd) {
-                itFuts.remove(it.weakReference());
-
-                it.close();
+                ctx.itHolder().removeIterator(it);
 
                 checkRemoved();
             }
@@ -443,18 +438,8 @@ public class GridCacheSetImpl<T> extends 
AbstractCollection<T> implements CacheS
 
         this.rmvd = rmvd;
 
-        if (rmvd) {
-            for (CacheQueryFuture<?> fut : itFuts.values()) {
-                try {
-                    fut.cancel();
-                }
-                catch (IgniteCheckedException e) {
-                    log.error("Failed to close iterator.", e);
-                }
-            }
-
-            itFuts.clear();
-        }
+        if (rmvd)
+            ctx.itHolder().clearQueries();
     }
 
     /**
@@ -466,29 +451,10 @@ public class GridCacheSetImpl<T> extends 
AbstractCollection<T> implements CacheS
     }
 
     /**
-     * Closes unreachable iterators.
-     */
-    private void checkWeakQueue() {
-        for (Reference<? extends SetIterator<?>> itRef = itRefQueue.poll(); 
itRef != null; itRef = itRefQueue.poll()) {
-            try {
-                WeakReference<SetIterator<?>> weakRef = 
(WeakReference<SetIterator<?>>)itRef;
-
-                CacheQueryFuture<?> fut = itFuts.remove(weakRef);
-
-                if (fut != null)
-                    fut.cancel();
-            }
-            catch (IgniteCheckedException e) {
-                log.error("Failed to close iterator.", e);
-            }
-        }
-    }
-
-    /**
      * Checks if set was removed and handles iterators weak reference queue.
      */
     private void onAccess() {
-        checkWeakQueue();
+        ctx.itHolder().checkWeakQueue();
 
         checkRemoved();
     }
@@ -523,116 +489,6 @@ public class GridCacheSetImpl<T> extends 
AbstractCollection<T> implements CacheS
     /**
      *
      */
-    private class SetIterator<T> extends GridCloseableIteratorAdapter<T> {
-        /** */
-        private static final long serialVersionUID = -1460570789166994846L;
-
-        /** Query future. */
-        private final CacheQueryFuture<T> fut;
-
-        /** Init flag. */
-        private boolean init;
-
-        /** Next item. */
-        private T next;
-
-        /** Current item. */
-        private T cur;
-
-        /** Weak reference. */
-        private final WeakReference<SetIterator<?>> weakRef;
-
-        /**
-         * @param fut Query future.
-         */
-        private SetIterator(CacheQueryFuture<T> fut) {
-            this.fut = fut;
-
-            weakRef = new WeakReference<SetIterator<?>>(this, itRefQueue);
-        }
-
-        /** {@inheritDoc} */
-        @Override protected T onNext() throws IgniteCheckedException {
-            init();
-
-            if (next == null) {
-                clearWeakReference();
-
-                throw new NoSuchElementException();
-            }
-
-            cur = next;
-
-            Map.Entry e = (Map.Entry)fut.next();
-
-            next = e != null ? (T)e.getKey() : null;
-
-            if (next == null)
-                clearWeakReference();
-
-            return cur;
-        }
-
-        /** {@inheritDoc} */
-        @Override protected boolean onHasNext() throws IgniteCheckedException {
-            init();
-
-            boolean hasNext = next != null;
-
-            if (!hasNext)
-                clearWeakReference();
-
-            return hasNext;
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void onClose() throws IgniteCheckedException {
-            fut.cancel();
-
-            clearWeakReference();
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void onRemove() throws IgniteCheckedException {
-            if (cur == null)
-                throw new NoSuchElementException();
-
-            GridCacheSetImpl.this.remove(cur);
-        }
-
-        /**
-         * @throws IgniteCheckedException If failed.
-         */
-        private void init() throws IgniteCheckedException {
-            if (!init) {
-                Map.Entry e = (Map.Entry)fut.next();
-
-                next = e != null ? (T)e.getKey() : null;
-
-                init = true;
-            }
-        }
-
-        /**
-         * @return Iterator weak reference.
-         */
-        WeakReference<SetIterator<?>> weakReference() {
-            return weakRef;
-        }
-
-        /**
-         * Clears weak reference.
-         */
-        private void clearWeakReference() {
-            weakRef.clear(); // Do not need to enqueue.
-
-            itFuts.remove(weakRef);
-        }
-    }
-
-    /**
-     *
-     */
     private static class SumReducer implements IgniteReducer<Object, Integer>, 
Externalizable {
         /** */
         private static final long serialVersionUID = -3436987759126521204L;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3362a61d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
index f65017a..d35d215 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
@@ -65,13 +65,13 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
     private volatile GridCacheQueryMetricsAdapter metrics;
 
     /** */
-    private volatile int pageSize;
+    private volatile int pageSize = DFLT_PAGE_SIZE;
 
     /** */
     private volatile long timeout;
 
     /** */
-    private volatile boolean keepAll;
+    private volatile boolean keepAll = true;
 
     /** */
     private volatile boolean incBackups;
@@ -123,13 +123,6 @@ public class GridCacheQueryAdapter<T> implements 
CacheQuery<T> {
 
         log = cctx.logger(getClass());
 
-        pageSize = DFLT_PAGE_SIZE;
-        timeout = 0;
-        keepAll = true;
-        incBackups = false;
-        dedup = false;
-        prj = null;
-
         metrics = new GridCacheQueryMetricsAdapter();
     }
 
@@ -419,6 +412,9 @@ public class GridCacheQueryAdapter<T> implements 
CacheQuery<T> {
 
         cctx.checkSecurity(GridSecurityPermission.CACHE_READ);
 
+        if (nodes.isEmpty())
+            return new GridCacheQueryErrorFuture<>(cctx.kernalContext(), new 
ClusterGroupEmptyException());
+
         if (log.isDebugEnabled())
             log.debug("Executing query [query=" + this + ", nodes=" + nodes + 
']');
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3362a61d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
index 94c85b4..4202c99 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
@@ -58,7 +58,7 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
     protected final GridCacheQueryBean qry;
 
     /** Set of received keys used to deduplicate query result set. */
-    private final Collection<K> keys = new HashSet<>();
+    private final Collection<K> keys;
 
     /** */
     private final Queue<Collection<R>> queue = new LinkedList<>();
@@ -92,6 +92,7 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> 
extends GridFutureAda
      */
     protected GridCacheQueryFutureAdapter() {
         qry = null;
+        keys = null;
     }
 
     /**
@@ -121,6 +122,8 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> 
extends GridFutureAda
 
             cctx.time().addTimeoutObject(this);
         }
+
+        keys = qry.query().enableDedup() ? new HashSet<K>() : null;
     }
 
     /**
@@ -335,7 +338,7 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> 
extends GridFutureAda
         if (!qry.query().enableDedup())
             return col;
 
-        Collection<Object> dedupCol = new LinkedList<>();
+        Collection<Object> dedupCol = new ArrayList<>(col.size());
 
         synchronized (mux) {
             for (Object o : col)

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3362a61d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 5a0df9b..ae5fe77 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -756,8 +756,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
         GridIterator<IgniteBiTuple<K, V>> heapIt = new 
GridIteratorAdapter<IgniteBiTuple<K, V>>() {
             private IgniteBiTuple<K, V> next;
 
-            private Iterator<K> iter = prj.keySet().iterator();
-
+            private Iterator<K> iter = qry.includeBackups() || 
cctx.isReplicated() ?
+                prj.keySet().iterator() : prj.primaryKeySet().iterator();
             {
                 advance();
             }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3362a61d/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheWeakQueryIteratorsHolder.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheWeakQueryIteratorsHolder.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheWeakQueryIteratorsHolder.java
index f955695..eea72fe 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheWeakQueryIteratorsHolder.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheWeakQueryIteratorsHolder.java
@@ -18,9 +18,9 @@
 package org.gridgain.grid.kernal.processors.cache;
 
 import org.apache.ignite.*;
-import org.gridgain.grid.cache.query.*;
-import org.gridgain.grid.util.*;
-import org.gridgain.grid.util.typedef.internal.*;
+import org.apache.ignite.cache.query.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
 import org.jdk8.backport.*;
 
 import java.lang.ref.*;
@@ -34,7 +34,7 @@ public class CacheWeakQueryIteratorsHolder<V> {
     private final ReferenceQueue<WeakQueryFutureIterator> refQueue = new 
ReferenceQueue<>();
 
     /** Iterators futures. */
-    private final Map<WeakReference<WeakQueryFutureIterator>, 
GridCacheQueryFuture<V>> futs =
+    private final Map<WeakReference<WeakQueryFutureIterator>, 
CacheQueryFuture<V>> futs =
         new ConcurrentHashMap8<>();
 
     /** Logger. */
@@ -53,10 +53,10 @@ public class CacheWeakQueryIteratorsHolder<V> {
      * @param <T> Type for the iterator.
      * @return Iterator over the cache.
      */
-    public <T> WeakQueryFutureIterator iterator(GridCacheQueryFuture<V> fut, 
CacheIteratorConverter<T, V> convert) {
+    public <T> WeakQueryFutureIterator iterator(CacheQueryFuture<V> fut, 
CacheIteratorConverter<T, V> convert) {
         WeakQueryFutureIterator it = new WeakQueryFutureIterator(fut, convert);
 
-        GridCacheQueryFuture<V> old = futs.put(it.weakReference(), fut);
+        CacheQueryFuture<V> old = futs.put(it.weakReference(), fut);
 
         assert old == null;
 
@@ -83,7 +83,7 @@ public class CacheWeakQueryIteratorsHolder<V> {
             try {
                 WeakReference<WeakQueryFutureIterator> weakRef = 
(WeakReference<WeakQueryFutureIterator>)itRef;
 
-                GridCacheQueryFuture<?> fut = futs.remove(weakRef);
+                CacheQueryFuture<?> fut = futs.remove(weakRef);
 
                 if (fut != null)
                     fut.cancel();
@@ -98,7 +98,7 @@ public class CacheWeakQueryIteratorsHolder<V> {
      * Cancel all cache queries.
      */
     public void clearQueries(){
-        for (GridCacheQueryFuture<?> fut : futs.values()) {
+        for (CacheQueryFuture<?> fut : futs.values()) {
             try {
                 fut.cancel();
             }
@@ -112,13 +112,13 @@ public class CacheWeakQueryIteratorsHolder<V> {
 
 
     /**
-     * Iterator based of {@link GridCacheQueryFuture}.
+     * Iterator based of {@link CacheQueryFuture}.
      *
      * @param <T> Type for iterator.
      */
     public class WeakQueryFutureIterator<T> extends 
GridCloseableIteratorAdapter<T> {
         /** Query future. */
-        private final GridCacheQueryFuture<V> fut;
+        private final CacheQueryFuture<V> fut;
 
         /** Weak reference. */
         private final WeakReference<WeakQueryFutureIterator<T>> weakRef;
@@ -137,7 +137,7 @@ public class CacheWeakQueryIteratorsHolder<V> {
         /**
          * @param fut GridCacheQueryFuture to iterate.
          */
-        WeakQueryFutureIterator(GridCacheQueryFuture<V> fut, 
CacheIteratorConverter<T, V> convert) {
+        WeakQueryFutureIterator(CacheQueryFuture<V> fut, 
CacheIteratorConverter<T, V> convert) {
             this.fut = fut;
 
             this.weakRef = new WeakReference<>(this, refQueue);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3362a61d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
index 7c6af95..b0b0a5a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
@@ -18,12 +18,14 @@
 package org.apache.ignite.internal.processors.cache;
 
 import com.google.common.collect.*;
+import junit.framework.*;
 import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.query.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.transactions.*;
 import org.apache.ignite.spi.swapspace.inmemory.*;
@@ -33,6 +35,7 @@ import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.testframework.*;
 import org.jetbrains.annotations.*;
 
+import javax.cache.*;
 import javax.cache.expiry.*;
 import javax.cache.processor.*;
 import java.util.*;
@@ -5087,4 +5090,179 @@ public abstract class GridCacheAbstractFullApiSelfTest 
extends GridCacheAbstract
 
         return primaryKeysForCache(prj, cnt);
     }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testIgniteCacheIterator() throws Exception {
+        IgniteCache<String, Integer> cache = jcache(0);
+
+        assertFalse(cache.iterator().hasNext());
+
+        final int SIZE = 20000;
+
+        Map<String, Integer> entries = new HashMap<>();
+
+        for (int i = 0; i < SIZE; ++i) {
+            cache.put(Integer.toString(i), i);
+
+            entries.put(Integer.toString(i), i);
+        }
+
+        checkIteratorHasNext();
+
+        checkIteratorCache(entries);
+
+        checkIteratorRemove(cache, entries);
+
+        checkIteratorEmpty(cache);
+    }
+
+    /**
+     * If hasNext() is called repeatedly, it should return the same result.
+     */
+    private void checkIteratorHasNext() {
+        Iterator<Cache.Entry<String, Integer>> iter = jcache(0).iterator();
+
+        assertEquals(iter.hasNext(), iter.hasNext());
+
+        while (iter.hasNext())
+            iter.next();
+
+        assertFalse(iter.hasNext());
+    }
+
+    /**
+     * @param cache Cache.
+     * @param entries Expected entries in the cache.
+     */
+    private void checkIteratorRemove(IgniteCache<String, Integer> cache, 
Map<String, Integer> entries) {
+        // Check that we can remove element.
+        String rmvKey = Integer.toString(5);
+
+        removeCacheIterator(cache, rmvKey);
+
+        entries.remove(rmvKey);
+
+        assertFalse(cache.containsKey(rmvKey));
+        assertNull(cache.get(rmvKey));
+
+        checkIteratorCache(entries);
+
+        // Check that we cannot call Iterator.remove() without next().
+        final Iterator<Cache.Entry<String, Integer>> iter = 
jcache(0).iterator();
+
+        assertTrue(iter.hasNext());
+
+        iter.next();
+
+        iter.remove();
+
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Void call() throws Exception {
+                iter.remove();
+
+                return null;
+            }
+        }, IllegalStateException.class, null);
+    }
+
+    /**
+     * @param cache Cache.
+     * @param key Key to remove.
+     */
+    private void removeCacheIterator(IgniteCache<String, Integer> cache, 
String key) {
+        Iterator<Cache.Entry<String, Integer>> iter = cache.iterator();
+
+        int delCnt = 0;
+
+        while (iter.hasNext()) {
+            Cache.Entry<String, Integer> cur = iter.next();
+
+            if (cur.getKey().equals(key)) {
+                iter.remove();
+
+                delCnt++;
+            }
+        }
+
+        assertEquals(1, delCnt);
+    }
+
+    /**
+     * @param entries Expected entries in the cache.
+     */
+    private void checkIteratorCache(Map<String, Integer> entries) {
+        for (int i = 0; i < gridCount(); ++i)
+            checkIteratorCache(jcache(i), entries);
+    }
+
+    /**
+     * @param cache Cache.
+     * @param entries Expected entries in the cache.
+     */
+    private void checkIteratorCache(IgniteCache<String, Integer> cache, 
Map<String, Integer> entries) {
+        Iterator<Cache.Entry<String, Integer>> iter = cache.iterator();
+
+        int cnt = 0;
+
+        while (iter.hasNext()) {
+            Cache.Entry<String, Integer> cur = iter.next();
+
+            assertTrue(entries.containsKey(cur.getKey()));
+            assertEquals(entries.get(cur.getKey()), cur.getValue());
+
+            cnt++;
+        }
+
+        assertEquals(entries.size(), cnt);
+    }
+
+    /**
+     * Checks iterators are cleared.
+     */
+    private void checkIteratorsCleared() {
+        for (int j = 0; j < gridCount(); j++) {
+
+            GridCacheQueryManager queries = context(j).queries();
+
+            Map map = GridTestUtils.getFieldValue(queries, 
GridCacheQueryManager.class, "qryIters");
+
+            for (Object obj : map.values())
+                assertEquals("Iterators not removed for grid " + j, 0, ((Map) 
obj).size());
+        }
+    }
+
+    /**
+     * Checks iterators are cleared after using.
+     */
+    private void checkIteratorEmpty(IgniteCache<String, Integer> cache) throws 
InterruptedException, InterruptedException {
+        int cnt = 5;
+
+        for (int i = 0; i < cnt; ++i) {
+            Iterator<Cache.Entry<String, Integer>> iter = cache.iterator();
+
+            iter.next();
+
+            assert iter.hasNext();
+        }
+
+        System.gc();
+
+        for (int i = 0; i < 10; i++) {
+            try {
+                cache.size(); // Trigger weak queue poll.
+
+                checkIteratorsCleared();
+            }
+            catch (AssertionFailedError e) {
+                if (i == 9)
+                    throw e;
+
+                log.info("Set iterators not cleared, will wait");
+
+                Thread.sleep(500);
+            }
+        }
+    }
 }

Reply via email to