Repository: incubator-ignite Updated Branches: refs/heads/master 92b73ff17 -> 49f0fa42d
GG-9580 - Fix for queries in replicated cache Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/1483feb0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/1483feb0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/1483feb0 Branch: refs/heads/master Commit: 1483feb0e2eed3263ea664115385f9bbe48f927a Parents: 1812040 Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Mon Dec 22 16:00:22 2014 -0800 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Mon Dec 22 16:00:22 2014 -0800 ---------------------------------------------------------------------- .../cache/query/GridCacheQueryAdapter.java | 46 ++++++++++++++++---- .../java/org/gridgain/grid/util/GridUtils.java | 19 ++++++++ .../cache/GridCacheAbstractQuerySelfTest.java | 3 +- .../GridCacheReplicatedQuerySelfTest.java | 39 ++++++++++++++++- 4 files changed, 96 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1483feb0/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryAdapter.java index d15d77c..e22b420 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueryAdapter.java @@ -13,7 +13,6 @@ import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.lang.*; import org.apache.ignite.plugin.security.*; -import org.gridgain.grid.*; import org.gridgain.grid.cache.*; import org.gridgain.grid.cache.query.*; import org.gridgain.grid.kernal.processors.cache.*; @@ -23,6 +22,7 @@ import org.jetbrains.annotations.*; import java.util.*; +import static org.gridgain.grid.cache.GridCacheDistributionMode.*; import static org.gridgain.grid.kernal.processors.cache.query.GridCacheQueryType.*; /** @@ -447,18 +447,48 @@ public class GridCacheQueryAdapter<T> implements GridCacheQuery<T> { * @return Nodes to execute on. */ private Collection<ClusterNode> nodes() { - Collection<ClusterNode> nodes = CU.allNodes(cctx); + GridCacheMode cacheMode = cctx.config().getCacheMode(); + + switch (cacheMode) { + case LOCAL: + if (prj != null) + U.warn(log, "Ignoring query projection because it's executed over LOCAL cache " + + "(only local node will be queried): " + this); - if (prj == null) { - if (cctx.isReplicated()) return Collections.singletonList(cctx.localNode()); - return nodes; + case REPLICATED: + if (prj != null) + return nodes(cctx, prj); + + GridCacheDistributionMode mode = cctx.config().getDistributionMode(); + + return mode == PARTITIONED_ONLY || mode == NEAR_PARTITIONED ? + Collections.singletonList(cctx.localNode()) : + Collections.singletonList(F.rand(nodes(cctx, null))); + + case PARTITIONED: + return nodes(cctx, prj); + + default: + throw new IllegalStateException("Unknown cache distribution mode: " + cacheMode); } + } + + /** + * @param cctx Cache context. + * @param prj Projection (optional). + * @return Collection of data nodes in provided projection (if any). + */ + private static Collection<ClusterNode> nodes(final GridCacheContext<?, ?> cctx, @Nullable final ClusterGroup prj) { + assert cctx != null; + + return F.view(CU.allNodes(cctx), new P1<ClusterNode>() { + @Override public boolean apply(ClusterNode n) { + GridCacheDistributionMode mode = U.distributionMode(n, cctx.name()); - return F.view(nodes, new P1<ClusterNode>() { - @Override public boolean apply(ClusterNode e) { - return prj.node(e.id()) != null; + return (mode == PARTITIONED_ONLY || mode == NEAR_PARTITIONED) && + (prj == null || prj.node(n.id()) != null); } }); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1483feb0/modules/core/src/main/java/org/gridgain/grid/util/GridUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/util/GridUtils.java b/modules/core/src/main/java/org/gridgain/grid/util/GridUtils.java index 803badd..fb7955d 100644 --- a/modules/core/src/main/java/org/gridgain/grid/util/GridUtils.java +++ b/modules/core/src/main/java/org/gridgain/grid/util/GridUtils.java @@ -7194,6 +7194,25 @@ public abstract class GridUtils { } /** + * Gets cache distribution mode on given node or {@code null} if cache is not + * present on given node. + * + * @param n Node to check. + * @param cacheName Cache to check. + * @return Cache distribution mode or {@code null} if cache is not found. + */ + @Nullable public static GridCacheDistributionMode distributionMode(ClusterNode n, String cacheName) { + GridCacheAttributes[] caches = n.attribute(ATTR_CACHE); + + if (caches != null) + for (GridCacheAttributes attrs : caches) + if (F.eq(cacheName, attrs.cacheName())) + return attrs.partitionedTaxonomy(); + + return null; + } + + /** * Checks if given node has near cache enabled for the specified * partitioned cache. * http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1483feb0/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractQuerySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractQuerySelfTest.java b/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractQuerySelfTest.java index 2883215..4987a8c 100644 --- a/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractQuerySelfTest.java +++ b/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAbstractQuerySelfTest.java @@ -18,7 +18,6 @@ import org.apache.ignite.spi.discovery.tcp.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; import org.apache.ignite.spi.swapspace.file.*; -import org.gridgain.grid.*; import org.gridgain.grid.cache.*; import org.gridgain.grid.cache.query.*; import org.gridgain.grid.cache.store.*; @@ -117,7 +116,7 @@ public abstract class GridCacheAbstractQuerySelfTest extends GridCommonAbstractT cc.setCacheMode(cacheMode()); cc.setAtomicityMode(atomicityMode()); - cc.setDistributionMode(distributionMode()); + cc.setDistributionMode(gridName.startsWith("client") ? CLIENT_ONLY :distributionMode()); cc.setWriteSynchronizationMode(FULL_SYNC); cc.setStore(store); cc.setPreloadMode(SYNC); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1483feb0/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/replicated/GridCacheReplicatedQuerySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/replicated/GridCacheReplicatedQuerySelfTest.java b/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/replicated/GridCacheReplicatedQuerySelfTest.java index d72d004..c0c3306 100644 --- a/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/replicated/GridCacheReplicatedQuerySelfTest.java +++ b/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/replicated/GridCacheReplicatedQuerySelfTest.java @@ -13,7 +13,6 @@ import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.events.*; import org.apache.ignite.lang.*; -import org.gridgain.grid.*; import org.gridgain.grid.cache.*; import org.gridgain.grid.cache.query.*; import org.gridgain.grid.kernal.*; @@ -98,6 +97,44 @@ public class GridCacheReplicatedQuerySelfTest extends GridCacheAbstractQuerySelf } /** + * @throws Exception If failed. + */ + public void testClientOnlyNode() throws Exception { + try { + Ignite g = startGrid("client"); + + GridCache<Integer, Integer> c = g.cache(null); + + for (int i = 0; i < 10; i++) + c.putx(i, i); + + // Client cache should be empty. + assertEquals(0, c.size()); + + Collection<Map.Entry<Integer, Integer>> res = + c.queries().createSqlQuery(Integer.class, "_key >= 5 order by _key").execute().get(); + + assertEquals(5, res.size()); + + Iterator<Map.Entry<Integer, Integer>> it = res.iterator(); + + int i = 5; + + while (it.hasNext()) { + Map.Entry<Integer, Integer> e = it.next(); + + assertEquals(i, e.getKey().intValue()); + assertEquals(i, e.getValue().intValue()); + + i++; + } + } + finally { + stopGrid("client"); + } + } + + /** * JUnit. * * @throws Exception If failed.