IGNITE-709 Bug fix avoid hang GridContinuousProcessor
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5cb0e664 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5cb0e664 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5cb0e664 Branch: refs/heads/ignite-23 Commit: 5cb0e6643ba82ec021e0bd8f1aaf0a65d28b8391 Parents: d411238 Author: sevdokimov <sevdoki...@gridgain.com> Authored: Mon May 25 12:39:19 2015 +0300 Committer: sevdokimov <sevdoki...@gridgain.com> Committed: Mon May 25 12:39:19 2015 +0300 ---------------------------------------------------------------------- .../internal/GridEventConsumeHandler.java | 81 +++++++++++++++----- 1 file changed, 61 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5cb0e664/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java index 505204d..0bba809 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java @@ -20,9 +20,9 @@ package org.apache.ignite.internal; import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.events.*; +import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.interop.*; import org.apache.ignite.internal.managers.deployment.*; -import org.apache.ignite.internal.managers.discovery.*; import org.apache.ignite.internal.managers.eventstorage.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.continuous.*; @@ -131,40 +131,81 @@ class GridEventConsumeHandler implements GridContinuousHandler { final boolean loc = nodeId.equals(ctx.localNodeId()); lsnr = new GridLocalEventListener() { - @Override public void onEvent(Event evt) { + /** node ID, routine ID, event */ + private final Queue<T3<UUID, UUID, Event>> notificationQueue = new LinkedList<>(); + + private boolean notificationInProgress; + + @Override public void onEvent(final Event evt) { if (filter == null || filter.apply(evt)) { if (loc) { if (!cb.apply(nodeId, evt)) ctx.continuous().stopRoutine(routineId); } else { - GridDiscoveryManager disco = ctx.discovery(); + if (ctx.discovery().node(nodeId) == null) + return; + + synchronized (notificationQueue) { + notificationQueue.add(new T3<>(nodeId, routineId, evt)); + + if (!notificationInProgress) { + ctx.getSystemExecutorService().submit(new Runnable() { + @Override public void run() { + while (true) { + T3<UUID, UUID, Event> t3; + + synchronized (notificationQueue) { + t3 = notificationQueue.poll(); + + if (t3 == null) { + notificationInProgress = false; - ClusterNode node = disco.node(nodeId); + return; + } + } - if (node != null) { - try { - EventWrapper wrapper = new EventWrapper(evt); + try { + Event evt = t3.get3(); - if (evt instanceof CacheEvent) { - String cacheName = ((CacheEvent)evt).cacheName(); + EventWrapper wrapper = new EventWrapper(evt); - if (ctx.config().isPeerClassLoadingEnabled() && disco.cacheNode(node, cacheName)) { - wrapper.p2pMarshal(ctx.config().getMarshaller()); + if (evt instanceof CacheEvent) { + String cacheName = ((CacheEvent)evt).cacheName(); - wrapper.cacheName = cacheName; + ClusterNode node = ctx.discovery().node(t3.get1()); - GridCacheDeploymentManager depMgr = - ctx.cache().internalCache(cacheName).context().deploy(); + if (node == null) + continue; - depMgr.prepare(wrapper); + if (ctx.config().isPeerClassLoadingEnabled() + && ctx.discovery().cacheNode(node, cacheName)) { + wrapper.p2pMarshal(ctx.config().getMarshaller()); + + wrapper.cacheName = cacheName; + + GridCacheDeploymentManager depMgr = + ctx.cache().internalCache(cacheName).context().deploy(); + + depMgr.prepare(wrapper); + } + } + + ctx.continuous().addNotification(t3.get1(), t3.get2(), wrapper, null, + false, false); + } + catch (ClusterTopologyCheckedException ignored) { + // No-op. + } + catch (Throwable e) { + U.error(ctx.log(GridEventConsumeHandler.class), + "Failed to send event notification to node: " + nodeId, e); + } + } } - } + }); - ctx.continuous().addNotification(nodeId, routineId, wrapper, null, false, false); - } - catch (IgniteCheckedException e) { - U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, e); + notificationInProgress = true; } } }