Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-784-1 [created] b5ed4f20f


#ignite-784: fix compatibility suite.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/6bd55a85
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/6bd55a85
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/6bd55a85

Branch: refs/heads/ignite-784-1
Commit: 6bd55a853750625e5bff790eeb5ca001e47ac75f
Parents: 381fd1a
Author: ivasilinets <ivasilin...@gridgain.com>
Authored: Fri Apr 24 14:36:44 2015 +0300
Committer: ivasilinets <ivasilin...@gridgain.com>
Committed: Fri Apr 24 14:36:44 2015 +0300

----------------------------------------------------------------------
 .../datastreamer/DataStreamProcessor.java       | 48 +++++++++++++-------
 1 file changed, 31 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6bd55a85/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
index 9e53bb5..3e6f207 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
@@ -27,6 +27,7 @@ import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.internal.util.worker.*;
+import org.apache.ignite.lang.*;
 import org.apache.ignite.marshaller.*;
 import org.apache.ignite.stream.*;
 import org.apache.ignite.thread.*;
@@ -36,12 +37,16 @@ import java.util.*;
 import java.util.concurrent.*;
 
 import static org.apache.ignite.internal.GridTopic.*;
+import static org.apache.ignite.internal.IgniteNodeAttributes.*;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
 
 /**
  *
  */
 public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
+    /** The lowest version of ignite that is compatible with current version. */
+    private static IgniteProductVersion COMPATIBLE_VERSION_SINCE = IgniteProductVersion.fromString("1.0.4");
+
     /** Loaders map (access is not supposed to be highly concurrent). */
     private Collection<DataStreamerImpl> ldrs = new GridConcurrentHashSet<>();
 
@@ -189,25 +194,34 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
             AffinityTopologyVersion locAffVer = ctx.cache().context().exchange().readyAffinityVersion();
             AffinityTopologyVersion rmtAffVer = req.topologyVersion();
 
-            if (locAffVer.compareTo(rmtAffVer) < 0) {
-                if (log.isDebugEnabled())
-                    log.debug("Received request has higher affinity topology version [request=" + req +
-                        ", locTopVer=" + locAffVer + ", rmtTopVer=" + rmtAffVer + ']');
-
-                IgniteInternalFuture<?> fut = ctx.cache().context().exchange().affinityReadyFuture(rmtAffVer);
+            if (rmtAffVer == null) {
+                IgniteProductVersion rmtVer = ctx.discovery().node(nodeId).version();
 
-                if (fut != null && !fut.isDone()) {
-                    fut.listen(new CI1<IgniteInternalFuture<?>>() {
-                        @Override public void apply(IgniteInternalFuture<?> t) {
-                            ctx.closure().runLocalSafe(new Runnable() {
-                                @Override public void run() {
-                                    processRequest(nodeId, req);
-                                }
-                            }, false);
-                        }
-                    });
+                assert rmtVer.compareTo(COMPATIBLE_VERSION_SINCE) < 0;
+            }
+            else {
+                if (locAffVer.compareTo(rmtAffVer) < 0) {
+                    if (log.isDebugEnabled())
+                        log.debug("Received request has higher affinity topology version [request=" + req +
+                            ", locTopVer=" + locAffVer + ", rmtTopVer=" + rmtAffVer + ']');
+
+                    IgniteInternalFuture<?> fut = ctx.cache().context().exchange().affinityReadyFuture(rmtAffVer);
+
+                    if (fut != null && !fut.isDone()) {
+                        fut.listen(new CI1<IgniteInternalFuture<?>>() {
+                            @Override
+                            public void apply(IgniteInternalFuture<?> t) {
+                                ctx.closure().runLocalSafe(new Runnable() {
+                                    @Override
+                                    public void run() {
+                                        processRequest(nodeId, req);
+                                    }
+                                }, false);
+                            }
+                        });
 
-                    return;
+                        return;
+                    }
                 }
             }
 

Reply via email to