Repository: incubator-ignite Updated Branches: refs/heads/ignite-834 [created] c62eb1177
#ignite-834: Add DelayedCallable. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c62eb117 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c62eb117 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c62eb117 Branch: refs/heads/ignite-834 Commit: c62eb117729261742e44daa5688718ea7bfafae5 Parents: edcf921 Author: ivasilinets <ivasilin...@gridgain.com> Authored: Wed Apr 29 18:34:15 2015 +0300 Committer: ivasilinets <ivasilin...@gridgain.com> Committed: Wed Apr 29 18:34:15 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 185 ++++++++++++++----- .../GridDistributedCacheAdapter.java | 28 +-- 2 files changed, 143 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c62eb117/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 3f4e97b..427eafa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -21,6 +21,7 @@ import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cache.affinity.*; import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; @@ -1083,7 +1084,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V // Clear local cache synchronously. clearLocally(); - clearRemotes(0, new GlobalClearAllCallable(name())); + clearRemotes(0, new GlobalClearAllCallable(name(), ctx.affinity().affinityTopologyVersion())); } /** {@inheritDoc} */ @@ -1091,7 +1092,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V // Clear local cache synchronously. clearLocally(key); - clearRemotes(0, new GlobalClearKeySetCallable<K, V>(name(), Collections.singleton(key))); + clearRemotes(0, new GlobalClearKeySetCallable<K, V>(name(), ctx.affinity().affinityTopologyVersion(), + Collections.singleton(key))); } /** {@inheritDoc} */ @@ -1099,17 +1101,20 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V // Clear local cache synchronously. clearLocallyAll(keys); - clearRemotes(0, new GlobalClearKeySetCallable<K, V>(name(), keys)); + clearRemotes(0, new GlobalClearKeySetCallable<K, V>(name(), ctx.affinity().affinityTopologyVersion(), + keys)); } /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> clearAsync(K key) { - return clearAsync(new GlobalClearKeySetCallable<K, V>(name(), Collections.singleton(key))); + return clearAsync(new GlobalClearKeySetCallable<K, V>(name(), ctx.affinity().affinityTopologyVersion(), + Collections.singleton(key))); } /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> clearAsync(Set<? extends K> keys) { - return clearAsync(new GlobalClearKeySetCallable<K, V>(name(), keys)); + return clearAsync(new GlobalClearKeySetCallable<K, V>(name(), ctx.affinity().affinityTopologyVersion(), + keys)); } /** @@ -1149,7 +1154,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> clearAsync() { - return clearAsync(new GlobalClearAllCallable(name())); + return clearAsync(new GlobalClearAllCallable(name(), ctx.affinity().affinityTopologyVersion())); } /** @@ -3577,7 +3582,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return new GridFinishedFuture<>(0); IgniteInternalFuture<Collection<Integer>> fut = - ctx.closures().broadcastNoFailover(new SizeCallable(ctx.name(), peekModes), null, nodes); + ctx.closures().broadcastNoFailover( + new SizeCallable(ctx.name(), ctx.affinity().affinityTopologyVersion(), peekModes), null, nodes); return fut.chain(new CX1<IgniteInternalFuture<Collection<Integer>>, Integer>() { @Override public Integer applyx(IgniteInternalFuture<Collection<Integer>> fut) @@ -3925,7 +3931,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (!nodes.isEmpty()) { ctx.kernalContext().task().setThreadContext(TC_TIMEOUT, gridCfg.getNetworkTimeout()); - fut = ctx.closures().broadcastNoFailover(new GlobalSizeCallable(name(), primaryOnly), null, nodes); + fut = ctx.closures().broadcastNoFailover( + new GlobalSizeCallable(name(), ctx.affinity().affinityTopologyVersion(), primaryOnly), null, nodes); } // Get local value. @@ -4896,14 +4903,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * Internal callable which performs clear operation on a cache with the given name. */ @GridInternal - private static abstract class GlobalClearCallable implements Callable<Object>, Externalizable { - /** Cache name. */ - protected String cacheName; - - /** Injected grid instance. */ - @IgniteInstanceResource - protected Ignite ignite; - + private static abstract class GlobalClearCallable extends DelayedCallable implements Callable<Object> { /** * Empty constructor for serialization. */ @@ -4913,19 +4913,20 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** * @param cacheName Cache name. + * @param topVer Affinity topology version. */ - protected GlobalClearCallable(String cacheName) { - this.cacheName = cacheName; + protected GlobalClearCallable(String cacheName, AffinityTopologyVersion topVer) { + super(cacheName, topVer); } /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeString(out, cacheName); + super.writeExternal(out); } /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - cacheName = U.readString(in); + super.readExternal(in); } } @@ -4946,13 +4947,16 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** * @param cacheName Cache name. + * @param topVer Affinity topology version. */ - private GlobalClearAllCallable(String cacheName) { - super(cacheName); + private GlobalClearAllCallable(String cacheName, AffinityTopologyVersion topVer) { + super(cacheName, topVer); } /** {@inheritDoc} */ @Override public Object call() throws Exception { + waitAffinityReadyFuture(); + ((IgniteEx)ignite).cachex(cacheName).clearLocally(); return null; @@ -4979,16 +4983,19 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** * @param cacheName Cache name. + * @param topVer Affinity topology version. * @param keys Keys to clear. */ - private GlobalClearKeySetCallable(String cacheName, Set<? extends K> keys) { - super(cacheName); + private GlobalClearKeySetCallable(String cacheName, AffinityTopologyVersion topVer, Set<? extends K> keys) { + super(cacheName, topVer); this.keys = keys; } /** {@inheritDoc} */ @Override public Object call() throws Exception { + waitAffinityReadyFuture(); + ((IgniteEx)ignite).<K, V>cachex(cacheName).clearLocallyAll(keys); return null; @@ -5013,20 +5020,13 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * Internal callable for global size calculation. */ @GridInternal - private static class SizeCallable extends IgniteClosureX<Object, Integer> implements Externalizable { + private static class SizeCallable extends DelayedCallable implements IgniteClosure<Object, Integer> { /** */ private static final long serialVersionUID = 0L; - /** Cache name. */ - private String cacheName; - /** Peek modes. */ private CachePeekMode[] peekModes; - /** Injected grid instance. */ - @IgniteInstanceResource - private Ignite ignite; - /** * Required by {@link Externalizable}. */ @@ -5036,26 +5036,35 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** * @param cacheName Cache name. + * @param topVer Affinity topology version. * @param peekModes Cache peek modes. */ - private SizeCallable(String cacheName, CachePeekMode[] peekModes) { - this.cacheName = cacheName; + private SizeCallable(String cacheName, AffinityTopologyVersion topVer, CachePeekMode[] peekModes) { + super(cacheName, topVer); + this.peekModes = peekModes; } /** {@inheritDoc} */ - @Override public Integer applyx(Object o) throws IgniteCheckedException { + @Override public Integer apply(Object o) { + waitAffinityReadyFuture(); + IgniteInternalCache<Object, Object> cache = ((IgniteEx)ignite).cachex(cacheName); assert cache != null : cacheName; - return cache.localSize(peekModes); + try { + return cache.localSize(peekModes); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } } /** {@inheritDoc} */ @SuppressWarnings("ForLoopReplaceableByForEach") @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeString(out, cacheName); + super.writeExternal(out); out.writeInt(peekModes.length); @@ -5065,7 +5074,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - cacheName = U.readString(in); + super.readExternal(in); int len = in.readInt(); @@ -5086,20 +5095,13 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * operation on a cache with the given name. */ @GridInternal - private static class GlobalSizeCallable implements IgniteClosure<Object, Integer>, Externalizable { + private static class GlobalSizeCallable extends DelayedCallable implements IgniteClosure<Object, Integer>, Externalizable { /** */ private static final long serialVersionUID = 0L; - /** Cache name. */ - private String cacheName; - /** Primary only flag. */ private boolean primaryOnly; - /** Injected grid instance. */ - @IgniteInstanceResource - private Ignite ignite; - /** * Empty constructor for serialization. */ @@ -5109,15 +5111,19 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** * @param cacheName Cache name. + * @param topVer Affinity topology version. * @param primaryOnly Primary only flag. */ - private GlobalSizeCallable(String cacheName, boolean primaryOnly) { - this.cacheName = cacheName; + private GlobalSizeCallable(String cacheName, AffinityTopologyVersion topVer, boolean primaryOnly) { + super(cacheName, topVer); + this.primaryOnly = primaryOnly; } /** {@inheritDoc} */ @Override public Integer apply(Object o) { + waitAffinityReadyFuture(); + IgniteInternalCache<Object, Object> cache = ((IgniteEx)ignite).cachex(cacheName); return primaryOnly ? cache.primarySize() : cache.size(); @@ -5125,13 +5131,15 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeString(out, cacheName); + super.writeExternal(out); + out.writeBoolean(primaryOnly); } /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - cacheName = U.readString(in); + super.readExternal(in); + primaryOnly = in.readBoolean(); } } @@ -5697,4 +5705,81 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V metrics.addPutAndGetTimeNanos(System.nanoTime() - start); } } + + protected static abstract class DelayedCallable<K, V> implements Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Auto-inject job context. */ + @JobContextResource + protected ComputeJobContext jobCtx; + + /** Injected grid instance. */ + @IgniteInstanceResource + protected Ignite ignite; + + /** Affinity topology version. */ + protected AffinityTopologyVersion topVer; + + /** Cache name. */ + protected String cacheName; + + /** + * Empty constructor for serialization. + */ + public DelayedCallable() { + // No-op. + } + + /** + * @param topVer Affinity topology version. + */ + public DelayedCallable(String cacheName, AffinityTopologyVersion topVer) { + this.cacheName = cacheName; + this.topVer = topVer; + } + + /** + * Hold (suspend) job execution until our cache version becomes equal to remote cache's version. + */ + public void waitAffinityReadyFuture() { + GridCacheProcessor cacheProc = ((IgniteKernal) ignite).context().cache(); + + GridCacheAdapter<K, V> cacheAdapter = cacheProc.internalCache(cacheName); + + final GridCacheContext<K, V> ctx = cacheAdapter.context(); + + AffinityTopologyVersion locTopVer = ctx.affinity().affinityTopologyVersion(); + + if (locTopVer.compareTo(topVer) < 0) { + IgniteInternalFuture<?> fut = cacheProc.context().exchange().affinityReadyFuture(topVer); + + if (fut != null && !fut.isDone()) { + fut.listen(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> t) { + jobCtx.callcc(); + } + }); + + jobCtx.holdcc(); + } + } + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeString(out, cacheName); + + topVer.writeExternal(out); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + cacheName = U.readString(in); + + topVer = new AffinityTopologyVersion(); + + topVer.readExternal(in); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c62eb117/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java index 3a685cc..661df87 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java @@ -32,7 +32,6 @@ import org.apache.ignite.internal.processors.task.*; import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; -import org.apache.ignite.resources.*; import org.apache.ignite.transactions.*; import org.jetbrains.annotations.*; @@ -231,23 +230,13 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter * operation on a cache with the given name. */ @GridInternal - private static class GlobalRemoveAllCallable<K,V> implements Callable<Object>, Externalizable { + private static class GlobalRemoveAllCallable<K,V> extends DelayedCallable<K, V> implements Callable<Object> { /** */ private static final long serialVersionUID = 0L; - /** Cache name. */ - private String cacheName; - - /** Topology version. */ - private AffinityTopologyVersion topVer; - /** Skip store flag. */ private boolean skipStore; - /** Injected grid instance. */ - @IgniteInstanceResource - private Ignite ignite; - /** * Empty constructor for serialization. */ @@ -261,8 +250,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter * @param skipStore Skip store flag. */ private GlobalRemoveAllCallable(String cacheName, @NotNull AffinityTopologyVersion topVer, boolean skipStore) { - this.cacheName = cacheName; - this.topVer = topVer; + super(cacheName, topVer); this.skipStore = skipStore; } @@ -270,11 +258,11 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter * {@inheritDoc} */ @Override public Object call() throws Exception { - GridCacheAdapter<K, V> cacheAdapter = ((IgniteKernal)ignite).context().cache().internalCache(cacheName); + GridCacheAdapter<K, V> cacheAdapter = ((IgniteKernal) ignite).context().cache().internalCache(cacheName); final GridCacheContext<K, V> ctx = cacheAdapter.context(); - ctx.affinity().affinityReadyFuture(topVer).get(); + waitAffinityReadyFuture(); ctx.gate().enter(); @@ -338,15 +326,15 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeString(out, cacheName); - out.writeObject(topVer); + super.writeExternal(out); + out.writeBoolean(skipStore); } /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - cacheName = U.readString(in); - topVer = (AffinityTopologyVersion)in.readObject(); + super.readExternal(in); + skipStore = in.readBoolean(); } }