Repository: incubator-ignite Updated Branches: refs/heads/ignite-1093 bd317a016 -> a483f5220
ignite-1093 Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c1f84324 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c1f84324 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c1f84324 Branch: refs/heads/ignite-1093 Commit: c1f843248eb452d48ac33ff5f7bedeebc3b43b70 Parents: bd317a0 Author: Anton Vinogradov <vinogradov.an...@gmail.com> Authored: Thu Aug 20 19:18:28 2015 +0300 Committer: Anton Vinogradov <vinogradov.an...@gmail.com> Committed: Thu Aug 20 19:18:28 2015 +0300 ---------------------------------------------------------------------- .../dht/preloader/GridDhtPartitionDemander.java | 26 +++++++++- .../spi/discovery/tcp/TcpDiscoverySpi.java | 2 +- ...ridCacheMassiveRebalancingAsyncSelfTest.java | 54 ++++++++++++++++++++ ...GridCacheMassiveRebalancingSyncSelfTest.java | 4 +- 4 files changed, 81 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1f84324/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index 6d024de..72d43c9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -24,6 +24,7 @@ import org.apache.ignite.configuration.*; import org.apache.ignite.events.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; +import org.apache.ignite.internal.managers.eventstorage.*; import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; @@ -701,6 +702,8 @@ public class GridDhtPartitionDemander { private ConcurrentHashMap8<UUID, Collection<Integer>> missed = new ConcurrentHashMap8<>(); + private volatile GridLocalEventListener lsnr; + /** Assignments. */ private volatile GridDhtPreloaderAssignments assigns; @@ -714,8 +717,17 @@ public class GridDhtPartitionDemander { return assigns != null ? assigns.topologyVersion() : null; } - void init( - GridDhtPreloaderAssignments assigns) { + void init(GridDhtPreloaderAssignments assigns) { + final SyncFuture fut = this; + + lsnr = new GridLocalEventListener() { + @Override public void onEvent(Event evt) { + fut.onCancel(); + } + }; + + cctx.events().addListener(lsnr, EVT_NODE_FAILED); + this.assigns = assigns; } @@ -736,6 +748,14 @@ public class GridDhtPartitionDemander { return assigns.get(node); } + void onCancel() { + remaining.clear(); + + cancelled = true; + + checkIsDone(); + } + void onCancel(UUID nodeId, AffinityTopologyVersion topVer) { if (isDone() || !topVer.equals(assigns.topologyVersion())) return; @@ -809,6 +829,8 @@ public class GridDhtPartitionDemander { if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED) && !cctx.isReplicated()) preloadEvent(EVT_CACHE_REBALANCE_STOPPED, assigns.exchangeFuture().discoveryEvent()); + cctx.events().removeListener(lsnr); + onDone(cancelled); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1f84324/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 18a540c..2546774 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -1878,7 +1878,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T * <p> * This method is intended for test purposes only. */ - void simulateNodeFailure() { + protected void simulateNodeFailure() { impl.simulateNodeFailure(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1f84324/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingAsyncSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingAsyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingAsyncSelfTest.java index 8bcd6d1..ca564ed 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingAsyncSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingAsyncSelfTest.java @@ -17,8 +17,13 @@ package org.apache.ignite.internal.processors.cache.distributed.rebalancing; +import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.spi.discovery.tcp.*; /** * @@ -32,6 +37,55 @@ public class GridCacheMassiveRebalancingAsyncSelfTest extends GridCacheMassiveRe cacheCfg.setRebalanceMode(CacheRebalanceMode.ASYNC); + iCfg.setDiscoverySpi(new FailableTcpDiscoverySpi()); + + ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setIpFinder(ipFinder); + ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setForceServerMode(true); + + if (getTestGridName(20).equals(gridName)) + spi =(FailableTcpDiscoverySpi)iCfg.getDiscoverySpi(); + return iCfg; } + + public static class FailableTcpDiscoverySpi extends TcpDiscoverySpi{ + public void fail(){ + simulateNodeFailure(); + } + } + + private volatile FailableTcpDiscoverySpi spi; + + /** + * @throws Exception + */ + public void testNodeFailedAtRebalancing() throws Exception { + Ignite ignite = startGrid(0); + + generateData(ignite); + + log.info("Preloading started."); + + startGrid(1); + + IgniteInternalFuture f1 = ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture(); + + f1.get(); + + startGrid(20); + + U.sleep(500); + + spi.fail(); + + U.sleep(500); + + f1 = ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture(); + IgniteInternalFuture f0 = ((GridCacheAdapter)grid(0).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture(); + + f1.get(); + f0.get(); + + stopAllGrids(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1f84324/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java index 91352ee..9606810 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java @@ -37,9 +37,9 @@ import java.util.concurrent.atomic.*; */ public class GridCacheMassiveRebalancingSyncSelfTest extends GridCommonAbstractTest { /** */ - private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); - private static int TEST_SIZE = 1_024_000; + private static int TEST_SIZE = 10_024_000; /** cache name. */ protected static String CACHE_NAME_DHT = "cache";