#IGNITE-53: Refactor out the common part from IgniteCacheProxy and GridCacheSetImpl
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b4c2ca14 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b4c2ca14 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b4c2ca14 Branch: refs/heads/ignite-59 Commit: b4c2ca14ddc823d2546efdf52ca3cb6b078c9865 Parents: f3c6ec1 Author: ivasilinets <ivasilin...@gridgain.com> Authored: Thu Jan 15 15:32:57 2015 +0400 Committer: ivasilinets <ivasilin...@gridgain.com> Committed: Thu Jan 15 15:32:57 2015 +0400 ---------------------------------------------------------------------- .../processors/cache/IgniteCacheProxy.java | 238 ++++++++++++++----- .../IgniteQueryFutureStorage.java | 192 +++++++++++++++ 2 files changed, 371 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4c2ca14/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index e0cfa5a..c41f44e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -27,6 +27,7 @@ import org.gridgain.grid.cache.*; import org.gridgain.grid.cache.query.*; import org.gridgain.grid.kernal.*; import org.gridgain.grid.kernal.processors.cache.*; +import org.gridgain.grid.kernal.processors.cache.datastructures.*; import org.gridgain.grid.util.tostring.*; import org.gridgain.grid.util.typedef.*; import org.gridgain.grid.util.typedef.internal.*; @@ -62,6 +63,9 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** Projection. */ private GridCacheProjectionImpl<K, V> prj; + /** Query future storage */ + private final IgniteQueryFutureStorage queryStorage; + /** * @param ctx Context. * @param delegate Delegate. @@ -81,6 +85,8 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements this.delegate = delegate; this.prj = prj; + this.queryStorage = new IgniteQueryFutureStorage(ctx); + gate = ctx.gate(); } @@ -88,6 +94,8 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements * @return Context. */ public GridCacheContext<K, V> context() { + onAccess(); + return ctx; } @@ -95,11 +103,15 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements * @return Ignite instance. */ @Override public GridEx ignite() { + onAccess(); + return ctx.grid(); } /** {@inheritDoc} */ @Override public <C extends Configuration<K, V>> C getConfiguration(Class<C> clazz) { + onAccess(); + GridCacheConfiguration cfg = ctx.config(); if (!clazz.isAssignableFrom(cfg.getClass())) @@ -110,12 +122,16 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Nullable @Override public Entry<K, V> randomEntry() { + onAccess(); + // TODO IGNITE-1. throw new UnsupportedOperationException(); } /** {@inheritDoc} */ @Override public IgniteCache<K, V> withExpiryPolicy(ExpiryPolicy plc) { + onAccess(); + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { @@ -130,6 +146,8 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public void loadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) { + onAccess(); + try { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -151,6 +169,8 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public void localLoadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) { + onAccess(); + try { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -168,6 +188,8 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Nullable @Override public V getAndPutIf(K key, V val, IgnitePredicate<GridCacheEntry<K, V>> filter) { + onAccess(); + try { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -185,6 +207,8 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public boolean putIf(K key, V val, IgnitePredicate<GridCacheEntry<K, V>> filter) { + onAccess(); + try { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -202,6 +226,8 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public V getAndRemoveIf(K key, IgnitePredicate<GridCacheEntry<K, V>> filter) { + onAccess(); + try { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -219,6 +245,8 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public boolean removeIf(K key, IgnitePredicate<GridCacheEntry<K, V>> filter) { + onAccess(); + try { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -236,6 +264,8 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Nullable @Override public V getAndPutIfAbsent(K key, V val) throws CacheException { + onAccess(); + try { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -253,24 +283,32 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public void removeAll(IgnitePredicate filter) throws CacheException { + onAccess(); + // TODO IGNITE-1. throw new UnsupportedOperationException(); } /** {@inheritDoc} */ @Override public Lock lock(K key) throws CacheException { + onAccess(); + // TODO IGNITE-1. throw new UnsupportedOperationException(); } /** {@inheritDoc} */ @Override public Lock lockAll(Set<? extends K> keys) throws CacheException { + onAccess(); + // TODO IGNITE-1. throw new UnsupportedOperationException(); } /** {@inheritDoc} */ @Override public boolean isLocked(K key) { + onAccess(); + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { @@ -283,6 +321,8 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public boolean isLockedByThread(K key) { + onAccess(); + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { @@ -295,18 +335,24 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public Iterable<Entry<K, V>> localEntries(CachePeekMode... peekModes) throws CacheException { + onAccess(); + // TODO IGNITE-1. throw new UnsupportedOperationException(); } /** {@inheritDoc} */ @Override public Map<K, V> localPartition(int part) throws CacheException { + onAccess(); + // TODO IGNITE-1. throw new UnsupportedOperationException(); } /** {@inheritDoc} */ @Override public void localEvict(Collection<? extends K> keys) { + onAccess(); + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { @@ -319,6 +365,8 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Nullable @Override public V localPeek(K key, CachePeekMode... peekModes) { + onAccess(); + // TODO IGNITE-1. if (peekModes.length != 0) throw new UnsupportedOperationException(); @@ -335,6 +383,8 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public void localPromote(Set<? extends K> keys) throws CacheException { + onAccess(); + try { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -352,12 +402,16 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public boolean clear(Collection<? extends K> keys) { + onAccess(); + // TODO IGNITE-1. throw new UnsupportedOperationException(); } /** {@inheritDoc} */ @Override public int size(CachePeekMode... peekModes) throws CacheException { + onAccess(); + // TODO IGNITE-1. if (peekModes.length != 0) throw new UnsupportedOperationException(); @@ -374,6 +428,8 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public int localSize(CachePeekMode... peekModes) { + onAccess(); + // TODO IGNITE-1. GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -387,6 +443,8 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public V get(K key) { + onAccess(); + try { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -404,6 +462,8 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public Map<K, V> getAll(Set<? extends K> keys) { + onAccess(); + try { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -424,6 +484,8 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements * @return Values map. */ public Map<K, V> getAll(Collection<? extends K> keys) { + onAccess(); + try { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -446,6 +508,8 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements * @return Entry set. */ public Set<GridCacheEntry<K, V>> entrySetx(IgnitePredicate<GridCacheEntry<K, V>>... filter) { + onAccess(); + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { @@ -460,6 +524,8 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements * @param filter Filter. */ public void removeAll(IgnitePredicate<GridCacheEntry<K, V>>... filter) { + onAccess(); + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { @@ -475,6 +541,8 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public boolean containsKey(K key) { + onAccess(); + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { @@ -489,12 +557,16 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements @Override public void loadAll(Set<? extends K> keys, boolean replaceExistingValues, CompletionListener completionLsnr) { + onAccess(); + // TODO IGNITE-1. throw new UnsupportedOperationException(); } /** {@inheritDoc} */ @Override public void put(K key, V val) { + onAccess(); + try { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -512,6 +584,8 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public V getAndPut(K key, V val) { + onAccess(); + try { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -529,6 +603,8 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public void putAll(Map<? extends K, ? extends V> map) { + onAccess(); + try { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -546,6 +622,8 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public boolean putIfAbsent(K key, V val) { + onAccess(); + try { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -563,6 +641,8 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public boolean remove(K key) { + onAccess(); + try { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -580,6 +660,8 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public boolean remove(K key, V oldVal) { + onAccess(); + try { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -597,6 +679,8 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public V getAndRemove(K key) { + onAccess(); + try { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -614,6 +698,8 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public boolean replace(K key, V oldVal, V newVal) { + onAccess(); + try { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -631,6 +717,8 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public boolean replace(K key, V val) { + onAccess(); + try { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -648,6 +736,8 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public V getAndReplace(K key, V val) { + onAccess(); + try { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -665,6 +755,8 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public void removeAll(Set<? extends K> keys) { + onAccess(); + try { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -684,6 +776,8 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements * @param keys Keys to remove. */ public void removeAll(Collection<? extends K> keys) { + onAccess(); + try { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -701,6 +795,8 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public void removeAll() { + onAccess(); + // TODO IGNITE-1. GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -717,6 +813,8 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public void clear() { + onAccess(); + // TODO IGNITE-1. GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -734,6 +832,8 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public <T> T invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... args) throws EntryProcessorException { + onAccess(); + try { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -773,6 +873,8 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys, EntryProcessor<K, V, T> entryProcessor, Object... args) { + onAccess(); + try { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -792,6 +894,8 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll( Map<? extends K, ? extends EntryProcessor<K, V, T>> map, Object... args) { + onAccess(); + try { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -809,11 +913,15 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public String getName() { + onAccess(); + return delegate.name(); } /** {@inheritDoc} */ @Override public CacheManager getCacheManager() { + onAccess(); + // TODO IGNITE-45 (Support start/close/destroy cache correctly) IgniteCachingProvider provider = (IgniteCachingProvider)Caching.getCachingProvider( IgniteCachingProvider.class.getName(), @@ -827,12 +935,16 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public void close() { + onAccess(); + // TODO IGNITE-45 (Support start/close/destroy cache correctly) getCacheManager().destroyCache(getName()); } /** {@inheritDoc} */ @Override public boolean isClosed() { + onAccess(); + // TODO IGNITE-45 (Support start/close/destroy cache correctly) return getCacheManager() == null; } @@ -840,6 +952,8 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public <T> T unwrap(Class<T> clazz) { + onAccess(); + if (clazz.equals(IgniteCache.class)) return (T)this; @@ -848,22 +962,28 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public void registerCacheEntryListener(CacheEntryListenerConfiguration cacheEntryLsnrConfiguration) { + onAccess(); + // TODO IGNITE-1. throw new UnsupportedOperationException(); } /** {@inheritDoc} */ @Override public void deregisterCacheEntryListener(CacheEntryListenerConfiguration cacheEntryLsnrConfiguration) { + onAccess(); + // TODO IGNITE-1. throw new UnsupportedOperationException(); } /** {@inheritDoc} */ @Override public Iterator<Cache.Entry<K, V>> iterator() { + onAccess(); + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - return new IgniteCacheIterator(); + return new IgniteCacheIterator(delegate.queries().createScanQuery(null).execute(), queryStorage); } finally { gate.leave(prev); @@ -872,42 +992,56 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public QueryCursor<Entry<K, V>> query(QueryPredicate<K, V> filter) { + onAccess(); + // TODO IGNITE-1. throw new UnsupportedOperationException(); } /** {@inheritDoc} */ @Override public <R> QueryCursor<R> query(QueryReducer<Entry<K, V>, R> rmtRdc, QueryPredicate<K, V> filter) { + onAccess(); + // TODO IGNITE-1. throw new UnsupportedOperationException(); } /** {@inheritDoc} */ @Override public QueryCursor<List<?>> queryFields(QuerySqlPredicate<K, V> filter) { + onAccess(); + // TODO IGNITE-1. throw new UnsupportedOperationException(); } /** {@inheritDoc} */ @Override public <R> QueryCursor<R> queryFields(QueryReducer<List<?>, R> rmtRdc, QuerySqlPredicate<K, V> filter) { + onAccess(); + // TODO IGNITE-1. throw new UnsupportedOperationException(); } /** {@inheritDoc} */ @Override public QueryCursor<Entry<K, V>> localQuery(QueryPredicate<K, V> filter) { + onAccess(); + // TODO IGNITE-1. throw new UnsupportedOperationException(); } /** {@inheritDoc} */ @Override public QueryCursor<List<?>> localQueryFields(QuerySqlPredicate<K, V> filter) { + onAccess(); + // TODO IGNITE-1. throw new UnsupportedOperationException(); } /** {@inheritDoc} */ @Override public IgniteCache<K, V> enableAsync() { + onAccess(); + if (isAsync()) return this; @@ -917,6 +1051,8 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public <K1, V1> IgniteCache<K1, V1> keepPortable() { + onAccess(); + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { @@ -942,6 +1078,8 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public IgniteCache<K, V> flagsOn(@Nullable GridCacheFlag... flags) { + onAccess(); + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { @@ -987,6 +1125,8 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { + onAccess(); + out.writeObject(ctx); out.writeObject(delegate); @@ -997,6 +1137,8 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @SuppressWarnings({"unchecked"}) @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + onAccess(); + ctx = (GridCacheContext<K, V>)in.readObject(); delegate = (GridCacheProjectionEx<K, V>)in.readObject(); @@ -1006,6 +1148,17 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements gate = ctx.gate(); } + /** + * Checks if set was removed and handles iterators weak reference queue. + */ + private void onAccess() { + try { + queryStorage.onAccess(); + } catch (IgniteCheckedException e) { + throw cacheException(e); + } + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(IgniteCacheProxy.class, this); @@ -1085,90 +1238,57 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements * Iterator over the cache. */ private class IgniteCacheIterator implements Iterator<Cache.Entry<K, V>> { - /** Cache query future for all entries in distributed ignite cache. */ - private GridCacheQueryFuture<Map.Entry<K, V>> fut; - - /** Current element. */ - private Map.Entry<K, V> curEntry; - /** Next element. */ - private Map.Entry<K,V> nextEntry; + /** Iterator over the cache*/ + IgniteQueryFutureStorage.Iterator<Map.Entry<K, V>> iter; - /** Init first time. */ - private boolean firstTime = true; + IgniteCacheIterator(GridCacheQueryFuture<Map.Entry<K, V>> fut, IgniteQueryFutureStorage storage) { + iter = storage.iterator(fut); + } /** {@inheritDoc} */ @Override public boolean hasNext() { - initFirstTime(); - - return nextEntry != null; + try { + return iter.onHasNext(); + } catch (IgniteCheckedException e) { + throw cacheException(e); + } } /** {@inheritDoc} */ @Override public Entry<K, V> next() { - initFirstTime(); - - curEntry = nextEntry; - - if (curEntry == null) - throw new NoSuchElementException(); - try { - nextEntry = fut.next(); + final Map.Entry<K, V> cur = iter.onNext(); + return new Cache.Entry<K, V>() { + @Override public K getKey() { + return cur.getKey(); + } + + @Override public V getValue() { + return cur.getValue(); + } + + @Override public <T> T unwrap(Class<T> clazz) { + throw new IllegalArgumentException(); + } + }; } catch (IgniteCheckedException e) { - curEntry = null; - throw cacheException(e); } - return new Cache.Entry<K, V>() { - @Override public K getKey() { - return curEntry.getKey(); - } - - @Override public V getValue() { - return curEntry.getValue(); - } - @Override public <T> T unwrap(Class<T> clazz) { - throw new IllegalArgumentException(); - } - }; } /** {@inheritDoc} */ @Override public void remove() { - if (curEntry == null) - throw new IllegalStateException(); - + Map.Entry<K, V> curEntry = iter.itemToRemove(); try { delegate.removex(curEntry.getKey()); } catch (IgniteCheckedException e) { throw cacheException(e); } - - curEntry = null; - } - - /** - * Initialize fields at first call - */ - private void initFirstTime() { - if (!firstTime) { - return; - } - - firstTime = false; - fut = delegate.queries().createScanQuery(null).execute(); - - try { - nextEntry = fut.next(); - } - catch (IgniteCheckedException e) { - throw cacheException(e); - } } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4c2ca14/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/IgniteQueryFutureStorage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/IgniteQueryFutureStorage.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/IgniteQueryFutureStorage.java new file mode 100644 index 0000000..69ef38b --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/IgniteQueryFutureStorage.java @@ -0,0 +1,192 @@ +package org.gridgain.grid.kernal.processors.cache.datastructures; + +import org.apache.ignite.*; +import org.gridgain.grid.cache.query.*; +import org.gridgain.grid.kernal.processors.cache.*; +import org.jdk8.backport.*; + +import java.lang.ref.*; +import java.util.*; + +/** + * Storage for GridCacheQueryFuture. + */ +public class IgniteQueryFutureStorage { + /** Iterators weak references queue. */ + private final ReferenceQueue<Iterator<?>> refQueue = new ReferenceQueue<>(); + + /** Iterators futures. */ + private final Map<WeakReference<Iterator<?>>, GridCacheQueryFuture<?>> futs = new ConcurrentHashMap8<>(); + + /** Logger. */ + private final IgniteLogger log; + + /** + * @param ctx Cache context. + */ + public IgniteQueryFutureStorage(GridCacheContext ctx) { + log = ctx.logger(GridCacheSetImpl.class); + } + + /** + * Iterator over the cache. + * @param fut Query to iterate + * @return iterator + */ + public <T> Iterator<T> iterator(GridCacheQueryFuture<T> fut) { + Iterator<T> it = new Iterator<>(fut); + + futs.put(it.weakReference(), fut); + + return it; + } + + /** + * Closes unreachable iterators. + */ + private void checkWeakQueue() throws IgniteCheckedException { + for (Reference<? extends Iterator<?>> itRef = refQueue.poll(); itRef != null; itRef = refQueue.poll()) { + WeakReference<Iterator<?>> weakRef = (WeakReference<Iterator<?>>) itRef; + + GridCacheQueryFuture<?> fut = futs.remove(weakRef); + + if (fut != null) + fut.cancel(); + + } + } + + /** + * Checks if set was removed and handles iterators weak reference queue. + */ + public void onAccess() throws IgniteCheckedException { + checkWeakQueue(); + } + + /** + * Cancel all cache queries + * @throws IgniteCheckedException + */ + protected void clearQueries() throws IgniteCheckedException { + for (GridCacheQueryFuture<?> fut : futs.values()) { + try { + fut.cancel(); + } + catch (IgniteCheckedException e) { + log.error("Failed to close iterator.", e); + } + + } + futs.clear(); + } + + /** + * Iterator over the cache + */ + public class Iterator<T> { + /** Query future. */ + private final GridCacheQueryFuture<T> fut; + + /** Weak reference. */ + private final WeakReference<Iterator<?>> weakRef; + + /** Init flag. */ + private boolean init; + + /** Next item. */ + private T next; + + /** Current item. */ + private T cur; + + /** + * @param fut GridCacheQueryFuture to iterate + */ + Iterator(GridCacheQueryFuture<T> fut) { + this.fut = fut; + this.weakRef = new WeakReference<Iterator<?>>(this, refQueue); + } + + + /** + * @throws IgniteCheckedException If failed. + */ + private void init() throws IgniteCheckedException { + if (!init) { + next = fut.next(); + + init = true; + } + } + + /** + * @return Iterator weak reference. + */ + WeakReference<Iterator<?>> weakReference() { + return weakRef; + } + + /** + * Clears weak reference. + */ + private void clearWeakReference() { + weakRef.clear(); + + futs.remove(weakRef); + } + + /** + * The same as Iterator.next() + */ + public T onNext() throws IgniteCheckedException { + init(); + + if (next == null) { + clearWeakReference(); + + throw new NoSuchElementException(); + } + + cur = next; + + Map.Entry e = (Map.Entry) fut.next(); + + next = e != null ? (T) e.getKey() : null; + + if (next == null) + clearWeakReference(); + + return cur; + } + + /** + * The same as Iterator.hasNext() + */ + public boolean onHasNext() throws IgniteCheckedException { + init(); + + boolean hasNext = next != null; + + if (!hasNext) + clearWeakReference(); + + return hasNext; + } + + /** + * @return current item to remove + * @throws IllegalStateException if the {@code onNext} method has not + * yet been called, or the {@code itemToRemove} method has already + * been called after the last call to the {@code onNext} + * method + */ + public T itemToRemove() { + if (cur == null) + throw new IllegalStateException(); + T res = cur; + cur = null; + return res; + } + } + +}