Repository: incubator-ignite Updated Branches: refs/heads/ignite-709_2 bf80c27f1 -> 7625bdcaf
# IGNITE-709 Add additional tests. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/7625bdca Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/7625bdca Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/7625bdca Branch: refs/heads/ignite-709_2 Commit: 7625bdcafdcaed16c8648ac45f6ec832f62268ad Parents: bf80c27 Author: sevdokimov <sevdoki...@gridgain.com> Authored: Thu May 7 16:23:42 2015 +0300 Committer: sevdokimov <sevdoki...@gridgain.com> Committed: Thu May 7 16:23:42 2015 +0300 ---------------------------------------------------------------------- .../spi/discovery/tcp/TcpClientDiscoverySpi.java | 13 +++++++++++++ .../ignite/spi/discovery/tcp/TcpDiscoverySpi.java | 13 ++++++++++++- 2 files changed, 25 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7625bdca/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java index c319f9e..def9568 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java @@ -466,10 +466,14 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp msg.client(true); + System.out.println("TcpClientDiscoverySpi.SocketReader: join write: " + msg); + writeToSocket(sock, msg); int res = readReceipt(sock, ackTimeout); + System.out.println("TcpClientDiscoverySpi.SocketReader: join res: " + (res == RES_OK ? 
"OK" : "" + res)); + switch (res) { case RES_OK: return sock; @@ -667,6 +671,9 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp msg.senderNodeId(rmtNodeId); + if (!(msg instanceof TcpDiscoveryHeartbeatMessage)) + System.out.println("TcpClientDiscoverySpi.SocketReader: read: " + msg); + if (log.isDebugEnabled()) log.debug("Message has been received: " + msg); @@ -777,6 +784,9 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp } try { + if (!(msg instanceof TcpDiscoveryHeartbeatMessage)) + System.out.println("TcpClientDiscoverySpi.SocketReader: write: " + msg); + writeToSocket(sock, msg); msg = null; @@ -924,6 +934,9 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp while (true) { Object msg = queue.take(); + if (!(msg instanceof TcpDiscoveryHeartbeatMessage)) + System.out.println("TcpClientDiscoverySpi.MessageWorker: process: " + msg); + if (msg == JOIN_TIMEOUT) { if (joinLatch.getCount() > 0) { joinErr = new IgniteSpiException("Join process timed out [sock=" + sock + http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7625bdca/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index a9a9306..c1b60a7 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -3401,7 +3401,18 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov if (routerNode == null) throw new IgniteSpiException("Router node for client does not exist: " + node); - assert !routerNode.isClient(); + if (routerNode.isClient()) + throw new 
IgniteSpiException("Router node is a client node: " + node); + + if (routerNode.id().equals(getLocalNodeId())) { + ClientMessageWorker worker = clientMsgWorkers.get(node.id()); + + msg.verify(getLocalNodeId()); // Client worker requires verified messages. + + worker.addMessage(msg); + + return; + } trySendMessageDirectly(routerNode, msg);