Repository: incubator-ignite Updated Branches: refs/heads/master abb2cef13 -> 271550fed
Squashed commit of the following: commit ed8dac68bb008c17246ecea5169b34a55b860869 Merge: 6f915db a127756 Author: Denis Magda <dma...@gridgain.com> Date: Mon Jul 27 16:56:39 2015 +0300 Merge remote-tracking branch 'remotes/origin/master' into ignite-1139 commit 6f915db1890c81af035984f07a7195da9048a67f Author: Denis Magda <dma...@gridgain.com> Date: Mon Jul 27 09:50:53 2015 +0300 ignite-1139: uncommented tests commit aadbdda1dab5e1c350afb0ac5e7f1182095ecd70 Author: Denis Magda <dma...@gridgain.com> Date: Mon Jul 27 09:30:50 2015 +0300 ignite-1139: set cancel to true when stopping a client node commit 86c6f6a8df6e828e5cc3c606c334925e948dee7a Author: Denis Magda <dma...@gridgain.com> Date: Mon Jul 27 09:06:49 2015 +0300 ignite-1139: temporaly disable some SPI tests commit e6a2d88063a1c32478f3ee1dea80c2ffe2ee19af Author: Denis Magda <dma...@gridgain.com> Date: Mon Jul 27 08:51:51 2015 +0300 ignite- commit f39086536e3afd031ed158e9cd2d65afb71a32bf Merge: 14ee9df 84f8b95 Author: Denis Magda <dma...@gridgain.com> Date: Mon Jul 27 08:42:28 2015 +0300 Merge branch 'ignite-1139' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-1139 commit 14ee9df2251716d1a3913742ce05154e2e958b56 Merge: fd6b0e3 0341759 Author: Denis Magda <dma...@gridgain.com> Date: Mon Jul 27 08:39:31 2015 +0300 Merge remote-tracking branch 'remotes/origin/master' into ignite-1139 commit 84f8b956e40ae88d11e0ef125442203a497b8c4b Author: dmagda <magda7...@gmail.com> Date: Fri Jul 24 13:35:32 2015 +0300 ignite-1139: - fixed race in GridDhtPartitionsExchangeFuture - fixed NPE in TcpCommunicationSpi when this SPI was not in the fully initialized state commit 89da409d5e6a62e744c4030475bbbfcb822a103c Merge: fd6b0e3 ed5d3ed Author: dmagda <magda7...@gmail.com> Date: Fri Jul 24 08:55:26 2015 +0300 Merge remote-tracking branch 'remotes/origin/master' into ignite-1139 commit fd6b0e3684df97875947c7864487b658ac599fce Author: Denis Magda <dma...@gridgain.com> Date: Thu Jul 23 16:08:21 2015 +0300 ignite-1139: unmuted test Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/271550fe Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/271550fe Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/271550fe Branch: refs/heads/master Commit: 271550fed7662c5032f9e4fb49cd135f3a55a46e Parents: abb2cef Author: Denis Magda <dma...@gridgain.com> Authored: Fri Jul 31 13:49:08 2015 +0300 Committer: Yakov Zhdanov <yzhda...@gridgain.com> Committed: Fri Jul 31 13:49:08 2015 +0300 ---------------------------------------------------------------------- .../GridDhtPartitionsExchangeFuture.java | 20 +++++----- .../communication/tcp/TcpCommunicationSpi.java | 41 +++++++++++++++++--- .../tcp/TcpDiscoveryMultiThreadedTest.java | 8 ++-- 3 files changed, 50 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/271550fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 3664220..cbf6b40 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -583,7 +583,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT onDone(exchId.topologyVersion()); } else - sendPartitions(); + sendPartitions(oldest); } else { rmtIds = Collections.emptyList(); @@ -816,9 +816,11 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT if (log.isDebugEnabled()) log.debug("Initialized future: " + this); + ClusterNode oldest = oldestNode.get(); + // If this node is not oldest. - if (!oldestNode.get().id().equals(cctx.localNodeId())) - sendPartitions(); + if (!oldest.id().equals(cctx.localNodeId())) + sendPartitions(oldest); else { boolean allReceived = allReceived(); @@ -948,11 +950,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } /** - * + * @param oldestNode Oldest node. */ - private void sendPartitions() { - ClusterNode oldestNode = this.oldestNode.get(); - + private void sendPartitions(ClusterNode oldestNode) { try { sendLocalPartitions(oldestNode, exchId); } @@ -1402,8 +1402,10 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT * */ private void recheck() { + ClusterNode oldest = oldestNode.get(); + // If this is the oldest node. - if (oldestNode.get().id().equals(cctx.localNodeId())) { + if (oldest.id().equals(cctx.localNodeId())) { Collection<UUID> remaining = remaining(); if (!remaining.isEmpty()) { @@ -1423,7 +1425,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } } else - sendPartitions(); + sendPartitions(oldest); // Schedule another send. scheduleRecheck(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/271550fe/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index f76025d..1c74d59 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -1791,7 +1791,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (log.isTraceEnabled()) log.trace("Sending message to node [node=" + node + ", msg=" + msg + ']'); - if (node.id().equals(getLocalNode().id())) + ClusterNode localNode = getLocalNode(); + + if (localNode == null) + throw new IgniteSpiException("Local node has not been started or fully initialized " + + "[isStopping=" + getSpiContext().isStopping() + ']'); + + if (node.id().equals(localNode.id())) notifyListener(node.id(), msg, NOOP); else { GridCommunicationClient client = null; @@ -1804,7 +1810,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter UUID nodeId = null; - if (!client.async() && !getSpiContext().localNode().version().equals(node.version())) + if (!client.async() && !localNode.version().equals(node.version())) nodeId = node.id(); retry = client.sendMessage(nodeId, msg); @@ -2435,8 +2441,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter else ch.write(ByteBuffer.wrap(U.IGNITE_HEADER)); + ClusterNode localNode = getLocalNode(); + + if (localNode == null) + throw new IgniteCheckedException("Local node has not been started or " + + "fully initialized [isStopping=" + getSpiContext().isStopping() + ']'); + if (recovery != null) { - HandshakeMessage msg = new HandshakeMessage(getLocalNode().id(), + HandshakeMessage msg = new HandshakeMessage(localNode.id(), recovery.incrementConnectCount(), recovery.receivedCount()); @@ -2629,7 +2641,20 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * @return Node ID message. */ private NodeIdMessage nodeIdMessage() { - return new NodeIdMessage(getLocalNode().id()); + ClusterNode localNode = getLocalNode(); + + UUID id; + + if (localNode == null) { + U.warn(log, "Local node is not started or fully initialized [isStopping=" + + getSpiContext().isStopping() + ']'); + + id = new UUID(0, 0); + } + else + id = localNode.id(); + + return new NodeIdMessage(id); } /** {@inheritDoc} */ @@ -3145,7 +3170,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } try { - UUID id = getLocalNode().id(); + ClusterNode localNode = getLocalNode(); + + if (localNode == null) + throw new IgniteSpiException("Local node has not been started or fully initialized " + + "[isStopping=" + getSpiContext().isStopping() + ']'); + + UUID id = localNode.id(); NodeIdMessage msg = new NodeIdMessage(id); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/271550fe/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java index 69dd538..f7c73b6 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java @@ -88,9 +88,9 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { - super.afterTest(); - stopAllGrids(); + + super.afterTest(); } /** {@inheritDoc} */ @@ -102,8 +102,6 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest { * @throws Exception If any error occurs. */ public void testMultiThreadedClientsRestart() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-1139"); - clientFlagGlobal = false; info("Test timeout: " + (getTestTimeout() / (60 * 1000)) + " min."); @@ -126,7 +124,7 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest { int idx = clientIdx.getAndIncrement(); while (!done.get()) { - stopGrid(idx); + stopGrid(idx, true); startGrid(idx); }