#ignite-834: Change size and clear to jobs.

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/863d6bf6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/863d6bf6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/863d6bf6

Branch: refs/heads/ignite-834
Commit: 863d6bf69f75c2bf1a7571ffa76259544baf0539
Parents: 086553e
Author: ivasilinets <ivasilin...@gridgain.com>
Authored: Tue May 5 15:23:41 2015 +0300
Committer: ivasilinets <ivasilin...@gridgain.com>
Committed: Tue May 5 15:23:41 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      | 331 +++++--------------
 .../GridDistributedCacheAdapter.java            |  28 +-
 2 files changed, 103 insertions(+), 256 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/863d6bf6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index d59e9cc..596705e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -25,7 +25,6 @@ import org.apache.ignite.compute.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.cluster.*;
-import org.apache.ignite.internal.compute.*;
 import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.affinity.*;
 import org.apache.ignite.internal.processors.cache.distributed.*;
@@ -1123,32 +1122,31 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
      * @param clearCall Global clear callable object.
      * @throws IgniteCheckedException In case of cache could not be cleared on 
any of the nodes.
      */
-    private void clearRemotes(long timeout, GlobalClearCallable clearCall) 
throws IgniteCheckedException {
-        try {
-            // Send job to remote nodes only.
-            Collection<ClusterNode> nodes =
-                ctx.grid().cluster().forCacheNodes(name(), true, true, 
false).forRemotes().nodes();
+    private void clearRemotes(long timeout, final TopologyVersionAwareJob 
clearCall) throws IgniteCheckedException {
+        // Send job to remote nodes only.
+        ClusterGroup nodes = ctx.grid().cluster().forCacheNodes(name(), true, 
true, false).forRemotes();
 
-            IgniteInternalFuture<Object> fut = null;
+        if (!nodes.nodes().isEmpty()) {
+            ctx.kernalContext().task().setThreadContext(TC_TIMEOUT, timeout);
 
-            if (!nodes.isEmpty()) {
-                ctx.kernalContext().task().setThreadContext(TC_TIMEOUT, 
timeout);
+            ctx.grid().context().task().setThreadContext(TC_SUBGRID, nodes);
 
-                fut = ctx.closures().callAsyncNoFailover(BROADCAST, clearCall, 
nodes, true);
-            }
+            ctx.grid().context().task().execute(new ComputeTaskAdapter<Object, 
Object>() {
+                /** {@inheritDoc} */
+                @Nullable @Override public Map<? extends ComputeJob, 
ClusterNode> map(List<ClusterNode> subgrid,
+                    @Nullable Object arg) throws IgniteException {
+                    Map<ComputeJob, ClusterNode> jobs = new HashMap();
+                    for (ClusterNode node : subgrid)
+                        jobs.put(clearCall, node);
 
-            if (fut != null)
-                fut.get();
-        }
-        catch (ClusterGroupEmptyCheckedException ignore) {
-            if (log.isDebugEnabled())
-                log.debug("All remote nodes left while cache clearLocally 
[cacheName=" + name() + "]");
-        }
-        catch (ComputeTaskTimeoutCheckedException e) {
-            U.warn(log, "Timed out waiting for remote nodes to finish cache 
clear (consider increasing " +
-                "'networkTimeout' configuration property) [cacheName=" + 
name() + "]");
+                    return jobs;
+                }
 
-            throw e;
+                /** {@inheritDoc} */
+                @Nullable @Override public Object 
reduce(List<ComputeJobResult> results) throws IgniteException {
+                    return null;
+                }
+            }, null).get();
         }
     }
 
@@ -1161,26 +1159,28 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
      * @param clearCall Global clear callable object.
      * @return Future.
      */
-    private IgniteInternalFuture<?> clearAsync(GlobalClearCallable clearCall) {
+    private IgniteInternalFuture<?> clearAsync(final TopologyVersionAwareJob 
clearCall) {
         Collection<ClusterNode> nodes = 
ctx.grid().cluster().forCacheNodes(name(), true, true, false).nodes();
 
         if (!nodes.isEmpty()) {
-            IgniteInternalFuture<Object> fut =
-                ctx.closures().callAsyncNoFailover(BROADCAST, clearCall, 
nodes, true);
+            ctx.grid().context().task().setThreadContext(TC_SUBGRID, nodes);
 
-            return fut.chain(new CX1<IgniteInternalFuture<Object>, Object>() {
-                @Override public Object applyx(IgniteInternalFuture<Object> 
fut) throws IgniteCheckedException {
-                    try {
-                        return fut.get();
-                    }
-                    catch (ClusterGroupEmptyCheckedException ignore) {
-                        if (log.isDebugEnabled())
-                            log.debug("All remote nodes left while cache 
clearLocally [cacheName=" + name() + "]");
+            return ctx.grid().context().task().execute(new 
ComputeTaskAdapter<Object, Object>() {
+                /** {@inheritDoc} */
+                @Nullable @Override public Map<? extends ComputeJob, 
ClusterNode> map(List<ClusterNode> subgrid,
+                                                                  @Nullable 
Object arg) throws IgniteException {
+                    Map<ComputeJob, ClusterNode> jobs = new HashMap();
+                    for (ClusterNode node : subgrid)
+                        jobs.put(clearCall, node);
 
-                        return null;
-                    }
+                    return jobs;
                 }
-            });
+
+                /** {@inheritDoc} */
+                @Nullable @Override public Object 
reduce(List<ComputeJobResult> results) throws IgniteException {
+                    return null;
+                }
+            }, null);
         }
         else
             return new GridFinishedFuture<>();
@@ -3567,7 +3567,7 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<Integer> sizeAsync(CachePeekMode[] 
peekModes) {
+    @Override public IgniteInternalFuture<Integer> sizeAsync(final 
CachePeekMode[] peekModes) {
         assert peekModes != null;
 
         PeekModes modes = parsePeekModes(peekModes, true);
@@ -3581,23 +3581,32 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
         if (nodes.isEmpty())
             return new GridFinishedFuture<>(0);
 
-        IgniteInternalFuture<Collection<Integer>> fut =
-            ctx.closures().broadcastNoFailover(
-                new SizeCallable(ctx.name(), 
ctx.affinity().affinityTopologyVersion(), peekModes), null, nodes);
+        ctx.grid().context().task().setThreadContext(TC_SUBGRID, nodes);
+
+        return ctx.grid().context().task().execute(new 
ComputeTaskAdapter<Object, Integer>() {
+
+            /** {@inheritDoc} */
+            @Nullable @Override public Map<? extends ComputeJob, ClusterNode> 
map(List<ClusterNode> subgrid,
+                @Nullable Object arg) throws IgniteException {
+                Map<ComputeJob, ClusterNode> jobs = new HashMap();
+                for (ClusterNode node : subgrid)
+                    jobs.put(new SizeCallable(ctx.name(), 
ctx.affinity().affinityTopologyVersion(), peekModes), node);
 
-        return fut.chain(new CX1<IgniteInternalFuture<Collection<Integer>>, 
Integer>() {
-            @Override public Integer 
applyx(IgniteInternalFuture<Collection<Integer>> fut)
-            throws IgniteCheckedException {
-                Collection<Integer> res = fut.get();
+                return jobs;
+            }
 
-                int totalSize = 0;
+            /** {@inheritDoc} */
+            @Nullable @Override public Integer reduce(List<ComputeJobResult> 
results) throws IgniteException {
+                int size = 0;
 
-                for (Integer size : res)
-                    totalSize += size;
+                for (ComputeJobResult res : results) {
+                    if (res.getException() == null)
+                        size += res.<Integer>getData();
+                }
 
-                return totalSize;
+                return size;
             }
-        });
+        }, null);
     }
 
     /** {@inheritDoc} */
@@ -3915,51 +3924,6 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
     }
 
     /**
-     * Gets cache global size (with or without backups).
-     *
-     * @param primaryOnly {@code True} if only primary sizes should be 
included.
-     * @return Global size.
-     * @throws IgniteCheckedException If internal task execution failed.
-     */
-    private int globalSize(boolean primaryOnly) throws IgniteCheckedException {
-        try {
-            // Send job to remote nodes only.
-            Collection<ClusterNode> nodes = 
ctx.grid().cluster().forCacheNodes(name()).forRemotes().nodes();
-
-            IgniteInternalFuture<Collection<Integer>> fut = null;
-
-            if (!nodes.isEmpty()) {
-                ctx.kernalContext().task().setThreadContext(TC_TIMEOUT, 
gridCfg.getNetworkTimeout());
-
-                fut = ctx.closures().broadcastNoFailover(
-                    new GlobalSizeCallable(name(), 
ctx.affinity().affinityTopologyVersion(), primaryOnly), null, nodes);
-            }
-
-            // Get local value.
-            int globalSize = primaryOnly ? primarySize() : size();
-
-            if (fut != null) {
-                for (Integer i : fut.get())
-                    globalSize += i;
-            }
-
-            return globalSize;
-        }
-        catch (ClusterGroupEmptyCheckedException ignore) {
-            if (log.isDebugEnabled())
-                log.debug("All remote nodes left while cache clearLocally 
[cacheName=" + name() + "]");
-
-            return primaryOnly ? primarySize() : size();
-        }
-        catch (ComputeTaskTimeoutCheckedException e) {
-            U.warn(log, "Timed out waiting for remote nodes to finish cache 
clear (consider increasing " +
-                "'networkTimeout' configuration property) [cacheName=" + 
name() + "]");
-
-            throw e;
-        }
-    }
-
-    /**
      * @param op Cache operation.
      * @param <T> Return type.
      * @return Operation result.
@@ -4900,41 +4864,10 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
     }
 
     /**
-     * Internal callable which performs clear operation on a cache with the 
given name.
-     */
-    @GridInternal
-    private static abstract class GlobalClearCallable extends DelayedCallable 
implements Callable<Object> {
-        /**
-         * Empty constructor for serialization.
-         */
-        public GlobalClearCallable() {
-            // No-op.
-        }
-
-        /**
-         * @param cacheName Cache name.
-         * @param topVer Affinity topology version.
-         */
-        protected GlobalClearCallable(String cacheName, 
AffinityTopologyVersion topVer) {
-            super(cacheName, topVer);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws 
IOException {
-            super.writeExternal(out);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
-            super.readExternal(in);
-        }
-    }
-
-    /**
      * Global clear all.
      */
     @GridInternal
-    private static class GlobalClearAllCallable extends GlobalClearCallable {
+    private static class GlobalClearAllCallable extends 
TopologyVersionAwareJob {
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -4954,9 +4887,7 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
         }
 
         /** {@inheritDoc} */
-        @Override public Object call() throws Exception {
-            waitAffinityReadyFuture();
-
+        @Nullable @Override public Object localExecute() {
             ((IgniteEx)ignite).cachex(cacheName).clearLocally();
 
             return null;
@@ -4967,7 +4898,7 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
      * Global clear keys.
      */
     @GridInternal
-    private static class GlobalClearKeySetCallable<K, V> extends 
GlobalClearCallable {
+    private static class GlobalClearKeySetCallable<K, V> extends 
TopologyVersionAwareJob {
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -4993,34 +4924,18 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
         }
 
         /** {@inheritDoc} */
-        @Override public Object call() throws Exception {
-            waitAffinityReadyFuture();
-
+        @Nullable @Override public Object localExecute() {
             ((IgniteEx)ignite).<K, V>cachex(cacheName).clearLocallyAll(keys);
 
             return null;
         }
-
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws 
IOException {
-            super.writeExternal(out);
-
-            out.writeObject(keys);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
-            super.readExternal(in);
-
-            keys = (Set<K>) in.readObject();
-        }
     }
 
     /**
      * Internal callable for global size calculation.
      */
     @GridInternal
-    private static class SizeCallable extends DelayedCallable implements 
IgniteClosure<Object, Integer> {
+    private static class SizeCallable extends TopologyVersionAwareJob {
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -5046,9 +4961,7 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
         }
 
         /** {@inheritDoc} */
-        @Override public Integer apply(Object o) {
-            waitAffinityReadyFuture();
-
+        @Nullable @Override public Object localExecute() {
             IgniteInternalCache<Object, Object> cache = 
((IgniteEx)ignite).cachex(cacheName);
 
             assert cache != null : cacheName;
@@ -5062,89 +4975,12 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
         }
 
         /** {@inheritDoc} */
-        @SuppressWarnings("ForLoopReplaceableByForEach")
-        @Override public void writeExternal(ObjectOutput out) throws 
IOException {
-            super.writeExternal(out);
-
-            out.writeInt(peekModes.length);
-
-            for (int i = 0; i < peekModes.length; i++)
-                U.writeEnum(out, peekModes[i]);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
-            super.readExternal(in);
-
-            int len = in.readInt();
-
-            peekModes = new CachePeekMode[len];
-
-            for (int i = 0; i < len; i++)
-                peekModes[i] = CachePeekMode.fromOrdinal(in.readByte());
-        }
-
-        /** {@inheritDoc} */
         public String toString() {
             return S.toString(SizeCallable.class, this);
         }
     }
 
     /**
-     * Internal callable which performs {@link IgniteInternalCache#size()} or 
{@link IgniteInternalCache#primarySize()}
-     * operation on a cache with the given name.
-     */
-    @GridInternal
-    private static class GlobalSizeCallable extends DelayedCallable implements 
IgniteClosure<Object, Integer>, Externalizable {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** Primary only flag. */
-        private boolean primaryOnly;
-
-        /**
-         * Empty constructor for serialization.
-         */
-        public GlobalSizeCallable() {
-            // No-op.
-        }
-
-        /**
-         * @param cacheName Cache name.
-         * @param topVer Affinity topology version.
-         * @param primaryOnly Primary only flag.
-         */
-        private GlobalSizeCallable(String cacheName, AffinityTopologyVersion 
topVer, boolean primaryOnly) {
-            super(cacheName, topVer);
-
-            this.primaryOnly = primaryOnly;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Integer apply(Object o) {
-            waitAffinityReadyFuture();
-
-            IgniteInternalCache<Object, Object> cache = 
((IgniteEx)ignite).cachex(cacheName);
-
-            return primaryOnly ? cache.primarySize() : cache.size();
-        }
-
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws 
IOException {
-            super.writeExternal(out);
-
-            out.writeBoolean(primaryOnly);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
-            super.readExternal(in);
-
-            primaryOnly = in.readBoolean();
-        }
-    }
-
-    /**
      * Holder for last async operation future.
      */
     protected static class FutureHolder {
@@ -5709,7 +5545,7 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
     /**
      * Delayed callable class.
      */
-    protected static abstract class DelayedCallable<K, V> implements 
Externalizable {
+    protected static abstract class TopologyVersionAwareJob<K, V> extends 
ComputeJobAdapter {
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -5730,22 +5566,37 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
         /**
          * Empty constructor for serialization.
          */
-        public DelayedCallable() {
+        public TopologyVersionAwareJob() {
             // No-op.
         }
 
         /**
+         * @param cacheName Cache name.
          * @param topVer Affinity topology version.
          */
-        public DelayedCallable(String cacheName, AffinityTopologyVersion 
topVer) {
+        public TopologyVersionAwareJob(String cacheName, 
AffinityTopologyVersion topVer) {
             this.cacheName = cacheName;
             this.topVer = topVer;
         }
 
+        /** {@inheritDoc} */
+        @Nullable @Override public final Object execute() {
+            // Block until the local affinity is ready for the requested topology version.
+            waitAffinityReadyFuture();
+
+            // Return the job result to the task; reduce() reads it via getData().
+            return localExecute();
+        }
+
+        /**
+         * @return Local execution result.
+         */
+        @Nullable protected abstract Object localExecute();
+
         /**
          * Hold (suspend) job execution until our cache version becomes equal 
to remote cache's version.
          */
-        public void waitAffinityReadyFuture() {
+        private void waitAffinityReadyFuture() {
             GridCacheProcessor cacheProc = ((IgniteKernal) 
ignite).context().cache();
 
             GridCacheAdapter<K, V> cacheAdapter = 
cacheProc.internalCache(cacheName);
@@ -5768,21 +5619,5 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
                 }
             }
         }
-
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws 
IOException {
-            U.writeString(out, cacheName);
-
-            topVer.writeExternal(out);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
-            cacheName = U.readString(in);
-
-            topVer = new AffinityTopologyVersion();
-
-            topVer.readExternal(in);
-        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/863d6bf6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
index 661df87..3a685cc 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.processors.task.*;
 import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
+import org.apache.ignite.resources.*;
 import org.apache.ignite.transactions.*;
 import org.jetbrains.annotations.*;
 
@@ -230,13 +231,23 @@ public abstract class GridDistributedCacheAdapter<K, V> 
extends GridCacheAdapter
      * operation on a cache with the given name.
      */
     @GridInternal
-    private static class GlobalRemoveAllCallable<K,V> extends 
DelayedCallable<K, V> implements Callable<Object> {
+    private static class GlobalRemoveAllCallable<K,V> implements 
Callable<Object>, Externalizable {
         /** */
         private static final long serialVersionUID = 0L;
 
+        /** Cache name. */
+        private String cacheName;
+
+        /** Topology version. */
+        private AffinityTopologyVersion topVer;
+
         /** Skip store flag. */
         private boolean skipStore;
 
+        /** Injected grid instance. */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
         /**
          * Empty constructor for serialization.
          */
@@ -250,7 +261,8 @@ public abstract class GridDistributedCacheAdapter<K, V> 
extends GridCacheAdapter
          * @param skipStore Skip store flag.
          */
         private GlobalRemoveAllCallable(String cacheName, @NotNull 
AffinityTopologyVersion topVer, boolean skipStore) {
-            super(cacheName, topVer);
+            this.cacheName = cacheName;
+            this.topVer = topVer;
             this.skipStore = skipStore;
         }
 
@@ -258,11 +270,11 @@ public abstract class GridDistributedCacheAdapter<K, V> 
extends GridCacheAdapter
          * {@inheritDoc}
          */
         @Override public Object call() throws Exception {
-            GridCacheAdapter<K, V> cacheAdapter = ((IgniteKernal) 
ignite).context().cache().internalCache(cacheName);
+            GridCacheAdapter<K, V> cacheAdapter = 
((IgniteKernal)ignite).context().cache().internalCache(cacheName);
 
             final GridCacheContext<K, V> ctx = cacheAdapter.context();
 
-            waitAffinityReadyFuture();
+            ctx.affinity().affinityReadyFuture(topVer).get();
 
             ctx.gate().enter();
 
@@ -326,15 +338,15 @@ public abstract class GridDistributedCacheAdapter<K, V> 
extends GridCacheAdapter
 
         /** {@inheritDoc} */
         @Override public void writeExternal(ObjectOutput out) throws 
IOException {
-            super.writeExternal(out);
-
+            U.writeString(out, cacheName);
+            out.writeObject(topVer);
             out.writeBoolean(skipStore);
         }
 
         /** {@inheritDoc} */
         @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
-            super.readExternal(in);
-
+            cacheName = U.readString(in);
+            topVer = (AffinityTopologyVersion)in.readObject();
             skipStore = in.readBoolean();
         }
     }

Reply via email to