Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-484 35008b671 -> f2b96e0b5


ignite-484 - retry with topology version


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/9c7274b0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/9c7274b0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/9c7274b0

Branch: refs/heads/ignite-484
Commit: 9c7274b0ac33936893bd532b70b57df5c668eb3f
Parents: 35008b6
Author: S.Vladykin <svlady...@gridgain.com>
Authored: Wed May 13 12:41:51 2015 +0300
Committer: S.Vladykin <svlady...@gridgain.com>
Committed: Wed May 13 12:41:51 2015 +0300

----------------------------------------------------------------------
 .../messages/GridQueryNextPageResponse.java     | 43 +++++++++-----------
 .../processors/query/h2/IgniteH2Indexing.java   | 13 ++++++
 .../query/h2/twostep/GridMapQueryExecutor.java  | 17 ++------
 .../h2/twostep/GridReduceQueryExecutor.java     | 22 ++++++----
 4 files changed, 48 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c7274b0/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java
index c2cca75..b881f93 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.query.h2.twostep.messages;
 
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.plugin.extensions.communication.*;
 
@@ -33,12 +34,6 @@ public class GridQueryNextPageResponse implements Message {
     private static final long serialVersionUID = 0L;
 
     /** */
-    public static final byte CODE_OK = 0;
-
-    /** */
-    public static final byte CODE_RETRY = -1;
-
-    /** */
     private long qryReqId;
 
     /** */
@@ -61,8 +56,8 @@ public class GridQueryNextPageResponse implements Message {
     @GridDirectTransient
     private transient Collection<?> plainRows;
 
-    /** Response code. */
-    private byte code = CODE_OK;
+    /** */
+    private AffinityTopologyVersion retry;
 
     /**
      * For {@link Externalizable}.
@@ -95,20 +90,6 @@ public class GridQueryNextPageResponse implements Message {
     }
 
     /**
-     * @return Response code.
-     */
-    public byte code() {
-        return code;
-    }
-
-    /**
-     * @param code Response code.
-     */
-    public void code(byte code) {
-        this.code = code;
-    }
-
-    /**
      * @return Query request ID.
      */
     public long queryRequestId() {
@@ -211,7 +192,7 @@ public class GridQueryNextPageResponse implements Message {
                 writer.incrementState();
 
             case 6:
-                if (!writer.writeByte("code", code))
+                if (!writer.writeMessage("retry", retry))
                     return false;
 
                 writer.incrementState();
@@ -277,7 +258,7 @@ public class GridQueryNextPageResponse implements Message {
                 reader.incrementState();
 
             case 6:
-                code = reader.readByte("code");
+                retry = reader.readMessage("retry");
 
                 if (!reader.isLastRead())
                     return false;
@@ -298,4 +279,18 @@ public class GridQueryNextPageResponse implements Message {
     @Override public byte fieldsCount() {
         return 7;
     }
+
+    /**
+     * @return Retry topology version.
+     */
+    public AffinityTopologyVersion retry() {
+        return retry;
+    }
+
+    /**
+     * @param retry Retry topology version.
+     */
+    public void retry(AffinityTopologyVersion retry) {
+        this.retry = retry;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c7274b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 44db280..31a33ef 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -1379,6 +1379,19 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /**
+     * @param topVer Topology version.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void awaitForCacheAffinity(AffinityTopologyVersion topVer) throws IgniteCheckedException {
+        assert topVer != null;
+
+        IgniteInternalFuture<?> fut = ctx.cache().context().exchange().affinityReadyFuture(topVer);
+
+        if (fut != null)
+            fut.get();
+    }
+
+    /**
      * Wrapper to store connection and flag is schema set or not.
      */
     private static class ConnectionWrapper {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c7274b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index d01a8a4..39595b1 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -216,21 +216,10 @@ public class GridMapQueryExecutor {
     }
 
     /**
-     * @param topVer Topology version.
-     * @throws IgniteCheckedException If failed.
-     */
-    private void awaitForCacheAffinity(AffinityTopologyVersion topVer) throws IgniteCheckedException {
-        IgniteInternalFuture<?> fut = ctx.cache().context().exchange().affinityReadyFuture(topVer);
-
-        if (fut != null)
-            fut.get();
-    }
-
-    /**
      * @param cacheNames Cache names.
      * @param topVer Topology version.
      * @param reserved Reserved list.
-     * @return {@code true} If all the needed partitions succesfuly reserved.
+     * @return {@code true} If all the needed partitions successfully reserved.
      * @throws IgniteCheckedException If failed.
      */
     private boolean reservePartitions(Collection<String> cacheNames,  AffinityTopologyVersion topVer,
@@ -297,7 +286,7 @@ public class GridMapQueryExecutor {
 
             if (topVer != null) {
                 // Await all caches to be deployed on this node and all the needed topology changes to arrive.
-                awaitForCacheAffinity(topVer);
+                h2.awaitForCacheAffinity(topVer);
 
                 // Reserve primary partitions.
                 if (!reservePartitions(F.concat(true, req.space(), req.extraSpaces()), topVer, reserved)) {
@@ -476,7 +465,7 @@ public class GridMapQueryExecutor {
             loc ? null : Collections.<Message>emptyList(),
             loc ? Collections.<Value[]>emptyList() : null);
 
-        msg.code(GridQueryNextPageResponse.CODE_RETRY);
+        msg.retry(ctx.discovery().topologyVersionEx());
 
         ctx.io().send(node, GridTopic.TOPIC_QUERY, msg, GridIoPolicy.PUBLIC_POOL);
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c7274b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 68c7048..76de71b 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -261,10 +261,13 @@ public class GridReduceQueryExecutor {
 
         idx.addPage(page);
 
-        if (msg.code() == GridQueryNextPageResponse.CODE_RETRY)
-            r.retry = true;
+        if (msg.retry() != null) {
+            r.retry = msg.retry();
 
-        if (msg.allRows() != -1) // Only the first page contains row count.
+            while (r.latch.getCount() != 0)
+                r.latch.countDown();
+        }
+        else if (msg.allRows() != -1) // Only the first page contains row count.
             r.latch.countDown();
     }
 
@@ -274,7 +277,7 @@ public class GridReduceQueryExecutor {
      * @return Cursor.
      */
     public QueryCursor<List<?>> query(GridCacheContext<?,?> cctx, GridCacheTwoStepQuery qry) {
-        for (int attempt = 0;; attempt++) {
+        for (;;) {
             long qryReqId = reqIdGen.incrementAndGet();
 
             QueryRun r = new QueryRun();
@@ -349,7 +352,9 @@ public class GridReduceQueryExecutor {
 
                 ResultSet res = null;
 
-                if (!r.retry) {
+                AffinityTopologyVersion retry = r.retry;
+
+                if (retry == null) {
                     if (qry.explain())
                         return explainPlan(r.conn, space, qry);
 
@@ -365,9 +370,8 @@ public class GridReduceQueryExecutor {
 //                dropTable(r.conn, tbl.getName()); TODO
                 }
 
-                if (r.retry) {
-                    if (attempt > 0)
-                        U.sleep(attempt * 10);
+                if (retry != null) {
+                    h2.awaitForCacheAffinity(retry);
 
                     continue;
                 }
@@ -697,7 +701,7 @@ public class GridReduceQueryExecutor {
         private volatile CacheException rmtErr;
 
         /** */
-        private volatile boolean retry;
+        private volatile AffinityTopologyVersion retry;
     }
 
     /**

Reply via email to