Repository: incubator-ignite Updated Branches: refs/heads/ignite-752 b1a2936e5 -> 8ee7371b5
ignite-752: self-review Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/6d5e7d36 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/6d5e7d36 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/6d5e7d36 Branch: refs/heads/ignite-752 Commit: 6d5e7d3669dee5c34e15dd5a501b1b5ad4d5dc75 Parents: b1a2936 Author: Denis Magda <dma...@gridgain.com> Authored: Wed Jul 22 09:10:23 2015 +0300 Committer: Denis Magda <dma...@gridgain.com> Committed: Wed Jul 22 09:10:23 2015 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/spi/IgniteSpiAdapter.java | 2 +- .../communication/tcp/TcpCommunicationSpi.java | 4 +-- .../ignite/spi/discovery/tcp/ClientImpl.java | 17 ++++------ .../ignite/spi/discovery/tcp/ServerImpl.java | 35 +++++++++----------- .../TcpDiscoveryStatusCheckMessage.java | 23 ------------- .../discovery/AbstractDiscoverySelfTest.java | 2 +- .../tcp/TcpClientDiscoverySpiSelfTest.java | 35 ++++++++++++++------ 7 files changed, 48 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6d5e7d36/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java index 5e8f061..6f5e9e2 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java @@ -75,7 +75,7 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement /** Local node. */ private ClusterNode locNode; - /** Failure detection threshold will not be used usage switch. */ + /** Failure detection threshold usage switch. */ private boolean failureDetectionThresholdEnabled = true; /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6d5e7d36/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 6196e9e..a678b2a 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -241,9 +241,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (log.isDebugEnabled()) log.debug("Closing communication SPI session on write timeout [remoteAddr=" + ses.remoteAddress() + - (!failureDetectionThresholdEnabled() ? - ", writeTimeout=" + sockWriteTimeout : - ", failureDetectionThreshold=" + failureDetectionThreshold()) + ']'); + ", writeTimeout=" + sockWriteTimeout + ']'); ses.close(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6d5e7d36/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index e0d1741..f6a1cdc 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -544,7 +544,8 @@ class ClientImpl extends TcpDiscoveryImpl { log.debug("Message has been sent to address [msg=" + msg + ", addr=" + addr + ", rmtNodeId=" + rmtNodeId + ']'); - return new T3<>(sock, spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)), res.clientAck()); + return new T3<>(sock, spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)), + res.clientAck()); } catch (IOException | IgniteCheckedException e) { U.closeQuiet(sock); @@ -1160,16 +1161,10 @@ class ClientImpl extends TcpDiscoveryImpl { log.error("Reconnect error [join=" + join + ", timeout=" + timeout + ']', e); if (timeout > 0 && (U.currentTimeMillis() - startTime) > timeout) { - String msg; - - if (join) - msg = "Failed to connect to cluster (consider increasing 'joinTimeout' " + - "configuration property) [joinTimeout=" + spi.joinTimeout + ", err=" + e + ']'; - else - msg = "Failed to reconnect to cluster (consider increasing 'networkTimeout' " + - "configuration property) [networkTimeout=" + spi.netTimeout + ", sock=" + sock - + ']'; - + String msg = join ? "Failed to connect to cluster (consider increasing 'joinTimeout' " + + "configuration property) [joinTimeout=" + spi.joinTimeout + ", err=" + e + ']' : + "Failed to reconnect to cluster (consider increasing 'networkTimeout' " + + "configuration property) [networkTimeout=" + spi.netTimeout + ", err=" + e + ']'; U.warn(log, msg); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6d5e7d36/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 1f98ba8..877d53c 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -3557,29 +3557,27 @@ class ServerImpl extends TcpDiscoveryImpl { if (log.isDebugEnabled()) log.debug("Responded to status check message " + - "[recipient=" + msg.creatorNodeId() + ", status=" + msg.status - () + ']'); - } catch (IgniteSpiException e) { + "[recipient=" + msg.creatorNodeId() + ", status=" + msg.status() + ']'); + } + catch (IgniteSpiException e) { if (e.hasCause(SocketException.class)) { if (log.isDebugEnabled()) log.debug("Failed to respond to status check message (connection " + - "refused) [recipient=" + msg.creatorNodeId() + ", status=" + - msg.status() + ']'); + "refused) [recipient=" + msg.creatorNodeId() + ", status=" + + msg.status() + ']'); onException("Failed to respond to status check message (connection refused) " + - "[recipient=" + msg.creatorNodeId() + ", status=" + msg - .status() + ']', - e); - } else { + "[recipient=" + msg.creatorNodeId() + ", status=" + msg.status() + ']', e); + } + else { if (pingNode(msg.creatorNode())) // Node exists and accepts incoming connections. U.error(log, "Failed to respond to status check message [recipient=" + msg.creatorNodeId() + ", status=" + msg.status() + ']', e); else if (log.isDebugEnabled()) log.debug("Failed to respond to status check message (did the node " + - "stop?) [recipient=" + msg.creatorNodeId() + ", status=" + - msg.status() - + ']'); + "stop?) [recipient=" + msg.creatorNodeId() + ", status=" + msg.status() + + ']'); } } } @@ -3784,10 +3782,7 @@ class ServerImpl extends TcpDiscoveryImpl { if (!next.version().greaterThanEqual(TcpDiscoverySpi.FAILURE_DETECTION_MAJOR_VER, TcpDiscoverySpi.FAILURE_DETECTION_MINOR_VER, TcpDiscoverySpi.FAILURE_DETECTION_MAINT_VER)) { // Preserve backward compatibility with nodes of older versions. - TcpDiscoveryStatusCheckMessage stMsg = new TcpDiscoveryStatusCheckMessage(locNode, null); - stMsg.replacedConnCheckMsg(true); - - processStatusCheckMessage(stMsg); + processStatusCheckMessage(new TcpDiscoveryStatusCheckMessage(locNode, null)); } else if (ring.hasRemoteNodes()) sendMessageAcrossRing(msg); } @@ -4877,8 +4872,8 @@ class ServerImpl extends TcpDiscoveryImpl { log.debug("Sending message ack to client [sock=" + sock + ", locNodeId=" + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']'); - writeToSocket(sock, msg, spi.failureDetectionThresholdEnabled() ? spi.failureDetectionThreshold() : - spi.getSocketTimeout()); + writeToSocket(sock, msg, spi.failureDetectionThresholdEnabled() ? + spi.failureDetectionThreshold() : spi.getSocketTimeout()); } } else { @@ -4889,8 +4884,8 @@ class ServerImpl extends TcpDiscoveryImpl { prepareNodeAddedMessage(msg, clientNodeId, null, null); - writeToSocket(sock, msg, spi.failureDetectionThresholdEnabled() ? spi.failureDetectionThreshold() : - spi.getSocketTimeout()); + writeToSocket(sock, msg, spi.failureDetectionThresholdEnabled() ? + spi.failureDetectionThreshold() : spi.getSocketTimeout()); } finally { clearNodeAddedMessage(msg); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6d5e7d36/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java index aa9d9ac..bec7093 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryStatusCheckMessage.java @@ -49,9 +49,6 @@ public class TcpDiscoveryStatusCheckMessage extends TcpDiscoveryAbstractMessage /** Creator node status (initialized by coordinator). */ private int status; - /** Whether this message replaced {@link TcpDiscoveryConnectionCheckMessage} to preserve backward compatibility. */ - private transient boolean replacedConnCheckMsg; - /** * Constructor. * @@ -101,26 +98,6 @@ public class TcpDiscoveryStatusCheckMessage extends TcpDiscoveryAbstractMessage this.status = status; } - /** - * Checks whether this message is created and sent instead of {@link TcpDiscoveryConnectionCheckMessage} or not. - * - * @return {@code true} if yes, {@code false} otherwise. - */ - public boolean replacedConnCheckMsg() { - return replacedConnCheckMsg; - } - - /** - * Sets whether this message is created and sent instead of {@link TcpDiscoveryConnectionCheckMessage} or not. This - * usually happens when the next node in a topology, that should receive this message, doesn't support processing - * of {@link TcpDiscoveryConnectionCheckMessage} messages. - * - * @param replacedConnCheckMsg {@code true} if replaced, {@code false} otherwise. - */ - public void replacedConnCheckMsg(boolean replacedConnCheckMsg) { - this.replacedConnCheckMsg = replacedConnCheckMsg; - } - /** {@inheritDoc} */ @Override public String toString() { return S.toString(TcpDiscoveryStatusCheckMessage.class, this, "super", super.toString()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6d5e7d36/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java index 96f3d21..892d87d 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java @@ -428,7 +428,7 @@ public abstract class AbstractDiscoverySelfTest<T extends IgniteSpi> extends Gri MBeanServer srv = MBeanServerFactory.createMBeanServer(); adaptor.setPort(Integer.valueOf(GridTestProperties.getProperty("discovery.mbeanserver.selftest.baseport")) + - idx); + idx); srv.registerMBean(adaptor, new ObjectName(HTTP_ADAPTOR_MBEAN_NAME)); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6d5e7d36/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java index a50b060..9df7bd9 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java @@ -114,9 +114,6 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { /** */ private boolean reconnectDisabled; - /** */ - private boolean useFailureDetectionThreshold; - /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); @@ -163,10 +160,9 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { } disco.setJoinTimeout(joinTimeout); + disco.setNetworkTimeout(netTimeout); - if (!useFailureDetectionThreshold()) - disco.setNetworkTimeout(netTimeout); - else + if (useFailureDetectionThreshold()) cfg.setFailureDetectionThreshold(failureDetectionThreshold()); disco.setClientReconnectDisabled(reconnectDisabled); @@ -782,7 +778,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { @Override public void apply(TcpDiscoveryAbstractMessage msg) { try { Thread.sleep(1000000); - } catch (InterruptedException ignored) { + } + catch (InterruptedException ignored) { Thread.interrupted(); } } @@ -921,7 +918,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { startClientNodes(1); assertEquals(G.ignite("server-0").cluster().localNode().id(), - ((TcpDiscoveryNode)G.ignite("client-0").cluster().localNode()).clientRouterNodeId()); + ((TcpDiscoveryNode) G.ignite("client-0").cluster().localNode()).clientRouterNodeId()); checkNodes(2, 1); @@ -1482,6 +1479,20 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. + */ + public void testReconnectSegmentedAfterJoinTimeoutServerFailed() throws Exception { + reconnectSegmentedAfterJoinTimeout(true); + } + + /** + * @throws Exception If failed. + */ + public void testReconnectSegmentedAfterJoinTimeoutNetworkError() throws Exception { + reconnectSegmentedAfterJoinTimeout(false); + } + + /** * @param failSrv If {@code true} fails server, otherwise server does not send join message. * @throws Exception If failed. */ @@ -1601,7 +1612,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { assertEquals(1, disconnectLatch.getCount()); disconnectLatch.countDown(); - } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + } + else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { log.info("Reconnected event."); assertEquals(1, reconnectLatch.getCount()); @@ -1609,7 +1621,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { assertFalse(err.get()); reconnectLatch.countDown(); - } else { + } + else { log.error("Unexpected event: " + evt); err.set(true); @@ -1645,7 +1658,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { } /** - * @throws Exception if failed. + * @throws Exception If failed. */ public void testDisconnectAfterNetworkTimeout() throws Exception { netTimeout = 5000;