Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-1093 a483f5220 -> dc10b85cc


ignite-1093


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a5bd80de
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a5bd80de
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a5bd80de

Branch: refs/heads/ignite-1093
Commit: a5bd80ded53be77ff568f5c9edbbeddea91d1aff
Parents: a483f52
Author: Anton Vinogradov <vinogradov.an...@gmail.com>
Authored: Fri Aug 21 15:08:39 2015 +0300
Committer: Anton Vinogradov <vinogradov.an...@gmail.com>
Committed: Fri Aug 21 15:08:39 2015 +0300

----------------------------------------------------------------------
 .../dht/preloader/GridDhtPartitionDemander.java |   5 +-
 .../dht/preloader/GridDhtPartitionSupplier.java |   7 +-
 ...GridCacheMassiveRebalancingSyncSelfTest.java | 202 ++++++++++++++-----
 3 files changed, 157 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5bd80de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index e11addc..0c30630 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -291,7 +291,8 @@ public class GridDhtPartitionDemander {
             AffinityTopologyVersion topVer = assigns.topologyVersion();
 
             if (syncFut.isInited()) {
-                syncFut.get();
+                if (!syncFut.isDone())
+                    syncFut.onCancel();
 
                 syncFut = new SyncFuture(assigns);
             }
@@ -791,7 +792,7 @@ public class GridDhtPartitionDemander {
 
             Collection<Integer> parts = remaining.get(nodeId);
 
-            if (parts!=null) {
+            if (parts != null) {
                 parts.remove(p);
 
                 if (parts.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5bd80de/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index c496f8d..546e67b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -81,15 +81,13 @@ class GridDhtPartitionSupplier {
 
         if (!cctx.kernalContext().clientNode()) {
             for (int cnt = 0; cnt < 
cctx.config().getRebalanceThreadPoolSize(); cnt++) {
-                final int idx = cnt;
-
                 cctx.io().addOrderedHandler(topic(cnt, cctx.cacheId()), new 
CI2<UUID, GridDhtPartitionDemandMessage>() {
                     @Override public void apply(UUID id, 
GridDhtPartitionDemandMessage m) {
                         if (!enterBusy())
                             return;
 
                         try {
-                            processMessage(m, id, idx);
+                            processMessage(m, id);
                         }
                         finally {
                             leaveBusy();
@@ -161,9 +159,8 @@ class GridDhtPartitionSupplier {
     /**
      * @param d Demand message.
      * @param id Node uuid.
-     * @param idx Index.
      */
-    private void processMessage(GridDhtPartitionDemandMessage d, UUID id, int 
idx) {
+    private void processMessage(GridDhtPartitionDemandMessage d, UUID id) {
         assert d != null;
         assert id != null;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a5bd80de/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java
index cc82e79..d92ec86 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java
@@ -39,11 +39,14 @@ public class GridCacheMassiveRebalancingSyncSelfTest 
extends GridCommonAbstractT
     /** */
     protected static TcpDiscoveryIpFinder ipFinder = new 
TcpDiscoveryVmIpFinder(true);
 
-    private static int TEST_SIZE = 1_024_000;
+    private static int TEST_SIZE = 1_120_000;
 
     /** cache name. */
     protected static String CACHE_NAME_DHT = "cache";
 
+    /** cache 2 name. */
+    protected static String CACHE_2_NAME_DHT = "cache2";
+
     /** {@inheritDoc} */
     @Override protected long getTestTimeout() {
         return Long.MAX_VALUE;
@@ -53,24 +56,33 @@ public class GridCacheMassiveRebalancingSyncSelfTest 
extends GridCommonAbstractT
     @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
         IgniteConfiguration iCfg = super.getConfiguration(gridName);
 
-        CacheConfiguration<Integer, Integer> cacheCfg = new 
CacheConfiguration<>();
-
         ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setIpFinder(ipFinder);
         ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setForceServerMode(true);
 
         if (getTestGridName(10).equals(gridName))
             iCfg.setClientMode(true);
 
+        CacheConfiguration<Integer, Integer> cacheCfg = new 
CacheConfiguration<>();
+
         cacheCfg.setName(CACHE_NAME_DHT);
         cacheCfg.setCacheMode(CacheMode.PARTITIONED);
         //cacheCfg.setRebalanceBatchSize(1024);
         //cacheCfg.setRebalanceBatchesCount(1);
         cacheCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
-        cacheCfg.setRebalanceThreadPoolSize(4);
-        //cacheCfg.setRebalanceTimeout(1000000);
+        cacheCfg.setRebalanceThreadPoolSize(8);
         cacheCfg.setBackups(1);
 
-        iCfg.setCacheConfiguration(cacheCfg);
+        CacheConfiguration<Integer, Integer> cacheCfg2 = new 
CacheConfiguration<>();
+
+        cacheCfg2.setName(CACHE_2_NAME_DHT);
+        cacheCfg2.setCacheMode(CacheMode.PARTITIONED);
+        //cacheCfg2.setRebalanceBatchSize(1024);
+        //cacheCfg2.setRebalanceBatchesCount(1);
+        cacheCfg2.setRebalanceMode(CacheRebalanceMode.SYNC);
+        cacheCfg2.setRebalanceThreadPoolSize(8);
+        cacheCfg2.setBackups(1);
+
+        iCfg.setCacheConfiguration(cacheCfg, cacheCfg2);
         return iCfg;
     }
 
@@ -86,6 +98,14 @@ public class GridCacheMassiveRebalancingSyncSelfTest extends 
GridCommonAbstractT
                 stmr.addData(i, i);
             }
         }
+        try (IgniteDataStreamer<Integer, Integer> stmr = 
ignite.dataStreamer(CACHE_2_NAME_DHT)) {
+            for (int i = 0; i < TEST_SIZE; i++) {
+                if (i % 1_000_000 == 0)
+                    log.info("Prepared " + i / 1_000_000 + "m entries.");
+
+                stmr.addData(i, i + 3);
+            }
+        }
     }
 
     /**
@@ -97,7 +117,15 @@ public class GridCacheMassiveRebalancingSyncSelfTest 
extends GridCommonAbstractT
             if (i % 1_000_000 == 0)
                 log.info("Checked " + i / 1_000_000 + "m entries.");
 
-            assert ignite.cache(CACHE_NAME_DHT).get(i).equals(i) : "keys " + i 
+ " does not match";
+            assert ignite.cache(CACHE_NAME_DHT).get(i) != null && 
ignite.cache(CACHE_NAME_DHT).get(i).equals(i) :
+                "keys " + i + " does not match (" + 
ignite.cache(CACHE_NAME_DHT).get(i) + ")";
+        }
+        for (int i = 0; i < TEST_SIZE; i++) {
+            if (i % 1_000_000 == 0)
+                log.info("Checked " + i / 1_000_000 + "m entries.");
+
+            assert ignite.cache(CACHE_2_NAME_DHT).get(i) != null && 
ignite.cache(CACHE_2_NAME_DHT).get(i).equals(i + 3) :
+                "keys " + i + " does not match (" + 
ignite.cache(CACHE_2_NAME_DHT).get(i) + ")";
         }
     }
 
@@ -125,7 +153,7 @@ public class GridCacheMassiveRebalancingSyncSelfTest 
extends GridCommonAbstractT
 
         checkData(grid(1));
 
-        log.info("Spend " + spend + " seconds to preload entries.");
+        log.info("Spend " + spend + " seconds to rebalance entries.");
 
         stopAllGrids();
     }
@@ -148,79 +176,153 @@ public class GridCacheMassiveRebalancingSyncSelfTest 
extends GridCommonAbstractT
         startGrid(3);
         startGrid(4);
 
-        GridCachePreloader p1 = 
((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader();
-        GridCachePreloader p2 = 
((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader();
-        GridCachePreloader p3 = 
((GridCacheAdapter)grid(3).context().cache().internalCache(CACHE_NAME_DHT)).preloader();
-        GridCachePreloader p4 = 
((GridCacheAdapter)grid(4).context().cache().internalCache(CACHE_NAME_DHT)).preloader();
+        //wait until cache rebalanced in async mode
 
-        IgniteInternalFuture f4 = p4.syncFuture();
-        f4.get();
+        GridCachePreloader p11 = 
((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader();
+        GridCachePreloader p12 = 
((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_NAME_DHT)).preloader();
+        GridCachePreloader p13 = 
((GridCacheAdapter)grid(3).context().cache().internalCache(CACHE_NAME_DHT)).preloader();
+        GridCachePreloader p14 = 
((GridCacheAdapter)grid(4).context().cache().internalCache(CACHE_NAME_DHT)).preloader();
 
-        AffinityTopologyVersion f4Top = 
((GridDhtPartitionDemander.SyncFuture)f4).topologyVersion();
+        GridCachePreloader p21 = 
((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_2_NAME_DHT)).preloader();
+        GridCachePreloader p22 = 
((GridCacheAdapter)grid(2).context().cache().internalCache(CACHE_2_NAME_DHT)).preloader();
+        GridCachePreloader p23 = 
((GridCacheAdapter)grid(3).context().cache().internalCache(CACHE_2_NAME_DHT)).preloader();
+        GridCachePreloader p24 = 
((GridCacheAdapter)grid(4).context().cache().internalCache(CACHE_2_NAME_DHT)).preloader();
 
-        IgniteInternalFuture f1 = p1.syncFuture();
-        IgniteInternalFuture f2 = p2.syncFuture();
-        IgniteInternalFuture f3 = p3.syncFuture();
+        IgniteInternalFuture f24 = p24.syncFuture();
+        f24.get();
 
-        while 
(!((GridDhtPartitionDemander.SyncFuture)f1).topologyVersion().equals(f4Top) ||
-            
!((GridDhtPartitionDemander.SyncFuture)f2).topologyVersion().equals(f4Top) ||
-            
!((GridDhtPartitionDemander.SyncFuture)f3).topologyVersion().equals(f4Top)) {
+        IgniteInternalFuture f14 = p14.syncFuture();
+        f14.get();
+
+        AffinityTopologyVersion f4Top = 
((GridDhtPartitionDemander.SyncFuture)f24).topologyVersion();
+
+        IgniteInternalFuture f11 = p11.syncFuture();
+        IgniteInternalFuture f12 = p12.syncFuture();
+        IgniteInternalFuture f13 = p13.syncFuture();
+
+        while 
(!((GridDhtPartitionDemander.SyncFuture)f11).topologyVersion().equals(f4Top) ||
+            
!((GridDhtPartitionDemander.SyncFuture)f12).topologyVersion().equals(f4Top) ||
+            
!((GridDhtPartitionDemander.SyncFuture)f13).topologyVersion().equals(f4Top)) {
             U.sleep(100);
 
-            f1 = p1.syncFuture();
-            f2 = p2.syncFuture();
-            f3 = p3.syncFuture();
+            f11 = p11.syncFuture();
+            f12 = p12.syncFuture();
+            f13 = p13.syncFuture();
         }
-        f1.get();
-        f2.get();
-        f3.get();
+        f11.get();
+        f12.get();
+        f13.get();
 
-        long spend = (System.currentTimeMillis() - start) / 1000;
+        IgniteInternalFuture f21 = p21.syncFuture();
+        IgniteInternalFuture f22 = p22.syncFuture();
+        IgniteInternalFuture f23 = p23.syncFuture();
+
+        while 
(!((GridDhtPartitionDemander.SyncFuture)f21).topologyVersion().equals(f4Top) ||
+            
!((GridDhtPartitionDemander.SyncFuture)f22).topologyVersion().equals(f4Top) ||
+            
!((GridDhtPartitionDemander.SyncFuture)f23).topologyVersion().equals(f4Top)) {
+            U.sleep(100);
+
+            f21 = p21.syncFuture();
+            f22 = p22.syncFuture();
+            f23 = p23.syncFuture();
+        }
+        f21.get();
+        f22.get();
+        f23.get();
 
-        f1 = p1.syncFuture();
-        f2 = p2.syncFuture();
-        f3 = p3.syncFuture();
-        f4 = p4.syncFuture();
+        //cache rebalanced in async node
+
+        f11 = p11.syncFuture();
+        f12 = p12.syncFuture();
+        f13 = p13.syncFuture();
+        f14 = p14.syncFuture();
+
+        f21 = p21.syncFuture();
+        f22 = p22.syncFuture();
+        f23 = p23.syncFuture();
+        f24 = p24.syncFuture();
 
         stopGrid(0);
 
-        while (f1 == p1.syncFuture() || f2 == p2.syncFuture() || f3 == 
p3.syncFuture() || f4 == p4.syncFuture())
+        //wait until cache rebalanced
+
+        while (f11 == p11.syncFuture() || f12 == p12.syncFuture() || f13 == 
p13.syncFuture() || f14 == p14.syncFuture())
             U.sleep(100);
 
-        p1.syncFuture().get();
-        p2.syncFuture().get();
-        p3.syncFuture().get();
-        p4.syncFuture().get();
+        while (f21 == p21.syncFuture() || f22 == p22.syncFuture() || f23 == 
p23.syncFuture() || f24 == p24.syncFuture())
+            U.sleep(100);
+
+        p11.syncFuture().get();
+        p12.syncFuture().get();
+        p13.syncFuture().get();
+        p14.syncFuture().get();
+
+        p21.syncFuture().get();
+        p22.syncFuture().get();
+        p23.syncFuture().get();
+        p24.syncFuture().get();
 
-        f2 = p2.syncFuture();
-        f3 = p3.syncFuture();
-        f4 = p4.syncFuture();
+        //cache rebalanced
+
+        f12 = p12.syncFuture();
+        f13 = p13.syncFuture();
+        f14 = p14.syncFuture();
+
+        f22 = p22.syncFuture();
+        f23 = p23.syncFuture();
+        f24 = p24.syncFuture();
 
         stopGrid(1);
 
-        while (f2 == p2.syncFuture() || f3 == p3.syncFuture() || f4 == 
p4.syncFuture())
+        //wait until cache rebalanced
+
+        while (f12 == p12.syncFuture() || f13 == p13.syncFuture() || f14 == 
p14.syncFuture())
             U.sleep(100);
 
-        p2.syncFuture().get();
-        p3.syncFuture().get();
-        p4.syncFuture().get();
+        while (f22 == p22.syncFuture() || f23 == p23.syncFuture() || f24 == 
p24.syncFuture())
+            U.sleep(100);
+
+        p12.syncFuture().get();
+        p13.syncFuture().get();
+        p14.syncFuture().get();
+
+        p22.syncFuture().get();
+        p23.syncFuture().get();
+        p24.syncFuture().get();
 
-        f3 = p3.syncFuture();
-        f4 = p4.syncFuture();
+        //cache rebalanced
+
+        f13 = p13.syncFuture();
+        f14 = p14.syncFuture();
+
+        f23 = p23.syncFuture();
+        f24 = p24.syncFuture();
 
         stopGrid(2);
 
-        while (f3 == p3.syncFuture() || f4 == p4.syncFuture())
+        //wait until cache rebalanced
+
+        while (f13 == p13.syncFuture() || f14 == p14.syncFuture())
             U.sleep(100);
 
-        p3.syncFuture().get();
-        p4.syncFuture().get();
+        while (f23 == p23.syncFuture() || f24 == p24.syncFuture())
+            U.sleep(100);
+
+        p13.syncFuture().get();
+        p14.syncFuture().get();
+
+        p23.syncFuture().get();
+        p24.syncFuture().get();
+
+        //cache rebalanced
 
         stopGrid(3);
 
+        long spend = (System.currentTimeMillis() - start) / 1000;
+
         checkData(grid(4));
 
-        log.info("Spend " + spend + " seconds to preload entries.");
+        log.info("Spend " + spend + " seconds to rebalance entries.");
 
         stopAllGrids();
     }

Reply via email to