ignite-99 review
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/16298c53 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/16298c53 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/16298c53 Branch: refs/heads/sprint-1 Commit: 16298c5317a2d19ac737cff1149cbc43e938e951 Parents: ec7ea1c Author: Yakov Zhdanov <yzhda...@gridgain.com> Authored: Tue Jan 27 15:02:20 2015 +0300 Committer: Yakov Zhdanov <yzhda...@gridgain.com> Committed: Tue Jan 27 15:02:20 2015 +0300 ---------------------------------------------------------------------- .../ignite/cache/affinity/CacheAffinity.java | 9 +-- .../affinity/GridAffinityAssignment.java | 6 ++ .../affinity/GridAffinityProcessor.java | 81 +++++++++++--------- .../ignite/IgniteCacheAffinityAbstractTest.java | 11 +-- 4 files changed, 56 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/16298c53/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinity.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinity.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinity.java index dc81a20..4708500 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinity.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinity.java @@ -61,8 +61,6 @@ public interface CacheAffinity<K> { /** * Returns {@code true} if given node is the primary node for given key. - * To check if local node is primary for given key, pass - * {@link org.apache.ignite.Ignite#localNode()} as first parameter. * * @param n Node to check. * @param key Key to check. @@ -72,8 +70,6 @@ public interface CacheAffinity<K> { /** * Returns {@code true} if local node is one of the backup nodes for given key. - * To check if local node is primary for given key, pass {@link org.apache.ignite.Ignite#localNode()} - * as first parameter. * * @param n Node to check. * @param key Key to check. @@ -83,11 +79,10 @@ public interface CacheAffinity<K> { /** * Returns {@code true} if local node is primary or one of the backup nodes - * for given key. To check if local node is primary or backup for given key, pass - * {@link org.apache.ignite.Ignite#localNode()} as first parameter. * <p> * This method is essentially equivalent to calling - * <i>"{@link #isPrimary(org.apache.ignite.cluster.ClusterNode, Object)} || {@link #isBackup(org.apache.ignite.cluster.ClusterNode, Object)})"</i>, + * <i>"{@link #isPrimary(org.apache.ignite.cluster.ClusterNode, Object)} || + * {@link #isBackup(org.apache.ignite.cluster.ClusterNode, Object)})"</i>, * however it is more efficient as it makes both checks at once. * * @param n Node to check. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/16298c53/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java index 580f64c..1890fa4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignment.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.affinity; import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.util.typedef.internal.*; import java.io.*; import java.util.*; @@ -165,4 +166,9 @@ class GridAffinityAssignment implements Serializable { return topVer == ((GridAffinityAssignment)o).topVer; } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridAffinityAssignment.class, this, super.toString()); + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/16298c53/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java index 0839637..f36faa2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java @@ -77,24 +77,28 @@ public class GridAffinityProcessor extends GridProcessorAdapter { // Clean up affinity functions if such cache no more exists. if (evtType == EVT_NODE_FAILED || evtType == EVT_NODE_LEFT) { - final Collection<String> caches = new HashSet<>(); + Collection<String> caches = new HashSet<>(); - for (ClusterNode clusterNode : ctx.discovery().allNodes()) + for (ClusterNode clusterNode : ((IgniteDiscoveryEvent)evt).topologyNodes()) caches.addAll(U.cacheNames(clusterNode)); - final Collection<AffinityAssignmentKey> rmv = new GridLeanSet<>(); + final Collection<AffinityAssignmentKey> rmv = new HashSet<>(); for (AffinityAssignmentKey key : affMap.keySet()) { - if (!caches.contains(key.cacheName) || key.topVer < discoEvt.topologyVersion() - 1) + if (!caches.contains(key.cacheName) || key.topVer < discoEvt.topologyVersion() - 10) rmv.add(key); } - ctx.timeout().addTimeoutObject(new GridTimeoutObjectAdapter( - IgniteUuid.fromUuid(ctx.localNodeId()), AFFINITY_MAP_CLEAN_UP_DELAY) { - @Override public void onTimeout() { - affMap.keySet().removeAll(rmv); - } - }); + if (!rmv.isEmpty()) { + ctx.timeout().addTimeoutObject( + new GridTimeoutObjectAdapter( + IgniteUuid.fromUuid(ctx.localNodeId()), + AFFINITY_MAP_CLEAN_UP_DELAY) { + @Override public void onTimeout() { + affMap.keySet().removeAll(rmv); + } + }); + } } } }; @@ -107,14 +111,13 @@ public class GridAffinityProcessor extends GridProcessorAdapter { } /** {@inheritDoc} */ - @Override public void onKernalStart() throws IgniteCheckedException { + @Override public void start() throws IgniteCheckedException { ctx.event().addLocalEventListener(lsnr, EVT_NODE_FAILED, EVT_NODE_LEFT, EVT_NODE_JOINED); } /** {@inheritDoc} */ @Override public void onKernalStop(boolean cancel) { - if (ctx != null && ctx.event() != null) - ctx.event().removeLocalEventListener(lsnr); + ctx.event().removeLocalEventListener(lsnr); } /** @@ -372,7 +375,8 @@ public class GridAffinityProcessor extends GridProcessorAdapter { } /** - * Requests {@link org.apache.ignite.cache.affinity.CacheAffinityFunction} and {@link org.apache.ignite.cache.affinity.CacheAffinityKeyMapper} from remote node. + * Requests {@link CacheAffinityFunction} and + * {@link CacheAffinityKeyMapper} from remote node. * * @param cacheName Name of cache on which affinity is requested. * @param n Node from which affinity is requested. @@ -451,18 +455,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter { if (F.isEmpty(nodes)) throw new IgniteCheckedException("Failed to get affinity nodes [aff=" + aff + ", key=" + key + ']'); - Collection<ClusterNode> primaryNodes = new HashSet<>(); - - for (ClusterNode n : nodes) { - if (aff.assignment.primaryPartitions(n.id()).contains(part)) - primaryNodes.add(n); - - } - - if (F.isEmpty(primaryNodes)) - throw new IgniteCheckedException("Failed to get affinity nodes [aff=" + aff + ", key=" + key + ']'); - - return primaryNodes.iterator().next(); + return nodes.iterator().next(); } /** {@inheritDoc} */ @@ -501,6 +494,11 @@ public class GridAffinityProcessor extends GridProcessorAdapter { this.assignment = assignment; this.portableEnabled = portableEnabled; } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(AffinityInfo.class, this); + } } /** @@ -543,6 +541,11 @@ public class GridAffinityProcessor extends GridProcessorAdapter { return res; } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(AffinityAssignmentKey.class, this); + } } /** @@ -564,7 +567,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter { ctx.gateway().readLock(); try { - return affinityCache(cacheName, topologyVersion()).affFunc.partitions(); + return cache().affFunc.partitions(); } catch (IgniteCheckedException e) { throw new IgniteException(e); @@ -579,7 +582,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter { ctx.gateway().readLock(); try { - return affinityCache(cacheName, topologyVersion()).affFunc.partition(key); + return cache().affFunc.partition(key); } catch (IgniteCheckedException e) { throw new IgniteException(e); @@ -594,7 +597,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter { ctx.gateway().readLock(); try { - return affinityCache(cacheName, topologyVersion()) + return cache() .assignment.primaryPartitions(n.id()).contains(partition(key)); } catch (IgniteCheckedException e) { @@ -605,12 +608,16 @@ public class GridAffinityProcessor extends GridProcessorAdapter { } } + private AffinityInfo cache() throws IgniteCheckedException { + return affinityCache(cacheName, topologyVersion()); + } + /** {@inheritDoc} */ @Override public boolean isBackup(ClusterNode n, K key) { ctx.gateway().readLock(); try { - return affinityCache(cacheName, topologyVersion()) + return cache() .assignment.backupPartitions(n.id()).contains(partition(key)); } catch (IgniteCheckedException e) { @@ -638,7 +645,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter { ctx.gateway().readLock(); try { - Set<Integer> parts = affinityCache(cacheName, topologyVersion()).assignment.primaryPartitions(n.id()); + Set<Integer> parts = cache().assignment.primaryPartitions(n.id()); return U.toIntArray(parts); } @@ -655,7 +662,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter { ctx.gateway().readLock(); try { - Set<Integer> parts = affinityCache(cacheName, topologyVersion()).assignment.backupPartitions(n.id()); + Set<Integer> parts = cache().assignment.backupPartitions(n.id()); return U.toIntArray(parts); } @@ -674,7 +681,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter { try { Collection<Integer> parts = new HashSet<>(); - AffinityInfo affInfo = affinityCache(cacheName, topologyVersion()); + AffinityInfo affInfo = cache(); for (int partsCnt = affInfo.affFunc.partitions(), part = 0; part < partsCnt; part++) { for (ClusterNode affNode : affInfo.assignment.get(part)) { @@ -701,7 +708,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter { ctx.gateway().readLock(); try { - return affinityCache(cacheName, topologyVersion()).mapper.affinityKey(key); + return cache().mapper.affinityKey(key); } catch (IgniteCheckedException e) { throw new IgniteException(e); @@ -747,7 +754,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter { ctx.gateway().readLock(); try { - return affinityCache(cacheName, topologyVersion()).assignment.get(partition(key)); + return cache().assignment.get(partition(key)); } catch (IgniteCheckedException e) { throw new IgniteException(e); @@ -762,7 +769,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter { ctx.gateway().readLock(); try { - return F.first(affinityCache(cacheName, topologyVersion()).assignment.get(part)); + return F.first(cache().assignment.get(part)); } catch (IgniteCheckedException e) { throw new IgniteException(e); @@ -796,7 +803,7 @@ public class GridAffinityProcessor extends GridProcessorAdapter { ctx.gateway().readLock(); try { - return affinityCache(cacheName, topologyVersion()).assignment.get(part); + return cache().assignment.get(part); } catch (IgniteCheckedException e) { throw new IgniteException(e); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/16298c53/modules/core/src/test/java/org/apache/ignite/IgniteCacheAffinityAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/IgniteCacheAffinityAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/IgniteCacheAffinityAbstractTest.java index aed7ea7..562ce0b 100644 --- a/modules/core/src/test/java/org/apache/ignite/IgniteCacheAffinityAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/IgniteCacheAffinityAbstractTest.java @@ -33,15 +33,12 @@ public abstract class IgniteCacheAffinityAbstractTest extends IgniteCacheAbstrac CacheConfiguration cache1 = cacheConfiguration(null); cache1.setName(CACHE1); - if (gridName.contains("0")) { + if (gridName.contains("0")) cfg.setCacheConfiguration(); - } - else if (gridName.contains("1")) { + else if (gridName.contains("1")) cfg.setCacheConfiguration(cache0); - } - else { + else cfg.setCacheConfiguration(cache0, cache1); - } return cfg; } @@ -230,7 +227,7 @@ public abstract class IgniteCacheAffinityAbstractTest extends IgniteCacheAbstrac /** * @return Cluster nodes. */ - Collection<ClusterNode> nodes() { + private Collection<ClusterNode> nodes() { Set<ClusterNode> nodes = new HashSet<>(); for (int i = 0; i < gridCount(); ++i)