#ignite-784: Need to check for null result of 'ClusterNode GridCacheAffinityManager.primary()'.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/381fd1a4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/381fd1a4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/381fd1a4 Branch: refs/heads/ignite-80 Commit: 381fd1a4c152e6c1039fc6122aef7f385479f0d7 Parents: f13b63d Author: ivasilinets <ivasilin...@gridgain.com> Authored: Fri Apr 24 10:52:25 2015 +0300 Committer: ivasilinets <ivasilin...@gridgain.com> Committed: Fri Apr 24 10:52:25 2015 +0300 ---------------------------------------------------------------------- .../cache/CacheServerNotFoundException.java | 12 ++--- .../ClusterTopologyServerNotFoundException.java | 12 ++--- .../processors/cache/GridCacheUtils.java | 47 +------------------- .../dht/GridPartitionedGetFuture.java | 7 +++ .../dht/atomic/GridNearAtomicUpdateFuture.java | 24 ++++++++-- .../dht/colocated/GridDhtColocatedCache.java | 20 +++++++-- .../colocated/GridDhtColocatedLockFuture.java | 4 ++ .../distributed/near/GridNearCacheEntry.java | 7 ++- .../distributed/near/GridNearGetFuture.java | 17 ++++++- .../distributed/near/GridNearLockFuture.java | 4 ++ .../near/GridNearTransactionalCache.java | 18 ++++++-- .../cache/query/GridCacheQueryManager.java | 11 +++-- 12 files changed, 111 insertions(+), 72 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/381fd1a4/modules/core/src/main/java/org/apache/ignite/cache/CacheServerNotFoundException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/CacheServerNotFoundException.java b/modules/core/src/main/java/org/apache/ignite/cache/CacheServerNotFoundException.java index 3c035d3..f5ccac7 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/CacheServerNotFoundException.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/CacheServerNotFoundException.java @@ -26,18 +26,18 @@ public class CacheServerNotFoundException extends CacheException { private static final long serialVersionUID = 0L; /** - * @param message Error message. + * @param msg Error message. */ - public CacheServerNotFoundException(String message) { - super(message); + public CacheServerNotFoundException(String msg) { + super(msg); } /** - * @param message Error message. + * @param msg Error message. * @param cause Error cause. */ - public CacheServerNotFoundException(String message, Throwable cause) { - super(message, cause); + public CacheServerNotFoundException(String msg, Throwable cause) { + super(msg, cause); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/381fd1a4/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterTopologyServerNotFoundException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterTopologyServerNotFoundException.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterTopologyServerNotFoundException.java index 22bcad0..1812112 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterTopologyServerNotFoundException.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterTopologyServerNotFoundException.java @@ -24,17 +24,17 @@ public class ClusterTopologyServerNotFoundException extends ClusterTopologyCheck private static final long serialVersionUID = 0L; /** - * @param message Error message. + * @param msg Error message. */ - public ClusterTopologyServerNotFoundException(String message) { - super(message); + public ClusterTopologyServerNotFoundException(String msg) { + super(msg); } /** - * @param message Error message. + * @param msg Error message. * @param cause Exception cause. */ - public ClusterTopologyServerNotFoundException(String message, Throwable cause) { - super(message, cause); + public ClusterTopologyServerNotFoundException(String msg, Throwable cause) { + super(msg, cause); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/381fd1a4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index 51385ed..e7c7f9d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -1025,45 +1025,6 @@ public class GridCacheUtils { } /** - * @param ctx Context. - * @param keys Keys. - * @return Mapped keys. - */ - @SuppressWarnings( {"unchecked", "MismatchedQueryAndUpdateOfCollection"}) - public static <K> Map<ClusterNode, Collection<K>> mapKeysToNodes(GridCacheContext<K, ?> ctx, - Collection<? extends K> keys) { - if (keys == null || keys.isEmpty()) - return Collections.emptyMap(); - - // Map all keys to local node for local caches. - if (ctx.config().getCacheMode() == LOCAL) - return F.asMap(ctx.localNode(), (Collection<K>)keys); - - AffinityTopologyVersion topVer = new AffinityTopologyVersion(ctx.discovery().topologyVersion()); - - if (CU.affinityNodes(ctx, topVer).isEmpty()) - return Collections.emptyMap(); - - if (keys.size() == 1) - return Collections.singletonMap(ctx.affinity().primary(F.first(keys), topVer), (Collection<K>)keys); - - Map<ClusterNode, Collection<K>> map = new GridLeanMap<>(5); - - for (K k : keys) { - ClusterNode primary = ctx.affinity().primary(k, topVer); - - Collection<K> mapped = map.get(primary); - - if (mapped == null) - map.put(primary, mapped = new LinkedList<>()); - - mapped.add(k); - } - - return map; - } - - /** * @param t Exception to check. * @return {@code true} if caused by lock timeout. */ @@ -1221,7 +1182,7 @@ public class GridCacheUtils { * @return Primary node for the key. */ @SuppressWarnings( {"unchecked"}) - public static ClusterNode primaryNode(GridCacheContext ctx, Object key) { + @Nullable public static ClusterNode primaryNode(GridCacheContext ctx, Object key) { assert ctx != null; assert key != null; @@ -1230,11 +1191,7 @@ public class GridCacheUtils { if (cfg.getCacheMode() != PARTITIONED) return ctx.localNode(); - ClusterNode primary = ctx.affinity().primary(key, ctx.affinity().affinityTopologyVersion()); - - assert primary != null; - - return primary; + return ctx.affinity().primary(key, ctx.affinity().affinityTopologyVersion()); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/381fd1a4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index f37daf8..35ba2a9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -462,6 +462,13 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M ClusterNode node = cctx.affinity().primary(key, topVer); + if (node == null) { + onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + + "(all partition nodes left the grid).")); + + return false; + } + remote = !node.isLocal(); LinkedHashMap<KeyCacheObject, Boolean> keys = mapped.get(node); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/381fd1a4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 57d7224..072ab52 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -563,12 +563,14 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> if (op != TRANSFORM) val = cctx.toCacheObject(val); - Collection<ClusterNode> primaryNodes = mapKey(cacheKey, topVer, fastMap); + ClusterNode primary = cctx.affinity().primary(cacheKey, topVer); - // One key and no backups. - assert primaryNodes.size() == 1 : "Should be mapped to single node: " + primaryNodes; + if (primary == null) { + onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " + + "left the grid).")); - ClusterNode primary = F.first(primaryNodes); + return; + } GridNearAtomicUpdateRequest req = new GridNearAtomicUpdateRequest( cctx.cacheId(), @@ -685,9 +687,23 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> Collection<ClusterNode> affNodes = mapKey(cacheKey, topVer, fastMap); + if (affNodes.isEmpty()) { + onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + + "(all partition nodes left the grid).")); + + return; + } + int i = 0; for (ClusterNode affNode : affNodes) { + if (affNode == null) { + onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + + "(all partition nodes left the grid).")); + + return; + } + UUID nodeId = affNode.id(); GridNearAtomicUpdateRequest mapped = pendingMappings.get(nodeId); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/381fd1a4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index 4489776..c92d9ce 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -415,6 +415,16 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte assert topVer.compareTo(AffinityTopologyVersion.ZERO) > 0; + // Send request to remove from remote nodes. + ClusterNode primary = ctx.affinity().primary(key, topVer); + + if (primary == null) { + if (log.isDebugEnabled()) + log.debug("Failed to unlock keys (all partition nodes left the grid)."); + + continue; + } + if (map == null) { Collection<ClusterNode> affNodes = CU.allNodes(ctx, topVer); @@ -426,9 +436,6 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte if (ver == null) ver = lock.version(); - // Send request to remove from remote nodes. - ClusterNode primary = ctx.affinity().primary(key, topVer); - if (!lock.reentry()) { if (!ver.equals(lock.version())) throw new IgniteCheckedException("Failed to unlock (if keys were locked separately, " + @@ -528,6 +535,13 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte ClusterNode primary = ctx.affinity().primary(key, topVer); + if (primary == null) { + if (log.isDebugEnabled()) + log.debug("Failed to remove locks (all partition nodes left the grid)."); + + continue; + } + if (!primary.isLocal()) { // Send request to remove from remote nodes. GridNearUnlockRequest req = map.get(primary); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/381fd1a4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index 09567be..5b74b31 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -1080,6 +1080,10 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity ClusterNode primary = cctx.affinity().primary(key, topVer); + if (primary == null) + throw new ClusterTopologyServerNotFoundException("Failed to lock keys " + + "(all partition nodes left the grid)."); + if (cctx.discovery().node(primary.id()) == null) // If primary node left the grid before lock acquisition, fail the whole future. throw newTopologyException(null, primary.id()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/381fd1a4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java index f5d925c..c7fa4ab 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java @@ -168,7 +168,12 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { } } - recordNodeId(cctx.affinity().primary(key, topVer).id(), topVer); + ClusterNode primaryNode = cctx.affinity().primary(key, topVer); + + if (primaryNode == null) + this.topVer = -1L; + else + recordNodeId(primaryNode.id(), topVer); dhtVer = e.isNew() || e.isDeleted() ? null : e.version(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/381fd1a4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java index e75c49e..fc178e3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java @@ -472,6 +472,13 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma else { primary = cctx.affinity().primary(key, topVer); + if (primary == null) { + onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + + "(all partition nodes left the grid).")); + + return savedVers; + } + if (!primary.isLocal() && cctx.cache().configuration().isStatisticsEnabled() && !skipVals) near.metrics0().onRead(false); } @@ -498,9 +505,17 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0))); } else { - if (primary == null) + if (primary == null) { primary = cctx.affinity().primary(key, topVer); + if (primary == null) { + onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + + "(all partition nodes left the grid).")); + + return savedVers; + } + } + GridNearCacheEntry nearEntry = allowLocRead ? near.peekExx(key) : null; entry = nearEntry; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/381fd1a4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index de32872..a427b65 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@ -1180,6 +1180,10 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B ClusterNode primary = cctx.affinity().primary(key, topVer); + if (primary == null) + throw new ClusterTopologyServerNotFoundException("Failed to lock keys " + + "(all partition nodes left the grid)."); + if (cctx.discovery().node(primary.id()) == null) // If primary node left the grid before lock acquisition, fail the whole future. throw newTopologyException(null, primary.id()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/381fd1a4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java index 03dd020..581c7e0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java @@ -525,6 +525,13 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> // Send request to remove from remote nodes. ClusterNode primary = ctx.affinity().primary(key, topVer); + if (primary == null) { + if (log.isDebugEnabled()) + log.debug("Failed to unlock key (all partition nodes left the grid)."); + + break; + } + GridNearUnlockRequest req = map.get(primary); if (req == null) { @@ -639,6 +646,13 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> ClusterNode primary = ctx.affinity().primary(key, cand.topologyVersion()); + if (primary == null) { + if (log.isDebugEnabled()) + log.debug("Failed to unlock key (all partition nodes left the grid)."); + + break; + } + if (!primary.isLocal()) { req = map.get(primary); @@ -659,9 +673,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> continue; } - req.addKey( - entry.key(), - ctx); + req.addKey(entry.key(), ctx); } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/381fd1a4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index 1eb29dc..50a22d5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -1300,10 +1300,15 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte V val = row.getValue(); - if (log.isDebugEnabled()) - log.debug("Record [key=" + key + ", val=" + val + ", incBackups=" + - incBackups + "priNode=" + U.id8(CU.primaryNode(cctx, key).id()) + + if (log.isDebugEnabled()) { + ClusterNode primaryNode = CU.primaryNode(cctx, key); + + log.debug("Record [key=" + key + + ", val=" + val + + ", incBackups=" + incBackups + + ", priNode=" + (primaryNode != null ? U.id8(primaryNode.id()) : null) + ", node=" + U.id8(cctx.localNode().id()) + ']'); + } if (val == null) { if (log.isDebugEnabled())