Repository: incubator-ignite Updated Branches: refs/heads/ignite-yardstick-client-2 [created] 3fa39b19e
# handle multiple workers per client Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/3fa39b19 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/3fa39b19 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/3fa39b19 Branch: refs/heads/ignite-yardstick-client-2 Commit: 3fa39b19e222aeced4ced738a3f58a597b1b170d Parents: c97f013 Author: sboikov <sboi...@gridgain.com> Authored: Thu Jun 25 09:58:54 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Thu Jun 25 09:58:54 2015 +0300 ---------------------------------------------------------------------- .../ignite/spi/discovery/tcp/ServerImpl.java | 43 ++++++++++++++++---- 1 file changed, 36 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3fa39b19/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index f959379..76ddf75 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -4127,15 +4127,44 @@ class ServerImpl extends TcpDiscoveryImpl { } if (req.client()) { - if (log.isDebugEnabled()) - log.debug("Created client message worker [locNodeId=" + locNodeId + + ClientMessageWorker clientMsgWrk0 = new ClientMessageWorker(sock, nodeId); + + while (true) { + ClientMessageWorker old = clientMsgWorkers.putIfAbsent(nodeId, clientMsgWrk0); + + if (old == null) + break; + + log.info("Have old client: " + nodeId); + + if (old.isInterrupted()) { + clientMsgWorkers.remove(nodeId, old); + + continue; + } + + old.join(500); + + old = clientMsgWorkers.putIfAbsent(nodeId, clientMsgWrk0); + + if (old == null) + break; + + log.error("Already have client message worker [locNodeId=" + locNodeId + ", rmtNodeId=" + nodeId + ", sock=" + sock + ']'); - clientMsgWrk = new ClientMessageWorker(sock, nodeId); + return; + } - clientMsgWrk.start(); + //if (log.isDebugEnabled()) + log.info("Created client message worker [locNodeId=" + locNodeId + + ", rmtNodeId=" + nodeId + ", sock=" + sock + ']'); + + assert clientMsgWrk0 == clientMsgWorkers.get(nodeId); + + clientMsgWrk = clientMsgWrk0; - clientMsgWorkers.put(nodeId, clientMsgWrk); + clientMsgWrk.start(); } if (log.isDebugEnabled()) @@ -4423,8 +4452,8 @@ class ServerImpl extends TcpDiscoveryImpl { } finally { if (clientMsgWrk != null) { - if (log.isDebugEnabled()) - log.debug("Client connection failed [sock=" + sock + ", locNodeId=" + locNodeId + + //if (log.isDebugEnabled()) + log.error("Client connection failed [sock=" + sock + ", locNodeId=" + locNodeId + ", rmtNodeId=" + nodeId + ']'); clientMsgWorkers.remove(nodeId, clientMsgWrk);