Repository: incubator-ignite Updated Branches: refs/heads/ignite-312 [created] 38ebb3e3a
# IGNITE-312 Bug fix: custom events must be processed in the same order on each node. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/38ebb3e3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/38ebb3e3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/38ebb3e3 Branch: refs/heads/ignite-312 Commit: 38ebb3e3aeb242efda79d652e83a41ed7b53cd6c Parents: 893d0fe Author: sevdokimov <sevdoki...@gridgain.com> Authored: Tue Mar 3 18:25:19 2015 +0300 Committer: sevdokimov <sevdoki...@gridgain.com> Committed: Tue Mar 3 18:25:19 2015 +0300 ---------------------------------------------------------------------- .../eventstorage/GridEventStorageManager.java | 3 +- .../ignite/internal/util/IgniteUtils.java | 30 ++++++++++-------- .../spi/discovery/tcp/TcpDiscoverySpi.java | 33 ++++++++++++-------- .../TcpDiscoveryCustomEventMessage.java | 1 + 4 files changed, 40 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/38ebb3e3/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java index 82af8bf..36ea7e4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java @@ -21,6 +21,7 @@ import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.events.*; import org.apache.ignite.internal.*; +import 
org.apache.ignite.internal.events.*; import org.apache.ignite.internal.managers.*; import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.managers.deployment.*; @@ -434,7 +435,7 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi> * @return {@code true} if this is an internal event. */ private boolean isInternalEvent(int type) { - return F.contains(EVTS_DISCOVERY_ALL, type); + return type == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT || F.contains(EVTS_DISCOVERY_ALL, type); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/38ebb3e3/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index de71f59..7412ba2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -26,6 +26,7 @@ import org.apache.ignite.events.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.compute.*; +import org.apache.ignite.internal.events.*; import org.apache.ignite.internal.managers.deployment.*; import org.apache.ignite.internal.mxbean.*; import org.apache.ignite.internal.processors.cache.*; @@ -452,22 +453,25 @@ public abstract class IgniteUtils { } // Event names initialization. 
- for (Field field : EventType.class.getFields()) { - if (field.getType().equals(int.class)) { - try { - assert field.getName().startsWith("EVT_") : "Invalid event name (should start with 'EVT_': " + - field.getName(); + Class<?>[] evtHolderClasses = new Class[]{EventType.class, DiscoveryCustomEvent.class}; - int type = field.getInt(null); + for (Class<?> cls : evtHolderClasses) { + for (Field field : cls.getFields()) { + if (Modifier.isStatic(field.getModifiers()) && field.getType().equals(int.class)) { + if (field.getName().startsWith("EVT_")) { + try { + int type = field.getInt(null); - String prev = GRID_EVT_NAMES.put(type, field.getName().substring(4)); + String prev = GRID_EVT_NAMES.put(type, field.getName().substring("EVT_".length())); - // Check for duplicate event types. - assert prev == null : "Duplicate event [type=" + type + ", name1=" + prev + - ", name2=" + field.getName() + ']'; - } - catch (IllegalAccessException e) { - throw new IgniteException(e); + // Check for duplicate event types. 
+ assert prev == null : "Duplicate event [type=" + type + ", name1=" + prev + + ", name2=" + field.getName() + ']'; + } + catch (IllegalAccessException e) { + throw new IgniteException(e); + } + } } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/38ebb3e3/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 3800783..34995ba 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -3784,7 +3784,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov boolean fireEvt = false; - if (node != null && msg.verified()) { + if (msg.verified()) { assert topVer > 0 : "Invalid topology version: " + msg; if (node.order() == 0) @@ -4471,24 +4471,31 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov * @param msg Message. 
*/ private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) { - if (msg.creatorNodeId().equals(getLocalNodeId())) { - if (msg.senderNodeId() != null) + if (isLocalNodeCoordinator()) { + if (msg.verified()) { + stats.onRingMessageReceived(msg); + + addMessage(new TcpDiscoveryDiscardMessage(getLocalNodeId(), msg.id())); + return; + } - msg.senderNodeId(getLocalNodeId()); + msg.verify(getLocalNodeId()); } - DiscoverySpiListener lsnr = TcpDiscoverySpi.this.lsnr; + if (msg.verified()) { + DiscoverySpiListener lsnr = TcpDiscoverySpi.this.lsnr; - TcpDiscoverySpiState spiState = spiStateCopy(); + TcpDiscoverySpiState spiState = spiStateCopy(); - if (lsnr != null && (spiState == CONNECTED || spiState == DISCONNECTING)) - lsnr.onDiscovery(DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT, - msg.topologyVersion(), - ring.node(msg.creatorNodeId()), - null, - null, - msg.message()); + if (lsnr != null && (spiState == CONNECTED || spiState == DISCONNECTING)) + lsnr.onDiscovery(DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT, + msg.topologyVersion(), + ring.node(msg.creatorNodeId()), + null, + null, + msg.message()); + } if (ring.hasRemoteNodes()) sendMessageAcrossRing(msg); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/38ebb3e3/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java index fcf10e9..b0c7400 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomEventMessage.java @@ -23,6 +23,7 @@ import java.util.*; /** * Wrapped for custom message. 
*/ +@TcpDiscoveryEnsureDelivery public class TcpDiscoveryCustomEventMessage extends TcpDiscoveryAbstractMessage { /** */ private static final long serialVersionUID = 0L;