# IGNITE-831 Allow to listen specified type of TcpDiscoveryCustomEventMessage.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/55904c18
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/55904c18
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/55904c18

Branch: refs/heads/ignite-831
Commit: 55904c184771841835ba9e9a51a824d1cfcac75d
Parents: 16429f8
Author: sevdokimov <sevdoki...@gridgain.com>
Authored: Wed Apr 29 12:18:43 2015 +0300
Committer: sevdokimov <sevdoki...@gridgain.com>
Committed: Wed Apr 29 12:18:43 2015 +0300

----------------------------------------------------------------------
 .../discovery/GridDiscoveryManager.java         | 37 ++++++++++++++------
 .../processors/cache/GridCacheProcessor.java    |  9 ++---
 2 files changed, 32 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/55904c18/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 8445c66..5018ed7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -168,7 +168,8 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
     private final MetricsUpdater metricsUpdater = new MetricsUpdater();
 
     /** Custom event listener. */
-    private GridPlainInClosure<Serializable> customEvtLsnr;
+    private ConcurrentMap<Class<?>, 
List<CustomEventListener<DiscoveryCustomMessage>>> customEvtLsnrs
+        = new ConcurrentHashMap8<>();
 
     /** Map of dynamic cache filters. */
     private Map<String, CachePredicate> registeredCaches = new HashMap<>();
@@ -379,12 +380,21 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
                 }
 
                 if (type == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) {
-                    try {
-                        if (customEvtLsnr != null)
-                            customEvtLsnr.apply(data);
-                    }
-                    catch (Exception e) {
-                        U.error(log, "Failed to notify direct custom event 
listener: " + data, e);
+                    if (data != null) {
+                        for (Class cls = data.getClass(); cls != null; cls = 
cls.getSuperclass()) {
+                            List<CustomEventListener<DiscoveryCustomMessage>> 
list = customEvtLsnrs.get(cls);
+
+                            if (list != null) {
+                                for 
(CustomEventListener<DiscoveryCustomMessage> lsnr : list) {
+                                    try {
+                                        lsnr.onCustomEvent(node, 
(DiscoveryCustomMessage)data);
+                                    }
+                                    catch (Exception e) {
+                                        U.error(log, "Failed to notify direct 
custom event listener: " + data, e);
+                                    }
+                                }
+                            }
+                        }
                     }
                 }
 
@@ -492,10 +502,17 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
     }
 
     /**
-     * @param customEvtLsnr Custom event listener.
+     * @param lsnr Custom event listener.
      */
-    public void setCustomEventListener(GridPlainInClosure<Serializable> 
customEvtLsnr) {
-        this.customEvtLsnr = customEvtLsnr;
+    public <T extends DiscoveryCustomMessage> void 
setCustomEventListener(Class<T> msgCls, CustomEventListener<T> lsnr) {
+        List<CustomEventListener<DiscoveryCustomMessage>> list = 
customEvtLsnrs.get(msgCls);
+
+        if (list == null) {
+            list = F.addIfAbsent(customEvtLsnrs, msgCls,
+                new 
CopyOnWriteArrayList<CustomEventListener<DiscoveryCustomMessage>>());
+        }
+
+        list.add((CustomEventListener<DiscoveryCustomMessage>)lsnr);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/55904c18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 83f1fed..ecfa05f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -27,6 +27,7 @@ import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.managers.discovery.*;
 import org.apache.ignite.internal.processors.*;
 import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.datastructures.*;
@@ -538,10 +539,10 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
 
         maxRebalanceOrder = 
validatePreloadOrder(ctx.config().getCacheConfiguration());
 
-        ctx.discovery().setCustomEventListener(new 
GridPlainInClosure<Serializable>() {
-            @Override public void apply(Serializable evt) {
-                if (evt instanceof DynamicCacheChangeBatch)
-                    onCacheChangeRequested((DynamicCacheChangeBatch)evt);
+        ctx.discovery().setCustomEventListener(DynamicCacheChangeBatch.class,
+            new CustomEventListener<DynamicCacheChangeBatch>() {
+            @Override public void onCustomEvent(ClusterNode snd, 
DynamicCacheChangeBatch msg) {
+                onCacheChangeRequested(msg);
             }
         });
 

Reply via email to