Repository: incubator-ignite Updated Branches: refs/heads/ignite-836_2 [created] 94fed6571
# IGNITE-831 Create parent class for custom messages: DiscoveryCustomMessage. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/1a4de262 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/1a4de262 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/1a4de262 Branch: refs/heads/ignite-836_2 Commit: 1a4de262b9a2f0d2bd589473546dd6ad3008a248 Parents: f9f7662 Author: sevdokimov <sergey.evdoki...@jetbrains.com> Authored: Mon May 4 12:05:49 2015 +0300 Committer: sevdokimov <sergey.evdoki...@jetbrains.com> Committed: Mon May 4 12:05:49 2015 +0300 ---------------------------------------------------------------------- .../discovery/DiscoveryCustomMessage.java | 32 +++++++++++++++++ .../discovery/GridDiscoveryManager.java | 38 +++++++++++--------- .../cache/DynamicCacheChangeBatch.java | 9 +++-- .../ignite/spi/discovery/DiscoverySpi.java | 4 +-- .../discovery/tcp/TcpClientDiscoverySpi.java | 3 +- .../spi/discovery/tcp/TcpDiscoverySpi.java | 3 +- 6 files changed, 67 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1a4de262/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java new file mode 100644 index 0000000..c797ebd --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.managers.discovery; + +import java.io.*; + +/** + * + */ +public interface DiscoveryCustomMessage extends Serializable { + /** + * Whether or not minor version of topology should be increased on message receive. + * + * @return {@code true} if minor topology version should be increased. + */ + public boolean forwardMinorVersion(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1a4de262/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 0950774..0df7d5f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -279,19 +279,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } } - /** - * @param evtType Event type. - * @return Next affinity topology version. - */ - private AffinityTopologyVersion nextTopologyVersion(int evtType, long topVer) { - if (evtType == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) - minorTopVer++; - else if (evtType != EVT_NODE_METRICS_UPDATED) - minorTopVer = 0; - - return new AffinityTopologyVersion(topVer, minorTopVer); - } - /** {@inheritDoc} */ @Override public void start() throws IgniteCheckedException { long totSysMemory = -1; @@ -363,7 +350,26 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { if (snapshots != null) topHist = snapshots; - AffinityTopologyVersion nextTopVer = nextTopologyVersion(type, topVer); + boolean verChanged; + + if (type == EVT_NODE_METRICS_UPDATED) + verChanged = false; + else if (type == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) { + if (data != null && ((DiscoveryCustomMessage)data).forwardMinorVersion()) { + minorTopVer++; + + verChanged = true; + } + else + verChanged = false; + } + else { + minorTopVer = 0; + + verChanged = true; + } + + AffinityTopologyVersion nextTopVer = new AffinityTopologyVersion(topVer, minorTopVer); if (type == EVT_NODE_FAILED || type == EVT_NODE_LEFT) { for (DiscoCache c : discoCacheHist.values()) @@ -385,7 +391,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { // Put topology snapshot into discovery history. // There is no race possible between history maintenance and concurrent discovery // event notifications, since SPI notifies manager about all events from this listener. - if (type != EVT_NODE_METRICS_UPDATED) { + if (verChanged) { DiscoCache cache = new DiscoCache(locNode, F.view(topSnapshot, F.remoteNodes(locNode.id()))); discoCacheHist.put(nextTopVer, cache); @@ -1386,7 +1392,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { /** * @param evt Event. */ - public void sendCustomEvent(Serializable evt) { + public void sendCustomEvent(DiscoveryCustomMessage evt) { getSpi().sendCustomEvent(evt); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1a4de262/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java index ac3660e..0257307 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java @@ -17,16 +17,16 @@ package org.apache.ignite.internal.processors.cache; +import org.apache.ignite.internal.managers.discovery.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.internal.*; -import java.io.*; import java.util.*; /** * Cache change batch. */ -public class DynamicCacheChangeBatch implements Serializable { +public class DynamicCacheChangeBatch implements DiscoveryCustomMessage { /** */ private static final long serialVersionUID = 0L; @@ -72,4 +72,9 @@ public class DynamicCacheChangeBatch implements Serializable { @Override public String toString() { return S.toString(DynamicCacheChangeBatch.class, this); } + + /** {@inheritDoc} */ + @Override public boolean forwardMinorVersion() { + return true; + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1a4de262/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java index 7560999..247ff67 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java @@ -18,11 +18,11 @@ package org.apache.ignite.spi.discovery; import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.managers.discovery.*; import org.apache.ignite.lang.*; import org.apache.ignite.spi.*; import org.jetbrains.annotations.*; -import java.io.*; import java.util.*; /** @@ -143,7 +143,7 @@ public interface DiscoverySpi extends IgniteSpi { * Sends custom message across the ring. * @param evt Event. */ - public void sendCustomEvent(Serializable evt); + public void sendCustomEvent(DiscoveryCustomMessage evt); /** * Initiates failure of provided node. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1a4de262/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java index c319f9e..59f4708 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java @@ -20,6 +20,7 @@ package org.apache.ignite.spi.discovery.tcp; import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.managers.discovery.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; @@ -388,7 +389,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp } /** {@inheritDoc} */ - @Override public void sendCustomEvent(Serializable evt) { + @Override public void sendCustomEvent(DiscoveryCustomMessage evt) { try { sockWriter.sendMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), marsh.marshal(evt))); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1a4de262/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index c6b8e90..eba0528 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -24,6 +24,7 @@ import org.apache.ignite.configuration.*; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.internal.*; import org.apache.ignite.internal.events.*; +import org.apache.ignite.internal.managers.discovery.*; import org.apache.ignite.internal.processors.security.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.future.*; @@ -1248,7 +1249,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov } /** {@inheritDoc} */ - @Override public void sendCustomEvent(Serializable evt) { + @Override public void sendCustomEvent(DiscoveryCustomMessage evt) { try { msgWorker.addMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), marsh.marshal(evt))); }