# IGNITE-831 Allow listening for a specified type of TcpDiscoveryCustomEventMessage.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/55904c18 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/55904c18 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/55904c18 Branch: refs/heads/ignite-831 Commit: 55904c184771841835ba9e9a51a824d1cfcac75d Parents: 16429f8 Author: sevdokimov <sevdoki...@gridgain.com> Authored: Wed Apr 29 12:18:43 2015 +0300 Committer: sevdokimov <sevdoki...@gridgain.com> Committed: Wed Apr 29 12:18:43 2015 +0300 ---------------------------------------------------------------------- .../discovery/GridDiscoveryManager.java | 37 ++++++++++++++------ .../processors/cache/GridCacheProcessor.java | 9 ++--- 2 files changed, 32 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/55904c18/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 8445c66..5018ed7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -168,7 +168,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { private final MetricsUpdater metricsUpdater = new MetricsUpdater(); /** Custom event listener. */ - private GridPlainInClosure<Serializable> customEvtLsnr; + private ConcurrentMap<Class<?>, List<CustomEventListener<DiscoveryCustomMessage>>> customEvtLsnrs + = new ConcurrentHashMap8<>(); /** Map of dynamic cache filters. 
*/ private Map<String, CachePredicate> registeredCaches = new HashMap<>(); @@ -379,12 +380,21 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } if (type == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT) { - try { - if (customEvtLsnr != null) - customEvtLsnr.apply(data); - } - catch (Exception e) { - U.error(log, "Failed to notify direct custom event listener: " + data, e); + if (data != null) { + for (Class cls = data.getClass(); cls != null; cls = cls.getSuperclass()) { + List<CustomEventListener<DiscoveryCustomMessage>> list = customEvtLsnrs.get(cls); + + if (list != null) { + for (CustomEventListener<DiscoveryCustomMessage> lsnr : list) { + try { + lsnr.onCustomEvent(node, (DiscoveryCustomMessage)data); + } + catch (Exception e) { + U.error(log, "Failed to notify direct custom event listener: " + data, e); + } + } + } + } } } @@ -492,10 +502,17 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** - * @param customEvtLsnr Custom event listener. + * @param lsnr Custom event listener. 
*/ - public void setCustomEventListener(GridPlainInClosure<Serializable> customEvtLsnr) { - this.customEvtLsnr = customEvtLsnr; + public <T extends DiscoveryCustomMessage> void setCustomEventListener(Class<T> msgCls, CustomEventListener<T> lsnr) { + List<CustomEventListener<DiscoveryCustomMessage>> list = customEvtLsnrs.get(msgCls); + + if (list == null) { + list = F.addIfAbsent(customEvtLsnrs, msgCls, + new CopyOnWriteArrayList<CustomEventListener<DiscoveryCustomMessage>>()); + } + + list.add((CustomEventListener<DiscoveryCustomMessage>)lsnr); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/55904c18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 83f1fed..ecfa05f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -27,6 +27,7 @@ import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.events.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.managers.discovery.*; import org.apache.ignite.internal.processors.*; import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.datastructures.*; @@ -538,10 +539,10 @@ public class GridCacheProcessor extends GridProcessorAdapter { maxRebalanceOrder = validatePreloadOrder(ctx.config().getCacheConfiguration()); - ctx.discovery().setCustomEventListener(new GridPlainInClosure<Serializable>() { - @Override public void apply(Serializable evt) { - if (evt instanceof DynamicCacheChangeBatch) - 
onCacheChangeRequested((DynamicCacheChangeBatch)evt); + ctx.discovery().setCustomEventListener(DynamicCacheChangeBatch.class, + new CustomEventListener<DynamicCacheChangeBatch>() { + @Override public void onCustomEvent(ClusterNode snd, DynamicCacheChangeBatch msg) { + onCacheChangeRequested(msg); } });