Repository: incubator-ignite Updated Branches: refs/heads/ignite-901 1654f3615 -> c0d948b7f
# ignite-901 Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c0d948b7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c0d948b7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c0d948b7 Branch: refs/heads/ignite-901 Commit: c0d948b7fa0ff61e597c882d48462d0ad774e056 Parents: 1654f36 Author: sboikov <sboi...@gridgain.com> Authored: Wed Jul 15 13:20:55 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Wed Jul 15 13:48:08 2015 +0300 ---------------------------------------------------------------------- .../GridCachePartitionExchangeManager.java | 2 +- .../ignite/spi/discovery/tcp/ClientImpl.java | 56 +++++++++++++++++ .../ignite/spi/discovery/tcp/ServerImpl.java | 65 ++++++++++++++++---- .../messages/TcpDiscoveryClientAckResponse.java | 58 +++++++++++++++++ 4 files changed, 169 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c0d948b7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 0db5273..1f6a8bb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -674,7 +674,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana node.id() + ", msg=" + m + ']'); } catch (IgniteCheckedException e) { - U.error(log, "Failed to send partitions full message [node=" + node + ']', e); + U.warn(log, "Failed to send partitions full message [node=" + node + ", err=" + e + ']'); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c0d948b7/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index 1ae9b85..fa8098c 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -818,6 +818,9 @@ class ClientImpl extends TcpDiscoveryImpl { if (log.isDebugEnabled()) log.debug("Message has been received: " + msg); + if (msg instanceof TcpDiscoveryClientAckResponse) + sockWriter.ackReceived((TcpDiscoveryClientAckResponse)msg); + spi.stats.onMessageReceived(msg); if (spi.ensured(msg) && joinLatch.getCount() == 0L) @@ -859,6 +862,9 @@ class ClientImpl extends TcpDiscoveryImpl { /** */ private final Queue<TcpDiscoveryAbstractMessage> queue = new ArrayDeque<>(); + /** */ + private TcpDiscoveryAbstractMessage unackedMsg; + /** * */ @@ -884,6 +890,8 @@ class ClientImpl extends TcpDiscoveryImpl { synchronized (mux) { this.sock = sock; + unackedMsg = null; + mux.notifyAll(); } } @@ -897,6 +905,21 @@ class ClientImpl extends TcpDiscoveryImpl { } } + /** + * @param res Acknowledge response. + */ + void ackReceived(TcpDiscoveryClientAckResponse res) { + synchronized (mux) { + if (unackedMsg != null) { + assert unackedMsg.id().equals(res.messageId()) : unackedMsg; + + unackedMsg = null; + } + + mux.notifyAll(); + } + } + /** {@inheritDoc} */ @Override protected void body() throws InterruptedException { TcpDiscoveryAbstractMessage msg = null; @@ -926,10 +949,43 @@ class ClientImpl extends TcpDiscoveryImpl { for (IgniteInClosure<TcpDiscoveryAbstractMessage> msgLsnr : spi.sendMsgLsnrs) msgLsnr.apply(msg); + boolean ack = !(msg instanceof TcpDiscoveryPingResponse); + try { + if (ack) { + synchronized (mux) { + assert unackedMsg == null : unackedMsg; + + unackedMsg = msg; + } + } + spi.writeToSocket(sock, msg); msg = null; + + if (ack) { + long waitEnd = U.currentTimeMillis() + spi.ackTimeout; + + TcpDiscoveryAbstractMessage unacked; + + synchronized (mux) { + while (unackedMsg != null && U.currentTimeMillis() < waitEnd) + mux.wait(waitEnd); + + unacked = unackedMsg; + + unackedMsg = null; + } + + if (unacked != null) { + if (log.isDebugEnabled()) + log.debug("Failed to get acknowledge for message, will try to reconnect " + + "[msg=" + unacked + ", timeout=" + spi.ackTimeout + ']'); + + throw new IOException("Failed to get acknowledge for message: " + unacked); + } + } } catch (IOException e) { if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c0d948b7/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 0003486..39f06d1 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -4323,7 +4323,7 @@ class ServerImpl extends TcpDiscoveryImpl { if (state == CONNECTED) { spi.writeToSocket(msg, sock, RES_OK); - if (clientMsgWrk != null && clientMsgWrk.getState() == State.NEW) + if (clientMsgWrk.getState() == State.NEW) clientMsgWrk.start(); msgWorker.addMessage(msg); @@ -4467,7 +4467,14 @@ class ServerImpl extends TcpDiscoveryImpl { msgWorker.addMessage(msg); // Send receipt back. - if (clientMsgWrk == null) + if (clientMsgWrk != null) { + TcpDiscoveryClientAckResponse ack = new TcpDiscoveryClientAckResponse(locNodeId, msg.id()); + + ack.verify(locNodeId); + + clientMsgWrk.addMessage(ack); + } + else spi.writeToSocket(msg, sock, RES_OK); } catch (IgniteCheckedException e) { @@ -4577,8 +4584,11 @@ class ServerImpl extends TcpDiscoveryImpl { msg.responded(true); - if (clientMsgWrk != null && clientMsgWrk.getState() == State.NEW) + if (clientMsgWrk != null && clientMsgWrk.getState() == State.NEW) { + clientMsgWrk.clientVersion(U.productVersion(msg.node())); + clientMsgWrk.start(); + } msgWorker.addMessage(msg); @@ -4689,6 +4699,9 @@ class ServerImpl extends TcpDiscoveryImpl { /** */ private final AtomicReference<GridFutureAdapter<Boolean>> pingFut = new AtomicReference<>(); + /** */ + private IgniteProductVersion clientVer; + /** * @param sock Socket. * @param clientNodeId Node ID. @@ -4701,6 +4714,13 @@ class ServerImpl extends TcpDiscoveryImpl { } /** + * @param clientVer Client version. + */ + void clientVersion(IgniteProductVersion clientVer) { + this.clientVer = clientVer; + } + + /** * @return Current client metrics. */ ClusterMetrics metrics() { @@ -4719,17 +4739,40 @@ class ServerImpl extends TcpDiscoveryImpl { try { assert msg.verified() : msg; - if (log.isDebugEnabled()) - log.debug("Redirecting message to client [sock=" + sock + ", locNodeId=" - + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']'); + if (msg instanceof TcpDiscoveryClientAckResponse) { + if (clientVer == null) { + ClusterNode node = spi.getNode(clientNodeId); - try { - prepareNodeAddedMessage(msg, clientNodeId, null, null); + if (node != null) + clientVer = IgniteUtils.productVersion(node); + else if (log.isDebugEnabled()) + log.debug("Skip sending message ack to client, fail to get client node " + + "[sock=" + sock + ", locNodeId=" + getLocalNodeId() + + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']'); + } - writeToSocket(sock, msg); + if (clientVer != null && + clientVer.compareTo(TcpDiscoveryClientAckResponse.CLIENT_ACK_SINCE_VERSION) >= 0) { + if (log.isDebugEnabled()) + log.debug("Sending message ack to client [sock=" + sock + ", locNodeId=" + + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']'); + + writeToSocket(sock, msg); + } } - finally { - clearNodeAddedMessage(msg); + else { + try { + if (log.isDebugEnabled()) + log.debug("Redirecting message to client [sock=" + sock + ", locNodeId=" + + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']'); + + prepareNodeAddedMessage(msg, clientNodeId, null, null); + + writeToSocket(sock, msg); + } + finally { + clearNodeAddedMessage(msg); + } } } catch (IgniteCheckedException | IOException e) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c0d948b7/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientAckResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientAckResponse.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientAckResponse.java new file mode 100644 index 0000000..89cc071 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientAckResponse.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp.messages; + +import org.apache.ignite.lang.*; + +import java.util.*; + +/** + * + */ +public class TcpDiscoveryClientAckResponse extends TcpDiscoveryAbstractMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + public static final IgniteProductVersion CLIENT_ACK_SINCE_VERSION = IgniteProductVersion.fromString("1.4.1"); + + /** */ + private final IgniteUuid msgId; + + /** + * @param creatorNodeId Creator node ID. + * @param msgId Message ID to ack. + */ + public TcpDiscoveryClientAckResponse(UUID creatorNodeId, IgniteUuid msgId) { + super(creatorNodeId); + + this.msgId = msgId; + } + + /** + * @return Acknowledged message ID. + */ + public IgniteUuid messageId() { + return msgId; + } + + /** {@inheritDoc} */ + @Override public boolean highPriority() { + return true; + } +}