# IGNITE-1134: Avoid hang on NIO session send after NIO server stop
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/79712aa2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/79712aa2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/79712aa2 Branch: refs/heads/ignite-788-last-review Commit: 79712aa2ee30ccff6d7e30e7ed13707d6bad40bf Parents: f62744e Author: sboikov <sboi...@gridgain.com> Authored: Tue Jul 21 10:45:39 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Tue Jul 21 10:45:39 2015 +0300 ---------------------------------------------------------------------- .../distributed/dht/GridDhtTxPrepareFuture.java | 7 +- .../ignite/internal/util/nio/GridNioServer.java | 9 ++ .../util/nio/GridSelectorNioSessionImpl.java | 8 ++ .../internal/util/nio/GridNioSelfTest.java | 88 ++++++++++++++++++-- .../internal/util/nio/GridNioSslSelfTest.java | 16 ++-- 5 files changed, 107 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/79712aa2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 6efa4d8..fbc8c84 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -880,7 +880,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter fut.onNodeLeft(e); } catch (IgniteCheckedException e) { - 
fut.onResult(e); + if (!cctx.kernalContext().isStopping()) + fut.onResult(e); } } @@ -927,7 +928,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter assert req.transactionNodes() != null; - //noinspection TryWithIdenticalCatches try { cctx.io().send(nearMapping.node(), req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL); } @@ -935,7 +935,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter fut.onNodeLeft(e); } catch (IgniteCheckedException e) { - fut.onResult(e); + if (!cctx.kernalContext().isStopping()) + fut.onResult(e); } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/79712aa2/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java index 24e1e08..b36f9f5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java @@ -311,6 +311,9 @@ public class GridNioServer<T> { U.join(clientWorkers, log); filterChain.stop(); + + for (GridSelectorNioSessionImpl ses : sessions) + ses.onServerStopped(); } } @@ -1496,6 +1499,9 @@ public class GridNioServer<T> { req.onDone(e); } + + if (closed) + ses.onServerStopped(); } catch (ClosedChannelException e) { U.warn(log, "Failed to register accepted socket channel to selector (channel was closed): " @@ -1525,6 +1531,9 @@ public class GridNioServer<T> { sessions.remove(ses); + if (closed) + ses.onServerStopped(); + SelectionKey key = ses.key(); // Shutdown input and output so that remote client will see correct socket close. 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/79712aa2/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java index cf240ca..458786b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java @@ -290,6 +290,14 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl { return super.addMeta(key, val); } + /** + * + */ + void onServerStopped() { + if (sem != null) + sem.release(1_000_000); + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridSelectorNioSessionImpl.class, this, super.toString()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/79712aa2/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java index c81ed56..fa8d4a0 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java @@ -393,6 +393,62 @@ public class GridNioSelfTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. 
+ */ + public void testSendAfterServerStop() throws Exception { + final AtomicReference<GridNioSession> sesRef = new AtomicReference<>(); + + final CountDownLatch connectLatch = new CountDownLatch(1); + + GridNioServerListener lsnr = new GridNioServerListenerAdapter() { + @Override public void onConnected(GridNioSession ses) { + info("On connected: " + ses); + + sesRef.set(ses); + + connectLatch.countDown(); + } + + @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) { + } + + @Override public void onMessage(GridNioSession ses, Object msg) { + log.info("Message: " + msg); + } + }; + + GridNioServer.Builder<?> builder = serverBuilder(PORT, new GridPlainParser(), lsnr); + + GridNioServer<?> srvr = builder.sendQueueLimit(5).build(); + + srvr.start(); + + try { + Socket s = createSocket(); + + s.connect(new InetSocketAddress(U.getLocalHost(), PORT), 1000); + + s.getOutputStream().write(new byte[1]); + + U.await(connectLatch); + + GridNioSession ses = sesRef.get(); + + assertNotNull(ses); + + ses.send(new byte[1]); + + srvr.stop(); + + for (int i = 0; i < 10; i++) + ses.send(new byte[1]); + } + finally { + srvr.stop(); + } + } + + /** * Sends message and validates reply. * * @param msg Message to send. @@ -480,10 +536,29 @@ public class GridNioSelfTest extends GridCommonAbstractTest { * @return Started server. * @throws Exception If failed. */ - @SuppressWarnings("unchecked") - protected GridNioServer<?> startServer(int port, GridNioParser parser, GridNioServerListener lsnr) + protected final GridNioServer<?> startServer(int port, GridNioParser parser, GridNioServerListener lsnr) throws Exception { - GridNioServer<?> srvr = GridNioServer.builder() + GridNioServer<?> srvr = serverBuilder(port, parser, lsnr).build(); + + srvr.start(); + + return srvr; + } + + /** + * @param port Port to listen. + * @param parser Parser to use. + * @param lsnr Listener. + * @return Server builder. + * @throws Exception If failed. 
+ */ + @SuppressWarnings("unchecked") + protected GridNioServer.Builder<?> serverBuilder(int port, + GridNioParser parser, + GridNioServerListener lsnr) + throws Exception + { + return GridNioServer.builder() .address(U.getLocalHost()) .port(port) .listener(lsnr) @@ -496,12 +571,7 @@ public class GridNioSelfTest extends GridCommonAbstractTest { .socketSendBufferSize(0) .socketReceiveBufferSize(0) .sendQueueLimit(0) - .filters(new GridNioCodecFilter(parser, log, false)) - .build(); - - srvr.start(); - - return srvr; + .filters(new GridNioCodecFilter(parser, log, false)); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/79712aa2/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSslSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSslSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSslSelfTest.java index 930b5d1..73f5ba5 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSslSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSslSelfTest.java @@ -52,9 +52,12 @@ public class GridNioSslSelfTest extends GridNioSelfTest { /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override protected GridNioServer<?> startServer(int port, GridNioParser parser, GridNioServerListener lsnr) - throws Exception { - GridNioServer<?> srvr = GridNioServer.builder() + @Override protected GridNioServer.Builder<?> serverBuilder(int port, + GridNioParser parser, + GridNioServerListener lsnr) + throws Exception + { + return GridNioServer.builder() .address(U.getLocalHost()) .port(port) .listener(lsnr) @@ -69,12 +72,7 @@ public class GridNioSslSelfTest extends GridNioSelfTest { .sendQueueLimit(0) .filters( new GridNioCodecFilter(parser, log, false), - new GridNioSslFilter(sslCtx, log)) - .build(); - - srvr.start(); - - return srvr; + 
new GridNioSslFilter(sslCtx, log)); } /** {@inheritDoc} */