# ignite-hadoop-tests onKernalStop for affinity manager

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/fbad20c8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/fbad20c8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/fbad20c8

Branch: refs/heads/ignite-hadoop-tests
Commit: fbad20c81f82c364f1e4c6884e3cb4f7ef3c3057
Parents: 31dd659
Author: sboikov <sboi...@gridgain.com>
Authored: Fri Feb 13 14:11:39 2015 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Fri Feb 13 14:11:39 2015 +0300

----------------------------------------------------------------------
 .../affinity/GridAffinityAssignmentCache.java   | 30 ++++++++++++++++++--
 .../cache/GridCacheAffinityManager.java         |  5 ++++
 2 files changed, 32 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fbad20c8/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index a5261e0..ae0b155 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -74,6 +74,9 @@ public class GridAffinityAssignmentCache {
     /** Log. */
     private IgniteLogger log;
 
+    /** Node stop flag. */
+    private volatile boolean stopping;
+
     /**
      * Constructs affinity cached calculations.
      *
@@ -81,10 +84,15 @@ public class GridAffinityAssignmentCache {
      * @param cacheName Cache name.
      * @param aff Affinity function.
      * @param affMapper Affinity key mapper.
+     * @param backups Number of backups.
      */
     @SuppressWarnings("unchecked")
-    public GridAffinityAssignmentCache(GridCacheContext ctx, String cacheName, CacheAffinityFunction aff,
-        CacheAffinityKeyMapper affMapper, int backups) {
+    public GridAffinityAssignmentCache(GridCacheContext ctx,
+        String cacheName,
+        CacheAffinityFunction aff,
+        CacheAffinityKeyMapper affMapper,
+        int backups)
+    {
         this.ctx = ctx;
         this.aff = aff;
         this.affMapper = affMapper;
@@ -118,10 +126,24 @@ public class GridAffinityAssignmentCache {
     }
 
     /**
+     * Kernal stop callback.
+     */
+    public void onKernalStop() {
+        stopping = true;
+
+        IgniteCheckedException err =
+            new IgniteCheckedException("Failed to wait for topology update, node is stopping.");
+
+        for (AffinityReadyFuture fut : readyFuts.values())
+            fut.onDone(err);
+    }
+
+    /**
      * Calculates affinity cache for given topology version.
      *
      * @param topVer Topology version to calculate affinity cache for.
      * @param discoEvt Discovery event that caused this topology version change.
+     * @return Affinity assignments.
      */
     @SuppressWarnings("IfMayBeConditional")
     public List<List<ClusterNode>> calculate(long topVer, DiscoveryEvent discoEvt) {
@@ -229,7 +251,7 @@ public class GridAffinityAssignmentCache {
      * @param topVer Topology version to await for.
      * @return Future that will be completed after affinity for topology version {@code topVer} is calculated.
      */
-    public IgniteInternalFuture<Long> readyFuture(long topVer) {
+    @Nullable public IgniteInternalFuture<Long> readyFuture(long topVer) {
         GridAffinityAssignment aff = head.get();
 
         if (aff.topologyVersion() >= topVer) {
@@ -252,6 +274,8 @@ public class GridAffinityAssignmentCache {
 
             fut.onDone(topVer);
         }
+        else if (stopping)
+            fut.onDone(new IgniteCheckedException("Failed to wait for topology update, node is stopping."));
 
         return fut;
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fbad20c8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
index 131b294..d3510e4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
@@ -87,6 +87,11 @@ public class GridCacheAffinityManager<K, V> extends GridCacheManagerAdapter<K, V
     }
 
     /** {@inheritDoc} */
+    @Override protected void onKernalStop0(boolean cancel) {
+        aff.onKernalStop();
+    }
+
+    /** {@inheritDoc} */
     @Override protected void stop0(boolean cancel) {
         aff = null;
     }

Reply via email to