ignite-sql-tests - replicated client only cache -> run on random node
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/59528bd9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/59528bd9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/59528bd9 Branch: refs/heads/ignite-45 Commit: 59528bd9fcea9f8dca9bfd11e533272af06c8ecf Parents: 95666e9 Author: S.Vladykin <svlady...@gridgain.com> Authored: Mon Mar 16 01:52:23 2015 +0300 Committer: S.Vladykin <svlady...@gridgain.com> Committed: Mon Mar 16 01:52:23 2015 +0300 ---------------------------------------------------------------------- .../internal/processors/cache/IgniteCacheProxy.java | 16 ++++++++++++++-- .../query/h2/twostep/GridReduceQueryExecutor.java | 10 +++++++++- 2 files changed, 23 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/59528bd9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index baea0d6..4138d11 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -413,7 +413,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V if (qry instanceof SqlQuery) { SqlQuery p = (SqlQuery)qry; - if (ctx.isReplicated() || ctx.isLocal()) + if (isReplicatedDataNode() || ctx.isLocal()) return doLocalQuery(p); return ctx.kernalContext().query().queryTwoStep(ctx.name(), p.getType(), p.getSql(), p.getArgs()); @@ -432,6 +432,18 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } } + /** + * @return {@code true} If this is a replicated cache and we are on a data node. + */ + private boolean isReplicatedDataNode() { + if (!ctx.isReplicated()) + return false; + + ClusterGroup grp = ctx.kernalContext().grid().cluster().forDataNodes(ctx.name()); + + return grp.node(ctx.localNodeId()) != null; + } + /** {@inheritDoc} */ @Override public QueryCursor<List<?>> queryFields(SqlFieldsQuery qry) { A.notNull(qry, "qry"); @@ -441,7 +453,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V try { validate(qry); - if (ctx.isReplicated() || ctx.isLocal()) + if (isReplicatedDataNode() || ctx.isLocal()) return doLocalFieldsQuery(qry); return ctx.kernalContext().query().queryTwoStep(ctx.name(), qry.getSql(), qry.getArgs()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/59528bd9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index 365fb43..f3d6bfc 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -240,7 +240,15 @@ public class GridReduceQueryExecutor implements GridMessageListener { r.conn = h2.connectionForSpace(space); // TODO Add topology version. - final Collection<ClusterNode> nodes = ctx.grid().cluster().forCacheNodes(space).nodes(); + ClusterGroup dataNodes = ctx.grid().cluster().forDataNodes(space); + + if (ctx.cache().internalCache(space).context().isReplicated()) { + assert dataNodes.node(ctx.localNodeId()) == null : "We must be on a client node."; + + dataNodes = dataNodes.forRandom(); // Select random data node to run query on a replicated data. + } + + final Collection<ClusterNode> nodes = dataNodes.nodes(); for (GridCacheSqlQuery mapQry : qry.mapQueries()) { GridMergeTable tbl;