#IGNITE-53: Refactoring IgniteQueryStorage
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/224f7e2f Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/224f7e2f Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/224f7e2f Branch: refs/heads/sprint-1 Commit: 224f7e2fcac84302e31478de8faaaecbc290c250 Parents: d6372ea Author: ivasilinets <ivasilin...@gridgain.com> Authored: Thu Jan 15 19:16:57 2015 +0400 Committer: ivasilinets <ivasilin...@gridgain.com> Committed: Thu Jan 15 19:16:57 2015 +0400 ---------------------------------------------------------------------- .../processors/cache/IgniteCacheProxy.java | 69 ++---- .../cache/datastructures/GridCacheSetImpl.java | 168 ++----------- .../IgniteQueryAbstractStorage.java | 233 +++++++++++++++++++ .../IgniteQueryFutureStorage.java | 190 --------------- 4 files changed, 276 insertions(+), 384 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/224f7e2f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index cda62b2..b0ae913 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -24,7 +24,6 @@ import org.apache.ignite.cluster.*; import org.apache.ignite.lang.*; import org.apache.ignite.resources.*; import org.gridgain.grid.cache.*; -import org.gridgain.grid.cache.query.*; import org.gridgain.grid.kernal.*; import org.gridgain.grid.kernal.processors.cache.*; import org.gridgain.grid.kernal.processors.cache.datastructures.*; @@ -63,8 +62,8 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements /** Projection. */ private GridCacheProjectionImpl<K, V> prj; - /** Query future storage */ - private final IgniteQueryFutureStorage queryStorage; + /** Query storage */ + private final IgniteQueryStorage queryStorage; /** * @param ctx Context. @@ -85,7 +84,7 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements this.delegate = delegate; this.prj = prj; - this.queryStorage = new IgniteQueryFutureStorage(ctx); + this.queryStorage = new IgniteQueryStorage(ctx); gate = ctx.gate(); } @@ -983,7 +982,7 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - return new IgniteCacheIterator(delegate.queries().createScanQuery(null).execute(), queryStorage); + return queryStorage.iterator(delegate.queries().createScanQuery(null).execute()); } finally { gate.leave(prev); @@ -1236,55 +1235,37 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements } /** - * Iterator over the cache. + * Queries' storage */ - private class IgniteCacheIterator implements Iterator<Cache.Entry<K, V>> { - /** Iterator over the cache*/ - IgniteQueryFutureStorage.Iterator<Map.Entry<K, V>> iter; - - IgniteCacheIterator(GridCacheQueryFuture<Map.Entry<K, V>> fut, IgniteQueryFutureStorage storage) { - iter = storage.iterator(fut); - } - - /** {@inheritDoc} */ - @Override public boolean hasNext() { - try { - return iter.onHasNext(); - } catch (IgniteCheckedException e) { - throw cacheException(e); - } + private class IgniteQueryStorage extends IgniteQueryAbstractStorage<Entry<K, V>, Map.Entry<K, V>> { + /** + * @param ctx Cache context. + */ + public IgniteQueryStorage(GridCacheContext ctx) { + super(ctx); } /** {@inheritDoc} */ - @Override public Entry<K, V> next() { - try { - final Map.Entry<K, V> cur = iter.onNext(); - return new Cache.Entry<K, V>() { - @Override public K getKey() { - return cur.getKey(); - } - - @Override public V getValue() { - return cur.getValue(); - } - - @Override public <T> T unwrap(Class<T> clazz) { - throw new IllegalArgumentException(); - } - }; - } - catch (IgniteCheckedException e) { - throw cacheException(e); - } + @Override protected Cache.Entry<K, V> convert(final Map.Entry<K, V> v) { + return new Cache.Entry<K, V>() { + @Override public K getKey() { + return v.getKey(); + } + @Override public V getValue() { + return v.getValue(); + } + @Override public <T> T unwrap(Class<T> clazz) { + throw new IllegalArgumentException(); + } + }; } /** {@inheritDoc} */ - @Override public void remove() { - Map.Entry<K, V> curEntry = iter.itemToRemove(); + @Override protected void remove(Entry<K, V> item) { try { - delegate.removex(curEntry.getKey()); + delegate.removex(item.getKey()); } catch (IgniteCheckedException e) { throw cacheException(e); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/224f7e2f/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheSetImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheSetImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheSetImpl.java index 9519ad8..23eca52 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheSetImpl.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheSetImpl.java @@ -20,7 +20,6 @@ package org.gridgain.grid.kernal.processors.cache.datastructures; import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.lang.*; -import org.gridgain.grid.*; import org.gridgain.grid.cache.*; import org.gridgain.grid.cache.affinity.*; import org.gridgain.grid.cache.datastructures.*; @@ -31,11 +30,9 @@ import org.gridgain.grid.util.*; import org.gridgain.grid.util.lang.*; import org.gridgain.grid.util.typedef.*; import org.gridgain.grid.util.typedef.internal.*; -import org.jdk8.backport.*; import org.jetbrains.annotations.*; import java.io.*; -import java.lang.ref.*; import java.util.*; import java.util.concurrent.*; @@ -54,9 +51,6 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements GridCa /** Cache. */ private final GridCache<GridCacheSetItemKey, Boolean> cache; - /** Logger. */ - private final IgniteLogger log; - /** Set name. */ private final String name; @@ -72,11 +66,8 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements GridCa /** Removed flag. */ private volatile boolean rmvd; - /** Iterators weak references queue. */ - private final ReferenceQueue<SetIterator<?>> itRefQueue = new ReferenceQueue<>(); - - /** Iterators futures. */ - private final Map<WeakReference<SetIterator<?>>, GridCacheQueryFuture<?>> itFuts = new ConcurrentHashMap8<>(); + /** Query storage */ + private final IgniteQueryStorage queryStorage; /** * @param ctx Cache context. @@ -92,9 +83,9 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements GridCa cache = ctx.cache(); - log = ctx.logger(GridCacheSetImpl.class); - hdrPart = ctx.affinity().partition(new GridCacheSetHeaderKey(name)); + + queryStorage = new IgniteQueryStorage(ctx); } /** {@inheritDoc} */ @@ -349,16 +340,10 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements GridCa qry.projection(ctx.grid().forNodes(nodes)); - GridCacheQueryFuture<T> fut = qry.execute(); - - SetIterator<T> it = new SetIterator<>(fut); - - itFuts.put(it.weakReference(), fut); + IgniteQueryAbstractStorage.IgniteIterator it = queryStorage.iterator(qry.execute()); if (rmvd) { - itFuts.remove(it.weakReference()); - - it.close(); + queryStorage.removeIterator(it); checkRemoved(); } @@ -444,18 +429,8 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements GridCa this.rmvd = rmvd; - if (rmvd) { - for (GridCacheQueryFuture<?> fut : itFuts.values()) { - try { - fut.cancel(); - } - catch (IgniteCheckedException e) { - log.error("Failed to close iterator.", e); - } - } - - itFuts.clear(); - } + if (rmvd) + queryStorage.clearQueries(); } /** @@ -467,29 +442,10 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements GridCa } /** - * Closes unreachable iterators. - */ - private void checkWeakQueue() { - for (Reference<? extends SetIterator<?>> itRef = itRefQueue.poll(); itRef != null; itRef = itRefQueue.poll()) { - try { - WeakReference<SetIterator<?>> weakRef = (WeakReference<SetIterator<?>>)itRef; - - GridCacheQueryFuture<?> fut = itFuts.remove(weakRef); - - if (fut != null) - fut.cancel(); - } - catch (IgniteCheckedException e) { - log.error("Failed to close iterator.", e); - } - } - } - - /** * Checks if set was removed and handles iterators weak reference queue. */ private void onAccess() { - checkWeakQueue(); + queryStorage.checkWeakQueue(); checkRemoved(); } @@ -522,112 +478,24 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements GridCa } /** - * + * Queries' storage. */ - private class SetIterator<T> extends GridCloseableIteratorAdapter<T> { - /** */ - private static final long serialVersionUID = -1460570789166994846L; - - /** Query future. */ - private final GridCacheQueryFuture<T> fut; - - /** Init flag. */ - private boolean init; - - /** Next item. */ - private T next; - - /** Current item. */ - private T cur; - - /** Weak reference. */ - private final WeakReference<SetIterator<?>> weakRef; - + private class IgniteQueryStorage extends IgniteQueryAbstractStorage<T, Map.Entry<T, ?>> { /** - * @param fut Query future. + * @param ctx Cache context. */ - private SetIterator(GridCacheQueryFuture<T> fut) { - this.fut = fut; - - weakRef = new WeakReference<SetIterator<?>>(this, itRefQueue); - } - - /** {@inheritDoc} */ - @Override protected T onNext() throws IgniteCheckedException { - init(); - - if (next == null) { - clearWeakReference(); - - throw new NoSuchElementException(); - } - - cur = next; - - Map.Entry e = (Map.Entry)fut.next(); - - next = e != null ? (T)e.getKey() : null; - - if (next == null) - clearWeakReference(); - - return cur; + public IgniteQueryStorage(GridCacheContext ctx) { + super(ctx); } /** {@inheritDoc} */ - @Override protected boolean onHasNext() throws IgniteCheckedException { - init(); - - boolean hasNext = next != null; - - if (!hasNext) - clearWeakReference(); - - return hasNext; - } - - /** {@inheritDoc} */ - @Override protected void onClose() throws IgniteCheckedException { - fut.cancel(); - - clearWeakReference(); + @Override protected T convert(Map.Entry<T, ?> v) { + return v != null ? (T) v.getKey() : null; } /** {@inheritDoc} */ - @Override protected void onRemove() throws IgniteCheckedException { - if (cur == null) - throw new NoSuchElementException(); - - GridCacheSetImpl.this.remove(cur); - } - - /** - * @throws IgniteCheckedException If failed. - */ - private void init() throws IgniteCheckedException { - if (!init) { - Map.Entry e = (Map.Entry)fut.next(); - - next = e != null ? (T)e.getKey() : null; - - init = true; - } - } - - /** - * @return Iterator weak reference. - */ - WeakReference<SetIterator<?>> weakReference() { - return weakRef; - } - - /** - * Clears weak reference. - */ - private void clearWeakReference() { - weakRef.clear(); // Do not need to enqueue. - - itFuts.remove(weakRef); + @Override protected void remove(T item) { + GridCacheSetImpl.this.remove(item); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/224f7e2f/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/IgniteQueryAbstractStorage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/IgniteQueryAbstractStorage.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/IgniteQueryAbstractStorage.java new file mode 100644 index 0000000..443fb86 --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/IgniteQueryAbstractStorage.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.gridgain.grid.kernal.processors.cache.datastructures; + +import org.apache.ignite.*; +import org.gridgain.grid.cache.query.*; +import org.gridgain.grid.kernal.processors.cache.*; +import org.gridgain.grid.util.*; +import org.jdk8.backport.*; + +import java.lang.ref.*; +import java.util.*; + +/** + * Storage for GridCacheQueryFuture. + * @param <T> Type for iterator. + * @param <V> Type for cache query future. + */ +public abstract class IgniteQueryAbstractStorage<T, V> { + /** Iterators weak references queue. */ + private final ReferenceQueue<IgniteIterator> refQueue = new ReferenceQueue<>(); + + /** Iterators futures. */ + private final Map<WeakReference<IgniteIterator>, GridCacheQueryFuture<V>> futs = new ConcurrentHashMap8<>(); + + /** Logger. */ + private final IgniteLogger log; + + /** + * @param ctx Cache context. + */ + public IgniteQueryAbstractStorage(GridCacheContext ctx) { + log = ctx.logger(IgniteQueryAbstractStorage.class); + } + + /** + * Iterator over the cache. + * @param fut Query to iterate + * @return iterator + */ + public IgniteIterator iterator(GridCacheQueryFuture<V> fut) { + IgniteIterator it = new IgniteIterator(fut); + + futs.put(it.weakReference(), fut); + + return it; + } + + public void removeIterator(IgniteIterator it) throws IgniteCheckedException { + futs.remove(it.weakReference()); + + it.close(); + } + + /** + * Closes unreachable iterators. + */ + public void checkWeakQueue() { + for (Reference<? extends IgniteIterator> itRef = refQueue.poll(); itRef != null; itRef = refQueue.poll()) { + try { + WeakReference<IgniteIterator> weakRef = (WeakReference<IgniteIterator>)itRef; + + GridCacheQueryFuture<?> fut = futs.remove(weakRef); + + if (fut != null) + fut.cancel(); + } + catch (IgniteCheckedException e) { + log.error("Failed to close iterator.", e); + } + } + } + + /** + * Checks if set was removed and handles iterators weak reference queue. + */ + public void onAccess() throws IgniteCheckedException { + checkWeakQueue(); + } + + /** + * Cancel all cache queries + */ + protected void clearQueries(){ + for (GridCacheQueryFuture<?> fut : futs.values()) { + try { + fut.cancel(); + } + catch (IgniteCheckedException e) { + log.error("Failed to close iterator.", e); + } + } + + futs.clear(); + } + + /** + * Convert class V to class T. + * @param v Item to convert. + * @return Converted item. + */ + protected abstract T convert(V v); + + /** + * Remove item from the cache. + * @param item Item to remove. + */ + protected abstract void remove(T item); + + /** + * Iterator over the cache. + */ + public class IgniteIterator extends GridCloseableIteratorAdapter<T> { + /** Query future. */ + private final GridCacheQueryFuture<V> fut; + + /** Weak reference. */ + private final WeakReference<IgniteIterator> weakRef; + + /** Init flag. */ + private boolean init; + + /** Next item. */ + private T next; + + /** Current item. */ + private T cur; + + /** + * @param fut GridCacheQueryFuture to iterate + */ + IgniteIterator(GridCacheQueryFuture<V> fut) { + this.fut = fut; + + this.weakRef = new WeakReference<IgniteIterator>(this, refQueue); + } + + /** + * @return Iterator weak reference. + */ + public WeakReference<IgniteIterator> weakReference() { + return weakRef; + } + + /** {@inheritDoc} */ + @Override public T onNext() throws IgniteCheckedException { + init(); + + if (next == null) { + clearWeakReference(); + + throw new NoSuchElementException(); + } + + cur = next; + + V futNext = fut.next(); + + if (futNext == null) + clearWeakReference(); + + next = futNext != null ? convert(futNext) : null; + + return cur; + } + + /** {@inheritDoc} */ + @Override public boolean onHasNext() throws IgniteCheckedException { + init(); + + boolean hasNext = next != null; + + if (!hasNext) + clearWeakReference(); + + return hasNext; + } + + /** {@inheritDoc} */ + @Override protected void onClose() throws IgniteCheckedException { + fut.cancel(); + + clearWeakReference(); + } + + /** {@inheritDoc} */ + @Override protected void onRemove() throws IgniteCheckedException { + if (cur == null) + throw new IllegalStateException(); + + IgniteQueryAbstractStorage.this.remove(cur); + + cur = null; + } + + /** + * Clears weak reference. + */ + private void clearWeakReference() { + weakRef.clear(); + + futs.remove(weakRef); + } + + /** + * @throws IgniteCheckedException If failed. + */ + private void init() throws IgniteCheckedException { + if (!init) { + V futNext = fut.next(); + + next = futNext != null ? convert(futNext) : null; + + init = true; + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/224f7e2f/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/IgniteQueryFutureStorage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/IgniteQueryFutureStorage.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/IgniteQueryFutureStorage.java deleted file mode 100644 index d24045f..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/datastructures/IgniteQueryFutureStorage.java +++ /dev/null @@ -1,190 +0,0 @@ -package org.gridgain.grid.kernal.processors.cache.datastructures; - -import org.apache.ignite.*; -import org.gridgain.grid.cache.query.*; -import org.gridgain.grid.kernal.processors.cache.*; -import org.jdk8.backport.*; - -import java.lang.ref.*; -import java.util.*; - -/** - * Storage for GridCacheQueryFuture. - */ -public class IgniteQueryFutureStorage { - /** Iterators weak references queue. */ - private final ReferenceQueue<Iterator<?>> refQueue = new ReferenceQueue<>(); - - /** Iterators futures. */ - private final Map<WeakReference<Iterator<?>>, GridCacheQueryFuture<?>> futs = new ConcurrentHashMap8<>(); - - /** Logger. */ - private final IgniteLogger log; - - /** - * @param ctx Cache context. - */ - public IgniteQueryFutureStorage(GridCacheContext ctx) { - log = ctx.logger(GridCacheSetImpl.class); - } - - /** - * Iterator over the cache. - * @param fut Query to iterate - * @return iterator - */ - public <T> Iterator<T> iterator(GridCacheQueryFuture<T> fut) { - Iterator<T> it = new Iterator<>(fut); - - futs.put(it.weakReference(), fut); - - return it; - } - - /** - * Closes unreachable iterators. - */ - private void checkWeakQueue() throws IgniteCheckedException { - for (Reference<? extends Iterator<?>> itRef = refQueue.poll(); itRef != null; itRef = refQueue.poll()) { - WeakReference<Iterator<?>> weakRef = (WeakReference<Iterator<?>>) itRef; - - GridCacheQueryFuture<?> fut = futs.remove(weakRef); - - if (fut != null) - fut.cancel(); - - } - } - - /** - * Checks if set was removed and handles iterators weak reference queue. - */ - public void onAccess() throws IgniteCheckedException { - checkWeakQueue(); - } - - /** - * Cancel all cache queries - * @throws IgniteCheckedException - */ - protected void clearQueries() throws IgniteCheckedException { - for (GridCacheQueryFuture<?> fut : futs.values()) { - try { - fut.cancel(); - } - catch (IgniteCheckedException e) { - log.error("Failed to close iterator.", e); - } - } - - futs.clear(); - } - - /** - * Iterator over the cache - */ - public class Iterator<T> { - /** Query future. */ - private final GridCacheQueryFuture<T> fut; - - /** Weak reference. */ - private final WeakReference<Iterator<?>> weakRef; - - /** Init flag. */ - private boolean init; - - /** Next item. */ - private T next; - - /** Current item. */ - private T cur; - - /** - * @param fut GridCacheQueryFuture to iterate - */ - Iterator(GridCacheQueryFuture<T> fut) { - this.fut = fut; - - this.weakRef = new WeakReference<Iterator<?>>(this, refQueue); - } - - - /** - * @throws IgniteCheckedException If failed. - */ - private void init() throws IgniteCheckedException { - if (!init) { - next = fut.next(); - - init = true; - } - } - - /** - * @return Iterator weak reference. - */ - WeakReference<Iterator<?>> weakReference() { - return weakRef; - } - - /** - * Clears weak reference. - */ - private void clearWeakReference() { - weakRef.clear(); - - futs.remove(weakRef); - } - - /** - * The same as Iterator.next() - */ - public T onNext() throws IgniteCheckedException { - init(); - - if (next == null) { - clearWeakReference(); - - throw new NoSuchElementException(); - } - - cur = next; - next = fut.next(); - - if (next == null) - clearWeakReference(); - - return cur; - } - - /** - * The same as Iterator.hasNext() - */ - public boolean onHasNext() throws IgniteCheckedException { - init(); - - boolean hasNext = next != null; - - if (!hasNext) - clearWeakReference(); - - return hasNext; - } - - /** - * @return current item to remove - * @throws IllegalStateException if the {@code onNext} method has not - * yet been called, or the {@code itemToRemove} method has already - * been called after the last call to the {@code onNext} - * method - */ - public T itemToRemove() { - if (cur == null) - throw new IllegalStateException(); - T res = cur; - cur = null; - return res; - } - } - -}