Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-901 0e8d2ccc9 -> 59cc047d0


# ignite-901


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/59cc047d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/59cc047d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/59cc047d

Branch: refs/heads/ignite-901
Commit: 59cc047d07d6850166d2e7d105d0e2464ae9d680
Parents: 0e8d2cc
Author: sboikov <sboi...@gridgain.com>
Authored: Wed Jul 15 17:47:17 2015 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Wed Jul 15 17:47:17 2015 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ClientImpl.java    | 44 +++++++++++++-------
 .../ignite/spi/discovery/tcp/ServerImpl.java    |  3 ++
 .../messages/TcpDiscoveryAbstractMessage.java   |  3 ++
 .../messages/TcpDiscoveryHandshakeResponse.java | 14 +++++++
 4 files changed, 48 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/59cc047d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index e23c191..a294a3a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -385,7 +385,7 @@ class ClientImpl extends TcpDiscoveryImpl {
      * @see TcpDiscoverySpi#joinTimeout
      */
     @SuppressWarnings("BusyWait")
-    @Nullable private Socket joinTopology(boolean recon, long timeout) throws 
IgniteSpiException, InterruptedException {
+    @Nullable private T2<Socket, Boolean> joinTopology(boolean recon, long 
timeout) throws IgniteSpiException, InterruptedException {
         Collection<InetSocketAddress> addrs = null;
 
         long startTime = U.currentTimeMillis();
@@ -425,7 +425,7 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                 InetSocketAddress addr = it.next();
 
-                T2<Socket, Integer> sockAndRes = sendJoinRequest(recon, addr);
+                T3<Socket, Integer, Boolean> sockAndRes = 
sendJoinRequest(recon, addr);
 
                 if (sockAndRes == null) {
                     it.remove();
@@ -439,7 +439,7 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                 switch (sockAndRes.get2()) {
                     case RES_OK:
-                        return sock;
+                        return new T2<>(sock, sockAndRes.get3());
 
                     case RES_CONTINUE_JOIN:
                     case RES_WAIT:
@@ -470,9 +470,9 @@ class ClientImpl extends TcpDiscoveryImpl {
     /**
      * @param recon {@code True} if reconnects.
      * @param addr Address.
-     * @return Socket and connect response.
+     * @return Socket, connect response and client acknowledge flag.
      */
-    @Nullable private T2<Socket, Integer> sendJoinRequest(boolean recon, 
InetSocketAddress addr) {
+    @Nullable private T3<Socket, Integer, Boolean> sendJoinRequest(boolean 
recon, InetSocketAddress addr) {
         assert addr != null;
 
         if (log.isDebugEnabled())
@@ -541,7 +541,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                     log.debug("Message has been sent to address [msg=" + msg + 
", addr=" + addr +
                         ", rmtNodeId=" + rmtNodeId + ']');
 
-                return new T2<>(sock, spi.readReceipt(sock, ackTimeout0));
+                return new T3<>(sock, spi.readReceipt(sock, ackTimeout0), 
res.clientAck());
             }
             catch (IOException | IgniteCheckedException e) {
                 U.closeQuiet(sock);
@@ -863,6 +863,9 @@ class ClientImpl extends TcpDiscoveryImpl {
         private Socket sock;
 
         /** */
+        private boolean clientAck;
+
+        /** */
         private final Queue<TcpDiscoveryAbstractMessage> queue = new 
ArrayDeque<>();
 
         /** */
@@ -888,11 +891,14 @@ class ClientImpl extends TcpDiscoveryImpl {
 
         /**
          * @param sock Socket.
+         * @param clientAck {@code True} if server supports client message 
acknowledge.
          */
-        private void setSocket(Socket sock) {
+        private void setSocket(Socket sock, boolean clientAck) {
             synchronized (mux) {
                 this.sock = sock;
 
+                this.clientAck = clientAck;
+
                 unackedMsg = null;
 
                 mux.notifyAll();
@@ -952,7 +958,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                 for (IgniteInClosure<TcpDiscoveryAbstractMessage> msgLsnr : 
spi.sendMsgLsnrs)
                     msgLsnr.apply(msg);
 
-                boolean ack = !(msg instanceof TcpDiscoveryPingResponse);
+                boolean ack = clientAck && !(msg instanceof 
TcpDiscoveryPingResponse);
 
                 try {
                     if (ack) {
@@ -1019,6 +1025,9 @@ class ClientImpl extends TcpDiscoveryImpl {
         private volatile Socket sock;
 
         /** */
+        private boolean clientAck;
+
+        /** */
         private boolean join;
 
         /**
@@ -1054,9 +1063,9 @@ class ClientImpl extends TcpDiscoveryImpl {
 
             try {
                 while (true) {
-                    sock = joinTopology(true, timeout);
+                    T2<Socket, Boolean> joinRes = joinTopology(true, timeout);
 
-                    if (sock == null) {
+                    if (joinRes == null) {
                         if (join) {
                             joinError(new IgniteSpiException("Join process 
timed out, connection failed and " +
                                 "failed to reconnect (consider increasing 
'joinTimeout' configuration property) " +
@@ -1069,6 +1078,9 @@ class ClientImpl extends TcpDiscoveryImpl {
                         return;
                     }
 
+                    sock = joinRes.get1();
+                    clientAck = joinRes.get2();
+
                     if (isInterrupted())
                         throw new InterruptedException();
 
@@ -1373,9 +1385,9 @@ class ClientImpl extends TcpDiscoveryImpl {
 
             joinCnt++;
 
-            final Socket sock = joinTopology(false, spi.joinTimeout);
+            T2<Socket, Boolean> joinRes = joinTopology(false, spi.joinTimeout);
 
-            if (sock == null) {
+            if (joinRes == null) {
                 if (join)
                     joinError(new IgniteSpiException("Join process timed 
out."));
                 else {
@@ -1387,9 +1399,9 @@ class ClientImpl extends TcpDiscoveryImpl {
                 return;
             }
 
-            currSock = sock;
+            currSock = joinRes.get1();
 
-            sockWriter.setSocket(sock);
+            sockWriter.setSocket(joinRes.get1(), joinRes.get2());
 
             if (spi.joinTimeout > 0) {
                 final int joinCnt0 = joinCnt;
@@ -1402,7 +1414,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                 }, spi.joinTimeout);
             }
 
-            sockReader.setSocket(sock, locNode.clientRouterNodeId());
+            sockReader.setSocket(joinRes.get1(), locNode.clientRouterNodeId());
         }
 
         /**
@@ -1760,7 +1772,7 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                     currSock = reconnector.sock;
 
-                    sockWriter.setSocket(currSock);
+                    sockWriter.setSocket(currSock, reconnector.clientAck);
                     sockReader.setSocket(currSock, 
locNode.clientRouterNodeId());
 
                     reconnector = null;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/59cc047d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 39f06d1..1a28e86 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -4180,6 +4180,9 @@ class ServerImpl extends TcpDiscoveryImpl {
                     TcpDiscoveryHandshakeResponse res =
                         new TcpDiscoveryHandshakeResponse(locNodeId, 
locNode.internalOrder());
 
+                    if (req.client())
+                        res.clientAck(true);
+
                     spi.writeToSocket(sock, res);
 
                     // It can happen if a remote node is stopped and it has a 
loopback address in the list of addresses,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/59cc047d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
index 21dbf4f..6f52152 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
@@ -40,6 +40,9 @@ public abstract class TcpDiscoveryAbstractMessage implements 
Serializable {
     /** */
     protected static final int CLIENT_RECON_SUCCESS_FLAG_POS = 2;
 
+    /** */
+    protected static final int CLIENT_ACK_FLAG_POS = 4;
+
     /** Sender of the message (transient). */
     private transient UUID sndNodeId;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/59cc047d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeResponse.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeResponse.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeResponse.java
index 5c2f798..ac4be50 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeResponse.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeResponse.java
@@ -61,6 +61,20 @@ public class TcpDiscoveryHandshakeResponse extends 
TcpDiscoveryAbstractMessage {
         this.order = order;
     }
 
+    /**
+     * @return {@code True} if server supports client message acknowledge.
+     */
+    public boolean clientAck() {
+        return getFlag(CLIENT_ACK_FLAG_POS);
+    }
+
+    /**
+     * @param clientAck {@code True} if server supports client message 
acknowledge.
+     */
+    public void clientAck(boolean clientAck) {
+        setFlag(CLIENT_ACK_FLAG_POS, clientAck);
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(TcpDiscoveryHandshakeResponse.class, this, "super", 
super.toString());

Reply via email to