#ignite-732: wip.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ec4218e3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ec4218e3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ec4218e3 Branch: refs/heads/ignite-732 Commit: ec4218e3ad40c474a55318d9b5db699f4e945c8b Parents: 64637bf Author: ivasilinets <ivasilin...@gridgain.com> Authored: Thu Apr 30 15:20:17 2015 +0300 Committer: ivasilinets <ivasilin...@gridgain.com> Committed: Thu Apr 30 15:20:17 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 273 +++++++++++++++---- .../GridDistributedCacheAdapter.java | 29 +- .../cache/GridCacheSizeTopologyChangedTest.java | 33 ++- 3 files changed, 250 insertions(+), 85 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ec4218e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 83b5ca5..a574103 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -78,6 +78,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** clearLocally() split threshold. */ public static final int CLEAR_ALL_SPLIT_THRESHOLD = 10000; + /** Filed result. */ + protected static final GridCacheReturn FAIL = new GridCacheReturn(false, false); + /** Deserialization stash. */ private static final ThreadLocal<IgniteBiTuple<String, String>> stash = new ThreadLocal<IgniteBiTuple<String, String>>() { @@ -1083,7 +1086,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V // Clear local cache synchronously. clearLocally(); - clearRemotes(0, new GlobalClearAllCallable(name())); + clearRemotes(0, new GlobalClearAllCallable(name(), ctx.affinity().affinityTopologyVersion())); } /** {@inheritDoc} */ @@ -1091,7 +1094,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V // Clear local cache synchronously. clearLocally(key); - clearRemotes(0, new GlobalClearKeySetCallable<K, V>(name(), Collections.singleton(key))); + clearRemotes(0, new GlobalClearKeySetCallable<K, V>(name(), ctx.affinity().affinityTopologyVersion(), + Collections.singleton(key))); } /** {@inheritDoc} */ @@ -1099,17 +1103,20 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V // Clear local cache synchronously. clearLocallyAll(keys); - clearRemotes(0, new GlobalClearKeySetCallable<K, V>(name(), keys)); + clearRemotes(0, new GlobalClearKeySetCallable<K, V>(name(), ctx.affinity().affinityTopologyVersion(), + keys)); } /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> clearAsync(K key) { - return clearAsync(new GlobalClearKeySetCallable<K, V>(name(), Collections.singleton(key))); + return clearAsync(new GlobalClearKeySetCallable<K, V>(name(), ctx.affinity().affinityTopologyVersion(), + Collections.singleton(key))); } /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> clearAsync(Set<? extends K> keys) { - return clearAsync(new GlobalClearKeySetCallable<K, V>(name(), keys)); + return clearAsync(new GlobalClearKeySetCallable<K, V>(name(), ctx.affinity().affinityTopologyVersion(), + keys)); } /** @@ -1129,7 +1136,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (!nodes.isEmpty()) { ctx.kernalContext().task().setThreadContext(TC_TIMEOUT, timeout); - fut = ctx.closures().callAsyncNoFailover(BROADCAST, clearCall, nodes, true); + fut = new ClearFuture(ctx, clearCall); } if (fut != null) @@ -1149,7 +1156,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> clearAsync() { - return clearAsync(new GlobalClearAllCallable(name())); + return clearAsync(new GlobalClearAllCallable(name(), ctx.affinity().affinityTopologyVersion())); } /** @@ -1160,8 +1167,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V Collection<ClusterNode> nodes = ctx.grid().cluster().forCacheNodes(name(), true, true, false).nodes(); if (!nodes.isEmpty()) { - IgniteInternalFuture<Object> fut = - ctx.closures().callAsyncNoFailover(BROADCAST, clearCall, nodes, true); + IgniteInternalFuture<Object> fut = new ClearFuture(ctx, clearCall); return fut.chain(new CX1<IgniteInternalFuture<Object>, Object>() { @Override public Object applyx(IgniteInternalFuture<Object> fut) throws IgniteCheckedException { @@ -3576,7 +3582,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (nodes.isEmpty()) return new GridFinishedFuture<>(0); - return new SizeFuture(peekModes, ctx, nodes); + return new SizeFuture(peekModes, ctx, modes.near); } /** {@inheritDoc} */ @@ -4837,14 +4843,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * Internal callable which performs clear operation on a cache with the given name. */ @GridInternal - private static abstract class GlobalClearCallable implements Callable<Object>, Externalizable { - /** Cache name. */ - protected String cacheName; - - /** Injected grid instance. */ - @IgniteInstanceResource - protected Ignite ignite; - + private static abstract class GlobalClearCallable extends VersionComparable implements Callable<Object> { /** * Empty constructor for serialization. */ @@ -4854,19 +4853,20 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** * @param cacheName Cache name. + * @param topVer Affinity topology version. */ - protected GlobalClearCallable(String cacheName) { - this.cacheName = cacheName; + protected GlobalClearCallable(String cacheName, AffinityTopologyVersion topVer) { + super(cacheName, topVer); } /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeString(out, cacheName); + super.writeExternal(out); } /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - cacheName = U.readString(in); + super.readExternal(in); } } @@ -4887,15 +4887,22 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** * @param cacheName Cache name. + * @param topVer Affinity topology version. */ - private GlobalClearAllCallable(String cacheName) { - super(cacheName); + private GlobalClearAllCallable(String cacheName, AffinityTopologyVersion topVer) { + super(cacheName, topVer); } /** {@inheritDoc} */ @Override public Object call() throws Exception { + if (!compareTopologyVersions()) + return FAIL; + ((IgniteEx)ignite).cachex(cacheName).clearLocally(); + if (!compareTopologyVersions()) + return FAIL; + return null; } } @@ -4920,16 +4927,20 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** * @param cacheName Cache name. + * @param topVer Affinity topology version. * @param keys Keys to clear. */ - private GlobalClearKeySetCallable(String cacheName, Set<? extends K> keys) { - super(cacheName); + private GlobalClearKeySetCallable(String cacheName, AffinityTopologyVersion topVer, Set<? extends K> keys) { + super(cacheName, topVer); this.keys = keys; } /** {@inheritDoc} */ @Override public Object call() throws Exception { + if (!compareTopologyVersions()) + return FAIL; + ((IgniteEx)ignite).<K, V>cachex(cacheName).clearLocallyAll(keys); return null; @@ -4954,20 +4965,13 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * Internal callable for global size calculation. */ @GridInternal - private static class SizeCallable extends IgniteClosureX<Object, Integer> implements Externalizable { + private static class SizeCallable extends VersionComparable implements IgniteClosure<Object, Integer> { /** */ private static final long serialVersionUID = 0L; - /** Cache name. */ - private String cacheName; - /** Peek modes. */ private CachePeekMode[] peekModes; - /** Injected grid instance. */ - @IgniteInstanceResource - private Ignite ignite; - /** * Required by {@link Externalizable}. */ @@ -4977,24 +4981,34 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** * @param cacheName Cache name. + * @param topVer Affinity topology version. * @param peekModes Cache peek modes. */ - private SizeCallable(String cacheName, CachePeekMode[] peekModes) { - this.cacheName = cacheName; + private SizeCallable(String cacheName, AffinityTopologyVersion topVer, CachePeekMode[] peekModes) { + super(cacheName, topVer); + this.peekModes = peekModes; } /** {@inheritDoc} */ - @Override public Integer applyx(Object o) throws IgniteCheckedException { - IgniteInternalCache<Object, Object> cache = ((IgniteEx)ignite).cachex(cacheName); + @Override public Integer apply(Object o) { + try { + if (!compareTopologyVersions()) + return -1; + + IgniteInternalCache<Object, Object> cache = ((IgniteEx)ignite).cachex(cacheName); - return cache == null ? 0 : cache.localSize(peekModes); + return cache == null ? 0 : cache.localSize(peekModes); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } } /** {@inheritDoc} */ @SuppressWarnings("ForLoopReplaceableByForEach") @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeString(out, cacheName); + super.writeExternal(out); out.writeInt(peekModes.length); @@ -5004,7 +5018,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - cacheName = U.readString(in); + super.readExternal(in); int len = in.readInt(); @@ -5030,8 +5044,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** Context. */ private final GridCacheContext ctx; - /** Nodes. */ - private final Collection<ClusterNode> nodes; + /** Near enable. */ + private final boolean near; /** Max retries count before issuing an error. */ private int retries = 32; @@ -5039,10 +5053,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** * @param peekModes Peek modes. */ - public SizeFuture(CachePeekMode[] peekModes, GridCacheContext ctx, Collection<ClusterNode> nodes) { + public SizeFuture(CachePeekMode[] peekModes, GridCacheContext ctx, boolean near) { this.peekModes = peekModes; this.ctx = ctx; - this.nodes = nodes; + this.near = near; init(); } @@ -5051,8 +5065,17 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * Init. */ private void init() { + IgniteClusterEx cluster = ctx.grid().cluster(); + + ClusterGroup grp = near ? + cluster.forCacheNodes(ctx.name(), true, true, false) : + cluster.forDataNodes(ctx.name()); + + Collection<ClusterNode> nodes = grp.nodes(); + IgniteInternalFuture<Collection<Integer>> fut = - ctx.closures().broadcastNoFailover(new SizeCallable(ctx.name(), peekModes), null, nodes); + ctx.closures().broadcastNoFailover( + new SizeCallable(ctx.name(), ctx.affinity().affinityTopologyVersion(), peekModes), null, nodes); fut.listen(new IgniteInClosure<IgniteInternalFuture<Collection<Integer>>>() { @Override public void apply(IgniteInternalFuture<Collection<Integer>> fut) { @@ -5061,13 +5084,93 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V int size = 0; - for (Integer locSize : res) + for (Integer locSize : res) { + if (locSize == -1) { + if (retries-- > 0) + init(); + else { + onDone(new ClusterTopologyException("Failed to wait topology.")); + + return; + } + } + size += locSize; + } + + onDone(size); + } + catch (IgniteCheckedException e) { + if (X.hasCause(e, ClusterTopologyException.class)) { + if (retries-- > 0) + init(); + else + onDone(e); + } + else + onDone(e); + } + } + }); + } + } + + /** + * Cache clear future. + */ + private static class ClearFuture extends GridFutureAdapter<Object> { + /** Context. */ + private final GridCacheContext ctx; + + + private final GlobalClearCallable clearCall; + + /** Max retries count before issuing an error. */ + private volatile int retries = 32; + + /** + */ + public ClearFuture(GridCacheContext ctx, GlobalClearCallable clearCall) { + this.ctx = ctx; + this.clearCall = clearCall; + init(); + } + + /** + * Init. + */ + private void init() { + Collection<ClusterNode> nodes = + ctx.grid().cluster().forCacheNodes(ctx.name(), true, true, false).forRemotes().nodes(); + + clearCall.topologyVersion(ctx.affinity().affinityTopologyVersion()); + + IgniteInternalFuture<Collection<Object>> fut = ctx.closures().callAsyncNoFailover(BROADCAST, + F.asSet(clearCall), nodes, true); + + fut.listen(new IgniteInClosure<IgniteInternalFuture<Collection<Object>>>() { + @Override public void apply(IgniteInternalFuture<Collection<Object>> fut) { + try { + Collection<Object> res = fut.get(); + + int size = 0; + + for (Object locRes : res) { + if (locRes == FAIL) { + if (retries-- > 0) + init(); + else { + onDone(new ClusterTopologyException("Failed to wait topology.")); + + return; + } + } + } onDone(size); } catch (IgniteCheckedException e) { - if (X.hasCause(e, ClusterTopologyException.class, NullPointerException.class)) { + if (X.hasCause(e, ClusterTopologyException.class)) { if (retries-- > 0) init(); else @@ -5642,4 +5745,78 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V metrics.addPutAndGetTimeNanos(System.nanoTime() - start); } } + + /** + * Delayed callable class. + */ + protected static abstract class VersionComparable<K, V> implements Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Injected grid instance. */ + @IgniteInstanceResource + protected Ignite ignite; + + /** Affinity topology version. */ + protected AffinityTopologyVersion topVer; + + /** Cache name. */ + protected String cacheName; + + /** + * Empty constructor for serialization. + */ + public VersionComparable() { + // No-op. + } + + /** + * @param topVer Affinity topology version. + */ + public VersionComparable(String cacheName, AffinityTopologyVersion topVer) { + this.cacheName = cacheName; + this.topVer = topVer; + } + + /** + * Compare topology versions. + */ + public boolean compareTopologyVersions() { + GridCacheProcessor cacheProc = ((IgniteKernal) ignite).context().cache(); + + GridCacheAdapter<K, V> cacheAdapter = cacheProc.internalCache(cacheName); + + if (cacheAdapter == null) + return false; + + final GridCacheContext<K, V> ctx = cacheAdapter.context(); + + AffinityTopologyVersion locTopVer = ctx.affinity().affinityTopologyVersion(); + + return locTopVer.compareTo(topVer) == 0; + } + + /** + * @param topVer Affinity topology version. + */ + public void topologyVersion(AffinityTopologyVersion topVer) { + this.topVer = topVer; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeString(out, cacheName); + + topVer.writeExternal(out); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + cacheName = U.readString(in); + + topVer = new AffinityTopologyVersion(); + + topVer.readExternal(in); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ec4218e3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java index 3a685cc..80aa809 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java @@ -32,7 +32,6 @@ import org.apache.ignite.internal.processors.task.*; import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; -import org.apache.ignite.resources.*; import org.apache.ignite.transactions.*; import org.jetbrains.annotations.*; @@ -231,23 +230,13 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter * operation on a cache with the given name. */ @GridInternal - private static class GlobalRemoveAllCallable<K,V> implements Callable<Object>, Externalizable { + private static class GlobalRemoveAllCallable<K,V> extends VersionComparable<K, V> implements Callable<Object> { /** */ private static final long serialVersionUID = 0L; - /** Cache name. */ - private String cacheName; - - /** Topology version. */ - private AffinityTopologyVersion topVer; - /** Skip store flag. */ private boolean skipStore; - /** Injected grid instance. */ - @IgniteInstanceResource - private Ignite ignite; - /** * Empty constructor for serialization. */ @@ -261,8 +250,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter * @param skipStore Skip store flag. */ private GlobalRemoveAllCallable(String cacheName, @NotNull AffinityTopologyVersion topVer, boolean skipStore) { - this.cacheName = cacheName; - this.topVer = topVer; + super(cacheName, topVer); this.skipStore = skipStore; } @@ -270,12 +258,13 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter * {@inheritDoc} */ @Override public Object call() throws Exception { + if (!compareTopologyVersions()) + return null; + GridCacheAdapter<K, V> cacheAdapter = ((IgniteKernal)ignite).context().cache().internalCache(cacheName); final GridCacheContext<K, V> ctx = cacheAdapter.context(); - ctx.affinity().affinityReadyFuture(topVer).get(); - ctx.gate().enter(); try { @@ -338,15 +327,15 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeString(out, cacheName); - out.writeObject(topVer); + super.writeExternal(out); + out.writeBoolean(skipStore); } /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - cacheName = U.readString(in); - topVer = (AffinityTopologyVersion)in.readObject(); + super.readExternal(in); + skipStore = in.readBoolean(); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ec4218e3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSizeTopologyChangedTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSizeTopologyChangedTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSizeTopologyChangedTest.java index c04461b..ddc2b78 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSizeTopologyChangedTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSizeTopologyChangedTest.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; +import org.apache.ignite.cache.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -62,9 +63,13 @@ public class GridCacheSizeTopologyChangedTest extends GridCommonAbstractTest { ccfg.setAtomicityMode(ATOMIC); + ccfg.setRebalanceMode(CacheRebalanceMode.SYNC); + + ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + ccfg.setCacheMode(PARTITIONED); - ccfg.setBackups(1); + ccfg.setBackups(2); cfg.setCacheConfiguration(defaultCacheConfiguration()); @@ -91,27 +96,14 @@ public class GridCacheSizeTopologyChangedTest extends GridCommonAbstractTest { IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable<Void>() { @Override public Void call() throws Exception { + int idx = 2; while(!canceled.get()) { - int idx = rnd.nextInt(GRIDS_CNT); - - if (idx > 0) { - boolean state = status[idx]; - - if (state) { - System.out.println("!!! STOP GRID: " + idx); stopGrid(idx); - } - else { - System.out.println("!!! START GRID:" + idx); - startGrid(idx); - } - - status[idx] = !state; - U.sleep(1000); - } + U.sleep(3000); } + return null; } }); @@ -128,6 +120,13 @@ public class GridCacheSizeTopologyChangedTest extends GridCommonAbstractTest { if (i % 1000 == 0) System.out.println("!!! Keys added: " + i + ", size: " + size); + + + + if (i % 1000 == 0) { + U.sleep(1000); + System.out.println("!!! Keys added: " + i + ", size: " + cache.size()); + } } canceled.set(true);