# ignite-694

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/327a1086
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/327a1086
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/327a1086

Branch: refs/heads/ignite-sprint-3
Commit: 327a1086d60b46c49fe3fbfc73af49c21f1238f4
Parents: 76d80f4
Author: sboikov <sboi...@gridgain.com>
Authored: Wed Apr 8 12:13:37 2015 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Wed Apr 8 12:13:37 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheIoManager.java    | 44 +++++++++++++-------
 .../processors/cache/GridCacheMessage.java      |  7 ++++
 .../GridCachePartitionExchangeManager.java      | 12 +++++-
 .../GridDhtPartitionsAbstractMessage.java       | 15 ++++---
 4 files changed, 57 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/327a1086/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 09fe2e0..6fefdfd 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -84,27 +84,43 @@ public class GridCacheIoManager extends 
GridCacheSharedManagerAdapter {
 
             final GridCacheMessage cacheMsg = (GridCacheMessage)msg;
 
-            AffinityTopologyVersion locAffVer = 
cctx.exchange().topologyVersion();
-            AffinityTopologyVersion rmtAffVer = cacheMsg.topologyVersion();
+            IgniteInternalFuture<?> fut = null;
 
-            if (locAffVer.compareTo(rmtAffVer) < 0) {
-                if (log.isDebugEnabled())
-                    log.debug("Received message has higher topology version 
[msg=" + msg +
-                        ", locTopVer=" + locAffVer + ", rmtTopVer=" + 
rmtAffVer + ']');
+            if (cacheMsg.partitionExchangeMessage()) {
+                long locTopVer = cctx.discovery().topologyVersion();
+                long rmtTopVer = cacheMsg.topologyVersion().topologyVersion();
 
-                IgniteInternalFuture<?> topFut = 
cctx.exchange().affinityReadyFuture(rmtAffVer);
+                if (locTopVer < rmtTopVer) {
+                    if (log.isDebugEnabled())
+                        log.debug("Received message has higher topology 
version [msg=" + msg +
+                            ", locTopVer=" + locTopVer + ", rmtTopVer=" + 
rmtTopVer + ']');
 
-                if (topFut != null && !topFut.isDone()) {
-                    topFut.listen(new CI1<IgniteInternalFuture<?>>() {
-                        @Override public void apply(IgniteInternalFuture<?> t) 
{
-                            handleMessage(nodeId, cacheMsg);
-                        }
-                    });
+                    fut = cctx.discovery().topologyFuture(rmtTopVer);
+                }
+            }
+            else {
+                AffinityTopologyVersion locAffVer = 
cctx.exchange().readyAffinityVersion();
+                AffinityTopologyVersion rmtAffVer = cacheMsg.topologyVersion();
 
-                    return;
+                if (locAffVer.compareTo(rmtAffVer) < 0) {
+                    if (log.isDebugEnabled())
+                        log.debug("Received message has higher affinity 
topology version [msg=" + msg +
+                            ", locTopVer=" + locAffVer + ", rmtTopVer=" + 
rmtAffVer + ']');
+
+                    fut = cctx.exchange().affinityReadyFuture(rmtAffVer);
                 }
             }
 
+            if (fut != null && !fut.isDone()) {
+                fut.listen(new CI1<IgniteInternalFuture<?>>() {
+                    @Override public void apply(IgniteInternalFuture<?> t) {
+                        handleMessage(nodeId, cacheMsg);
+                    }
+                });
+
+                return;
+            }
+
             handleMessage(nodeId, cacheMsg);
         }
     };

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/327a1086/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
index b9bce3e..fefd582 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
@@ -86,6 +86,13 @@ public abstract class GridCacheMessage implements Message {
     }
 
     /**
+     * @return {@code True} if this message is partition exchange message.
+     */
+    public boolean partitionExchangeMessage() {
+        return false;
+    }
+
+    /**
      * @return {@code True} if class loading errors should be ignored, false 
otherwise.
      */
     public boolean ignoreClassErrors() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/327a1086/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index e8e3ea1..0955328 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -372,13 +372,15 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
 
     /**
      * @param cacheId Cache ID.
+     * @return Client partition topology.
      */
     public GridClientPartitionTopology clearClientTopology(int cacheId) {
         return clientTops.remove(cacheId);
     }
 
     /**
-     * Gets topology version of last completed partition exchange.
+     * Gets topology version of last partition exchange, it is possible that 
last partition exchange
+     * is not completed yet.
      *
      * @return Topology version.
      */
@@ -390,6 +392,13 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
     }
 
     /**
+     * @return Topology version of latest completed partition exchange.
+     */
+    public AffinityTopologyVersion readyAffinityVersion() {
+        return readyTopVer.get();
+    }
+
+    /**
      * @return Last completed topology future.
      */
     public GridDhtTopologyFuture lastTopologyFuture() {
@@ -796,7 +805,6 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
 
                     if (top != null)
                         updated |= top.update(null, entry.getValue()) != null;
-
                 }
 
                 if (updated)

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/327a1086/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
index 6af0072..5a8616d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
@@ -46,11 +46,6 @@ abstract class GridDhtPartitionsAbstractMessage extends 
GridCacheMessage {
         // No-op.
     }
 
-    /** {@inheritDoc} */
-    @Override public boolean allowForStartup() {
-        return true;
-    }
-
     /**
      * @param exchId Exchange ID.
      * @param lastVer Last version.
@@ -60,6 +55,16 @@ abstract class GridDhtPartitionsAbstractMessage extends 
GridCacheMessage {
         this.lastVer = lastVer;
     }
 
+    /** {@inheritDoc} */
+    @Override public boolean allowForStartup() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean partitionExchangeMessage() {
+        return true;
+    }
+
     /**
      * @return Exchange ID.
      */

Reply via email to