#IGNITE-53: merge
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/3362a61d Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/3362a61d Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/3362a61d Branch: refs/heads/ignite-99-2 Commit: 3362a61d64b7e54a92788fe300cde010852c1e10 Parents: 7af5d02 Author: ivasilinets <ivasilin...@gridgain.com> Authored: Fri Jan 23 19:05:39 2015 +0300 Committer: ivasilinets <ivasilin...@gridgain.com> Committed: Fri Jan 23 19:05:39 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/CacheEntryImpl.java | 8 +- .../processors/cache/GridCacheAdapter.java | 32 ++++ .../processors/cache/GridCacheContext.java | 13 ++ .../processors/cache/GridCacheGateway.java | 2 + .../processors/cache/IgniteCacheProxy.java | 19 +- .../cache/datastructures/GridCacheSetImpl.java | 174 ++---------------- .../cache/query/GridCacheQueryAdapter.java | 14 +- .../query/GridCacheQueryFutureAdapter.java | 7 +- .../cache/query/GridCacheQueryManager.java | 4 +- .../cache/CacheWeakQueryIteratorsHolder.java | 22 +-- .../cache/GridCacheAbstractFullApiSelfTest.java | 178 +++++++++++++++++++ 11 files changed, 270 insertions(+), 203 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3362a61d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryImpl.java index 27dadbd..e609cd1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryImpl.java @@ -49,8 +49,12 @@ public class CacheEntryImpl<K, V> implements Cache.Entry<K, V> { } /** {@inheritDoc} */ - @Override public <T> T unwrap(Class<T> clazz) { - throw new IllegalArgumentException(); + @SuppressWarnings("unchecked") + @Override public <T> T unwrap(Class<T> cls) { + if (!cls.equals(getClass())) + throw new IllegalArgumentException("Unwrapping to class is not supported: " + cls); + + return (T)this; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3362a61d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 112483a..3c0ed75 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -46,9 +46,11 @@ import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.gridgain.grid.kernal.processors.cache.*; import org.jdk8.backport.*; import org.jetbrains.annotations.*; +import javax.cache.*; import javax.cache.expiry.*; import javax.cache.processor.*; import java.io.*; @@ -3615,6 +3617,36 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, return entrySet().iterator(); } + /** + * @param prj Projection. + * @return Distributed ignite cache iterator. + */ + public Iterator<Cache.Entry<K, V>> igniteIterator(final GridCacheProjectionImpl<K, V> prj) { + CacheQueryFuture<Map.Entry<K, V>> fut = queries().createScanQuery(null) + .keepAll(false) + .execute(); + + return ctx.itHolder().iterator(fut, new CacheIteratorConverter<Cache.Entry<K, V>, Map.Entry<K, V>>() { + @Override protected Cache.Entry<K, V> convert(Map.Entry<K, V> e) { + return new CacheEntryImpl<>(e.getKey(), e.getValue()); + } + + @Override protected void remove(Cache.Entry<K, V> item) { + GridCacheProjectionImpl<K, V> prev = ctx.gate().enter(prj); + + try { + GridCacheAdapter.this.removex(item.getKey()); + } + catch (IgniteCheckedException e) { + throw new CacheException(e); + } + finally { + ctx.gate().leave(prev); + } + } + }); + } + /** {@inheritDoc} */ @Nullable @Override public V promote(K key) throws IgniteCheckedException { return promote(key, true); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3362a61d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 567d0bb..d1dcaa3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -52,6 +52,7 @@ import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.offheap.unsafe.*; import org.apache.ignite.internal.util.tostring.*; +import org.gridgain.grid.kernal.processors.cache.*; import org.jetbrains.annotations.*; import javax.cache.configuration.*; @@ -186,6 +187,9 @@ public class GridCacheContext<K, V> implements Externalizable { /** Default expiry policy. */ private ExpiryPolicy expiryPlc; + /** Cache weak query iterator holder. */ + private CacheWeakQueryIteratorsHolder<Map.Entry<K, V>> itHolder; + /** * Empty constructor required for {@link Externalizable}. */ @@ -300,6 +304,8 @@ public class GridCacheContext<K, V> implements Externalizable { if (expiryPlc instanceof EternalExpiryPolicy) expiryPlc = null; + + itHolder = new CacheWeakQueryIteratorsHolder(log); } /** @@ -837,6 +843,13 @@ public class GridCacheContext<K, V> implements Externalizable { } /** + * @return Iterators Holder. + */ + public CacheWeakQueryIteratorsHolder<Map.Entry<K, V>> itHolder() { + return itHolder; + } + + /** * @return Swap manager. */ public GridCacheSwapManager<K, V> swap() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3362a61d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java index ae97afe..2de235a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java @@ -87,6 +87,8 @@ public class GridCacheGateway<K, V> { */ @Nullable public GridCacheProjectionImpl<K, V> enter(@Nullable GridCacheProjectionImpl<K, V> prj) { try { + ctx.itHolder().checkWeakQueue(); + GridCacheAdapter<K, V> cache = ctx.cache(); GridCachePreloader<K, V> preldr = cache != null ? cache.preloader() : null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3362a61d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index b9265a5..94ee239 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -820,27 +820,10 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public Iterator<Cache.Entry<K, V>> iterator() { - // TODO IGNITE-1. GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - return F.iterator(delegate, new C1<CacheEntry<K, V>, Entry<K, V>>() { - @Override public Entry<K, V> apply(final CacheEntry<K, V> e) { - return new Entry<K, V>() { - @Override public K getKey() { - return e.getKey(); - } - - @Override public V getValue() { - return e.getValue(); - } - - @Override public <T> T unwrap(Class<T> clazz) { - throw new IllegalArgumentException(); - } - }; - } - }, false); + return ((GridCacheAdapter)delegate).igniteIterator(prj); } finally { gate.leave(prev); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3362a61d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetImpl.java index 80151ec..b84d957 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetImpl.java @@ -30,6 +30,7 @@ import org.apache.ignite.internal.processors.cache.query.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.gridgain.grid.kernal.processors.cache.*; import org.jdk8.backport.*; import org.jetbrains.annotations.*; @@ -53,9 +54,6 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements CacheS /** Cache. */ private final GridCache<GridCacheSetItemKey, Boolean> cache; - /** Logger. */ - private final IgniteLogger log; - /** Set name. */ private final String name; @@ -71,12 +69,6 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements CacheS /** Removed flag. */ private volatile boolean rmvd; - /** Iterators weak references queue. */ - private final ReferenceQueue<SetIterator<?>> itRefQueue = new ReferenceQueue<>(); - - /** Iterators futures. */ - private final Map<WeakReference<SetIterator<?>>, CacheQueryFuture<?>> itFuts = new ConcurrentHashMap8<>(); - /** * @param ctx Cache context. * @param name Set name. @@ -91,8 +83,6 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements CacheS cache = ctx.cache(); - log = ctx.logger(GridCacheSetImpl.class); - hdrPart = ctx.affinity().partition(new GridCacheSetHeaderKey(name)); } @@ -348,16 +338,21 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements CacheS qry.projection(ctx.grid().forNodes(nodes)); - CacheQueryFuture<T> fut = qry.execute(); + CacheQueryFuture<Map.Entry<T, ?>> fut = qry.execute(); - SetIterator<T> it = new SetIterator<>(fut); + CacheWeakQueryIteratorsHolder.WeakQueryFutureIterator it = + ctx.itHolder().iterator(fut, new CacheIteratorConverter<T, Map.Entry<T, ?>>() { + @Override protected T convert(Map.Entry<T, ?> e) { + return e.getKey(); + } - itFuts.put(it.weakReference(), fut); + @Override protected void remove(T item) { + GridCacheSetImpl.this.remove(item); + } + }); if (rmvd) { - itFuts.remove(it.weakReference()); - - it.close(); + ctx.itHolder().removeIterator(it); checkRemoved(); } @@ -443,18 +438,8 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements CacheS this.rmvd = rmvd; - if (rmvd) { - for (CacheQueryFuture<?> fut : itFuts.values()) { - try { - fut.cancel(); - } - catch (IgniteCheckedException e) { - log.error("Failed to close iterator.", e); - } - } - - itFuts.clear(); - } + if (rmvd) + ctx.itHolder().clearQueries(); } /** @@ -466,29 +451,10 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements CacheS } /** - * Closes unreachable iterators. - */ - private void checkWeakQueue() { - for (Reference<? extends SetIterator<?>> itRef = itRefQueue.poll(); itRef != null; itRef = itRefQueue.poll()) { - try { - WeakReference<SetIterator<?>> weakRef = (WeakReference<SetIterator<?>>)itRef; - - CacheQueryFuture<?> fut = itFuts.remove(weakRef); - - if (fut != null) - fut.cancel(); - } - catch (IgniteCheckedException e) { - log.error("Failed to close iterator.", e); - } - } - } - - /** * Checks if set was removed and handles iterators weak reference queue. */ private void onAccess() { - checkWeakQueue(); + ctx.itHolder().checkWeakQueue(); checkRemoved(); } @@ -523,116 +489,6 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements CacheS /** * */ - private class SetIterator<T> extends GridCloseableIteratorAdapter<T> { - /** */ - private static final long serialVersionUID = -1460570789166994846L; - - /** Query future. */ - private final CacheQueryFuture<T> fut; - - /** Init flag. */ - private boolean init; - - /** Next item. */ - private T next; - - /** Current item. */ - private T cur; - - /** Weak reference. */ - private final WeakReference<SetIterator<?>> weakRef; - - /** - * @param fut Query future. - */ - private SetIterator(CacheQueryFuture<T> fut) { - this.fut = fut; - - weakRef = new WeakReference<SetIterator<?>>(this, itRefQueue); - } - - /** {@inheritDoc} */ - @Override protected T onNext() throws IgniteCheckedException { - init(); - - if (next == null) { - clearWeakReference(); - - throw new NoSuchElementException(); - } - - cur = next; - - Map.Entry e = (Map.Entry)fut.next(); - - next = e != null ? (T)e.getKey() : null; - - if (next == null) - clearWeakReference(); - - return cur; - } - - /** {@inheritDoc} */ - @Override protected boolean onHasNext() throws IgniteCheckedException { - init(); - - boolean hasNext = next != null; - - if (!hasNext) - clearWeakReference(); - - return hasNext; - } - - /** {@inheritDoc} */ - @Override protected void onClose() throws IgniteCheckedException { - fut.cancel(); - - clearWeakReference(); - } - - /** {@inheritDoc} */ - @Override protected void onRemove() throws IgniteCheckedException { - if (cur == null) - throw new NoSuchElementException(); - - GridCacheSetImpl.this.remove(cur); - } - - /** - * @throws IgniteCheckedException If failed. - */ - private void init() throws IgniteCheckedException { - if (!init) { - Map.Entry e = (Map.Entry)fut.next(); - - next = e != null ? (T)e.getKey() : null; - - init = true; - } - } - - /** - * @return Iterator weak reference. - */ - WeakReference<SetIterator<?>> weakReference() { - return weakRef; - } - - /** - * Clears weak reference. - */ - private void clearWeakReference() { - weakRef.clear(); // Do not need to enqueue. - - itFuts.remove(weakRef); - } - } - - /** - * - */ private static class SumReducer implements IgniteReducer<Object, Integer>, Externalizable { /** */ private static final long serialVersionUID = -3436987759126521204L; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3362a61d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java index f65017a..d35d215 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java @@ -65,13 +65,13 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { private volatile GridCacheQueryMetricsAdapter metrics; /** */ - private volatile int pageSize; + private volatile int pageSize = DFLT_PAGE_SIZE; /** */ private volatile long timeout; /** */ - private volatile boolean keepAll; + private volatile boolean keepAll = true; /** */ private volatile boolean incBackups; @@ -123,13 +123,6 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { log = cctx.logger(getClass()); - pageSize = DFLT_PAGE_SIZE; - timeout = 0; - keepAll = true; - incBackups = false; - dedup = false; - prj = null; - metrics = new GridCacheQueryMetricsAdapter(); } @@ -419,6 +412,9 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { cctx.checkSecurity(GridSecurityPermission.CACHE_READ); + if (nodes.isEmpty()) + return new GridCacheQueryErrorFuture<>(cctx.kernalContext(), new ClusterGroupEmptyException()); + if (log.isDebugEnabled()) log.debug("Executing query [query=" + this + ", nodes=" + nodes + ']'); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3362a61d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java index 94c85b4..4202c99 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java @@ -58,7 +58,7 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda protected final GridCacheQueryBean qry; /** Set of received keys used to deduplicate query result set. */ - private final Collection<K> keys = new HashSet<>(); + private final Collection<K> keys; /** */ private final Queue<Collection<R>> queue = new LinkedList<>(); @@ -92,6 +92,7 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda */ protected GridCacheQueryFutureAdapter() { qry = null; + keys = null; } /** @@ -121,6 +122,8 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda cctx.time().addTimeoutObject(this); } + + keys = qry.query().enableDedup() ? new HashSet<K>() : null; } /** @@ -335,7 +338,7 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda if (!qry.query().enableDedup()) return col; - Collection<Object> dedupCol = new LinkedList<>(); + Collection<Object> dedupCol = new ArrayList<>(col.size()); synchronized (mux) { for (Object o : col) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3362a61d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index 5a0df9b..ae5fe77 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -756,8 +756,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte GridIterator<IgniteBiTuple<K, V>> heapIt = new GridIteratorAdapter<IgniteBiTuple<K, V>>() { private IgniteBiTuple<K, V> next; - private Iterator<K> iter = prj.keySet().iterator(); - + private Iterator<K> iter = qry.includeBackups() || cctx.isReplicated() ? + prj.keySet().iterator() : prj.primaryKeySet().iterator(); { advance(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3362a61d/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheWeakQueryIteratorsHolder.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheWeakQueryIteratorsHolder.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheWeakQueryIteratorsHolder.java index f955695..eea72fe 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheWeakQueryIteratorsHolder.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheWeakQueryIteratorsHolder.java @@ -18,9 +18,9 @@ package org.gridgain.grid.kernal.processors.cache; import org.apache.ignite.*; -import org.gridgain.grid.cache.query.*; -import org.gridgain.grid.util.*; -import org.gridgain.grid.util.typedef.internal.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.typedef.internal.*; import org.jdk8.backport.*; import java.lang.ref.*; @@ -34,7 +34,7 @@ public class CacheWeakQueryIteratorsHolder<V> { private final ReferenceQueue<WeakQueryFutureIterator> refQueue = new ReferenceQueue<>(); /** Iterators futures. */ - private final Map<WeakReference<WeakQueryFutureIterator>, GridCacheQueryFuture<V>> futs = + private final Map<WeakReference<WeakQueryFutureIterator>, CacheQueryFuture<V>> futs = new ConcurrentHashMap8<>(); /** Logger. */ @@ -53,10 +53,10 @@ public class CacheWeakQueryIteratorsHolder<V> { * @param <T> Type for the iterator. * @return Iterator over the cache. */ - public <T> WeakQueryFutureIterator iterator(GridCacheQueryFuture<V> fut, CacheIteratorConverter<T, V> convert) { + public <T> WeakQueryFutureIterator iterator(CacheQueryFuture<V> fut, CacheIteratorConverter<T, V> convert) { WeakQueryFutureIterator it = new WeakQueryFutureIterator(fut, convert); - GridCacheQueryFuture<V> old = futs.put(it.weakReference(), fut); + CacheQueryFuture<V> old = futs.put(it.weakReference(), fut); assert old == null; @@ -83,7 +83,7 @@ public class CacheWeakQueryIteratorsHolder<V> { try { WeakReference<WeakQueryFutureIterator> weakRef = (WeakReference<WeakQueryFutureIterator>)itRef; - GridCacheQueryFuture<?> fut = futs.remove(weakRef); + CacheQueryFuture<?> fut = futs.remove(weakRef); if (fut != null) fut.cancel(); @@ -98,7 +98,7 @@ public class CacheWeakQueryIteratorsHolder<V> { * Cancel all cache queries. */ public void clearQueries(){ - for (GridCacheQueryFuture<?> fut : futs.values()) { + for (CacheQueryFuture<?> fut : futs.values()) { try { fut.cancel(); } @@ -112,13 +112,13 @@ public class CacheWeakQueryIteratorsHolder<V> { /** - * Iterator based of {@link GridCacheQueryFuture}. + * Iterator based of {@link CacheQueryFuture}. * * @param <T> Type for iterator. */ public class WeakQueryFutureIterator<T> extends GridCloseableIteratorAdapter<T> { /** Query future. */ - private final GridCacheQueryFuture<V> fut; + private final CacheQueryFuture<V> fut; /** Weak reference. */ private final WeakReference<WeakQueryFutureIterator<T>> weakRef; @@ -137,7 +137,7 @@ public class CacheWeakQueryIteratorsHolder<V> { /** * @param fut GridCacheQueryFuture to iterate. */ - WeakQueryFutureIterator(GridCacheQueryFuture<V> fut, CacheIteratorConverter<T, V> convert) { + WeakQueryFutureIterator(CacheQueryFuture<V> fut, CacheIteratorConverter<T, V> convert) { this.fut = fut; this.weakRef = new WeakReference<>(this, refQueue); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3362a61d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java index 7c6af95..b0b0a5a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java @@ -18,12 +18,14 @@ package org.apache.ignite.internal.processors.cache; import com.google.common.collect.*; +import junit.framework.*; import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.events.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.query.*; import org.apache.ignite.lang.*; import org.apache.ignite.transactions.*; import org.apache.ignite.spi.swapspace.inmemory.*; @@ -33,6 +35,7 @@ import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.testframework.*; import org.jetbrains.annotations.*; +import javax.cache.*; import javax.cache.expiry.*; import javax.cache.processor.*; import java.util.*; @@ -5087,4 +5090,179 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract return primaryKeysForCache(prj, cnt); } + + /** + * @throws Exception If failed. + */ + public void testIgniteCacheIterator() throws Exception { + IgniteCache<String, Integer> cache = jcache(0); + + assertFalse(cache.iterator().hasNext()); + + final int SIZE = 20000; + + Map<String, Integer> entries = new HashMap<>(); + + for (int i = 0; i < SIZE; ++i) { + cache.put(Integer.toString(i), i); + + entries.put(Integer.toString(i), i); + } + + checkIteratorHasNext(); + + checkIteratorCache(entries); + + checkIteratorRemove(cache, entries); + + checkIteratorEmpty(cache); + } + + /** + * If hasNext() is called repeatedly, it should return the same result. + */ + private void checkIteratorHasNext() { + Iterator<Cache.Entry<String, Integer>> iter = jcache(0).iterator(); + + assertEquals(iter.hasNext(), iter.hasNext()); + + while (iter.hasNext()) + iter.next(); + + assertFalse(iter.hasNext()); + } + + /** + * @param cache Cache. + * @param entries Expected entries in the cache. + */ + private void checkIteratorRemove(IgniteCache<String, Integer> cache, Map<String, Integer> entries) { + // Check that we can remove element. + String rmvKey = Integer.toString(5); + + removeCacheIterator(cache, rmvKey); + + entries.remove(rmvKey); + + assertFalse(cache.containsKey(rmvKey)); + assertNull(cache.get(rmvKey)); + + checkIteratorCache(entries); + + // Check that we cannot call Iterator.remove() without next(). + final Iterator<Cache.Entry<String, Integer>> iter = jcache(0).iterator(); + + assertTrue(iter.hasNext()); + + iter.next(); + + iter.remove(); + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Void call() throws Exception { + iter.remove(); + + return null; + } + }, IllegalStateException.class, null); + } + + /** + * @param cache Cache. + * @param key Key to remove. + */ + private void removeCacheIterator(IgniteCache<String, Integer> cache, String key) { + Iterator<Cache.Entry<String, Integer>> iter = cache.iterator(); + + int delCnt = 0; + + while (iter.hasNext()) { + Cache.Entry<String, Integer> cur = iter.next(); + + if (cur.getKey().equals(key)) { + iter.remove(); + + delCnt++; + } + } + + assertEquals(1, delCnt); + } + + /** + * @param entries Expected entries in the cache. + */ + private void checkIteratorCache(Map<String, Integer> entries) { + for (int i = 0; i < gridCount(); ++i) + checkIteratorCache(jcache(i), entries); + } + + /** + * @param cache Cache. + * @param entries Expected entries in the cache. + */ + private void checkIteratorCache(IgniteCache<String, Integer> cache, Map<String, Integer> entries) { + Iterator<Cache.Entry<String, Integer>> iter = cache.iterator(); + + int cnt = 0; + + while (iter.hasNext()) { + Cache.Entry<String, Integer> cur = iter.next(); + + assertTrue(entries.containsKey(cur.getKey())); + assertEquals(entries.get(cur.getKey()), cur.getValue()); + + cnt++; + } + + assertEquals(entries.size(), cnt); + } + + /** + * Checks iterators are cleared. + */ + private void checkIteratorsCleared() { + for (int j = 0; j < gridCount(); j++) { + + GridCacheQueryManager queries = context(j).queries(); + + Map map = GridTestUtils.getFieldValue(queries, GridCacheQueryManager.class, "qryIters"); + + for (Object obj : map.values()) + assertEquals("Iterators not removed for grid " + j, 0, ((Map) obj).size()); + } + } + + /** + * Checks iterators are cleared after using. + */ + private void checkIteratorEmpty(IgniteCache<String, Integer> cache) throws InterruptedException, InterruptedException { + int cnt = 5; + + for (int i = 0; i < cnt; ++i) { + Iterator<Cache.Entry<String, Integer>> iter = cache.iterator(); + + iter.next(); + + assert iter.hasNext(); + } + + System.gc(); + + for (int i = 0; i < 10; i++) { + try { + cache.size(); // Trigger weak queue poll. + + checkIteratorsCleared(); + } + catch (AssertionFailedError e) { + if (i == 9) + throw e; + + log.info("Set iterators not cleared, will wait"); + + Thread.sleep(500); + } + } + } }