# ignite-694
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/327a1086 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/327a1086 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/327a1086 Branch: refs/heads/ignite-sprint-3 Commit: 327a1086d60b46c49fe3fbfc73af49c21f1238f4 Parents: 76d80f4 Author: sboikov <sboi...@gridgain.com> Authored: Wed Apr 8 12:13:37 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Wed Apr 8 12:13:37 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheIoManager.java | 44 +++++++++++++------- .../processors/cache/GridCacheMessage.java | 7 ++++ .../GridCachePartitionExchangeManager.java | 12 +++++- .../GridDhtPartitionsAbstractMessage.java | 15 ++++--- 4 files changed, 57 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/327a1086/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index 09fe2e0..6fefdfd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -84,27 +84,43 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { final GridCacheMessage cacheMsg = (GridCacheMessage)msg; - AffinityTopologyVersion locAffVer = cctx.exchange().topologyVersion(); - AffinityTopologyVersion rmtAffVer = cacheMsg.topologyVersion(); + IgniteInternalFuture<?> fut = null; - if (locAffVer.compareTo(rmtAffVer) < 0) { - if (log.isDebugEnabled()) - log.debug("Received message has higher topology version [msg=" + msg + - ", locTopVer=" + locAffVer + ", rmtTopVer=" + rmtAffVer + ']'); + if (cacheMsg.partitionExchangeMessage()) { + long locTopVer = cctx.discovery().topologyVersion(); + long rmtTopVer = cacheMsg.topologyVersion().topologyVersion(); - IgniteInternalFuture<?> topFut = cctx.exchange().affinityReadyFuture(rmtAffVer); + if (locTopVer < rmtTopVer) { + if (log.isDebugEnabled()) + log.debug("Received message has higher topology version [msg=" + msg + + ", locTopVer=" + locTopVer + ", rmtTopVer=" + rmtTopVer + ']'); - if (topFut != null && !topFut.isDone()) { - topFut.listen(new CI1<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> t) { - handleMessage(nodeId, cacheMsg); - } - }); + fut = cctx.discovery().topologyFuture(rmtTopVer); + } + } + else { + AffinityTopologyVersion locAffVer = cctx.exchange().readyAffinityVersion(); + AffinityTopologyVersion rmtAffVer = cacheMsg.topologyVersion(); - return; + if (locAffVer.compareTo(rmtAffVer) < 0) { + if (log.isDebugEnabled()) + log.debug("Received message has higher affinity topology version [msg=" + msg + + ", locTopVer=" + locAffVer + ", rmtTopVer=" + rmtAffVer + ']'); + + fut = cctx.exchange().affinityReadyFuture(rmtAffVer); } } + if (fut != null && !fut.isDone()) { + fut.listen(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> t) { + handleMessage(nodeId, cacheMsg); + } + }); + + return; + } + handleMessage(nodeId, cacheMsg); } }; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/327a1086/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java index b9bce3e..fefd582 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java @@ -86,6 +86,13 @@ public abstract class GridCacheMessage implements Message { } /** + * @return {@code True} if this message is partition exchange message. + */ + public boolean partitionExchangeMessage() { + return false; + } + + /** * @return {@code True} if class loading errors should be ignored, false otherwise. */ public boolean ignoreClassErrors() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/327a1086/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index e8e3ea1..0955328 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -372,13 +372,15 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana /** * @param cacheId Cache ID. + * @return Client partition topology. */ public GridClientPartitionTopology clearClientTopology(int cacheId) { return clientTops.remove(cacheId); } /** - * Gets topology version of last completed partition exchange. + * Gets topology version of last partition exchange, it is possible that last partition exchange + * is not completed yet. * * @return Topology version. */ @@ -390,6 +392,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } /** + * @return Topology version of latest completed partition exchange. + */ + public AffinityTopologyVersion readyAffinityVersion() { + return readyTopVer.get(); + } + + /** * @return Last completed topology future. */ public GridDhtTopologyFuture lastTopologyFuture() { @@ -796,7 +805,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (top != null) updated |= top.update(null, entry.getValue()) != null; - } if (updated) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/327a1086/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java index 6af0072..5a8616d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java @@ -46,11 +46,6 @@ abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage { // No-op. } - /** {@inheritDoc} */ - @Override public boolean allowForStartup() { - return true; - } - /** * @param exchId Exchange ID. * @param lastVer Last version. @@ -60,6 +55,16 @@ abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage { this.lastVer = lastVer; } + /** {@inheritDoc} */ + @Override public boolean allowForStartup() { + return true; + } + + /** {@inheritDoc} */ + @Override public boolean partitionExchangeMessage() { + return true; + } + /** * @return Exchange ID. */