# IGNITE-312 Implement sendCustomMessage() method.

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a93ebc81
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a93ebc81
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a93ebc81

Branch: refs/heads/ignite-312
Commit: a93ebc8112b93fdda1571876ec28f4bbf35d4d6a
Parents: bad6e4b
Author: sevdokimov <sergey.evdoki...@jetbrains.com>
Authored: Mon Feb 23 23:57:55 2015 +0300
Committer: sevdokimov <sergey.evdoki...@jetbrains.com>
Committed: Mon Feb 23 23:57:55 2015 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/events/EventType.java     |  3 +
 .../apache/ignite/internal/IgniteKernal.java    | 14 +++--
 .../discovery/GridDiscoveryManager.java         | 51 ++++++++++++---
 .../processors/cache/GridCacheProcessor.java    | 33 ++++++++++
 .../ignite/spi/discovery/DiscoverySpi.java      |  6 ++
 .../spi/discovery/DiscoverySpiListener.java     |  9 +++
 .../discovery/tcp/TcpClientDiscoverySpi.java    |  5 ++
 .../spi/discovery/tcp/TcpDiscoverySpi.java      | 33 +++++++++-
 .../tcp/messages/TcpDiscoveryCustomMessage.java | 66 ++++++++++++++++++++
 .../discovery/AbstractDiscoverySelfTest.java    | 13 ++++
 10 files changed, 216 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a93ebc81/modules/core/src/main/java/org/apache/ignite/events/EventType.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/events/EventType.java 
b/modules/core/src/main/java/org/apache/ignite/events/EventType.java
index 9f7e2c4..4319d2a 100644
--- a/modules/core/src/main/java/org/apache/ignite/events/EventType.java
+++ b/modules/core/src/main/java/org/apache/ignite/events/EventType.java
@@ -153,6 +153,9 @@ public interface EventType {
      */
     public static final int EVT_NODE_SEGMENTED = 14;
 
+    /** */
+    public static final int EVT_DISCOVERY_CUSTOM_EVT = 15;
+
     public static final int EVT_CLIENT_NODE_DISCONNECTED = 16;
 
     public static final int EVT_CLIENT_NODE_RECONNECTED = 17;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a93ebc81/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java 
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 8f1179f..717ece7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -687,6 +687,11 @@ public class IgniteKernal implements IgniteEx, 
IgniteMXBean, Externalizable {
 
             startProcessor(ctx, rsrcProc, attrs);
 
+            // Create GridDiscoveryManager first, because another processors 
can register listeners on it.
+            GridDiscoveryManager discoveryMgr = new GridDiscoveryManager(ctx);
+
+            ctx.add(discoveryMgr);
+
             // Inject resources into lifecycle beans.
             if (!cfg.isDaemon() && cfg.getLifecycleBeans() != null) {
                 for (LifecycleBean bean : cfg.getLifecycleBeans()) {
@@ -775,7 +780,9 @@ public class IgniteKernal implements IgniteEx, 
IgniteMXBean, Externalizable {
                 gw.setState(STARTED);
 
                 // Start discovery manager last to make sure that grid is 
fully initialized.
-                startManager(ctx, new GridDiscoveryManager(ctx), attrs);
+                discoveryMgr.addSpiAttributes(attrs);
+                discoveryMgr.setNodeAttributes(attrs, VER);
+                discoveryMgr.start();
             }
             finally {
                 gw.writeUnlock();
@@ -1392,11 +1399,6 @@ public class IgniteKernal implements IgniteEx, 
IgniteMXBean, Externalizable {
         throws IgniteCheckedException {
         mgr.addSpiAttributes(attrs);
 
-        // Set all node attributes into discovery manager,
-        // so they can be distributed to all nodes.
-        if (mgr instanceof GridDiscoveryManager)
-            ((GridDiscoveryManager)mgr).setNodeAttributes(attrs, VER);
-
         // Add manager to registry before it starts to avoid
         // cases when manager is started but registry does not
         // have it yet.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a93ebc81/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 449464a..c21d8c6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -162,6 +162,9 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
     /** Metrics update worker. */
     private final MetricsUpdater metricsUpdater = new MetricsUpdater();
 
+    /** */
+    private final CopyOnWriteArrayList<IgniteClosure<Object, Void>> 
customMsgListeners = new CopyOnWriteArrayList<>();
+
     /** @param ctx Context. */
     public GridDiscoveryManager(GridKernalContext ctx) {
         super(ctx, ctx.config().getDiscoverySpi());
@@ -305,6 +308,10 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
 
                 discoWrk.addEvent(type, topVer, node, topSnapshot);
             }
+
+            @Override public void onCustomMessage(Object msg, long topVer) {
+                discoWrk.addCustomEvent(msg, topVer);
+            }
         });
 
         getSpi().setDataExchange(new DiscoverySpiDataExchange() {
@@ -1182,6 +1189,19 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
         ).start();
     }
 
+    public void sendCustomEvent(Object evt) {
+        getSpi().sendCustomEvent(evt);
+    }
+
+    public void addCustomEvantListener(IgniteClosure<Object, Void> listener) {
+        customMsgListeners.add(listener);
+    }
+
+    public void removeCustomEvantListener(IgniteClosure<Object, Void> 
listener) {
+        customMsgListeners.remove(listener);
+    }
+
+
     /** Worker for network segment checks. */
     private class SegmentCheckWorker extends GridWorker {
         /** */
@@ -1258,7 +1278,7 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
     /** Worker for discovery events. */
     private class DiscoveryWorker extends GridWorker {
         /** Event queue. */
-        private final BlockingQueue<GridTuple4<Integer, Long, ClusterNode, 
Collection<ClusterNode>>> evts =
+        private final BlockingQueue<GridTuple5<Integer, Long, ClusterNode, 
Collection<ClusterNode>, Object>> evts =
             new LinkedBlockingQueue<>();
 
         /** Node segmented event fired flag. */
@@ -1289,12 +1309,7 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
                 evt.eventNode(node);
                 evt.type(type);
 
-                evt.topologySnapshot(topVer, new ArrayList<>(
-                    F.viewReadOnly(topSnapshot, new C1<ClusterNode, 
ClusterNode>() {
-                        @Override public ClusterNode apply(ClusterNode e) {
-                            return e;
-                        }
-                    }, daemonFilter)));
+                evt.topologySnapshot(topVer, U.<ClusterNode, 
ClusterNode>arrayList(topSnapshot, daemonFilter));
 
                 if (type == EVT_NODE_METRICS_UPDATED)
                     evt.message("Metrics were updated: " + node);
@@ -1327,7 +1342,16 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
         void addEvent(int type, long topVer, ClusterNode node, 
Collection<ClusterNode> topSnapshot) {
             assert node != null;
 
-            evts.add(F.t(type, topVer, node, topSnapshot));
+            evts.add(F.t(type, topVer, node, topSnapshot, null));
+        }
+
+        /**
+         * @param evt Event.
+         * @param topVer Topology version.
+         */
+        void addCustomEvent(Object evt, long topVer) {
+            evts.add(new T5<Integer, Long, ClusterNode, 
Collection<ClusterNode>, Object>(EVT_DISCOVERY_CUSTOM_EVT,
+                topVer, null, null, evt));
         }
 
         /**
@@ -1361,7 +1385,7 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
         /** @throws InterruptedException If interrupted. */
         @SuppressWarnings("DuplicateCondition")
         private void body0() throws InterruptedException {
-            GridTuple4<Integer, Long, ClusterNode, Collection<ClusterNode>> 
evt = evts.take();
+            GridTuple5<Integer, Long, ClusterNode, Collection<ClusterNode>, 
Object> evt = evts.take();
 
             int type = evt.get1();
 
@@ -1369,7 +1393,7 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
 
             ClusterNode node = evt.get3();
 
-            boolean isDaemon = node.isDaemon();
+            boolean isDaemon = node == null || node.isDaemon();
 
             boolean segmented = false;
 
@@ -1470,6 +1494,13 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
                     break;
                 }
 
+                case EVT_DISCOVERY_CUSTOM_EVT: {
+                    for (IgniteClosure<Object, Void> listener : 
customMsgListeners)
+                        listener.apply(evt.get5());
+
+                    break;
+                }
+
                 // Don't log metric update to avoid flooding the log.
                 case EVT_NODE_METRICS_UPDATED:
                     break;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a93ebc81/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index e99c706..c4166c0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -46,6 +46,7 @@ import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
 import org.apache.ignite.lifecycle.*;
 import org.apache.ignite.spi.*;
 import org.jetbrains.annotations.*;
@@ -547,6 +548,38 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
         if (ctx.config().isDaemon())
             return;
 
+        // todo temporary code, remove it.
+
+        ctx.discovery().addCustomEvantListener(new IgniteClosure<Object, 
Void>() {
+            @Override public Void apply(Object o) {
+                if (o instanceof String)
+                    System.out.println("Message received: " + o);
+
+                return null;
+            }
+        });
+
+        new Thread(new Runnable() {
+            @Override public void run() {
+                try {
+                    System.out.println("Local nodeId=" + ctx.localNodeId());
+
+                    for (int i = 0; i < 10; i++) {
+                        Thread.sleep(5000);
+
+                        String msg = "e" + i + " [" + ctx.localNodeId() + ']';
+
+                        System.out.println("Sent message: " + msg);
+
+                        ctx.discovery().sendCustomEvent(msg);
+                    }
+                }
+                catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            }
+        }).start();
+
         DeploymentMode depMode = ctx.config().getDeploymentMode();
 
         if (!F.isEmpty(ctx.config().getCacheConfiguration())) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a93ebc81/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
index 168ae52..269e2c9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
@@ -137,4 +137,10 @@ public interface DiscoverySpi extends IgniteSpi {
      *         does not support this method.
      */
     public long getGridStartTime();
+
+    /**
+     * Sends custom message across the ring.
+     * @param evt Event.
+     */
+    public void sendCustomEvent(Object evt);
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a93ebc81/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java
index 3e0ef02..89e615e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java
@@ -40,4 +40,13 @@ public interface DiscoverySpiListener {
      */
     public void onDiscovery(int type, long topVer, ClusterNode node, 
Collection<ClusterNode> topSnapshot,
         @Nullable Map<Long, Collection<ClusterNode>> topHist);
+
+    /**
+     * Notification for node custom events.
+     *
+     * @param msg The custom message.
+     * @param topVer Topology version or {@code 0} if configured discovery SPI 
implementation
+     *      does not support versioning.
+     */
+    public void onCustomMessage(Object msg, long topVer);
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a93ebc81/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
index 51df9db..21c93e7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
@@ -373,6 +373,11 @@ public class TcpClientDiscoverySpi extends 
TcpDiscoverySpiAdapter implements Tcp
         // No-op.
     }
 
+    /** {@inheritDoc} */
+    @Override public void sendCustomEvent(Object evt) {
+        throw new UnsupportedOperationException();
+    }
+
     /**
      * @param recon Reconnect flag.
      * @return Whether joined successfully.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a93ebc81/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index aef8259..30e4631 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1246,6 +1246,11 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
         this.nodeAuth = nodeAuth;
     }
 
+    /** {@inheritDoc} */
+    @Override public void sendCustomEvent(Object evt) {
+        msgWorker.addMessage(new TcpDiscoveryCustomMessage(getLocalNodeId(), 
evt));
+    }
+
     /**
      * Tries to join this node to topology.
      *
@@ -2550,6 +2555,9 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
             else if (msg instanceof TcpDiscoveryDiscardMessage)
                 processDiscardMessage((TcpDiscoveryDiscardMessage)msg);
 
+            else if (msg instanceof TcpDiscoveryCustomMessage)
+                processCustomMessage((TcpDiscoveryCustomMessage)msg);
+
             else
                 assert false : "Unknown message type: " + 
msg.getClass().getSimpleName();
 
@@ -4367,6 +4375,29 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
             if (ring.hasRemoteNodes())
                 sendMessageAcrossRing(msg);
         }
+
+        /**
+         * @param msg Message.
+         */
+        private void processCustomMessage(TcpDiscoveryCustomMessage msg) {
+            if (msg.creatorNodeId().equals(getLocalNodeId())) {
+                if (msg.senderNodeId() != null)
+                    return;
+
+                msg.senderNodeId(getLocalNodeId());
+            }
+            else {
+                DiscoverySpiListener lsnr = TcpDiscoverySpi.this.lsnr;
+
+                TcpDiscoverySpiState spiState = spiStateCopy();
+
+                if (lsnr != null && (spiState == CONNECTED || spiState == 
DISCONNECTING))
+                    lsnr.onCustomMessage(msg.message(), msg.topologyVersion());
+            }
+
+            if (ring.hasRemoteNodes())
+                sendMessageAcrossRing(msg);
+        }
     }
 
     /**
@@ -4529,7 +4560,7 @@ public class TcpDiscoverySpi extends 
TcpDiscoverySpiAdapter implements TcpDiscov
                         }
                     }
 
-                    if (!U.bytesEqual(buf, 0, U.IGNITE_HEADER, 0, 4)) {
+                    if (!Arrays.equals(buf, U.IGNITE_HEADER)) {
                         if (log.isDebugEnabled())
                             log.debug("Unknown connection detected (is some 
other software connecting to " +
                                 "this Ignite port?) " +

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a93ebc81/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomMessage.java
new file mode 100644
index 0000000..36887c9
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCustomMessage.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp.messages;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Wrapped for custom message.
+ */
+public class TcpDiscoveryCustomMessage extends TcpDiscoveryAbstractMessage {
+    /** */
+    private Object msg;
+
+    /**
+     * Public default no-arg constructor for {@link java.io.Externalizable} 
interface.
+     */
+    public TcpDiscoveryCustomMessage() {
+        // No-op.
+    }
+
+    /**
+     * @param creatorNodeId Creator node id.
+     */
+    public TcpDiscoveryCustomMessage(UUID creatorNodeId, Object msg) {
+        super(creatorNodeId);
+
+        this.msg = msg;
+    }
+
+    /**
+     * @return Message.
+     */
+    public Object message() {
+        return msg;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        super.writeExternal(out);
+
+        out.writeObject(msg);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
+        super.readExternal(in);
+
+        msg = in.readObject();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a93ebc81/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java
index 14154ab..cf25b3d 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java
@@ -138,6 +138,11 @@ public abstract class AbstractDiscoverySelfTest<T extends 
IgniteSpi> extends Gri
             if (type == EVT_NODE_METRICS_UPDATED)
                 isMetricsUpdate = true;
         }
+
+        /** {@inheritDoc} */
+        @Override public void onCustomMessage(Object msg, long topVer) {
+            // No-op.
+        }
     }
 
     /**
@@ -212,6 +217,10 @@ public abstract class AbstractDiscoverySelfTest<T extends 
IgniteSpi> extends Gri
                         && node.id().equals(spi.getLocalNode().id()))
                         spiCnt.addAndGet(1);
                 }
+
+                @Override public void onCustomMessage(Object msg, long topVer) 
{
+                    // No-op.
+                }
             };
 
             locUpdCnts[i] = spiCnt;
@@ -377,6 +386,10 @@ public abstract class AbstractDiscoverySelfTest<T extends 
IgniteSpi> extends Gri
                             mux.notifyAll();
                         }
                     }
+
+                    @Override public void onCustomMessage(Object msg, long 
topVer) {
+                        // No-op.
+                    }
                 });
 
                 spi.setDataExchange(new DiscoverySpiDataExchange() {

Reply via email to