Repository: incubator-ignite Updated Branches: refs/heads/ignite-591 839fe79fa -> cfeec2ddb
merge from ignite-747 Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/4031db76 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/4031db76 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/4031db76 Branch: refs/heads/ignite-591 Commit: 4031db76d2bd9992001a5b63f17af7739e82cff0 Parents: 0f1b31a Author: Denis Magda <dma...@gridgain.com> Authored: Wed Jul 8 10:19:11 2015 +0300 Committer: Denis Magda <dma...@gridgain.com> Committed: Wed Jul 8 10:19:11 2015 +0300 ---------------------------------------------------------------------- .../ignite/spi/discovery/tcp/ServerImpl.java | 20 +++++++++++ .../tcp/internal/TcpDiscoveryNode.java | 2 +- .../tcp/internal/TcpDiscoveryNodesRing.java | 8 ++++- .../tcp/internal/TcpDiscoveryStatistics.java | 10 ++++-- ...acheAtomicReplicatedNodeRestartSelfTest.java | 8 ++--- .../tcp/TcpDiscoveryMultiThreadedTest.java | 38 ++++++++++++++++++++ .../IgniteSpiDiscoverySelfTestSuite.java | 3 ++ 7 files changed, 81 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4031db76/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index f8fae34..d51293e 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -2881,6 +2881,24 @@ class ServerImpl extends TcpDiscoveryImpl { msg.verify(locNodeId); } + else if (!locNodeId.equals(node.id()) && ring.node(node.id()) != null) { + // Local node already has node from message in local topology. + // Just pass it to coordinator via the ring. + if (ring.hasRemoteNodes()) + sendMessageAcrossRing(msg); + + if (log.isDebugEnabled()) + log.debug("Local node already has node being added. Passing TcpDiscoveryNodeAddedMessage to " + + "coordinator for final processing [ring=" + ring + ", node=" + node + ", locNode=" + + locNode + ", msg=" + msg + ']'); + + if (debugMode) + debugLog("Local node already has node being added. Passing TcpDiscoveryNodeAddedMessage to " + + "coordinator for final processing [ring=" + ring + ", node=" + node + ", locNode=" + + locNode + ", msg=" + msg + ']'); + + return; + } if (msg.verified() && !locNodeId.equals(node.id())) { if (node.internalOrder() <= ring.maxInternalOrder()) { @@ -3163,6 +3181,8 @@ class ServerImpl extends TcpDiscoveryImpl { if (msg.verified() && locNodeId.equals(nodeId) && spiStateCopy() == CONNECTING) { assert node != null; + assert topVer > 0 : "Invalid topology version: " + msg; + ring.topologyVersion(topVer); node.order(topVer); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4031db76/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java index 36ae39e..4b4df45 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java @@ -300,7 +300,7 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste * @param order Order of the node. */ public void order(long order) { - assert order >= 0 : "Order is invalid: " + this; + assert order > 0 : "Order is invalid: " + this; this.order = order; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4031db76/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java index e9eaa1d..acb479d 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNodesRing.java @@ -34,7 +34,13 @@ public class TcpDiscoveryNodesRing { /** Visible nodes filter. */ public static final IgnitePredicate<TcpDiscoveryNode> VISIBLE_NODES = new P1<TcpDiscoveryNode>() { @Override public boolean apply(TcpDiscoveryNode node) { - return node.visible(); + if (node.visible()) { + assert node.order() > 0 : "Invalid node order: " + node; + + return true; + } + + return false; } }; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4031db76/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java index da8c4ea..377d8a8 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java @@ -256,7 +256,10 @@ public class TcpDiscoveryStatistics { if (maxMsgQueueTime < duration) maxMsgQueueTime = duration; - avgMsgQueueTime = (avgMsgQueueTime * (totalReceivedMessages() -1)) / totalProcessedMessages(); + int totalProcMsgs = totalProcessedMessages(); + + if (totalProcMsgs != 0) + avgMsgQueueTime = (avgMsgQueueTime * (totalProcMsgs - 1)) / totalProcMsgs; } msgsProcStartTs.put(msg.id(), U.currentTimeMillis()); @@ -275,7 +278,10 @@ public class TcpDiscoveryStatistics { if (startTs != null) { long duration = U.currentTimeMillis() - startTs; - avgMsgProcTime = (avgMsgProcTime * (totalProcessedMessages() - 1) + duration) / totalProcessedMessages(); + int totalProcMsgs = totalProcessedMessages(); + + if (totalProcMsgs != 0) + avgMsgProcTime = (avgMsgProcTime * (totalProcMsgs - 1) + duration) / totalProcMsgs; if (duration > maxMsgProcTime) { maxMsgProcTime = duration; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4031db76/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheAtomicReplicatedNodeRestartSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheAtomicReplicatedNodeRestartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheAtomicReplicatedNodeRestartSelfTest.java index 54409d1..b4ed18d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheAtomicReplicatedNodeRestartSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheAtomicReplicatedNodeRestartSelfTest.java @@ -26,17 +26,17 @@ import static org.apache.ignite.cache.CacheAtomicityMode.*; */ public class IgniteCacheAtomicReplicatedNodeRestartSelfTest extends GridCacheReplicatedNodeRestartSelfTest { /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-747"); + @Override public void testRestartWithPutSixNodesTwoBackups() throws Throwable { + fail("https://issues.apache.org/jira/browse/IGNITE-1095"); } /** {@inheritDoc} */ - @Override public void testRestartWithPutSixNodesTwoBackups() throws Throwable { + @Override public void testRestartWithPutEightNodesTwoBackups() throws Throwable { fail("https://issues.apache.org/jira/browse/IGNITE-1095"); } /** {@inheritDoc} */ - @Override public void testRestartWithPutEightNodesTwoBackups() throws Throwable { + @Override public void testRestartWithPutTenNodesTwoBackups() throws Throwable { fail("https://issues.apache.org/jira/browse/IGNITE-1095"); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4031db76/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java index cfefff4..0bf7cad 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java @@ -21,8 +21,10 @@ import org.apache.ignite.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.*; import org.apache.ignite.testframework.junits.common.*; import java.util.concurrent.*; @@ -100,6 +102,8 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest { * @throws Exception If any error occurs. */ public void testMultiThreaded() throws Exception { + fail("https://issues.apache.org/jira/browse/IGNITE-1100"); + execute(); } @@ -126,6 +130,40 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest { } /** + * @throws Exception If any error occurs. + */ + public void testMultipleStartOnCoordinatorStop() throws Exception{ + startGrids(GRID_CNT); + + final CyclicBarrier barrier = new CyclicBarrier(GRID_CNT + 4); + + final AtomicInteger startIdx = new AtomicInteger(GRID_CNT); + + IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + barrier.await(); + + Ignite ignite = startGrid(startIdx.getAndIncrement()); + + assertFalse(ignite.configuration().isClientMode()); + + log.info("Started node: " + ignite.name()); + + return null; + } + }, GRID_CNT + 3, "start-thread"); + + barrier.await(); + + U.sleep(ThreadLocalRandom.current().nextInt(10, 100)); + + for (int i = 0; i < GRID_CNT; i++) + stopGrid(i); + + fut.get(); + } + + /** * @throws Exception If failed. */ private void execute() throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4031db76/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java index 498f50c..6f59f14 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java @@ -57,6 +57,9 @@ public class IgniteSpiDiscoverySelfTestSuite extends TestSuite { suite.addTest(new TestSuite(TcpDiscoveryNodeConsistentIdSelfTest.class)); + suite.addTest(new TestSuite(TcpDiscoveryRestartTest.class)); + suite.addTest(new TestSuite(TcpDiscoveryMultiThreadedTest.class)); + return suite; } }