# IGNITE-831 Handle node segmented message.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/660648bb Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/660648bb Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/660648bb Branch: refs/heads/ignite-709_3 Commit: 660648bbb56f6a99ee4883c4fb986b33ec95f9ac Parents: 37c6007 Author: sevdokimov <sevdoki...@gridgain.com> Authored: Tue May 5 20:41:58 2015 +0300 Committer: sevdokimov <sevdoki...@gridgain.com> Committed: Tue May 5 20:41:58 2015 +0300 ---------------------------------------------------------------------- .../continuous/GridContinuousProcessor.java | 23 ++++++++++++++++++++ .../discovery/tcp/TcpClientDiscoverySpi.java | 3 +++ 2 files changed, 26 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/660648bb/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index d1923d9..0769479 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -146,6 +146,26 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } }, EVT_NODE_LEFT, EVT_NODE_FAILED); + ctx.event().addLocalEventListener(new GridLocalEventListener() { + @Override public void onEvent(Event evt) { + for (Iterator<StartFuture> itr = startFuts.values().iterator(); itr.hasNext(); ) { + StartFuture fut = itr.next(); + + itr.remove(); + + fut.onDone(new IgniteException("Topology segmented")); + } + + for (Iterator<StopFuture> itr = stopFuts.values().iterator(); itr.hasNext(); ) { + StopFuture fut = itr.next(); + + itr.remove(); + + fut.onDone(new IgniteException("Topology segmented")); + } + } + }, EVT_NODE_SEGMENTED); + ctx.discovery().setCustomEventListener(StartRoutineDiscoveryMessage.class, new CustomEventListener<StartRoutineDiscoveryMessage>() { @Override public void onCustomEvent(ClusterNode snd, StartRoutineDiscoveryMessage msg) { @@ -504,6 +524,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter { unregisterHandler(routineId, routine.hnd, true); ctx.discovery().sendCustomEvent(new StopRoutineDiscoveryMessage(routineId)); + + if (ctx.isStopping()) + fut.onDone(); } return fut; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/660648bb/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java index 59f4708..dc89d6a 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java @@ -390,6 +390,9 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp /** {@inheritDoc} */ @Override public void sendCustomEvent(DiscoveryCustomMessage evt) { + if (segmentation) + throw new IgniteException("Failed to send custom message: client is disconnected"); + try { sockWriter.sendMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), marsh.marshal(evt))); }