# IGNITE-831 Allow to listen specified type of TcpDiscoveryCustomEventMessage.
(cherry picked from commit 55904c1)
(cherry picked from commit 5f7565a)


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/94fb3c56
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/94fb3c56
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/94fb3c56

Branch: refs/heads/ignite-836_2
Commit: 94fb3c5658973a642281e48ffdfa78731373c604
Parents: 1a4de26
Author: sevdokimov <sergey.evdoki...@jetbrains.com>
Authored: Wed Apr 29 12:18:43 2015 +0300
Committer: sevdokimov <sergey.evdoki...@jetbrains.com>
Committed: Mon May 4 12:08:02 2015 +0300

----------------------------------------------------------------------
 .../managers/discovery/CustomEventListener.java | 32 +++++++++++++++++
 .../discovery/GridDiscoveryManager.java         | 37 ++++++++++++++------
 .../processors/cache/GridCacheProcessor.java    |  9 ++---
 3 files changed, 64 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/94fb3c56/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomEventListener.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomEventListener.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomEventListener.java
new file mode 100644
index 0000000..5c11968
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomEventListener.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.discovery;
+
+import org.apache.ignite.cluster.*;
+
+/**
+ *
+ * @param <T>
+ */
+public interface CustomEventListener<T extends DiscoveryCustomMessage> {
+    /**
+     * @param snd Send.
+     * @param msg Message.
+     */
+    public void onCustomEvent(ClusterNode snd, T msg);
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/94fb3c56/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 0df7d5f..3d35bee 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -168,7 +168,8 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
     private final MetricsUpdater metricsUpdater = new MetricsUpdater();
 
     /** Custom event listener. */
-    private GridPlainInClosure<Serializable> customEvtLsnr;
+    private ConcurrentMap<Class<?>, 
List<CustomEventListener<DiscoveryCustomMessage>>> customEvtLsnrs
+        = new ConcurrentHashMap8<>();
 
     /** Map of dynamic cache filters. */
     private Map<String, CachePredicate> registeredCaches = new HashMap<>();
@@ -379,12 +380,21 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
                 }
 
                 if (type == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) {
-                    try {
-                        if (customEvtLsnr != null)
-                            customEvtLsnr.apply(data);
-                    }
-                    catch (Exception e) {
-                        U.error(log, "Failed to notify direct custom event 
listener: " + data, e);
+                    if (data != null) {
+                        for (Class cls = data.getClass(); cls != null; cls = 
cls.getSuperclass()) {
+                            List<CustomEventListener<DiscoveryCustomMessage>> 
list = customEvtLsnrs.get(cls);
+
+                            if (list != null) {
+                                for 
(CustomEventListener<DiscoveryCustomMessage> lsnr : list) {
+                                    try {
+                                        lsnr.onCustomEvent(node, 
(DiscoveryCustomMessage)data);
+                                    }
+                                    catch (Exception e) {
+                                        U.error(log, "Failed to notify direct 
custom event listener: " + data, e);
+                                    }
+                                }
+                            }
+                        }
                     }
                 }
 
@@ -492,10 +502,17 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
     }
 
     /**
-     * @param customEvtLsnr Custom event listener.
+     * @param lsnr Custom event listener.
      */
-    public void setCustomEventListener(GridPlainInClosure<Serializable> 
customEvtLsnr) {
-        this.customEvtLsnr = customEvtLsnr;
+    public <T extends DiscoveryCustomMessage> void 
setCustomEventListener(Class<T> msgCls, CustomEventListener<T> lsnr) {
+        List<CustomEventListener<DiscoveryCustomMessage>> list = 
customEvtLsnrs.get(msgCls);
+
+        if (list == null) {
+            list = F.addIfAbsent(customEvtLsnrs, msgCls,
+                new 
CopyOnWriteArrayList<CustomEventListener<DiscoveryCustomMessage>>());
+        }
+
+        list.add((CustomEventListener<DiscoveryCustomMessage>)lsnr);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/94fb3c56/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 77fa104..c8870a2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -27,6 +27,7 @@ import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.managers.discovery.*;
 import org.apache.ignite.internal.processors.*;
 import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.datastructures.*;
@@ -541,10 +542,10 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
 
         maxRebalanceOrder = 
validatePreloadOrder(ctx.config().getCacheConfiguration());
 
-        ctx.discovery().setCustomEventListener(new 
GridPlainInClosure<Serializable>() {
-            @Override public void apply(Serializable evt) {
-                if (evt instanceof DynamicCacheChangeBatch)
-                    onCacheChangeRequested((DynamicCacheChangeBatch)evt);
+        ctx.discovery().setCustomEventListener(DynamicCacheChangeBatch.class,
+            new CustomEventListener<DynamicCacheChangeBatch>() {
+            @Override public void onCustomEvent(ClusterNode snd, 
DynamicCacheChangeBatch msg) {
+                onCacheChangeRequested(msg);
             }
         });
 

Reply via email to