# IGNITE-831 Create DiscoverySpiCustomMessage.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/359680db Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/359680db Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/359680db Branch: refs/heads/ignite-709_2 Commit: 359680dbd918f7459dc8dda7e6084a05a5ee7b42 Parents: d0dac7d Author: sevdokimov <sevdoki...@gridgain.com> Authored: Wed May 6 15:09:40 2015 +0300 Committer: sevdokimov <sevdoki...@gridgain.com> Committed: Wed May 6 15:09:40 2015 +0300 ---------------------------------------------------------------------- .../internal/events/DiscoveryCustomEvent.java | 14 ++++++------ .../discovery/CustomMessageWrapper.java | 5 ---- .../discovery/GridDiscoveryManager.java | 24 +++++++++++--------- .../GridCachePartitionExchangeManager.java | 4 ++-- .../discovery/DiscoverySpiCustomMessage.java | 7 ------ .../spi/discovery/DiscoverySpiListener.java | 5 ++-- .../discovery/tcp/TcpClientDiscoverySpi.java | 4 ++-- .../spi/discovery/tcp/TcpDiscoverySpi.java | 2 +- .../discovery/AbstractDiscoverySelfTest.java | 8 ++++--- 9 files changed, 32 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/359680db/modules/core/src/main/java/org/apache/ignite/internal/events/DiscoveryCustomEvent.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/events/DiscoveryCustomEvent.java b/modules/core/src/main/java/org/apache/ignite/internal/events/DiscoveryCustomEvent.java index ee32692..ad33aae 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/events/DiscoveryCustomEvent.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/events/DiscoveryCustomEvent.java @@ -34,7 +34,7 @@ public class DiscoveryCustomEvent extends DiscoveryEvent { /** * Built-in event type: custom event sent. * <br> - * Generated when someone invoke {@link GridDiscoveryManager#sendCustomEvent(Serializable)}. + * Generated when someone invoke {@link GridDiscoveryManager#sendCustomEvent(DiscoveryCustomMessage)}. * <p> * * @see DiscoveryCustomEvent @@ -42,7 +42,7 @@ public class DiscoveryCustomEvent extends DiscoveryEvent { public static final int EVT_DISCOVERY_CUSTOM_EVT = 18; /** */ - private Serializable data; + private DiscoveryCustomMessage customMsg; /** Affinity topology version. */ private AffinityTopologyVersion affTopVer; @@ -57,15 +57,15 @@ public class DiscoveryCustomEvent extends DiscoveryEvent { /** * @return Data. */ - public Serializable data() { - return data; + public DiscoveryCustomMessage customMessage() { + return customMsg; } /** - * @param data New data. + * @param customMsg New customMessage. */ - public void data(Serializable data) { - this.data = data; + public void customMessage(DiscoveryCustomMessage customMsg) { + this.customMsg = customMsg; } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/359680db/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java index f394fe1..9575cc7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java @@ -35,11 +35,6 @@ class CustomMessageWrapper implements DiscoverySpiCustomMessage { } /** {@inheritDoc} */ - @Override public boolean forwardMinorVersion() { - return delegate.forwardMinorVersion(); - } - - /** {@inheritDoc} */ @Nullable @Override public DiscoverySpiCustomMessage newMessageOnRingEnd() { DiscoveryCustomMessage res = delegate.newMessageOnRingEnd(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/359680db/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 5533fa1..c5698a8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -344,8 +344,11 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { ClusterNode node, Collection<ClusterNode> topSnapshot, Map<Long, Collection<ClusterNode>> snapshots, - @Nullable Serializable data + @Nullable DiscoverySpiCustomMessage spiCustomMsg ) { + DiscoveryCustomMessage customMsg = spiCustomMsg == null ? null + : ((CustomMessageWrapper)spiCustomMsg).delegate(); + final ClusterNode locNode = localNode(); if (snapshots != null) @@ -356,7 +359,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { if (type == EVT_NODE_METRICS_UPDATED) verChanged = false; else if (type == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) { - if (data != null && ((DiscoverySpiCustomMessage)data).forwardMinorVersion()) { + if (customMsg != null && customMsg.forwardMinorVersion()) { minorTopVer++; verChanged = true; @@ -380,9 +383,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } if (type == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) { - if (data != null) { - DiscoveryCustomMessage customMsg = ((CustomMessageWrapper)data).delegate(); - + if (customMsg != null) { for (Class cls = customMsg.getClass(); cls != null; cls = cls.getSuperclass()) { List<CustomEventListener<DiscoveryCustomMessage>> list = customEvtLsnrs.get(cls); @@ -435,7 +436,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { return; } - discoWrk.addEvent(type, nextTopVer, node, topSnapshot, data); + discoWrk.addEvent(type, nextTopVer, node, topSnapshot, customMsg); } }); @@ -1567,8 +1568,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { /** Worker for discovery events. */ private class DiscoveryWorker extends GridWorker { /** Event queue. */ - private final BlockingQueue<GridTuple5<Integer, AffinityTopologyVersion, ClusterNode, Collection<ClusterNode>, Serializable>> evts = - new LinkedBlockingQueue<>(); + private final BlockingQueue<GridTuple5<Integer, AffinityTopologyVersion, ClusterNode, Collection<ClusterNode>, + DiscoveryCustomMessage>> evts = new LinkedBlockingQueue<>(); /** Node segmented event fired flag. */ private boolean nodeSegFired; @@ -1634,7 +1635,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { AffinityTopologyVersion topVer, ClusterNode node, Collection<ClusterNode> topSnapshot, - @Nullable Serializable data + @Nullable DiscoveryCustomMessage data ) { assert node != null : data; @@ -1675,7 +1676,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { /** @throws InterruptedException If interrupted. */ @SuppressWarnings("DuplicateCondition") private void body0() throws InterruptedException { - GridTuple5<Integer, AffinityTopologyVersion, ClusterNode, Collection<ClusterNode>, Serializable> evt = evts.take(); + GridTuple5<Integer, AffinityTopologyVersion, ClusterNode, Collection<ClusterNode>, + DiscoveryCustomMessage> evt = evts.take(); int type = evt.get1(); @@ -1793,7 +1795,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { customEvt.type(type); customEvt.topologySnapshot(topVer.topologyVersion(), null); customEvt.affinityTopologyVersion(topVer); - customEvt.data(evt.get5()); + customEvt.customMessage(evt.get5()); ctx.event().record(customEvt); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/359680db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 5f82ae2..c7aa322 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -150,8 +150,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana else { DiscoveryCustomEvent customEvt = (DiscoveryCustomEvent)e; - if (customEvt.data() instanceof DynamicCacheChangeBatch) { - DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)customEvt.data(); + if (customEvt.customMessage() instanceof DynamicCacheChangeBatch) { + DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)customEvt.customMessage(); Collection<DynamicCacheChangeRequest> valid = new ArrayList<>(batch.requests().size()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/359680db/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java index 1550613..8d92deb 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java @@ -26,13 +26,6 @@ import java.io.*; */ public interface DiscoverySpiCustomMessage extends Serializable { /** - * Whether or not minor version of topology should be increased on message receive. - * - * @return {@code true} if minor topology version should be increased. - */ - public boolean forwardMinorVersion(); - - /** * Called when message passed the ring. */ @Nullable public DiscoverySpiCustomMessage newMessageOnRingEnd(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/359680db/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java index 7f17fe4..f46869d 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java @@ -18,10 +18,9 @@ package org.apache.ignite.spi.discovery; import org.apache.ignite.cluster.*; -import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.events.*; import org.jetbrains.annotations.*; -import java.io.*; import java.util.*; /** @@ -47,5 +46,5 @@ public interface DiscoverySpiListener { ClusterNode node, Collection<ClusterNode> topSnapshot, @Nullable Map<Long, Collection<ClusterNode>> topHist, - @Nullable Serializable data); + @Nullable DiscoverySpiCustomMessage data); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/359680db/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java index 3e839f8..5ce7437 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java @@ -1332,7 +1332,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp if (node != null && node.visible()) { try { - Serializable msgObj = marsh.unmarshal(msg.messageBytes(), U.gridClassLoader()); + DiscoverySpiCustomMessage msgObj = marsh.unmarshal(msg.messageBytes(), U.gridClassLoader()); notifyDiscovery(EVT_DISCOVERY_CUSTOM_EVT, topVer, node, allNodes(), msgObj); } @@ -1433,7 +1433,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp * @param top Topology snapshot. */ private void notifyDiscovery(int type, long topVer, ClusterNode node, Collection<ClusterNode> top, - @Nullable Serializable data) { + @Nullable DiscoverySpiCustomMessage data) { DiscoverySpiListener lsnr = TcpClientDiscoverySpi.this.lsnr; if (lsnr != null) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/359680db/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 8051172..40dca05 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -4524,7 +4524,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov if (node != null) { try { - Serializable msgObj = marsh.unmarshal(msg.messageBytes(), U.gridClassLoader()); + DiscoverySpiCustomMessage msgObj = marsh.unmarshal(msg.messageBytes(), U.gridClassLoader()); lsnr.onDiscovery(DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT, msg.topologyVersion(), http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/359680db/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java index 3c61f00..9c6fbb4 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java @@ -25,6 +25,7 @@ import org.apache.ignite.spi.*; import org.apache.ignite.testframework.config.*; import org.apache.ignite.testframework.junits.*; import org.apache.ignite.testframework.junits.spi.*; +import org.jetbrains.annotations.*; import javax.management.*; import java.io.*; @@ -132,7 +133,7 @@ public abstract class AbstractDiscoverySelfTest<T extends IgniteSpi> extends Gri /** {@inheritDoc} */ @Override public void onDiscovery(int type, long topVer, ClusterNode node, Collection<ClusterNode> topSnapshot, - Map<Long, Collection<ClusterNode>> topHist, Serializable data) { + Map<Long, Collection<ClusterNode>> topHist, @Nullable DiscoverySpiCustomMessage data) { if (type == EVT_NODE_METRICS_UPDATED) isMetricsUpdate = true; } @@ -205,7 +206,7 @@ public abstract class AbstractDiscoverySelfTest<T extends IgniteSpi> extends Gri DiscoverySpiListener locHeartbeatLsnr = new DiscoverySpiListener() { @Override public void onDiscovery(int type, long topVer, ClusterNode node, Collection<ClusterNode> topSnapshot, Map<Long, Collection<ClusterNode>> topHist, - Serializable data) { + @Nullable DiscoverySpiCustomMessage data) { // If METRICS_UPDATED came from local node if (type == EVT_NODE_METRICS_UPDATED && node.id().equals(spi.getLocalNode().id())) @@ -369,7 +370,8 @@ public abstract class AbstractDiscoverySelfTest<T extends IgniteSpi> extends Gri spi.setListener(new DiscoverySpiListener() { @SuppressWarnings({"NakedNotify"}) @Override public void onDiscovery(int type, long topVer, ClusterNode node, - Collection<ClusterNode> topSnapshot, Map<Long, Collection<ClusterNode>> topHist, Serializable data) { + Collection<ClusterNode> topSnapshot, Map<Long, Collection<ClusterNode>> topHist, + @Nullable DiscoverySpiCustomMessage data) { info("Discovery event [type=" + type + ", node=" + node + ']'); synchronized (mux) {