Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-1169 5c3d82973 -> e7bd078b4


IGNITE-1169 Changed futures to Closure.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e7bd078b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e7bd078b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e7bd078b

Branch: refs/heads/ignite-1169
Commit: e7bd078b4133022786271d748664419d1dee0a8f
Parents: 5c3d829
Author: nikolay_tikhonov <ntikho...@gridgain.com>
Authored: Thu Jul 30 13:03:46 2015 +0300
Committer: nikolay_tikhonov <ntikho...@gridgain.com>
Committed: Thu Jul 30 13:03:46 2015 +0300

----------------------------------------------------------------------
 .../managers/communication/GridIoManager.java   | 109 +++++++++++++++++--
 .../util/nio/GridCommunicationClient.java       |  14 +--
 .../util/nio/GridNioFinishedFuture.java         |  11 ++
 .../ignite/internal/util/nio/GridNioFuture.java |  13 +++
 .../internal/util/nio/GridNioFutureImpl.java    |  14 +++
 .../util/nio/GridNioRecoveryDescriptor.java     |  64 +++--------
 .../ignite/internal/util/nio/GridNioServer.java |   6 +
 .../util/nio/GridNioSessionMetaKey.java         |   5 +-
 .../util/nio/GridShmemCommunicationClient.java  |  12 +-
 .../util/nio/GridTcpNioCommunicationClient.java |  42 ++-----
 .../communication/tcp/TcpCommunicationSpi.java  |  90 +++++----------
 ...mmunicationSpiRecoveryAckFutureSelfTest.java |  45 +++++---
 12 files changed, 235 insertions(+), 190 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e7bd078b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 65b6fad..272950e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -36,6 +36,7 @@ import org.apache.ignite.marshaller.*;
 import org.apache.ignite.plugin.extensions.communication.*;
 import org.apache.ignite.spi.*;
 import org.apache.ignite.spi.communication.*;
+import org.apache.ignite.spi.communication.tcp.*;
 import org.apache.ignite.thread.*;
 import org.jetbrains.annotations.*;
 import org.jsr166.*;
@@ -974,6 +975,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
      * @param ordered Ordered flag.
      * @param timeout Timeout.
      * @param skipOnTimeout Whether message can be skipped on timeout.
+     * @param ackClosure Ack closure.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
     private void send(
@@ -984,7 +986,8 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
         byte plc,
         boolean ordered,
         long timeout,
-        boolean skipOnTimeout
+        boolean skipOnTimeout,
+        IgniteInClosure<Exception> ackClosure
     ) throws IgniteCheckedException {
         assert node != null;
         assert topic != null;
@@ -1004,13 +1007,19 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
                 processOrderedMessage(locNodeId, ioMsg, plc, null);
             else
                 processRegularMessage0(ioMsg, locNodeId);
+
+            if (ackClosure != null)
+                ackClosure.apply(null);
         }
         else {
             if (topicOrd < 0)
                 ioMsg.topicBytes(marsh.marshal(topic));
 
             try {
-                getSpi().sendMessage(node, ioMsg);
+                if ((CommunicationSpi)getSpi() instanceof TcpCommunicationSpi)
+                    
((TcpCommunicationSpi)(CommunicationSpi)getSpi()).sendMessageWithAck(node, 
ioMsg, ackClosure);
+                else
+                    getSpi().sendMessage(node, ioMsg);
             }
             catch (IgniteSpiException e) {
                 throw new IgniteCheckedException("Failed to send message (node 
may have left the grid or " +
@@ -1030,7 +1039,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
      * <p>
      * How to use it:
      * <ol>
-     *     <li>Replace {@link #send(ClusterNode, Object, int, Message, byte, 
boolean, long, boolean)}
+     *     <li>Replace {@link #send(ClusterNode, Object, int, Message, byte, 
boolean, long, boolean, IgniteInClosure)}
      *          with this method.</li>
      *     <li>Start all grids for your test, then set {@link 
#TURBO_DEBUG_MODE} to {@code true}.</li>
      *     <li>Perform test operations on the topology. No network will be 
there.</li>
@@ -1132,7 +1141,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
         if (node == null)
             throw new IgniteCheckedException("Failed to send message to node 
(has node left grid?): " + nodeId);
 
-        send(node, topic, topic.ordinal(), msg, plc, false, 0, false);
+        send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null);
     }
 
     /**
@@ -1144,7 +1153,19 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
      */
     public void send(ClusterNode node, Object topic, Message msg, byte plc)
         throws IgniteCheckedException {
-        send(node, topic, -1, msg, plc, false, 0, false);
+        send(node, topic, -1, msg, plc, false, 0, false, null);
+    }
+
+    /**
+     * @param node Destination node.
+     * @param topic Topic to send the message to.
+     * @param msg Message to send.
+     * @param plc Type of processing.
+     * @throws IgniteCheckedException Thrown in case of any errors.
+     */
+    public void sendWithAck(ClusterNode node, Object topic, Message msg, byte 
plc)
+        throws IgniteCheckedException {
+        send(node, topic, -1, msg, plc, false, 0, false, null);
     }
 
     /**
@@ -1156,7 +1177,20 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
      */
     public void send(ClusterNode node, GridTopic topic, Message msg, byte plc)
         throws IgniteCheckedException {
-        send(node, topic, topic.ordinal(), msg, plc, false, 0, false);
+        send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null);
+    }
+
+    /**
+     * @param node Destination node.
+     * @param topic Topic to send the message to.
+     * @param msg Message to send.
+     * @param plc Type of processing.
+     * @param ackClosure Ack closure.
+     * @throws IgniteCheckedException Thrown in case of any errors.
+     */
+    public void sendWithAck(ClusterNode node, GridTopic topic, Message msg, 
byte plc,
+        IgniteInClosure<Exception> ackClosure) throws IgniteCheckedException {
+        send(node, topic, topic.ordinal(), msg, plc, false, 0, false, 
ackClosure);
     }
 
     /**
@@ -1178,7 +1212,31 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
     ) throws IgniteCheckedException {
         assert timeout > 0 || skipOnTimeout;
 
-        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout);
+        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, 
null);
+    }
+
+    /**
+     * @param node Destination node.
+     * @param topic Topic to send the message to.
+     * @param msg Message to send.
+     * @param plc Type of processing.
+     * @param timeout Timeout to keep a message on receiving queue.
+     * @param skipOnTimeout Whether message can be skipped on timeout.
+     * @param ackClosure Ack closure.
+     * @throws IgniteCheckedException Thrown in case of any errors.
+     */
+    public void sendOrderedMessageWithAck(
+        ClusterNode node,
+        Object topic,
+        Message msg,
+        byte plc,
+        long timeout,
+        boolean skipOnTimeout,
+        IgniteInClosure<Exception> ackClosure
+    ) throws IgniteCheckedException {
+        assert timeout > 0 || skipOnTimeout;
+
+        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, 
ackClosure);
     }
 
     /**
@@ -1188,6 +1246,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
      * @param plc Type of processing.
      * @param timeout Timeout to keep a message on receiving queue.
      * @param skipOnTimeout Whether message can be skipped on timeout.
+     * @param ackClosure Ack closure.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
     public void sendOrderedMessage(
@@ -1196,7 +1255,37 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
         Message msg,
         byte plc,
         long timeout,
-        boolean skipOnTimeout
+        boolean skipOnTimeout,
+        IgniteInClosure<Exception> ackClosure
+    ) throws IgniteCheckedException {
+        assert timeout > 0 || skipOnTimeout;
+
+        ClusterNode node = ctx.discovery().node(nodeId);
+
+        if (node == null)
+            throw new IgniteCheckedException("Failed to send message to node 
(has node left grid?): " + nodeId);
+
+        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, 
ackClosure);
+    }
+
+    /**
+     * @param nodeId Destination node.
+     * @param topic Topic to send the message to.
+     * @param msg Message to send.
+     * @param plc Type of processing.
+     * @param timeout Timeout to keep a message on receiving queue.
+     * @param skipOnTimeout Whether message can be skipped on timeout.
+     * @param ackClosure Ack closure.
+     * @throws IgniteCheckedException Thrown in case of any errors.
+     */
+    public void sendOrderedMessageWithAck(
+        UUID nodeId,
+        Object topic,
+        Message msg,
+        byte plc,
+        long timeout,
+        boolean skipOnTimeout,
+        IgniteInClosure<Exception> ackClosure
     ) throws IgniteCheckedException {
         assert timeout > 0 || skipOnTimeout;
 
@@ -1205,7 +1294,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
         if (node == null)
             throw new IgniteCheckedException("Failed to send message to node 
(has node left grid?): " + nodeId);
 
-        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout);
+        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, 
ackClosure);
     }
 
     /**
@@ -1416,7 +1505,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
             // messages to one node vs. many.
             if (!nodes.isEmpty()) {
                 for (ClusterNode node : nodes)
-                    send(node, topic, topicOrd, msg, plc, ordered, timeout, 
skipOnTimeout);
+                    send(node, topic, topicOrd, msg, plc, ordered, timeout, 
skipOnTimeout, null);
             }
             else if (log.isDebugEnabled())
                 log.debug("Failed to send message to empty nodes collection 
[topic=" + topic + ", msg=" +

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e7bd078b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
index 0403272..336aab9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
@@ -18,8 +18,8 @@
 package org.apache.ignite.internal.util.nio;
 
 import org.apache.ignite.*;
-import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.lang.*;
 import org.apache.ignite.plugin.extensions.communication.*;
 import org.jetbrains.annotations.*;
 
@@ -95,19 +95,11 @@ public interface GridCommunicationClient {
     /**
      * @param nodeId Node ID (provided only if versions of local and remote 
nodes are different).
      * @param msg Message to send.
+     * @param closure Ack closure.
      * @throws IgniteCheckedException If failed.
      * @return {@code True} if should try to resend message.
      */
-    public boolean sendMessage(@Nullable UUID nodeId, Message msg) throws 
IgniteCheckedException;
-
-    /**
-     * @param nodeId Node ID (provided only if versions of local and remote 
nodes are different).
-     * @param msg Message to send.
-     * @param fut Future which done when will be received ack on the message.
-     * @throws IgniteCheckedException If failed.
-     * @return {@code True} if should try to resend message.
-     */
-    public boolean sendMessageWithAck(@Nullable UUID nodeId, Message msg, 
GridFutureAdapter<Boolean> fut)
+    public boolean sendMessage(@Nullable UUID nodeId, Message msg, 
IgniteInClosure<Exception> closure)
         throws IgniteCheckedException;
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e7bd078b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java
index 9029dd2..21cf17c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFinishedFuture.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.util.nio;
 
 import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
 
 /**
  * Future that represents already completed result.
@@ -57,6 +58,16 @@ public class GridNioFinishedFuture<R> extends 
GridFinishedFuture<R> implements G
     }
 
     /** {@inheritDoc} */
+    @Override public void ackClosure(IgniteInClosure<Exception> closure) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteInClosure<Exception> ackClosure() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridNioFinishedFuture.class, this, super.toString());
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e7bd078b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java
index 7101f45..2b77089 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFuture.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.util.nio;
 
 import org.apache.ignite.internal.*;
+import org.apache.ignite.lang.*;
 
 /**
  * NIO future.
@@ -39,4 +40,16 @@ public interface GridNioFuture<R> extends 
IgniteInternalFuture<R> {
      * @return {@code True} if skip recovery for this operation.
      */
     public boolean skipRecovery();
+
+    /**
+     * Sets ack closure which will be applied when ack is received.
+     *
+     * @param closure Ack closure.
+     */
+    public void ackClosure(IgniteInClosure<Exception> closure);
+
+    /**
+     * @return Ack closure.
+     */
+    public IgniteInClosure<Exception> ackClosure();
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e7bd078b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java
index c5393c4..847b7d6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioFutureImpl.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.util.nio;
 
 import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
 
 /**
  * Default future implementation.
@@ -30,6 +31,9 @@ public class GridNioFutureImpl<R> extends 
GridFutureAdapter<R> implements GridNi
     /** */
     protected boolean msgThread;
 
+    /** */
+    protected IgniteInClosure<Exception> ackClosure;
+
     /** {@inheritDoc} */
     @Override public void messageThread(boolean msgThread) {
         this.msgThread = msgThread;
@@ -46,6 +50,16 @@ public class GridNioFutureImpl<R> extends 
GridFutureAdapter<R> implements GridNi
     }
 
     /** {@inheritDoc} */
+    @Override public void ackClosure(IgniteInClosure<Exception> closure) {
+        ackClosure = closure;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteInClosure<Exception> ackClosure() {
+        return ackClosure;
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridNioFutureImpl.class, this);
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e7bd078b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
index 66ae60f..e528361 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
@@ -37,9 +37,6 @@ public class GridNioRecoveryDescriptor {
     /** Unacknowledged message futures. */
     private final ArrayDeque<GridNioFuture<?>> msgFuts;
 
-    /** Unacknowledged message futures. */
-    private final Map<GridNioFuture<?>, GridFutureAdapter<Boolean>> ackFuts;
-
     /** Number of messages to resend. */
     private int resendCnt;
 
@@ -83,7 +80,6 @@ public class GridNioRecoveryDescriptor {
         assert queueLimit > 0;
 
         msgFuts = new ArrayDeque<>(queueLimit);
-        ackFuts = new HashMap<>(queueLimit);
 
         this.queueLimit = queueLimit;
         this.node = node;
@@ -171,16 +167,6 @@ public class GridNioRecoveryDescriptor {
     }
 
     /**
-     * @param nioFut fut NIO future.
-     * @param fut ack future.
-     */
-    public void add(GridNioFuture<?> nioFut, GridFutureAdapter<Boolean> fut) {
-        assert fut != null;
-
-        ackFuts.put(nioFut, fut);
-    }
-
-    /**
      * @param rcvCnt Number of messages received by remote node.
      */
     public void ackReceived(long rcvCnt) {
@@ -188,8 +174,6 @@ public class GridNioRecoveryDescriptor {
             log.debug("Handle acknowledgment [acked=" + acked + ", rcvCnt=" + 
rcvCnt +
                 ", msgFuts=" + msgFuts.size() + ']');
 
-        GridFutureAdapter<Boolean> ackFut;
-
         while (acked < rcvCnt) {
             GridNioFuture<?> fut = msgFuts.pollFirst();
 
@@ -199,13 +183,10 @@ public class GridNioRecoveryDescriptor {
 
             assert fut.isDone() : fut;
 
-            acked++;
-
-            if (!ackFuts.isEmpty() && (ackFut = ackFuts.get(fut)) != null) {
-                ackFut.onDone(true);
+            if (fut.ackClosure() != null)
+                fut.ackClosure().apply(null);
 
-                ackFuts.remove(fut);
-            }
+            acked++;
         }
     }
 
@@ -214,7 +195,6 @@ public class GridNioRecoveryDescriptor {
      */
     public void onNodeLeft() {
         GridNioFuture<?>[] futs = null;
-        GridFutureAdapter<?>[] akFuts = null;
 
         synchronized (this) {
             nodeLeft = true;
@@ -224,16 +204,10 @@ public class GridNioRecoveryDescriptor {
 
                 msgFuts.clear();
             }
-
-            if (!reserved && !ackFuts.isEmpty()) {
-                akFuts = ackFuts.values().toArray(new 
GridFutureAdapter<?>[ackFuts.size()]);
-
-                ackFuts.clear();
-            }
         }
 
         if (futs != null)
-            completeOnNodeLeft(futs, akFuts);
+            completeOnNodeLeft(futs);
     }
 
     /**
@@ -244,13 +218,6 @@ public class GridNioRecoveryDescriptor {
     }
 
     /**
-     * @return Futures for unacknowledged messages.
-     */
-    public Collection<GridFutureAdapter<Boolean>> ackMessageFutures() {
-        return ackFuts.values();
-    }
-
-    /**
      * @param node Node.
      * @return {@code True} if node is not null and has the same order as 
initial remtoe node.
      */
@@ -315,7 +282,6 @@ public class GridNioRecoveryDescriptor {
      */
     public void release() {
         GridNioFuture<?>[] futs = null;
-        GridFutureAdapter<?>[] akFuts = null;
 
         synchronized (this) {
             connected = false;
@@ -340,16 +306,10 @@ public class GridNioRecoveryDescriptor {
 
                 msgFuts.clear();
             }
-
-            if (nodeLeft && !ackFuts.isEmpty()) {
-                akFuts = ackFuts.values().toArray(new 
GridFutureAdapter<?>[ackFuts.size()]);
-
-                ackFuts.clear();
-            }
         }
 
         if (futs != null)
-            completeOnNodeLeft(futs, akFuts);
+            completeOnNodeLeft(futs);
     }
 
     /**
@@ -400,14 +360,16 @@ public class GridNioRecoveryDescriptor {
 
     /**
      * @param futs Futures to complete.
-     * @param ackFuts Ack futures to complete.
      */
-    private void completeOnNodeLeft(GridNioFuture<?>[] futs, 
GridFutureAdapter<?>[] ackFuts) {
-        for (GridNioFuture<?> msg : futs)
-            ((GridNioFutureImpl)msg).onDone(new IOException("Failed to send 
message, node has left: " + node.id()));
+    private void completeOnNodeLeft(GridNioFuture<?>[] futs) {
+        for (GridNioFuture<?> msg : futs) {
+            IOException e = new IOException("Failed to send message, node has 
left: " + node.id());
+
+            ((GridNioFutureImpl)msg).onDone(e);
 
-        for (GridFutureAdapter<?> fut : ackFuts)
-            fut.onDone(new IOException("Failed to send message, node has left: 
" + node.id()));
+            if (msg.ackClosure() != null)
+                msg.ackClosure().apply(e);
+        }
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e7bd078b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index b57bf22..f4a27fa 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -21,6 +21,7 @@ import org.apache.ignite.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.nio.ssl.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.*;
@@ -391,6 +392,11 @@ public class GridNioServer<T> {
 
         int msgCnt = sys ? ses.offerSystemFuture(fut) : ses.offerFuture(fut);
 
+        IgniteInClosure<Exception> ackClosure;
+
+        if (!sys && (ackClosure = ses.removeMeta(ACK_CLOSURE.ordinal())) != 
null)
+            fut.ackClosure(ackClosure);
+
         if (ses.closed()) {
             if (ses.removeFuture(fut))
                 fut.connectionClosed();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e7bd078b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java
index 004c327..23c1e22 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionMetaKey.java
@@ -45,7 +45,10 @@ public enum GridNioSessionMetaKey {
     MSG_WRITER,
 
     /** SSL engine. */
-    SSL_ENGINE;
+    SSL_ENGINE,
+
+    /** Ack closure. */
+    ACK_CLOSURE;
 
     /** Maximum count of NIO session keys in system. */
     public static final int MAX_KEYS_CNT = 64;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e7bd078b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
index 134d271..9cf87c6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
@@ -18,11 +18,10 @@
 package org.apache.ignite.internal.util.nio;
 
 import org.apache.ignite.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.ipc.shmem.*;
 import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
 import org.apache.ignite.plugin.extensions.communication.*;
 import org.jetbrains.annotations.*;
 
@@ -115,7 +114,8 @@ public class GridShmemCommunicationClient extends 
GridAbstractCommunicationClien
     }
 
     /** {@inheritDoc} */
-    @Override public synchronized boolean sendMessage(@Nullable UUID nodeId, 
Message msg)
+    @Override public synchronized boolean sendMessage(@Nullable UUID nodeId, 
Message msg,
+        IgniteInClosure<Exception> closure)
         throws IgniteCheckedException {
         if (closed())
             throw new IgniteCheckedException("Communication client was closed: 
" + this);
@@ -137,12 +137,6 @@ public class GridShmemCommunicationClient extends 
GridAbstractCommunicationClien
     }
 
     /** {@inheritDoc} */
-    @Override public boolean sendMessageWithAck(@Nullable UUID nodeId, Message 
msg,
-        GridFutureAdapter<Boolean> fut) throws IgniteCheckedException {
-        throw new UnsupportedOperationException();
-    }
-
-    /** {@inheritDoc} */
     @Override public void sendMessage(ByteBuffer data) throws 
IgniteCheckedException {
         throw new UnsupportedOperationException();
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e7bd078b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
index 834371f..4122e48 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
@@ -18,9 +18,9 @@
 package org.apache.ignite.internal.util.nio;
 
 import org.apache.ignite.*;
-import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
 import org.apache.ignite.plugin.extensions.communication.*;
 import org.jetbrains.annotations.*;
 
@@ -28,6 +28,8 @@ import java.io.*;
 import java.nio.*;
 import java.util.*;
 
+import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.*;
+
 /**
  * Grid client for NIO server.
  */
@@ -98,11 +100,14 @@ public class GridTcpNioCommunicationClient extends 
GridAbstractCommunicationClie
     }
 
     /** {@inheritDoc} */
-    @Override public boolean sendMessage(@Nullable UUID nodeId, Message msg)
+    @Override public boolean sendMessage(@Nullable UUID nodeId, Message msg, 
IgniteInClosure<Exception> closure)
         throws IgniteCheckedException {
         // Node ID is never provided in asynchronous send mode.
         assert nodeId == null;
 
+        if (closure != null)
+            ses.addMeta(ACK_CLOSURE.ordinal(), closure);
+
         GridNioFuture<?> fut = ses.send(msg);
 
         if (fut.isDone()) {
@@ -110,34 +115,9 @@ public class GridTcpNioCommunicationClient extends 
GridAbstractCommunicationClie
                 fut.get();
             }
             catch (IgniteCheckedException e) {
-                if (log.isDebugEnabled())
-                    log.debug("Failed to send message [client=" + this + ", 
err=" + e + ']');
-
-                if (e.getCause() instanceof IOException)
-                    return true;
-                else
-                    throw new IgniteCheckedException("Failed to send message 
[client=" + this + ']', e);
-            }
-        }
+                if (closure != null)
+                    ses.removeMeta(ACK_CLOSURE.ordinal());
 
-        return false;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean sendMessageWithAck(@Nullable UUID nodeId, Message 
msg,
-        GridFutureAdapter<Boolean> fut) throws IgniteCheckedException {
-        // Node ID is never provided in asynchronous send mode.
-        assert nodeId == null;
-
-        GridNioRecoveryDescriptor recovery = ses.recoveryDescriptor();
-
-        GridNioFuture<?> nioFut = ses.send(msg);
-
-        if (nioFut.isDone()) {
-            try {
-                nioFut.get();
-            }
-            catch (IgniteCheckedException e) {
                 if (log.isDebugEnabled())
                     log.debug("Failed to send message [client=" + this + ", 
err=" + e + ']');
 
@@ -148,8 +128,8 @@ public class GridTcpNioCommunicationClient extends 
GridAbstractCommunicationClie
             }
         }
 
-        if (recovery != null)
-            recovery.add(nioFut, fut);
+        if (closure != null)
+            ses.removeMeta(ACK_CLOSURE.ordinal());
 
         return false;
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e7bd078b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 18184f3..b055eff 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -1555,6 +1555,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
     /**
      * Creates new shared memory communication server.
+     *
      * @return Server.
      * @throws IgniteCheckedException If failed.
      */
@@ -1696,56 +1697,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
     /** {@inheritDoc} */
     @Override public void sendMessage(ClusterNode node, Message msg) throws 
IgniteSpiException {
-        assert node != null;
-        assert msg != null;
-
-        if (log.isTraceEnabled())
-            log.trace("Sending message to node [node=" + node + ", msg=" + msg 
+ ']');
-
-        UUID locNodeId = getLocalNodeId();
-
-        if (node.id().equals(locNodeId))
-            notifyListener(locNodeId, msg, NOOP);
-        else {
-            GridCommunicationClient client = null;
-
-            try {
-                boolean retry;
-
-                do {
-                    client = reserveClient(node);
-
-                    UUID nodeId = null;
-
-                    if (!client.async() && 
!getSpiContext().localNode().version().equals(node.version()))
-                        nodeId = node.id();
-
-                    retry = client.sendMessage(nodeId, msg);
-
-                    client.release();
-
-                    client = null;
-
-                    if (!retry)
-                        sentMsgsCnt.increment();
-                    else {
-                        ClusterNode node0 = getSpiContext().node(node.id());
-
-                        if (node0 == null)
-                            throw new IgniteCheckedException("Failed to send 
message to remote node " +
-                                "(node has left the grid): " + node.id());
-                    }
-                }
-                while (retry);
-            }
-            catch (IgniteCheckedException e) {
-                throw new IgniteSpiException("Failed to send message to remote 
node: " + node, e);
-            }
-            finally {
-                if (client != null && clients.remove(node.id(), client))
-                    client.forceClose();
-            }
-        }
+        sendMessage(node, msg, null);
     }
 
     /**
@@ -1753,28 +1705,41 @@ public class TcpCommunicationSpi extends 
IgniteSpiAdapter
      * exchange such as durability, guaranteed delivery or error notification 
is
      * dependant on SPI implementation.
      *
-     * @param destNode Destination node.
+     * @param node Destination node.
      * @param msg Message to send.
-     * @return Future to be completed when ack will be received.
+     * @param ackClosure Ack closure.
      * @throws org.apache.ignite.spi.IgniteSpiException Thrown in case of any 
error during sending the message.
      *      Note that this is not guaranteed that failed communication will 
result
      *      in thrown exception as this is dependant on SPI implementation.
      */
-    public IgniteInternalFuture<Boolean> sendMessageWithAck(ClusterNode node, 
Message msg) throws IgniteSpiException {
+    public void sendMessageWithAck(ClusterNode node, Message msg, 
IgniteInClosure<Exception> ackClosure)
+        throws IgniteSpiException {
+        sendMessage(node, msg, ackClosure);
+    }
+
+    /**
+     * @param node Destination node.
+     * @param msg Message to send.
+     * @param ackClosure Ack closure.
+     * @throws org.apache.ignite.spi.IgniteSpiException Thrown in case of any 
error during sending the message.
+     *      Note that a failed communication is not guaranteed to result in a
+     *      thrown exception, as this depends on the SPI implementation.
+     */
+    private void sendMessage(ClusterNode node, Message msg, 
IgniteInClosure<Exception> ackClosure)
+        throws IgniteSpiException {
         assert node != null;
         assert msg != null;
 
         if (log.isTraceEnabled())
             log.trace("Sending message with ack to node [node=" + node + ", 
msg=" + msg + ']');
 
-        IgniteInternalFuture<Boolean> fut = null;
-
         UUID locNodeId = getLocalNodeId();
 
         if (node.id().equals(locNodeId)) {
             notifyListener(locNodeId, msg, NOOP);
 
-            fut = new GridFinishedFuture<>(true);
+            if (ackClosure != null)
+                ackClosure.apply(null);
         }
         else {
             GridCommunicationClient client = null;
@@ -1790,9 +1755,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     if (!client.async() && 
!getSpiContext().localNode().version().equals(node.version()))
                         nodeId = node.id();
 
-                    fut = new GridFutureAdapter<>();
-
-                    retry = client.sendMessageWithAck(nodeId, msg, 
(GridFutureAdapter)fut);
+                    retry = client.sendMessage(nodeId, msg, ackClosure);
 
                     client.release();
 
@@ -1818,8 +1781,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     client.forceClose();
             }
         }
-
-        return fut;
     }
 
     /**
@@ -1857,7 +1818,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                                 GridCommunicationClient old = 
clients.put(nodeId, client0);
 
                                 assert old == null : "Client already created " 
+
-                                        "[node=" + node + ", client=" + 
client0 + ", oldClient=" + old + ']';
+                                    "[node=" + node + ", client=" + client0 + 
", oldClient=" + old + ']';
 
                                 if (client0 instanceof 
GridTcpNioCommunicationClient) {
                                     GridTcpNioCommunicationClient tcpClient = 
((GridTcpNioCommunicationClient)client0);
@@ -1953,7 +1914,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
      * @return Client.
      * @throws IgniteCheckedException If failed.
      */
-    @Nullable protected GridCommunicationClient createShmemClient(ClusterNode 
node, Integer port) throws IgniteCheckedException {
+    @Nullable protected GridCommunicationClient createShmemClient(ClusterNode 
node,
+        Integer port) throws IgniteCheckedException {
         int attempt = 1;
 
         int connectAttempts = 1;
@@ -2360,7 +2322,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     else if (log.isDebugEnabled())
                         log.debug("Received remote node ID: " + rmtNodeId0);
 
-                    if (isSslEnabled() ) {
+                    if (isSslEnabled()) {
                         assert sslHnd != null;
 
                         
ch.write(sslHnd.encrypt(ByteBuffer.wrap(U.IGNITE_HEADER)));

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e7bd078b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckFutureSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckFutureSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckFutureSelfTest.java
index 56feda1..3f788ba 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckFutureSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckFutureSelfTest.java
@@ -141,12 +141,20 @@ public class 
GridTcpCommunicationSpiRecoveryAckFutureSelfTest<T extends Communic
             for (int i = 0; i < 5; i++) {
                 info("Iteration: " + i);
 
-                List<IgniteInternalFuture<Boolean>> futs = new ArrayList<>();
+                final AtomicInteger ackMsgs = new AtomicInteger(0);
+
+                IgniteInClosure<Exception> ackClosure = new CI1<Exception>() {
+                    @Override public void apply(Exception o) {
+                        assert o == null;
+
+                        ackMsgs.incrementAndGet();
+                    }
+                };
 
                 for (int j = 0; j < msgPerIter; j++) {
-                    futs.add(spi0.sendMessageWithAck(node1, new 
GridTestMessage(node0.id(), ++msgId, 0)));
+                    spi0.sendMessageWithAck(node1, new 
GridTestMessage(node0.id(), ++msgId, 0), ackClosure);
 
-                    futs.add(spi1.sendMessageWithAck(node0, new 
GridTestMessage(node1.id(), ++msgId, 0)));
+                    spi1.sendMessageWithAck(node0, new 
GridTestMessage(node1.id(), ++msgId, 0), ackClosure);
                 }
 
                 expMsgs += msgPerIter;
@@ -175,9 +183,6 @@ public class 
GridTcpCommunicationSpiRecoveryAckFutureSelfTest<T extends Communic
                             assertEquals("Unexpected messages: " + 
recoveryDesc.messagesFutures(), 0,
                                 recoveryDesc.messagesFutures().size());
 
-                            assertEquals("Unexpected ack messages: " + 
recoveryDesc.ackMessageFutures(), 0,
-                                recoveryDesc.ackMessageFutures().size());
-
                             break;
                         }
                     }
@@ -200,8 +205,7 @@ public class 
GridTcpCommunicationSpiRecoveryAckFutureSelfTest<T extends Communic
                     assertEquals(expMsgs, lsnr.rcvCnt.get());
                 }
 
-                for (IgniteInternalFuture<Boolean> f : futs)
-                    assert f.get();
+                assertEquals(msgPerIter * 2, ackMsgs.get());
             }
         }
         finally {
@@ -254,10 +258,20 @@ public class 
GridTcpCommunicationSpiRecoveryAckFutureSelfTest<T extends Communic
 
         final GridNioServer srv1 = U.field(spi1, "nioSrvr");
 
+        final AtomicInteger ackMsgs = new AtomicInteger(0);
+
+        IgniteInClosure<Exception> ackClosure = new CI1<Exception>() {
+            @Override public void apply(Exception o) {
+                assert o == null;
+
+                ackMsgs.incrementAndGet();
+            }
+        };
+
         int msgId = 0;
 
         // Send message to establish connection.
-        spi0.sendMessageWithAck(node1, new GridTestMessage(node0.id(), 
++msgId, 0));
+        spi0.sendMessageWithAck(node1, new GridTestMessage(node0.id(), 
++msgId, 0), ackClosure);
 
         // Prevent node1 from send
         GridTestUtils.setFieldValue(srv1, "skipWrite", true);
@@ -265,7 +279,7 @@ public class 
GridTcpCommunicationSpiRecoveryAckFutureSelfTest<T extends Communic
         final GridNioSession ses0 = communicationSession(spi0);
 
         for (int i = 0; i < 150; i++)
-            spi0.sendMessageWithAck(node1, new GridTestMessage(node0.id(), 
++msgId, 0));
+            spi0.sendMessageWithAck(node1, new GridTestMessage(node0.id(), 
++msgId, 0), ackClosure);
 
         // Wait when session is closed because of queue overflow.
         GridTestUtils.waitForCondition(new GridAbsPredicate() {
@@ -279,20 +293,25 @@ public class 
GridTcpCommunicationSpiRecoveryAckFutureSelfTest<T extends Communic
         GridTestUtils.setFieldValue(srv1, "skipWrite", false);
 
         for (int i = 0; i < 100; i++)
-            spi0.sendMessageWithAck(node1, new GridTestMessage(node0.id(), 
++msgId, 0));
+            spi0.sendMessageWithAck(node1, new GridTestMessage(node0.id(), 
++msgId, 0), ackClosure);
 
         final int expMsgs = 251;
 
         final TestListener lsnr = (TestListener)spi1.getListener();
 
         GridTestUtils.waitForCondition(new GridAbsPredicate() {
-            @Override
-            public boolean apply() {
+            @Override public boolean apply() {
                 return lsnr.rcvCnt.get() >= expMsgs;
             }
         }, 5000);
 
         assertEquals(expMsgs, lsnr.rcvCnt.get());
+
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return expMsgs == ackMsgs.get();
+            }
+        }, 5000);
     }
 
     /**


Reply via email to