Repository: incubator-ignite Updated Branches: refs/heads/ignite-1169 5c3d82973 -> e7bd078b4
IGNITE-1169 Changed futures to Closure. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e7bd078b Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e7bd078b Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e7bd078b Branch: refs/heads/ignite-1169 Commit: e7bd078b4133022786271d748664419d1dee0a8f Parents: 5c3d829 Author: nikolay_tikhonov <ntikho...@gridgain.com> Authored: Thu Jul 30 13:03:46 2015 +0300 Committer: nikolay_tikhonov <ntikho...@gridgain.com> Committed: Thu Jul 30 13:03:46 2015 +0300 ---------------------------------------------------------------------- .../managers/communication/GridIoManager.java | 109 +++++++++++++++++-- .../util/nio/GridCommunicationClient.java | 14 +-- .../util/nio/GridNioFinishedFuture.java | 11 ++ .../ignite/internal/util/nio/GridNioFuture.java | 13 +++ .../internal/util/nio/GridNioFutureImpl.java | 14 +++ .../util/nio/GridNioRecoveryDescriptor.java | 64 +++-------- .../ignite/internal/util/nio/GridNioServer.java | 6 + .../util/nio/GridNioSessionMetaKey.java | 5 +- .../util/nio/GridShmemCommunicationClient.java | 12 +- .../util/nio/GridTcpNioCommunicationClient.java | 42 ++----- .../communication/tcp/TcpCommunicationSpi.java | 90 +++++---------- ...mmunicationSpiRecoveryAckFutureSelfTest.java | 45 +++++--- 12 files changed, 235 insertions(+), 190 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e7bd078b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 65b6fad..272950e 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -36,6 +36,7 @@ import org.apache.ignite.marshaller.*; import org.apache.ignite.plugin.extensions.communication.*; import org.apache.ignite.spi.*; import org.apache.ignite.spi.communication.*; +import org.apache.ignite.spi.communication.tcp.*; import org.apache.ignite.thread.*; import org.jetbrains.annotations.*; import org.jsr166.*; @@ -974,6 +975,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa * @param ordered Ordered flag. * @param timeout Timeout. * @param skipOnTimeout Whether message can be skipped on timeout. + * @param ackClosure Ack closure. * @throws IgniteCheckedException Thrown in case of any errors. */ private void send( @@ -984,7 +986,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa byte plc, boolean ordered, long timeout, - boolean skipOnTimeout + boolean skipOnTimeout, + IgniteInClosure<Exception> ackClosure ) throws IgniteCheckedException { assert node != null; assert topic != null; @@ -1004,13 +1007,19 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa processOrderedMessage(locNodeId, ioMsg, plc, null); else processRegularMessage0(ioMsg, locNodeId); + + if (ackClosure != null) + ackClosure.apply(null); } else { if (topicOrd < 0) ioMsg.topicBytes(marsh.marshal(topic)); try { - getSpi().sendMessage(node, ioMsg); + if ((CommunicationSpi)getSpi() instanceof TcpCommunicationSpi) + ((TcpCommunicationSpi)(CommunicationSpi)getSpi()).sendMessageWithAck(node, ioMsg, ackClosure); + else + getSpi().sendMessage(node, ioMsg); } catch (IgniteSpiException e) { throw new IgniteCheckedException("Failed to send message (node may have left the grid or " + @@ -1030,7 +1039,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa * <p> 
* How to use it: * <ol> - * <li>Replace {@link #send(ClusterNode, Object, int, Message, byte, boolean, long, boolean)} + * <li>Replace {@link #send(ClusterNode, Object, int, Message, byte, boolean, long, boolean, IgniteInClosure)} * with this method.</li> * <li>Start all grids for your test, then set {@link #TURBO_DEBUG_MODE} to {@code true}.</li> * <li>Perform test operations on the topology. No network will be there.</li> @@ -1132,7 +1141,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa if (node == null) throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId); - send(node, topic, topic.ordinal(), msg, plc, false, 0, false); + send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null); } /** @@ -1144,7 +1153,19 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa */ public void send(ClusterNode node, Object topic, Message msg, byte plc) throws IgniteCheckedException { - send(node, topic, -1, msg, plc, false, 0, false); + send(node, topic, -1, msg, plc, false, 0, false, null); + } + + /** + * @param node Destination node. + * @param topic Topic to send the message to. + * @param msg Message to send. + * @param plc Type of processing. + * @throws IgniteCheckedException Thrown in case of any errors. + */ + public void sendWithAck(ClusterNode node, Object topic, Message msg, byte plc) + throws IgniteCheckedException { + send(node, topic, -1, msg, plc, false, 0, false, null); } /** @@ -1156,7 +1177,20 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa */ public void send(ClusterNode node, GridTopic topic, Message msg, byte plc) throws IgniteCheckedException { - send(node, topic, topic.ordinal(), msg, plc, false, 0, false); + send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null); + } + + /** + * @param node Destination node. + * @param topic Topic to send the message to. + * @param msg Message to send. 
+ * @param plc Type of processing. + * @param ackClosure Ack closure. + * @throws IgniteCheckedException Thrown in case of any errors. + */ + public void sendWithAck(ClusterNode node, GridTopic topic, Message msg, byte plc, + IgniteInClosure<Exception> ackClosure) throws IgniteCheckedException { + send(node, topic, topic.ordinal(), msg, plc, false, 0, false, ackClosure); } /** @@ -1178,7 +1212,31 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa ) throws IgniteCheckedException { assert timeout > 0 || skipOnTimeout; - send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout); + send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, null); + } + + /** + * @param node Destination node. + * @param topic Topic to send the message to. + * @param msg Message to send. + * @param plc Type of processing. + * @param timeout Timeout to keep a message on receiving queue. + * @param skipOnTimeout Whether message can be skipped on timeout. + * @param ackClosure Ack closure. + * @throws IgniteCheckedException Thrown in case of any errors. + */ + public void sendOrderedMessageWithAck( + ClusterNode node, + Object topic, + Message msg, + byte plc, + long timeout, + boolean skipOnTimeout, + IgniteInClosure<Exception> ackClosure + ) throws IgniteCheckedException { + assert timeout > 0 || skipOnTimeout; + + send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackClosure); } /** @@ -1188,6 +1246,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa * @param plc Type of processing. * @param timeout Timeout to keep a message on receiving queue. * @param skipOnTimeout Whether message can be skipped on timeout. + * @param ackClosure Ack closure. * @throws IgniteCheckedException Thrown in case of any errors. 
*/ public void sendOrderedMessage( @@ -1196,7 +1255,37 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa Message msg, byte plc, long timeout, - boolean skipOnTimeout + boolean skipOnTimeout, + IgniteInClosure<Exception> ackClosure + ) throws IgniteCheckedException { + assert timeout > 0 || skipOnTimeout; + + ClusterNode node = ctx.discovery().node(nodeId); + + if (node == null) + throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId); + + send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackClosure); + } + + /** + * @param nodeId Destination node. + * @param topic Topic to send the message to. + * @param msg Message to send. + * @param plc Type of processing. + * @param timeout Timeout to keep a message on receiving queue. + * @param skipOnTimeout Whether message can be skipped on timeout. + * @param ackClosure Ack closure. + * @throws IgniteCheckedException Thrown in case of any errors. + */ + public void sendOrderedMessageWithAck( + UUID nodeId, + Object topic, + Message msg, + byte plc, + long timeout, + boolean skipOnTimeout, + IgniteInClosure<Exception> ackClosure ) throws IgniteCheckedException { assert timeout > 0 || skipOnTimeout; @@ -1205,7 +1294,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa if (node == null) throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId); - send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout); + send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackClosure); } /** @@ -1416,7 +1505,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa // messages to one node vs. many. 
if (!nodes.isEmpty()) { for (ClusterNode node : nodes) - send(node, topic, topicOrd, msg, plc, ordered, timeout, skipOnTimeout); + send(node, topic, topicOrd, msg, plc, ordered, timeout, skipOnTimeout, null); } else if (log.isDebugEnabled()) log.debug("Failed to send message to empty nodes collection [topic=" + topic + ", msg=" + http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e7bd078b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java index 0403272..336aab9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java @@ -18,8 +18,8 @@ package org.apache.ignite.internal.util.nio; import org.apache.ignite.*; -import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.lang.*; import org.apache.ignite.plugin.extensions.communication.*; import org.jetbrains.annotations.*; @@ -95,19 +95,11 @@ public interface GridCommunicationClient { /** * @param nodeId Node ID (provided only if versions of local and remote nodes are different). * @param msg Message to send. + * @param closure Ack closure. * @throws IgniteCheckedException If failed. * @return {@code True} if should try to resend message. */ - public boolean sendMessage(@Nullable UUID nodeId, Message msg) throws IgniteCheckedException; - - /** - * @param nodeId Node ID (provided only if versions of local and remote nodes are different). - * @param msg Message to send. - * @param fut Future which done when will be received ack on the message. - * @throws IgniteCheckedException If failed. 
- * @return {@code True} if should try to resend message. - */ - public boolean sendMessageWithAck(@Nullable UUID nodeId, Message msg, GridFutureAdapter<Boolean> fut) + public boolean sendMessage(@Nullable UUID nodeId, Message msg, IgniteInClosure<Exception> closure) throws IgniteCheckedException; /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e7bd078b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java index 9029dd2..21cf17c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.util.nio; import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; /** * Future that represents already completed result. @@ -57,6 +58,16 @@ public class GridNioFinishedFuture<R> extends GridFinishedFuture<R> implements G } /** {@inheritDoc} */ + @Override public void ackClosure(IgniteInClosure<Exception> closure) { + // No-op. 
+ } + + /** {@inheritDoc} */ + @Override public IgniteInClosure<Exception> ackClosure() { + return null; + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridNioFinishedFuture.class, this, super.toString()); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e7bd078b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java index 7101f45..2b77089 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.util.nio; import org.apache.ignite.internal.*; +import org.apache.ignite.lang.*; /** * NIO future. @@ -39,4 +40,16 @@ public interface GridNioFuture<R> extends IgniteInternalFuture<R> { * @return {@code True} if skip recovery for this operation. */ public boolean skipRecovery(); + + /** + * Sets ack closure which will be applied when ack is received. + * + * @param closure Ack closure. + */ + public void ackClosure(IgniteInClosure<Exception> closure); + + /** + * @return Ack closure. 
+ */ + public IgniteInClosure<Exception> ackClosure(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e7bd078b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java index c5393c4..847b7d6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.util.nio; import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; /** * Default future implementation. @@ -30,6 +31,9 @@ public class GridNioFutureImpl<R> extends GridFutureAdapter<R> implements GridNi /** */ protected boolean msgThread; + /** */ + protected IgniteInClosure<Exception> ackClosure; + /** {@inheritDoc} */ @Override public void messageThread(boolean msgThread) { this.msgThread = msgThread; @@ -46,6 +50,16 @@ public class GridNioFutureImpl<R> extends GridFutureAdapter<R> implements GridNi } /** {@inheritDoc} */ + @Override public void ackClosure(IgniteInClosure<Exception> closure) { + ackClosure = closure; + } + + /** {@inheritDoc} */ + @Override public IgniteInClosure<Exception> ackClosure() { + return ackClosure; + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridNioFutureImpl.class, this); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e7bd078b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java index 66ae60f..e528361 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java @@ -37,9 +37,6 @@ public class GridNioRecoveryDescriptor { /** Unacknowledged message futures. */ private final ArrayDeque<GridNioFuture<?>> msgFuts; - /** Unacknowledged message futures. */ - private final Map<GridNioFuture<?>, GridFutureAdapter<Boolean>> ackFuts; - /** Number of messages to resend. */ private int resendCnt; @@ -83,7 +80,6 @@ public class GridNioRecoveryDescriptor { assert queueLimit > 0; msgFuts = new ArrayDeque<>(queueLimit); - ackFuts = new HashMap<>(queueLimit); this.queueLimit = queueLimit; this.node = node; @@ -171,16 +167,6 @@ public class GridNioRecoveryDescriptor { } /** - * @param nioFut fut NIO future. - * @param fut ack future. - */ - public void add(GridNioFuture<?> nioFut, GridFutureAdapter<Boolean> fut) { - assert fut != null; - - ackFuts.put(nioFut, fut); - } - - /** * @param rcvCnt Number of messages received by remote node. 
*/ public void ackReceived(long rcvCnt) { @@ -188,8 +174,6 @@ public class GridNioRecoveryDescriptor { log.debug("Handle acknowledgment [acked=" + acked + ", rcvCnt=" + rcvCnt + ", msgFuts=" + msgFuts.size() + ']'); - GridFutureAdapter<Boolean> ackFut; - while (acked < rcvCnt) { GridNioFuture<?> fut = msgFuts.pollFirst(); @@ -199,13 +183,10 @@ public class GridNioRecoveryDescriptor { assert fut.isDone() : fut; - acked++; - - if (!ackFuts.isEmpty() && (ackFut = ackFuts.get(fut)) != null) { - ackFut.onDone(true); + if (fut.ackClosure() != null) + fut.ackClosure().apply(null); - ackFuts.remove(fut); - } + acked++; } } @@ -214,7 +195,6 @@ public class GridNioRecoveryDescriptor { */ public void onNodeLeft() { GridNioFuture<?>[] futs = null; - GridFutureAdapter<?>[] akFuts = null; synchronized (this) { nodeLeft = true; @@ -224,16 +204,10 @@ public class GridNioRecoveryDescriptor { msgFuts.clear(); } - - if (!reserved && !ackFuts.isEmpty()) { - akFuts = ackFuts.values().toArray(new GridFutureAdapter<?>[ackFuts.size()]); - - ackFuts.clear(); - } } if (futs != null) - completeOnNodeLeft(futs, akFuts); + completeOnNodeLeft(futs); } /** @@ -244,13 +218,6 @@ public class GridNioRecoveryDescriptor { } /** - * @return Futures for unacknowledged messages. - */ - public Collection<GridFutureAdapter<Boolean>> ackMessageFutures() { - return ackFuts.values(); - } - - /** * @param node Node. * @return {@code True} if node is not null and has the same order as initial remote node. 
*/ @@ -315,7 +282,6 @@ public class GridNioRecoveryDescriptor { */ public void release() { GridNioFuture<?>[] futs = null; - GridFutureAdapter<?>[] akFuts = null; synchronized (this) { connected = false; @@ -340,16 +306,10 @@ public class GridNioRecoveryDescriptor { msgFuts.clear(); } - - if (nodeLeft && !ackFuts.isEmpty()) { - akFuts = ackFuts.values().toArray(new GridFutureAdapter<?>[ackFuts.size()]); - - ackFuts.clear(); - } } if (futs != null) - completeOnNodeLeft(futs, akFuts); + completeOnNodeLeft(futs); } /** @@ -400,14 +360,16 @@ public class GridNioRecoveryDescriptor { /** * @param futs Futures to complete. - * @param ackFuts Ack futures to complete. */ - private void completeOnNodeLeft(GridNioFuture<?>[] futs, GridFutureAdapter<?>[] ackFuts) { - for (GridNioFuture<?> msg : futs) - ((GridNioFutureImpl)msg).onDone(new IOException("Failed to send message, node has left: " + node.id())); + private void completeOnNodeLeft(GridNioFuture<?>[] futs) { + for (GridNioFuture<?> msg : futs) { + IOException e = new IOException("Failed to send message, node has left: " + node.id()); + + ((GridNioFutureImpl)msg).onDone(e); - for (GridFutureAdapter<?> fut : ackFuts) - fut.onDone(new IOException("Failed to send message, node has left: " + node.id())); + if (msg.ackClosure() != null) + msg.ackClosure().apply(e); + } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e7bd078b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java index b57bf22..f4a27fa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java @@ -21,6 +21,7 @@ import 
org.apache.ignite.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.nio.ssl.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.*; @@ -391,6 +392,11 @@ public class GridNioServer<T> { int msgCnt = sys ? ses.offerSystemFuture(fut) : ses.offerFuture(fut); + IgniteInClosure<Exception> ackClosure; + + if (!sys && (ackClosure = ses.removeMeta(ACK_CLOSURE.ordinal())) != null) + fut.ackClosure(ackClosure); + if (ses.closed()) { if (ses.removeFuture(fut)) fut.connectionClosed(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e7bd078b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java index 004c327..23c1e22 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java @@ -45,7 +45,10 @@ public enum GridNioSessionMetaKey { MSG_WRITER, /** SSL engine. */ - SSL_ENGINE; + SSL_ENGINE, + + /** Ack closure. */ + ACK_CLOSURE; /** Maximum count of NIO session keys in system. 
*/ public static final int MAX_KEYS_CNT = 64; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e7bd078b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java index 134d271..9cf87c6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java @@ -18,11 +18,10 @@ package org.apache.ignite.internal.util.nio; import org.apache.ignite.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.ipc.shmem.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; import org.apache.ignite.plugin.extensions.communication.*; import org.jetbrains.annotations.*; @@ -115,7 +114,8 @@ public class GridShmemCommunicationClient extends GridAbstractCommunicationClien } /** {@inheritDoc} */ - @Override public synchronized boolean sendMessage(@Nullable UUID nodeId, Message msg) + @Override public synchronized boolean sendMessage(@Nullable UUID nodeId, Message msg, + IgniteInClosure<Exception> closure) throws IgniteCheckedException { if (closed()) throw new IgniteCheckedException("Communication client was closed: " + this); @@ -137,12 +137,6 @@ public class GridShmemCommunicationClient extends GridAbstractCommunicationClien } /** {@inheritDoc} */ - @Override public boolean sendMessageWithAck(@Nullable UUID nodeId, Message msg, - GridFutureAdapter<Boolean> fut) throws IgniteCheckedException { - throw new UnsupportedOperationException(); - } - - /** 
{@inheritDoc} */ @Override public void sendMessage(ByteBuffer data) throws IgniteCheckedException { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e7bd078b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java index 834371f..4122e48 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java @@ -18,9 +18,9 @@ package org.apache.ignite.internal.util.nio; import org.apache.ignite.*; -import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; import org.apache.ignite.plugin.extensions.communication.*; import org.jetbrains.annotations.*; @@ -28,6 +28,8 @@ import java.io.*; import java.nio.*; import java.util.*; +import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.*; + /** * Grid client for NIO server. */ @@ -98,11 +100,14 @@ public class GridTcpNioCommunicationClient extends GridAbstractCommunicationClie } /** {@inheritDoc} */ - @Override public boolean sendMessage(@Nullable UUID nodeId, Message msg) + @Override public boolean sendMessage(@Nullable UUID nodeId, Message msg, IgniteInClosure<Exception> closure) throws IgniteCheckedException { // Node ID is never provided in asynchronous send mode. 
assert nodeId == null; + if (closure != null) + ses.addMeta(ACK_CLOSURE.ordinal(), closure); + GridNioFuture<?> fut = ses.send(msg); if (fut.isDone()) { @@ -110,34 +115,9 @@ public class GridTcpNioCommunicationClient extends GridAbstractCommunicationClie fut.get(); } catch (IgniteCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to send message [client=" + this + ", err=" + e + ']'); - - if (e.getCause() instanceof IOException) - return true; - else - throw new IgniteCheckedException("Failed to send message [client=" + this + ']', e); - } - } + if (closure != null) + ses.removeMeta(ACK_CLOSURE.ordinal()); - return false; - } - - /** {@inheritDoc} */ - @Override public boolean sendMessageWithAck(@Nullable UUID nodeId, Message msg, - GridFutureAdapter<Boolean> fut) throws IgniteCheckedException { - // Node ID is never provided in asynchronous send mode. - assert nodeId == null; - - GridNioRecoveryDescriptor recovery = ses.recoveryDescriptor(); - - GridNioFuture<?> nioFut = ses.send(msg); - - if (nioFut.isDone()) { - try { - nioFut.get(); - } - catch (IgniteCheckedException e) { if (log.isDebugEnabled()) log.debug("Failed to send message [client=" + this + ", err=" + e + ']'); @@ -148,8 +128,8 @@ public class GridTcpNioCommunicationClient extends GridAbstractCommunicationClie } } - if (recovery != null) - recovery.add(nioFut, fut); + if (closure != null) + ses.removeMeta(ACK_CLOSURE.ordinal()); return false; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e7bd078b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 18184f3..b055eff 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -1555,6 +1555,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** * Creates new shared memory communication server. + * * @return Server. * @throws IgniteCheckedException If failed. */ @@ -1696,56 +1697,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** {@inheritDoc} */ @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException { - assert node != null; - assert msg != null; - - if (log.isTraceEnabled()) - log.trace("Sending message to node [node=" + node + ", msg=" + msg + ']'); - - UUID locNodeId = getLocalNodeId(); - - if (node.id().equals(locNodeId)) - notifyListener(locNodeId, msg, NOOP); - else { - GridCommunicationClient client = null; - - try { - boolean retry; - - do { - client = reserveClient(node); - - UUID nodeId = null; - - if (!client.async() && !getSpiContext().localNode().version().equals(node.version())) - nodeId = node.id(); - - retry = client.sendMessage(nodeId, msg); - - client.release(); - - client = null; - - if (!retry) - sentMsgsCnt.increment(); - else { - ClusterNode node0 = getSpiContext().node(node.id()); - - if (node0 == null) - throw new IgniteCheckedException("Failed to send message to remote node " + - "(node has left the grid): " + node.id()); - } - } - while (retry); - } - catch (IgniteCheckedException e) { - throw new IgniteSpiException("Failed to send message to remote node: " + node, e); - } - finally { - if (client != null && clients.remove(node.id(), client)) - client.forceClose(); - } - } + sendMessage(node, msg, null); } /** @@ -1753,28 +1705,41 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * exchange such as durability, guaranteed delivery or error notification is * dependant on SPI implementation. * - * @param destNode Destination node. 
+ * @param node Destination node. * @param msg Message to send. - * @return Future to be completed when ack will be received. + * @param ackClosure Ack closure. * @throws org.apache.ignite.spi.IgniteSpiException Thrown in case of any error during sending the message. * Note that this is not guaranteed that failed communication will result * in thrown exception as this is dependant on SPI implementation. */ - public IgniteInternalFuture<Boolean> sendMessageWithAck(ClusterNode node, Message msg) throws IgniteSpiException { + public void sendMessageWithAck(ClusterNode node, Message msg, IgniteInClosure<Exception> ackClosure) + throws IgniteSpiException { + sendMessage(node, msg, ackClosure); + } + + /** + * @param node Destination node. + * @param msg Message to send. + * @param ackClosure Ack closure. + * @throws org.apache.ignite.spi.IgniteSpiException Thrown in case of any error during sending the message. + * Note that this is not guaranteed that failed communication will result + * in thrown exception as this is dependant on SPI implementation. 
+ */ + private void sendMessage(ClusterNode node, Message msg, IgniteInClosure<Exception> ackClosure) + throws IgniteSpiException { assert node != null; assert msg != null; if (log.isTraceEnabled()) log.trace("Sending message with ack to node [node=" + node + ", msg=" + msg + ']'); - IgniteInternalFuture<Boolean> fut = null; - UUID locNodeId = getLocalNodeId(); if (node.id().equals(locNodeId)) { notifyListener(locNodeId, msg, NOOP); - fut = new GridFinishedFuture<>(true); + if (ackClosure != null) + ackClosure.apply(null); } else { GridCommunicationClient client = null; @@ -1790,9 +1755,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (!client.async() && !getSpiContext().localNode().version().equals(node.version())) nodeId = node.id(); - fut = new GridFutureAdapter<>(); - - retry = client.sendMessageWithAck(nodeId, msg, (GridFutureAdapter)fut); + retry = client.sendMessage(nodeId, msg, ackClosure); client.release(); @@ -1818,8 +1781,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter client.forceClose(); } } - - return fut; } /** @@ -1857,7 +1818,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter GridCommunicationClient old = clients.put(nodeId, client0); assert old == null : "Client already created " + - "[node=" + node + ", client=" + client0 + ", oldClient=" + old + ']'; + "[node=" + node + ", client=" + client0 + ", oldClient=" + old + ']'; if (client0 instanceof GridTcpNioCommunicationClient) { GridTcpNioCommunicationClient tcpClient = ((GridTcpNioCommunicationClient)client0); @@ -1953,7 +1914,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * @return Client. * @throws IgniteCheckedException If failed. 
*/ - @Nullable protected GridCommunicationClient createShmemClient(ClusterNode node, Integer port) throws IgniteCheckedException { + @Nullable protected GridCommunicationClient createShmemClient(ClusterNode node, + Integer port) throws IgniteCheckedException { int attempt = 1; int connectAttempts = 1; @@ -2360,7 +2322,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter else if (log.isDebugEnabled()) log.debug("Received remote node ID: " + rmtNodeId0); - if (isSslEnabled() ) { + if (isSslEnabled()) { assert sslHnd != null; ch.write(sslHnd.encrypt(ByteBuffer.wrap(U.IGNITE_HEADER))); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e7bd078b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckFutureSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckFutureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckFutureSelfTest.java index 56feda1..3f788ba 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckFutureSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckFutureSelfTest.java @@ -141,12 +141,20 @@ public class GridTcpCommunicationSpiRecoveryAckFutureSelfTest<T extends Communic for (int i = 0; i < 5; i++) { info("Iteration: " + i); - List<IgniteInternalFuture<Boolean>> futs = new ArrayList<>(); + final AtomicInteger ackMsgs = new AtomicInteger(0); + + IgniteInClosure<Exception> ackClosure = new CI1<Exception>() { + @Override public void apply(Exception o) { + assert o == null; + + ackMsgs.incrementAndGet(); + } + }; for (int j = 0; j < msgPerIter; j++) { - futs.add(spi0.sendMessageWithAck(node1, new GridTestMessage(node0.id(), ++msgId, 0))); + 
spi0.sendMessageWithAck(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackClosure); - futs.add(spi1.sendMessageWithAck(node0, new GridTestMessage(node1.id(), ++msgId, 0))); + spi1.sendMessageWithAck(node0, new GridTestMessage(node1.id(), ++msgId, 0), ackClosure); } expMsgs += msgPerIter; @@ -175,9 +183,6 @@ public class GridTcpCommunicationSpiRecoveryAckFutureSelfTest<T extends Communic assertEquals("Unexpected messages: " + recoveryDesc.messagesFutures(), 0, recoveryDesc.messagesFutures().size()); - assertEquals("Unexpected ack messages: " + recoveryDesc.ackMessageFutures(), 0, - recoveryDesc.ackMessageFutures().size()); - break; } } @@ -200,8 +205,7 @@ public class GridTcpCommunicationSpiRecoveryAckFutureSelfTest<T extends Communic assertEquals(expMsgs, lsnr.rcvCnt.get()); } - for (IgniteInternalFuture<Boolean> f : futs) - assert f.get(); + assertEquals(msgPerIter * 2, ackMsgs.get()); } } finally { @@ -254,10 +258,20 @@ public class GridTcpCommunicationSpiRecoveryAckFutureSelfTest<T extends Communic final GridNioServer srv1 = U.field(spi1, "nioSrvr"); + final AtomicInteger ackMsgs = new AtomicInteger(0); + + IgniteInClosure<Exception> ackClosure = new CI1<Exception>() { + @Override public void apply(Exception o) { + assert o == null; + + ackMsgs.incrementAndGet(); + } + }; + int msgId = 0; // Send message to establish connection. 
- spi0.sendMessageWithAck(node1, new GridTestMessage(node0.id(), ++msgId, 0)); + spi0.sendMessageWithAck(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackClosure); // Prevent node1 from send GridTestUtils.setFieldValue(srv1, "skipWrite", true); @@ -265,7 +279,7 @@ public class GridTcpCommunicationSpiRecoveryAckFutureSelfTest<T extends Communic final GridNioSession ses0 = communicationSession(spi0); for (int i = 0; i < 150; i++) - spi0.sendMessageWithAck(node1, new GridTestMessage(node0.id(), ++msgId, 0)); + spi0.sendMessageWithAck(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackClosure); // Wait when session is closed because of queue overflow. GridTestUtils.waitForCondition(new GridAbsPredicate() { @@ -279,20 +293,25 @@ public class GridTcpCommunicationSpiRecoveryAckFutureSelfTest<T extends Communic GridTestUtils.setFieldValue(srv1, "skipWrite", false); for (int i = 0; i < 100; i++) - spi0.sendMessageWithAck(node1, new GridTestMessage(node0.id(), ++msgId, 0)); + spi0.sendMessageWithAck(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackClosure); final int expMsgs = 251; final TestListener lsnr = (TestListener)spi1.getListener(); GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override - public boolean apply() { + @Override public boolean apply() { return lsnr.rcvCnt.get() >= expMsgs; } }, 5000); assertEquals(expMsgs, lsnr.rcvCnt.get()); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return expMsgs == ackMsgs.get(); + } + }, 5000); } /**