Repository: incubator-ignite Updated Branches: refs/heads/ignite-901 0e8d2ccc9 -> 59cc047d0
# ignite-901 Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/59cc047d Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/59cc047d Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/59cc047d Branch: refs/heads/ignite-901 Commit: 59cc047d07d6850166d2e7d105d0e2464ae9d680 Parents: 0e8d2cc Author: sboikov <sboi...@gridgain.com> Authored: Wed Jul 15 17:47:17 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Wed Jul 15 17:47:17 2015 +0300 ---------------------------------------------------------------------- .../ignite/spi/discovery/tcp/ClientImpl.java | 44 +++++++++++++------- .../ignite/spi/discovery/tcp/ServerImpl.java | 3 ++ .../messages/TcpDiscoveryAbstractMessage.java | 3 ++ .../messages/TcpDiscoveryHandshakeResponse.java | 14 +++++++ 4 files changed, 48 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/59cc047d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index e23c191..a294a3a 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -385,7 +385,7 @@ class ClientImpl extends TcpDiscoveryImpl { * @see TcpDiscoverySpi#joinTimeout */ @SuppressWarnings("BusyWait") - @Nullable private Socket joinTopology(boolean recon, long timeout) throws IgniteSpiException, InterruptedException { + @Nullable private T2<Socket, Boolean> joinTopology(boolean recon, long timeout) throws IgniteSpiException, InterruptedException { 
Collection<InetSocketAddress> addrs = null; long startTime = U.currentTimeMillis(); @@ -425,7 +425,7 @@ class ClientImpl extends TcpDiscoveryImpl { InetSocketAddress addr = it.next(); - T2<Socket, Integer> sockAndRes = sendJoinRequest(recon, addr); + T3<Socket, Integer, Boolean> sockAndRes = sendJoinRequest(recon, addr); if (sockAndRes == null) { it.remove(); @@ -439,7 +439,7 @@ class ClientImpl extends TcpDiscoveryImpl { switch (sockAndRes.get2()) { case RES_OK: - return sock; + return new T2<>(sock, sockAndRes.get3()); case RES_CONTINUE_JOIN: case RES_WAIT: @@ -470,9 +470,9 @@ class ClientImpl extends TcpDiscoveryImpl { /** * @param recon {@code True} if reconnects. * @param addr Address. - * @return Socket and connect response. + * @return Socket, connect response and client acknowledge flag. */ - @Nullable private T2<Socket, Integer> sendJoinRequest(boolean recon, InetSocketAddress addr) { + @Nullable private T3<Socket, Integer, Boolean> sendJoinRequest(boolean recon, InetSocketAddress addr) { assert addr != null; if (log.isDebugEnabled()) @@ -541,7 +541,7 @@ class ClientImpl extends TcpDiscoveryImpl { log.debug("Message has been sent to address [msg=" + msg + ", addr=" + addr + ", rmtNodeId=" + rmtNodeId + ']'); - return new T2<>(sock, spi.readReceipt(sock, ackTimeout0)); + return new T3<>(sock, spi.readReceipt(sock, ackTimeout0), res.clientAck()); } catch (IOException | IgniteCheckedException e) { U.closeQuiet(sock); @@ -863,6 +863,9 @@ class ClientImpl extends TcpDiscoveryImpl { private Socket sock; /** */ + private boolean clientAck; + + /** */ private final Queue<TcpDiscoveryAbstractMessage> queue = new ArrayDeque<>(); /** */ @@ -888,11 +891,14 @@ class ClientImpl extends TcpDiscoveryImpl { /** * @param sock Socket. + * @param clientAck {@code True} if server supports client message acknowledge. 
*/ - private void setSocket(Socket sock) { + private void setSocket(Socket sock, boolean clientAck) { synchronized (mux) { this.sock = sock; + this.clientAck = clientAck; + unackedMsg = null; mux.notifyAll(); @@ -952,7 +958,7 @@ class ClientImpl extends TcpDiscoveryImpl { for (IgniteInClosure<TcpDiscoveryAbstractMessage> msgLsnr : spi.sendMsgLsnrs) msgLsnr.apply(msg); - boolean ack = !(msg instanceof TcpDiscoveryPingResponse); + boolean ack = clientAck && !(msg instanceof TcpDiscoveryPingResponse); try { if (ack) { @@ -1019,6 +1025,9 @@ class ClientImpl extends TcpDiscoveryImpl { private volatile Socket sock; /** */ + private boolean clientAck; + + /** */ private boolean join; /** @@ -1054,9 +1063,9 @@ class ClientImpl extends TcpDiscoveryImpl { try { while (true) { - sock = joinTopology(true, timeout); + T2<Socket, Boolean> joinRes = joinTopology(true, timeout); - if (sock == null) { + if (joinRes == null) { if (join) { joinError(new IgniteSpiException("Join process timed out, connection failed and " + "failed to reconnect (consider increasing 'joinTimeout' configuration property) " + @@ -1069,6 +1078,9 @@ class ClientImpl extends TcpDiscoveryImpl { return; } + sock = joinRes.get1(); + clientAck = joinRes.get2(); + if (isInterrupted()) throw new InterruptedException(); @@ -1373,9 +1385,9 @@ class ClientImpl extends TcpDiscoveryImpl { joinCnt++; - final Socket sock = joinTopology(false, spi.joinTimeout); + T2<Socket, Boolean> joinRes = joinTopology(false, spi.joinTimeout); - if (sock == null) { + if (joinRes == null) { if (join) joinError(new IgniteSpiException("Join process timed out.")); else { @@ -1387,9 +1399,9 @@ class ClientImpl extends TcpDiscoveryImpl { return; } - currSock = sock; + currSock = joinRes.get1(); - sockWriter.setSocket(sock); + sockWriter.setSocket(joinRes.get1(), joinRes.get2()); if (spi.joinTimeout > 0) { final int joinCnt0 = joinCnt; @@ -1402,7 +1414,7 @@ class ClientImpl extends TcpDiscoveryImpl { }, spi.joinTimeout); } - 
sockReader.setSocket(sock, locNode.clientRouterNodeId()); + sockReader.setSocket(joinRes.get1(), locNode.clientRouterNodeId()); } /** @@ -1760,7 +1772,7 @@ class ClientImpl extends TcpDiscoveryImpl { currSock = reconnector.sock; - sockWriter.setSocket(currSock); + sockWriter.setSocket(currSock, reconnector.clientAck); sockReader.setSocket(currSock, locNode.clientRouterNodeId()); reconnector = null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/59cc047d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 39f06d1..1a28e86 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -4180,6 +4180,9 @@ class ServerImpl extends TcpDiscoveryImpl { TcpDiscoveryHandshakeResponse res = new TcpDiscoveryHandshakeResponse(locNodeId, locNode.internalOrder()); + if (req.client()) + res.clientAck(true); + spi.writeToSocket(sock, res); // It can happen if a remote node is stopped and it has a loopback address in the list of addresses, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/59cc047d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java index 21dbf4f..6f52152 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java +++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java @@ -40,6 +40,9 @@ public abstract class TcpDiscoveryAbstractMessage implements Serializable { /** */ protected static final int CLIENT_RECON_SUCCESS_FLAG_POS = 2; + /** */ + protected static final int CLIENT_ACK_FLAG_POS = 4; + /** Sender of the message (transient). */ private transient UUID sndNodeId; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/59cc047d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeResponse.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeResponse.java index 5c2f798..ac4be50 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryHandshakeResponse.java @@ -61,6 +61,20 @@ public class TcpDiscoveryHandshakeResponse extends TcpDiscoveryAbstractMessage { this.order = order; } + /** + * @return {@code True} if server supports client message acknowledge. + */ + public boolean clientAck() { + return getFlag(CLIENT_ACK_FLAG_POS); + } + + /** + * @param clientAck {@code True} if server supports client message acknowledge. + */ + public void clientAck(boolean clientAck) { + setFlag(CLIENT_ACK_FLAG_POS, clientAck); + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(TcpDiscoveryHandshakeResponse.class, this, "super", super.toString());