# ignite-sprint-3 minor (fixed test, use static classes in CacheObjectProcessor)
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8a86e387 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8a86e387 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8a86e387 Branch: refs/heads/gg-9998 Commit: 8a86e387be58d670acfdf06e4d9e83fe657106c9 Parents: b7659f3 Author: sboikov <semen.boi...@inria.fr> Authored: Wed Apr 1 22:11:37 2015 +0300 Committer: sboikov <semen.boi...@inria.fr> Committed: Wed Apr 1 22:19:58 2015 +0300 ---------------------------------------------------------------------- .../IgniteCacheObjectProcessorImpl.java | 165 ++++++++++++------- .../IgniteCacheStoreValueAbstractTest.java | 92 +++++++---- .../junits/common/GridCommonAbstractTest.java | 2 +- 3 files changed, 167 insertions(+), 92 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a86e387/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java index a9b7c00..f65b7bd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java @@ -26,7 +26,6 @@ import org.apache.ignite.internal.processors.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.query.*; import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.jetbrains.annotations.*; @@ -114,34 +113,7 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme if (!userObj) return new KeyCacheObjectImpl(obj, null); - return new KeyCacheObjectImpl(obj, null) { - @Nullable @Override public <T> T value(CacheObjectContext ctx, boolean cpy) { - return super.value(ctx, false); // Do not need copy since user value is not in cache. - } - - @Override public CacheObject prepareForCache(CacheObjectContext ctx) { - try { - if (!ctx.processor().immutable(val)) { - if (valBytes == null) - valBytes = ctx.processor().marshal(ctx, val); - - ClassLoader ldr = ctx.p2pEnabled() ? - IgniteUtils.detectClass(this.val).getClassLoader() : val.getClass().getClassLoader(); - - Object val = ctx.processor().unmarshal(ctx, - valBytes, - ldr); - - return new KeyCacheObjectImpl(val, valBytes); - } - - return new KeyCacheObjectImpl(val, valBytes); - } - catch (IgniteCheckedException e) { - throw new IgniteException("Failed to marshal object: " + val, e); - } - } - }; + return new UserKeyCacheObjectImpl(obj); } /** {@inheritDoc} */ @@ -208,23 +180,13 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme if (!userObj) return new CacheObjectByteArrayImpl((byte[])obj); - return new CacheObjectByteArrayImpl((byte[]) obj) { - @Nullable @Override public <T> T value(CacheObjectContext ctx, boolean cpy) { - return super.value(ctx, false); // Do not need copy since user value is not in cache. - } - - @Override public CacheObject prepareForCache(CacheObjectContext ctx) { - byte[] valCpy = Arrays.copyOf(val, val.length); - - return new CacheObjectByteArrayImpl(valCpy); - } - }; + return new UserCacheObjectByteArrayImpl((byte[])obj); } if (!userObj) return new CacheObjectImpl(obj, null); - return new IgniteCacheObjectImpl(obj, null); + return new UserCacheObjectImpl(obj, null); } /** {@inheritDoc} */ @@ -313,16 +275,65 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme } /** - * + * Wraps key provided by user, must be serialized before stored in cache. */ - private static class IgniteCacheObjectImpl extends CacheObjectImpl { + private static class UserKeyCacheObjectImpl extends KeyCacheObjectImpl { /** */ private static final long serialVersionUID = 0L; /** * */ - public IgniteCacheObjectImpl() { + public UserKeyCacheObjectImpl() { + //No-op. + } + + /** + * @param key Key. + */ + UserKeyCacheObjectImpl(Object key) { + super(key, null); + } + + /** {@inheritDoc} */ + @Nullable @Override public <T> T value(CacheObjectContext ctx, boolean cpy) { + return super.value(ctx, false); // Do not need copy since user value is not in cache. + } + + /** {@inheritDoc} */ + @Override public CacheObject prepareForCache(CacheObjectContext ctx) { + try { + if (!ctx.processor().immutable(val)) { + if (valBytes == null) + valBytes = ctx.processor().marshal(ctx, val); + + ClassLoader ldr = ctx.p2pEnabled() ? + IgniteUtils.detectClass(this.val).getClassLoader() : val.getClass().getClassLoader(); + + Object val = ctx.processor().unmarshal(ctx, valBytes, ldr); + + return new KeyCacheObjectImpl(val, valBytes); + } + + return new KeyCacheObjectImpl(val, valBytes); + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to marshal object: " + val, e); + } + } + } + + /** + * Wraps value provided by user, must be serialized before stored in cache. + */ + private static class UserCacheObjectImpl extends CacheObjectImpl { + /** */ + private static final long serialVersionUID = 0L; + + /** + * + */ + public UserCacheObjectImpl() { //No-op. } @@ -330,7 +341,7 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme * @param val Value. * @param valBytes Value bytes. */ - public IgniteCacheObjectImpl(Object val, byte[] valBytes) { + public UserCacheObjectImpl(Object val, byte[] valBytes) { super(val, valBytes); } @@ -341,28 +352,58 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme /** {@inheritDoc} */ @Override public CacheObject prepareForCache(CacheObjectContext ctx) { - if (!ctx.processor().immutable(val)) { - try { - if (valBytes == null) - valBytes = ctx.processor().marshal(ctx, val); + try { + if (valBytes == null) + valBytes = ctx.processor().marshal(ctx, val); - if (ctx.storeValue()) { - ClassLoader ldr = ctx.p2pEnabled() ? - IgniteUtils.detectClass(this.val).getClassLoader() : val.getClass().getClassLoader(); + if (ctx.storeValue()) { + ClassLoader ldr = ctx.p2pEnabled() ? + IgniteUtils.detectClass(this.val).getClassLoader() : val.getClass().getClassLoader(); - Object val = ctx.processor().unmarshal(ctx, valBytes, ldr); + Object val = ctx.processor().unmarshal(ctx, valBytes, ldr); - return new CacheObjectImpl(val, valBytes); - } - - return new CacheObjectImpl(null, valBytes); - } - catch (IgniteCheckedException e) { - throw new IgniteException("Failed to marshal object: " + val, e); + return new CacheObjectImpl(val, valBytes); } + + return new CacheObjectImpl(null, valBytes); } - else - return new CacheObjectImpl(val, valBytes); + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to marshal object: " + val, e); + } + } + } + + /** + * Wraps value provided by user, must be copied before stored in cache. + */ + private static class UserCacheObjectByteArrayImpl extends CacheObjectByteArrayImpl { + /** */ + private static final long serialVersionUID = 0L; + + /** + * + */ + public UserCacheObjectByteArrayImpl() { + // No-op. + } + + /** + * @param val Value. + */ + public UserCacheObjectByteArrayImpl(byte[] val) { + super(val); + } + + /** {@inheritDoc} */ + @Nullable @Override public <T> T value(CacheObjectContext ctx, boolean cpy) { + return super.value(ctx, false); // Do not need copy since user value is not in cache. + } + + /** {@inheritDoc} */ + @Override public CacheObject prepareForCache(CacheObjectContext ctx) { + byte[] valCpy = Arrays.copyOf(val, val.length); + + return new CacheObjectByteArrayImpl(valCpy); } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a86e387/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStoreValueAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStoreValueAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStoreValueAbstractTest.java index 9fa24cc..9552258 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStoreValueAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStoreValueAbstractTest.java @@ -62,7 +62,7 @@ public abstract class IgniteCacheStoreValueAbstractTest extends IgniteCacheAbstr ccfg.setCacheStoreFactory(singletonFactory(new CacheStoreAdapter() { @Override public void loadCache(IgniteBiInClosure clo, Object... args) { - clo.apply(new TestKey(1), new TestValue()); + clo.apply(new TestKey(100_000), new TestValue(30_000)); } @Override public Object load(Object key) throws CacheLoaderException { @@ -88,6 +88,11 @@ public abstract class IgniteCacheStoreValueAbstractTest extends IgniteCacheAbstr return true; } + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 2 * 60_000; + } + /** * @throws Exception If failed. */ @@ -98,9 +103,9 @@ public abstract class IgniteCacheStoreValueAbstractTest extends IgniteCacheAbstr final List<WeakReference<Object>> refs = new ArrayList<>(); - for (int i = 0; i < 1000; i++) { + for (int i = 0; i < 100; i++) { TestKey key = new TestKey(i); - TestValue val = new TestValue(); + TestValue val = new TestValue(i); refs.add(new WeakReference<Object>(val)); @@ -117,9 +122,9 @@ public abstract class IgniteCacheStoreValueAbstractTest extends IgniteCacheAbstr @Override public Object process(MutableEntry<TestKey, TestValue> entry, Object... args) { assertNotNull(entry.getValue()); - entry.setValue(new TestValue()); + entry.setValue(new TestValue(10_000)); - return new TestValue(); + return new TestValue(20_000); } }); @@ -135,7 +140,19 @@ public abstract class IgniteCacheStoreValueAbstractTest extends IgniteCacheAbstr for (int g = 0; g < gridCount(); g++) assertNull(grid(g).cache(null).get(key)); - try (IgniteDataStreamer streamer = grid(0).dataStreamer(null)) { + try (IgniteDataStreamer<TestKey, TestValue> streamer = grid(0).dataStreamer(null)) { + streamer.addData(key, val); + } + + checkNoValue(aff, key); + + cache.remove(key); + + atomicClockModeDelay(cache); + + try (IgniteDataStreamer<TestKey, TestValue> streamer = grid(0).dataStreamer(null)) { + streamer.allowOverwrite(true); + streamer.addData(key, val); } @@ -154,9 +171,9 @@ public abstract class IgniteCacheStoreValueAbstractTest extends IgniteCacheAbstr } } - cache.loadCache(null); // Should load TestKey(1). + cache.loadCache(null); // Should load TestKey(100_000). - TestKey key = new TestKey(1); + TestKey key = new TestKey(100_000); checkNoValue(aff, key); @@ -165,26 +182,29 @@ public abstract class IgniteCacheStoreValueAbstractTest extends IgniteCacheAbstr checkNoValue(aff, key); -// boolean wait = GridTestUtils.waitForCondition(new GridAbsPredicate() { -// @Override public boolean apply() { -// System.gc(); -// -// boolean pass = true; -// -// for (Iterator<WeakReference<Object>> it = refs.iterator(); it.hasNext();) { -// WeakReference<Object> ref = it.next(); -// -// if (ref.get() == null) -// it.remove(); -// else -// pass = false; -// } -// -// return pass; -// } -// }, 10_000); -// -// assertTrue("Failed to wait for when values are collected", wait); + boolean wait = GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + System.gc(); + + boolean pass = true; + + for (Iterator<WeakReference<Object>> it = refs.iterator(); it.hasNext();) { + WeakReference<Object> ref = it.next(); + + if (ref.get() == null) + it.remove(); + else { + pass = false; + + log.info("Not collected value: " + ref.get()); + } + } + + return pass; + } + }, 60_000); + + assertTrue("Failed to wait for when values are collected", wait); } /** @@ -265,7 +285,21 @@ public abstract class IgniteCacheStoreValueAbstractTest extends IgniteCacheAbstr * */ static class TestValue implements Serializable { - // No-op. + /** */ + private int val; + + /** + * + * @param val Value. + */ + public TestValue(int val) { + this.val = val; + } + + /** {@inheritDoc} */ + public String toString() { + return "TestValue [val=" + val + ']'; + } } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8a86e387/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java index 9dd40d0..be13228 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java @@ -769,6 +769,6 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest { if (ccfg.getCacheMode() != LOCAL && ccfg.getAtomicityMode() == CacheAtomicityMode.ATOMIC && ccfg.getAtomicWriteOrderMode() == CacheAtomicWriteOrderMode.CLOCK) - U.sleep(100); + U.sleep(50); } }