# IGNITE-709 Improve ping from client to server.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/505a03e9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/505a03e9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/505a03e9 Branch: refs/heads/ignite-836_2 Commit: 505a03e92db2747ed5beb07eb47cce17271d387c Parents: c05e368 Author: sevdokimov <sevdoki...@gridgain.com> Authored: Tue May 12 13:43:26 2015 +0300 Committer: sevdokimov <sevdoki...@gridgain.com> Committed: Tue May 12 13:43:26 2015 +0300 ---------------------------------------------------------------------- .../discovery/tcp/TcpClientDiscoverySpi.java | 56 +++++++++++++++- .../spi/discovery/tcp/TcpDiscoverySpi.java | 62 ++++++++++++++++-- .../messages/TcpDiscoveryClientPingRequest.java | 56 ++++++++++++++++ .../TcpDiscoveryClientPingResponse.java | 67 ++++++++++++++++++++ .../tcp/TcpClientDiscoverySpiSelfTest.java | 30 +++++++++ 5 files changed, 264 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/505a03e9/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java index 6752bf5..d55d1c5 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java @@ -20,6 +20,7 @@ package org.apache.ignite.spi.discovery.tcp; import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; @@ -75,6 +76,9 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp /** Remote nodes. */ private final ConcurrentMap<UUID, TcpDiscoveryNode> rmtNodes = new ConcurrentHashMap8<>(); + /** Remote nodes. */ + private final ConcurrentMap<UUID, GridFutureAdapter<Boolean>> pingFuts = new ConcurrentHashMap8<>(); + /** Socket writer. */ private SocketWriter sockWriter; @@ -316,6 +320,9 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp } } + for (GridFutureAdapter<Boolean> fut : pingFuts.values()) + fut.onDone(false); + rmtNodes.clear(); U.interrupt(sockTimeoutWorker); @@ -359,15 +366,46 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp } /** {@inheritDoc} */ - @Override public boolean pingNode(UUID nodeId) { - assert nodeId != null; + @Override public boolean pingNode(@NotNull final UUID nodeId) { + if (getSpiContext().isStopping()) + return false; if (nodeId.equals(getLocalNodeId())) return true; TcpDiscoveryNode node = rmtNodes.get(nodeId); - return node != null && node.visible(); + if (node == null || !node.visible()) + return false; + + GridFutureAdapter<Boolean> fut = pingFuts.get(nodeId); + + if (fut == null) { + fut = new GridFutureAdapter<>(); + + GridFutureAdapter<Boolean> oldFut = pingFuts.putIfAbsent(nodeId, fut); + + if (oldFut != null) + fut = oldFut; + else + sockWriter.sendMessage(new TcpDiscoveryClientPingRequest(getLocalNodeId(), nodeId)); + } + + final GridFutureAdapter<Boolean> finalFut = fut; + + timer.schedule(new TimerTask() { + @Override public void run() { + if (pingFuts.remove(nodeId, finalFut)) + finalFut.onDone(false); + } + }, netTimeout); + + try { + return fut.get(); + } + catch (IgniteCheckedException e) { + throw new IgniteSpiException(e); // Should newer occur + } } /** {@inheritDoc} */ @@ -1069,6 +1107,8 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp processClientReconnectMessage((TcpDiscoveryClientReconnectMessage)msg); else if (msg instanceof TcpDiscoveryCustomEventMessage) processCustomMessage((TcpDiscoveryCustomEventMessage)msg); + else if (msg instanceof TcpDiscoveryClientPingResponse) + processClientPingResponse((TcpDiscoveryClientPingResponse)msg); stats.onMessageProcessingFinished(msg); } @@ -1366,6 +1406,16 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp } /** + * @param msg Message. + */ + private void processClientPingResponse(TcpDiscoveryClientPingResponse msg) { + GridFutureAdapter<Boolean> fut = pingFuts.remove(msg.nodeToPing()); + + if (fut != null) + fut.onDone(msg.result()); + } + + /** * @param nodeId Node ID. * @param metrics Metrics. * @param cacheMetrics Cache metrics. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/505a03e9/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 3624791..e00f798 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -203,6 +203,10 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) private int reconCnt = DFLT_RECONNECT_CNT; + /** */ + private final ThreadPoolExecutor utilityPool = new ThreadPoolExecutor(0, 10, 2000, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<Runnable>()); + /** Nodes ring. */ @GridToStringExclude private final TcpDiscoveryNodesRing ring = new TcpDiscoveryNodesRing(); @@ -285,6 +289,10 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov private final CopyOnWriteArrayList<IgniteInClosure<TcpDiscoveryAbstractMessage>> sendMsgLsnrs = new CopyOnWriteArrayList<>(); + /** */ + private final CopyOnWriteArrayList<IgniteInClosure<Socket>> incomeConnLsnrs = + new CopyOnWriteArrayList<>(); + /** {@inheritDoc} */ @IgniteInstanceResource @Override public void injectResources(Ignite ignite) { @@ -2034,15 +2042,29 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov /** * <strong>FOR TEST ONLY!!!</strong> */ - public void addSendMessageListener(IgniteInClosure<TcpDiscoveryAbstractMessage> msg) { - sendMsgLsnrs.add(msg); + public void addSendMessageListener(IgniteInClosure<TcpDiscoveryAbstractMessage> lsnr) { + sendMsgLsnrs.add(lsnr); + } + + /** + * <strong>FOR TEST ONLY!!!</strong> + */ + public void removeSendMessageListener(IgniteInClosure<TcpDiscoveryAbstractMessage> lsnr) { + sendMsgLsnrs.remove(lsnr); + } + + /** + * <strong>FOR TEST ONLY!!!</strong> + */ + public void addIncomeConnectionListener(IgniteInClosure<Socket> lsnr) { + incomeConnLsnrs.add(lsnr); } /** * <strong>FOR TEST ONLY!!!</strong> */ - public void removeSendMessageListener(IgniteInClosure<TcpDiscoveryAbstractMessage> msg) { - sendMsgLsnrs.remove(msg); + public void removeIncomeConnectionListener(IgniteInClosure<Socket> lsnr) { + incomeConnLsnrs.remove(lsnr); } /** @@ -2634,6 +2656,9 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov else if (msg instanceof TcpDiscoveryCustomEventMessage) processCustomMessage((TcpDiscoveryCustomEventMessage)msg); + else if (msg instanceof TcpDiscoveryClientPingRequest) + processClientPingRequest((TcpDiscoveryClientPingRequest)msg); + else assert false : "Unknown message type: " + msg.getClass().getSimpleName(); @@ -4448,6 +4473,32 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov /** * @param msg Message. */ + private void processClientPingRequest(final TcpDiscoveryClientPingRequest msg) { + utilityPool.execute(new Runnable() { + @Override public void run() { + boolean res = pingNode(msg.nodeToPing()); + + final ClientMessageWorker worker = clientMsgWorkers.get(msg.creatorNodeId()); + + if (worker == null) { + if (log.isDebugEnabled()) + log.debug("Ping request from dead client node, will be skipped: " + msg.creatorNodeId()); + } + else { + TcpDiscoveryClientPingResponse pingRes = new TcpDiscoveryClientPingResponse( + getLocalNodeId(), msg.nodeToPing(), res); + + pingRes.verify(getLocalNodeId()); + + worker.addMessage(pingRes); + } + } + }); + } + + /** + * @param msg Message. + */ private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) { if (isLocalNodeCoordinator()) { if (msg.verified()) { @@ -4643,6 +4694,9 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov sock.setSoTimeout((int)netTimeout); + for (IgniteInClosure<Socket> connLsnr : incomeConnLsnrs) + connLsnr.apply(sock); + in = new BufferedInputStream(sock.getInputStream()); byte[] buf = new byte[4]; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/505a03e9/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingRequest.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingRequest.java new file mode 100644 index 0000000..f9f164d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingRequest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp.messages; + +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * Ping request. + */ +public class TcpDiscoveryClientPingRequest extends TcpDiscoveryAbstractMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** Pinged client node ID. */ + private final UUID nodeToPing; + + /** + * @param creatorNodeId Creator node ID. + * @param nodeToPing Pinged client node ID. + */ + public TcpDiscoveryClientPingRequest(UUID creatorNodeId, @Nullable UUID nodeToPing) { + super(creatorNodeId); + + this.nodeToPing = nodeToPing; + } + + /** + * @return Pinged client node ID. + */ + @Nullable public UUID nodeToPing() { + return nodeToPing; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TcpDiscoveryClientPingRequest.class, this, "super", super.toString()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/505a03e9/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingResponse.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingResponse.java new file mode 100644 index 0000000..26a2b00 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientPingResponse.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp.messages; + +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * Ping request. + */ +public class TcpDiscoveryClientPingResponse extends TcpDiscoveryAbstractMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** Pinged client node ID. */ + private final UUID nodeToPing; + + /** */ + private final boolean res; + + /** + * @param creatorNodeId Creator node ID. + * @param nodeToPing Pinged client node ID. + */ + public TcpDiscoveryClientPingResponse(UUID creatorNodeId, @Nullable UUID nodeToPing, boolean res) { + super(creatorNodeId); + + this.nodeToPing = nodeToPing; + this.res = res; + } + + /** + * @return Pinged client node ID. + */ + @Nullable public UUID nodeToPing() { + return nodeToPing; + } + + /** + * @return Result of ping. + */ + public boolean result() { + return res; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TcpDiscoveryClientPingResponse.class, this, "super", super.toString()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/505a03e9/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java index a06bfd9..49ef4aa 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java @@ -353,6 +353,36 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ + public void testPingFailedNodeFromClient() throws Exception { + startServerNodes(2); + startClientNodes(1); + + Ignite srv0 = G.ignite("server-0"); + Ignite srv1 = G.ignite("server-1"); + Ignite client = G.ignite("client-0"); + + final CountDownLatch latch = new CountDownLatch(1); + + ((TcpDiscoverySpi)srv1.configuration().getDiscoverySpi()).addIncomeConnectionListener(new IgniteInClosure<Socket>() { + @Override public void apply(Socket sock) { + try { + latch.await(); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + }); + + assert ((IgniteEx)client).context().discovery().pingNode(srv0.cluster().localNode().id()); + assert !((IgniteEx)client).context().discovery().pingNode(srv1.cluster().localNode().id()); + + latch.countDown(); + } + + /** + * @throws Exception If failed. + */ public void testClientReconnectOnRouterFail() throws Exception { clientsPerSrv = 1;