Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-834 9be603c17 -> 79e07233d


# ignite-834


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/79e07233
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/79e07233
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/79e07233

Branch: refs/heads/ignite-834
Commit: 79e07233deead23f420ad4ced2ef1d715a8a1ca5
Parents: 9be603c
Author: sboikov <sboi...@gridgain.com>
Authored: Tue May 5 17:10:15 2015 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Tue May 5 17:10:15 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      | 112 ++++++++++---------
 .../resources/META-INF/classnames.properties    |   6 +-
 2 files changed, 65 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/79e07233/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index efa5fb5..b493812 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -1115,34 +1115,38 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
     /**
      * @param timeout Timeout for clearLocally all task in milliseconds (0 for 
never).
      *      Set it to larger value for large caches.
-     * @param keys Keys to clear.
+     * @param keys Keys to clear or {@code null} if all cache should be 
cleared.
      * @throws IgniteCheckedException In case of cache could not be cleared on 
any of the nodes.
      */
-    private void clearRemotes(long timeout, final Set<? extends K> keys) 
throws IgniteCheckedException {
+    private void clearRemotes(long timeout, @Nullable final Set<? extends K> 
keys) throws IgniteCheckedException {
         // Send job to remote nodes only.
-        ClusterGroup nodes = ctx.grid().cluster().forCacheNodes(name(), true, 
true, false).forRemotes();
+        Collection<ClusterNode> nodes =
+            ctx.grid().cluster().forCacheNodes(name(), true, true, 
false).forRemotes().nodes();
 
-        if (!nodes.nodes().isEmpty()) {
+        if (!nodes.isEmpty()) {
             ctx.kernalContext().task().setThreadContext(TC_TIMEOUT, timeout);
 
-            ctx.kernalContext().task().setThreadContext(TC_SUBGRID, 
nodes.nodes());
+            ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes);
 
             ctx.kernalContext().task().execute(new ComputeTaskAdapter<Object, 
Object>() {
-                /** {@inheritDoc} */
                 @Nullable @Override public Map<? extends ComputeJob, 
ClusterNode> map(List<ClusterNode> subgrid,
                     @Nullable Object arg) throws IgniteException {
                     Map<ComputeJob, ClusterNode> jobs = new HashMap();
 
-                    for (ClusterNode node : subgrid)
+                    for (ClusterNode node : subgrid) {
                         jobs.put(keys == null ?
-                            new GlobalClearAllCallable(name(), 
ctx.affinity().affinityTopologyVersion()) :
-                            new GlobalClearKeySetCallable<K, V>(name(), 
ctx.affinity().affinityTopologyVersion(), keys),
+                            new GlobalClearAllJob(name(), 
ctx.affinity().affinityTopologyVersion()) :
+                            new GlobalClearKeySetJob<K>(name(), 
ctx.affinity().affinityTopologyVersion(), keys),
                             node);
+                    }
 
                     return jobs;
                 }
 
-                /** {@inheritDoc} */
+                @Override public ComputeJobResultPolicy 
result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
+                    return ComputeJobResultPolicy.WAIT;
+                }
+
                 @Nullable @Override public Object 
reduce(List<ComputeJobResult> results) throws IgniteException {
                     return null;
                 }
@@ -1156,7 +1160,7 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
     }
 
     /**
-     * @param keys Keys to clear.
+     * @param keys Keys to clear or {@code null} if all cache should be 
cleared.
      * @return Future.
      */
     private IgniteInternalFuture<?> clearKeysAsync(final Set<? extends K> 
keys) {
@@ -1166,20 +1170,24 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
             ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes);
 
             return ctx.kernalContext().task().execute(new 
ComputeTaskAdapter<Object, Object>() {
-                /** {@inheritDoc} */
                 @Nullable @Override public Map<? extends ComputeJob, 
ClusterNode> map(List<ClusterNode> subgrid,
-                                                                  @Nullable 
Object arg) throws IgniteException {
+                    @Nullable Object arg) throws IgniteException {
                     Map<ComputeJob, ClusterNode> jobs = new HashMap();
-                    for (ClusterNode node : subgrid)
+
+                    for (ClusterNode node : subgrid) {
                         jobs.put(keys == null ?
-                            new GlobalClearAllCallable(name(), 
ctx.affinity().affinityTopologyVersion()) :
-                            new GlobalClearKeySetCallable<K, V>(name(), 
ctx.affinity().affinityTopologyVersion(), keys),
+                            new GlobalClearAllJob(name(), 
ctx.affinity().affinityTopologyVersion()) :
+                            new GlobalClearKeySetJob<K>(name(), 
ctx.affinity().affinityTopologyVersion(), keys),
                             node);
+                    }
 
                     return jobs;
                 }
 
-                /** {@inheritDoc} */
+                @Override public ComputeJobResultPolicy 
result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
+                    return ComputeJobResultPolicy.WAIT;
+                }
+
                 @Nullable @Override public Object 
reduce(List<ComputeJobResult> results) throws IgniteException {
                     return null;
                 }
@@ -3587,18 +3595,20 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
         ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes);
 
         return ctx.kernalContext().task().execute(new 
ComputeTaskAdapter<Object, Integer>() {
-
-            /** {@inheritDoc} */
             @Nullable @Override public Map<? extends ComputeJob, ClusterNode> 
map(List<ClusterNode> subgrid,
                 @Nullable Object arg) throws IgniteException {
                 Map<ComputeJob, ClusterNode> jobs = new HashMap();
+
                 for (ClusterNode node : subgrid)
-                    jobs.put(new SizeCallable(ctx.name(), 
ctx.affinity().affinityTopologyVersion(), peekModes), node);
+                    jobs.put(new SizeJob(ctx.name(), 
ctx.affinity().affinityTopologyVersion(), peekModes), node);
 
                 return jobs;
             }
 
-            /** {@inheritDoc} */
+            @Override public ComputeJobResultPolicy result(ComputeJobResult 
res, List<ComputeJobResult> rcvd) {
+                return ComputeJobResultPolicy.WAIT;
+            }
+
             @Nullable @Override public Integer reduce(List<ComputeJobResult> 
results) throws IgniteException {
                 int size = 0;
 
@@ -4870,14 +4880,14 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
      * Global clear all.
      */
     @GridInternal
-    private static class GlobalClearAllCallable extends 
TopologyVersionAwareJob {
+    private static class GlobalClearAllJob extends TopologyVersionAwareJob {
         /** */
         private static final long serialVersionUID = 0L;
 
         /**
          * Empty constructor for serialization.
          */
-        public GlobalClearAllCallable() {
+        public GlobalClearAllJob() {
             // No-op.
         }
 
@@ -4885,13 +4895,14 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
          * @param cacheName Cache name.
          * @param topVer Affinity topology version.
          */
-        private GlobalClearAllCallable(String cacheName, 
AffinityTopologyVersion topVer) {
+        private GlobalClearAllJob(String cacheName, AffinityTopologyVersion 
topVer) {
             super(cacheName, topVer);
         }
 
         /** {@inheritDoc} */
-        @Nullable @Override public Object localExecute() {
-            ((IgniteEx)ignite).cachex(cacheName).clearLocally();
+        @Nullable @Override public Object localExecute(@Nullable 
IgniteInternalCache cache) {
+            if (cache != null)
+                cache.clearLocally();
 
             return null;
         }
@@ -4901,7 +4912,7 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
      * Global clear keys.
      */
     @GridInternal
-    private static class GlobalClearKeySetCallable<K, V> extends 
TopologyVersionAwareJob {
+    private static class GlobalClearKeySetJob<K> extends 
TopologyVersionAwareJob {
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -4911,7 +4922,7 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
         /**
          * Empty constructor for serialization.
          */
-        public GlobalClearKeySetCallable() {
+        public GlobalClearKeySetJob() {
             // No-op.
         }
 
@@ -4920,15 +4931,16 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
          * @param topVer Affinity topology version.
          * @param keys Keys to clear.
          */
-        private GlobalClearKeySetCallable(String cacheName, 
AffinityTopologyVersion topVer, Set<? extends K> keys) {
+        private GlobalClearKeySetJob(String cacheName, AffinityTopologyVersion 
topVer, Set<? extends K> keys) {
             super(cacheName, topVer);
 
             this.keys = keys;
         }
 
         /** {@inheritDoc} */
-        @Nullable @Override public Object localExecute() {
-            ((IgniteEx)ignite).<K, V>cachex(cacheName).clearLocallyAll(keys);
+        @Nullable @Override public Object localExecute(@Nullable 
IgniteInternalCache cache) {
+            if (cache != null)
+                cache.clearLocallyAll(keys);
 
             return null;
         }
@@ -4938,7 +4950,7 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
      * Internal callable for global size calculation.
      */
     @GridInternal
-    private static class SizeCallable extends TopologyVersionAwareJob {
+    private static class SizeJob extends TopologyVersionAwareJob {
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -4948,7 +4960,7 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
         /**
          * Required by {@link Externalizable}.
          */
-        public SizeCallable() {
+        public SizeJob() {
             // No-op.
         }
 
@@ -4957,17 +4969,16 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
          * @param topVer Affinity topology version.
          * @param peekModes Cache peek modes.
          */
-        private SizeCallable(String cacheName, AffinityTopologyVersion topVer, 
CachePeekMode[] peekModes) {
+        private SizeJob(String cacheName, AffinityTopologyVersion topVer, 
CachePeekMode[] peekModes) {
             super(cacheName, topVer);
 
             this.peekModes = peekModes;
         }
 
         /** {@inheritDoc} */
-        @Nullable @Override public Object localExecute() {
-            IgniteInternalCache<Object, Object> cache = 
((IgniteEx)ignite).cachex(cacheName);
-
-            assert cache != null : cacheName;
+        @Nullable @Override public Object localExecute(@Nullable 
IgniteInternalCache cache) {
+            if (cache == null)
+                return 0;
 
             try {
                 return cache.localSize(peekModes);
@@ -4979,7 +4990,7 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
 
         /** {@inheritDoc} */
         public String toString() {
-            return S.toString(SizeCallable.class, this);
+            return S.toString(SizeJob.class, this);
         }
     }
 
@@ -5548,11 +5559,11 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
     /**
      * Delayed callable class.
      */
-    protected static abstract class TopologyVersionAwareJob<K, V> extends 
ComputeJobAdapter {
+    protected static abstract class TopologyVersionAwareJob extends 
ComputeJobAdapter {
         /** */
         private static final long serialVersionUID = 0L;
 
-        /** Auto-inject job context. */
+        /** Injected job context. */
         @JobContextResource
         protected ComputeJobContext jobCtx;
 
@@ -5578,6 +5589,8 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
          * @param topVer Affinity topology version.
          */
         public TopologyVersionAwareJob(String cacheName, 
AffinityTopologyVersion topVer) {
+            assert topVer != null;
+
             this.cacheName = cacheName;
             this.topVer = topVer;
         }
@@ -5586,25 +5599,24 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
         @Nullable @Override public final Object execute() {
             waitAffinityReadyFuture();
 
-            return localExecute();
+            IgniteInternalCache cache = 
((IgniteKernal)ignite).context().cache().cache(cacheName);
+
+            return localExecute(cache);
         }
 
         /**
+         * @param cache Cache.
          * @return Local execution result.
          */
-        @Nullable protected abstract Object localExecute();
+        @Nullable protected abstract Object localExecute(@Nullable 
IgniteInternalCache cache);
 
         /**
-         * Hold (suspend) job execution until our cache version becomes equal 
to remote cache's version.
+         * Holds (suspends) job execution until our cache version becomes 
equal to remote cache's version.
          */
         private void waitAffinityReadyFuture() {
-            GridCacheProcessor cacheProc = ((IgniteKernal) 
ignite).context().cache();
-
-            GridCacheAdapter<K, V> cacheAdapter = 
cacheProc.internalCache(cacheName);
-
-            final GridCacheContext<K, V> ctx = cacheAdapter.context();
+            GridCacheProcessor cacheProc = 
((IgniteKernal)ignite).context().cache();
 
-            AffinityTopologyVersion locTopVer = 
ctx.affinity().affinityTopologyVersion();
+            AffinityTopologyVersion locTopVer = 
cacheProc.context().exchange().readyAffinityVersion();
 
             if (locTopVer.compareTo(topVer) < 0) {
                 IgniteInternalFuture<?> fut = 
cacheProc.context().exchange().affinityReadyFuture(topVer);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/79e07233/modules/core/src/main/resources/META-INF/classnames.properties
----------------------------------------------------------------------
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties
index 35495ed..ff263cd 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -323,13 +323,13 @@ org.apache.ignite.internal.processors.cache.GridCacheAdapter$72
 org.apache.ignite.internal.processors.cache.GridCacheAdapter$73
 org.apache.ignite.internal.processors.cache.GridCacheAdapter$74
 org.apache.ignite.internal.processors.cache.GridCacheAdapter$9
-org.apache.ignite.internal.processors.cache.GridCacheAdapter$GlobalClearAllCallable
+org.apache.ignite.internal.processors.cache.GridCacheAdapter$GlobalClearAllJob
 org.apache.ignite.internal.processors.cache.GridCacheAdapter$GlobalClearCallable
-org.apache.ignite.internal.processors.cache.GridCacheAdapter$GlobalClearKeySetCallable
+org.apache.ignite.internal.processors.cache.GridCacheAdapter$GlobalClearKeySetJob
 org.apache.ignite.internal.processors.cache.GridCacheAdapter$GlobalSizeCallable
 org.apache.ignite.internal.processors.cache.GridCacheAdapter$LoadCacheClosure
 org.apache.ignite.internal.processors.cache.GridCacheAdapter$LoadKeysCallable
-org.apache.ignite.internal.processors.cache.GridCacheAdapter$SizeCallable
+org.apache.ignite.internal.processors.cache.GridCacheAdapter$SizeJob
 org.apache.ignite.internal.processors.cache.GridCacheAdapter$UpdateGetTimeStatClosure
 org.apache.ignite.internal.processors.cache.GridCacheAdapter$UpdatePutAndGetTimeStatClosure
 org.apache.ignite.internal.processors.cache.GridCacheAdapter$UpdatePutTimeStatClosure

Reply via email to