# IGNITE-831 Create DiscoverySpiCustomMessage.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5ffaa4cb Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5ffaa4cb Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5ffaa4cb Branch: refs/heads/ignite-709_3 Commit: 5ffaa4cb6f35ead8fc94c563d52e98cc5241acbc Parents: 657afd0 Author: sevdokimov <sevdoki...@gridgain.com> Authored: Wed May 6 14:27:14 2015 +0300 Committer: sevdokimov <sevdoki...@gridgain.com> Committed: Wed May 6 14:43:16 2015 +0300 ---------------------------------------------------------------------- .../discovery/CustomMessageWrapper.java | 61 ++++++++++++++++++++ .../discovery/GridDiscoveryManager.java | 18 +++--- .../ignite/spi/discovery/DiscoverySpi.java | 5 +- .../discovery/DiscoverySpiCustomMessage.java | 39 +++++++++++++ .../discovery/tcp/TcpClientDiscoverySpi.java | 3 +- .../spi/discovery/tcp/TcpDiscoverySpi.java | 17 +++--- 6 files changed, 121 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5ffaa4cb/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java new file mode 100644 index 0000000..f394fe1 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.managers.discovery; + +import org.apache.ignite.spi.discovery.*; +import org.jetbrains.annotations.*; + +/** + * + */ +class CustomMessageWrapper implements DiscoverySpiCustomMessage { + /** */ + private final DiscoveryCustomMessage delegate; + + /** + * @param delegate Delegate. + */ + CustomMessageWrapper(@NotNull DiscoveryCustomMessage delegate) { + this.delegate = delegate; + } + + /** {@inheritDoc} */ + @Override public boolean forwardMinorVersion() { + return delegate.forwardMinorVersion(); + } + + /** {@inheritDoc} */ + @Nullable @Override public DiscoverySpiCustomMessage newMessageOnRingEnd() { + DiscoveryCustomMessage res = delegate.newMessageOnRingEnd(); + + return res == null ? null : new CustomMessageWrapper(res); + } + + /** + * @return Delegate. + */ + @NotNull + public DiscoveryCustomMessage delegate() { + return delegate; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return delegate.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5ffaa4cb/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 3d35bee..5533fa1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -356,7 +356,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { if (type == EVT_NODE_METRICS_UPDATED) verChanged = false; else if (type == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) { - if (data != null && ((DiscoveryCustomMessage)data).forwardMinorVersion()) { + if (data != null && ((DiscoverySpiCustomMessage)data).forwardMinorVersion()) { minorTopVer++; verChanged = true; @@ -381,16 +381,18 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { if (type == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) { if (data != null) { - for (Class cls = data.getClass(); cls != null; cls = cls.getSuperclass()) { + DiscoveryCustomMessage customMsg = ((CustomMessageWrapper)data).delegate(); + + for (Class cls = customMsg.getClass(); cls != null; cls = cls.getSuperclass()) { List<CustomEventListener<DiscoveryCustomMessage>> list = customEvtLsnrs.get(cls); if (list != null) { for (CustomEventListener<DiscoveryCustomMessage> lsnr : list) { try { - lsnr.onCustomEvent(node, (DiscoveryCustomMessage)data); + lsnr.onCustomEvent(node, customMsg); } catch (Exception e) { - U.error(log, "Failed to notify direct custom event listener: " + data, e); + U.error(log, "Failed to notify direct custom event listener: " + customMsg, e); } } } @@ -1407,10 +1409,10 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** - * @param evt Event. + * @param msg Custom message. */ - public void sendCustomEvent(DiscoveryCustomMessage evt) { - getSpi().sendCustomEvent(evt); + public void sendCustomEvent(DiscoveryCustomMessage msg) { + getSpi().sendCustomEvent(new CustomMessageWrapper(msg)); } /** @@ -1634,7 +1636,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { Collection<ClusterNode> topSnapshot, @Nullable Serializable data ) { - assert node != null; + assert node != null : data; evts.add(F.t(type, topVer, node, topSnapshot, data)); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5ffaa4cb/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java index 84a5f41..7836e0f 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java @@ -19,7 +19,6 @@ package org.apache.ignite.spi.discovery; import org.apache.ignite.*; import org.apache.ignite.cluster.*; -import org.apache.ignite.internal.managers.discovery.*; import org.apache.ignite.lang.*; import org.apache.ignite.spi.*; import org.jetbrains.annotations.*; @@ -142,10 +141,10 @@ public interface DiscoverySpi extends IgniteSpi { /** * Sends custom message across the ring. - * @param evt Event. + * @param msg Custom message. * @throws IgniteException if failed to marshal evt. */ - public void sendCustomEvent(DiscoveryCustomMessage evt) throws IgniteException; + public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException; /** * Initiates failure of provided node. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5ffaa4cb/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java new file mode 100644 index 0000000..1550613 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery; + +import org.jetbrains.annotations.*; + +import java.io.*; + +/** + * + */ +public interface DiscoverySpiCustomMessage extends Serializable { + /** + * Whether or not minor version of topology should be increased on message receive. + * + * @return {@code true} if minor topology version should be increased. + */ + public boolean forwardMinorVersion(); + + /** + * Called when message passed the ring. + */ + @Nullable public DiscoverySpiCustomMessage newMessageOnRingEnd(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5ffaa4cb/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java index dc89d6a..3e839f8 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java @@ -20,7 +20,6 @@ package org.apache.ignite.spi.discovery.tcp; import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; -import org.apache.ignite.internal.managers.discovery.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; @@ -389,7 +388,7 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp } /** {@inheritDoc} */ - @Override public void sendCustomEvent(DiscoveryCustomMessage evt) { + @Override public void sendCustomEvent(DiscoverySpiCustomMessage evt) { if (segmentation) throw new IgniteException("Failed to send custom message: client is disconnected"); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5ffaa4cb/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index f8236f1..6028901 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -21,10 +21,9 @@ import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; -import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.events.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.events.*; -import org.apache.ignite.internal.managers.discovery.*; import org.apache.ignite.internal.processors.security.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.future.*; @@ -38,11 +37,11 @@ import org.apache.ignite.resources.*; import org.apache.ignite.spi.*; import org.apache.ignite.spi.discovery.*; import org.apache.ignite.spi.discovery.tcp.internal.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; -import org.apache.ignite.spi.discovery.tcp.ipfinder.jdbc.TcpDiscoveryJdbcIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.jdbc.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.sharedfs.TcpDiscoverySharedFsIpFinder; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.sharedfs.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; import org.apache.ignite.spi.discovery.tcp.messages.*; import org.jetbrains.annotations.*; import org.jsr166.*; @@ -1249,7 +1248,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov } /** {@inheritDoc} */ - @Override public void sendCustomEvent(DiscoveryCustomMessage evt) { + @Override public void sendCustomEvent(DiscoverySpiCustomMessage evt) { try { msgWorker.addMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), marsh.marshal(evt))); } @@ -4479,9 +4478,9 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov stats.onRingMessageReceived(msg); try { - DiscoveryCustomMessage msgObj = marsh.unmarshal(msg.messageBytes(), U.gridClassLoader()); + DiscoverySpiCustomMessage msgObj = marsh.unmarshal(msg.messageBytes(), U.gridClassLoader()); - DiscoveryCustomMessage nextMsg = msgObj.newMessageOnRingEnd(); + DiscoverySpiCustomMessage nextMsg = msgObj.newMessageOnRingEnd(); if (nextMsg != null) addMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), marsh.marshal(nextMsg)));