#IGNITE-53: Move CacheWeakQueryIteratorsHolder to context.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/9a5dcb3a Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/9a5dcb3a Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/9a5dcb3a Branch: refs/heads/ignite-86 Commit: 9a5dcb3a4a4c964ff4c53bf57a791170b310ff0c Parents: d803400 Author: ivasilinets <ivasilin...@gridgain.com> Authored: Mon Jan 19 17:31:18 2015 +0400 Committer: ivasilinets <ivasilin...@gridgain.com> Committed: Mon Jan 19 17:31:18 2015 +0400 ---------------------------------------------------------------------- .../processors/cache/IgniteCacheProxy.java | 28 +++++------ .../cache/CacheIteratorConverter.java | 39 +++++++++++++++ .../cache/CacheWeakQueryIteratorsHolder.java | 50 ++++++++------------ .../processors/cache/GridCacheContext.java | 12 +++++ .../cache/datastructures/GridCacheSetImpl.java | 32 ++++++------- 5 files changed, 100 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9a5dcb3a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index 3ac11be..46fbfb4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -25,6 +25,7 @@ import org.apache.ignite.lang.*; import org.apache.ignite.resources.*; import org.gridgain.grid.*; import org.gridgain.grid.cache.*; +import org.gridgain.grid.cache.query.*; import org.gridgain.grid.kernal.*; import org.gridgain.grid.kernal.processors.cache.*; import org.gridgain.grid.util.tostring.*; @@ -62,9 +63,6 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** Projection. */ private GridCacheProjectionImpl<K, V> prj; - /** Iterator holder. */ - private final CacheWeakQueryIteratorsHolder<Entry<K, V>, Map.Entry<K, V>> itHolder; - /** * @param ctx Context. * @param delegate Delegate. @@ -85,16 +83,6 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements this.delegate = delegate; this.prj = prj; - itHolder = new CacheWeakQueryIteratorsHolder<Entry<K, V>, Map.Entry<K, V>>(ctx.logger(IgniteCacheProxy.class)) { - @Override protected Entry<K, V> convert(Map.Entry<K, V> e) { - return new CacheEntryImpl<>(e.getKey(), e.getValue()); - } - - @Override protected void remove(Entry<K, V> item) { - IgniteCacheProxy.this.remove(item.getKey()); - } - }; - gate = ctx.gate(); } @@ -942,9 +930,19 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - itHolder.checkWeakQueue(); + ctx.itHolder().checkWeakQueue(); + + GridCacheQueryFuture<Map.Entry<K, V>> fut = delegate.queries().createScanQuery(null).execute(); - return itHolder.iterator(delegate.queries().createScanQuery(null).execute()); + return ctx.itHolder().iterator(fut, new CacheIteratorConverter<Entry<K, V>, Map.Entry<K, V>>() { + @Override protected Entry<K, V> convert(Map.Entry<K, V> e) { + return new CacheEntryImpl<>(e.getKey(), e.getValue()); + } + + @Override protected void remove(Entry<K, V> item) { + IgniteCacheProxy.this.remove(item.getKey()); + } + }); } finally { gate.leave(prev); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9a5dcb3a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheIteratorConverter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheIteratorConverter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheIteratorConverter.java new file mode 100644 index 0000000..466460d --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheIteratorConverter.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.gridgain.grid.kernal.processors.cache; + +/** + * @param <T> Type for iterator. + * @param <V> Type for cache query future. + */ +public abstract class CacheIteratorConverter <T, V> { + /** + * Converts class V to class T. + * + * @param v Item to convert. + * @return Converted item. + */ + protected abstract T convert(V v); + + /** + * Removes item. + * + * @param item Item to remove. + */ + protected abstract void remove(T item); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9a5dcb3a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheWeakQueryIteratorsHolder.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheWeakQueryIteratorsHolder.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheWeakQueryIteratorsHolder.java index 1459927..2980663 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheWeakQueryIteratorsHolder.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheWeakQueryIteratorsHolder.java @@ -26,10 +26,9 @@ import java.lang.ref.*; import java.util.*; /** - * @param <T> Type for iterator. * @param <V> Type for cache query future. */ -public abstract class CacheWeakQueryIteratorsHolder<T, V> { +public class CacheWeakQueryIteratorsHolder<V> { /** Iterators weak references queue. */ private final ReferenceQueue<WeakQueryFutureIterator> refQueue = new ReferenceQueue<>(); @@ -48,12 +47,13 @@ public abstract class CacheWeakQueryIteratorsHolder<T, V> { } /** - * Iterator over the cache. - * @param fut Query to iterate - * @return iterator + * @param fut Query to iterate. + * @param convert Cache iterator converter. + * @param <T> Type for the iterator. + * @return Iterator over the cache. */ - public WeakQueryFutureIterator iterator(GridCacheQueryFuture<V> fut) { - WeakQueryFutureIterator it = new WeakQueryFutureIterator(fut); + public <T> WeakQueryFutureIterator iterator(GridCacheQueryFuture<V> fut, CacheIteratorConverter<T, V> convert) { + WeakQueryFutureIterator it = new WeakQueryFutureIterator(fut, convert); GridCacheQueryFuture<V> old = futs.put(it.weakReference(), fut); @@ -94,7 +94,7 @@ public abstract class CacheWeakQueryIteratorsHolder<T, V> { } /** - * Cancel all cache queries + * Cancel all cache queries. */ public void clearQueries(){ for (GridCacheQueryFuture<?> fut : futs.values()) { @@ -109,30 +109,20 @@ public abstract class CacheWeakQueryIteratorsHolder<T, V> { futs.clear(); } - /** - * Converts class V to class T. - * - * @param v Item to convert. - * @return Converted item. - */ - protected abstract T convert(V v); - - /** - * Removes item. - * - * @param item Item to remove. - */ - protected abstract void remove(T item); /** * Iterator based of {@link GridCacheQueryFuture}. + * + * @param <T> Type for iterator. */ - public class WeakQueryFutureIterator extends GridCloseableIteratorAdapter<T> { + public class WeakQueryFutureIterator<T> extends GridCloseableIteratorAdapter<T> { /** Query future. */ private final GridCacheQueryFuture<V> fut; /** Weak reference. */ - private final WeakReference<WeakQueryFutureIterator> weakRef; + private final WeakReference<WeakQueryFutureIterator<T>> weakRef; + + CacheIteratorConverter<T, V> convert; /** Init flag. */ private boolean init; @@ -146,10 +136,12 @@ public abstract class CacheWeakQueryIteratorsHolder<T, V> { /** * @param fut GridCacheQueryFuture to iterate. */ - WeakQueryFutureIterator(GridCacheQueryFuture<V> fut) { + WeakQueryFutureIterator(GridCacheQueryFuture<V> fut, CacheIteratorConverter<T, V> convert) { this.fut = fut; this.weakRef = new WeakReference<>(this, refQueue); + + this.convert = convert; } /** {@inheritDoc} */ @@ -169,7 +161,7 @@ public abstract class CacheWeakQueryIteratorsHolder<T, V> { if (futNext == null) clearWeakReference(); - next = futNext != null ? convert(futNext) : null; + next = futNext != null ? convert.convert(futNext) : null; return cur; } @@ -198,7 +190,7 @@ public abstract class CacheWeakQueryIteratorsHolder<T, V> { if (cur == null) throw new IllegalStateException(); - CacheWeakQueryIteratorsHolder.this.remove(cur); + convert.remove(cur); cur = null; } @@ -206,7 +198,7 @@ public abstract class CacheWeakQueryIteratorsHolder<T, V> { /** * @return Iterator weak reference. */ - private WeakReference<WeakQueryFutureIterator> weakReference() { + private WeakReference<WeakQueryFutureIterator<T>> weakReference() { return weakRef; } @@ -226,7 +218,7 @@ public abstract class CacheWeakQueryIteratorsHolder<T, V> { if (!init) { V futNext = fut.next(); - next = futNext != null ? convert(futNext) : null; + next = futNext != null ? convert.convert(futNext) : null; init = true; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9a5dcb3a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java index ad65344..cde739b 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java @@ -186,6 +186,9 @@ public class GridCacheContext<K, V> implements Externalizable { /** Default expiry policy. */ private ExpiryPolicy expiryPlc; + /** Cache weak query iterator holder. */ + private CacheWeakQueryIteratorsHolder<Map.Entry<K, V>> itHolder; + /** * Empty constructor required for {@link Externalizable}. */ @@ -300,6 +303,8 @@ public class GridCacheContext<K, V> implements Externalizable { if (expiryPlc instanceof EternalExpiryPolicy) expiryPlc = null; + + itHolder = new CacheWeakQueryIteratorsHolder(log); } /** @@ -837,6 +842,13 @@ public class GridCacheContext<K, V> implements Externalizable { } /** + * @return Iterators Holder. + */ + public CacheWeakQueryIteratorsHolder<Map.Entry<K, V>> itHolder() { + return itHolder; + } + + /** * @return Swap manager. */ public GridCacheSwapManager<K, V> swap() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9a5dcb3a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheSetImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheSetImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheSetImpl.java index cacc933..852aa21 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheSetImpl.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheSetImpl.java @@ -66,9 +66,6 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements GridCa /** Removed flag. */ private volatile boolean rmvd; - /** Iterator holder. */ - private final CacheWeakQueryIteratorsHolder<T, Map.Entry<T, ?>> itHolder; - /** * @param ctx Cache context. * @param name Set name. @@ -84,16 +81,6 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements GridCa cache = ctx.cache(); hdrPart = ctx.affinity().partition(new GridCacheSetHeaderKey(name)); - - itHolder = new CacheWeakQueryIteratorsHolder<T, Map.Entry<T, ?>>(ctx.logger(GridCacheSetImpl.class)) { - @Override protected T convert(Map.Entry<T, ?> e) { - return e.getKey(); - } - - @Override protected void remove(T item) { - GridCacheSetImpl.this.remove(item); - } - }; } /** {@inheritDoc} */ @@ -348,10 +335,21 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements GridCa qry.projection(ctx.grid().forNodes(nodes)); - CacheWeakQueryIteratorsHolder.WeakQueryFutureIterator it = itHolder.iterator(qry.execute()); + GridCacheQueryFuture<Map.Entry<T, ?>> fut = qry.execute(); + + CacheWeakQueryIteratorsHolder.WeakQueryFutureIterator it = + ctx.itHolder().iterator(fut, new CacheIteratorConverter<T, Map.Entry<T, ?>>() { + @Override protected T convert(Map.Entry<T, ?> e) { + return e.getKey(); + } + + @Override protected void remove(T item) { + GridCacheSetImpl.this.remove(item); + } + }); if (rmvd) { - itHolder.removeIterator(it); + ctx.itHolder().removeIterator(it); checkRemoved(); } @@ -438,7 +436,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements GridCa this.rmvd = rmvd; if (rmvd) - itHolder.clearQueries(); + ctx.itHolder().clearQueries(); } /** @@ -453,7 +451,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements GridCa * Checks if set was removed and handles iterators weak reference queue. */ private void onAccess() { - itHolder.checkWeakQueue(); + ctx.itHolder().checkWeakQueue(); checkRemoved(); }