# IGNITE-831 Allow listening for a specified type of TcpDiscoveryCustomEventMessage. (cherry picked from commit 55904c1) (cherry picked from commit 5f7565a)
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/94fb3c56 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/94fb3c56 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/94fb3c56 Branch: refs/heads/ignite-836_2 Commit: 94fb3c5658973a642281e48ffdfa78731373c604 Parents: 1a4de26 Author: sevdokimov <sergey.evdoki...@jetbrains.com> Authored: Wed Apr 29 12:18:43 2015 +0300 Committer: sevdokimov <sergey.evdoki...@jetbrains.com> Committed: Mon May 4 12:08:02 2015 +0300 ---------------------------------------------------------------------- .../managers/discovery/CustomEventListener.java | 32 +++++++++++++++++ .../discovery/GridDiscoveryManager.java | 37 ++++++++++++++------ .../processors/cache/GridCacheProcessor.java | 9 ++--- 3 files changed, 64 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/94fb3c56/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomEventListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomEventListener.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomEventListener.java new file mode 100644 index 0000000..5c11968 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomEventListener.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.managers.discovery; + +import org.apache.ignite.cluster.*; + +/** + * + * @param <T> + */ +public interface CustomEventListener<T extends DiscoveryCustomMessage> { + /** + * @param snd Send. + * @param msg Message. + */ + public void onCustomEvent(ClusterNode snd, T msg); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/94fb3c56/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 0df7d5f..3d35bee 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -168,7 +168,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { private final MetricsUpdater metricsUpdater = new MetricsUpdater(); /** Custom event listener. */ - private GridPlainInClosure<Serializable> customEvtLsnr; + private ConcurrentMap<Class<?>, List<CustomEventListener<DiscoveryCustomMessage>>> customEvtLsnrs + = new ConcurrentHashMap8<>(); /** Map of dynamic cache filters. 
*/ private Map<String, CachePredicate> registeredCaches = new HashMap<>(); @@ -379,12 +380,21 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } if (type == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) { - try { - if (customEvtLsnr != null) - customEvtLsnr.apply(data); - } - catch (Exception e) { - U.error(log, "Failed to notify direct custom event listener: " + data, e); + if (data != null) { + for (Class cls = data.getClass(); cls != null; cls = cls.getSuperclass()) { + List<CustomEventListener<DiscoveryCustomMessage>> list = customEvtLsnrs.get(cls); + + if (list != null) { + for (CustomEventListener<DiscoveryCustomMessage> lsnr : list) { + try { + lsnr.onCustomEvent(node, (DiscoveryCustomMessage)data); + } + catch (Exception e) { + U.error(log, "Failed to notify direct custom event listener: " + data, e); + } + } + } + } } } @@ -492,10 +502,17 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** - * @param customEvtLsnr Custom event listener. + * @param lsnr Custom event listener. 
*/ - public void setCustomEventListener(GridPlainInClosure<Serializable> customEvtLsnr) { - this.customEvtLsnr = customEvtLsnr; + public <T extends DiscoveryCustomMessage> void setCustomEventListener(Class<T> msgCls, CustomEventListener<T> lsnr) { + List<CustomEventListener<DiscoveryCustomMessage>> list = customEvtLsnrs.get(msgCls); + + if (list == null) { + list = F.addIfAbsent(customEvtLsnrs, msgCls, + new CopyOnWriteArrayList<CustomEventListener<DiscoveryCustomMessage>>()); + } + + list.add((CustomEventListener<DiscoveryCustomMessage>)lsnr); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/94fb3c56/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 77fa104..c8870a2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -27,6 +27,7 @@ import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.events.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.managers.discovery.*; import org.apache.ignite.internal.processors.*; import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.datastructures.*; @@ -541,10 +542,10 @@ public class GridCacheProcessor extends GridProcessorAdapter { maxRebalanceOrder = validatePreloadOrder(ctx.config().getCacheConfiguration()); - ctx.discovery().setCustomEventListener(new GridPlainInClosure<Serializable>() { - @Override public void apply(Serializable evt) { - if (evt instanceof DynamicCacheChangeBatch) - 
onCacheChangeRequested((DynamicCacheChangeBatch)evt); + ctx.discovery().setCustomEventListener(DynamicCacheChangeBatch.class, + new CustomEventListener<DynamicCacheChangeBatch>() { + @Override public void onCustomEvent(ClusterNode snd, DynamicCacheChangeBatch msg) { + onCacheChangeRequested(msg); } });