IGNITE-1275 - Use topology-safe method in marshaller context to prevent deadlocks.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/6b93ee7a Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/6b93ee7a Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/6b93ee7a Branch: refs/heads/master Commit: 6b93ee7a39b94b6edb52de7543fb222ef44a1bd3 Parents: abbd308 Author: Alexey Goncharuk <agoncha...@gridgain.com> Authored: Thu Aug 20 16:19:01 2015 -0700 Committer: Alexey Goncharuk <agoncha...@gridgain.com> Committed: Thu Aug 20 16:19:01 2015 -0700 ---------------------------------------------------------------------- .../ignite/internal/MarshallerContextImpl.java | 2 +- .../processors/cache/GridCacheAdapter.java | 82 +++++-- .../distributed/dht/GridDhtCacheAdapter.java | 12 +- .../cache/distributed/dht/GridDhtGetFuture.java | 12 +- .../dht/GridPartitionedGetFuture.java | 86 +++++--- .../dht/atomic/GridDhtAtomicCache.java | 16 +- .../dht/colocated/GridDhtColocatedCache.java | 19 +- .../distributed/near/GridNearAtomicCache.java | 6 +- .../distributed/near/GridNearCacheAdapter.java | 15 +- .../distributed/near/GridNearCacheEntry.java | 4 +- .../distributed/near/GridNearGetFuture.java | 101 ++++++--- .../near/GridNearTransactionalCache.java | 9 +- .../cache/distributed/near/GridNearTxLocal.java | 7 +- .../local/atomic/GridLocalAtomicCache.java | 17 +- .../IgniteCacheTopologySafeGetSelfTest.java | 215 +++++++++++++++++++ ...gniteCachePutRetryTransactionalSelfTest.java | 2 + .../IgniteCacheFailoverTestSuite.java | 2 + 17 files changed, 494 insertions(+), 113 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b93ee7a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java index 87bd3b6..dc0fd57 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java @@ -136,7 +136,7 @@ public class MarshallerContextImpl extends MarshallerContextAdapter { throw new IllegalStateException("Failed to initialize marshaller context (grid is stopping)."); } - String clsName = cache0.get(id); + String clsName = cache0.getTopologySafe(id); if (clsName == null) { File file = new File(workDir, id + ".classname"); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b93ee7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 992edd8..c7fbbfc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -526,7 +526,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /*subj id*/null, /*task name*/null, /*deserialize portable*/false, - /*skip values*/true + /*skip values*/true, + /*can remap*/true ).chain(new CX1<IgniteInternalFuture<Map<K, V>>, Boolean>() { @Override public Boolean applyx(IgniteInternalFuture<Map<K, V>> fut) throws IgniteCheckedException { Map<K, V> map = fut.get(); @@ -560,7 +561,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /*subj id*/null, /*task name*/null, /*deserialize portable*/false, - /*skip values*/true + /*skip values*/true, + /*can remap*/true ).chain(new CX1<IgniteInternalFuture<Map<K, V>>, Boolean>() { @Override public Boolean applyx(IgniteInternalFuture<Map<K, V>> fut) throws IgniteCheckedException { Map<K, V> kvMap = fut.get(); @@ -894,7 +896,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** {@inheritDoc} */ @Override public Set<Cache.Entry<K, V>> entrySet() { - return entrySet((CacheEntryPredicate[]) null); + return entrySet((CacheEntryPredicate[])null); } /** {@inheritDoc} */ @@ -919,12 +921,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** {@inheritDoc} */ @Override public Set<K> primaryKeySet() { - return primaryKeySet((CacheEntryPredicate[]) null); + return primaryKeySet((CacheEntryPredicate[])null); } /** {@inheritDoc} */ @Override public Collection<V> values() { - return values((CacheEntryPredicate[]) null); + return values((CacheEntryPredicate[])null); } /** @@ -1210,22 +1212,57 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V @Override public V getForcePrimary(K key) throws IgniteCheckedException { String taskName = ctx.kernalContext().job().currentTaskName(); - return getAllAsync(F.asList(key), /*force primary*/true, /*skip tx*/false, null, null, taskName, true, false) - .get().get(key); + return getAllAsync( + F.asList(key), + /*force primary*/true, + /*skip tx*/false, + /*cached entry*/null, + /*subject id*/null, + taskName, + /*deserialize cache objects*/true, + /*skip values*/false, + /*can remap*/true + ).get().get(key); } /** {@inheritDoc} */ @Override public IgniteInternalFuture<V> getForcePrimaryAsync(final K key) { String taskName = ctx.kernalContext().job().currentTaskName(); - return getAllAsync(Collections.singletonList(key), /*force primary*/true, /*skip tx*/false, null, null, - taskName, true, false).chain(new CX1<IgniteInternalFuture<Map<K, V>>, V>() { - @Override public V applyx(IgniteInternalFuture<Map<K, V>> e) throws IgniteCheckedException { + return getAllAsync( + Collections.singletonList(key), + /*force primary*/true, + /*skip tx*/false, + null, + null, + taskName, + true, + false, + /*can remap*/true + ).chain(new CX1<IgniteInternalFuture<Map<K, V>>, V>() { + @Override + public V applyx(IgniteInternalFuture<Map<K, V>> e) throws IgniteCheckedException { return e.get().get(key); } }); } + public V getTopologySafe(K key) throws IgniteCheckedException { + String taskName = ctx.kernalContext().job().currentTaskName(); + + return getAllAsync( + F.asList(key), + /*force primary*/false, + /*skip tx*/false, + /*cached entry*/null, + /*subject id*/null, + taskName, + /*deserialize cache objects*/true, + /*skip values*/false, + /*can remap*/false + ).get().get(key); + } + /** {@inheritDoc} */ @Nullable @Override public Map<K, V> getAllOutTx(Set<? extends K> keys) throws IgniteCheckedException { return getAllOutTxAsync(keys).get(); @@ -1242,7 +1279,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V null, taskName, !ctx.keepPortable(), - false); + /*skip values*/false, + /*can remap*/true); } /** @@ -1582,7 +1620,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V @Nullable UUID subjId, String taskName, boolean deserializePortable, - boolean skipVals + boolean skipVals, + boolean canRemap ) { CacheOperationContext opCtx = ctx.operationContextPerCall(); @@ -1597,7 +1636,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V deserializePortable, forcePrimary, skipVals ? null : expiryPolicy(opCtx != null ? opCtx.expiry() : null), - skipVals); + skipVals, + canRemap); } /** @@ -1623,7 +1663,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V final boolean deserializePortable, final boolean forcePrimary, @Nullable IgniteCacheExpiryPolicy expiry, - final boolean skipVals + final boolean skipVals, + boolean canRemap ) { ctx.checkSecurity(SecurityPermission.CACHE_READ); @@ -1638,7 +1679,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V deserializePortable, expiry, skipVals, - false); + false, + canRemap); } /** @@ -1661,7 +1703,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V final boolean deserializePortable, @Nullable IgniteCacheExpiryPolicy expiry, final boolean skipVals, - final boolean keepCacheObjects + final boolean keepCacheObjects, + boolean canRemap ) { if (F.isEmpty(keys)) return new GridFinishedFuture<>(Collections.<K1, V1>emptyMap()); @@ -1684,7 +1727,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V assert keys != null; final AffinityTopologyVersion topVer = tx == null - ? ctx.affinity().affinityTopologyVersion() + ? (canRemap ? ctx.affinity().affinityTopologyVersion(): ctx.shared().exchange().readyAffinityVersion()) : tx.topologyVersion(); final Map<K1, V1> map = new GridLeanMap<>(keys.size()); @@ -4461,7 +4504,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V null, taskName, deserializePortable, - false); + false, + /*can remap*/true); } /** @@ -4682,7 +4726,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** - * @param tx Transaction. + * */ public void execute() { tx = ctx.tm().newTx( http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b93ee7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index adea9e0..a7b3b1e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -527,7 +527,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap @Nullable UUID subjId, String taskName, boolean deserializePortable, - boolean skipVals + boolean skipVals, + boolean canRemap ) { CacheOperationContext opCtx = ctx.operationContextPerCall(); @@ -540,7 +541,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap deserializePortable, forcePrimary, null, - skipVals); + skipVals, + canRemap); } /** @@ -558,7 +560,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap @Nullable UUID subjId, String taskName, @Nullable IgniteCacheExpiryPolicy expiry, - boolean skipVals + boolean skipVals, + boolean canRemap ) { return getAllAsync0(keys, readThrough, @@ -568,7 +571,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap false, expiry, skipVals, - /*keep cache objects*/true); + /*keep cache objects*/true, + canRemap); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b93ee7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java index 742fbfe..9005541 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java @@ -349,12 +349,14 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col } else { if (tx == null) { - fut = cache().getDhtAllAsync(keys.keySet(), + fut = cache().getDhtAllAsync( + keys.keySet(), readThrough, subjId, taskName, expiryPlc, - skipVals); + skipVals, + /*can remap*/true); } else { fut = tx.getAllAsync(cctx, @@ -387,12 +389,14 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col } else { if (tx == null) { - return cache().getDhtAllAsync(keys.keySet(), + return cache().getDhtAllAsync( + keys.keySet(), readThrough, subjId, taskName, expiryPlc, - skipVals); + skipVals, + /*can remap*/true); } else { return tx.getAllAsync(cctx, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b93ee7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index 79d5e75..a85962f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -61,7 +61,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M DFLT_MAX_REMAP_CNT); /** Context. */ - private GridCacheContext<K, V> cctx; + private final GridCacheContext<K, V> cctx; /** Keys. */ private Collection<KeyCacheObject> keys; @@ -105,6 +105,9 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M /** Skip values flag. */ private boolean skipVals; + /** Flag indicating whether future can be remapped on a newer topology version. */ + private final boolean canRemap; + /** * @param cctx Context. * @param keys Keys. @@ -130,7 +133,8 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M String taskName, boolean deserializePortable, @Nullable IgniteCacheExpiryPolicy expiryPlc, - boolean skipVals + boolean skipVals, + boolean canRemap ) { super(cctx.kernalContext(), CU.<K, V>mapsReducer(keys.size())); @@ -147,6 +151,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M this.taskName = taskName; this.expiryPlc = expiryPlc; this.skipVals = skipVals; + this.canRemap = canRemap; futId = IgniteUuid.randomUuid(); @@ -160,7 +165,8 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M * Initializes future. */ public void init() { - AffinityTopologyVersion topVer = this.topVer.topologyVersion() > 0 ? this.topVer : cctx.affinity().affinityTopologyVersion(); + AffinityTopologyVersion topVer = this.topVer.topologyVersion() > 0 ? this.topVer : + canRemap ? cctx.affinity().affinityTopologyVersion() : cctx.shared().exchange().readyAffinityVersion(); map(keys, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(), topVer); @@ -334,7 +340,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M remapKeys.add(key); } - AffinityTopologyVersion updTopVer = new AffinityTopologyVersion(cctx.discovery().topologyVersion()); + AffinityTopologyVersion updTopVer = cctx.discovery().topologyVersionEx(); assert updTopVer.compareTo(topVer) > 0 : "Got invalid partitions for local node but topology version did " + "not change [topVer=" + topVer + ", updTopVer=" + updTopVer + @@ -461,7 +467,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M } } - ClusterNode node = cctx.affinity().primary(key, topVer); + ClusterNode node = affinityNode(key, topVer); if (node == null) { onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + @@ -522,6 +528,28 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M } /** + * Finds affinity node to send get request to. + * + * @param key Key to get. + * @param topVer Topology version. + * @return Affinity node from which the key will be requested. + */ + private ClusterNode affinityNode(KeyCacheObject key, AffinityTopologyVersion topVer) { + if (!canRemap) { + List<ClusterNode> nodes = cctx.affinity().nodes(key, topVer); + + for (ClusterNode node : nodes) { + if (cctx.discovery().alive(node)) + return node; + } + + return null; + } + else + return cctx.affinity().primary(key, topVer); + } + + /** * @param infos Entry infos. * @return Result map. */ @@ -557,14 +585,14 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M private final IgniteUuid futId = IgniteUuid.randomUuid(); /** Node ID. */ - private ClusterNode node; + private final ClusterNode node; /** Keys. */ @GridToStringInclude - private LinkedHashMap<KeyCacheObject, Boolean> keys; + private final LinkedHashMap<KeyCacheObject, Boolean> keys; /** Topology version on which this future was mapped. */ - private AffinityTopologyVersion topVer; + private final AffinityTopologyVersion topVer; /** {@code True} if remapped after node left. */ private boolean remapped; @@ -625,30 +653,38 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M if (log.isDebugEnabled()) log.debug("Remote node left grid while sending or waiting for reply (will retry): " + this); - final AffinityTopologyVersion updTopVer = - new AffinityTopologyVersion(Math.max(topVer.topologyVersion() + 1, cctx.discovery().topologyVersion())); + // Try getting from existing nodes. + if (!canRemap) { + map(keys.keySet(), F.t(node, keys), topVer); + + onDone(Collections.<K, V>emptyMap()); + } + else { + final AffinityTopologyVersion updTopVer = + new AffinityTopologyVersion(Math.max(topVer.topologyVersion() + 1, cctx.discovery().topologyVersion())); - final GridFutureRemapTimeoutObject timeout = new GridFutureRemapTimeoutObject(this, - cctx.kernalContext().config().getNetworkTimeout(), - updTopVer, - e); + final GridFutureRemapTimeoutObject timeout = new GridFutureRemapTimeoutObject(this, + cctx.kernalContext().config().getNetworkTimeout(), + updTopVer, + e); - cctx.affinity().affinityReadyFuture(updTopVer).listen( - new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { - @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) { - if (timeout.finish()) { - cctx.kernalContext().timeout().removeTimeoutObject(timeout); + cctx.affinity().affinityReadyFuture(updTopVer).listen( + new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) { + if (timeout.finish()) { + cctx.kernalContext().timeout().removeTimeoutObject(timeout); - // Remap. - map(keys.keySet(), F.t(node, keys), updTopVer); + // Remap. + map(keys.keySet(), F.t(node, keys), updTopVer); - onDone(Collections.<K, V>emptyMap()); + onDone(Collections.<K, V>emptyMap()); + } } } - } - ); + ); - cctx.kernalContext().timeout().addTimeoutObject(timeout); + cctx.kernalContext().timeout().addTimeoutObject(timeout); + } } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b93ee7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 96e6edc..5b82162 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -248,7 +248,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { @Nullable UUID subjId, final String taskName, final boolean deserializePortable, - final boolean skipVals + final boolean skipVals, + final boolean canRemap ) { ctx.checkSecurity(SecurityPermission.CACHE_READ); @@ -278,7 +279,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { deserializePortable, expiryPlc, skipVals, - skipStore); + skipStore, + canRemap); } }); } @@ -870,8 +872,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { boolean deserializePortable, @Nullable ExpiryPolicy expiryPlc, boolean skipVals, - boolean skipStore) { - AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion(); + boolean skipStore, + boolean canRemap + ) { + AffinityTopologyVersion topVer = canRemap ? ctx.affinity().affinityTopologyVersion() : + ctx.shared().exchange().readyAffinityVersion(); final IgniteCacheExpiryPolicy expiry = skipVals ? null : expiryPolicy(expiryPlc); @@ -971,7 +976,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { taskName, deserializePortable, expiry, - skipVals); + skipVals, + canRemap); fut.init(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b93ee7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index 221b230..eb7c78f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -155,7 +155,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte @Nullable UUID subjId, String taskName, final boolean deserializePortable, - final boolean skipVals + final boolean skipVals, + boolean canRemap ) { ctx.checkSecurity(SecurityPermission.CACHE_READ); @@ -183,7 +184,9 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte }); } - AffinityTopologyVersion topVer = tx == null ? ctx.affinity().affinityTopologyVersion() : tx.topologyVersion(); + AffinityTopologyVersion topVer = tx == null ? + (canRemap ? ctx.affinity().affinityTopologyVersion() : ctx.shared().exchange().readyAffinityVersion()) : + tx.topologyVersion(); subjId = ctx.subjectIdPerCall(subjId, opCtx); @@ -197,7 +200,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte taskName, deserializePortable, skipVals ? null : expiryPolicy(opCtx != null ? opCtx.expiry() : null), - skipVals); + skipVals, + canRemap); } /** {@inheritDoc} */ @@ -226,7 +230,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte * @param skipVals Skip values flag. * @return Loaded values. */ - public IgniteInternalFuture<Map<K, V>> loadAsync(@Nullable Collection<KeyCacheObject> keys, + public IgniteInternalFuture<Map<K, V>> loadAsync( + @Nullable Collection<KeyCacheObject> keys, boolean readThrough, boolean reload, boolean forcePrimary, @@ -235,7 +240,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte String taskName, boolean deserializePortable, @Nullable IgniteCacheExpiryPolicy expiryPlc, - boolean skipVals + boolean skipVals, + boolean canRemap ) { if (keys == null || keys.isEmpty()) return new GridFinishedFuture<>(Collections.<K, V>emptyMap()); @@ -340,7 +346,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte taskName, deserializePortable, expiryPlc, - skipVals); + skipVals, + canRemap); fut.init(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b93ee7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index 041f83a..2bf5365 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -364,7 +364,8 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { @Nullable UUID subjId, String taskName, boolean deserializePortable, - boolean skipVals + boolean skipVals, + boolean canRemap ) { ctx.checkSecurity(SecurityPermission.CACHE_READ); @@ -387,7 +388,8 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { deserializePortable, skipVals ? null : opCtx != null ? opCtx.expiry() : null, skipVals, - opCtx != null && opCtx.skipStore()); + opCtx != null && opCtx.skipStore(), + canRemap); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b93ee7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java index 351d6cd..ba0692c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java @@ -195,13 +195,14 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda return (IgniteInternalFuture)loadAsync(tx, keys, reload, - false, + /*force primary*/false, subjId, taskName, - true, - null, + /*deserialize portable*/true, + /*expiry policy*/null, skipVals, - /*skip store*/false); + /*skip store*/false, + /*can remap*/true); } /** @@ -226,7 +227,8 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda boolean deserializePortable, @Nullable ExpiryPolicy expiryPlc, boolean skipVal, - boolean skipStore + boolean skipStore, + boolean canRemap ) { if (F.isEmpty(keys)) return new GridFinishedFuture<>(Collections.<K, V>emptyMap()); @@ -245,7 +247,8 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda taskName, deserializePortable, expiry, - skipVal); + skipVal, + canRemap); // init() will register future for responses if future has remote mappings. fut.init(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b93ee7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java index 194c68a..6f4f15e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java @@ -333,7 +333,9 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { true, null, false, - /*skip store*/false).get().get(keyValue(false)); + /*skip store*/false, + /*can remap*/true + ).get().get(keyValue(false)); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b93ee7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java index d109d2b..ca460c5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java @@ -62,7 +62,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma private static final int MAX_REMAP_CNT = getInteger(IGNITE_NEAR_GET_MAX_REMAPS, DFLT_MAX_REMAP_CNT); /** Context. */ - private GridCacheContext<K, V> cctx; + private final GridCacheContext<K, V> cctx; /** Keys. */ private Collection<KeyCacheObject> keys; @@ -106,6 +106,9 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma /** Expiry policy. */ private IgniteCacheExpiryPolicy expiryPlc; + /** Flag indicating that get should be done on a locked topology version. */ + private final boolean canRemap; + /** * @param cctx Context. * @param keys Keys. @@ -131,7 +134,8 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma String taskName, boolean deserializePortable, @Nullable IgniteCacheExpiryPolicy expiryPlc, - boolean skipVals + boolean skipVals, + boolean canRemap ) { super(cctx.kernalContext(), CU.<K, V>mapsReducer(keys.size())); @@ -148,6 +152,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma this.deserializePortable = deserializePortable; this.expiryPlc = expiryPlc; this.skipVals = skipVals; + this.canRemap = canRemap; futId = IgniteUuid.randomUuid(); @@ -161,7 +166,9 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma * Initializes future. */ public void init() { - AffinityTopologyVersion topVer = tx == null ? cctx.affinity().affinityTopologyVersion() : tx.topologyVersion(); + AffinityTopologyVersion topVer = tx == null ? + (canRemap ? cctx.affinity().affinityTopologyVersion() : cctx.shared().exchange().readyAffinityVersion()) : + tx.topologyVersion(); map(keys, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(), topVer); @@ -327,7 +334,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma remapKeys.add(key); } - AffinityTopologyVersion updTopVer = new AffinityTopologyVersion(cctx.discovery().topologyVersion()); + AffinityTopologyVersion updTopVer = cctx.discovery().topologyVersionEx(); assert updTopVer.compareTo(topVer) > 0 : "Got invalid partitions for local node but topology version did " + "not change [topVer=" + topVer + ", updTopVer=" + updTopVer + @@ -435,7 +442,7 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma taskName, expiryPlc); - ClusterNode primary = null; + ClusterNode affNode = null; if (v == null && allowLocRead && cctx.affinityNode()) { GridDhtCacheAdapter<K, V> dht = cache().dht(); @@ -472,16 +479,16 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma near.metrics0().onRead(true); } else { - primary = cctx.affinity().primary(key, topVer); + affNode = affinityNode(key, topVer); - if (primary == null) { + if (affNode == null) { onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + "(all partition nodes left the grid).")); return savedVers; } - if (!primary.isLocal() && cctx.cache().configuration().isStatisticsEnabled() && !skipVals) + if (!affNode.isLocal() && cctx.cache().configuration().isStatisticsEnabled() && !skipVals) near.metrics0().onRead(false); } } @@ -507,10 +514,10 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0))); } else { - if (primary == null) { - primary = cctx.affinity().primary(key, topVer); + if (affNode == null) { + affNode = affinityNode(key, topVer); - if (primary == null) { + if (affNode == null) { onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + "(all partition nodes left the grid).")); @@ -527,13 +534,13 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma savedVers.put(key, nearEntry == null ? null : nearEntry.dhtVersion()); - LinkedHashMap<KeyCacheObject, Boolean> keys = mapped.get(primary); + LinkedHashMap<KeyCacheObject, Boolean> keys = mapped.get(affNode); if (keys != null && keys.containsKey(key)) { if (remapCnt.incrementAndGet() > MAX_REMAP_CNT) { onDone(new ClusterTopologyCheckedException("Failed to remap key to a new node after " + MAX_REMAP_CNT + " attempts (key got remapped to the same node) " + - "[key=" + key + ", node=" + U.toShortString(primary) + ", mappings=" + mapped + ']')); + "[key=" + key + ", node=" + U.toShortString(affNode) + ", mappings=" + mapped + ']')); return savedVers; } @@ -545,10 +552,10 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma if (!addRdr && tx.readCommitted() && !tx.writeSet().contains(cctx.txKey(key))) addRdr = true; - LinkedHashMap<KeyCacheObject, Boolean> old = mappings.get(primary); + LinkedHashMap<KeyCacheObject, Boolean> old = mappings.get(affNode); if (old == null) - mappings.put(primary, old = new LinkedHashMap<>(3, 1f)); + mappings.put(affNode, old = new LinkedHashMap<>(3, 1f)); old.put(key, addRdr); } @@ -579,6 +586,28 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma } /** + * Affinity node to send get request to. + * + * @param key Key to get. + * @param topVer Topology version. + * @return Affinity node to get key from. + */ + private ClusterNode affinityNode(KeyCacheObject key, AffinityTopologyVersion topVer) { + if (!canRemap) { + List<ClusterNode> affNodes = cctx.affinity().nodes(key, topVer); + + for (ClusterNode node : affNodes) { + if (cctx.discovery().alive(node)) + return node; + } + + return null; + } + else + return cctx.affinity().primary(key, topVer); + } + + /** * @return Near cache. */ private GridNearCacheAdapter<K, V> cache() { @@ -752,30 +781,38 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma if (log.isDebugEnabled()) log.debug("Remote node left grid while sending or waiting for reply (will retry): " + this); - final AffinityTopologyVersion updTopVer = - new AffinityTopologyVersion(Math.max(topVer.topologyVersion() + 1, cctx.discovery().topologyVersion())); + // Try getting value from alive nodes. + if (!canRemap) { + // Remap + map(keys.keySet(), F.t(node, keys), topVer); + + onDone(Collections.<K, V>emptyMap()); + } else { + final AffinityTopologyVersion updTopVer = + new AffinityTopologyVersion(Math.max(topVer.topologyVersion() + 1, cctx.discovery().topologyVersion())); - final GridFutureRemapTimeoutObject timeout = new GridFutureRemapTimeoutObject(this, - cctx.kernalContext().config().getNetworkTimeout(), - updTopVer, - e); + final GridFutureRemapTimeoutObject timeout = new GridFutureRemapTimeoutObject(this, + cctx.kernalContext().config().getNetworkTimeout(), + updTopVer, + e); - cctx.affinity().affinityReadyFuture(updTopVer).listen( - new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { - @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) { - if (timeout.finish()) { - cctx.kernalContext().timeout().removeTimeoutObject(timeout); + cctx.affinity().affinityReadyFuture(updTopVer).listen( + new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) { + if (timeout.finish()) { + cctx.kernalContext().timeout().removeTimeoutObject(timeout); - // Remap. - map(keys.keySet(), F.t(node, keys), updTopVer); + // Remap. + map(keys.keySet(), F.t(node, keys), updTopVer); - onDone(Collections.<K, V>emptyMap()); + onDone(Collections.<K, V>emptyMap()); + } } } - } - ); + ); - cctx.kernalContext().timeout().addTimeoutObject(timeout); + cctx.kernalContext().timeout().addTimeoutObject(timeout); + } } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b93ee7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java index 696acfb..a1f1383 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java @@ -101,7 +101,8 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> @Nullable UUID subjId, String taskName, final boolean deserializePortable, - final boolean skipVals + final boolean skipVals, + boolean canRemap ) { ctx.checkSecurity(SecurityPermission.CACHE_READ); @@ -142,7 +143,8 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> deserializePortable, skipVals ? null : opCtx != null ? opCtx.expiry() : null, skipVals, - skipStore); + skipStore, + canRemap); } /** @@ -172,7 +174,8 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> tx.resolveTaskName(), deserializePortable, expiryPlc, - skipVals); + skipVals, + /*can remap*/true); // init() will register future for responses if it has remote mappings. fut.init(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b93ee7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index cb391e4..5ff7345 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -313,7 +313,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { }); } else if (cacheCtx.isColocated()) { - return cacheCtx.colocated().loadAsync(keys, + return cacheCtx.colocated().loadAsync( + keys, readThrough, /*reload*/false, /*force primary*/false, @@ -322,7 +323,9 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { resolveTaskName(), deserializePortable, accessPolicy(cacheCtx, keys), - skipVals).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, Boolean>() { + skipVals, + /*can remap*/true + ).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, Boolean>() { @Override public Boolean apply(IgniteInternalFuture<Map<Object, Object>> f) { try { Map<Object, Object> map = f.get(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b93ee7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java index bcbdec4..c648f11 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java @@ -458,7 +458,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { @Nullable UUID subjId, final String taskName, final boolean deserializePortable, - final boolean skipVals + final boolean skipVals, + boolean canRemap ) { A.notNull(keys, "keys"); @@ -570,8 +571,18 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { if (success || !storeEnabled) return vals; - return getAllAsync(keys, opCtx == null || !opCtx.skipStore(), null, false, subjId, taskName, deserializePortable, - false, expiry, skipVals).get(); + return getAllAsync( + keys, + opCtx == null || !opCtx.skipStore(), + null, + false, + subjId, + taskName, + deserializePortable, + /*force primary*/false, + expiry, + skipVals, + /*can remap*/true).get(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b93ee7a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTopologySafeGetSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTopologySafeGetSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTopologySafeGetSelfTest.java new file mode 100644 index 0000000..ef031f6 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTopologySafeGetSelfTest.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.cache.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.junits.common.*; +import org.apache.ignite.transactions.*; + +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.transactions.TransactionConcurrency.*; +import static org.apache.ignite.transactions.TransactionIsolation.*; + +/** + * + */ +public class IgniteCacheTopologySafeGetSelfTest extends GridCommonAbstractTest { + /** Number of initial grids. */ + public static final int GRID_CNT = 4; + + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** TX commit latch. */ + private CountDownLatch releaseLatch; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setCacheConfiguration( + cacheCfg("tx", TRANSACTIONAL, false), + cacheCfg("atomic", ATOMIC, false), + cacheCfg("tx_near", TRANSACTIONAL, true), + cacheCfg("atomic_near", ATOMIC, true)); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(disco); + + return cfg; + } + + /** + * @param name Cache name. + * @param cacheMode Cache mode. + * @param near Near enabled flag. + * @return Cache configuration. + */ + @SuppressWarnings("unchecked") + private CacheConfiguration cacheCfg(String name, CacheAtomicityMode cacheMode, boolean near) { + CacheConfiguration cfg = new CacheConfiguration(name); + + cfg.setAtomicityMode(cacheMode); + cfg.setBackups(1); + + if (near) + cfg.setNearConfiguration(new NearCacheConfiguration()); + else + cfg.setNearConfiguration(null); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testGetTopologySafeNodeJoin() throws Exception { + checkGetTopologySafeNodeJoin(false); + } + + /** + * @throws Exception If failed. + */ + public void testGetTopologySafeNodeJoinPrimaryLeave() throws Exception { + checkGetTopologySafeNodeJoin(true); + } + + /** + * @throws Exception If failed. + */ + public void checkGetTopologySafeNodeJoin(boolean failPrimary) throws Exception { + startGrids(GRID_CNT); + + awaitPartitionMapExchange(); + + try { + ClusterNode targetNode = ignite(1).cluster().localNode(); + + info(">>> Target node: " + targetNode.id()); + + // Populate caches with a key that does not belong to ignite(0). + int key = -1; + for (int i = 0; i < 100; i++) { + Collection<ClusterNode> nodes = ignite(0).affinity("tx").mapKeyToPrimaryAndBackups(i); + ClusterNode primaryNode = F.first(nodes); + + if (!nodes.contains(ignite(0).cluster().localNode()) && primaryNode.id().equals(targetNode.id())) { + ignite(1).cache("tx").put(i, i); + ignite(1).cache("atomic").put(i, i); + ignite(1).cache("tx_near").put(i, i); + ignite(1).cache("atomic_near").put(i, i); + + key = i; + + + break; + } + } + + assertTrue(key != -1); + + IgniteInternalFuture<?> txFut = startBlockingTxAsync(); + + IgniteInternalFuture<?> nodeFut = startNodeAsync(); + + if (failPrimary) + stopGrid(1); + + assertEquals(key, ((IgniteKernal)ignite(0)).internalCache("tx").getTopologySafe(key)); + assertEquals(key, ((IgniteKernal)ignite(0)).internalCache("atomic").getTopologySafe(key)); + assertEquals(key, ((IgniteKernal)ignite(0)).internalCache("tx_near").getTopologySafe(key)); + assertEquals(key, ((IgniteKernal)ignite(0)).internalCache("atomic_near").getTopologySafe(key)); + + releaseTx(); + + txFut.get(); + nodeFut.get(); + } + finally { + stopAllGrids(); + } + } + + private IgniteInternalFuture<?> startNodeAsync() throws Exception { + IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() { + @Override + public Object call() throws Exception { + startGrid(GRID_CNT); + + return null; + } + }); + + U.sleep(1000); + + return fut; + } + + /** + * @return TX release future. + * @throws Exception If failed. + */ + private IgniteInternalFuture<?> startBlockingTxAsync() throws Exception { + final CountDownLatch lockLatch = new CountDownLatch(1); + + IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + try (Transaction ignore = ignite(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + for (int i = 0; i < 30; i++) + ignite(0).cache("tx").get("value-" + i); + + releaseLatch = new CountDownLatch(1); + + lockLatch.countDown(); + + releaseLatch.await(); + } + + return null; + } + }); + + lockLatch.await(); + + return fut; + } + + /** + * + */ + private void releaseTx() { + assert releaseLatch != null; + + releaseLatch.countDown(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b93ee7a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java index 9c4446d..c2fc46c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java @@ -120,6 +120,8 @@ public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetr stopGrid(stopIdx); + U.sleep(500); + startGrid(stopIdx); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6b93ee7a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java index af2b85c..b64471b 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java @@ -80,6 +80,8 @@ public class IgniteCacheFailoverTestSuite extends TestSuite { suite.addTestSuite(IgniteCacheSizeFailoverTest.class); + suite.addTestSuite(IgniteCacheTopologySafeGetSelfTest.class); + return suite; } }