Repository: incubator-ignite Updated Branches: refs/heads/ignite-831 [created] 7d8c4bfba
# IGNITE-831 improve Custom message. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/7d8c4bfb Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/7d8c4bfb Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/7d8c4bfb Branch: refs/heads/ignite-831 Commit: 7d8c4bfba773c568b8091db41ffc6e516f2b379a Parents: 0f3d372 Author: sevdokimov <[email protected]> Authored: Tue Apr 28 14:40:38 2015 +0300 Committer: sevdokimov <[email protected]> Committed: Tue Apr 28 14:40:38 2015 +0300 ---------------------------------------------------------------------- .../discovery/DiscoveryCustomMessage.java | 39 ++++++++++++++++++++ .../discovery/GridDiscoveryManager.java | 38 +++++++++++-------- .../cache/DynamicCacheChangeBatch.java | 14 ++++++- .../ignite/spi/discovery/DiscoverySpi.java | 4 +- .../discovery/tcp/TcpClientDiscoverySpi.java | 3 +- .../spi/discovery/tcp/TcpDiscoverySpi.java | 3 +- 6 files changed, 79 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7d8c4bfb/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java new file mode 100644 index 0000000..38e327c --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.managers.discovery; + +import java.io.*; + +/** + * + */ +public interface DiscoveryCustomMessage extends Serializable { + /** + * Whether or not minor version of topology should be increased on message receive. + * + * @return {@code true} if minor topology version should be increased. + */ + public boolean forwardMinorVersion(); + + /** + * Whether or not minor + * + * @return + */ + public boolean waitForClientResponse(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7d8c4bfb/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 0950774..0df7d5f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -279,19 +279,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } } - /** - * @param evtType Event type. - * @return Next affinity topology version. - */ - private AffinityTopologyVersion nextTopologyVersion(int evtType, long topVer) { - if (evtType == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) - minorTopVer++; - else if (evtType != EVT_NODE_METRICS_UPDATED) - minorTopVer = 0; - - return new AffinityTopologyVersion(topVer, minorTopVer); - } - /** {@inheritDoc} */ @Override public void start() throws IgniteCheckedException { long totSysMemory = -1; @@ -363,7 +350,26 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { if (snapshots != null) topHist = snapshots; - AffinityTopologyVersion nextTopVer = nextTopologyVersion(type, topVer); + boolean verChanged; + + if (type == EVT_NODE_METRICS_UPDATED) + verChanged = false; + else if (type == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) { + if (data != null && ((DiscoveryCustomMessage)data).forwardMinorVersion()) { + minorTopVer++; + + verChanged = true; + } + else + verChanged = false; + } + else { + minorTopVer = 0; + + verChanged = true; + } + + AffinityTopologyVersion nextTopVer = new AffinityTopologyVersion(topVer, minorTopVer); if (type == EVT_NODE_FAILED || type == EVT_NODE_LEFT) { for (DiscoCache c : discoCacheHist.values()) @@ -385,7 +391,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { // Put topology snapshot into discovery history. // There is no race possible between history maintenance and concurrent discovery // event notifications, since SPI notifies manager about all events from this listener. - if (type != EVT_NODE_METRICS_UPDATED) { + if (verChanged) { DiscoCache cache = new DiscoCache(locNode, F.view(topSnapshot, F.remoteNodes(locNode.id()))); discoCacheHist.put(nextTopVer, cache); @@ -1386,7 +1392,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { /** * @param evt Event. */ - public void sendCustomEvent(Serializable evt) { + public void sendCustomEvent(DiscoveryCustomMessage evt) { getSpi().sendCustomEvent(evt); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7d8c4bfb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java index ac3660e..64aaee2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java @@ -17,16 +17,16 @@ package org.apache.ignite.internal.processors.cache; +import org.apache.ignite.internal.managers.discovery.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.internal.*; -import java.io.*; import java.util.*; /** * Cache change batch. */ -public class DynamicCacheChangeBatch implements Serializable { +public class DynamicCacheChangeBatch implements DiscoveryCustomMessage { /** */ private static final long serialVersionUID = 0L; @@ -72,4 +72,14 @@ public class DynamicCacheChangeBatch implements Serializable { @Override public String toString() { return S.toString(DynamicCacheChangeBatch.class, this); } + + /** {@inheritDoc} */ + @Override public boolean forwardMinorVersion() { + return true; + } + + /** {@inheritDoc} */ + @Override public boolean waitForClientResponse() { + return false; + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7d8c4bfb/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java index 7560999..247ff67 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java @@ -18,11 +18,11 @@ package org.apache.ignite.spi.discovery; import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.managers.discovery.*; import org.apache.ignite.lang.*; import org.apache.ignite.spi.*; import org.jetbrains.annotations.*; -import java.io.*; import java.util.*; /** @@ -143,7 +143,7 @@ public interface DiscoverySpi extends IgniteSpi { * Sends custom message across the ring. * @param evt Event. */ - public void sendCustomEvent(Serializable evt); + public void sendCustomEvent(DiscoveryCustomMessage evt); /** * Initiates failure of provided node. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7d8c4bfb/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java index e3182c4..44c67cb 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java @@ -22,6 +22,7 @@ import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; import org.apache.ignite.events.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.managers.discovery.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; @@ -376,7 +377,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp } /** {@inheritDoc} */ - @Override public void sendCustomEvent(Serializable evt) { + @Override public void sendCustomEvent(DiscoveryCustomMessage evt) { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7d8c4bfb/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 6a5eb25..d3472cd 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -24,6 +24,7 @@ import org.apache.ignite.configuration.*; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.internal.*; import org.apache.ignite.internal.events.*; +import org.apache.ignite.internal.managers.discovery.*; import org.apache.ignite.internal.processors.security.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.future.*; @@ -1252,7 +1253,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov } /** {@inheritDoc} */ - @Override public void sendCustomEvent(Serializable evt) { + @Override public void sendCustomEvent(DiscoveryCustomMessage evt) { try { byte[] msgBytes;
