#ignite-732: wip.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b65aa1d9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b65aa1d9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b65aa1d9 Branch: refs/heads/ignite-732 Commit: b65aa1d97730559864e464fff696bfa936f4117f Parents: ec4218e Author: ivasilinets <ivasilin...@gridgain.com> Authored: Thu Apr 30 17:47:57 2015 +0300 Committer: ivasilinets <ivasilin...@gridgain.com> Committed: Thu Apr 30 17:47:57 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 323 ++++++++++--------- .../processors/cache/GridCacheProcessor.java | 2 + .../cache/GridCacheSizeTopologyChangedTest.java | 23 +- 3 files changed, 186 insertions(+), 162 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b65aa1d9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index a574103..46d248a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -75,12 +75,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** */ private static final long serialVersionUID = 0L; + /** Filed result. */ + private static final Object FAIL = new Integer(-1); + /** clearLocally() split threshold. */ public static final int CLEAR_ALL_SPLIT_THRESHOLD = 10000; - /** Filed result. */ - protected static final GridCacheReturn FAIL = new GridCacheReturn(false, false); - /** Deserialization stash. */ private static final ThreadLocal<IgniteBiTuple<String, String>> stash = new ThreadLocal<IgniteBiTuple<String, String>>() { @@ -1086,7 +1086,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V // Clear local cache synchronously. clearLocally(); - clearRemotes(0, new GlobalClearAllCallable(name(), ctx.affinity().affinityTopologyVersion())); + clearRemotes(0, new GlobalRemoteClearAllCallable(name(), ctx.affinity().affinityTopologyVersion())); } /** {@inheritDoc} */ @@ -1094,7 +1094,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V // Clear local cache synchronously. clearLocally(key); - clearRemotes(0, new GlobalClearKeySetCallable<K, V>(name(), ctx.affinity().affinityTopologyVersion(), + clearRemotes(0, new GlobalRemoteClearKeySetCallable<K, V>(name(), ctx.affinity().affinityTopologyVersion(), Collections.singleton(key))); } @@ -1103,7 +1103,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V // Clear local cache synchronously. clearLocallyAll(keys); - clearRemotes(0, new GlobalClearKeySetCallable<K, V>(name(), ctx.affinity().affinityTopologyVersion(), + clearRemotes(0, new GlobalRemoteClearKeySetCallable<K, V>(name(), ctx.affinity().affinityTopologyVersion(), keys)); } @@ -4843,7 +4843,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * Internal callable which performs clear operation on a cache with the given name. */ @GridInternal - private static abstract class GlobalClearCallable extends VersionComparable implements Callable<Object> { + private static abstract class GlobalClearCallable extends VersionAwareCallable { /** * Empty constructor for serialization. */ @@ -4858,30 +4858,20 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V protected GlobalClearCallable(String cacheName, AffinityTopologyVersion topVer) { super(cacheName, topVer); } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - super.writeExternal(out); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - super.readExternal(in); - } } /** * Global clear all. */ @GridInternal - private static class GlobalClearAllCallable extends GlobalClearCallable { + private static class GlobalRemoteClearAllCallable extends GlobalClearCallable { /** */ private static final long serialVersionUID = 0L; /** * Empty constructor for serialization. */ - public GlobalClearAllCallable() { + public GlobalRemoteClearAllCallable() { // No-op. } @@ -4889,22 +4879,43 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @param cacheName Cache name. * @param topVer Affinity topology version. */ - private GlobalClearAllCallable(String cacheName, AffinityTopologyVersion topVer) { + private GlobalRemoteClearAllCallable(String cacheName, AffinityTopologyVersion topVer) { super(cacheName, topVer); } /** {@inheritDoc} */ - @Override public Object call() throws Exception { - if (!compareTopologyVersions()) - return FAIL; - + @Override protected Object callLocal() { ((IgniteEx)ignite).cachex(cacheName).clearLocally(); - if (!compareTopologyVersions()) - return FAIL; - return null; } + + /** {@inheritDoc} */ + @Override protected Collection<ClusterNode> nodes(GridCacheContext ctx) { + return ctx.grid().cluster().forCacheNodes(ctx.name(), true, true, false).forRemotes().nodes(); + } + } + + /** + * Global clear all. + */ + @GridInternal + private static class GlobalClearAllCallable extends GlobalRemoteClearAllCallable { + /** */ + private static final long serialVersionUID = 0L; + + /** + * @param cacheName Cache name. + * @param topVer Affinity topology version. + */ + private GlobalClearAllCallable(String cacheName, AffinityTopologyVersion topVer) { + super(cacheName, topVer); + } + + /** {@inheritDoc} */ + @Override protected Collection<ClusterNode> nodes(GridCacheContext ctx) { + return ctx.grid().cluster().forCacheNodes(ctx.name(), true, true, false).nodes(); + } } /** @@ -4930,34 +4941,42 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @param topVer Affinity topology version. * @param keys Keys to clear. */ - private GlobalClearKeySetCallable(String cacheName, AffinityTopologyVersion topVer, Set<? extends K> keys) { + protected GlobalClearKeySetCallable(String cacheName, AffinityTopologyVersion topVer, Set<? extends K> keys) { super(cacheName, topVer); this.keys = keys; } /** {@inheritDoc} */ - @Override public Object call() throws Exception { - if (!compareTopologyVersions()) - return FAIL; + @Override protected Collection<ClusterNode> nodes(GridCacheContext ctx) { + return ctx.grid().cluster().forCacheNodes(ctx.name(), true, true, false).nodes(); + } + /** {@inheritDoc} */ + @Override protected Object callLocal() { ((IgniteEx)ignite).<K, V>cachex(cacheName).clearLocallyAll(keys); return null; } + } - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - super.writeExternal(out); - - out.writeObject(keys); + /** + * Global clear keys. + */ + @GridInternal + private static class GlobalRemoteClearKeySetCallable<K, V> extends GlobalClearKeySetCallable { + /** + * @param cacheName Cache name. + * @param topVer Affinity topology version. + * @param keys Keys to clear. + */ + protected GlobalRemoteClearKeySetCallable(String cacheName, AffinityTopologyVersion topVer, Set<? extends K> keys) { + super(cacheName, topVer, keys); } /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - super.readExternal(in); - - keys = (Set<K>) in.readObject(); + @Override protected Collection<ClusterNode> nodes(GridCacheContext ctx) { + return ctx.grid().cluster().forCacheNodes(ctx.name(), true, true, false).forRemotes().nodes(); } } @@ -4965,13 +4984,16 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * Internal callable for global size calculation. */ @GridInternal - private static class SizeCallable extends VersionComparable implements IgniteClosure<Object, Integer> { + private static class SizeCallable extends VersionAwareCallable { /** */ private static final long serialVersionUID = 0L; /** Peek modes. */ private CachePeekMode[] peekModes; + /** Near enable. */ + private boolean nearEnable; + /** * Required by {@link Externalizable}. */ @@ -4984,18 +5006,16 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @param topVer Affinity topology version. * @param peekModes Cache peek modes. */ - private SizeCallable(String cacheName, AffinityTopologyVersion topVer, CachePeekMode[] peekModes) { + private SizeCallable(String cacheName, AffinityTopologyVersion topVer, CachePeekMode[] peekModes, boolean nearEnable) { super(cacheName, topVer); this.peekModes = peekModes; + this.nearEnable = nearEnable; } /** {@inheritDoc} */ - @Override public Integer apply(Object o) { + @Override protected Object callLocal() { try { - if (!compareTopologyVersions()) - return -1; - IgniteInternalCache<Object, Object> cache = ((IgniteEx)ignite).cachex(cacheName); return cache == null ? 0 : cache.localSize(peekModes); @@ -5006,26 +5026,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** {@inheritDoc} */ - @SuppressWarnings("ForLoopReplaceableByForEach") - @Override public void writeExternal(ObjectOutput out) throws IOException { - super.writeExternal(out); - - out.writeInt(peekModes.length); - - for (int i = 0; i < peekModes.length; i++) - U.writeEnum(out, peekModes[i]); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - super.readExternal(in); - - int len = in.readInt(); + @Override protected Collection<ClusterNode> nodes(GridCacheContext ctx) { + IgniteClusterEx cluster = ctx.grid().cluster(); - peekModes = new CachePeekMode[len]; + ClusterGroup grp = nearEnable ? cluster.forCacheNodes(ctx.name(), true, true, false) : cluster.forDataNodes(ctx.name()); - for (int i = 0; i < len; i++) - peekModes[i] = CachePeekMode.fromOrdinal(in.readByte()); + return grp.nodes(); } /** {@inheritDoc} */ @@ -5037,102 +5043,78 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** * Cache size future. */ - private static class SizeFuture extends GridFutureAdapter<Integer> { - /** Peek modes. */ - private final CachePeekMode[] peekModes; - - /** Context. */ - private final GridCacheContext ctx; - - /** Near enable. */ - private final boolean near; - - /** Max retries count before issuing an error. */ - private int retries = 32; + private static class SizeFuture extends RetryFuture { + /** Size. */ + private int size = 0; /** * @param peekModes Peek modes. */ public SizeFuture(CachePeekMode[] peekModes, GridCacheContext ctx, boolean near) { - this.peekModes = peekModes; - this.ctx = ctx; - this.near = near; - - init(); + super(ctx, new SizeCallable(ctx.name(), ctx.affinity().affinityTopologyVersion(), peekModes, near)); } - /** - * Init. - */ - private void init() { - IgniteClusterEx cluster = ctx.grid().cluster(); - - ClusterGroup grp = near ? - cluster.forCacheNodes(ctx.name(), true, true, false) : - cluster.forDataNodes(ctx.name()); - - Collection<ClusterNode> nodes = grp.nodes(); - - IgniteInternalFuture<Collection<Integer>> fut = - ctx.closures().broadcastNoFailover( - new SizeCallable(ctx.name(), ctx.affinity().affinityTopologyVersion(), peekModes), null, nodes); + /** {@inheritDoc} */ + @Override protected void onInit() { + size = 0; + } - fut.listen(new IgniteInClosure<IgniteInternalFuture<Collection<Integer>>>() { - @Override public void apply(IgniteInternalFuture<Collection<Integer>> fut) { - try { - Collection<Integer> res = fut.get(); + /** {@inheritDoc} */ + @Override protected void onLocal(Object localRes) { + size += (Integer)localRes; + } - int size = 0; + /** {@inheritDoc} */ + @Override protected void allDone() { + onDone(size); + } + } - for (Integer locSize : res) { - if (locSize == -1) { - if (retries-- > 0) - init(); - else { - onDone(new ClusterTopologyException("Failed to wait topology.")); + /** + * Cache clear future. + */ + private static class ClearFuture extends RetryFuture { + /** + */ + public ClearFuture(GridCacheContext ctx, GlobalClearCallable clearCall) { + super(ctx, clearCall); + } - return; - } - } + /** {@inheritDoc} */ + @Override protected void onInit() { + // No-op. + } - size += locSize; - } + /** {@inheritDoc} */ + @Override protected void onLocal(Object localRes) { + // No-op. + } - onDone(size); - } - catch (IgniteCheckedException e) { - if (X.hasCause(e, ClusterTopologyException.class)) { - if (retries-- > 0) - init(); - else - onDone(e); - } - else - onDone(e); - } - } - }); + /** {@inheritDoc} */ + @Override protected void allDone() { + onDone(); } } /** - * Cache clear future. + * Retry future. */ - private static class ClearFuture extends GridFutureAdapter<Object> { + protected static abstract class RetryFuture<T> extends GridFutureAdapter<T> { /** Context. */ private final GridCacheContext ctx; - - private final GlobalClearCallable clearCall; + /** Callable. */ + private final VersionAwareCallable call; /** Max retries count before issuing an error. */ private volatile int retries = 32; /** */ - public ClearFuture(GridCacheContext ctx, GlobalClearCallable clearCall) { + public RetryFuture(GridCacheContext ctx, VersionAwareCallable call) { this.ctx = ctx; - this.clearCall = clearCall; + this.call = call; + init(); } @@ -5140,20 +5122,19 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * Init. */ private void init() { - Collection<ClusterNode> nodes = - ctx.grid().cluster().forCacheNodes(ctx.name(), true, true, false).forRemotes().nodes(); + Collection<ClusterNode> nodes = call.nodes(ctx); - clearCall.topologyVersion(ctx.affinity().affinityTopologyVersion()); + call.topologyVersion(ctx.affinity().affinityTopologyVersion()); IgniteInternalFuture<Collection<Object>> fut = ctx.closures().callAsyncNoFailover(BROADCAST, - F.asSet(clearCall), nodes, true); + F.asSet((Callable<Object>)call), nodes, true); fut.listen(new IgniteInClosure<IgniteInternalFuture<Collection<Object>>>() { @Override public void apply(IgniteInternalFuture<Collection<Object>> fut) { try { Collection<Object> res = fut.get(); - int size = 0; + onInit(); for (Object locRes : res) { if (locRes == FAIL) { @@ -5165,9 +5146,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return; } } + + onLocal(locRes); } - onDone(size); + allDone(); } catch (IgniteCheckedException e) { if (X.hasCause(e, ClusterTopologyException.class)) { @@ -5182,6 +5165,21 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } }); } + + /** + * Init reducer. + */ + protected abstract void onInit(); + + /** + * @param localRes Add local result to global result. + */ + protected abstract void onLocal(Object localRes); + + /** + * On done. + */ + protected abstract void allDone(); } /** @@ -5749,7 +5747,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** * Delayed callable class. */ - protected static abstract class VersionComparable<K, V> implements Externalizable { + protected static abstract class VersionAwareCallable<K, V> implements Serializable, Callable<Object> { /** */ private static final long serialVersionUID = 0L; @@ -5766,18 +5764,45 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** * Empty constructor for serialization. */ - public VersionComparable() { + public VersionAwareCallable() { // No-op. } /** * @param topVer Affinity topology version. */ - public VersionComparable(String cacheName, AffinityTopologyVersion topVer) { + public VersionAwareCallable(String cacheName, AffinityTopologyVersion topVer) { this.cacheName = cacheName; this.topVer = topVer; } + /** {@inheritDoc} */ + @Override public Object call() throws Exception { + if (!compareTopologyVersions()) + return FAIL; + + Object res = callLocal(); + + if (!compareTopologyVersions()) + return FAIL; + else + return res; + } + + /** + * Call local. + * + * @return Local result. + * @throws IgniteCheckedException If failed. + */ + protected abstract Object callLocal() throws IgniteCheckedException; + + /** + * @param ctx Grid cache context. + * @return Nodes to call. + */ + protected abstract Collection<ClusterNode> nodes(GridCacheContext ctx); + /** * Compare topology versions. */ @@ -5802,21 +5827,5 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V public void topologyVersion(AffinityTopologyVersion topVer) { this.topVer = topVer; } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeString(out, cacheName); - - topVer.writeExternal(out); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - cacheName = U.readString(in); - - topVer = new AffinityTopologyVersion(); - - topVer.readExternal(in); - } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b65aa1d9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index c0026ab..9003b11 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -739,6 +739,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { } } + + ctx.marshallerContext().onMarshallerCacheStarted(ctx); marshallerCache().context().preloader().syncFuture().listen(new CIX1<IgniteInternalFuture<?>>() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b65aa1d9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSizeTopologyChangedTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSizeTopologyChangedTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSizeTopologyChangedTest.java index ddc2b78..bac6536 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSizeTopologyChangedTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSizeTopologyChangedTest.java @@ -61,15 +61,15 @@ public class GridCacheSizeTopologyChangedTest extends GridCommonAbstractTest { CacheConfiguration ccfg = new CacheConfiguration(); - ccfg.setAtomicityMode(ATOMIC); + ccfg.setAtomicityMode(ATOMIC);/* ccfg.setRebalanceMode(CacheRebalanceMode.SYNC); - ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);*/ ccfg.setCacheMode(PARTITIONED); - ccfg.setBackups(2); + ccfg.setBackups(1); cfg.setCacheConfiguration(defaultCacheConfiguration()); @@ -96,14 +96,27 @@ public class GridCacheSizeTopologyChangedTest extends GridCommonAbstractTest { IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable<Void>() { @Override public Void call() throws Exception { - int idx = 2; while(!canceled.get()) { + int idx = rnd.nextInt(GRIDS_CNT); + + if (idx > 0) { + boolean state = status[idx]; + + if (state) { + System.out.println("!!! STOP GRID: " + idx); stopGrid(idx); + } + else { + System.out.println("!!! START GRID:" + idx); + startGrid(idx); + } + + status[idx] = !state; U.sleep(3000); + } } - return null; } });