#ignite-286:wip

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/42766004
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/42766004
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/42766004

Branch: refs/heads/ignite-286
Commit: 42766004414aa5817d6588156d94639dedff6e56
Parents: e174c0b
Author: ivasilinets <ivasilin...@gridgain.com>
Authored: Fri Apr 24 18:07:02 2015 +0300
Committer: ivasilinets <ivasilin...@gridgain.com>
Committed: Fri Apr 24 18:07:02 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheSwapManager.java  | 108 ++++++++++---------
 .../processors/cache/local/GridLocalCache.java  |   4 +-
 .../local/atomic/GridLocalAtomicCache.java      |   5 +-
 .../cache/query/GridCacheQueryManager.java      |  22 +++-
 ...achePartitionedMultiNodeFullApiSelfTest.java |   3 +-
 5 files changed, 82 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/42766004/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
index 7b2d368..da6cac0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
@@ -1587,65 +1587,18 @@ public class GridCacheSwapManager extends 
GridCacheManagerAdapter {
         if (!offheapEnabled)
             return new GridEmptyCloseableIterator<>();
 
-        return new GridCloseableIteratorAdapter<Map.Entry<byte[], byte[]>>() {
-            private GridCloseableIterator<IgniteBiTuple<byte[], byte[]>> it = 
offheap.iterator(spaceName);
-
-            private Map.Entry<byte[], byte[]> cur;
-
-            @Override protected Map.Entry<byte[], byte[]> onNext() {
-                return cur = it.next();
-            }
-
-            @Override protected boolean onHasNext() {
-                return it.hasNext();
-            }
-
-            @Override protected void onRemove() throws IgniteCheckedException {
-                KeyCacheObject key = cctx.toCacheKeyObject(cur.getKey());
-
-                int part = cctx.affinity().partition(key);
-
-                offheap.removex(spaceName, part, key, 
key.valueBytes(cctx.cacheObjectContext()));
-            }
-
-            @Override protected void onClose() throws IgniteCheckedException {
-                it.close();
-            }
-        };
+        return new OffHeapIterator(offheap.iterator(spaceName));
     }
 
     /**
+     * @param part Partition.
      * @return Raw off-heap iterator.
      */
     public GridCloseableIterator<Map.Entry<byte[], byte[]>> 
rawOffHeapIterator(final int part) {
         if (!offheapEnabled)
             return new GridEmptyCloseableIterator<>();
 
-        return new GridCloseableIteratorAdapter<Map.Entry<byte[], byte[]>>() {
-            private GridCloseableIterator<IgniteBiTuple<byte[], byte[]>> it = 
offheap.iterator(spaceName, part);
-
-            private Map.Entry<byte[], byte[]> cur;
-
-            @Override protected Map.Entry<byte[], byte[]> onNext() {
-                return cur = it.next();
-            }
-
-            @Override protected boolean onHasNext() {
-                return it.hasNext();
-            }
-
-            @Override protected void onRemove() throws IgniteCheckedException {
-                KeyCacheObject key = cctx.toCacheKeyObject(cur.getKey());
-
-                int part = cctx.affinity().partition(key);
-
-                offheap.removex(spaceName, part, key, 
key.valueBytes(cctx.cacheObjectContext()));
-            }
-
-            @Override protected void onClose() throws IgniteCheckedException {
-                it.close();
-            }
-        };
+        return new OffHeapIterator(offheap.iterator(spaceName, part));
     }
 
     /**
@@ -1680,6 +1633,20 @@ public class GridCacheSwapManager extends 
GridCacheManagerAdapter {
     }
 
     /**
+     * @param part Partition.
+     * @return Raw off-heap iterator.
+     * @throws IgniteCheckedException If failed.
+     */
+    public GridCloseableIterator<Map.Entry<byte[], byte[]>> 
rawSwapIterator(int part) throws IgniteCheckedException {
+        if (!swapEnabled)
+            return new GridEmptyCloseableIterator<>();
+
+        checkIteratorQueue();
+
+        return swapMgr.rawIterator(spaceName, part);
+    }
+
+    /**
      * @param primary If {@code true} includes primary entries.
      * @param backup If {@code true} includes backup entries.
      * @param topVer Topology version.
@@ -2100,4 +2067,45 @@ public class GridCacheSwapManager extends 
GridCacheManagerAdapter {
         abstract protected Iterator<KeyCacheObject> partitionIterator(int part)
             throws IgniteCheckedException;
     }
+
+    /**
+     * Raw off-heap iterator.
+     */
+    private class OffHeapIterator extends 
GridCloseableIteratorAdapter<Map.Entry<byte[], byte[]>> {
+        /** Internal off-heap iterator. */
+        private final GridCloseableIterator<IgniteBiTuple<byte[], byte[]>> it;
+
+        /** Current entry. */
+        private Map.Entry<byte[], byte[]> cur;
+
+        /**
+         * @param it Internal off-heap iterator.
+         */
+        public OffHeapIterator(GridCloseableIterator<IgniteBiTuple<byte[], 
byte[]>> it) {
+            this.it = it;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected Map.Entry<byte[], byte[]> onNext() {
+            return cur = it.next();
+        }
+
+        /** {@inheritDoc} */
+        @Override protected boolean onHasNext() {
+            return it.hasNext();
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void onRemove() throws IgniteCheckedException {
+            KeyCacheObject key = cctx.toCacheKeyObject(cur.getKey());
+            int part = cctx.affinity().partition(key);
+
+            offheap.removex(spaceName, part, key, 
key.valueBytes(cctx.cacheObjectContext()));
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void onClose() throws IgniteCheckedException {
+            it.close();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/42766004/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
index 8553b86..51fc7e6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java
@@ -208,8 +208,8 @@ public class GridLocalCache<K, V> extends 
GridCacheAdapter<K, V> {
         Set<K> keys = new HashSet<>();
 
         if (ctx.offheapTiered()) {
-            for (Iterator<KeyCacheObject> it =
-                         ctx.swap().offHeapKeyIterator(true, true, 
AffinityTopologyVersion.NONE); it.hasNext(); )
+            for (Iterator<KeyCacheObject> it = 
ctx.swap().offHeapKeyIterator(true, true, AffinityTopologyVersion.NONE);
+                 it.hasNext(); )
                 keys.add((K)it.next().value(ctx.cacheObjectContext(), false));
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/42766004/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
index 672f12d..32aa93e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -392,8 +392,8 @@ public class GridLocalAtomicCache<K, V> extends 
GridCacheAdapter<K, V> {
         Set<K> keys = new HashSet<>();
 
         if (ctx.offheapTiered()) {
-            for (Iterator<KeyCacheObject> it =
-                         ctx.swap().offHeapKeyIterator(true, true, 
AffinityTopologyVersion.NONE); it.hasNext(); )
+            for (Iterator<KeyCacheObject> it = 
ctx.swap().offHeapKeyIterator(true, true, AffinityTopologyVersion.NONE);
+                 it.hasNext(); )
                 keys.add((K)it.next().value(ctx.cacheObjectContext(), false));
         }
 
@@ -1383,7 +1383,6 @@ public class GridLocalAtomicCache<K, V> extends 
GridCacheAdapter<K, V> {
      */
     private List<GridCacheEntryEx> lockEntries(Collection<? extends K> keys) {
         List<GridCacheEntryEx> locked = new ArrayList<>(keys.size());
-
         for (K key : keys) {
             if (key == null)
                 throw new NullPointerException("Null key.");

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/42766004/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 20cf915..5ef4fee 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -882,7 +882,7 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
                 iters.add(offheapIterator(qry, parts));
 
             if (cctx.swap().swapEnabled())
-                iters.add(swapIterator(qry));
+                iters.add(swapIterator(qry, parts));
 
             it = new CompoundIterator<>(iters);
         }
@@ -916,23 +916,37 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
 
     /**
      * @param qry Query.
+     * @param parts Collection of partitions.
      * @return Swap iterator.
      * @throws IgniteCheckedException If failed.
      */
-    private GridIterator<IgniteBiTuple<K, V>> 
swapIterator(GridCacheQueryAdapter<?> qry)
+    private GridIterator<IgniteBiTuple<K, V>> 
swapIterator(GridCacheQueryAdapter<?> qry, Collection<Integer> parts)
         throws IgniteCheckedException {
         IgniteBiPredicate<K, V> filter = qry.scanFilter();
 
-        Iterator<Map.Entry<byte[], byte[]>> it = cctx.swap().rawSwapIterator();
+        Iterator<Map.Entry<byte[], byte[]>> it;
+
+        if (parts == null)
+            it = cctx.swap().rawSwapIterator();
+        else {
+            List<GridIterator<Map.Entry<byte[], byte[]>>> partIts = new 
ArrayList<>();
+
+            for (Integer part : parts)
+                partIts.add(cctx.swap().rawSwapIterator(part));
+
+            it = new CompoundIterator(partIts);
+        }
 
         return scanIterator(it, filter, qry.keepPortable());
     }
 
     /**
      * @param qry Query.
+     * @param parts Collection of partitions.
      * @return Offheap iterator.
+     * @throws IgniteCheckedException If failed.
      */
-    private GridIterator<IgniteBiTuple<K, V>> 
offheapIterator(GridCacheQueryAdapter<?> qry, Set<Integer> parts)
+    private GridIterator<IgniteBiTuple<K, V>> 
offheapIterator(GridCacheQueryAdapter<?> qry, Collection<Integer> parts)
         throws IgniteCheckedException {
         IgniteBiPredicate<K, V> filter = qry.scanFilter();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/42766004/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeFullApiSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeFullApiSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeFullApiSelfTest.java
index 1a780df..3f85204 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeFullApiSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeFullApiSelfTest.java
@@ -330,7 +330,8 @@ public class GridCachePartitionedMultiNodeFullApiSelfTest 
extends GridCacheParti
         assertEquals(0, cache1.localSize(NEAR));
         assertEquals(5, cache1.localSize(CachePeekMode.ALL) - 
cache1.localSize(NEAR));
 
-        assertEquals(nearEnabled() && memoryMode() != 
CacheMemoryMode.OFFHEAP_TIERED ? 2 : 0, cache2.localSize(NEAR));
+        assertEquals(nearEnabled() && memoryMode() != 
CacheMemoryMode.OFFHEAP_TIERED
+            ? 2 : 0, cache2.localSize(NEAR));
         assertEquals(0, cache2.localSize(CachePeekMode.ALL) - 
cache2.localSize(NEAR));
     }
 

Reply via email to