#ignite-373: Use one topology version for all nodes in clear(), removeAll() and size().
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f670f400 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f670f400 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f670f400 Branch: refs/heads/ignite-373 Commit: f670f4001517754eed1e22f7ef333fb1a21ce93a Parents: 48fdafa Author: ivasilinets <ivasilin...@gridgain.com> Authored: Wed May 13 12:31:44 2015 +0300 Committer: ivasilinets <ivasilin...@gridgain.com> Committed: Wed May 13 12:31:44 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 47 +++++++++++------ .../GridDistributedCacheAdapter.java | 54 +++++++++++++------- 2 files changed, 65 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f670f400/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 3826bfa..76ad7a1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -1133,7 +1133,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes); - ctx.kernalContext().task().execute(new ClearTask(ctx, keys), null).get(); + ctx.kernalContext().task().execute( + new ClearTask(ctx.name(), ctx.affinity().affinityTopologyVersion(), keys), null).get(); } } @@ -1152,7 +1153,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (!nodes.isEmpty()) { ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes); - return ctx.kernalContext().task().execute(new ClearTask(ctx, keys), null); + return ctx.kernalContext().task().execute( + new ClearTask(ctx.name(), ctx.affinity().affinityTopologyVersion(), keys), null); } else return new GridFinishedFuture<>(); @@ -3571,7 +3573,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes); - return ctx.kernalContext().task().execute(new SizeTask(ctx, peekModes), null); + return ctx.kernalContext().task().execute( + new SizeTask(ctx.name(), ctx.affinity().affinityTopologyVersion(), peekModes), null); } /** {@inheritDoc} */ @@ -5583,8 +5586,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** */ private static final long serialVersionUID = 0L; - /** Cache context. */ - private GridCacheContext ctx; + /** Cache name. */ + private String cacheName; + + /** Affinity topology version. */ + private AffinityTopologyVersion topVer; /** Peek modes. */ private CachePeekMode[] peekModes; @@ -5597,10 +5603,13 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** - * @param ctx Cache context. + * @param cacheName Cache name. + * @param topVer Affinity topology version. + * @param peekModes Cache peek modes. */ - public SizeTask(GridCacheContext ctx, CachePeekMode[] peekModes) { - this.ctx = ctx; + public SizeTask(String cacheName, AffinityTopologyVersion topVer, CachePeekMode[] peekModes) { + this.cacheName = cacheName; + this.topVer = topVer; this.peekModes = peekModes; } @@ -5610,7 +5619,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V Map<ComputeJob, ClusterNode> jobs = new HashMap(); for (ClusterNode node : subgrid) - jobs.put(new SizeJob(ctx.name(), ctx.affinity().affinityTopologyVersion(), peekModes), node); + jobs.put(new SizeJob(cacheName, topVer, peekModes), node); return jobs; } @@ -5640,8 +5649,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** */ private static final long serialVersionUID = 0L; - /** Cache context. */ - private GridCacheContext ctx; + /** Cache name. */ + private String cacheName; + + /** Affinity topology version. */ + private AffinityTopologyVersion topVer; /** Keys to clear. */ private Set<? extends K> keys; @@ -5654,11 +5666,13 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** - * @param ctx Cache context. + * @param cacheName Cache name. + * @param topVer Affinity topology version. * @param keys Keys to clear. */ - public ClearTask(GridCacheContext ctx, Set<? extends K> keys) { - this.ctx = ctx; + public ClearTask(String cacheName, AffinityTopologyVersion topVer, Set<? extends K> keys) { + this.cacheName = cacheName; + this.topVer = topVer; this.keys = keys; } @@ -5668,9 +5682,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V Map<ComputeJob, ClusterNode> jobs = new HashMap(); for (ClusterNode node : subgrid) { - jobs.put(keys == null ? - new GlobalClearAllJob(ctx.name(), ctx.affinity().affinityTopologyVersion()) : - new GlobalClearKeySetJob<K>(ctx.name(), ctx.affinity().affinityTopologyVersion(), keys), + jobs.put(keys == null ? new GlobalClearAllJob(cacheName, topVer) : + new GlobalClearKeySetJob<K>(cacheName, topVer, keys), node); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f670f400/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java index 6f939e1..c172a87 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java @@ -143,7 +143,11 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter try { AffinityTopologyVersion topVer; - boolean retry = false; + boolean retry; + + CacheOperationContext opCtx = ctx.operationContextPerCall(); + + boolean skipStore = opCtx != null && opCtx.skipStore(); do { retry = false; @@ -156,7 +160,8 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter if (!nodes.isEmpty()) { ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes); - retry = !ctx.kernalContext().task().execute(new RemoveAllTask(ctx), null).get(); + retry = !ctx.kernalContext().task().execute( + new RemoveAllTask(ctx.name(), topVer, skipStore), null).get(); } } while (ctx.affinity().affinityTopologyVersion().compareTo(topVer) != 0 || retry); @@ -173,7 +178,11 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion(); - removeAllAsync(opFut, topVer); + CacheOperationContext opCtx = ctx.operationContextPerCall(); + + boolean skipStore = opCtx != null && opCtx.skipStore(); + + removeAllAsync(opFut, topVer, skipStore); return opFut; } @@ -181,14 +190,17 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter /** * @param opFut Future. * @param topVer Topology version. + * @param skipStore Skip store flag. */ - private void removeAllAsync(final GridFutureAdapter<Void> opFut, final AffinityTopologyVersion topVer) { + private void removeAllAsync(final GridFutureAdapter<Void> opFut, final AffinityTopologyVersion topVer, + final boolean skipStore) { Collection<ClusterNode> nodes = ctx.grid().cluster().forDataNodes(name()).nodes(); if (!nodes.isEmpty()) { ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes); - IgniteInternalFuture<Boolean> rmvAll = ctx.kernalContext().task().execute(new RemoveAllTask(ctx), null); + IgniteInternalFuture<Boolean> rmvAll = ctx.kernalContext().task().execute( + new RemoveAllTask(ctx.name(), topVer, skipStore), null); rmvAll.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() { @Override public void apply(IgniteInternalFuture<Boolean> fut) { @@ -200,7 +212,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter if (topVer0.equals(topVer) && !retry) opFut.onDone(); else - removeAllAsync(opFut, topVer0); + removeAllAsync(opFut, topVer0, skipStore); } catch (ClusterGroupEmptyCheckedException ignore) { if (log.isDebugEnabled()) @@ -236,8 +248,14 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter /** */ private static final long serialVersionUID = 0L; - /** Cache context. */ - private GridCacheContext ctx; + /** Cache name. */ + private String cacheName; + + /** Affinity topology version. */ + private AffinityTopologyVersion topVer; + + /** Skip store flag. */ + private boolean skipStore; /** * Empty constructor for serialization. @@ -247,10 +265,14 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter } /** - * @param ctx Cache context. + * @param cacheName Cache name. + * @param topVer Affinity topology version. + * @param skipStore Skip store flag. */ - public RemoveAllTask(GridCacheContext ctx) { - this.ctx = ctx; + public RemoveAllTask(String cacheName, AffinityTopologyVersion topVer, boolean skipStore) { + this.cacheName = cacheName; + this.topVer = topVer; + this.skipStore = skipStore; } /** {@inheritDoc} */ @@ -258,14 +280,8 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter @Nullable Object arg) throws IgniteException { Map<ComputeJob, ClusterNode> jobs = new HashMap(); - CacheOperationContext opCtx = ctx.operationContextPerCall(); - - boolean skipStore = opCtx != null && opCtx.skipStore(); - - for (ClusterNode node : subgrid) { - jobs.put(new GlobalRemoveAllJob(ctx.name(), ctx.affinity().affinityTopologyVersion(), skipStore), - node); - } + for (ClusterNode node : subgrid) + jobs.put(new GlobalRemoveAllJob(cacheName, topVer, skipStore), node); return jobs; }