Repository: incubator-ignite Updated Branches: refs/heads/ignite-1093 a483f5220 -> dc10b85cc
ignite-1093 Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a5bd80de Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a5bd80de Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a5bd80de Branch: refs/heads/ignite-1093 Commit: a5bd80ded53be77ff568f5c9edbbeddea91d1aff Parents: a483f52 Author: Anton Vinogradov <vinogradov.an...@gmail.com> Authored: Fri Aug 21 15:08:39 2015 +0300 Committer: Anton Vinogradov <vinogradov.an...@gmail.com> Committed: Fri Aug 21 15:08:39 2015 +0300 ---------------------------------------------------------------------- .../dht/preloader/GridDhtPartitionDemander.java | 5 +- .../dht/preloader/GridDhtPartitionSupplier.java | 7 +- ...GridCacheMassiveRebalancingSyncSelfTest.java | 202 ++++++++++++++----- 3 files changed, 157 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5bd80de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index e11addc..0c30630 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -291,7 +291,8 @@ public class GridDhtPartitionDemander { AffinityTopologyVersion topVer = assigns.topologyVersion(); if (syncFut.isInited()) { - syncFut.get(); + if (!syncFut.isDone()) + syncFut.onCancel(); syncFut = new SyncFuture(assigns); } @@ -791,7 +792,7 @@ public class GridDhtPartitionDemander { Collection<Integer> parts = remaining.get(nodeId); - if (parts!=null) { + if (parts != null) { parts.remove(p); if (parts.isEmpty()) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5bd80de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java index c496f8d..546e67b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java @@ -81,15 +81,13 @@ class GridDhtPartitionSupplier { if (!cctx.kernalContext().clientNode()) { for (int cnt = 0; cnt < cctx.config().getRebalanceThreadPoolSize(); cnt++) { - final int idx = cnt; - cctx.io().addOrderedHandler(topic(cnt, cctx.cacheId()), new CI2<UUID, GridDhtPartitionDemandMessage>() { @Override public void apply(UUID id, GridDhtPartitionDemandMessage m) { if (!enterBusy()) return; try { - processMessage(m, id, idx); + processMessage(m, id); } finally { leaveBusy(); @@ -161,9 +159,8 @@ class GridDhtPartitionSupplier { /** * @param d Demand message. * @param id Node uuid. - * @param idx Index. */ - private void processMessage(GridDhtPartitionDemandMessage d, UUID id, int idx) { + private void processMessage(GridDhtPartitionDemandMessage d, UUID id) { assert d != null; assert id != null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5bd80de/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java index cc82e79..d92ec86 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java @@ -39,11 +39,14 @@ public class GridCacheMassiveRebalancingSyncSelfTest extends GridCommonAbstractT /** */ protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); - private static int TEST_SIZE = 1_024_000; + private static int TEST_SIZE = 1_120_000; /** cache name. */ protected static String CACHE_NAME_DHT = "cache"; + /** cache 2 name. */ + protected static String CACHE_2_NAME_DHT = "cache2"; + /** {@inheritDoc} */ @Override protected long getTestTimeout() { return Long.MAX_VALUE; @@ -53,24 +56,33 @@ public class GridCacheMassiveRebalancingSyncSelfTest extends GridCommonAbstractT @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration iCfg = super.getConfiguration(gridName); - CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>(); - ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setIpFinder(ipFinder); ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setForceServerMode(true); if (getTestGridName(10).equals(gridName)) iCfg.setClientMode(true); + CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>(); + cacheCfg.setName(CACHE_NAME_DHT); cacheCfg.setCacheMode(CacheMode.PARTITIONED); //cacheCfg.setRebalanceBatchSize(1024); //cacheCfg.setRebalanceBatchesCount(1); cacheCfg.setRebalanceMode(CacheRebalanceMode.SYNC); - cacheCfg.setRebalanceThreadPoolSize(4); - //cacheCfg.setRebalanceTimeout(1000000); + cacheCfg.setRebalanceThreadPoolSize(8); cacheCfg.setBackups(1); - iCfg.setCacheConfiguration(cacheCfg); + CacheConfiguration<Integer, Integer> cacheCfg2 = new CacheConfiguration<>(); + + cacheCfg2.setName(CACHE_2_NAME_DHT); + cacheCfg2.setCacheMode(CacheMode.PARTITIONED); + //cacheCfg2.setRebalanceBatchSize(1024); + //cacheCfg2.setRebalanceBatchesCount(1); + cacheCfg2.setRebalanceMode(CacheRebalanceMode.SYNC); + cacheCfg2.setRebalanceThreadPoolSize(8); + cacheCfg2.setBackups(1); + + iCfg.setCacheConfiguration(cacheCfg, cacheCfg2); return iCfg; } @@ -86,6 +98,14 @@ public class GridCacheMassiveRebalancingSyncSelfTest extends GridCommonAbstractT stmr.addData(i, i); } } + try (IgniteDataStreamer<Integer, Integer> stmr = ignite.dataStreamer(CACHE_2_NAME_DHT)) { + for (int i = 0; i < TEST_SIZE; i++) { + if (i % 1_000_000 == 0) + log.info("Prepared " + i / 1_000_000 + "m entries."); + + stmr.addData(i, i + 3); + } + } } /** @@ -97,7 +117,15 @@ public class GridCacheMassiveRebalancingSyncSelfTest extends GridCommonAbstractT if (i % 1_000_000 == 0) log.info("Checked " + i / 1_000_000 + "m entries."); - assert ignite.cache(CACHE_NAME_DHT).get(i).equals(i) : "keys " + i + " does not match"; + assert ignite.cache(CACHE_NAME_DHT).get(i) != null && ignite.cache(CACHE_NAME_DHT).get(i).equals(i) : + "keys " + i + " does not match (" + ignite.cache(CACHE_NAME_DHT).get(i) + ")"; + } + for (int i = 0; i < TEST_SIZE; i++) { + if (i % 1_000_000 == 0) + log.info("Checked " + i / 1_000_000 + "m entries."); + + assert ignite.cache(CACHE_2_NAME_DHT).get(i) != null && ignite.cache(CACHE_2_NAME_DHT).get(i).equals(i + 3) : + "keys " + i + " does not match (" + ignite.cache(CACHE_2_NAME_DHT).get(i) + ")"; } } @@ -125,7 +153,7 @@ public class GridCacheMassiveRebalancingSyncSelfTest extends GridCommonAbstractT checkData(grid(1)); - log.info("Spend " + spend + " seconds to preload entries."); + log.info("Spend " + spend + " seconds to rebalance entries."); stopAllGrids(); } @@ -148,79 +176,153 @@ public class GridCacheMassiveRebalancingSyncSelfTest extends GridCommonAbstractT startGrid(3); startGrid(4); - GridCachePreloader p1 = ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader(); - GridCachePreloader p2 = ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader(); - GridCachePreloader p3 = ((GridCacheAdapter)grid(3).context().cache().internalCache(CACHE_NAME_DHT)).preloader(); - GridCachePreloader p4 = ((GridCacheAdapter)grid(4).context().cache().internalCache(CACHE_NAME_DHT)).preloader(); + //wait until cache rebalanced in async mode - IgniteInternalFuture f4 = p4.syncFuture(); - f4.get(); + GridCachePreloader p11 = ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader(); + GridCachePreloader p12 = ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader(); + GridCachePreloader p13 = ((GridCacheAdapter)grid(3).context().cache().internalCache(CACHE_NAME_DHT)).preloader(); + GridCachePreloader p14 = ((GridCacheAdapter)grid(4).context().cache().internalCache(CACHE_NAME_DHT)).preloader(); - AffinityTopologyVersion f4Top = ((GridDhtPartitionDemander.SyncFuture)f4).topologyVersion(); + GridCachePreloader p21 = ((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_2_NAME_DHT)).preloader(); + GridCachePreloader p22 = ((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_2_NAME_DHT)).preloader(); + GridCachePreloader p23 = ((GridCacheAdapter)grid(3).context().cache().internalCache(CACHE_2_NAME_DHT)).preloader(); + GridCachePreloader p24 = ((GridCacheAdapter)grid(4).context().cache().internalCache(CACHE_2_NAME_DHT)).preloader(); - IgniteInternalFuture f1 = p1.syncFuture(); - IgniteInternalFuture f2 = p2.syncFuture(); - IgniteInternalFuture f3 = p3.syncFuture(); + IgniteInternalFuture f24 = p24.syncFuture(); + f24.get(); - while (!((GridDhtPartitionDemander.SyncFuture)f1).topologyVersion().equals(f4Top) || - !((GridDhtPartitionDemander.SyncFuture)f2).topologyVersion().equals(f4Top) || - !((GridDhtPartitionDemander.SyncFuture)f3).topologyVersion().equals(f4Top)) { + IgniteInternalFuture f14 = p14.syncFuture(); + f14.get(); + + AffinityTopologyVersion f4Top = ((GridDhtPartitionDemander.SyncFuture)f24).topologyVersion(); + + IgniteInternalFuture f11 = p11.syncFuture(); + IgniteInternalFuture f12 = p12.syncFuture(); + IgniteInternalFuture f13 = p13.syncFuture(); + + while (!((GridDhtPartitionDemander.SyncFuture)f11).topologyVersion().equals(f4Top) || + !((GridDhtPartitionDemander.SyncFuture)f12).topologyVersion().equals(f4Top) || + !((GridDhtPartitionDemander.SyncFuture)f13).topologyVersion().equals(f4Top)) { U.sleep(100); - f1 = p1.syncFuture(); - f2 = p2.syncFuture(); - f3 = p3.syncFuture(); + f11 = p11.syncFuture(); + f12 = p12.syncFuture(); + f13 = p13.syncFuture(); } - f1.get(); - f2.get(); - f3.get(); + f11.get(); + f12.get(); + f13.get(); - long spend = (System.currentTimeMillis() - start) / 1000; + IgniteInternalFuture f21 = p21.syncFuture(); + IgniteInternalFuture f22 = p22.syncFuture(); + IgniteInternalFuture f23 = p23.syncFuture(); + + while (!((GridDhtPartitionDemander.SyncFuture)f21).topologyVersion().equals(f4Top) || + !((GridDhtPartitionDemander.SyncFuture)f22).topologyVersion().equals(f4Top) || + !((GridDhtPartitionDemander.SyncFuture)f23).topologyVersion().equals(f4Top)) { + U.sleep(100); + + f21 = p21.syncFuture(); + f22 = p22.syncFuture(); + f23 = p23.syncFuture(); + } + f21.get(); + f22.get(); + f23.get(); - f1 = p1.syncFuture(); - f2 = p2.syncFuture(); - f3 = p3.syncFuture(); - f4 = p4.syncFuture(); + //cache rebalanced in async node + + f11 = p11.syncFuture(); + f12 = p12.syncFuture(); + f13 = p13.syncFuture(); + f14 = p14.syncFuture(); + + f21 = p21.syncFuture(); + f22 = p22.syncFuture(); + f23 = p23.syncFuture(); + f24 = p24.syncFuture(); stopGrid(0); - while (f1 == p1.syncFuture() || f2 == p2.syncFuture() || f3 == p3.syncFuture() || f4 == p4.syncFuture()) + //wait until cache rebalanced + + while (f11 == p11.syncFuture() || f12 == p12.syncFuture() || f13 == p13.syncFuture() || f14 == p14.syncFuture()) U.sleep(100); - p1.syncFuture().get(); - p2.syncFuture().get(); - p3.syncFuture().get(); - p4.syncFuture().get(); + while (f21 == p21.syncFuture() || f22 == p22.syncFuture() || f23 == p23.syncFuture() || f24 == p24.syncFuture()) + U.sleep(100); + + p11.syncFuture().get(); + p12.syncFuture().get(); + p13.syncFuture().get(); + p14.syncFuture().get(); + + p21.syncFuture().get(); + p22.syncFuture().get(); + p23.syncFuture().get(); + p24.syncFuture().get(); - f2 = p2.syncFuture(); - f3 = p3.syncFuture(); - f4 = p4.syncFuture(); + //cache rebalanced + + f12 = p12.syncFuture(); + f13 = p13.syncFuture(); + f14 = p14.syncFuture(); + + f22 = p22.syncFuture(); + f23 = p23.syncFuture(); + f24 = p24.syncFuture(); stopGrid(1); - while (f2 == p2.syncFuture() || f3 == p3.syncFuture() || f4 == p4.syncFuture()) + //wait until cache rebalanced + + while (f12 == p12.syncFuture() || f13 == p13.syncFuture() || f14 == p14.syncFuture()) U.sleep(100); - p2.syncFuture().get(); - p3.syncFuture().get(); - p4.syncFuture().get(); + while (f22 == p22.syncFuture() || f23 == p23.syncFuture() || f24 == p24.syncFuture()) + U.sleep(100); + + p12.syncFuture().get(); + p13.syncFuture().get(); + p14.syncFuture().get(); + + p22.syncFuture().get(); + p23.syncFuture().get(); + p24.syncFuture().get(); - f3 = p3.syncFuture(); - f4 = p4.syncFuture(); + //cache rebalanced + + f13 = p13.syncFuture(); + f14 = p14.syncFuture(); + + f23 = p23.syncFuture(); + f24 = p24.syncFuture(); stopGrid(2); - while (f3 == p3.syncFuture() || f4 == p4.syncFuture()) + //wait until cache rebalanced + + while (f13 == p13.syncFuture() || f14 == p14.syncFuture()) U.sleep(100); - p3.syncFuture().get(); - p4.syncFuture().get(); + while (f23 == p23.syncFuture() || f24 == p24.syncFuture()) + U.sleep(100); + + p13.syncFuture().get(); + p14.syncFuture().get(); + + p23.syncFuture().get(); + p24.syncFuture().get(); + + //cache rebalanced stopGrid(3); + long spend = (System.currentTimeMillis() - start) / 1000; + checkData(grid(4)); - log.info("Spend " + spend + " seconds to preload entries."); + log.info("Spend " + spend + " seconds to rebalance entries."); stopAllGrids(); }