IGNITE-1169 Implemented send with ack methods on TcpCommunication and GridIoManager. Added tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/1c10ade5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/1c10ade5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/1c10ade5 Branch: refs/heads/ignite-946 Commit: 1c10ade5a50c505ef5ed574ae7001ef7e779cf2e Parents: aec9764 Author: nikolay_tikhonov <ntikho...@gridgain.com> Authored: Fri Jul 31 16:34:24 2015 +0300 Committer: nikolay_tikhonov <ntikho...@gridgain.com> Committed: Fri Jul 31 16:34:53 2015 +0300 ---------------------------------------------------------------------- .../managers/communication/GridIoManager.java | 108 ++++- .../util/nio/GridCommunicationClient.java | 5 +- .../util/nio/GridNioFinishedFuture.java | 12 + .../ignite/internal/util/nio/GridNioFuture.java | 14 + .../internal/util/nio/GridNioFutureImpl.java | 15 + .../util/nio/GridNioRecoveryDescriptor.java | 13 +- .../ignite/internal/util/nio/GridNioServer.java | 5 + .../util/nio/GridNioSessionMetaKey.java | 5 +- .../util/nio/GridShmemCommunicationClient.java | 7 +- .../util/nio/GridTcpNioCommunicationClient.java | 14 +- .../communication/tcp/TcpCommunicationSpi.java | 43 +- ...CommunicationRecoveryAckClosureSelfTest.java | 464 +++++++++++++++++++ .../IgniteSpiCommunicationSelfTestSuite.java | 1 + 13 files changed, 685 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c10ade5/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index c1fb79a..7e17efc 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -36,6 +36,7 @@ import org.apache.ignite.marshaller.*; import org.apache.ignite.plugin.extensions.communication.*; import org.apache.ignite.spi.*; import org.apache.ignite.spi.communication.*; +import org.apache.ignite.spi.communication.tcp.*; import org.apache.ignite.thread.*; import org.jetbrains.annotations.*; import org.jsr166.*; @@ -971,6 +972,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa * @param ordered Ordered flag. * @param timeout Timeout. * @param skipOnTimeout Whether message can be skipped on timeout. + * @param ackClosure Ack closure. * @throws IgniteCheckedException Thrown in case of any errors. */ private void send( @@ -981,7 +983,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa byte plc, boolean ordered, long timeout, - boolean skipOnTimeout + boolean skipOnTimeout, + IgniteInClosure<IgniteException> ackClosure ) throws IgniteCheckedException { assert node != null; assert topic != null; @@ -1001,13 +1004,19 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa processOrderedMessage(locNodeId, ioMsg, plc, null); else processRegularMessage0(ioMsg, locNodeId); + + if (ackClosure != null) + ackClosure.apply(null); } else { if (topicOrd < 0) ioMsg.topicBytes(marsh.marshal(topic)); try { - getSpi().sendMessage(node, ioMsg); + if ((CommunicationSpi)getSpi() instanceof TcpCommunicationSpi) + ((TcpCommunicationSpi)(CommunicationSpi)getSpi()).sendMessage(node, ioMsg, ackClosure); + else + getSpi().sendMessage(node, ioMsg); } catch (IgniteSpiException e) { throw new IgniteCheckedException("Failed to send message (node may have left the grid or " + @@ -1050,7 +1059,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa if 
(node == null) throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId); - send(node, topic, topic.ordinal(), msg, plc, false, 0, false); + send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null); } /** @@ -1062,7 +1071,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa */ public void send(ClusterNode node, Object topic, Message msg, byte plc) throws IgniteCheckedException { - send(node, topic, -1, msg, plc, false, 0, false); + send(node, topic, -1, msg, plc, false, 0, false, null); } /** @@ -1074,7 +1083,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa */ public void send(ClusterNode node, GridTopic topic, Message msg, byte plc) throws IgniteCheckedException { - send(node, topic, topic.ordinal(), msg, plc, false, 0, false); + send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null); } /** @@ -1096,7 +1105,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa ) throws IgniteCheckedException { assert timeout > 0 || skipOnTimeout; - send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout); + send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, null); } /** @@ -1123,11 +1132,24 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa if (node == null) throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId); - send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout); + send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, null); } /** - * @param nodes Destination nodes. + * @param node Destination node. + * @param topic Topic to send the message to. + * @param msg Message to send. + * @param plc Type of processing. + * @param ackClosure Ack closure. + * @throws IgniteCheckedException Thrown in case of any errors. 
+ */ + public void send(ClusterNode node, GridTopic topic, Message msg, byte plc, + IgniteInClosure<IgniteException> ackClosure) throws IgniteCheckedException { + send(node, topic, topic.ordinal(), msg, plc, false, 0, false, ackClosure); + } + + /** + * @param nodes Destination nodes. * @param topic Topic to send the message to. * @param msg Message to send. * @param plc Type of processing. @@ -1150,7 +1172,20 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa } /** - * @param nodes Destination nodes. + * @param node Destination node. + * @param topic Topic to send the message to. + * @param msg Message to send. + * @param plc Type of processing. + * @param ackClosure Ack closure. + * @throws IgniteCheckedException Thrown in case of any errors. + */ + public void send(ClusterNode node, Object topic, Message msg, byte plc, IgniteInClosure<IgniteException> ackClosure) + throws IgniteCheckedException { + send(node, topic, -1, msg, plc, false, 0, false, ackClosure); + } + + /** + * @param nodes Destination nodes. * @param topic Topic to send the message to. * @param msg Message to send. * @param plc Type of processing. @@ -1182,6 +1217,30 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa } /** + * @param node Destination node. + * @param topic Topic to send the message to. + * @param msg Message to send. + * @param plc Type of processing. + * @param timeout Timeout to keep a message on receiving queue. + * @param skipOnTimeout Whether message can be skipped on timeout. + * @param ackClosure Ack closure. + * @throws IgniteCheckedException Thrown in case of any errors. 
+ */ + public void sendOrderedMessage( + ClusterNode node, + Object topic, + Message msg, + byte plc, + long timeout, + boolean skipOnTimeout, + IgniteInClosure<IgniteException> ackClosure + ) throws IgniteCheckedException { + assert timeout > 0 || skipOnTimeout; + + send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackClosure); + } + + /** * Sends a peer deployable user message. * * @param nodes Destination nodes. @@ -1301,6 +1360,35 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa } /** + * @param nodeId Destination node. + * @param topic Topic to send the message to. + * @param msg Message to send. + * @param plc Type of processing. + * @param timeout Timeout to keep a message on receiving queue. + * @param skipOnTimeout Whether message can be skipped on timeout. + * @param ackClosure Ack closure. + * @throws IgniteCheckedException Thrown in case of any errors. + */ + public void sendOrderedMessage( + UUID nodeId, + Object topic, + Message msg, + byte plc, + long timeout, + boolean skipOnTimeout, + IgniteInClosure<IgniteException> ackClosure + ) throws IgniteCheckedException { + assert timeout > 0 || skipOnTimeout; + + ClusterNode node = ctx.discovery().node(nodeId); + + if (node == null) + throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId); + + send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackClosure); + } + + /** * @param nodes Destination nodes. * @param topic Topic to send the message to. * @param topicOrd Topic ordinal value. @@ -1334,7 +1422,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa // messages to one node vs. many. 
if (!nodes.isEmpty()) { for (ClusterNode node : nodes) - send(node, topic, topicOrd, msg, plc, ordered, timeout, skipOnTimeout); + send(node, topic, topicOrd, msg, plc, ordered, timeout, skipOnTimeout, null); } else if (log.isDebugEnabled()) log.debug("Failed to send message to empty nodes collection [topic=" + topic + ", msg=" + http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c10ade5/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java index 693a5a4..1a26ad5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.util.nio; import org.apache.ignite.*; import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.lang.*; import org.apache.ignite.plugin.extensions.communication.*; import org.jetbrains.annotations.*; @@ -94,10 +95,12 @@ public interface GridCommunicationClient { /** * @param nodeId Node ID (provided only if versions of local and remote nodes are different). * @param msg Message to send. + * @param closure Ack closure. * @throws IgniteCheckedException If failed. * @return {@code True} if should try to resend message. */ - public boolean sendMessage(@Nullable UUID nodeId, Message msg) throws IgniteCheckedException; + public boolean sendMessage(@Nullable UUID nodeId, Message msg, @Nullable IgniteInClosure<IgniteException> closure) + throws IgniteCheckedException; /** * @return {@code True} if send is asynchronous. 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c10ade5/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java index 9029dd2..aac238a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java @@ -17,8 +17,10 @@ package org.apache.ignite.internal.util.nio; +import org.apache.ignite.*; import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; /** * Future that represents already completed result. @@ -57,6 +59,16 @@ public class GridNioFinishedFuture<R> extends GridFinishedFuture<R> implements G } /** {@inheritDoc} */ + @Override public void ackClosure(IgniteInClosure<IgniteException> closure) { + // No-op. 
+ } + + /** {@inheritDoc} */ + @Override public IgniteInClosure<IgniteException> ackClosure() { + return null; + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridNioFinishedFuture.class, this, super.toString()); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c10ade5/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java index 7101f45..5a884f0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java @@ -17,7 +17,9 @@ package org.apache.ignite.internal.util.nio; +import org.apache.ignite.*; import org.apache.ignite.internal.*; +import org.apache.ignite.lang.*; /** * NIO future. @@ -39,4 +41,16 @@ public interface GridNioFuture<R> extends IgniteInternalFuture<R> { * @return {@code True} if skip recovery for this operation. */ public boolean skipRecovery(); + + /** + * Sets ack closure which will be applied when ack is received. + * + * @param closure Ack closure. + */ + public void ackClosure(IgniteInClosure<IgniteException> closure); + + /** + * @return Ack closure. 
+ */ + public IgniteInClosure<IgniteException> ackClosure(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c10ade5/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java index c5393c4..e71bf92 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java @@ -17,8 +17,10 @@ package org.apache.ignite.internal.util.nio; +import org.apache.ignite.*; import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; /** * Default future implementation. @@ -30,6 +32,9 @@ public class GridNioFutureImpl<R> extends GridFutureAdapter<R> implements GridNi /** */ protected boolean msgThread; + /** */ + protected IgniteInClosure<IgniteException> ackClosure; + /** {@inheritDoc} */ @Override public void messageThread(boolean msgThread) { this.msgThread = msgThread; @@ -46,6 +51,16 @@ public class GridNioFutureImpl<R> extends GridFutureAdapter<R> implements GridNi } /** {@inheritDoc} */ + @Override public void ackClosure(IgniteInClosure<IgniteException> closure) { + ackClosure = closure; + } + + /** {@inheritDoc} */ + @Override public IgniteInClosure<IgniteException> ackClosure() { + return ackClosure; + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridNioFutureImpl.class, this); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c10ade5/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java index 733ae81..a7ed02a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java @@ -182,6 +182,9 @@ public class GridNioRecoveryDescriptor { assert fut.isDone() : fut; + if (fut.ackClosure() != null) + fut.ackClosure().apply(null); + acked++; } } @@ -358,8 +361,14 @@ public class GridNioRecoveryDescriptor { * @param futs Futures to complete. */ private void completeOnNodeLeft(GridNioFuture<?>[] futs) { - for (GridNioFuture<?> msg : futs) - ((GridNioFutureImpl)msg).onDone(new IOException("Failed to send message, node has left: " + node.id())); + for (GridNioFuture<?> msg : futs) { + IOException e = new IOException("Failed to send message, node has left: " + node.id()); + + ((GridNioFutureImpl)msg).onDone(e); + + if (msg.ackClosure() != null) + msg.ackClosure().apply(new IgniteException(e)); + } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c10ade5/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java index ed55101..c180837 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java @@ -394,6 +394,11 @@ public class GridNioServer<T> { int msgCnt = sys ? 
ses.offerSystemFuture(fut) : ses.offerFuture(fut); + IgniteInClosure<IgniteException> ackClosure; + + if (!sys && (ackClosure = ses.removeMeta(ACK_CLOSURE.ordinal())) != null) + fut.ackClosure(ackClosure); + if (ses.closed()) { if (ses.removeFuture(fut)) fut.connectionClosed(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c10ade5/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java index 004c327..23c1e22 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java @@ -45,7 +45,10 @@ public enum GridNioSessionMetaKey { MSG_WRITER, /** SSL engine. */ - SSL_ENGINE; + SSL_ENGINE, + + /** Ack closure. */ + ACK_CLOSURE; /** Maximum count of NIO session keys in system. 
*/ public static final int MAX_KEYS_CNT = 64; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c10ade5/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java index e05c37a..67d4664 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java @@ -21,6 +21,7 @@ import org.apache.ignite.*; import org.apache.ignite.internal.util.ipc.shmem.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; import org.apache.ignite.plugin.extensions.communication.*; import org.jetbrains.annotations.*; @@ -113,7 +114,8 @@ public class GridShmemCommunicationClient extends GridAbstractCommunicationClien } /** {@inheritDoc} */ - @Override public synchronized boolean sendMessage(@Nullable UUID nodeId, Message msg) + @Override public synchronized boolean sendMessage(@Nullable UUID nodeId, Message msg, + IgniteInClosure<IgniteException> closure) throws IgniteCheckedException { if (closed()) throw new IgniteCheckedException("Communication client was closed: " + this); @@ -131,6 +133,9 @@ public class GridShmemCommunicationClient extends GridAbstractCommunicationClien markUsed(); + if (closure != null) + closure.apply(null); + return false; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c10ade5/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java index abad875..7933001 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.util.nio; import org.apache.ignite.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; import org.apache.ignite.plugin.extensions.communication.*; import org.jetbrains.annotations.*; @@ -27,6 +28,8 @@ import java.io.*; import java.nio.*; import java.util.*; +import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.*; + /** * Grid client for NIO server. */ @@ -97,11 +100,14 @@ public class GridTcpNioCommunicationClient extends GridAbstractCommunicationClie } /** {@inheritDoc} */ - @Override public boolean sendMessage(@Nullable UUID nodeId, Message msg) + @Override public boolean sendMessage(@Nullable UUID nodeId, Message msg, IgniteInClosure<IgniteException> closure) throws IgniteCheckedException { // Node ID is never provided in asynchronous send mode. 
assert nodeId == null; + if (closure != null) + ses.addMeta(ACK_CLOSURE.ordinal(), closure); + GridNioFuture<?> fut = ses.send(msg); if (fut.isDone()) { @@ -109,6 +115,9 @@ public class GridTcpNioCommunicationClient extends GridAbstractCommunicationClie fut.get(); } catch (IgniteCheckedException e) { + if (closure != null) + ses.removeMeta(ACK_CLOSURE.ordinal()); + if (log.isDebugEnabled()) log.debug("Failed to send message [client=" + this + ", err=" + e + ']'); @@ -119,6 +128,9 @@ public class GridTcpNioCommunicationClient extends GridAbstractCommunicationClie } } + if (closure != null) + ses.removeMeta(ACK_CLOSURE.ordinal()); + return false; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c10ade5/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 1c74d59..b706edf 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -1620,6 +1620,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** * Creates new shared memory communication server. + * * @return Server. * @throws IgniteCheckedException If failed. */ @@ -1785,11 +1786,41 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** {@inheritDoc} */ @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException { + sendMessage0(node, msg, null); + } + + /** + * Sends given message to destination node. Note that characteristics of the + * exchange such as durability, guaranteed delivery or error notification is + * dependant on SPI implementation. + * + * @param node Destination node. 
+ * @param msg Message to send. + * @param ackClosure Ack closure. + * @throws org.apache.ignite.spi.IgniteSpiException Thrown in case of any error during sending the message. + * Note that this is not guaranteed that failed communication will result + * in thrown exception as this is dependant on SPI implementation. + */ + public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure) + throws IgniteSpiException { + sendMessage0(node, msg, ackClosure); + } + + /** + * @param node Destination node. + * @param msg Message to send. + * @param ackClosure Ack closure. + * @throws org.apache.ignite.spi.IgniteSpiException Thrown in case of any error during sending the message. + * Note that this is not guaranteed that failed communication will result + * in thrown exception as this is dependant on SPI implementation. + */ + private void sendMessage0(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure) + throws IgniteSpiException { assert node != null; assert msg != null; if (log.isTraceEnabled()) - log.trace("Sending message to node [node=" + node + ", msg=" + msg + ']'); + log.trace("Sending message with ack to node [node=" + node + ", msg=" + msg + ']'); ClusterNode localNode = getLocalNode(); @@ -1813,7 +1844,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (!client.async() && !localNode.version().equals(node.version())) nodeId = node.id(); - retry = client.sendMessage(nodeId, msg); + retry = client.sendMessage(nodeId, msg, ackClosure); client.release(); @@ -1876,7 +1907,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter GridCommunicationClient old = clients.put(nodeId, client0); assert old == null : "Client already created " + - "[node=" + node + ", client=" + client0 + ", oldClient=" + old + ']'; + "[node=" + node + ", client=" + client0 + ", oldClient=" + old + ']'; if (client0 instanceof GridTcpNioCommunicationClient) { GridTcpNioCommunicationClient tcpClient = 
((GridTcpNioCommunicationClient)client0); @@ -1979,7 +2010,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * @return Client. * @throws IgniteCheckedException If failed. */ - @Nullable protected GridCommunicationClient createShmemClient(ClusterNode node, Integer port) throws IgniteCheckedException { + @Nullable protected GridCommunicationClient createShmemClient(ClusterNode node, + Integer port) throws IgniteCheckedException { int attempt = 1; int connectAttempts = 1; @@ -2204,6 +2236,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter meta.put(GridNioSessionMetaKey.SSL_ENGINE.ordinal(), sslEngine); } + if (recoveryDesc != null) { recoveryDesc.onHandshake(rcvCnt); @@ -2433,7 +2466,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter else if (log.isDebugEnabled()) log.debug("Received remote node ID: " + rmtNodeId0); - if (isSslEnabled() ) { + if (isSslEnabled()) { assert sslHnd != null; ch.write(sslHnd.encrypt(ByteBuffer.wrap(U.IGNITE_HEADER))); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c10ade5/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java new file mode 100644 index 0000000..e353f2d --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java @@ -0,0 +1,464 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.communication.tcp; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.managers.communication.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.nio.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.plugin.extensions.communication.*; +import org.apache.ignite.spi.*; +import org.apache.ignite.spi.communication.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.junits.*; +import org.apache.ignite.testframework.junits.spi.*; + +import org.eclipse.jetty.util.*; + +import java.net.*; +import java.util.*; +import java.util.concurrent.atomic.*; + +/** + * + */ +public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends CommunicationSpi> + extends GridSpiAbstractTest<T> { + /** */ + private static final Collection<IgniteTestResources> spiRsrcs = new ArrayList<>(); + + /** */ + protected static final List<TcpCommunicationSpi> spis = new ArrayList<>(); + + /** */ + protected static final List<ClusterNode> nodes = new ArrayList<>(); + + /** */ + private static final int SPI_CNT = 2; + + /** + * + */ + static { + GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<Message>() { + 
@Override public Message apply() { + return new GridTestMessage(); + } + }); + } + + /** + * Disable SPI auto-start. + */ + public IgniteTcpCommunicationRecoveryAckClosureSelfTest() { + super(false); + } + + /** */ + @SuppressWarnings({"deprecation"}) + private class TestListener implements CommunicationListener<Message> { + /** */ + private ConcurrentHashSet<Long> msgIds = new ConcurrentHashSet<>(); + + /** */ + private AtomicInteger rcvCnt = new AtomicInteger(); + + /** {@inheritDoc} */ + @Override public void onMessage(UUID nodeId, Message msg, IgniteRunnable msgC) { + info("Test listener received message: " + msg); + + assertTrue("Unexpected message: " + msg, msg instanceof GridTestMessage); + + GridTestMessage msg0 = (GridTestMessage)msg; + + assertTrue("Duplicated message received: " + msg0, msgIds.add(msg0.getMsgId())); + + rcvCnt.incrementAndGet(); + + msgC.run(); + } + + /** {@inheritDoc} */ + @Override public void onDisconnected(UUID nodeId) { + // No-op. + } + } + + /** + * @throws Exception If failed. + */ + public void testAckOnIdle() throws Exception { + checkAck(10, 2000, 9); + } + + /** + * @throws Exception If failed. + */ + public void testAckOnCount() throws Exception { + checkAck(10, 60_000, 10); + } + + /** + * @param ackCnt Recovery acknowledgement count. + * @param idleTimeout Idle connection timeout. + * @param msgPerIter Messages per iteration. + * @throws Exception If failed. 
+ */ + private void checkAck(int ackCnt, int idleTimeout, int msgPerIter) throws Exception { + createSpis(ackCnt, idleTimeout, TcpCommunicationSpi.DFLT_MSG_QUEUE_LIMIT); + + try { + TcpCommunicationSpi spi0 = spis.get(0); + TcpCommunicationSpi spi1 = spis.get(1); + + ClusterNode node0 = nodes.get(0); + ClusterNode node1 = nodes.get(1); + + int msgId = 0; + + int expMsgs = 0; + + for (int i = 0; i < 5; i++) { + info("Iteration: " + i); + + final AtomicInteger ackMsgs = new AtomicInteger(0); + + IgniteInClosure<IgniteException> ackClosure = new CI1<IgniteException>() { + @Override public void apply(IgniteException o) { + assert o == null; + + ackMsgs.incrementAndGet(); + } + }; + + for (int j = 0; j < msgPerIter; j++) { + spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackClosure); + + spi1.sendMessage(node0, new GridTestMessage(node1.id(), ++msgId, 0), ackClosure); + } + + expMsgs += msgPerIter; + + for (TcpCommunicationSpi spi : spis) { + GridNioServer srv = U.field(spi, "nioSrvr"); + + Collection<? 
extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions"); + + assertFalse(sessions.isEmpty()); + + boolean found = false; + + for (GridNioSession ses : sessions) { + final GridNioRecoveryDescriptor recoveryDesc = ses.recoveryDescriptor(); + + if (recoveryDesc != null) { + found = true; + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return recoveryDesc.messagesFutures().isEmpty(); + } + }, 10_000); + + assertEquals("Unexpected messages: " + recoveryDesc.messagesFutures(), 0, + recoveryDesc.messagesFutures().size()); + + break; + } + } + + assertTrue(found); + } + + final int expMsgs0 = expMsgs; + + for (TcpCommunicationSpi spi : spis) { + final TestListener lsnr = (TestListener)spi.getListener(); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override + public boolean apply() { + return lsnr.rcvCnt.get() >= expMsgs0; + } + }, 5000); + + assertEquals(expMsgs, lsnr.rcvCnt.get()); + } + + assertEquals(msgPerIter * 2, ackMsgs.get()); + } + } + finally { + stopSpis(); + } + } + + /** + * @throws Exception If failed. + */ + public void testQueueOverflow() throws Exception { + for (int i = 0; i < 3; i++) { + try { + startSpis(5, 60_000, 10); + + checkOverflow(); + + break; + } + catch (IgniteCheckedException e) { + if (e.hasCause(BindException.class)) { + if (i < 2) { + info("Got exception caused by BindException, will retry after delay: " + e); + + stopSpis(); + + U.sleep(10_000); + } + else + throw e; + } + else + throw e; + } + finally { + stopSpis(); + } + } + } + + /** + * @throws Exception If failed. 
+ */ + private void checkOverflow() throws Exception { + TcpCommunicationSpi spi0 = spis.get(0); + TcpCommunicationSpi spi1 = spis.get(1); + + ClusterNode node0 = nodes.get(0); + ClusterNode node1 = nodes.get(1); + + final GridNioServer srv1 = U.field(spi1, "nioSrvr"); + + final AtomicInteger ackMsgs = new AtomicInteger(0); + + IgniteInClosure<IgniteException> ackClosure = new CI1<IgniteException>() { + @Override public void apply(IgniteException o) { + assert o == null; + + ackMsgs.incrementAndGet(); + } + }; + + int msgId = 0; + + // Send message to establish connection. + spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackClosure); + + // Prevent node1 from send + GridTestUtils.setFieldValue(srv1, "skipWrite", true); + + final GridNioSession ses0 = communicationSession(spi0); + + for (int i = 0; i < 150; i++) + spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackClosure); + + // Wait when session is closed because of queue overflow. + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return ses0.closeTime() != 0; + } + }, 5000); + + assertTrue("Failed to wait for session close", ses0.closeTime() != 0); + + GridTestUtils.setFieldValue(srv1, "skipWrite", false); + + for (int i = 0; i < 100; i++) + spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackClosure); + + final int expMsgs = 251; + + final TestListener lsnr = (TestListener)spi1.getListener(); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return lsnr.rcvCnt.get() >= expMsgs; + } + }, 5000); + + assertEquals(expMsgs, lsnr.rcvCnt.get()); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return expMsgs == ackMsgs.get(); + } + }, 5000); + } + + /** + * @param spi SPI. + * @return Session. + * @throws Exception If failed. 
+ */ + @SuppressWarnings("unchecked") + private GridNioSession communicationSession(TcpCommunicationSpi spi) throws Exception { + final GridNioServer srv = U.field(spi, "nioSrvr"); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override + public boolean apply() { + Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions"); + + return !sessions.isEmpty(); + } + }, 5000); + + Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions"); + + assertEquals(1, sessions.size()); + + return sessions.iterator().next(); + } + + /** + * @param ackCnt Recovery acknowledgement count. + * @param idleTimeout Idle connection timeout. + * @param queueLimit Message queue limit. + * @return SPI instance. + */ + protected TcpCommunicationSpi getSpi(int ackCnt, int idleTimeout, int queueLimit) { + TcpCommunicationSpi spi = new TcpCommunicationSpi(); + + spi.setLocalPort(GridTestUtils.getNextCommPort(getClass())); + spi.setIdleConnectionTimeout(idleTimeout); + spi.setTcpNoDelay(true); + spi.setAckSendThreshold(ackCnt); + spi.setMessageQueueLimit(queueLimit); + spi.setSharedMemoryPort(-1); + + return spi; + } + + /** + * @param ackCnt Recovery acknowledgement count. + * @param idleTimeout Idle connection timeout. + * @param queueLimit Message queue limit. + * @throws Exception If failed. 
+ */ + private void startSpis(int ackCnt, int idleTimeout, int queueLimit) throws Exception { + spis.clear(); + nodes.clear(); + spiRsrcs.clear(); + + Map<ClusterNode, GridSpiTestContext> ctxs = new HashMap<>(); + + for (int i = 0; i < SPI_CNT; i++) { + TcpCommunicationSpi spi = getSpi(ackCnt, idleTimeout, queueLimit); + + GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "gridName", "grid-" + i); + + IgniteTestResources rsrcs = new IgniteTestResources(); + + GridTestNode node = new GridTestNode(rsrcs.getNodeId()); + + GridSpiTestContext ctx = initSpiContext(); + + ctx.setLocalNode(node); + + spiRsrcs.add(rsrcs); + + rsrcs.inject(spi); + + spi.setListener(new TestListener()); + + node.setAttributes(spi.getNodeAttributes()); + + nodes.add(node); + + spi.spiStart(getTestGridName() + (i + 1)); + + spis.add(spi); + + spi.onContextInitialized(ctx); + + ctxs.put(node, ctx); + } + + // For each context set remote nodes. + for (Map.Entry<ClusterNode, GridSpiTestContext> e : ctxs.entrySet()) { + for (ClusterNode n : nodes) { + if (!n.equals(e.getKey())) + e.getValue().remoteNodes().add(n); + } + } + } + + /** + * @param ackCnt Recovery acknowledgement count. + * @param idleTimeout Idle connection timeout. + * @param queueLimit Message queue limit. + * @throws Exception If failed. + */ + private void createSpis(int ackCnt, int idleTimeout, int queueLimit) throws Exception { + for (int i = 0; i < 3; i++) { + try { + startSpis(ackCnt, idleTimeout, queueLimit); + + break; + } + catch (IgniteCheckedException e) { + if (e.hasCause(BindException.class)) { + if (i < 2) { + info("Failed to start SPIs because of BindException, will retry after delay."); + + stopSpis(); + + U.sleep(10_000); + } + else + throw e; + } + else + throw e; + } + } + } + + /** + * @throws Exception If failed. 
+ */ + private void stopSpis() throws Exception { + for (CommunicationSpi<Message> spi : spis) { + spi.onContextDestroyed(); + + spi.setListener(null); + + spi.spiStop(); + } + + for (IgniteTestResources rsrcs : spiRsrcs) + rsrcs.stopThreads(); + + spis.clear(); + nodes.clear(); + spiRsrcs.clear(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c10ade5/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java index 3f71d7d..9b43204 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java @@ -32,6 +32,7 @@ public class IgniteSpiCommunicationSelfTestSuite extends TestSuite { TestSuite suite = new TestSuite("Communication SPI Test Suite"); suite.addTest(new TestSuite(GridTcpCommunicationSpiRecoveryAckSelfTest.class)); + suite.addTest(new TestSuite(IgniteTcpCommunicationRecoveryAckClosureSelfTest.class)); suite.addTest(new TestSuite(GridTcpCommunicationSpiRecoverySelfTest.class)); suite.addTest(new TestSuite(GridTcpCommunicationSpiConcurrentConnectSelfTest.class));