ignite-sql-tests - replicated client only cache -> run on random node

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/59528bd9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/59528bd9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/59528bd9

Branch: refs/heads/ignite-45
Commit: 59528bd9fcea9f8dca9bfd11e533272af06c8ecf
Parents: 95666e9
Author: S.Vladykin <svlady...@gridgain.com>
Authored: Mon Mar 16 01:52:23 2015 +0300
Committer: S.Vladykin <svlady...@gridgain.com>
Committed: Mon Mar 16 01:52:23 2015 +0300

----------------------------------------------------------------------
 .../internal/processors/cache/IgniteCacheProxy.java | 16 ++++++++++++++--
 .../query/h2/twostep/GridReduceQueryExecutor.java   | 10 +++++++++-
 2 files changed, 23 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/59528bd9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index baea0d6..4138d11 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -413,7 +413,7 @@ public class IgniteCacheProxy<K, V> extends 
AsyncSupportAdapter<IgniteCache<K, V
             if (qry instanceof SqlQuery) {
                 SqlQuery p = (SqlQuery)qry;
 
-                if (ctx.isReplicated() || ctx.isLocal())
+                if (isReplicatedDataNode() || ctx.isLocal())
                     return doLocalQuery(p);
 
                 return ctx.kernalContext().query().queryTwoStep(ctx.name(), 
p.getType(), p.getSql(), p.getArgs());
@@ -432,6 +432,18 @@ public class IgniteCacheProxy<K, V> extends 
AsyncSupportAdapter<IgniteCache<K, V
         }
     }
 
+    /**
+     * @return {@code true} If this is a replicated cache and we are on a data 
node.
+     */
+    private boolean isReplicatedDataNode() {
+        if (!ctx.isReplicated())
+            return false;
+
+        ClusterGroup grp = 
ctx.kernalContext().grid().cluster().forDataNodes(ctx.name());
+
+        return grp.node(ctx.localNodeId()) != null;
+    }
+
     /** {@inheritDoc} */
     @Override public QueryCursor<List<?>> queryFields(SqlFieldsQuery qry) {
         A.notNull(qry, "qry");
@@ -441,7 +453,7 @@ public class IgniteCacheProxy<K, V> extends 
AsyncSupportAdapter<IgniteCache<K, V
         try {
             validate(qry);
 
-            if (ctx.isReplicated() || ctx.isLocal())
+            if (isReplicatedDataNode() || ctx.isLocal())
                 return doLocalFieldsQuery(qry);
 
             return ctx.kernalContext().query().queryTwoStep(ctx.name(), 
qry.getSql(), qry.getArgs());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/59528bd9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 365fb43..f3d6bfc 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -240,7 +240,15 @@ public class GridReduceQueryExecutor implements 
GridMessageListener {
         r.conn = h2.connectionForSpace(space);
 
         // TODO Add topology version.
-        final Collection<ClusterNode> nodes = 
ctx.grid().cluster().forCacheNodes(space).nodes();
+        ClusterGroup dataNodes = ctx.grid().cluster().forDataNodes(space);
+
+        if (ctx.cache().internalCache(space).context().isReplicated()) {
+            assert dataNodes.node(ctx.localNodeId()) == null : "We must be on 
a client node.";
+
+            dataNodes = dataNodes.forRandom(); // Select random data node to 
run query on a replicated data.
+        }
+
+        final Collection<ClusterNode> nodes = dataNodes.nodes();
 
         for (GridCacheSqlQuery mapQry : qry.mapQueries()) {
             GridMergeTable tbl;

Reply via email to