Repository: incubator-ignite Updated Branches: refs/heads/ignite-709_2 8aad0997b -> 3dd778209
# IGNITE-709 Check filter.apply(evt) in system thread, not in the discovery worker thread. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/3dd77820 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/3dd77820 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/3dd77820 Branch: refs/heads/ignite-709_2 Commit: 3dd778209c9341a2ffabcced703e31b9a1ae8705 Parents: 8aad099 Author: sevdokimov <sevdoki...@gridgain.com> Authored: Mon May 25 18:31:33 2015 +0300 Committer: sevdokimov <sevdoki...@gridgain.com> Committed: Mon May 25 18:32:55 2015 +0300 ---------------------------------------------------------------------- .../internal/GridEventConsumeHandler.java | 109 ++++++++++--------- 1 file changed, 56 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dd77820/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java index 0bba809..bb8366a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java @@ -136,78 +136,81 @@ class GridEventConsumeHandler implements GridContinuousHandler { private boolean notificationInProgress; - @Override public void onEvent(final Event evt) { - if (filter == null || filter.apply(evt)) { - if (loc) { - if (!cb.apply(nodeId, evt)) - ctx.continuous().stopRoutine(routineId); - } - else { - if (ctx.discovery().node(nodeId) == null) - return; + @Override public void onEvent(Event evt) { + synchronized (notificationQueue) { + notificationQueue.add(new T3<>(nodeId, routineId, evt)); - synchronized (notificationQueue) { - notificationQueue.add(new T3<>(nodeId, routineId, evt)); + if (!notificationInProgress) { + ctx.getSystemExecutorService().submit(new Runnable() { + @Override public void run() { + while (true) { + T3<UUID, UUID, Event> t3; - if (!notificationInProgress) { - ctx.getSystemExecutorService().submit(new Runnable() { - @Override public void run() { - while (true) { - T3<UUID, UUID, Event> t3; + synchronized (notificationQueue) { + t3 = notificationQueue.poll(); - synchronized (notificationQueue) { - t3 = notificationQueue.poll(); + if (t3 == null) { + notificationInProgress = false; - if (t3 == null) { - notificationInProgress = false; + return; + } + } - return; - } - } + try { + Event evt = t3.get3(); - try { - Event evt = t3.get3(); + if (filter != null && !filter.apply(evt)) + continue; - EventWrapper wrapper = new EventWrapper(evt); + if (loc) { + if (!cb.apply(nodeId, evt)) { + ctx.continuous().stopRoutine(routineId); - if (evt instanceof CacheEvent) { - String cacheName = ((CacheEvent)evt).cacheName(); + return; + } - ClusterNode node = ctx.discovery().node(t3.get1()); + continue; + } - if (node == null) - continue; + ClusterNode node = ctx.discovery().node(t3.get1()); - if (ctx.config().isPeerClassLoadingEnabled() - && ctx.discovery().cacheNode(node, cacheName)) { - wrapper.p2pMarshal(ctx.config().getMarshaller()); + if (node == null) + continue; - wrapper.cacheName = cacheName; + EventWrapper wrapper = new EventWrapper(evt); - GridCacheDeploymentManager depMgr = - ctx.cache().internalCache(cacheName).context().deploy(); + if (evt instanceof CacheEvent) { + String cacheName = ((CacheEvent)evt).cacheName(); - depMgr.prepare(wrapper); - } - } + if (ctx.config().isPeerClassLoadingEnabled() + && ctx.discovery().cacheNode(node, cacheName)) { + wrapper.p2pMarshal(ctx.config().getMarshaller()); - ctx.continuous().addNotification(t3.get1(), t3.get2(), wrapper, null, - false, false); - } - catch (ClusterTopologyCheckedException ignored) { - // No-op. - } - catch (Throwable e) { - U.error(ctx.log(GridEventConsumeHandler.class), - "Failed to send event notification to node: " + nodeId, e); + wrapper.cacheName = cacheName; + + GridCacheDeploymentManager depMgr = + ctx.cache().internalCache(cacheName).context().deploy(); + + depMgr.prepare(wrapper); } } - } - }); - notificationInProgress = true; + ctx.continuous().addNotification(t3.get1(), t3.get2(), wrapper, null, + false, false); + } + catch (ClusterTopologyCheckedException + | IgniteInterruptedCheckedException ignored) { + // No-op. + } + catch (Throwable e) { + U.error(ctx.log(GridEventConsumeHandler.class), + "Failed to send event notification to node: " + nodeId, e); + } + } } - } + }); + + notificationInProgress = true; } } }