Repository: incubator-ignite Updated Branches: refs/heads/ignite-1093 64319443a -> 9da4b9367
ignite-1093 Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/9da4b936 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/9da4b936 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/9da4b936 Branch: refs/heads/ignite-1093 Commit: 9da4b936795813b1786a7c0c5ed2abe87f504e09 Parents: 6431944 Author: Anton Vinogradov <vinogradov.an...@gmail.com> Authored: Mon Aug 17 12:38:11 2015 +0300 Committer: Anton Vinogradov <vinogradov.an...@gmail.com> Committed: Mon Aug 17 12:38:11 2015 +0300 ---------------------------------------------------------------------- .../dht/preloader/GridDhtPartitionDemander.java | 13 +++- ...GridCacheMassiveRebalancingSyncSelfTest.java | 81 +++++++++++++++----- 2 files changed, 72 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9da4b936/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index 262ccb7..6d024de 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -255,6 +255,14 @@ public class GridDhtPartitionDemander { } /** + * @param type Type. + * @param discoEvt Discovery event. + */ + private void preloadEvent(int type, DiscoveryEvent discoEvt) { + preloadEvent(-1, type, discoEvt); + } + + /** * @param part Partition. * @param type Type. * @param discoEvt Discovery event. @@ -796,7 +804,10 @@ public class GridDhtPartitionDemander { missed.clear(); - cctx.shared().exchange().scheduleResendPartitions();//TODO: Is in necessary? + cctx.shared().exchange().scheduleResendPartitions(); + + if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED) && !cctx.isReplicated()) + preloadEvent(EVT_CACHE_REBALANCE_STOPPED, assigns.exchangeFuture().discoveryEvent()); onDone(cancelled); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9da4b936/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java index cd12954..91352ee 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java @@ -21,6 +21,7 @@ import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -57,7 +58,7 @@ public class GridCacheMassiveRebalancingSyncSelfTest extends GridCommonAbstractT ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setIpFinder(ipFinder); ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setForceServerMode(true); - if (getTestGridName(3).equals(gridName)) + if (getTestGridName(10).equals(gridName)) iCfg.setClientMode(true); cacheCfg.setName(CACHE_NAME_DHT); @@ -141,45 +142,83 @@ public class GridCacheMassiveRebalancingSyncSelfTest extends GridCommonAbstractT long start = System.currentTimeMillis(); + //will be started simultaneously in case of ASYNC mode startGrid(1); startGrid(2); + startGrid(3); + startGrid(4); - IgniteInternalFuture f2 = ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture(); - f2.get(); + GridCachePreloader p1 = ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader(); + GridCachePreloader p2 = ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader(); + GridCachePreloader p3 = ((GridCacheAdapter)grid(3).context().cache().internalCache(CACHE_NAME_DHT)).preloader(); + GridCachePreloader p4 = ((GridCacheAdapter)grid(4).context().cache().internalCache(CACHE_NAME_DHT)).preloader(); - IgniteInternalFuture f1 = ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture(); - while (!((GridDhtPartitionDemander.SyncFuture)f1).topologyVersion().equals(((GridDhtPartitionDemander.SyncFuture)f2).topologyVersion())) { + IgniteInternalFuture f4 = p4.syncFuture(); + f4.get(); + + AffinityTopologyVersion f4Top = ((GridDhtPartitionDemander.SyncFuture)f4).topologyVersion(); + + IgniteInternalFuture f1 = p1.syncFuture(); + IgniteInternalFuture f2 = p2.syncFuture(); + IgniteInternalFuture f3 = p3.syncFuture(); + + while (!((GridDhtPartitionDemander.SyncFuture)f1).topologyVersion().equals(f4Top) || + !((GridDhtPartitionDemander.SyncFuture)f2).topologyVersion().equals(f4Top) || + !((GridDhtPartitionDemander.SyncFuture)f3).topologyVersion().equals(f4Top)) { U.sleep(100); - f1 = ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture(); + f1 = p1.syncFuture(); + f2 = p2.syncFuture(); + f3 = p3.syncFuture(); } f1.get(); + f2.get(); + f3.get(); long spend = (System.currentTimeMillis() - start) / 1000; - f1 = ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture(); - f2 = ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture(); + f1 = p1.syncFuture(); + f2 = p2.syncFuture(); + f3 = p3.syncFuture(); + f4 = p4.syncFuture(); stopGrid(0); - //TODO: refactor to get futures by topology - while (f1 == ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture() || - f2 == ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture()) + while (f1 == p1.syncFuture() || f2 == p2.syncFuture() || f3 == p3.syncFuture() || f4 == p4.syncFuture()) U.sleep(100); - ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture().get(); - ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture().get(); + p1.syncFuture().get(); + p2.syncFuture().get(); + p3.syncFuture().get(); + p4.syncFuture().get(); - f2 = ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture(); + f2 = p2.syncFuture(); + f3 = p3.syncFuture(); + f4 = p4.syncFuture(); stopGrid(1); - while (f2 == ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture()) + while (f2 == p2.syncFuture() || f3 == p3.syncFuture() || f4 == p4.syncFuture()) U.sleep(100); - ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture().get(); + p2.syncFuture().get(); + p3.syncFuture().get(); + p4.syncFuture().get(); - checkData(grid(2)); + f3 = p3.syncFuture(); + f4 = p4.syncFuture(); + + stopGrid(2); + + while (f3 == p3.syncFuture() || f4 == p4.syncFuture()) + U.sleep(100); + + p3.syncFuture().get(); + p4.syncFuture().get(); + + stopGrid(3); + + checkData(grid(4)); log.info("Spend " + spend + " seconds to preload entries."); @@ -198,7 +237,7 @@ public class GridCacheMassiveRebalancingSyncSelfTest extends GridCommonAbstractT startGrid(1); startGrid(2); - startGrid(3); + startGrid(10); Thread t = new Thread(new Runnable() { @Override public void run() { @@ -214,10 +253,10 @@ public class GridCacheMassiveRebalancingSyncSelfTest extends GridCommonAbstractT int size = 1000; for (int i = 0; i < size; i++) - grid(3).cachex(CACHE_NAME_DHT).remove(i); + grid(10).cachex(CACHE_NAME_DHT).remove(i); for (int i = 0; i < size; i++) - grid(3).cachex(CACHE_NAME_DHT).put(i, i); + grid(10).cachex(CACHE_NAME_DHT).put(i, i); spend += System.currentTimeMillis() - start; @@ -245,7 +284,7 @@ public class GridCacheMassiveRebalancingSyncSelfTest extends GridCommonAbstractT cancelled.set(true); t.join(); - checkData(grid(3)); + checkData(grid(10)); //stopAllGrids(); }