ignite-446
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/840dd57b Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/840dd57b Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/840dd57b Branch: refs/heads/ignite-446 Commit: 840dd57b9211ca5012a42eb7d78a0e7392cd4da5 Parents: 37d1068 Author: avinogradov <avinogra...@gridgain.com> Authored: Wed Apr 15 12:07:04 2015 +0300 Committer: avinogradov <avinogra...@gridgain.com> Committed: Wed Apr 15 12:07:04 2015 +0300 ---------------------------------------------------------------------- .../distributed/dht/GridDhtTopologyFuture.java | 8 ++++++++ .../dht/atomic/GridNearAtomicUpdateFuture.java | 6 ++++++ .../colocated/GridDhtColocatedLockFuture.java | 10 ++++++++-- .../GridDhtPartitionsExchangeFuture.java | 13 +++++++++++-- .../distributed/near/GridNearLockFuture.java | 12 +++++++++--- .../near/GridNearTxPrepareFuture.java | 8 ++++++++ .../cache/IgniteTopologyValidatorCacheTest.java | 20 +++++++++++++++++--- 7 files changed, 67 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/840dd57b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java index d704e13..8a02ff2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTopologyFuture.java @@ -21,6 +21,7 @@ import org.apache.ignite.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.managers.discovery.*; import org.apache.ignite.internal.processors.affinity.*; +import org.apache.ignite.internal.processors.cache.*; /** * Future that implements a barrier after which dht topology is safe to use. Topology is considered to be @@ -49,4 +50,11 @@ public interface GridDhtTopologyFuture extends IgniteInternalFuture<AffinityTopo * @return Topology version. */ public AffinityTopologyVersion topologyVersion(); + + /** + * Returns is cache topology valid. + * @param cctx Cache context. + * @return valid ot not. + */ + public boolean isCacheTopologyValid(GridCacheContext cctx); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/840dd57b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index ac4ae2c2..0ed625a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -428,6 +428,12 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> GridDhtTopologyFuture fut = cctx.topologyVersionFuture(); if (fut.isDone()) { + if (!fut.isCacheTopologyValid(cctx)) { + onDone(new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " + + cctx.name())); + return; + } + topVer = fut.topologyVersion(); if (futVer == null) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/840dd57b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index 7b05065..3bb553f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -561,6 +561,12 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity GridDhtTopologyFuture fut = cctx.topologyVersionFuture(); if (fut.isDone()) { + if (!fut.isCacheTopologyValid(cctx)) { + onDone(new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " + + cctx.name())); + return; + } + AffinityTopologyVersion topVer = fut.topologyVersion(); if (tx != null) @@ -1012,7 +1018,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity */ private boolean addLocalKey( KeyCacheObject key, - AffinityTopologyVersion topVer, + AffinityTopologyVersion topVer, Collection<KeyCacheObject> distributedKeys ) throws IgniteCheckedException { GridDistributedCacheEntry entry = cctx.colocated().entryExx(key, topVer, false); @@ -1044,7 +1050,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity * @throws IgniteCheckedException If mapping failed. */ private GridNearLockMapping map( - KeyCacheObject key, + KeyCacheObject key, @Nullable GridNearLockMapping mapping, AffinityTopologyVersion topVer ) throws IgniteCheckedException { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/840dd57b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 3cc892d..2683684 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -146,7 +146,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT /** Dynamic cache change requests. */ private Collection<DynamicCacheChangeRequest> reqs; - private Map<String, Boolean> cacheValidRes = new ConcurrentHashMap8<>(); + private volatile Map<Integer, Boolean> cacheValidRes; /** * Dummy future created to trigger reassignments if partition @@ -831,11 +831,15 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT /** {@inheritDoc} */ @Override public boolean onDone(AffinityTopologyVersion res, Throwable err) { + Map<Integer, Boolean> m = new HashMap<>(); + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { if (!CU.isSystemCache(cacheCtx.name()) && (cacheCtx.config().getTopologyValidator() != null)) - cacheValidRes.put(cacheCtx.name(), cacheCtx.config().getTopologyValidator().validate(discoEvt.topologyNodes())); + m.put(cacheCtx.cacheId(), cacheCtx.config().getTopologyValidator().validate(discoEvt.topologyNodes())); } + cacheValidRes = m; + cctx.cache().onExchangeDone(exchId.topologyVersion(), reqs, err); cctx.exchange().onExchangeDone(this, err); @@ -863,6 +867,11 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT return dummy; } + /** {@inheritDoc} */ + @Override public boolean isCacheTopologyValid(GridCacheContext cctx) { + return cacheValidRes.containsKey(cctx.cacheId()) ? cacheValidRes.get(cctx.cacheId()) : true; + } + /** * Cleans up resources to avoid excessive memory usage. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/840dd57b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index 789fd93..8f3e476 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@ -289,8 +289,8 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B * @throws GridCacheEntryRemovedException If entry was removed. */ @Nullable private GridCacheMvccCandidate addEntry( - AffinityTopologyVersion topVer, - GridNearCacheEntry entry, + AffinityTopologyVersion topVer, + GridNearCacheEntry entry, UUID dhtNodeId ) throws GridCacheEntryRemovedException { // Check if lock acquisition is timed out. @@ -693,6 +693,12 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B GridDhtTopologyFuture fut = cctx.topologyVersionFuture(); if (fut.isDone()) { + if (!fut.isCacheTopologyValid(cctx)) { + onDone(new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " + + cctx.name())); + return; + } + AffinityTopologyVersion topVer = fut.topologyVersion(); if (tx != null) @@ -1145,7 +1151,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B * @throws IgniteCheckedException If mapping for key failed. */ private GridNearLockMapping map( - KeyCacheObject key, + KeyCacheObject key, @Nullable GridNearLockMapping mapping, AffinityTopologyVersion topVer ) throws IgniteCheckedException { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/840dd57b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java index 39a33b0..5791df5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java @@ -341,6 +341,14 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut } if (topFut.isDone()) { + for (GridCacheContext ctx : cctx.cacheContexts()){ + if (!topFut.isCacheTopologyValid(ctx)) { + onDone(new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " + + ctx.name())); + return; + } + } + tx.topologyVersion(topFut.topologyVersion()); prepare0(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/840dd57b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorCacheTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorCacheTest.java index 07d2d1d..45da755 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorCacheTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTopologyValidatorCacheTest.java @@ -17,10 +17,12 @@ package org.apache.ignite.internal.processors.cache; +import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; +import javax.cache.*; import java.io.Serializable; import java.util.*; @@ -60,7 +62,7 @@ public class IgniteTopologyValidatorCacheTest extends IgniteCacheAbstractTest im for (CacheConfiguration cCfg : iCfg.getCacheConfiguration()) { cCfg.setTopologyValidator(new TopologyValidator() { @Override public boolean validate(Collection<ClusterNode> nodes) { - return nodes.size() >= 3; + return nodes.size() >= 2; } }); } @@ -70,8 +72,20 @@ public class IgniteTopologyValidatorCacheTest extends IgniteCacheAbstractTest im /** topology validator test */ public void testTopologyValidator() throws Exception { - startGrid(); - jcache().getName(); + try { + jcache().put("1", "1"); + assert false : "topology validation broken"; + }catch (CacheException ex){ + //No-op + } + + startGrid(2); + + try { + jcache().put("1", "1"); + }catch (CacheException ex){ + assert false : "topology validation broken"; + } } }