Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-1093 bd317a016 -> a483f5220


ignite-1093


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c1f84324
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c1f84324
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c1f84324

Branch: refs/heads/ignite-1093
Commit: c1f843248eb452d48ac33ff5f7bedeebc3b43b70
Parents: bd317a0
Author: Anton Vinogradov <vinogradov.an...@gmail.com>
Authored: Thu Aug 20 19:18:28 2015 +0300
Committer: Anton Vinogradov <vinogradov.an...@gmail.com>
Committed: Thu Aug 20 19:18:28 2015 +0300

----------------------------------------------------------------------
 .../dht/preloader/GridDhtPartitionDemander.java | 26 +++++++++-
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |  2 +-
 ...ridCacheMassiveRebalancingAsyncSelfTest.java | 54 ++++++++++++++++++++
 ...GridCacheMassiveRebalancingSyncSelfTest.java |  4 +-
 4 files changed, 81 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1f84324/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 6d024de..72d43c9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -24,6 +24,7 @@ import org.apache.ignite.configuration.*;
 import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.cluster.*;
+import org.apache.ignite.internal.managers.eventstorage.*;
 import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.*;
@@ -701,6 +702,8 @@ public class GridDhtPartitionDemander {
 
         private ConcurrentHashMap8<UUID, Collection<Integer>> missed = new 
ConcurrentHashMap8<>();
 
+        private volatile GridLocalEventListener lsnr;
+
         /** Assignments. */
         private volatile GridDhtPreloaderAssignments assigns;
 
@@ -714,8 +717,17 @@ public class GridDhtPartitionDemander {
             return assigns != null ? assigns.topologyVersion() : null;
         }
 
-        void init(
-            GridDhtPreloaderAssignments assigns) {
+        void init(GridDhtPreloaderAssignments assigns) {
+            final SyncFuture fut = this;
+
+            lsnr = new GridLocalEventListener() {
+                @Override public void onEvent(Event evt) {
+                    fut.onCancel();
+                }
+            };
+
+            cctx.events().addListener(lsnr, EVT_NODE_FAILED);
+
             this.assigns = assigns;
         }
 
@@ -736,6 +748,14 @@ public class GridDhtPartitionDemander {
             return assigns.get(node);
         }
 
+        void onCancel() {
+            remaining.clear();
+
+            cancelled = true;
+
+            checkIsDone();
+        }
+
         void onCancel(UUID nodeId, AffinityTopologyVersion topVer) {
             if (isDone() || !topVer.equals(assigns.topologyVersion()))
                 return;
@@ -809,6 +829,8 @@ public class GridDhtPartitionDemander {
                 if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED) && 
!cctx.isReplicated())
                     preloadEvent(EVT_CACHE_REBALANCE_STOPPED, 
assigns.exchangeFuture().discoveryEvent());
 
+                cctx.events().removeListener(lsnr);
+
                 onDone(cancelled);
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1f84324/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 18a540c..2546774 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1878,7 +1878,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter 
implements DiscoverySpi, T
      * <p>
      * This method is intended for test purposes only.
      */
-    void simulateNodeFailure() {
+    protected void simulateNodeFailure() {
         impl.simulateNodeFailure();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1f84324/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingAsyncSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingAsyncSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingAsyncSelfTest.java
index 8bcd6d1..ca564ed 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingAsyncSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingAsyncSelfTest.java
@@ -17,8 +17,13 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.rebalancing;
 
+import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.spi.discovery.tcp.*;
 
 /**
  *
@@ -32,6 +37,55 @@ public class GridCacheMassiveRebalancingAsyncSelfTest 
extends GridCacheMassiveRe
 
         cacheCfg.setRebalanceMode(CacheRebalanceMode.ASYNC);
 
+        iCfg.setDiscoverySpi(new FailableTcpDiscoverySpi());
+
+        ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setIpFinder(ipFinder);
+        ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setForceServerMode(true);
+
+        if (getTestGridName(20).equals(gridName))
+           spi =(FailableTcpDiscoverySpi)iCfg.getDiscoverySpi();
+
         return iCfg;
     }
+
+    public static class FailableTcpDiscoverySpi extends TcpDiscoverySpi{
+        public void fail(){
+            simulateNodeFailure();
+        }
+    }
+
+    private volatile FailableTcpDiscoverySpi spi;
+
+    /**
+     * @throws Exception
+     */
+    public void testNodeFailedAtRebalancing() throws Exception {
+        Ignite ignite = startGrid(0);
+
+        generateData(ignite);
+
+        log.info("Preloading started.");
+
+        startGrid(1);
+
+        IgniteInternalFuture f1 = 
((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture();
+
+        f1.get();
+
+        startGrid(20);
+
+        U.sleep(500);
+
+        spi.fail();
+
+        U.sleep(500);
+
+        f1 = 
((GridCacheAdapter)grid(1).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture();
+        IgniteInternalFuture f0 = 
((GridCacheAdapter)grid(0).context().cache().internalCache(CACHE_NAME_DHT)).preloader().syncFuture();
+
+        f1.get();
+        f0.get();
+
+        stopAllGrids();
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c1f84324/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java
index 91352ee..9606810 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheMassiveRebalancingSyncSelfTest.java
@@ -37,9 +37,9 @@ import java.util.concurrent.atomic.*;
  */
 public class GridCacheMassiveRebalancingSyncSelfTest extends 
GridCommonAbstractTest {
     /** */
-    private static TcpDiscoveryIpFinder ipFinder = new 
TcpDiscoveryVmIpFinder(true);
+    protected static TcpDiscoveryIpFinder ipFinder = new 
TcpDiscoveryVmIpFinder(true);
 
-    private static int TEST_SIZE = 1_024_000;
+    private static int TEST_SIZE = 10_024_000;
 
     /** cache name. */
     protected static String CACHE_NAME_DHT = "cache";

Reply via email to