#IGNITE-389 - Partition scan review and fixes.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d72b040c Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d72b040c Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d72b040c Branch: refs/heads/ignite-929 Commit: d72b040ccc4718852747d42152a448b9653f2c3f Parents: 381c169 Author: Alexey Goncharuk <agoncha...@gridgain.com> Authored: Tue May 26 12:08:10 2015 -0700 Committer: Alexey Goncharuk <agoncha...@gridgain.com> Committed: Tue May 26 12:08:10 2015 -0700 ---------------------------------------------------------------------- .../apache/ignite/cache/query/ScanQuery.java | 39 +++++++++++++++++++- .../processors/cache/IgniteCacheProxy.java | 3 +- .../cache/query/GridCacheQueryAdapter.java | 5 ++- .../cache/IgniteCacheAbstractQuerySelfTest.java | 11 +++--- .../org/apache/ignite/spark/IgniteRDD.scala | 6 ++- 5 files changed, 55 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d72b040c/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java index 688eb2e..f56b0c7 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java @@ -36,11 +36,18 @@ public final class ScanQuery<K, V> extends Query<Cache.Entry<K, V>> { /** */ private IgniteBiPredicate<K, V> filter; + /** */ + private Integer part; + /** * Create scan query returning all entries. 
*/ public ScanQuery() { - this(null); + this(null, null); + } + + public ScanQuery(int part) { + this(part, null); } /** @@ -49,6 +56,16 @@ public final class ScanQuery<K, V> extends Query<Cache.Entry<K, V>> { * @param filter Filter. If {@code null} then all entries will be returned. */ public ScanQuery(@Nullable IgniteBiPredicate<K, V> filter) { + this(null, filter); + } + + /** + * Create scan query with filter. + * + * @param filter Filter. If {@code null} then all entries will be returned. + */ + public ScanQuery(Integer part, @Nullable IgniteBiPredicate<K, V> filter) { + setPartition(part); setFilter(filter); } @@ -73,6 +90,26 @@ public final class ScanQuery<K, V> extends Query<Cache.Entry<K, V>> { return this; } + /** + * Gets partition number over which this query should iterate. Will return {@code null} if partition was not + * set. In this case query will iterate over all partitions in the cache. + * + * @return Partition number or {@code null}. + */ + public Integer getPartition() { + return part; + } + + /** + * Sets partition number over which this query should iterate. If {@code null}, query will iterate over + * all partitions in the cache. Must be in the range [0, N) where N is partition number in the cache. + * + * @param part Partition number over which this query should iterate. 
+ */ + public void setPartition(Integer part) { + this.part = part; + } + /** {@inheritDoc} */ @Override public ScanQuery<K, V> setPageSize(int pageSize) { return (ScanQuery<K, V>)super.setPageSize(pageSize); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d72b040c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index 0009127..176543b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -353,7 +353,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V if (filter instanceof ScanQuery) { IgniteBiPredicate<K, V> p = ((ScanQuery)filter).getFilter(); - qry = ctx.queries().createScanQuery(p != null ? p : ACCEPT_ALL, null, isKeepPortable); + qry = ctx.queries().createScanQuery(p != null ? 
p : ACCEPT_ALL, ((ScanQuery)filter).getPartition(), + isKeepPortable); if (grp != null) qry.projection(grp); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d72b040c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java index d976d2c..9ab8c4f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java @@ -487,11 +487,14 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { @Nullable final ClusterGroup prj, @Nullable final Integer part) { assert cctx != null; + final List<ClusterNode> owners = part == null ? 
null : + cctx.topology().owners(part, cctx.affinity().affinityTopologyVersion()); + return F.view(CU.allNodes(cctx), new P1<ClusterNode>() { @Override public boolean apply(ClusterNode n) { return cctx.discovery().cacheAffinityNode(n, cctx.name()) && (prj == null || prj.node(n.id()) != null) && - (part == null || cctx.affinity().primary(n , part, cctx.affinity().affinityTopologyVersion())); + (part == null || owners.contains(n)); } }); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d72b040c/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java index eb5027c..228526f 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java @@ -669,19 +669,20 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac } for (int i = 0; i < cctx.affinity().partitions(); i++) { - CacheQuery<Map.Entry<Integer, Integer>> qry = - ((IgniteCacheProxy<Integer, Integer>)cache).context().queries().createScanQuery(null, i, false); + ScanQuery<Integer, Integer> scan = new ScanQuery<>(i); - CacheQueryFuture<Map.Entry<Integer, Integer>> fut = qry.execute(); + Collection<Cache.Entry<Integer, Integer>> actual = cache.query(scan).getAll(); Map<Integer, Integer> exp = entries.get(i); - Collection<Map.Entry<Integer, Integer>> actual = fut.get(); + int size = exp == null ? 
0 : exp.size(); + + assertEquals("Failed for partition: " + i, size, actual.size()); if (exp == null) assertTrue(actual.isEmpty()); else - for (Map.Entry<Integer, Integer> entry : actual) + for (Cache.Entry<Integer, Integer> entry : actual) assertTrue(entry.getValue().equals(exp.get(entry.getKey()))); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d72b040c/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala index 281c483..6a3720c 100644 --- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala +++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala @@ -37,7 +37,11 @@ class IgniteRDD[K, V] ( override def compute(part: Partition, context: TaskContext): Iterator[(K, V)] = { val cache = ensureCache() - val it: java.util.Iterator[Cache.Entry[K, V]] = cache.query(new ScanQuery[K, V]()).iterator() + val qry: ScanQuery[K, V] = new ScanQuery[K, V]() + + qry.setPartition(part.index) + + val it: java.util.Iterator[Cache.Entry[K, V]] = cache.query(qry).iterator() new IgniteQueryIterator[Cache.Entry[K, V], (K, V)](it, entry => { (entry.getKey, entry.getValue)