Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-373 ceb283542 -> e50632cad


#ignite-373: Use tasks instead of callable in removeAll.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e50632ca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e50632ca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e50632ca

Branch: refs/heads/ignite-373
Commit: e50632cad41ddd076d1e5bf65ec87d1aa3d06485
Parents: ceb2835
Author: ivasilinets <ivasilin...@gridgain.com>
Authored: Tue May 12 16:29:39 2015 +0300
Committer: ivasilinets <ivasilin...@gridgain.com>
Committed: Tue May 12 16:29:39 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      |  19 ++-
 .../GridDistributedCacheAdapter.java            | 153 +++++++++++--------
 2 files changed, 99 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50632ca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 3826bfa..0b7fa91 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -4842,7 +4842,9 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
         }
 
         /** {@inheritDoc} */
-        @Nullable @Override public Object localExecute(@Nullable 
IgniteInternalCache cache) {
+        @Nullable @Override public Object localExecute() {
+            IgniteInternalCache cache = 
((IgniteKernal)ignite).context().cache().cache(cacheName);
+
             if (cache != null)
                 cache.clearLocally();
 
@@ -4880,7 +4882,9 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
         }
 
         /** {@inheritDoc} */
-        @Nullable @Override public Object localExecute(@Nullable 
IgniteInternalCache cache) {
+        @Nullable @Override public Object localExecute() {
+            IgniteInternalCache cache = 
((IgniteKernal)ignite).context().cache().cache(cacheName);
+
             if (cache != null)
                 cache.clearLocallyAll(keys);
 
@@ -4918,7 +4922,9 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
         }
 
         /** {@inheritDoc} */
-        @Nullable @Override public Object localExecute(@Nullable 
IgniteInternalCache cache) {
+        @Nullable @Override public Object localExecute() {
+            IgniteInternalCache cache = 
((IgniteKernal)ignite).context().cache().cache(cacheName);
+
             if (cache == null)
                 return 0;
 
@@ -5541,16 +5547,13 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
         @Nullable @Override public final Object execute() {
             waitAffinityReadyFuture();
 
-            IgniteInternalCache cache = 
((IgniteKernal)ignite).context().cache().cache(cacheName);
-
-            return localExecute(cache);
+            return localExecute();
         }
 
         /**
-         * @param cache Cache.
          * @return Local execution result.
          */
-        @Nullable protected abstract Object localExecute(@Nullable 
IgniteInternalCache cache);
+        @Nullable protected abstract Object localExecute();
 
         /**
          * Holds (suspends) job execution until our cache version becomes 
equal to remote cache's version.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50632ca/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
index d10ab56..cafa8b3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
@@ -19,6 +19,7 @@ package 
org.apache.ignite.internal.processors.cache.distributed;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
+import org.apache.ignite.compute.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.cluster.*;
 import org.apache.ignite.internal.processors.affinity.*;
@@ -33,16 +34,14 @@ import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
-import org.apache.ignite.resources.*;
 import org.apache.ignite.transactions.*;
 import org.jetbrains.annotations.*;
 
 import java.io.*;
 import java.util.*;
-import java.util.concurrent.*;
 
-import static org.apache.ignite.internal.GridClosureCallMode.*;
 import static 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
+import static 
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.*;
 
 /**
  * Distributed cache implementation.
@@ -144,10 +143,10 @@ public abstract class GridDistributedCacheAdapter<K, V> 
extends GridCacheAdapter
         try {
             AffinityTopologyVersion topVer;
 
-            boolean removedAll;
+            Boolean rmvAll;
 
             do {
-                removedAll = true;
+                rmvAll = true;
 
                 topVer = ctx.affinity().affinityTopologyVersion();
 
@@ -155,19 +154,12 @@ public abstract class GridDistributedCacheAdapter<K, V> 
extends GridCacheAdapter
                 Collection<ClusterNode> nodes = 
ctx.grid().cluster().forDataNodes(name()).nodes();
 
                 if (!nodes.isEmpty()) {
-                    CacheOperationContext opCtx = 
ctx.operationContextPerCall();
+                    ctx.kernalContext().task().setThreadContext(TC_SUBGRID, 
nodes);
 
-                    Collection<Boolean> results = 
ctx.closures().callAsyncNoFailover(BROADCAST,
-                        Collections.singleton(new 
GlobalRemoveAllCallable<>(name(), topVer,
-                        opCtx != null && opCtx.skipStore())), nodes, 
true).get();
-
-                    for (Boolean res : results) {
-                        if (res != null && !res)
-                            removedAll = false;
-                    }
+                    rmvAll = ctx.kernalContext().task().execute(new 
RemoveAllTask(ctx), null).get();
                 }
             }
-            while (ctx.affinity().affinityTopologyVersion().compareTo(topVer) 
!= 0 || !removedAll);
+            while (ctx.affinity().affinityTopologyVersion().compareTo(topVer) 
!= 0 || rmvAll == null || !rmvAll);
         }
         catch (ClusterGroupEmptyCheckedException ignore) {
             if (log.isDebugEnabled())
@@ -194,19 +186,18 @@ public abstract class GridDistributedCacheAdapter<K, V> 
extends GridCacheAdapter
         Collection<ClusterNode> nodes = 
ctx.grid().cluster().forDataNodes(name()).nodes();
 
         if (!nodes.isEmpty()) {
-            CacheOperationContext opCtx = ctx.operationContextPerCall();
+            ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes);
 
-            IgniteInternalFuture<?> rmvFut = 
ctx.closures().callAsyncNoFailover(BROADCAST,
-                    new GlobalRemoveAllCallable<>(name(), topVer, opCtx != 
null && opCtx.skipStore()), nodes, true);
+            IgniteInternalFuture<Boolean> rmvAll = 
ctx.kernalContext().task().execute(new RemoveAllTask(ctx), null);
 
-            rmvFut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() {
-                @Override public void apply(IgniteInternalFuture<?> fut) {
+            rmvAll.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() 
{
+                @Override public void apply(IgniteInternalFuture<Boolean> fut) 
{
                     try {
-                        fut.get();
+                        Boolean res = fut.get();
 
                         AffinityTopologyVersion topVer0 = 
ctx.affinity().affinityTopologyVersion();
 
-                        if (topVer0.equals(topVer))
+                        if (topVer0.equals(topVer) && res != null && res)
                             opFut.onDone();
                         else
                             removeAllAsync(opFut, topVer0);
@@ -238,31 +229,78 @@ public abstract class GridDistributedCacheAdapter<K, V> 
extends GridCacheAdapter
     }
 
     /**
-     * Internal callable which performs remove all primary key mappings
-     * operation on a cache with the given name.
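+     * Task that maps one {@link GlobalRemoveAllJob} onto every node of the subgrid and reduces the job
+     * results to a single flag: {@code true} only when no job failed or reported {@code false}.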
      */
     @GridInternal
-    private static class GlobalRemoveAllCallable<K,V> implements 
Callable<Boolean>, Externalizable {
+    private static class RemoveAllTask extends ComputeTaskAdapter<Object, 
Boolean> {
         /** */
         private static final long serialVersionUID = 0L;
 
-        /** Cache name. */
-        private String cacheName;
+        /** Cache context. */
+        private GridCacheContext ctx;
+
+        /**
+         * Empty constructor for serialization.
+         * <p>
+         * Leaves {@code ctx} unset. NOTE(review): {@code map()} dereferences {@code ctx}, so an instance
+         * created through this constructor cannot be mapped unless the field is populated elsewhere.
+         */
+        public RemoveAllTask() {
+            // No-op.
+        }
+
+        /**
+         * Creates a task bound to the given cache context.
+         *
+         * @param ctx Cache context; while mapping, the task reads the cache name, the per-call operation
+         *      context and the affinity topology version from it.
+         */
+        public RemoveAllTask(GridCacheContext ctx) {
+            this.ctx = ctx;
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * Creates one {@link GlobalRemoveAllJob} per subgrid node. All jobs share the cache name, the
+         * skip-store flag of the current operation context and a single affinity topology version.
+         *
+         * @param subgrid Nodes to run the remove-all job on.
+         * @param arg Not used.
+         * @return Job-to-node mapping; empty when {@code subgrid} is empty.
+         */
+        @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
+            @Nullable Object arg) throws IgniteException {
+            Map<ComputeJob, ClusterNode> jobs = new HashMap<>();
 
-        /** Topology version. */
-        private AffinityTopologyVersion topVer;
+            CacheOperationContext opCtx = ctx.operationContextPerCall();
+
+            boolean skipStore = opCtx != null && opCtx.skipStore();
+
+            // Read the version once so that every job of this task is created with the same one.
+            AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
+
+            for (ClusterNode node : subgrid)
+                jobs.put(new GlobalRemoveAllJob<>(ctx.name(), topVer, skipStore), node);
+
+            return jobs;
+        }
+
+        /**
+         * {@inheritDoc}
+         * <p>
+         * Answers {@code WAIT} for every job result, whatever it contains; results that carry an
+         * exception are examined later, in {@code reduce()}.
+         */
+        @Override public ComputeJobResultPolicy result(ComputeJobResult res, 
List<ComputeJobResult> rcvd) {
+            return ComputeJobResultPolicy.WAIT;
+        }
+
+        /**
+         * {@inheritDoc}
+         *
+         * @param results Results of all mapped jobs; {@code null} entries are skipped.
+         * @return {@code false} as soon as one job result carries an exception or a payload other than
+         *      {@code Boolean.TRUE} (including {@code null}); {@code true} otherwise.
+         */
+        @Nullable @Override public Boolean reduce(List<ComputeJobResult> results) throws IgniteException {
+            for (ComputeJobResult locRes : results) {
+                if (locRes == null)
+                    continue;
+
+                // Boolean.TRUE.equals() also covers a null payload, which would fail on auto-unboxing.
+                if (locRes.getException() != null || !Boolean.TRUE.equals(locRes.getData()))
+                    return false;
+            }
+
+            return true;
+        }
+    }
+    /**
+     * Internal job which performs remove all primary key mappings
+     * operation on a cache with the given name.
+     */
+    @GridInternal
+    private static class GlobalRemoveAllJob<K,V>  extends 
TopologyVersionAwareJob {
+        /** */
+        private static final long serialVersionUID = 0L;
 
         /** Skip store flag. */
         private boolean skipStore;
 
-        /** Injected grid instance. */
-        @IgniteInstanceResource
-        private Ignite ignite;
-
         /**
          * Empty constructor for serialization.
          */
-        public GlobalRemoveAllCallable() {
+        public GlobalRemoveAllJob() {
             // No-op.
         }
 
@@ -271,24 +309,20 @@ public abstract class GridDistributedCacheAdapter<K, V> 
extends GridCacheAdapter
          * @param topVer Topology version.
          * @param skipStore Skip store flag.
          */
-        private GlobalRemoveAllCallable(String cacheName, @NotNull 
AffinityTopologyVersion topVer, boolean skipStore) {
-            this.cacheName = cacheName;
-            this.topVer = topVer;
+        private GlobalRemoveAllJob(String cacheName, @NotNull 
AffinityTopologyVersion topVer, boolean skipStore) {
+            super(cacheName, topVer);
+
             this.skipStore = skipStore;
         }
 
-        /**
-         * {@inheritDoc}
-         */
-        @Override public Boolean call() throws Exception {
-            GridCacheAdapter<K, V> cacheAdapter = 
((IgniteKernal)ignite).context().cache().internalCache(cacheName);
+        /** {@inheritDoc} */
+        @Nullable @Override public Object localExecute() {
+            GridCacheAdapter cache = ((IgniteKernal) 
ignite).context().cache().internalCache(cacheName);
 
-            if (cacheAdapter == null)
+            if (cache == null)
                 return false;
 
-            final GridCacheContext<K, V> ctx = cacheAdapter.context();
-
-            ctx.affinity().affinityReadyFuture(topVer).get();
+            final GridCacheContext<K, V> ctx = cache.context();
 
             ctx.gate().enter();
 
@@ -299,16 +333,16 @@ public abstract class GridDistributedCacheAdapter<K, V> 
extends GridCacheAdapter
                 GridDhtCacheAdapter<K, V> dht;
                 GridNearCacheAdapter<K, V> near = null;
 
-                if (cacheAdapter instanceof GridNearCacheAdapter) {
-                    near = ((GridNearCacheAdapter<K, V>)cacheAdapter);
+                if (cache instanceof GridNearCacheAdapter) {
+                    near = ((GridNearCacheAdapter<K, V>) cache);
                     dht = near.dht();
                 }
                 else
-                    dht = (GridDhtCacheAdapter<K, V>)cacheAdapter;
+                    dht = (GridDhtCacheAdapter<K, V>) cache;
 
                 try (DataStreamerImpl<KeyCacheObject, Object> dataLdr =
-                         (DataStreamerImpl)ignite.dataStreamer(cacheName)) {
-                    ((DataStreamerImpl)dataLdr).maxRemapCount(0);
+                         (DataStreamerImpl) ignite.dataStreamer(cacheName)) {
+                    ((DataStreamerImpl) dataLdr).maxRemapCount(0);
 
                     dataLdr.skipStore(skipStore);
 
@@ -351,25 +385,14 @@ public abstract class GridDistributedCacheAdapter<K, V> 
extends GridCacheAdapter
                     }
                 }
             }
+            catch (IgniteCheckedException e) {
+                throw U.convertException(e);
+            }
             finally {
                 ctx.gate().leave();
             }
 
             return true;
         }
-
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws 
IOException {
-            U.writeString(out, cacheName);
-            out.writeObject(topVer);
-            out.writeBoolean(skipStore);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
-            cacheName = U.readString(in);
-            topVer = (AffinityTopologyVersion)in.readObject();
-            skipStore = in.readBoolean();
-        }
     }
 }

Reply via email to