# IGNITE-312 Implement sendCustomMessage() method.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a93ebc81 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a93ebc81 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a93ebc81 Branch: refs/heads/ignite-312 Commit: a93ebc8112b93fdda1571876ec28f4bbf35d4d6a Parents: bad6e4b Author: sevdokimov <sergey.evdoki...@jetbrains.com> Authored: Mon Feb 23 23:57:55 2015 +0300 Committer: sevdokimov <sergey.evdoki...@jetbrains.com> Committed: Mon Feb 23 23:57:55 2015 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/events/EventType.java | 3 + .../apache/ignite/internal/IgniteKernal.java | 14 +++-- .../discovery/GridDiscoveryManager.java | 51 ++++++++++++--- .../processors/cache/GridCacheProcessor.java | 33 ++++++++++ .../ignite/spi/discovery/DiscoverySpi.java | 6 ++ .../spi/discovery/DiscoverySpiListener.java | 9 +++ .../discovery/tcp/TcpClientDiscoverySpi.java | 5 ++ .../spi/discovery/tcp/TcpDiscoverySpi.java | 33 +++++++++- .../tcp/messages/TcpDiscoveryCustomMessage.java | 66 ++++++++++++++++++++ .../discovery/AbstractDiscoverySelfTest.java | 13 ++++ 10 files changed, 216 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a93ebc81/modules/core/src/main/java/org/apache/ignite/events/EventType.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/events/EventType.java b/modules/core/src/main/java/org/apache/ignite/events/EventType.java index 9f7e2c4..4319d2a 100644 --- a/modules/core/src/main/java/org/apache/ignite/events/EventType.java +++ b/modules/core/src/main/java/org/apache/ignite/events/EventType.java @@ -153,6 +153,9 @@ public interface EventType { */ public static final int EVT_NODE_SEGMENTED = 14; + /** */ + 
public static final int EVT_DISCOVERY_CUSTOM_EVT = 15; + public static final int EVT_CLIENT_NODE_DISCONNECTED = 16; public static final int EVT_CLIENT_NODE_RECONNECTED = 17; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a93ebc81/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 8f1179f..717ece7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -687,6 +687,11 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { startProcessor(ctx, rsrcProc, attrs); + // Create GridDiscoveryManager first, because another processors can register listeners on it. + GridDiscoveryManager discoveryMgr = new GridDiscoveryManager(ctx); + + ctx.add(discoveryMgr); + // Inject resources into lifecycle beans. if (!cfg.isDaemon() && cfg.getLifecycleBeans() != null) { for (LifecycleBean bean : cfg.getLifecycleBeans()) { @@ -775,7 +780,9 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { gw.setState(STARTED); // Start discovery manager last to make sure that grid is fully initialized. - startManager(ctx, new GridDiscoveryManager(ctx), attrs); + discoveryMgr.addSpiAttributes(attrs); + discoveryMgr.setNodeAttributes(attrs, VER); + discoveryMgr.start(); } finally { gw.writeUnlock(); @@ -1392,11 +1399,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { throws IgniteCheckedException { mgr.addSpiAttributes(attrs); - // Set all node attributes into discovery manager, - // so they can be distributed to all nodes. 
- if (mgr instanceof GridDiscoveryManager) - ((GridDiscoveryManager)mgr).setNodeAttributes(attrs, VER); - // Add manager to registry before it starts to avoid // cases when manager is started but registry does not // have it yet. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a93ebc81/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 449464a..c21d8c6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -162,6 +162,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { /** Metrics update worker. */ private final MetricsUpdater metricsUpdater = new MetricsUpdater(); + /** */ + private final CopyOnWriteArrayList<IgniteClosure<Object, Void>> customMsgListeners = new CopyOnWriteArrayList<>(); + /** @param ctx Context. 
*/ public GridDiscoveryManager(GridKernalContext ctx) { super(ctx, ctx.config().getDiscoverySpi()); @@ -305,6 +308,10 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { discoWrk.addEvent(type, topVer, node, topSnapshot); } + + @Override public void onCustomMessage(Object msg, long topVer) { + discoWrk.addCustomEvent(msg, topVer); + } }); getSpi().setDataExchange(new DiscoverySpiDataExchange() { @@ -1182,6 +1189,19 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { ).start(); } + public void sendCustomEvent(Object evt) { + getSpi().sendCustomEvent(evt); + } + + public void addCustomEvantListener(IgniteClosure<Object, Void> listener) { + customMsgListeners.add(listener); + } + + public void removeCustomEvantListener(IgniteClosure<Object, Void> listener) { + customMsgListeners.remove(listener); + } + + /** Worker for network segment checks. */ private class SegmentCheckWorker extends GridWorker { /** */ @@ -1258,7 +1278,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { /** Worker for discovery events. */ private class DiscoveryWorker extends GridWorker { /** Event queue. */ - private final BlockingQueue<GridTuple4<Integer, Long, ClusterNode, Collection<ClusterNode>>> evts = + private final BlockingQueue<GridTuple5<Integer, Long, ClusterNode, Collection<ClusterNode>, Object>> evts = new LinkedBlockingQueue<>(); /** Node segmented event fired flag. 
*/ @@ -1289,12 +1309,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { evt.eventNode(node); evt.type(type); - evt.topologySnapshot(topVer, new ArrayList<>( - F.viewReadOnly(topSnapshot, new C1<ClusterNode, ClusterNode>() { - @Override public ClusterNode apply(ClusterNode e) { - return e; - } - }, daemonFilter))); + evt.topologySnapshot(topVer, U.<ClusterNode, ClusterNode>arrayList(topSnapshot, daemonFilter)); if (type == EVT_NODE_METRICS_UPDATED) evt.message("Metrics were updated: " + node); @@ -1327,7 +1342,16 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { void addEvent(int type, long topVer, ClusterNode node, Collection<ClusterNode> topSnapshot) { assert node != null; - evts.add(F.t(type, topVer, node, topSnapshot)); + evts.add(F.t(type, topVer, node, topSnapshot, null)); + } + + /** + * @param evt Event. + * @param topVer Topology version. + */ + void addCustomEvent(Object evt, long topVer) { + evts.add(new T5<Integer, Long, ClusterNode, Collection<ClusterNode>, Object>(EVT_DISCOVERY_CUSTOM_EVT, + topVer, null, null, evt)); } /** @@ -1361,7 +1385,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { /** @throws InterruptedException If interrupted. 
*/ @SuppressWarnings("DuplicateCondition") private void body0() throws InterruptedException { - GridTuple4<Integer, Long, ClusterNode, Collection<ClusterNode>> evt = evts.take(); + GridTuple5<Integer, Long, ClusterNode, Collection<ClusterNode>, Object> evt = evts.take(); int type = evt.get1(); @@ -1369,7 +1393,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { ClusterNode node = evt.get3(); - boolean isDaemon = node.isDaemon(); + boolean isDaemon = node == null || node.isDaemon(); boolean segmented = false; @@ -1470,6 +1494,13 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { break; } + case EVT_DISCOVERY_CUSTOM_EVT: { + for (IgniteClosure<Object, Void> listener : customMsgListeners) + listener.apply(evt.get5()); + + break; + } + // Don't log metric update to avoid flooding the log. case EVT_NODE_METRICS_UPDATED: break; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a93ebc81/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index e99c706..c4166c0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -46,6 +46,7 @@ import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; import org.apache.ignite.lifecycle.*; import org.apache.ignite.spi.*; import org.jetbrains.annotations.*; @@ -547,6 +548,38 @@ public class GridCacheProcessor extends 
GridProcessorAdapter { if (ctx.config().isDaemon()) return; + // todo temporary code, remove it. + + ctx.discovery().addCustomEvantListener(new IgniteClosure<Object, Void>() { + @Override public Void apply(Object o) { + if (o instanceof String) + System.out.println("Message received: " + o); + + return null; + } + }); + + new Thread(new Runnable() { + @Override public void run() { + try { + System.out.println("Local nodeId=" + ctx.localNodeId()); + + for (int i = 0; i < 10; i++) { + Thread.sleep(5000); + + String msg = "e" + i + " [" + ctx.localNodeId() + ']'; + + System.out.println("Sent message: " + msg); + + ctx.discovery().sendCustomEvent(msg); + } + } + catch (InterruptedException e) { + e.printStackTrace(); + } + } + }).start(); + DeploymentMode depMode = ctx.config().getDeploymentMode(); if (!F.isEmpty(ctx.config().getCacheConfiguration())) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a93ebc81/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java index 168ae52..269e2c9 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java @@ -137,4 +137,10 @@ public interface DiscoverySpi extends IgniteSpi { * does not support this method. */ public long getGridStartTime(); + + /** + * Sends custom message across the ring. + * @param evt Event. 
+ */ + public void sendCustomEvent(Object evt); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a93ebc81/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java index 3e0ef02..89e615e 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java @@ -40,4 +40,13 @@ public interface DiscoverySpiListener { */ public void onDiscovery(int type, long topVer, ClusterNode node, Collection<ClusterNode> topSnapshot, @Nullable Map<Long, Collection<ClusterNode>> topHist); + + /** + * Notification for node custom events. + * + * @param msg The custom message. + * @param topVer Topology version or {@code 0} if configured discovery SPI implementation + * does not support versioning. + */ + public void onCustomMessage(Object msg, long topVer); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a93ebc81/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java index 51df9db..21c93e7 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java @@ -373,6 +373,11 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp // No-op. 
} + /** {@inheritDoc} */ + @Override public void sendCustomEvent(Object evt) { + throw new UnsupportedOperationException(); + } + /** * @param recon Reconnect flag. * @return Whether joined successfully. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a93ebc81/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index aef8259..30e4631 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -1246,6 +1246,11 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov this.nodeAuth = nodeAuth; } + /** {@inheritDoc} */ + @Override public void sendCustomEvent(Object evt) { + msgWorker.addMessage(new TcpDiscoveryCustomMessage(getLocalNodeId(), evt)); + } + /** * Tries to join this node to topology. * @@ -2550,6 +2555,9 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov else if (msg instanceof TcpDiscoveryDiscardMessage) processDiscardMessage((TcpDiscoveryDiscardMessage)msg); + else if (msg instanceof TcpDiscoveryCustomMessage) + processCustomMessage((TcpDiscoveryCustomMessage)msg); + else assert false : "Unknown message type: " + msg.getClass().getSimpleName(); @@ -4367,6 +4375,29 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov if (ring.hasRemoteNodes()) sendMessageAcrossRing(msg); } + + /** + * @param msg Message. 
+ */ + private void processCustomMessage(TcpDiscoveryCustomMessage msg) { + if (msg.creatorNodeId().equals(getLocalNodeId())) { + if (msg.senderNodeId() != null) + return; + + msg.senderNodeId(getLocalNodeId()); + } + else { + DiscoverySpiListener lsnr = TcpDiscoverySpi.this.lsnr; + + TcpDiscoverySpiState spiState = spiStateCopy(); + + if (lsnr != null && (spiState == CONNECTED || spiState == DISCONNECTING)) + lsnr.onCustomMessage(msg.message(), msg.topologyVersion()); + } + + if (ring.hasRemoteNodes()) + sendMessageAcrossRing(msg); + } } /** @@ -4529,7 +4560,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov } } - if (!U.bytesEqual(buf, 0, U.IGNITE_HEADER, 0, 4)) { + if (!Arrays.equals(buf, U.IGNITE_HEADER)) { if (log.isDebugEnabled()) log.debug("Unknown connection detected (is some other software connecting to " + "this Ignite port?) " + http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a93ebc81/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomMessage.java new file mode 100644 index 0000000..36887c9 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomMessage.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp.messages; + +import java.io.*; +import java.util.*; + +/** + * Wrapper for custom message. + */ +public class TcpDiscoveryCustomMessage extends TcpDiscoveryAbstractMessage { + /** */ + private Object msg; + + /** + * Public default no-arg constructor for {@link java.io.Externalizable} interface. + */ + public TcpDiscoveryCustomMessage() { + // No-op. + } + + /** + * @param creatorNodeId Creator node id. + */ + public TcpDiscoveryCustomMessage(UUID creatorNodeId, Object msg) { + super(creatorNodeId); + + this.msg = msg; + } + + /** + * @return Message. 
+ */ + public Object message() { + return msg; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + super.writeExternal(out); + + out.writeObject(msg); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + super.readExternal(in); + + msg = in.readObject(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a93ebc81/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java index 14154ab..cf25b3d 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java @@ -138,6 +138,11 @@ public abstract class AbstractDiscoverySelfTest<T extends IgniteSpi> extends Gri if (type == EVT_NODE_METRICS_UPDATED) isMetricsUpdate = true; } + + /** {@inheritDoc} */ + @Override public void onCustomMessage(Object msg, long topVer) { + // No-op. + } } /** @@ -212,6 +217,10 @@ public abstract class AbstractDiscoverySelfTest<T extends IgniteSpi> extends Gri && node.id().equals(spi.getLocalNode().id())) spiCnt.addAndGet(1); } + + @Override public void onCustomMessage(Object msg, long topVer) { + // No-op. + } }; locUpdCnts[i] = spiCnt; @@ -377,6 +386,10 @@ public abstract class AbstractDiscoverySelfTest<T extends IgniteSpi> extends Gri mux.notifyAll(); } } + + @Override public void onCustomMessage(Object msg, long topVer) { + // No-op. + } }); spi.setDataExchange(new DiscoverySpiDataExchange() {