Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-834 [created] c62eb1177


#ignite-834: Add DelayedCallable.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c62eb117
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c62eb117
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c62eb117

Branch: refs/heads/ignite-834
Commit: c62eb117729261742e44daa5688718ea7bfafae5
Parents: edcf921
Author: ivasilinets <ivasilin...@gridgain.com>
Authored: Wed Apr 29 18:34:15 2015 +0300
Committer: ivasilinets <ivasilin...@gridgain.com>
Committed: Wed Apr 29 18:34:15 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      | 185 ++++++++++++++-----
 .../GridDistributedCacheAdapter.java            |  28 +--
 2 files changed, 143 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c62eb117/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 3f4e97b..427eafa 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -21,6 +21,7 @@ import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.cache.affinity.*;
 import org.apache.ignite.cluster.*;
+import org.apache.ignite.compute.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.cluster.*;
@@ -1083,7 +1084,7 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
         // Clear local cache synchronously.
         clearLocally();
 
-        clearRemotes(0, new GlobalClearAllCallable(name()));
+        clearRemotes(0, new GlobalClearAllCallable(name(), 
ctx.affinity().affinityTopologyVersion()));
     }
 
     /** {@inheritDoc} */
@@ -1091,7 +1092,8 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
         // Clear local cache synchronously.
         clearLocally(key);
 
-        clearRemotes(0, new GlobalClearKeySetCallable<K, V>(name(), 
Collections.singleton(key)));
+        clearRemotes(0, new GlobalClearKeySetCallable<K, V>(name(), 
ctx.affinity().affinityTopologyVersion(),
+            Collections.singleton(key)));
     }
 
     /** {@inheritDoc} */
@@ -1099,17 +1101,20 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
         // Clear local cache synchronously.
         clearLocallyAll(keys);
 
-        clearRemotes(0, new GlobalClearKeySetCallable<K, V>(name(), keys));
+        clearRemotes(0, new GlobalClearKeySetCallable<K, V>(name(), 
ctx.affinity().affinityTopologyVersion(),
+            keys));
     }
 
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<?> clearAsync(K key) {
-        return clearAsync(new GlobalClearKeySetCallable<K, V>(name(), 
Collections.singleton(key)));
+        return clearAsync(new GlobalClearKeySetCallable<K, V>(name(), 
ctx.affinity().affinityTopologyVersion(),
+            Collections.singleton(key)));
     }
 
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<?> clearAsync(Set<? extends K> keys) 
{
-        return clearAsync(new GlobalClearKeySetCallable<K, V>(name(), keys));
+        return clearAsync(new GlobalClearKeySetCallable<K, V>(name(), 
ctx.affinity().affinityTopologyVersion(),
+            keys));
     }
 
     /**
@@ -1149,7 +1154,7 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
 
     /** {@inheritDoc} */
     @Override public IgniteInternalFuture<?> clearAsync() {
-        return clearAsync(new GlobalClearAllCallable(name()));
+        return clearAsync(new GlobalClearAllCallable(name(), 
ctx.affinity().affinityTopologyVersion()));
     }
 
     /**
@@ -3577,7 +3582,8 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
             return new GridFinishedFuture<>(0);
 
         IgniteInternalFuture<Collection<Integer>> fut =
-            ctx.closures().broadcastNoFailover(new SizeCallable(ctx.name(), 
peekModes), null, nodes);
+            ctx.closures().broadcastNoFailover(
+                new SizeCallable(ctx.name(), 
ctx.affinity().affinityTopologyVersion(), peekModes), null, nodes);
 
         return fut.chain(new CX1<IgniteInternalFuture<Collection<Integer>>, 
Integer>() {
             @Override public Integer 
applyx(IgniteInternalFuture<Collection<Integer>> fut)
@@ -3925,7 +3931,8 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
             if (!nodes.isEmpty()) {
                 ctx.kernalContext().task().setThreadContext(TC_TIMEOUT, 
gridCfg.getNetworkTimeout());
 
-                fut = ctx.closures().broadcastNoFailover(new 
GlobalSizeCallable(name(), primaryOnly), null, nodes);
+                fut = ctx.closures().broadcastNoFailover(
+                    new GlobalSizeCallable(name(), 
ctx.affinity().affinityTopologyVersion(), primaryOnly), null, nodes);
             }
 
             // Get local value.
@@ -4896,14 +4903,7 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
      * Internal callable which performs clear operation on a cache with the 
given name.
      */
     @GridInternal
-    private static abstract class GlobalClearCallable implements 
Callable<Object>, Externalizable {
-        /** Cache name. */
-        protected String cacheName;
-
-        /** Injected grid instance. */
-        @IgniteInstanceResource
-        protected Ignite ignite;
-
+    private static abstract class GlobalClearCallable extends DelayedCallable 
implements Callable<Object> {
         /**
          * Empty constructor for serialization.
          */
@@ -4913,19 +4913,20 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
 
         /**
          * @param cacheName Cache name.
+         * @param topVer Affinity topology version.
          */
-        protected GlobalClearCallable(String cacheName) {
-            this.cacheName = cacheName;
+        protected GlobalClearCallable(String cacheName, 
AffinityTopologyVersion topVer) {
+            super(cacheName, topVer);
         }
 
         /** {@inheritDoc} */
         @Override public void writeExternal(ObjectOutput out) throws 
IOException {
-            U.writeString(out, cacheName);
+            super.writeExternal(out);
         }
 
         /** {@inheritDoc} */
         @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
-            cacheName = U.readString(in);
+            super.readExternal(in);
         }
     }
 
@@ -4946,13 +4947,16 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
 
         /**
          * @param cacheName Cache name.
+         * @param topVer Affinity topology version.
          */
-        private GlobalClearAllCallable(String cacheName) {
-            super(cacheName);
+        private GlobalClearAllCallable(String cacheName, 
AffinityTopologyVersion topVer) {
+            super(cacheName, topVer);
         }
 
         /** {@inheritDoc} */
         @Override public Object call() throws Exception {
+            waitAffinityReadyFuture();
+
             ((IgniteEx)ignite).cachex(cacheName).clearLocally();
 
             return null;
@@ -4979,16 +4983,19 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
 
         /**
          * @param cacheName Cache name.
+         * @param topVer Affinity topology version.
          * @param keys Keys to clear.
          */
-        private GlobalClearKeySetCallable(String cacheName, Set<? extends K> 
keys) {
-            super(cacheName);
+        private GlobalClearKeySetCallable(String cacheName, 
AffinityTopologyVersion topVer, Set<? extends K> keys) {
+            super(cacheName, topVer);
 
             this.keys = keys;
         }
 
         /** {@inheritDoc} */
         @Override public Object call() throws Exception {
+            waitAffinityReadyFuture();
+
             ((IgniteEx)ignite).<K, V>cachex(cacheName).clearLocallyAll(keys);
 
             return null;
@@ -5013,20 +5020,13 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
      * Internal callable for global size calculation.
      */
     @GridInternal
-    private static class SizeCallable extends IgniteClosureX<Object, Integer> 
implements Externalizable {
+    private static class SizeCallable extends DelayedCallable implements 
IgniteClosure<Object, Integer> {
         /** */
         private static final long serialVersionUID = 0L;
 
-        /** Cache name. */
-        private String cacheName;
-
         /** Peek modes. */
         private CachePeekMode[] peekModes;
 
-        /** Injected grid instance. */
-        @IgniteInstanceResource
-        private Ignite ignite;
-
         /**
          * Required by {@link Externalizable}.
          */
@@ -5036,26 +5036,35 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
 
         /**
          * @param cacheName Cache name.
+         * @param topVer Affinity topology version.
          * @param peekModes Cache peek modes.
          */
-        private SizeCallable(String cacheName, CachePeekMode[] peekModes) {
-            this.cacheName = cacheName;
+        private SizeCallable(String cacheName, AffinityTopologyVersion topVer, 
CachePeekMode[] peekModes) {
+            super(cacheName, topVer);
+
             this.peekModes = peekModes;
         }
 
         /** {@inheritDoc} */
-        @Override public Integer applyx(Object o) throws 
IgniteCheckedException {
+        @Override public Integer apply(Object o) {
+            waitAffinityReadyFuture();
+
             IgniteInternalCache<Object, Object> cache = 
((IgniteEx)ignite).cachex(cacheName);
 
             assert cache != null : cacheName;
 
-            return cache.localSize(peekModes);
+            try {
+                return cache.localSize(peekModes);
+            }
+            catch (IgniteCheckedException e) {
+                throw U.convertException(e);
+            }
         }
 
         /** {@inheritDoc} */
         @SuppressWarnings("ForLoopReplaceableByForEach")
         @Override public void writeExternal(ObjectOutput out) throws 
IOException {
-            U.writeString(out, cacheName);
+            super.writeExternal(out);
 
             out.writeInt(peekModes.length);
 
@@ -5065,7 +5074,7 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
 
         /** {@inheritDoc} */
         @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
-            cacheName = U.readString(in);
+            super.readExternal(in);
 
             int len = in.readInt();
 
@@ -5086,20 +5095,13 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
      * operation on a cache with the given name.
      */
     @GridInternal
-    private static class GlobalSizeCallable implements IgniteClosure<Object, 
Integer>, Externalizable {
+    private static class GlobalSizeCallable extends DelayedCallable implements 
IgniteClosure<Object, Integer>, Externalizable {
         /** */
         private static final long serialVersionUID = 0L;
 
-        /** Cache name. */
-        private String cacheName;
-
         /** Primary only flag. */
         private boolean primaryOnly;
 
-        /** Injected grid instance. */
-        @IgniteInstanceResource
-        private Ignite ignite;
-
         /**
          * Empty constructor for serialization.
          */
@@ -5109,15 +5111,19 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
 
         /**
          * @param cacheName Cache name.
+         * @param topVer Affinity topology version.
          * @param primaryOnly Primary only flag.
          */
-        private GlobalSizeCallable(String cacheName, boolean primaryOnly) {
-            this.cacheName = cacheName;
+        private GlobalSizeCallable(String cacheName, AffinityTopologyVersion 
topVer, boolean primaryOnly) {
+            super(cacheName, topVer);
+
             this.primaryOnly = primaryOnly;
         }
 
         /** {@inheritDoc} */
         @Override public Integer apply(Object o) {
+            waitAffinityReadyFuture();
+
             IgniteInternalCache<Object, Object> cache = 
((IgniteEx)ignite).cachex(cacheName);
 
             return primaryOnly ? cache.primarySize() : cache.size();
@@ -5125,13 +5131,15 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
 
         /** {@inheritDoc} */
         @Override public void writeExternal(ObjectOutput out) throws 
IOException {
-            U.writeString(out, cacheName);
+            super.writeExternal(out);
+
             out.writeBoolean(primaryOnly);
         }
 
         /** {@inheritDoc} */
         @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
-            cacheName = U.readString(in);
+            super.readExternal(in);
+
             primaryOnly = in.readBoolean();
         }
     }
@@ -5697,4 +5705,81 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
             metrics.addPutAndGetTimeNanos(System.nanoTime() - start);
         }
     }
+
+    protected static abstract class DelayedCallable<K, V> implements 
Externalizable {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Auto-inject job context. */
+        @JobContextResource
+        protected ComputeJobContext jobCtx;
+
+        /** Injected grid instance. */
+        @IgniteInstanceResource
+        protected Ignite ignite;
+
+        /** Affinity topology version. */
+        protected AffinityTopologyVersion topVer;
+
+        /** Cache name. */
+        protected String cacheName;
+
+        /**
+         * Empty constructor for serialization.
+         */
+        public DelayedCallable() {
+            // No-op.
+        }
+
+        /**
+         * @param topVer Affinity topology version.
+         */
+        public DelayedCallable(String cacheName, AffinityTopologyVersion 
topVer) {
+            this.cacheName = cacheName;
+            this.topVer = topVer;
+        }
+
+        /**
+         * Hold (suspend) job execution until our cache version becomes equal 
to remote cache's version.
+         */
+        public void waitAffinityReadyFuture() {
+            GridCacheProcessor cacheProc = ((IgniteKernal) 
ignite).context().cache();
+
+            GridCacheAdapter<K, V> cacheAdapter = 
cacheProc.internalCache(cacheName);
+
+            final GridCacheContext<K, V> ctx = cacheAdapter.context();
+
+            AffinityTopologyVersion locTopVer = 
ctx.affinity().affinityTopologyVersion();
+
+            if (locTopVer.compareTo(topVer) < 0) {
+                IgniteInternalFuture<?> fut = 
cacheProc.context().exchange().affinityReadyFuture(topVer);
+
+                if (fut != null && !fut.isDone()) {
+                    fut.listen(new CI1<IgniteInternalFuture<?>>() {
+                        @Override public void apply(IgniteInternalFuture<?> t) 
{
+                            jobCtx.callcc();
+                        }
+                    });
+
+                    jobCtx.holdcc();
+                }
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws 
IOException {
+            U.writeString(out, cacheName);
+
+            topVer.writeExternal(out);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
+            cacheName = U.readString(in);
+
+            topVer = new AffinityTopologyVersion();
+
+            topVer.readExternal(in);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c62eb117/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
index 3a685cc..661df87 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
@@ -32,7 +32,6 @@ import org.apache.ignite.internal.processors.task.*;
 import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
-import org.apache.ignite.resources.*;
 import org.apache.ignite.transactions.*;
 import org.jetbrains.annotations.*;
 
@@ -231,23 +230,13 @@ public abstract class GridDistributedCacheAdapter<K, V> 
extends GridCacheAdapter
      * operation on a cache with the given name.
      */
     @GridInternal
-    private static class GlobalRemoveAllCallable<K,V> implements 
Callable<Object>, Externalizable {
+    private static class GlobalRemoveAllCallable<K,V> extends 
DelayedCallable<K, V> implements Callable<Object> {
         /** */
         private static final long serialVersionUID = 0L;
 
-        /** Cache name. */
-        private String cacheName;
-
-        /** Topology version. */
-        private AffinityTopologyVersion topVer;
-
         /** Skip store flag. */
         private boolean skipStore;
 
-        /** Injected grid instance. */
-        @IgniteInstanceResource
-        private Ignite ignite;
-
         /**
          * Empty constructor for serialization.
          */
@@ -261,8 +250,7 @@ public abstract class GridDistributedCacheAdapter<K, V> 
extends GridCacheAdapter
          * @param skipStore Skip store flag.
          */
         private GlobalRemoveAllCallable(String cacheName, @NotNull 
AffinityTopologyVersion topVer, boolean skipStore) {
-            this.cacheName = cacheName;
-            this.topVer = topVer;
+            super(cacheName, topVer);
             this.skipStore = skipStore;
         }
 
@@ -270,11 +258,11 @@ public abstract class GridDistributedCacheAdapter<K, V> 
extends GridCacheAdapter
          * {@inheritDoc}
          */
         @Override public Object call() throws Exception {
-            GridCacheAdapter<K, V> cacheAdapter = 
((IgniteKernal)ignite).context().cache().internalCache(cacheName);
+            GridCacheAdapter<K, V> cacheAdapter = ((IgniteKernal) 
ignite).context().cache().internalCache(cacheName);
 
             final GridCacheContext<K, V> ctx = cacheAdapter.context();
 
-            ctx.affinity().affinityReadyFuture(topVer).get();
+            waitAffinityReadyFuture();
 
             ctx.gate().enter();
 
@@ -338,15 +326,15 @@ public abstract class GridDistributedCacheAdapter<K, V> 
extends GridCacheAdapter
 
         /** {@inheritDoc} */
         @Override public void writeExternal(ObjectOutput out) throws 
IOException {
-            U.writeString(out, cacheName);
-            out.writeObject(topVer);
+            super.writeExternal(out);
+
             out.writeBoolean(skipStore);
         }
 
         /** {@inheritDoc} */
         @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
-            cacheName = U.readString(in);
-            topVer = (AffinityTopologyVersion)in.readObject();
+            super.readExternal(in);
+
             skipStore = in.readBoolean();
         }
     }

Reply via email to