Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-901 1654f3615 -> c0d948b7f


# ignite-901


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c0d948b7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c0d948b7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c0d948b7

Branch: refs/heads/ignite-901
Commit: c0d948b7fa0ff61e597c882d48462d0ad774e056
Parents: 1654f36
Author: sboikov <sboi...@gridgain.com>
Authored: Wed Jul 15 13:20:55 2015 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Wed Jul 15 13:48:08 2015 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      |  2 +-
 .../ignite/spi/discovery/tcp/ClientImpl.java    | 56 +++++++++++++++++
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 65 ++++++++++++++++----
 .../messages/TcpDiscoveryClientAckResponse.java | 58 +++++++++++++++++
 4 files changed, 169 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c0d948b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 0db5273..1f6a8bb 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -674,7 +674,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                         node.id() + ", msg=" + m + ']');
             }
             catch (IgniteCheckedException e) {
-                U.error(log, "Failed to send partitions full message [node=" + 
node + ']', e);
+                U.warn(log, "Failed to send partitions full message [node=" + 
node + ", err=" + e + ']');
             }
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c0d948b7/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 1ae9b85..fa8098c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -818,6 +818,9 @@ class ClientImpl extends TcpDiscoveryImpl {
                         if (log.isDebugEnabled())
                             log.debug("Message has been received: " + msg);
 
+                        if (msg instanceof TcpDiscoveryClientAckResponse)
+                            
sockWriter.ackReceived((TcpDiscoveryClientAckResponse)msg);
+
                         spi.stats.onMessageReceived(msg);
 
                         if (spi.ensured(msg) && joinLatch.getCount() == 0L)
@@ -859,6 +862,9 @@ class ClientImpl extends TcpDiscoveryImpl {
         /** */
         private final Queue<TcpDiscoveryAbstractMessage> queue = new 
ArrayDeque<>();
 
+        /** */
+        private TcpDiscoveryAbstractMessage unackedMsg;
+
         /**
          *
          */
@@ -884,6 +890,8 @@ class ClientImpl extends TcpDiscoveryImpl {
             synchronized (mux) {
                 this.sock = sock;
 
+                unackedMsg = null;
+
                 mux.notifyAll();
             }
         }
@@ -897,6 +905,21 @@ class ClientImpl extends TcpDiscoveryImpl {
             }
         }
 
+        /**
+         * @param res Acknowledge response.
+         */
+        void ackReceived(TcpDiscoveryClientAckResponse res) {
+            synchronized (mux) {
+                if (unackedMsg != null) {
+                    assert unackedMsg.id().equals(res.messageId()) : 
unackedMsg;
+
+                    unackedMsg = null;
+                }
+
+                mux.notifyAll();
+            }
+        }
+
         /** {@inheritDoc} */
         @Override protected void body() throws InterruptedException {
             TcpDiscoveryAbstractMessage msg = null;
@@ -926,10 +949,43 @@ class ClientImpl extends TcpDiscoveryImpl {
                 for (IgniteInClosure<TcpDiscoveryAbstractMessage> msgLsnr : 
spi.sendMsgLsnrs)
                     msgLsnr.apply(msg);
 
+                boolean ack = !(msg instanceof TcpDiscoveryPingResponse);
+
                 try {
+                    if (ack) {
+                        synchronized (mux) {
+                            assert unackedMsg == null : unackedMsg;
+
+                            unackedMsg = msg;
+                        }
+                    }
+
                     spi.writeToSocket(sock, msg);
 
                     msg = null;
+
+                    if (ack) {
+                        long waitEnd = U.currentTimeMillis() + spi.ackTimeout;
+
+                        TcpDiscoveryAbstractMessage unacked;
+
+                        synchronized (mux) {
+                            while (unackedMsg != null && U.currentTimeMillis() 
< waitEnd)
+                                mux.wait(waitEnd);
+
+                            unacked = unackedMsg;
+
+                            unackedMsg = null;
+                        }
+
+                        if (unacked != null) {
+                            if (log.isDebugEnabled())
+                                log.debug("Failed to get acknowledge for 
message, will try to reconnect " +
+                                "[msg=" + unacked + ", timeout=" + 
spi.ackTimeout + ']');
+
+                            throw new IOException("Failed to get acknowledge 
for message: " + unacked);
+                        }
+                    }
                 }
                 catch (IOException e) {
                     if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c0d948b7/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 0003486..39f06d1 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -4323,7 +4323,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                                 if (state == CONNECTED) {
                                     spi.writeToSocket(msg, sock, RES_OK);
 
-                                    if (clientMsgWrk != null && 
clientMsgWrk.getState() == State.NEW)
+                                    if (clientMsgWrk.getState() == State.NEW)
                                         clientMsgWrk.start();
 
                                     msgWorker.addMessage(msg);
@@ -4467,7 +4467,14 @@ class ServerImpl extends TcpDiscoveryImpl {
                         msgWorker.addMessage(msg);
 
                         // Send receipt back.
-                        if (clientMsgWrk == null)
+                        if (clientMsgWrk != null) {
+                            TcpDiscoveryClientAckResponse ack = new 
TcpDiscoveryClientAckResponse(locNodeId, msg.id());
+
+                            ack.verify(locNodeId);
+
+                            clientMsgWrk.addMessage(ack);
+                        }
+                        else
                             spi.writeToSocket(msg, sock, RES_OK);
                     }
                     catch (IgniteCheckedException e) {
@@ -4577,8 +4584,11 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                 msg.responded(true);
 
-                if (clientMsgWrk != null && clientMsgWrk.getState() == 
State.NEW)
+                if (clientMsgWrk != null && clientMsgWrk.getState() == 
State.NEW) {
+                    clientMsgWrk.clientVersion(U.productVersion(msg.node()));
+
                     clientMsgWrk.start();
+                }
 
                 msgWorker.addMessage(msg);
 
@@ -4689,6 +4699,9 @@ class ServerImpl extends TcpDiscoveryImpl {
         /** */
         private final AtomicReference<GridFutureAdapter<Boolean>> pingFut = 
new AtomicReference<>();
 
+        /** */
+        private IgniteProductVersion clientVer;
+
         /**
          * @param sock Socket.
          * @param clientNodeId Node ID.
@@ -4701,6 +4714,13 @@ class ServerImpl extends TcpDiscoveryImpl {
         }
 
         /**
+         * @param clientVer Client version.
+         */
+        void clientVersion(IgniteProductVersion clientVer) {
+            this.clientVer = clientVer;
+        }
+
+        /**
          * @return Current client metrics.
          */
         ClusterMetrics metrics() {
@@ -4719,17 +4739,40 @@ class ServerImpl extends TcpDiscoveryImpl {
             try {
                 assert msg.verified() : msg;
 
-                if (log.isDebugEnabled())
-                    log.debug("Redirecting message to client [sock=" + sock + 
", locNodeId="
-                        + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + 
", msg=" + msg + ']');
+                if (msg instanceof TcpDiscoveryClientAckResponse) {
+                    if (clientVer == null) {
+                        ClusterNode node = spi.getNode(clientNodeId);
 
-                try {
-                    prepareNodeAddedMessage(msg, clientNodeId, null, null);
+                        if (node != null)
+                            clientVer = IgniteUtils.productVersion(node);
+                        else if (log.isDebugEnabled())
+                            log.debug("Skip sending message ack to client, 
fail to get client node " +
+                                "[sock=" + sock + ", locNodeId=" + 
getLocalNodeId() +
+                                ", rmtNodeId=" + clientNodeId + ", msg=" + msg 
+ ']');
+                    }
 
-                    writeToSocket(sock, msg);
+                    if (clientVer != null &&
+                        
clientVer.compareTo(TcpDiscoveryClientAckResponse.CLIENT_ACK_SINCE_VERSION) >= 
0) {
+                        if (log.isDebugEnabled())
+                            log.debug("Sending message ack to client [sock=" + 
sock + ", locNodeId="
+                                + getLocalNodeId() + ", rmtNodeId=" + 
clientNodeId + ", msg=" + msg + ']');
+
+                        writeToSocket(sock, msg);
+                    }
                 }
-                finally {
-                    clearNodeAddedMessage(msg);
+                else {
+                    try {
+                        if (log.isDebugEnabled())
+                            log.debug("Redirecting message to client [sock=" + 
sock + ", locNodeId="
+                                + getLocalNodeId() + ", rmtNodeId=" + 
clientNodeId + ", msg=" + msg + ']');
+
+                        prepareNodeAddedMessage(msg, clientNodeId, null, null);
+
+                        writeToSocket(sock, msg);
+                    }
+                    finally {
+                        clearNodeAddedMessage(msg);
+                    }
                 }
             }
             catch (IgniteCheckedException | IOException e) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c0d948b7/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientAckResponse.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientAckResponse.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientAckResponse.java
new file mode 100644
index 0000000..89cc071
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientAckResponse.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp.messages;
+
+import org.apache.ignite.lang.*;
+
+import java.util.*;
+
+/**
+ *
+ */
+public class TcpDiscoveryClientAckResponse extends TcpDiscoveryAbstractMessage 
{
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    public static final IgniteProductVersion CLIENT_ACK_SINCE_VERSION = 
IgniteProductVersion.fromString("1.4.1");
+
+    /** */
+    private final IgniteUuid msgId;
+
+    /**
+     * @param creatorNodeId Creator node ID.
+     * @param msgId Message ID to ack.
+     */
+    public TcpDiscoveryClientAckResponse(UUID creatorNodeId, IgniteUuid msgId) 
{
+        super(creatorNodeId);
+
+        this.msgId = msgId;
+    }
+
+    /**
+     * @return Acknowledged message ID.
+     */
+    public IgniteUuid messageId() {
+        return msgId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean highPriority() {
+        return true;
+    }
+}

Reply via email to