# ignite-784-1 minor
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c3e14b34
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c3e14b34
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c3e14b34

Branch: refs/heads/ignite-784-1
Commit: c3e14b34f2752d7daffefe9ad2d8cd3e7d3eaf00
Parents: 5fd5f02
Author: sboikov <sboi...@gridgain.com>
Authored: Mon Apr 27 09:49:10 2015 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Mon Apr 27 09:49:10 2015 +0300

----------------------------------------------------------------------
 .../processors/datastreamer/DataStreamProcessor.java | 14 ++++++--------
 1 file changed, 6 insertions(+), 8 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c3e14b34/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
index c928572..1375270 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
@@ -44,8 +44,8 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
  *
  */
 public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
-    /** The lowest version of ignite that is compatible with current version. */
-    private static IgniteProductVersion COMPATIBLE_VERSION_SINCE = IgniteProductVersion.fromString("1.1.0");
+    /** Ignite version when message field {@link DataStreamerRequest#topologyVersion()} was added. */
+    private static IgniteProductVersion MSG_TOP_VER_SINCE = IgniteProductVersion.fromString("1.1.0");
 
     /** Loaders map (access is not supposed to be highly concurrent). */
     private Collection<DataStreamerImpl> ldrs = new GridConcurrentHashSet<>();
@@ -195,10 +195,10 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
         AffinityTopologyVersion rmtAffVer = req.topologyVersion();
 
         if (rmtAffVer == null) {
-            ClusterNode rmtNode = ctx.discovery().node(nodeId);
+            ClusterNode rmtNode = ctx.discovery().node(nodeId);
 
             if (rmtNode != null)
-                assert rmtNode.version().compareTo(COMPATIBLE_VERSION_SINCE) < 0;
+                assert rmtNode.version().compareTo(MSG_TOP_VER_SINCE) < 0 : rmtNode;
         }
         else {
             if (locAffVer.compareTo(rmtAffVer) < 0) {
@@ -210,11 +210,9 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
 
             if (fut != null && !fut.isDone()) {
                 fut.listen(new CI1<IgniteInternalFuture<?>>() {
-                    @Override
-                    public void apply(IgniteInternalFuture<?> t) {
+                    @Override public void apply(IgniteInternalFuture<?> t) {
                         ctx.closure().runLocalSafe(new Runnable() {
-                            @Override
-                            public void run() {
+                            @Override public void run() {
                                 processRequest(nodeId, req);
                             }
                         }, false);