# ignite-53 review
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c927f689 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c927f689 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c927f689 Branch: refs/heads/sprint-1 Commit: c927f689acf599665939eeefa11dd7d7154e52f7 Parents: 9710fff Author: sboikov <sboi...@gridgain.com> Authored: Fri Jan 16 10:23:51 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Fri Jan 16 10:34:44 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/IgniteCacheProxy.java | 206 ++-------------- .../kernal/processors/cache/CacheEntryImpl.java | 64 +++++ .../cache/CacheWeakQueryIteratorsHolder.java | 232 ++++++++++++++++++ .../cache/datastructures/GridCacheSetImpl.java | 44 ++-- .../IgniteQueryAbstractStorage.java | 233 ------------------- .../cache/GridCacheAbstractFullApiSelfTest.java | 23 +- 6 files changed, 338 insertions(+), 464 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c927f689/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index b0ae913..9b78fb1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -26,7 +26,6 @@ import org.apache.ignite.resources.*; import org.gridgain.grid.cache.*; import org.gridgain.grid.kernal.*; import org.gridgain.grid.kernal.processors.cache.*; -import org.gridgain.grid.kernal.processors.cache.datastructures.*; import org.gridgain.grid.util.tostring.*; import org.gridgain.grid.util.typedef.*; import org.gridgain.grid.util.typedef.internal.*; @@ -62,8 +61,8 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** Projection. */ private GridCacheProjectionImpl<K, V> prj; - /** Query storage */ - private final IgniteQueryStorage queryStorage; + /** Iterator holder. */ + private final CacheWeakQueryIteratorsHolder<Entry<K, V>, Map.Entry<K, V>> itHolder; /** * @param ctx Context. @@ -84,7 +83,15 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements this.delegate = delegate; this.prj = prj; - this.queryStorage = new IgniteQueryStorage(ctx); + this.itHolder = new CacheWeakQueryIteratorsHolder<Entry<K, V>, Map.Entry<K, V>>(ctx.logger(IgniteCacheProxy.class)) { + @Override protected Entry<K, V> convert(Map.Entry<K, V> e) { + return new CacheEntryImpl<>(e.getKey(), e.getValue()); + } + + @Override protected void remove(Entry<K, V> item) { + IgniteCacheProxy.this.remove(item.getKey()); + } + }; gate = ctx.gate(); } @@ -93,8 +100,6 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements * @return Context. */ public GridCacheContext<K, V> context() { - onAccess(); - return ctx; } @@ -102,15 +107,11 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements * @return Ignite instance. */ @Override public GridEx ignite() { - onAccess(); - return ctx.grid(); } /** {@inheritDoc} */ @Override public <C extends Configuration<K, V>> C getConfiguration(Class<C> clazz) { - onAccess(); - GridCacheConfiguration cfg = ctx.config(); if (!clazz.isAssignableFrom(cfg.getClass())) @@ -121,16 +122,12 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Nullable @Override public Entry<K, V> randomEntry() { - onAccess(); - // TODO IGNITE-1. throw new UnsupportedOperationException(); } /** {@inheritDoc} */ @Override public IgniteCache<K, V> withExpiryPolicy(ExpiryPolicy plc) { - onAccess(); - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { @@ -145,8 +142,6 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public void loadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) { - onAccess(); - try { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -168,8 +163,6 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public void localLoadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) { - onAccess(); - try { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -187,8 +180,6 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Nullable @Override public V getAndPutIf(K key, V val, IgnitePredicate<GridCacheEntry<K, V>> filter) { - onAccess(); - try { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -206,8 +197,6 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public boolean putIf(K key, V val, IgnitePredicate<GridCacheEntry<K, V>> filter) { - onAccess(); - try { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -225,8 +214,6 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public V getAndRemoveIf(K key, IgnitePredicate<GridCacheEntry<K, V>> filter) { - onAccess(); - try { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -244,8 +231,6 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public boolean removeIf(K key, IgnitePredicate<GridCacheEntry<K, V>> filter) { - onAccess(); - try { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -263,8 +248,6 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Nullable @Override public V getAndPutIfAbsent(K key, V val) throws CacheException { - onAccess(); - try { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -282,32 +265,24 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public void removeAll(IgnitePredicate filter) throws CacheException { - onAccess(); - // TODO IGNITE-1. throw new UnsupportedOperationException(); } /** {@inheritDoc} */ @Override public Lock lock(K key) throws CacheException { - onAccess(); - // TODO IGNITE-1. throw new UnsupportedOperationException(); } /** {@inheritDoc} */ @Override public Lock lockAll(Set<? extends K> keys) throws CacheException { - onAccess(); - // TODO IGNITE-1. throw new UnsupportedOperationException(); } /** {@inheritDoc} */ @Override public boolean isLocked(K key) { - onAccess(); - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { @@ -320,8 +295,6 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public boolean isLockedByThread(K key) { - onAccess(); - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { @@ -334,24 +307,18 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public Iterable<Entry<K, V>> localEntries(CachePeekMode... peekModes) throws CacheException { - onAccess(); - // TODO IGNITE-1. throw new UnsupportedOperationException(); } /** {@inheritDoc} */ @Override public Map<K, V> localPartition(int part) throws CacheException { - onAccess(); - // TODO IGNITE-1. throw new UnsupportedOperationException(); } /** {@inheritDoc} */ @Override public void localEvict(Collection<? extends K> keys) { - onAccess(); - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { @@ -364,8 +331,6 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Nullable @Override public V localPeek(K key, CachePeekMode... peekModes) { - onAccess(); - // TODO IGNITE-1. if (peekModes.length != 0) throw new UnsupportedOperationException(); @@ -382,8 +347,6 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public void localPromote(Set<? extends K> keys) throws CacheException { - onAccess(); - try { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -401,16 +364,12 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public boolean clear(Collection<? extends K> keys) { - onAccess(); - // TODO IGNITE-1. throw new UnsupportedOperationException(); } /** {@inheritDoc} */ @Override public int size(CachePeekMode... peekModes) throws CacheException { - onAccess(); - // TODO IGNITE-1. if (peekModes.length != 0) throw new UnsupportedOperationException(); @@ -427,8 +386,6 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public int localSize(CachePeekMode... peekModes) { - onAccess(); - // TODO IGNITE-1. GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -442,8 +399,6 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public V get(K key) { - onAccess(); - try { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -461,8 +416,6 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public Map<K, V> getAll(Set<? extends K> keys) { - onAccess(); - try { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -483,8 +436,6 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements * @return Values map. */ public Map<K, V> getAll(Collection<? extends K> keys) { - onAccess(); - try { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -507,8 +458,6 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements * @return Entry set. */ public Set<GridCacheEntry<K, V>> entrySetx(IgnitePredicate<GridCacheEntry<K, V>>... filter) { - onAccess(); - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { @@ -523,8 +472,6 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements * @param filter Filter. */ public void removeAll(IgnitePredicate<GridCacheEntry<K, V>>... filter) { - onAccess(); - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { @@ -540,8 +487,6 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public boolean containsKey(K key) { - onAccess(); - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { @@ -556,16 +501,12 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements @Override public void loadAll(Set<? extends K> keys, boolean replaceExistingValues, CompletionListener completionLsnr) { - onAccess(); - // TODO IGNITE-1. throw new UnsupportedOperationException(); } /** {@inheritDoc} */ @Override public void put(K key, V val) { - onAccess(); - try { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -583,8 +524,6 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public V getAndPut(K key, V val) { - onAccess(); - try { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -602,8 +541,6 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public void putAll(Map<? extends K, ? extends V> map) { - onAccess(); - try { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -621,8 +558,6 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public boolean putIfAbsent(K key, V val) { - onAccess(); - try { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -640,8 +575,6 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public boolean remove(K key) { - onAccess(); - try { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -659,8 +592,6 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public boolean remove(K key, V oldVal) { - onAccess(); - try { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -678,8 +609,6 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public V getAndRemove(K key) { - onAccess(); - try { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -697,8 +626,6 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public boolean replace(K key, V oldVal, V newVal) { - onAccess(); - try { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -716,8 +643,6 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public boolean replace(K key, V val) { - onAccess(); - try { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -735,8 +660,6 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public V getAndReplace(K key, V val) { - onAccess(); - try { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -754,8 +677,6 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public void removeAll(Set<? extends K> keys) { - onAccess(); - try { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -775,8 +696,6 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements * @param keys Keys to remove. */ public void removeAll(Collection<? extends K> keys) { - onAccess(); - try { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -794,8 +713,6 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public void removeAll() { - onAccess(); - // TODO IGNITE-1. GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -812,8 +729,6 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public void clear() { - onAccess(); - // TODO IGNITE-1. GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -831,8 +746,6 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public <T> T invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... args) throws EntryProcessorException { - onAccess(); - try { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -872,8 +785,6 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys, EntryProcessor<K, V, T> entryProcessor, Object... args) { - onAccess(); - try { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -893,8 +804,6 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll( Map<? extends K, ? extends EntryProcessor<K, V, T>> map, Object... args) { - onAccess(); - try { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); @@ -912,15 +821,11 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public String getName() { - onAccess(); - return delegate.name(); } /** {@inheritDoc} */ @Override public CacheManager getCacheManager() { - onAccess(); - // TODO IGNITE-45 (Support start/close/destroy cache correctly) IgniteCachingProvider provider = (IgniteCachingProvider)Caching.getCachingProvider( IgniteCachingProvider.class.getName(), @@ -934,16 +839,12 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public void close() { - onAccess(); - // TODO IGNITE-45 (Support start/close/destroy cache correctly) getCacheManager().destroyCache(getName()); } /** {@inheritDoc} */ @Override public boolean isClosed() { - onAccess(); - // TODO IGNITE-45 (Support start/close/destroy cache correctly) return getCacheManager() == null; } @@ -951,8 +852,6 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public <T> T unwrap(Class<T> clazz) { - onAccess(); - if (clazz.equals(IgniteCache.class)) return (T)this; @@ -961,28 +860,24 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public void registerCacheEntryListener(CacheEntryListenerConfiguration cacheEntryLsnrConfiguration) { - onAccess(); - // TODO IGNITE-1. throw new UnsupportedOperationException(); } /** {@inheritDoc} */ @Override public void deregisterCacheEntryListener(CacheEntryListenerConfiguration cacheEntryLsnrConfiguration) { - onAccess(); - // TODO IGNITE-1. throw new UnsupportedOperationException(); } /** {@inheritDoc} */ @Override public Iterator<Cache.Entry<K, V>> iterator() { - onAccess(); - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - return queryStorage.iterator(delegate.queries().createScanQuery(null).execute()); + itHolder.checkWeakQueue(); + + return itHolder.iterator(delegate.queries().createScanQuery(null).execute()); } finally { gate.leave(prev); @@ -991,56 +886,42 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public QueryCursor<Entry<K, V>> query(QueryPredicate<K, V> filter) { - onAccess(); - // TODO IGNITE-1. throw new UnsupportedOperationException(); } /** {@inheritDoc} */ @Override public <R> QueryCursor<R> query(QueryReducer<Entry<K, V>, R> rmtRdc, QueryPredicate<K, V> filter) { - onAccess(); - // TODO IGNITE-1. throw new UnsupportedOperationException(); } /** {@inheritDoc} */ @Override public QueryCursor<List<?>> queryFields(QuerySqlPredicate<K, V> filter) { - onAccess(); - // TODO IGNITE-1. throw new UnsupportedOperationException(); } /** {@inheritDoc} */ @Override public <R> QueryCursor<R> queryFields(QueryReducer<List<?>, R> rmtRdc, QuerySqlPredicate<K, V> filter) { - onAccess(); - // TODO IGNITE-1. throw new UnsupportedOperationException(); } /** {@inheritDoc} */ @Override public QueryCursor<Entry<K, V>> localQuery(QueryPredicate<K, V> filter) { - onAccess(); - // TODO IGNITE-1. throw new UnsupportedOperationException(); } /** {@inheritDoc} */ @Override public QueryCursor<List<?>> localQueryFields(QuerySqlPredicate<K, V> filter) { - onAccess(); - // TODO IGNITE-1. throw new UnsupportedOperationException(); } /** {@inheritDoc} */ @Override public IgniteCache<K, V> enableAsync() { - onAccess(); - if (isAsync()) return this; @@ -1050,8 +931,6 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public <K1, V1> IgniteCache<K1, V1> keepPortable() { - onAccess(); - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { @@ -1077,8 +956,6 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public IgniteCache<K, V> flagsOn(@Nullable GridCacheFlag... flags) { - onAccess(); - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { @@ -1124,8 +1001,6 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { - onAccess(); - out.writeObject(ctx); out.writeObject(delegate); @@ -1136,8 +1011,6 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** {@inheritDoc} */ @SuppressWarnings({"unchecked"}) @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - onAccess(); - ctx = (GridCacheContext<K, V>)in.readObject(); delegate = (GridCacheProjectionEx<K, V>)in.readObject(); @@ -1147,18 +1020,6 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements gate = ctx.gate(); } - /** - * Checks if set was removed and handles iterators weak reference queue. - */ - private void onAccess() { - try { - queryStorage.onAccess(); - } - catch (IgniteCheckedException e) { - throw cacheException(e); - } - } - /** {@inheritDoc} */ @Override public String toString() { return S.toString(IgniteCacheProxy.class, this); @@ -1233,43 +1094,4 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements return S.toString(LoadCacheClosure.class, this); } } - - /** - * Queries' storage - */ - private class IgniteQueryStorage extends IgniteQueryAbstractStorage<Entry<K, V>, Map.Entry<K, V>> { - /** - * @param ctx Cache context. - */ - public IgniteQueryStorage(GridCacheContext ctx) { - super(ctx); - } - - /** {@inheritDoc} */ - @Override protected Cache.Entry<K, V> convert(final Map.Entry<K, V> v) { - return new Cache.Entry<K, V>() { - @Override public K getKey() { - return v.getKey(); - } - - @Override public V getValue() { - return v.getValue(); - } - - @Override public <T> T unwrap(Class<T> clazz) { - throw new IllegalArgumentException(); - } - }; - } - - /** {@inheritDoc} */ - @Override protected void remove(Entry<K, V> item) { - try { - delegate.removex(item.getKey()); - } - catch (IgniteCheckedException e) { - throw cacheException(e); - } - } - } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c927f689/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheEntryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheEntryImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheEntryImpl.java new file mode 100644 index 0000000..3a7a546 --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheEntryImpl.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.gridgain.grid.kernal.processors.cache; + +import javax.cache.*; + +/** + * + */ +public class CacheEntryImpl<K, V> implements Cache.Entry<K, V> { + /** */ + private final K key; + + /** */ + private final V val; + + /** + * @param key Key. + * @param val Value. + */ + public CacheEntryImpl(K key, V val) { + this.key = key; + this.val = val; + } + + /** {@inheritDoc} */ + @Override public K getKey() { + return key; + } + + /** {@inheritDoc} */ + @Override public V getValue() { + return val; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public <T> T unwrap(Class<T> cls) { + if (!cls.equals(getClass())) + throw new IllegalArgumentException("Unwrapping is not supported: " + cls); + + return (T)this; + } + + /** {@inheritDoc} */ + public String toString() { + return "CacheEntry [key=" + key + ", val=" + val + ']'; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c927f689/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheWeakQueryIteratorsHolder.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheWeakQueryIteratorsHolder.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheWeakQueryIteratorsHolder.java new file mode 100644 index 0000000..eaa65be --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheWeakQueryIteratorsHolder.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.gridgain.grid.kernal.processors.cache; + +import org.apache.ignite.*; +import org.gridgain.grid.cache.query.*; +import org.gridgain.grid.util.*; +import org.jdk8.backport.*; + +import java.lang.ref.*; +import java.util.*; + +/** + * @param <T> Type for iterator. + * @param <V> Type for cache query future. + */ +public abstract class CacheWeakQueryIteratorsHolder<T, V> { + /** Iterators weak references queue. */ + private final ReferenceQueue<WeakQueryFutureIterator> refQueue = new ReferenceQueue<>(); + + /** Iterators futures. */ + private final Map<WeakReference<WeakQueryFutureIterator>, GridCacheQueryFuture<V>> futs = + new ConcurrentHashMap8<>(); + + /** Logger. */ + private final IgniteLogger log; + + /** + * @param log Logger. + */ + public CacheWeakQueryIteratorsHolder(IgniteLogger log) { + this.log = log; + } + + /** + * Iterator over the cache. + * @param fut Query to iterate + * @return iterator + */ + public WeakQueryFutureIterator iterator(GridCacheQueryFuture<V> fut) { + WeakQueryFutureIterator it = new WeakQueryFutureIterator(fut); + + futs.put(it.weakReference(), fut); + + return it; + } + + /** + * @param it Iterator. + * + * @throws IgniteCheckedException If failed. + */ + public void removeIterator(WeakQueryFutureIterator it) throws IgniteCheckedException { + futs.remove(it.weakReference()); + + it.close(); + } + + /** + * Closes unreachable iterators. + */ + public void checkWeakQueue() { + for (Reference<? extends WeakQueryFutureIterator> itRef = refQueue.poll(); itRef != null; itRef = refQueue.poll()) { + try { + WeakReference<WeakQueryFutureIterator> weakRef = (WeakReference<WeakQueryFutureIterator>)itRef; + + GridCacheQueryFuture<?> fut = futs.remove(weakRef); + + if (fut != null) + fut.cancel(); + } + catch (IgniteCheckedException e) { + log.error("Failed to close iterator.", e); + } + } + } + + /** + * Cancel all cache queries + */ + public void clearQueries(){ + for (GridCacheQueryFuture<?> fut : futs.values()) { + try { + fut.cancel(); + } + catch (IgniteCheckedException e) { + log.error("Failed to close iterator.", e); + } + } + + futs.clear(); + } + + /** + * Converts class V to class T. + * + * @param v Item to convert. + * @return Converted item. + */ + protected abstract T convert(V v); + + /** + * Removes item. + * + * @param item Item to remove. + */ + protected abstract void remove(T item); + + /** + * Iterator based of {@link GridCacheQueryFuture}. + */ + public class WeakQueryFutureIterator extends GridCloseableIteratorAdapter<T> { + /** Query future. */ + private final GridCacheQueryFuture<V> fut; + + /** Weak reference. */ + private final WeakReference<WeakQueryFutureIterator> weakRef; + + /** Init flag. */ + private boolean init; + + /** Next item. */ + private T next; + + /** Current item. */ + private T cur; + + /** + * @param fut GridCacheQueryFuture to iterate. + */ + WeakQueryFutureIterator(GridCacheQueryFuture<V> fut) { + this.fut = fut; + + this.weakRef = new WeakReference<>(this, refQueue); + } + + /** {@inheritDoc} */ + @Override public T onNext() throws IgniteCheckedException { + init(); + + if (next == null) { + clearWeakReference(); + + throw new NoSuchElementException(); + } + + cur = next; + + V futNext = fut.next(); + + if (futNext == null) + clearWeakReference(); + + next = futNext != null ? convert(futNext) : null; + + return cur; + } + + /** {@inheritDoc} */ + @Override public boolean onHasNext() throws IgniteCheckedException { + init(); + + boolean hasNext = next != null; + + if (!hasNext) + clearWeakReference(); + + return hasNext; + } + + /** {@inheritDoc} */ + @Override protected void onClose() throws IgniteCheckedException { + fut.cancel(); + + clearWeakReference(); + } + + /** {@inheritDoc} */ + @Override protected void onRemove() throws IgniteCheckedException { + if (cur == null) + throw new IllegalStateException(); + + CacheWeakQueryIteratorsHolder.this.remove(cur); + + cur = null; + } + + /** + * @return Iterator weak reference. + */ + private WeakReference<WeakQueryFutureIterator> weakReference() { + return weakRef; + } + + /** + * Clears weak reference. + */ + private void clearWeakReference() { + weakRef.clear(); + + futs.remove(weakRef); + } + + /** + * @throws IgniteCheckedException If failed. + */ + private void init() throws IgniteCheckedException { + if (!init) { + V futNext = fut.next(); + + next = futNext != null ? convert(futNext) : null; + + init = true; + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c927f689/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheSetImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheSetImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheSetImpl.java index 23eca52..cacc933 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheSetImpl.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheSetImpl.java @@ -66,8 +66,8 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements GridCa /** Removed flag. */ private volatile boolean rmvd; - /** Query storage */ - private final IgniteQueryStorage queryStorage; + /** Iterator holder. */ + private final CacheWeakQueryIteratorsHolder<T, Map.Entry<T, ?>> itHolder; /** * @param ctx Cache context. @@ -85,7 +85,15 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements GridCa hdrPart = ctx.affinity().partition(new GridCacheSetHeaderKey(name)); - queryStorage = new IgniteQueryStorage(ctx); + itHolder = new CacheWeakQueryIteratorsHolder<T, Map.Entry<T, ?>>(ctx.logger(GridCacheSetImpl.class)) { + @Override protected T convert(Map.Entry<T, ?> e) { + return e.getKey(); + } + + @Override protected void remove(T item) { + GridCacheSetImpl.this.remove(item); + } + }; } /** {@inheritDoc} */ @@ -340,10 +348,10 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements GridCa qry.projection(ctx.grid().forNodes(nodes)); - IgniteQueryAbstractStorage.IgniteIterator it = queryStorage.iterator(qry.execute()); + CacheWeakQueryIteratorsHolder.WeakQueryFutureIterator it = itHolder.iterator(qry.execute()); if (rmvd) { - queryStorage.removeIterator(it); + itHolder.removeIterator(it); checkRemoved(); } @@ -430,7 +438,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements GridCa this.rmvd = rmvd; if (rmvd) - queryStorage.clearQueries(); + itHolder.clearQueries(); } /** @@ -445,7 +453,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements GridCa * Checks if set was removed and handles iterators weak reference queue. */ private void onAccess() { - queryStorage.checkWeakQueue(); + itHolder.checkWeakQueue(); checkRemoved(); } @@ -478,28 +486,6 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements GridCa } /** - * Queries' storage. - */ - private class IgniteQueryStorage extends IgniteQueryAbstractStorage<T, Map.Entry<T, ?>> { - /** - * @param ctx Cache context. - */ - public IgniteQueryStorage(GridCacheContext ctx) { - super(ctx); - } - - /** {@inheritDoc} */ - @Override protected T convert(Map.Entry<T, ?> v) { - return v != null ? (T) v.getKey() : null; - } - - /** {@inheritDoc} */ - @Override protected void remove(T item) { - GridCacheSetImpl.this.remove(item); - } - } - - /** * */ private static class SumReducer implements IgniteReducer<Object, Integer>, Externalizable { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c927f689/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/IgniteQueryAbstractStorage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/IgniteQueryAbstractStorage.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/IgniteQueryAbstractStorage.java deleted file mode 100644 index 443fb86..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/IgniteQueryAbstractStorage.java +++ /dev/null @@ -1,233 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.cache.datastructures; - -import org.apache.ignite.*; -import org.gridgain.grid.cache.query.*; -import org.gridgain.grid.kernal.processors.cache.*; -import org.gridgain.grid.util.*; -import org.jdk8.backport.*; - -import java.lang.ref.*; -import java.util.*; - -/** - * Storage for GridCacheQueryFuture. - * @param <T> Type for iterator. - * @param <V> Type for cache query future. - */ -public abstract class IgniteQueryAbstractStorage<T, V> { - /** Iterators weak references queue. */ - private final ReferenceQueue<IgniteIterator> refQueue = new ReferenceQueue<>(); - - /** Iterators futures. */ - private final Map<WeakReference<IgniteIterator>, GridCacheQueryFuture<V>> futs = new ConcurrentHashMap8<>(); - - /** Logger. */ - private final IgniteLogger log; - - /** - * @param ctx Cache context. - */ - public IgniteQueryAbstractStorage(GridCacheContext ctx) { - log = ctx.logger(IgniteQueryAbstractStorage.class); - } - - /** - * Iterator over the cache. - * @param fut Query to iterate - * @return iterator - */ - public IgniteIterator iterator(GridCacheQueryFuture<V> fut) { - IgniteIterator it = new IgniteIterator(fut); - - futs.put(it.weakReference(), fut); - - return it; - } - - public void removeIterator(IgniteIterator it) throws IgniteCheckedException { - futs.remove(it.weakReference()); - - it.close(); - } - - /** - * Closes unreachable iterators. - */ - public void checkWeakQueue() { - for (Reference<? extends IgniteIterator> itRef = refQueue.poll(); itRef != null; itRef = refQueue.poll()) { - try { - WeakReference<IgniteIterator> weakRef = (WeakReference<IgniteIterator>)itRef; - - GridCacheQueryFuture<?> fut = futs.remove(weakRef); - - if (fut != null) - fut.cancel(); - } - catch (IgniteCheckedException e) { - log.error("Failed to close iterator.", e); - } - } - } - - /** - * Checks if set was removed and handles iterators weak reference queue. - */ - public void onAccess() throws IgniteCheckedException { - checkWeakQueue(); - } - - /** - * Cancel all cache queries - */ - protected void clearQueries(){ - for (GridCacheQueryFuture<?> fut : futs.values()) { - try { - fut.cancel(); - } - catch (IgniteCheckedException e) { - log.error("Failed to close iterator.", e); - } - } - - futs.clear(); - } - - /** - * Convert class V to class T. - * @param v Item to convert. - * @return Converted item. - */ - protected abstract T convert(V v); - - /** - * Remove item from the cache. - * @param item Item to remove. - */ - protected abstract void remove(T item); - - /** - * Iterator over the cache. - */ - public class IgniteIterator extends GridCloseableIteratorAdapter<T> { - /** Query future. */ - private final GridCacheQueryFuture<V> fut; - - /** Weak reference. */ - private final WeakReference<IgniteIterator> weakRef; - - /** Init flag. */ - private boolean init; - - /** Next item. */ - private T next; - - /** Current item. */ - private T cur; - - /** - * @param fut GridCacheQueryFuture to iterate - */ - IgniteIterator(GridCacheQueryFuture<V> fut) { - this.fut = fut; - - this.weakRef = new WeakReference<IgniteIterator>(this, refQueue); - } - - /** - * @return Iterator weak reference. - */ - public WeakReference<IgniteIterator> weakReference() { - return weakRef; - } - - /** {@inheritDoc} */ - @Override public T onNext() throws IgniteCheckedException { - init(); - - if (next == null) { - clearWeakReference(); - - throw new NoSuchElementException(); - } - - cur = next; - - V futNext = fut.next(); - - if (futNext == null) - clearWeakReference(); - - next = futNext != null ? convert(futNext) : null; - - return cur; - } - - /** {@inheritDoc} */ - @Override public boolean onHasNext() throws IgniteCheckedException { - init(); - - boolean hasNext = next != null; - - if (!hasNext) - clearWeakReference(); - - return hasNext; - } - - /** {@inheritDoc} */ - @Override protected void onClose() throws IgniteCheckedException { - fut.cancel(); - - clearWeakReference(); - } - - /** {@inheritDoc} */ - @Override protected void onRemove() throws IgniteCheckedException { - if (cur == null) - throw new IllegalStateException(); - - IgniteQueryAbstractStorage.this.remove(cur); - - cur = null; - } - - /** - * Clears weak reference. - */ - private void clearWeakReference() { - weakRef.clear(); - - futs.remove(weakRef); - } - - /** - * @throws IgniteCheckedException If failed. - */ - private void init() throws IgniteCheckedException { - if (!init) { - V futNext = fut.next(); - - next = futNext != null ? convert(futNext) : null; - - init = true; - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c927f689/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractFullApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractFullApiSelfTest.java index 03a60cb..a44acd1 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractFullApiSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractFullApiSelfTest.java @@ -5256,11 +5256,13 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract public void testIgniteCacheIterator() throws Exception { IgniteCache<String, Integer> cache = jcache(0); - final int cacheSz = 100; + assertFalse(cache.iterator().hasNext()); - Map<String, Integer> entries = new HashMap(); + final int SIZE = 100; - for (int i = 0; i < cacheSz; ++i) { + Map<String, Integer> entries = new HashMap<>(); + + for (int i = 0; i < SIZE; ++i) { cache.put(Integer.toString(i), i); entries.put(Integer.toString(i), i); @@ -5301,11 +5303,12 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract entries.remove(rmvKey); assertFalse(cache.containsKey(rmvKey)); + assertNull(cache.get(rmvKey)); checkIteratorCache(entries); // Check that we cannot call Iterator.remove() without next(). - Iterator<Cache.Entry<String, Integer>> iter = jcache(0).iterator(); + final Iterator<Cache.Entry<String, Integer>> iter = jcache(0).iterator(); assertTrue(iter.hasNext()); @@ -5313,13 +5316,13 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract iter.remove(); - try { - iter.remove(); + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Void call() throws Exception { + iter.remove(); - fail(); - } - catch (IllegalStateException e) { - } + return null; + } + }, IllegalStateException.class, null); } /**