# IGNITE-709 Improve ping from server to client.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a001312d Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a001312d Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a001312d Branch: refs/heads/ignite-836_2 Commit: a001312d3a6b1175be4ef98e6113ea6eaf179246 Parents: 505a03e Author: sevdokimov <sevdoki...@gridgain.com> Authored: Tue May 12 16:30:34 2015 +0300 Committer: sevdokimov <sevdoki...@gridgain.com> Committed: Tue May 12 16:30:34 2015 +0300 ---------------------------------------------------------------------- .../internal/util/future/SettableFuture.java | 86 ++++++++++++ .../discovery/tcp/TcpClientDiscoverySpi.java | 11 ++ .../spi/discovery/tcp/TcpDiscoverySpi.java | 138 ++++++++++++++++--- .../tcp/TcpClientDiscoverySpiSelfTest.java | 37 ++++- 4 files changed, 248 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a001312d/modules/core/src/main/java/org/apache/ignite/internal/util/future/SettableFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/SettableFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/SettableFuture.java new file mode 100644 index 0000000..673b6b6 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/SettableFuture.java @@ -0,0 +1,86 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.internal.util.future; + +import java.util.concurrent.*; + +/** + * Simple 
implementation of {@link Future} + */ +public class SettableFuture<T> implements Future<T> { + /** */ + private final CountDownLatch latch = new CountDownLatch(1); + + /** Result of computation. */ + private T res; + + /** Exception thrown during the computation. */ + private ExecutionException err; + + /** {@inheritDoc} */ + @Override public boolean cancel(boolean mayInterruptIfRunning) { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public boolean isCancelled() { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean isDone() { + return latch.getCount() == 0; + } + + /** {@inheritDoc} */ + @Override public T get() throws InterruptedException, ExecutionException { + latch.await(); + + if (err != null) + throw err; + + return res; + } + + /** {@inheritDoc} */ + @Override public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, + TimeoutException { + + if (!latch.await(timeout, unit)) + throw new TimeoutException(); + + if (err != null) + throw err; + + return res; + } + + /** + * Computation is done successfully. + * + * @param res Result of computation. + */ + public void set(T res) { + this.res = res; + + latch.countDown(); + } + + /** + * Computation failed. + * + * @param throwable Error. 
+ */ + public void setException(Throwable throwable) { + err = new ExecutionException(throwable); + + latch.countDown(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a001312d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java index d55d1c5..d1446a8 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java @@ -1109,6 +1109,8 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp processCustomMessage((TcpDiscoveryCustomEventMessage)msg); else if (msg instanceof TcpDiscoveryClientPingResponse) processClientPingResponse((TcpDiscoveryClientPingResponse)msg); + else if (msg instanceof TcpDiscoveryPingRequest) + processPingRequest((TcpDiscoveryPingRequest)msg); stats.onMessageProcessingFinished(msg); } @@ -1416,6 +1418,15 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp } /** + * Router wants to ping this client. + * + * @param msg Message. + */ + private void processPingRequest(TcpDiscoveryPingRequest msg) { + sockWriter.sendMessage(new TcpDiscoveryPingResponse(getLocalNodeId())); + } + + /** * @param nodeId Node ID. * @param metrics Metrics. * @param cacheMetrics Cache metrics. 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a001312d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index e00f798..f9c6130 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -51,6 +51,7 @@ import java.net.*; import java.text.*; import java.util.*; import java.util.concurrent.*; +import java.util.concurrent.atomic.*; import static org.apache.ignite.events.EventType.*; import static org.apache.ignite.internal.IgniteNodeAttributes.*; @@ -204,7 +205,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov private int reconCnt = DFLT_RECONNECT_CNT; /** */ - private final ThreadPoolExecutor utilityPool = new ThreadPoolExecutor(0, 10, 2000, TimeUnit.MILLISECONDS, + private final Executor utilityPool = new ThreadPoolExecutor(0, 10, 2000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); /** Nodes ring. 
*/ @@ -1143,8 +1144,28 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov UUID locNodeId = getLocalNodeId(); - if (F.contains(locNodeAddrs, addr)) - return F.t(getLocalNodeId(), clientNodeId != null && clientMsgWorkers.containsKey(clientNodeId)); + if (F.contains(locNodeAddrs, addr)) { + if (clientNodeId == null) + return F.t(getLocalNodeId(), false); + + ClientMessageWorker clientWorker = clientMsgWorkers.get(clientNodeId); + + if (clientWorker == null) + return F.t(getLocalNodeId(), false); + + boolean clientPingRes; + + try { + clientPingRes = clientWorker.ping(); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + throw new IgniteInterruptedCheckedException(e); + } + + return F.t(getLocalNodeId(), clientPingRes); + } GridFutureAdapter<IgniteBiTuple<UUID, Boolean>> fut = new GridFutureAdapter<>(); @@ -2659,6 +2680,9 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov else if (msg instanceof TcpDiscoveryClientPingRequest) processClientPingRequest((TcpDiscoveryClientPingRequest)msg); + else if (msg instanceof TcpDiscoveryPingResponse) + processPingResponse((TcpDiscoveryPingResponse)msg); + else assert false : "Unknown message type: " + msg.getClass().getSimpleName(); @@ -4499,6 +4523,16 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov /** * @param msg Message. */ + private void processPingResponse(final TcpDiscoveryPingResponse msg) { + ClientMessageWorker clientWorker = clientMsgWorkers.get(msg.creatorNodeId()); + + if (clientWorker != null) + clientWorker.pingResult(true); + } + + /** + * @param msg Message. 
+ */ private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) { if (isLocalNodeCoordinator()) { if (msg.verified()) { @@ -4660,9 +4694,6 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov /** */ private volatile UUID nodeId; - /** */ - private volatile boolean client; - /** * Constructor. * @@ -4682,6 +4713,8 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov @Override protected void body() throws InterruptedException { UUID locNodeId = getLocalNodeId(); + ClientMessageWorker clientMsgWrk = null; + try { InputStream in; @@ -4746,8 +4779,12 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov TcpDiscoveryPingResponse res = new TcpDiscoveryPingResponse(locNodeId); - if (req.clientNodeId() != null) - res.clientExists(clientMsgWorkers.containsKey(req.clientNodeId())); + if (req.clientNodeId() != null) { + ClientMessageWorker clientWorker = clientMsgWorkers.get(req.clientNodeId()); + + if (clientWorker != null) + res.clientExists(clientWorker.ping()); + } writeToSocket(sock, res); } @@ -4761,10 +4798,8 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov TcpDiscoveryHandshakeRequest req = (TcpDiscoveryHandshakeRequest)msg; UUID nodeId = req.creatorNodeId(); - boolean client = req.client(); this.nodeId = nodeId; - this.client = client; TcpDiscoveryHandshakeResponse res = new TcpDiscoveryHandshakeResponse(locNodeId, locNode.internalOrder()); @@ -4774,7 +4809,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov // It can happen if a remote node is stopped and it has a loopback address in the list of addresses, // the local node sends a handshake request message on the loopback address, so we get here. 
if (locNodeId.equals(nodeId)) { - assert !client; + assert !req.client(); if (log.isDebugEnabled()) log.debug("Handshake request from local node: " + req); @@ -4782,12 +4817,12 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov return; } - if (client) { + if (req.client()) { if (log.isDebugEnabled()) log.debug("Created client message worker [locNodeId=" + locNodeId + ", rmtNodeId=" + nodeId + ", sock=" + sock + ']'); - ClientMessageWorker clientMsgWrk = new ClientMessageWorker(sock, nodeId); + clientMsgWrk = new ClientMessageWorker(sock, nodeId); clientMsgWrk.start(); @@ -4796,11 +4831,11 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov if (log.isDebugEnabled()) log.debug("Initialized connection with remote node [nodeId=" + nodeId + - ", client=" + client + ']'); + ", client=" + req.client() + ']'); if (debugMode) debugLog("Initialized connection with remote node [nodeId=" + nodeId + - ", client=" + client + ']'); + ", client=" + req.client() + ']'); } catch (IOException e) { if (log.isDebugEnabled()) @@ -4881,7 +4916,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov if (!req.responded()) { boolean ok = processJoinRequestMessage(req); - if (client && ok) + if (clientMsgWrk != null && ok) continue; else // Direct join request - no need to handle this socket anymore. @@ -4889,7 +4924,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov } } else if (msg instanceof TcpDiscoveryClientReconnectMessage) { - if (client) { + if (clientMsgWrk != null) { TcpDiscoverySpiState state = spiStateCopy(); if (state == CONNECTED) { @@ -5026,7 +5061,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov msgWorker.addMessage(msg); // Send receipt back. 
- if (!client) + if (clientMsgWrk == null) writeToSocket(sock, RES_OK); } catch (IgniteCheckedException e) { @@ -5080,12 +5115,14 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov } } finally { - if (client) { + if (clientMsgWrk != null) { if (log.isDebugEnabled()) log.debug("Client connection failed [sock=" + sock + ", locNodeId=" + locNodeId + ", rmtNodeId=" + nodeId + ']'); - U.interrupt(clientMsgWorkers.remove(nodeId)); + clientMsgWorkers.remove(nodeId, clientMsgWrk); + + U.interrupt(clientMsgWrk); } U.closeQuiet(sock); @@ -5238,6 +5275,9 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov /** Current client metrics. */ private volatile ClusterMetrics metrics; + /** */ + private final AtomicReference<SettableFuture<Boolean>> pingFut = new AtomicReference<>(); + /** * @param sock Socket. * @param nodeId Node ID. @@ -5300,16 +5340,72 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov onException("Client connection failed [sock=" + sock + ", locNodeId=" + getLocalNodeId() + ", rmtNodeId=" + nodeId + ", msg=" + msg + ']', e); - U.interrupt(clientMsgWorkers.remove(nodeId)); + clientMsgWorkers.remove(nodeId, this); + + U.interrupt(this); U.closeQuiet(sock); } } + /** + * + */ + public void pingResult(boolean res) { + SettableFuture<Boolean> fut = pingFut.getAndSet(null); + + if (fut != null) + fut.set(res); + } + + /** + * + */ + public boolean ping() throws InterruptedException { + if (isNodeStopping()) + return false; + + SettableFuture<Boolean> fut; + + while (true) { + fut = pingFut.get(); + + if (fut != null) + break; + + fut = new SettableFuture<>(); + + if (pingFut.compareAndSet(null, fut)) { + TcpDiscoveryPingRequest pingReq = new TcpDiscoveryPingRequest(getLocalNodeId(), nodeId); + + pingReq.verify(getLocalNodeId()); + + addMessage(pingReq); + + break; + } + } + + try { + return fut.get(ackTimeout, TimeUnit.MILLISECONDS); + } + catch (ExecutionException e) { + 
throw new IgniteSpiException("Internal error: ping future cannot be done with exception", e); + } + catch (TimeoutException ignored) { + if (pingFut.compareAndSet(fut, null)) + fut.set(false); + + return false; + } + } + /** {@inheritDoc} */ @Override protected void cleanup() { super.cleanup(); + pingResult(false); + U.closeQuiet(sock); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a001312d/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java index 49ef4aa..507b3e7 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java @@ -383,6 +383,30 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { /** * @throws Exception If failed. 
*/ + public void testPingFailedClientNode() throws Exception { + startServerNodes(2); + startClientNodes(1); + + Ignite srv0 = G.ignite("server-0"); + Ignite srv1 = G.ignite("server-1"); + Ignite client = G.ignite("client-0"); + + ((TcpDiscoverySpiAdapter)srv0.configuration().getDiscoverySpi()).setAckTimeout(1000); + + ((TestTcpClientDiscovery)client.configuration().getDiscoverySpi()).pauseSocketWrite(); + + assert !((IgniteEx)srv1).context().discovery().pingNode(client.cluster().localNode().id()); + assert !((IgniteEx)srv0).context().discovery().pingNode(client.cluster().localNode().id()); + + ((TestTcpClientDiscovery)client.configuration().getDiscoverySpi()).resumeAll(); + + assert ((IgniteEx)srv1).context().discovery().pingNode(client.cluster().localNode().id()); + assert ((IgniteEx)srv0).context().discovery().pingNode(client.cluster().localNode().id()); + } + + /** + * @throws Exception If failed. + */ public void testClientReconnectOnRouterFail() throws Exception { clientsPerSrv = 1; @@ -461,7 +485,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { clientLeftLatch = new CountDownLatch(1); - ((TestTcpClientDiscovery)G.ignite("client-1").configuration().getDiscoverySpi()).resume(); + ((TestTcpClientDiscovery)G.ignite("client-1").configuration().getDiscoverySpi()).resumeAll(); await(clientLeftLatch); @@ -1042,7 +1066,14 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { /** * */ - private void pauseAll() { + public void pauseSocketWrite() { + pauseResumeOperation(true, writeLock); + } + + /** + * + */ + public void pauseAll() { pauseResumeOperation(true, openSockLock, writeLock); brokeConnection(); @@ -1051,7 +1082,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { /** * */ - private void resume() { + public void resumeAll() { pauseResumeOperation(false, openSockLock, writeLock); } }