Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-1239 be97bdee1 -> a235c1ebb


ignite-1239: fallback retries using a topology version when unreserved 
exception happened


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a235c1eb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a235c1eb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a235c1eb

Branch: refs/heads/ignite-1239
Commit: a235c1ebb4e222d17dc48e3b728e256d5e77c32c
Parents: be97bde
Author: Denis Magda <dma...@gridgain.com>
Authored: Fri Aug 14 12:32:37 2015 +0300
Committer: Denis Magda <dma...@gridgain.com>
Committed: Fri Aug 14 12:32:37 2015 +0300

----------------------------------------------------------------------
 .../GridDhtUnreservedPartitionException.java    | 15 +++++-
 .../cache/query/GridCacheQueryAdapter.java      | 50 +++++++++++++-------
 .../cache/query/GridCacheQueryManager.java      |  3 +-
 3 files changed, 49 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a235c1eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtUnreservedPartitionException.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtUnreservedPartitionException.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtUnreservedPartitionException.java
index e64019b..d824a47 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtUnreservedPartitionException.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtUnreservedPartitionException.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache.distributed.dht;
 
 import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.affinity.*;
 
 /**
  * Exception that is thrown when a partition reservation failed.
@@ -29,14 +30,19 @@ public class GridDhtUnreservedPartitionException extends 
IgniteCheckedException
     /** Partition. */
     private final int part;
 
+    /** Topology version. */
+    private final AffinityTopologyVersion topVer;
+
     /**
      * @param part Partition.
+     * @param topVer Affinity topology version.
      * @param msg Message.
      */
-    public GridDhtUnreservedPartitionException(int part, String msg) {
+    public GridDhtUnreservedPartitionException(int part, 
AffinityTopologyVersion topVer, String msg) {
         super(msg);
 
         this.part = part;
+        this.topVer = topVer;
     }
 
     /**
@@ -46,6 +52,13 @@ public class GridDhtUnreservedPartitionException extends 
IgniteCheckedException
         return part;
     }
 
+    /**
+     * @return Affinity topology version.
+     */
+    public AffinityTopologyVersion topologyVersion() {
+        return topVer;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return getClass() + " [part=" + part + ", msg=" + getMessage() + ']';

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a235c1eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
index d4d9d00..5253835 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
@@ -458,7 +458,7 @@ public class GridCacheQueryAdapter<T> implements 
CacheQuery<T> {
             return (CacheQueryFuture<R>)(loc ? qryMgr.queryFieldsLocal(bean) :
                 qryMgr.queryFieldsDistributed(bean, nodes));
         else if (type == SCAN && part != null && nodes.size() > 1)
-            return new CacheQueryFallbackFuture<>(nodes, bean, qryMgr);
+            return new CacheQueryFallbackFuture<>(nodes, part, bean, qryMgr, 
cctx);
         else
             return (CacheQueryFuture<R>)(loc ? qryMgr.queryLocal(bean) : 
qryMgr.queryDistributed(bean, nodes));
     }
@@ -557,8 +557,11 @@ public class GridCacheQueryAdapter<T> implements 
CacheQuery<T> {
         /** Backups. */
         private volatile Queue<ClusterNode> nodes;
 
-        /** Backups that failed with {@link 
GridDhtUnreservedPartitionException}. */
-        private volatile Collection<ClusterNode> unreservedNodes;
+        /** Topology version of the last detected {@link 
GridDhtUnreservedPartitionException}. */
+        private volatile AffinityTopologyVersion unreservedTopVer;
+
+        /** Number of times to retry the query on the nodes failed with {@link 
GridDhtUnreservedPartitionException}. */
+        private volatile int unreservedNodesRetryCnt = 5;
 
         /** Bean. */
         private final GridCacheQueryBean bean;
@@ -566,19 +569,26 @@ public class GridCacheQueryAdapter<T> implements 
CacheQuery<T> {
         /** Query manager. */
         private final GridCacheQueryManager qryMgr;
 
-        /** Number of times to retry the query on the nodes failed with {@link 
GridDhtUnreservedPartitionException}. */
-        private volatile int unreservedNodesRetryCnt = 5;
+        /** Cache context. */
+        private final GridCacheContext cctx;
+
+        /** Partition. */
+        private final int part;
 
         /**
          * @param nodes Backups.
+         * @param part Partition.
          * @param bean Bean.
          * @param qryMgr Query manager.
+         * @param cctx Cache context.
          */
-        public CacheQueryFallbackFuture(Collection<ClusterNode> nodes, 
GridCacheQueryBean bean,
-            GridCacheQueryManager qryMgr) {
+        public CacheQueryFallbackFuture(Collection<ClusterNode> nodes, int 
part, GridCacheQueryBean bean,
+            GridCacheQueryManager qryMgr, GridCacheContext cctx) {
             this.nodes = fallbacks(nodes);
             this.bean = bean;
             this.qryMgr = qryMgr;
+            this.cctx = cctx;
+            this.part = part;
 
             init();
         }
@@ -622,23 +632,29 @@ public class GridCacheQueryAdapter<T> implements 
CacheQuery<T> {
                     catch (IgniteCheckedException e) {
                         if (e.getCause() != null && e.getCause().getClass() ==
                             GridDhtUnreservedPartitionException.class) {
-                            // The race is impossible here because fallback 
queries are executed one by one.
-                            // Volatile guarantees visibility.
-                            if (unreservedNodes == null)
-                                unreservedNodes = new ArrayList<>(nodes.size() 
+ 1);
+                            unreservedTopVer = 
((GridDhtUnreservedPartitionException)e.getCause()).topologyVersion();
 
-                            unreservedNodes.add(node);
+                            assert unreservedTopVer != null;
                         }
 
                         if (F.isEmpty(nodes)) {
-                            if (unreservedNodes != null && 
--unreservedNodesRetryCnt > 0) {
-                                assert unreservedNodes.size() > 0;
+                            final AffinityTopologyVersion topVer = 
unreservedTopVer;
+
+                            if (topVer != null && --unreservedNodesRetryCnt > 
0) {
+
+                                
cctx.affinity().affinityReadyFuture(topVer).listen(
+                                    new 
IgniteInClosure<IgniteInternalFuture<AffinityTopologyVersion>>() {
+                                        @Override public void apply(
+                                            
IgniteInternalFuture<AffinityTopologyVersion> future) {
 
-                                nodes = fallbacks(unreservedNodes);
+                                            nodes = 
fallbacks(cctx.topology().owners(part, topVer));
 
-                                unreservedNodes = null;
+                                            // Race is impossible here because 
query retries are executed one by one.
+                                            unreservedTopVer = null;
 
-                                init();
+                                            init();
+                                        }
+                                    });
                             }
                             else
                                 onDone(e);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a235c1eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 5c1aa4e..bfe5ecc 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -795,7 +795,8 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
                         // double check for owning state
                         if (locPart == null || locPart.state() != OWNING || 
!locPart.reserve() ||
                             locPart.state() != OWNING)
-                            throw new 
GridDhtUnreservedPartitionException(part, "Partition can not be reserved");
+                            throw new GridDhtUnreservedPartitionException(part,
+                                cctx.affinity().affinityTopologyVersion(), 
"Partition can not be reserved");
 
                         iter = new Iterator<K>() {
                             private Iterator<KeyCacheObject> iter0 = 
locPart.keySet().iterator();

Reply via email to