#ignite-286:wip
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/42766004 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/42766004 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/42766004 Branch: refs/heads/ignite-286 Commit: 42766004414aa5817d6588156d94639dedff6e56 Parents: e174c0b Author: ivasilinets <ivasilin...@gridgain.com> Authored: Fri Apr 24 18:07:02 2015 +0300 Committer: ivasilinets <ivasilin...@gridgain.com> Committed: Fri Apr 24 18:07:02 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheSwapManager.java | 108 ++++++++++--------- .../processors/cache/local/GridLocalCache.java | 4 +- .../local/atomic/GridLocalAtomicCache.java | 5 +- .../cache/query/GridCacheQueryManager.java | 22 +++- ...achePartitionedMultiNodeFullApiSelfTest.java | 3 +- 5 files changed, 82 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/42766004/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java index 7b2d368..da6cac0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java @@ -1587,65 +1587,18 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { if (!offheapEnabled) return new GridEmptyCloseableIterator<>(); - return new GridCloseableIteratorAdapter<Map.Entry<byte[], byte[]>>() { - private GridCloseableIterator<IgniteBiTuple<byte[], byte[]>> it = offheap.iterator(spaceName); - - private Map.Entry<byte[], byte[]> cur; - - @Override protected Map.Entry<byte[], byte[]> onNext() { - return cur = it.next(); - } - - @Override protected boolean onHasNext() { - return it.hasNext(); - } - - @Override protected void onRemove() throws IgniteCheckedException { - KeyCacheObject key = cctx.toCacheKeyObject(cur.getKey()); - - int part = cctx.affinity().partition(key); - - offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext())); - } - - @Override protected void onClose() throws IgniteCheckedException { - it.close(); - } - }; + return new OffHeapIterator(offheap.iterator(spaceName)); } /** + * @param part Partition. * @return Raw off-heap iterator. */ public GridCloseableIterator<Map.Entry<byte[], byte[]>> rawOffHeapIterator(final int part) { if (!offheapEnabled) return new GridEmptyCloseableIterator<>(); - return new GridCloseableIteratorAdapter<Map.Entry<byte[], byte[]>>() { - private GridCloseableIterator<IgniteBiTuple<byte[], byte[]>> it = offheap.iterator(spaceName, part); - - private Map.Entry<byte[], byte[]> cur; - - @Override protected Map.Entry<byte[], byte[]> onNext() { - return cur = it.next(); - } - - @Override protected boolean onHasNext() { - return it.hasNext(); - } - - @Override protected void onRemove() throws IgniteCheckedException { - KeyCacheObject key = cctx.toCacheKeyObject(cur.getKey()); - - int part = cctx.affinity().partition(key); - - offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext())); - } - - @Override protected void onClose() throws IgniteCheckedException { - it.close(); - } - }; + return new OffHeapIterator(offheap.iterator(spaceName, part)); } /** @@ -1680,6 +1633,20 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { } /** + * @param part Partition. + * @return Raw off-heap iterator. + * @throws IgniteCheckedException If failed. + */ + public GridCloseableIterator<Map.Entry<byte[], byte[]>> rawSwapIterator(int part) throws IgniteCheckedException { + if (!swapEnabled) + return new GridEmptyCloseableIterator<>(); + + checkIteratorQueue(); + + return swapMgr.rawIterator(spaceName, part); + } + + /** * @param primary If {@code true} includes primary entries. * @param backup If {@code true} includes backup entries. * @param topVer Topology version. @@ -2100,4 +2067,45 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { abstract protected Iterator<KeyCacheObject> partitionIterator(int part) throws IgniteCheckedException; } + + /** + * Raw off-heap iterator. + */ + private class OffHeapIterator extends GridCloseableIteratorAdapter<Map.Entry<byte[], byte[]>> { + /** Internal off-heap iterator. */ + private final GridCloseableIterator<IgniteBiTuple<byte[], byte[]>> it; + + /** Current entry. */ + private Map.Entry<byte[], byte[]> cur; + + /** + * @param it Internal off-heap iterator. + */ + public OffHeapIterator(GridCloseableIterator<IgniteBiTuple<byte[], byte[]>> it) { + this.it = it; + } + + /** {@inheritDoc} */ + @Override protected Map.Entry<byte[], byte[]> onNext() { + return cur = it.next(); + } + + /** {@inheritDoc} */ + @Override protected boolean onHasNext() { + return it.hasNext(); + } + + /** {@inheritDoc} */ + @Override protected void onRemove() throws IgniteCheckedException { + KeyCacheObject key = cctx.toCacheKeyObject(cur.getKey()); + int part = cctx.affinity().partition(key); + + offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext())); + } + + /** {@inheritDoc} */ + @Override protected void onClose() throws IgniteCheckedException { + it.close(); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/42766004/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java index 8553b86..51fc7e6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java @@ -208,8 +208,8 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> { Set<K> keys = new HashSet<>(); if (ctx.offheapTiered()) { - for (Iterator<KeyCacheObject> it = - ctx.swap().offHeapKeyIterator(true, true, AffinityTopologyVersion.NONE); it.hasNext(); ) + for (Iterator<KeyCacheObject> it = ctx.swap().offHeapKeyIterator(true, true, AffinityTopologyVersion.NONE); + it.hasNext(); ) keys.add((K)it.next().value(ctx.cacheObjectContext(), false)); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/42766004/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java index 672f12d..32aa93e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java @@ -392,8 +392,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { Set<K> keys = new HashSet<>(); if (ctx.offheapTiered()) { - for (Iterator<KeyCacheObject> it = - ctx.swap().offHeapKeyIterator(true, true, AffinityTopologyVersion.NONE); it.hasNext(); ) + for (Iterator<KeyCacheObject> it = ctx.swap().offHeapKeyIterator(true, true, AffinityTopologyVersion.NONE); + it.hasNext(); ) keys.add((K)it.next().value(ctx.cacheObjectContext(), false)); } @@ -1383,7 +1383,6 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { */ private List<GridCacheEntryEx> lockEntries(Collection<? extends K> keys) { List<GridCacheEntryEx> locked = new ArrayList<>(keys.size()); - for (K key : keys) { if (key == null) throw new NullPointerException("Null key."); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/42766004/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index 20cf915..5ef4fee 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -882,7 +882,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte iters.add(offheapIterator(qry, parts)); if (cctx.swap().swapEnabled()) - iters.add(swapIterator(qry)); + iters.add(swapIterator(qry, parts)); it = new CompoundIterator<>(iters); } @@ -916,23 +916,37 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte /** * @param qry Query. + * @param parts Collection of partitions. * @return Swap iterator. * @throws IgniteCheckedException If failed. */ - private GridIterator<IgniteBiTuple<K, V>> swapIterator(GridCacheQueryAdapter<?> qry) + private GridIterator<IgniteBiTuple<K, V>> swapIterator(GridCacheQueryAdapter<?> qry, Collection<Integer> parts) throws IgniteCheckedException { IgniteBiPredicate<K, V> filter = qry.scanFilter(); - Iterator<Map.Entry<byte[], byte[]>> it = cctx.swap().rawSwapIterator(); + Iterator<Map.Entry<byte[], byte[]>> it; + + if (parts == null) + it = cctx.swap().rawSwapIterator(); + else { + List<GridIterator<Map.Entry<byte[], byte[]>>> partIts = new ArrayList<>(); + + for (Integer part : parts) + partIts.add(cctx.swap().rawSwapIterator(part)); + + it = new CompoundIterator(partIts); + } return scanIterator(it, filter, qry.keepPortable()); } /** * @param qry Query. + * @param parts Collection of partitions. * @return Offheap iterator. + * @throws IgniteCheckedException If failed. */ - private GridIterator<IgniteBiTuple<K, V>> offheapIterator(GridCacheQueryAdapter<?> qry, Set<Integer> parts) + private GridIterator<IgniteBiTuple<K, V>> offheapIterator(GridCacheQueryAdapter<?> qry, Collection<Integer> parts) throws IgniteCheckedException { IgniteBiPredicate<K, V> filter = qry.scanFilter(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/42766004/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeFullApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeFullApiSelfTest.java index 1a780df..3f85204 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeFullApiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeFullApiSelfTest.java @@ -330,7 +330,8 @@ public class GridCachePartitionedMultiNodeFullApiSelfTest extends GridCacheParti assertEquals(0, cache1.localSize(NEAR)); assertEquals(5, cache1.localSize(CachePeekMode.ALL) - cache1.localSize(NEAR)); - assertEquals(nearEnabled() && memoryMode() != CacheMemoryMode.OFFHEAP_TIERED ? 2 : 0, cache2.localSize(NEAR)); + assertEquals(nearEnabled() && memoryMode() != CacheMemoryMode.OFFHEAP_TIERED + ? 2 : 0, cache2.localSize(NEAR)); assertEquals(0, cache2.localSize(CachePeekMode.ALL) - cache2.localSize(NEAR)); }