Repository: incubator-ignite Updated Branches: refs/heads/ignite-784-1 [created] b5ed4f20f
#ignite-784: fix compatibility suite. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/6bd55a85 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/6bd55a85 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/6bd55a85 Branch: refs/heads/ignite-784-1 Commit: 6bd55a853750625e5bff790eeb5ca001e47ac75f Parents: 381fd1a Author: ivasilinets <ivasilin...@gridgain.com> Authored: Fri Apr 24 14:36:44 2015 +0300 Committer: ivasilinets <ivasilin...@gridgain.com> Committed: Fri Apr 24 14:36:44 2015 +0300 ---------------------------------------------------------------------- .../datastreamer/DataStreamProcessor.java | 48 +++++++++++++------- 1 file changed, 31 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6bd55a85/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java index 9e53bb5..3e6f207 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java @@ -27,6 +27,7 @@ import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.internal.util.worker.*; +import org.apache.ignite.lang.*; import org.apache.ignite.marshaller.*; import org.apache.ignite.stream.*; import org.apache.ignite.thread.*; @@ -36,12 +37,16 @@ import java.util.*; import java.util.concurrent.*; import static org.apache.ignite.internal.GridTopic.*; +import static org.apache.ignite.internal.IgniteNodeAttributes.*; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*; /** * */ public class DataStreamProcessor<K, V> extends GridProcessorAdapter { + /** The lowest version of ignite that is compatible with current version. */ + private static IgniteProductVersion COMPATIBLE_VERSION_SINCE = IgniteProductVersion.fromString("1.0.4"); + /** Loaders map (access is not supposed to be highly concurrent). */ private Collection<DataStreamerImpl> ldrs = new GridConcurrentHashSet<>(); @@ -189,25 +194,34 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter { AffinityTopologyVersion locAffVer = ctx.cache().context().exchange().readyAffinityVersion(); AffinityTopologyVersion rmtAffVer = req.topologyVersion(); - if (locAffVer.compareTo(rmtAffVer) < 0) { - if (log.isDebugEnabled()) - log.debug("Received request has higher affinity topology version [request=" + req + - ", locTopVer=" + locAffVer + ", rmtTopVer=" + rmtAffVer + ']'); - - IgniteInternalFuture<?> fut = ctx.cache().context().exchange().affinityReadyFuture(rmtAffVer); + if (rmtAffVer == null) { + IgniteProductVersion rmtVer = ctx.discovery().node(nodeId).version(); - if (fut != null && !fut.isDone()) { - fut.listen(new CI1<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> t) { - ctx.closure().runLocalSafe(new Runnable() { - @Override public void run() { - processRequest(nodeId, req); - } - }, false); - } - }); + assert rmtVer.compareTo(COMPATIBLE_VERSION_SINCE) < 0; + } + else { + if (locAffVer.compareTo(rmtAffVer) < 0) { + if (log.isDebugEnabled()) + log.debug("Received request has higher affinity topology version [request=" + req + + ", locTopVer=" + locAffVer + ", rmtTopVer=" + rmtAffVer + ']'); + + IgniteInternalFuture<?> fut = ctx.cache().context().exchange().affinityReadyFuture(rmtAffVer); + + if (fut != null && !fut.isDone()) { + fut.listen(new CI1<IgniteInternalFuture<?>>() { + @Override + public void apply(IgniteInternalFuture<?> t) { + ctx.closure().runLocalSafe(new Runnable() { + @Override + public void run() { + processRequest(nodeId, req); + } + }, false); + } + }); - return; + return; + } } }