ignite-99 review

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/16298c53
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/16298c53
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/16298c53

Branch: refs/heads/sprint-1
Commit: 16298c5317a2d19ac737cff1149cbc43e938e951
Parents: ec7ea1c
Author: Yakov Zhdanov <yzhda...@gridgain.com>
Authored: Tue Jan 27 15:02:20 2015 +0300
Committer: Yakov Zhdanov <yzhda...@gridgain.com>
Committed: Tue Jan 27 15:02:20 2015 +0300

----------------------------------------------------------------------
 .../ignite/cache/affinity/CacheAffinity.java    |  9 +--
 .../affinity/GridAffinityAssignment.java        |  6 ++
 .../affinity/GridAffinityProcessor.java         | 81 +++++++++++---------
 .../ignite/IgniteCacheAffinityAbstractTest.java | 11 +--
 4 files changed, 56 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/16298c53/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinity.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinity.java
 
b/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinity.java
index dc81a20..4708500 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinity.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinity.java
@@ -61,8 +61,6 @@ public interface CacheAffinity<K> {
 
     /**
      * Returns {@code true} if given node is the primary node for given key.
-     * To check if local node is primary for given key, pass
-     * {@link org.apache.ignite.Ignite#localNode()} as first parameter.
      *
      * @param n Node to check.
      * @param key Key to check.
@@ -72,8 +70,6 @@ public interface CacheAffinity<K> {
 
     /**
      * Returns {@code true} if local node is one of the backup nodes for given 
key.
-     * To check if local node is primary for given key, pass {@link 
org.apache.ignite.Ignite#localNode()}
-     * as first parameter.
      *
      * @param n Node to check.
      * @param key Key to check.
@@ -83,11 +79,10 @@ public interface CacheAffinity<K> {
 
     /**
      * Returns {@code true} if local node is primary or one of the backup nodes
-     * for given key. To check if local node is primary or backup for given 
key, pass
-     * {@link org.apache.ignite.Ignite#localNode()} as first parameter.
      * <p>
      * This method is essentially equivalent to calling
-     * <i>"{@link #isPrimary(org.apache.ignite.cluster.ClusterNode, Object)} 
|| {@link #isBackup(org.apache.ignite.cluster.ClusterNode, Object)})"</i>,
+     * <i>"{@link #isPrimary(org.apache.ignite.cluster.ClusterNode, Object)} ||
+     *      {@link #isBackup(org.apache.ignite.cluster.ClusterNode, 
Object)})"</i>,
      * however it is more efficient as it makes both checks at once.
      *
      * @param n Node to check.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/16298c53/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
index 580f64c..1890fa4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.affinity;
 
 import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
 
 import java.io.*;
 import java.util.*;
@@ -165,4 +166,9 @@ class GridAffinityAssignment implements Serializable {
 
         return topVer == ((GridAffinityAssignment)o).topVer;
     }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridAffinityAssignment.class, this, 
super.toString());
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/16298c53/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
index 0839637..f36faa2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
@@ -77,24 +77,28 @@ public class GridAffinityProcessor extends 
GridProcessorAdapter {
 
             // Clean up affinity functions if such cache no more exists.
             if (evtType == EVT_NODE_FAILED || evtType == EVT_NODE_LEFT) {
-                final Collection<String> caches = new HashSet<>();
+                Collection<String> caches = new HashSet<>();
 
-                for (ClusterNode clusterNode : ctx.discovery().allNodes())
+                for (ClusterNode clusterNode : 
((IgniteDiscoveryEvent)evt).topologyNodes())
                     caches.addAll(U.cacheNames(clusterNode));
 
-                final Collection<AffinityAssignmentKey> rmv = new 
GridLeanSet<>();
+                final Collection<AffinityAssignmentKey> rmv = new HashSet<>();
 
                 for (AffinityAssignmentKey key : affMap.keySet()) {
-                    if (!caches.contains(key.cacheName) || key.topVer < 
discoEvt.topologyVersion() - 1)
+                    if (!caches.contains(key.cacheName) || key.topVer < 
discoEvt.topologyVersion() - 10)
                         rmv.add(key);
                 }
 
-                ctx.timeout().addTimeoutObject(new GridTimeoutObjectAdapter(
-                    IgniteUuid.fromUuid(ctx.localNodeId()), 
AFFINITY_MAP_CLEAN_UP_DELAY) {
-                    @Override public void onTimeout() {
-                        affMap.keySet().removeAll(rmv);
-                    }
-                });
+                if (!rmv.isEmpty()) {
+                    ctx.timeout().addTimeoutObject(
+                        new GridTimeoutObjectAdapter(
+                            IgniteUuid.fromUuid(ctx.localNodeId()),
+                            AFFINITY_MAP_CLEAN_UP_DELAY) {
+                                @Override public void onTimeout() {
+                                    affMap.keySet().removeAll(rmv);
+                                }
+                            });
+                }
             }
         }
     };
@@ -107,14 +111,13 @@ public class GridAffinityProcessor extends 
GridProcessorAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public void onKernalStart() throws IgniteCheckedException {
+    @Override public void start() throws IgniteCheckedException {
         ctx.event().addLocalEventListener(lsnr, EVT_NODE_FAILED, 
EVT_NODE_LEFT, EVT_NODE_JOINED);
     }
 
     /** {@inheritDoc} */
     @Override public void onKernalStop(boolean cancel) {
-        if (ctx != null && ctx.event() != null)
-            ctx.event().removeLocalEventListener(lsnr);
+        ctx.event().removeLocalEventListener(lsnr);
     }
 
     /**
@@ -372,7 +375,8 @@ public class GridAffinityProcessor extends 
GridProcessorAdapter {
     }
 
     /**
-     * Requests {@link org.apache.ignite.cache.affinity.CacheAffinityFunction} 
and {@link org.apache.ignite.cache.affinity.CacheAffinityKeyMapper} from remote 
node.
+     * Requests {@link CacheAffinityFunction} and
+     * {@link CacheAffinityKeyMapper} from remote node.
      *
      * @param cacheName Name of cache on which affinity is requested.
      * @param n Node from which affinity is requested.
@@ -451,18 +455,7 @@ public class GridAffinityProcessor extends 
GridProcessorAdapter {
         if (F.isEmpty(nodes))
             throw new IgniteCheckedException("Failed to get affinity nodes 
[aff=" + aff + ", key=" + key + ']');
 
-        Collection<ClusterNode> primaryNodes = new HashSet<>();
-
-        for (ClusterNode n : nodes) {
-            if (aff.assignment.primaryPartitions(n.id()).contains(part))
-                primaryNodes.add(n);
-
-        }
-
-        if (F.isEmpty(primaryNodes))
-            throw new IgniteCheckedException("Failed to get affinity nodes 
[aff=" + aff + ", key=" + key + ']');
-
-        return primaryNodes.iterator().next();
+        return nodes.iterator().next();
     }
 
     /** {@inheritDoc} */
@@ -501,6 +494,11 @@ public class GridAffinityProcessor extends 
GridProcessorAdapter {
             this.assignment = assignment;
             this.portableEnabled = portableEnabled;
         }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(AffinityInfo.class, this);
+        }
     }
 
     /**
@@ -543,6 +541,11 @@ public class GridAffinityProcessor extends 
GridProcessorAdapter {
 
             return res;
         }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(AffinityAssignmentKey.class, this);
+        }
     }
 
     /**
@@ -564,7 +567,7 @@ public class GridAffinityProcessor extends 
GridProcessorAdapter {
             ctx.gateway().readLock();
 
             try {
-                return affinityCache(cacheName, 
topologyVersion()).affFunc.partitions();
+                return cache().affFunc.partitions();
             }
             catch (IgniteCheckedException e) {
                 throw new IgniteException(e);
@@ -579,7 +582,7 @@ public class GridAffinityProcessor extends 
GridProcessorAdapter {
             ctx.gateway().readLock();
 
             try {
-                return affinityCache(cacheName, 
topologyVersion()).affFunc.partition(key);
+                return cache().affFunc.partition(key);
             }
             catch (IgniteCheckedException e) {
                 throw new IgniteException(e);
@@ -594,7 +597,7 @@ public class GridAffinityProcessor extends 
GridProcessorAdapter {
             ctx.gateway().readLock();
 
             try {
-                return affinityCache(cacheName, topologyVersion())
+                return cache()
                     
.assignment.primaryPartitions(n.id()).contains(partition(key));
             }
             catch (IgniteCheckedException e) {
@@ -605,12 +608,16 @@ public class GridAffinityProcessor extends 
GridProcessorAdapter {
             }
         }
 
+        private AffinityInfo cache() throws IgniteCheckedException {
+            return affinityCache(cacheName, topologyVersion());
+        }
+
         /** {@inheritDoc} */
         @Override public boolean isBackup(ClusterNode n, K key) {
             ctx.gateway().readLock();
 
             try {
-                return affinityCache(cacheName, topologyVersion())
+                return cache()
                     
.assignment.backupPartitions(n.id()).contains(partition(key));
             }
             catch (IgniteCheckedException e) {
@@ -638,7 +645,7 @@ public class GridAffinityProcessor extends 
GridProcessorAdapter {
             ctx.gateway().readLock();
 
             try {
-                Set<Integer> parts = affinityCache(cacheName, 
topologyVersion()).assignment.primaryPartitions(n.id());
+                Set<Integer> parts = 
cache().assignment.primaryPartitions(n.id());
 
                 return U.toIntArray(parts);
             }
@@ -655,7 +662,7 @@ public class GridAffinityProcessor extends 
GridProcessorAdapter {
             ctx.gateway().readLock();
 
             try {
-                Set<Integer> parts = affinityCache(cacheName, 
topologyVersion()).assignment.backupPartitions(n.id());
+                Set<Integer> parts = 
cache().assignment.backupPartitions(n.id());
 
                 return U.toIntArray(parts);
             }
@@ -674,7 +681,7 @@ public class GridAffinityProcessor extends 
GridProcessorAdapter {
             try {
                 Collection<Integer> parts = new HashSet<>();
 
-                AffinityInfo affInfo = affinityCache(cacheName, 
topologyVersion());
+                AffinityInfo affInfo = cache();
 
                 for (int partsCnt = affInfo.affFunc.partitions(), part = 0; 
part < partsCnt; part++) {
                     for (ClusterNode affNode : affInfo.assignment.get(part)) {
@@ -701,7 +708,7 @@ public class GridAffinityProcessor extends 
GridProcessorAdapter {
             ctx.gateway().readLock();
 
             try {
-                return affinityCache(cacheName, 
topologyVersion()).mapper.affinityKey(key);
+                return cache().mapper.affinityKey(key);
             }
             catch (IgniteCheckedException e) {
                 throw new IgniteException(e);
@@ -747,7 +754,7 @@ public class GridAffinityProcessor extends 
GridProcessorAdapter {
             ctx.gateway().readLock();
 
             try {
-                return affinityCache(cacheName, 
topologyVersion()).assignment.get(partition(key));
+                return cache().assignment.get(partition(key));
             }
             catch (IgniteCheckedException e) {
                 throw new IgniteException(e);
@@ -762,7 +769,7 @@ public class GridAffinityProcessor extends 
GridProcessorAdapter {
             ctx.gateway().readLock();
 
             try {
-                return F.first(affinityCache(cacheName, 
topologyVersion()).assignment.get(part));
+                return F.first(cache().assignment.get(part));
             }
             catch (IgniteCheckedException e) {
                 throw new IgniteException(e);
@@ -796,7 +803,7 @@ public class GridAffinityProcessor extends 
GridProcessorAdapter {
             ctx.gateway().readLock();
 
             try {
-                return affinityCache(cacheName, 
topologyVersion()).assignment.get(part);
+                return cache().assignment.get(part);
             }
             catch (IgniteCheckedException e) {
                 throw new IgniteException(e);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/16298c53/modules/core/src/test/java/org/apache/ignite/IgniteCacheAffinityAbstractTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/IgniteCacheAffinityAbstractTest.java
 
b/modules/core/src/test/java/org/apache/ignite/IgniteCacheAffinityAbstractTest.java
index aed7ea7..562ce0b 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/IgniteCacheAffinityAbstractTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/IgniteCacheAffinityAbstractTest.java
@@ -33,15 +33,12 @@ public abstract class IgniteCacheAffinityAbstractTest 
extends IgniteCacheAbstrac
         CacheConfiguration cache1 = cacheConfiguration(null);
         cache1.setName(CACHE1);
 
-        if (gridName.contains("0")) {
+        if (gridName.contains("0"))
             cfg.setCacheConfiguration();
-        }
-        else if (gridName.contains("1")) {
+        else if (gridName.contains("1"))
             cfg.setCacheConfiguration(cache0);
-        }
-        else {
+        else
             cfg.setCacheConfiguration(cache0, cache1);
-        }
 
         return cfg;
     }
@@ -230,7 +227,7 @@ public abstract class IgniteCacheAffinityAbstractTest 
extends IgniteCacheAbstrac
     /**
      * @return Cluster nodes.
      */
-    Collection<ClusterNode> nodes() {
+    private Collection<ClusterNode> nodes() {
         Set<ClusterNode> nodes = new HashSet<>();
 
         for (int i = 0; i < gridCount(); ++i)

Reply via email to