IGNITE-709 Bug fix avoid hang GridContinuousProcessor
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a154d2c9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a154d2c9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a154d2c9 Branch: refs/heads/ignite-929 Commit: a154d2c9582c378b04121f535c3fa59b0499dbaa Parents: 9480e8d Author: sevdokimov <sergey.evdoki...@jetbrains.com> Authored: Mon May 25 09:42:59 2015 +0300 Committer: sevdokimov <sergey.evdoki...@jetbrains.com> Committed: Mon May 25 09:42:59 2015 +0300 ---------------------------------------------------------------------- .../continuous/GridContinuousProcessor.java | 44 +++++++++++++++++++- 1 file changed, 43 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a154d2c9/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index d67a45a..d5c2488 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -92,6 +92,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter { /** Number of retries using to send messages. */ private int retryCnt = 3; + private ExecutorService sendNotificationThreadPool; + /** * @param ctx Kernal context. */ @@ -109,6 +111,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter { marsh = ctx.config().getMarshaller(); + sendNotificationThreadPool = new ThreadPoolExecutor(0, 1, 2000, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<Runnable>(), new IgniteThreadFactory(ctx.gridName(), "notification-sender")); + ctx.event().addLocalEventListener(new GridLocalEventListener() { @SuppressWarnings({"fallthrough", "TooBroadScope"}) @Override public void onEvent(Event evt) { @@ -268,6 +273,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter { if (log.isDebugEnabled()) log.debug("Continuous processor stopped."); + + sendNotificationThreadPool.shutdownNow(); } /** {@inheritDoc} */ @@ -578,7 +585,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { Collection<Object> toSnd = info.add(obj); if (toSnd != null) - sendNotification(nodeId, routineId, null, toSnd, orderedTopic, msg); + sendNotificationAsync(nodeId, routineId, null, toSnd, orderedTopic, msg); } } } @@ -609,6 +616,41 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } /** + * @param nodeId Node ID. + * @param routineId Routine ID. + * @param futId Future ID. + * @param toSnd Notification object to send. + * @param orderedTopic Topic for ordered notifications. + * If {@code null}, non-ordered message will be sent. + * @throws IgniteCheckedException In case of error. + */ + private void sendNotificationAsync(final UUID nodeId, + final UUID routineId, + @Nullable final IgniteUuid futId, + final Collection<Object> toSnd, + @Nullable final Object orderedTopic, + final boolean msg) { + assert nodeId != null; + assert routineId != null; + assert toSnd != null; + assert !toSnd.isEmpty(); + + sendNotificationThreadPool.execute(new Runnable() { + @Override public void run() { + try { + sendWithRetries(nodeId, + new GridContinuousMessage(MSG_EVT_NOTIFICATION, routineId, futId, toSnd, msg), + orderedTopic); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send event notification to node: " + nodeId, e); + } + } + }); + + } + + /** * @param node Sender. * @param req Start request. */