#ignite-373: Use one topology version for all nodes in clear(), removeAll() and size().


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f670f400
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f670f400
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f670f400

Branch: refs/heads/ignite-373
Commit: f670f4001517754eed1e22f7ef333fb1a21ce93a
Parents: 48fdafa
Author: ivasilinets <ivasilin...@gridgain.com>
Authored: Wed May 13 12:31:44 2015 +0300
Committer: ivasilinets <ivasilin...@gridgain.com>
Committed: Wed May 13 12:31:44 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      | 47 +++++++++++------
 .../GridDistributedCacheAdapter.java            | 54 +++++++++++++-------
 2 files changed, 65 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f670f400/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 3826bfa..76ad7a1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -1133,7 +1133,8 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
 
             ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes);
 
-            ctx.kernalContext().task().execute(new ClearTask(ctx, keys), 
null).get();
+            ctx.kernalContext().task().execute(
+                new ClearTask(ctx.name(), 
ctx.affinity().affinityTopologyVersion(), keys), null).get();
         }
     }
 
@@ -1152,7 +1153,8 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
         if (!nodes.isEmpty()) {
             ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes);
 
-            return ctx.kernalContext().task().execute(new ClearTask(ctx, 
keys), null);
+            return ctx.kernalContext().task().execute(
+                new ClearTask(ctx.name(), 
ctx.affinity().affinityTopologyVersion(), keys), null);
         }
         else
             return new GridFinishedFuture<>();
@@ -3571,7 +3573,8 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
 
         ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes);
 
-        return ctx.kernalContext().task().execute(new SizeTask(ctx, 
peekModes), null);
+        return ctx.kernalContext().task().execute(
+            new SizeTask(ctx.name(), ctx.affinity().affinityTopologyVersion(), 
peekModes), null);
     }
 
     /** {@inheritDoc} */
@@ -5583,8 +5586,11 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
         /** */
         private static final long serialVersionUID = 0L;
 
-        /** Cache context. */
-        private GridCacheContext ctx;
+        /** Cache name. */
+        private String cacheName;
+
+        /** Affinity topology version. */
+        private AffinityTopologyVersion topVer;
 
         /** Peek modes. */
         private CachePeekMode[] peekModes;
@@ -5597,10 +5603,13 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
         }
 
         /**
-         * @param ctx Cache context.
+         * @param cacheName Cache name.
+         * @param topVer Affinity topology version.
+         * @param peekModes Cache peek modes.
          */
-        public SizeTask(GridCacheContext ctx, CachePeekMode[] peekModes) {
-            this.ctx = ctx;
+        public SizeTask(String cacheName, AffinityTopologyVersion topVer, 
CachePeekMode[] peekModes) {
+            this.cacheName = cacheName;
+            this.topVer = topVer;
             this.peekModes = peekModes;
         }
 
@@ -5610,7 +5619,7 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
             Map<ComputeJob, ClusterNode> jobs = new HashMap();
 
             for (ClusterNode node : subgrid)
-                jobs.put(new SizeJob(ctx.name(), 
ctx.affinity().affinityTopologyVersion(), peekModes), node);
+                jobs.put(new SizeJob(cacheName, topVer, peekModes), node);
 
             return jobs;
         }
@@ -5640,8 +5649,11 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
         /** */
         private static final long serialVersionUID = 0L;
 
-        /** Cache context. */
-        private GridCacheContext ctx;
+        /** Cache name. */
+        private String cacheName;
+
+        /** Affinity topology version. */
+        private AffinityTopologyVersion topVer;
 
         /** Keys to clear. */
         private Set<? extends K> keys;
@@ -5654,11 +5666,13 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
         }
 
         /**
-         * @param ctx Cache context.
+         * @param cacheName Cache name.
+         * @param topVer Affinity topology version.
          * @param keys Keys to clear.
          */
-        public ClearTask(GridCacheContext ctx, Set<? extends K> keys) {
-            this.ctx = ctx;
+        public ClearTask(String cacheName, AffinityTopologyVersion topVer, 
Set<? extends K> keys) {
+            this.cacheName = cacheName;
+            this.topVer = topVer;
             this.keys = keys;
         }
 
@@ -5668,9 +5682,8 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
             Map<ComputeJob, ClusterNode> jobs = new HashMap();
 
             for (ClusterNode node : subgrid) {
-                jobs.put(keys == null ?
-                        new GlobalClearAllJob(ctx.name(), 
ctx.affinity().affinityTopologyVersion()) :
-                        new GlobalClearKeySetJob<K>(ctx.name(), 
ctx.affinity().affinityTopologyVersion(), keys),
+                jobs.put(keys == null ? new GlobalClearAllJob(cacheName, 
topVer) :
+                        new GlobalClearKeySetJob<K>(cacheName, topVer, keys),
                     node);
             }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f670f400/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
index 6f939e1..c172a87 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
@@ -143,7 +143,11 @@ public abstract class GridDistributedCacheAdapter<K, V> 
extends GridCacheAdapter
         try {
             AffinityTopologyVersion topVer;
 
-            boolean retry = false;
+            boolean retry;
+
+            CacheOperationContext opCtx = ctx.operationContextPerCall();
+
+            boolean skipStore = opCtx != null && opCtx.skipStore();
 
             do {
                 retry = false;
@@ -156,7 +160,8 @@ public abstract class GridDistributedCacheAdapter<K, V> 
extends GridCacheAdapter
                 if (!nodes.isEmpty()) {
                     ctx.kernalContext().task().setThreadContext(TC_SUBGRID, 
nodes);
 
-                    retry = !ctx.kernalContext().task().execute(new 
RemoveAllTask(ctx), null).get();
+                    retry = !ctx.kernalContext().task().execute(
+                        new RemoveAllTask(ctx.name(), topVer, skipStore), 
null).get();
                 }
             }
             while (ctx.affinity().affinityTopologyVersion().compareTo(topVer) 
!= 0 || retry);
@@ -173,7 +178,11 @@ public abstract class GridDistributedCacheAdapter<K, V> 
extends GridCacheAdapter
 
         AffinityTopologyVersion topVer = 
ctx.affinity().affinityTopologyVersion();
 
-        removeAllAsync(opFut, topVer);
+        CacheOperationContext opCtx = ctx.operationContextPerCall();
+
+        boolean skipStore = opCtx != null && opCtx.skipStore();
+
+        removeAllAsync(opFut, topVer, skipStore);
 
         return opFut;
     }
@@ -181,14 +190,17 @@ public abstract class GridDistributedCacheAdapter<K, V> 
extends GridCacheAdapter
     /**
      * @param opFut Future.
      * @param topVer Topology version.
+     * @param skipStore Skip store flag.
      */
-    private void removeAllAsync(final GridFutureAdapter<Void> opFut, final 
AffinityTopologyVersion topVer) {
+    private void removeAllAsync(final GridFutureAdapter<Void> opFut, final 
AffinityTopologyVersion topVer,
+        final boolean skipStore) {
         Collection<ClusterNode> nodes = 
ctx.grid().cluster().forDataNodes(name()).nodes();
 
         if (!nodes.isEmpty()) {
             ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes);
 
-            IgniteInternalFuture<Boolean> rmvAll = 
ctx.kernalContext().task().execute(new RemoveAllTask(ctx), null);
+            IgniteInternalFuture<Boolean> rmvAll = 
ctx.kernalContext().task().execute(
+                new RemoveAllTask(ctx.name(), topVer, skipStore), null);
 
             rmvAll.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() 
{
                 @Override public void apply(IgniteInternalFuture<Boolean> fut) 
{
@@ -200,7 +212,7 @@ public abstract class GridDistributedCacheAdapter<K, V> 
extends GridCacheAdapter
                         if (topVer0.equals(topVer) && !retry)
                             opFut.onDone();
                         else
-                            removeAllAsync(opFut, topVer0);
+                            removeAllAsync(opFut, topVer0, skipStore);
                     }
                     catch (ClusterGroupEmptyCheckedException ignore) {
                         if (log.isDebugEnabled())
@@ -236,8 +248,14 @@ public abstract class GridDistributedCacheAdapter<K, V> 
extends GridCacheAdapter
         /** */
         private static final long serialVersionUID = 0L;
 
-        /** Cache context. */
-        private GridCacheContext ctx;
+        /** Cache name. */
+        private String cacheName;
+
+        /** Affinity topology version. */
+        private AffinityTopologyVersion topVer;
+
+        /** Skip store flag. */
+        private boolean skipStore;
 
         /**
          * Empty constructor for serialization.
@@ -247,10 +265,14 @@ public abstract class GridDistributedCacheAdapter<K, V> 
extends GridCacheAdapter
         }
 
         /**
-         * @param ctx Cache context.
+         * @param cacheName Cache name.
+         * @param topVer Affinity topology version.
+         * @param skipStore Skip store flag.
          */
-        public RemoveAllTask(GridCacheContext ctx) {
-            this.ctx = ctx;
+        public RemoveAllTask(String cacheName, AffinityTopologyVersion topVer, 
boolean skipStore) {
+            this.cacheName = cacheName;
+            this.topVer = topVer;
+            this.skipStore = skipStore;
         }
 
         /** {@inheritDoc} */
@@ -258,14 +280,8 @@ public abstract class GridDistributedCacheAdapter<K, V> 
extends GridCacheAdapter
             @Nullable Object arg) throws IgniteException {
             Map<ComputeJob, ClusterNode> jobs = new HashMap();
 
-            CacheOperationContext opCtx = ctx.operationContextPerCall();
-
-            boolean skipStore = opCtx != null && opCtx.skipStore();
-
-            for (ClusterNode node : subgrid) {
-                jobs.put(new GlobalRemoveAllJob(ctx.name(), 
ctx.affinity().affinityTopologyVersion(), skipStore),
-                    node);
-            }
+            for (ClusterNode node : subgrid)
+                jobs.put(new GlobalRemoveAllJob(cacheName, topVer, skipStore), 
node);
 
             return jobs;
         }

Reply via email to