http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0b213587/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/GridCachePortableObjectsAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/GridCachePortableObjectsAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/GridCachePortableObjectsAbstractSelfTest.java new file mode 100644 index 0000000..a20148e --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/GridCachePortableObjectsAbstractSelfTest.java @@ -0,0 +1,958 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.portable; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.store.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.portable.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.marshaller.portable.*; +import org.apache.ignite.portable.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; +import org.apache.ignite.transactions.*; + +import org.jetbrains.annotations.*; + +import javax.cache.*; +import javax.cache.processor.*; +import java.util.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheMemoryMode.*; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; +import static org.apache.ignite.transactions.TransactionConcurrency.*; +import static org.apache.ignite.transactions.TransactionIsolation.*; + +/** + * Test for portable objects stored in cache. + */ +public abstract class GridCachePortableObjectsAbstractSelfTest extends GridCommonAbstractTest { + /** */ + public static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final int ENTRY_CNT = 100; + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(disco); + + CacheConfiguration cacheCfg = new CacheConfiguration(); + + cacheCfg.setCacheMode(cacheMode()); + cacheCfg.setAtomicityMode(atomicityMode()); + cacheCfg.setNearConfiguration(nearConfiguration()); + cacheCfg.setWriteSynchronizationMode(FULL_SYNC); + cacheCfg.setCacheStoreFactory(singletonFactory(new TestStore())); + cacheCfg.setReadThrough(true); + cacheCfg.setWriteThrough(true); + cacheCfg.setLoadPreviousValue(true); + cacheCfg.setBackups(1); + + if (offheapTiered()) { + cacheCfg.setMemoryMode(OFFHEAP_TIERED); + cacheCfg.setOffHeapMaxMemory(0); + } + + cfg.setCacheConfiguration(cacheCfg); + + cfg.setMarshaller(new PortableMarshaller()); + + return cfg; + } + + /** + * @return {@code True} if should use OFFHEAP_TIERED mode. + */ + protected boolean offheapTiered() { + return false; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGridsMultiThreaded(gridCount()); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + for (int i = 0; i < gridCount(); i++) { + GridCacheAdapter<Object, Object> c = ((IgniteKernal)grid(i)).internalCache(); + + for (GridCacheEntryEx e : c.map().entries0()) { + Object key = e.key().value(c.context().cacheObjectContext(), false); + Object val = CU.value(e.rawGet(), c.context(), false); + + if (key instanceof PortableObject) + assert ((GridPortableObjectImpl)key).detached() : val; + + if (val instanceof PortableObject) + assert ((GridPortableObjectImpl)val).detached() : val; + } + } + + IgniteCache<Object, Object> c = jcache(0); + + for (int i = 0; i < ENTRY_CNT; i++) + c.remove(i); + + if (offheapTiered()) { + for (int k = 0; k < 100; k++) + c.remove(k); + } + + assertEquals(0, c.size()); + } + + /** + * @return Cache mode. + */ + protected abstract CacheMode cacheMode(); + + /** + * @return Atomicity mode. + */ + protected abstract CacheAtomicityMode atomicityMode(); + + /** + * @return Distribution mode. + */ + protected abstract NearCacheConfiguration nearConfiguration(); + + /** + * @return Grid count. + */ + protected abstract int gridCount(); + + /** + * @throws Exception If failed. + */ + @SuppressWarnings("unchecked") + public void testCircularReference() throws Exception { + IgniteCache c = keepPortableCache(); + + TestReferenceObject obj1 = new TestReferenceObject(); + + obj1.obj = new TestReferenceObject(obj1); + + c.put(1, obj1); + + PortableObject po = (PortableObject)c.get(1); + + String str = po.toString(); + + log.info("toString: " + str); + + assertNotNull(str); + + assertTrue("Unexpected toString: " + str, + str.startsWith("TestReferenceObject") && str.contains("obj=TestReferenceObject [")); + + TestReferenceObject obj1_r = po.deserialize(); + + assertNotNull(obj1_r); + + TestReferenceObject obj2_r = obj1_r.obj; + + assertNotNull(obj2_r); + + assertSame(obj1_r, obj2_r.obj); + } + + /** + * @throws Exception If failed. + */ + public void testGet() throws Exception { + IgniteCache<Integer, TestObject> c = jcache(0); + + for (int i = 0; i < ENTRY_CNT; i++) + c.put(i, new TestObject(i)); + + for (int i = 0; i < ENTRY_CNT; i++) { + TestObject obj = c.get(i); + + assertEquals(i, obj.val); + } + + IgniteCache<Integer, PortableObject> kpc = keepPortableCache(); + + for (int i = 0; i < ENTRY_CNT; i++) { + PortableObject po = kpc.get(i); + + assertEquals(i, (int)po.field("val")); + } + } + + /** + * @throws Exception If failed. + */ + public void testIterator() throws Exception { + IgniteCache<Integer, TestObject> c = jcache(0); + + Map<Integer, TestObject> entries = new HashMap<>(); + + for (int i = 0; i < ENTRY_CNT; i++) { + TestObject val = new TestObject(i); + + c.put(i, val); + + entries.put(i, val); + } + + IgniteCache<Integer, PortableObject> prj = ((IgniteCacheProxy)c).keepPortable(); + + Iterator<Cache.Entry<Integer, PortableObject>> it = prj.iterator(); + + assertTrue(it.hasNext()); + + while (it.hasNext()) { + Cache.Entry<Integer, PortableObject> entry = it.next(); + + assertTrue(entries.containsKey(entry.getKey())); + + TestObject o = entries.get(entry.getKey()); + + PortableObject po = entry.getValue(); + + assertEquals(o.val, (int)po.field("val")); + + entries.remove(entry.getKey()); + } + + assertEquals(0, entries.size()); + } + + /** + * @throws Exception If failed. + */ + public void testCollection() throws Exception { + IgniteCache<Integer, Collection<TestObject>> c = jcache(0); + + for (int i = 0; i < ENTRY_CNT; i++) { + Collection<TestObject> col = new ArrayList<>(3); + + for (int j = 0; j < 3; j++) + col.add(new TestObject(i * 10 + j)); + + c.put(i, col); + } + + for (int i = 0; i < ENTRY_CNT; i++) { + Collection<TestObject> col = c.get(i); + + assertEquals(3, col.size()); + + Iterator<TestObject> it = col.iterator(); + + for (int j = 0; j < 3; j++) { + assertTrue(it.hasNext()); + + assertEquals(i * 10 + j, it.next().val); + } + } + + IgniteCache<Integer, Collection<PortableObject>> kpc = keepPortableCache(); + + for (int i = 0; i < ENTRY_CNT; i++) { + Collection<PortableObject> col = kpc.get(i); + + assertEquals(3, col.size()); + + Iterator<PortableObject> it = col.iterator(); + + for (int j = 0; j < 3; j++) { + assertTrue(it.hasNext()); + + assertEquals(i * 10 + j, (int)it.next().field("val")); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testMap() throws Exception { + IgniteCache<Integer, Map<Integer, TestObject>> c = jcache(0); + + for (int i = 0; i < ENTRY_CNT; i++) { + Map<Integer, TestObject> map = U.newHashMap(3); + + for (int j = 0; j < 3; j++) { + int idx = i * 10 + j; + + map.put(idx, new TestObject(idx)); + } + + c.put(i, map); + } + + for (int i = 0; i < ENTRY_CNT; i++) { + Map<Integer, TestObject> map = c.get(i); + + assertEquals(3, map.size()); + + for (int j = 0; j < 3; j++) { + int idx = i * 10 + j; + + assertEquals(idx, map.get(idx).val); + } + } + + IgniteCache<Integer, Map<Integer, PortableObject>> kpc = keepPortableCache(); + + for (int i = 0; i < ENTRY_CNT; i++) { + Map<Integer, PortableObject> map = kpc.get(i); + + assertEquals(3, map.size()); + + for (int j = 0; j < 3; j++) { + int idx = i * 10 + j; + + assertEquals(idx, (int)map.get(idx).field("val")); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testGetAsync() throws Exception { + IgniteCache<Integer, TestObject> c = jcache(0); + + IgniteCache<Integer, TestObject> cacheAsync = c.withAsync(); + + for (int i = 0; i < ENTRY_CNT; i++) + c.put(i, new TestObject(i)); + + for (int i = 0; i < ENTRY_CNT; i++) { + cacheAsync.get(i); + TestObject obj = cacheAsync.<TestObject>future().get(); + + assertEquals(i, obj.val); + } + + IgniteCache<Integer, PortableObject> kpc = keepPortableCache(); + + IgniteCache<Integer, PortableObject> cachePortableAsync = kpc.withAsync(); + + for (int i = 0; i < ENTRY_CNT; i++) { + cachePortableAsync.get(i); + + PortableObject po = cachePortableAsync.<PortableObject>future().get(); + + assertEquals(i, (int)po.field("val")); + } + } + + /** + * @throws Exception If failed. + */ + public void testGetTx() throws Exception { + if (atomicityMode() != TRANSACTIONAL) + return; + + IgniteCache<Integer, TestObject> c = jcache(0); + + for (int i = 0; i < ENTRY_CNT; i++) + c.put(i, new TestObject(i)); + + for (int i = 0; i < ENTRY_CNT; i++) { + try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + TestObject obj = c.get(i); + + assertEquals(i, obj.val); + + tx.commit(); + } + } + + for (int i = 0; i < ENTRY_CNT; i++) { + try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, READ_COMMITTED)) { + TestObject obj = c.get(i); + + assertEquals(i, obj.val); + + tx.commit(); + } + } + + IgniteCache<Integer, PortableObject> kpc = keepPortableCache(); + + for (int i = 0; i < ENTRY_CNT; i++) { + try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + PortableObject po = kpc.get(i); + + assertEquals(i, (int)po.field("val")); + + tx.commit(); + } + } + + for (int i = 0; i < ENTRY_CNT; i++) { + try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, READ_COMMITTED)) { + PortableObject po = kpc.get(i); + + assertEquals(i, (int)po.field("val")); + + tx.commit(); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testGetAsyncTx() throws Exception { + if (atomicityMode() != TRANSACTIONAL) + return; + + IgniteCache<Integer, TestObject> c = jcache(0); + + IgniteCache<Integer, TestObject> cacheAsync = c.withAsync(); + + for (int i = 0; i < ENTRY_CNT; i++) + c.put(i, new TestObject(i)); + + for (int i = 0; i < ENTRY_CNT; i++) { + try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cacheAsync.get(i); + + TestObject obj = cacheAsync.<TestObject>future().get(); + + assertEquals(i, obj.val); + + tx.commit(); + } + } + + IgniteCache<Integer, PortableObject> kpc = keepPortableCache(); + IgniteCache<Integer, PortableObject> cachePortableAsync = kpc.withAsync(); + + for (int i = 0; i < ENTRY_CNT; i++) { + try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cachePortableAsync.get(i); + + PortableObject po = cachePortableAsync.<PortableObject>future().get(); + + assertEquals(i, (int)po.field("val")); + + tx.commit(); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testGetAll() throws Exception { + IgniteCache<Integer, TestObject> c = jcache(0); + + for (int i = 0; i < ENTRY_CNT; i++) + c.put(i, new TestObject(i)); + + for (int i = 0; i < ENTRY_CNT; ) { + Set<Integer> keys = new HashSet<>(); + + for (int j = 0; j < 10; j++) + keys.add(i++); + + Map<Integer, TestObject> objs = c.getAll(keys); + + assertEquals(10, objs.size()); + + for (Map.Entry<Integer, TestObject> e : objs.entrySet()) + assertEquals(e.getKey().intValue(), e.getValue().val); + } + + IgniteCache<Integer, PortableObject> kpc = keepPortableCache(); + + for (int i = 0; i < ENTRY_CNT; ) { + Set<Integer> keys = new HashSet<>(); + + for (int j = 0; j < 10; j++) + keys.add(i++); + + Map<Integer, PortableObject> objs = kpc.getAll(keys); + + assertEquals(10, objs.size()); + + for (Map.Entry<Integer, PortableObject> e : objs.entrySet()) + assertEquals(new Integer(e.getKey().intValue()), e.getValue().field("val")); + } + } + + /** + * @throws Exception If failed. + */ + public void testGetAllAsync() throws Exception { + IgniteCache<Integer, TestObject> c = jcache(0); + + IgniteCache<Integer, TestObject> cacheAsync = c.withAsync(); + + for (int i = 0; i < ENTRY_CNT; i++) + c.put(i, new TestObject(i)); + + for (int i = 0; i < ENTRY_CNT; ) { + Set<Integer> keys = new HashSet<>(); + + for (int j = 0; j < 10; j++) + keys.add(i++); + + cacheAsync.getAll(keys); + + Map<Integer, TestObject> objs = cacheAsync.<Map<Integer, TestObject>>future().get(); + + assertEquals(10, objs.size()); + + for (Map.Entry<Integer, TestObject> e : objs.entrySet()) + assertEquals(e.getKey().intValue(), e.getValue().val); + } + + IgniteCache<Integer, PortableObject> kpc = keepPortableCache(); + IgniteCache<Integer, PortableObject> cachePortableAsync = kpc.withAsync(); + + for (int i = 0; i < ENTRY_CNT; ) { + Set<Integer> keys = new HashSet<>(); + + for (int j = 0; j < 10; j++) + keys.add(i++); + + + cachePortableAsync.getAll(keys); + + Map<Integer, PortableObject> objs = cachePortableAsync.<Map<Integer, PortableObject>>future().get(); + + assertEquals(10, objs.size()); + + for (Map.Entry<Integer, PortableObject> e : objs.entrySet()) + assertEquals(new Integer(e.getKey().intValue()), e.getValue().field("val")); + } + } + + /** + * @throws Exception If failed. + */ + public void testGetAllTx() throws Exception { + if (atomicityMode() != TRANSACTIONAL) + return; + + IgniteCache<Integer, TestObject> c = jcache(0); + + for (int i = 0; i < ENTRY_CNT; i++) + c.put(i, new TestObject(i)); + + for (int i = 0; i < ENTRY_CNT; ) { + Set<Integer> keys = new HashSet<>(); + + for (int j = 0; j < 10; j++) + keys.add(i++); + + try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + Map<Integer, TestObject> objs = c.getAll(keys); + + assertEquals(10, objs.size()); + + for (Map.Entry<Integer, TestObject> e : objs.entrySet()) + assertEquals(e.getKey().intValue(), e.getValue().val); + + tx.commit(); + } + + try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, READ_COMMITTED)) { + Map<Integer, TestObject> objs = c.getAll(keys); + + assertEquals(10, objs.size()); + + for (Map.Entry<Integer, TestObject> e : objs.entrySet()) + assertEquals(e.getKey().intValue(), e.getValue().val); + + tx.commit(); + } + } + + IgniteCache<Integer, PortableObject> kpc = keepPortableCache(); + + for (int i = 0; i < ENTRY_CNT; ) { + Set<Integer> keys = new HashSet<>(); + + for (int j = 0; j < 10; j++) + keys.add(i++); + + try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + Map<Integer, PortableObject> objs = kpc.getAll(keys); + + assertEquals(10, objs.size()); + + for (Map.Entry<Integer, PortableObject> e : objs.entrySet()) + assertEquals(new Integer(e.getKey().intValue()), e.getValue().field("val")); + + tx.commit(); + } + + try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, READ_COMMITTED)) { + Map<Integer, PortableObject> objs = kpc.getAll(keys); + + assertEquals(10, objs.size()); + + for (Map.Entry<Integer, PortableObject> e : objs.entrySet()) + assertEquals(new Integer(e.getKey().intValue()), e.getValue().field("val")); + + tx.commit(); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testGetAllAsyncTx() throws Exception { + if (atomicityMode() != TRANSACTIONAL) + return; + + IgniteCache<Integer, TestObject> c = jcache(0); + IgniteCache<Integer, TestObject> cacheAsync = c.withAsync(); + + for (int i = 0; i < ENTRY_CNT; i++) + c.put(i, new TestObject(i)); + + for (int i = 0; i < ENTRY_CNT; ) { + Set<Integer> keys = new HashSet<>(); + + for (int j = 0; j < 10; j++) + keys.add(i++); + + try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cacheAsync.getAll(keys); + + Map<Integer, TestObject> objs = cacheAsync.<Map<Integer, TestObject>>future().get(); + + assertEquals(10, objs.size()); + + for (Map.Entry<Integer, TestObject> e : objs.entrySet()) + assertEquals(e.getKey().intValue(), e.getValue().val); + + tx.commit(); + } + } + + IgniteCache<Integer, PortableObject> cache = keepPortableCache(); + + for (int i = 0; i < ENTRY_CNT; ) { + Set<Integer> keys = new HashSet<>(); + + for (int j = 0; j < 10; j++) + keys.add(i++); + + IgniteCache<Integer, PortableObject> asyncCache = cache.withAsync(); + + try (Transaction tx = grid(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + asyncCache.getAll(keys); + + Map<Integer, PortableObject> objs = asyncCache.<Map<Integer, PortableObject>>future().get(); + + assertEquals(10, objs.size()); + + for (Map.Entry<Integer, PortableObject> e : objs.entrySet()) + assertEquals(new Integer(e.getKey().intValue()), e.getValue().field("val")); + + tx.commit(); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testLoadCache() throws Exception { + for (int i = 0; i < gridCount(); i++) + jcache(i).localLoadCache(null); + + IgniteCache<Integer, TestObject> cache = jcache(0); + + assertEquals(3, cache.size(CachePeekMode.PRIMARY)); + + assertEquals(1, cache.get(1).val); + assertEquals(2, cache.get(2).val); + assertEquals(3, cache.get(3).val); + } + + /** + * @throws Exception If failed. + */ + public void testLoadCacheAsync() throws Exception { + for (int i = 0; i < gridCount(); i++) { + IgniteCache<Object, Object> jcache = jcache(i).withAsync(); + + jcache.loadCache(null); + + jcache.future().get(); + } + + IgniteCache<Integer, TestObject> cache = jcache(0); + + assertEquals(3, cache.size(CachePeekMode.PRIMARY)); + + assertEquals(1, cache.get(1).val); + assertEquals(2, cache.get(2).val); + assertEquals(3, cache.get(3).val); + } + + /** + * @throws Exception If failed. + */ + public void testLoadCacheFilteredAsync() throws Exception { + for (int i = 0; i < gridCount(); i++) { + IgniteCache<Integer, TestObject> c = this.<Integer, TestObject>jcache(i).withAsync(); + + c.loadCache(new P2<Integer, TestObject>() { + @Override public boolean apply(Integer key, TestObject val) { + return val.val < 3; + } + }); + + c.future().get(); + } + + IgniteCache<Integer, TestObject> cache = jcache(0); + + assertEquals(2, cache.size(CachePeekMode.PRIMARY)); + + assertEquals(1, cache.get(1).val); + assertEquals(2, cache.get(2).val); + + assertNull(cache.get(3)); + } + + /** + * @throws Exception If failed. + */ + public void testTransform() throws Exception { + IgniteCache<Integer, PortableObject> c = keepPortableCache(); + + checkTransform(primaryKey(c)); + + if (cacheMode() != CacheMode.LOCAL) { + checkTransform(backupKey(c)); + + if (nearConfiguration() != null) + checkTransform(nearKey(c)); + } + } + + /** + * @return Cache with keep portable flag. + */ + private <K, V> IgniteCache<K, V> keepPortableCache() { + return ignite(0).cache(null).withKeepPortable(); + } + + /** + * @param key Key. + * @throws Exception If failed. + */ + private void checkTransform(Integer key) throws Exception { + log.info("Transform: " + key); + + IgniteCache<Integer, PortableObject> c = keepPortableCache(); + + try { + c.invoke(key, new EntryProcessor<Integer, PortableObject, Void>() { + @Override public Void process(MutableEntry<Integer, PortableObject> e, Object... args) { + PortableObject val = e.getValue(); + + assertNull("Unexpected value: " + val, val); + + return null; + } + }); + + jcache(0).put(key, new TestObject(1)); + + c.invoke(key, new EntryProcessor<Integer, PortableObject, Void>() { + @Override public Void process(MutableEntry<Integer, PortableObject> e, Object... args) { + PortableObject val = e.getValue(); + + assertNotNull("Unexpected value: " + val, val); + + assertEquals(new Integer(1), val.field("val")); + + Ignite ignite = e.unwrap(Ignite.class); + + IgnitePortables portables = ignite.portables(); + + PortableBuilder builder = portables.builder(val); + + builder.setField("val", 2); + + e.setValue(builder.build()); + + return null; + } + }); + + PortableObject obj = c.get(key); + + assertEquals(new Integer(2), obj.field("val")); + + c.invoke(key, new EntryProcessor<Integer, PortableObject, Void>() { + @Override public Void process(MutableEntry<Integer, PortableObject> e, Object... args) { + PortableObject val = e.getValue(); + + assertNotNull("Unexpected value: " + val, val); + + assertEquals(new Integer(2), val.field("val")); + + e.setValue(val); + + return null; + } + }); + + obj = c.get(key); + + assertEquals(new Integer(2), obj.field("val")); + + c.invoke(key, new EntryProcessor<Integer, PortableObject, Void>() { + @Override public Void process(MutableEntry<Integer, PortableObject> e, Object... args) { + PortableObject val = e.getValue(); + + assertNotNull("Unexpected value: " + val, val); + + assertEquals(new Integer(2), val.field("val")); + + e.remove(); + + return null; + } + }); + + assertNull(c.get(key)); + } + finally { + c.remove(key); + } + } + + /** + * + */ + private static class TestObject implements PortableMarshalAware { + /** */ + private int val; + + /** + */ + private TestObject() { + // No-op. + } + + /** + * @param val Value. + */ + private TestObject(int val) { + this.val = val; + } + + /** {@inheritDoc} */ + @Override public void writePortable(PortableWriter writer) throws PortableException { + writer.writeInt("val", val); + } + + /** {@inheritDoc} */ + @Override public void readPortable(PortableReader reader) throws PortableException { + val = reader.readInt("val"); + } + } + + /** + * + */ + private static class TestReferenceObject implements PortableMarshalAware { + /** */ + private TestReferenceObject obj; + + /** + */ + private TestReferenceObject() { + // No-op. + } + + /** + * @param obj Object. + */ + private TestReferenceObject(TestReferenceObject obj) { + this.obj = obj; + } + + /** {@inheritDoc} */ + @Override public void writePortable(PortableWriter writer) throws PortableException { + writer.writeObject("obj", obj); + } + + /** {@inheritDoc} */ + @Override public void readPortable(PortableReader reader) throws PortableException { + obj = reader.readObject("obj"); + } + } + + /** + * + */ + private static class TestStore extends CacheStoreAdapter<Integer, Object> { + /** {@inheritDoc} */ + @Override public void loadCache(IgniteBiInClosure<Integer, Object> clo, Object... args) { + for (int i = 1; i <= 3; i++) + clo.apply(i, new TestObject(i)); + } + + /** {@inheritDoc} */ + @Nullable @Override public Object load(Integer key) { + return null; + } + + /** {@inheritDoc} */ + @Override public void write(Cache.Entry<? extends Integer, ?> e) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void delete(Object key) { + // No-op. + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0b213587/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/GridCachePortableStoreAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/GridCachePortableStoreAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/GridCachePortableStoreAbstractSelfTest.java new file mode 100644 index 0000000..d224349 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/GridCachePortableStoreAbstractSelfTest.java @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.portable; + +import org.apache.ignite.cache.store.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.marshaller.portable.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; + +import com.google.common.collect.*; +import org.jetbrains.annotations.*; +import org.jsr166.*; + +import javax.cache.*; +import java.util.*; + +/** + * Tests for cache store with portables. + */ +public abstract class GridCachePortableStoreAbstractSelfTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final TestStore STORE = new TestStore(); + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + PortableMarshaller marsh = new PortableMarshaller(); + + marsh.setClassNames(Arrays.asList(Key.class.getName(), Value.class.getName())); + + cfg.setMarshaller(marsh); + + CacheConfiguration cacheCfg = new CacheConfiguration(); + + cacheCfg.setCacheStoreFactory(singletonFactory(STORE)); + cacheCfg.setKeepPortableInStore(keepPortableInStore()); + cacheCfg.setReadThrough(true); + cacheCfg.setWriteThrough(true); + cacheCfg.setLoadPreviousValue(true); + + cfg.setCacheConfiguration(cacheCfg); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(disco); + + return cfg; + } + + /** + * @return Keep portables in store flag. + */ + protected abstract boolean keepPortableInStore(); + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGrid(); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopGrid(); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + STORE.map().clear(); + + jcache().clear(); + + assert jcache().size() == 0; + } + + /** + * @throws Exception If failed. + */ + public void testPut() throws Exception { + jcache().put(new Key(1), new Value(1)); + + checkMap(STORE.map(), 1); + } + + /** + * @throws Exception If failed. + */ + public void testPutAll() throws Exception { + Map<Object, Object> map = new HashMap<>(); + + for (int i = 1; i <= 3; i++) + map.put(new Key(i), new Value(i)); + + jcache().putAll(map); + + checkMap(STORE.map(), 1, 2, 3); + } + + /** + * @throws Exception If failed. + */ + public void testLoad() throws Exception { + populateMap(STORE.map(), 1); + + Object val = jcache().get(new Key(1)); + + assertTrue(String.valueOf(val), val instanceof Value); + + assertEquals(1, ((Value)val).index()); + } + + /** + * @throws Exception If failed. + */ + public void testLoadAll() throws Exception { + populateMap(STORE.map(), 1, 2, 3); + + Set<Object> keys = new HashSet<>(); + + for (int i = 1; i <= 3; i++) + keys.add(new Key(i)); + + Map<Object, Object> res = jcache().getAll(keys); + + assertEquals(3, res.size()); + + for (int i = 1; i <= 3; i++) { + Object val = res.get(new Key(i)); + + assertTrue(String.valueOf(val), val instanceof Value); + + assertEquals(i, ((Value)val).index()); + } + } + + /** + * @throws Exception If failed. + */ + public void testRemove() throws Exception { + for (int i = 1; i <= 3; i++) + jcache().put(new Key(i), new Value(i)); + + jcache().remove(new Key(1)); + + checkMap(STORE.map(), 2, 3); + } + + /** + * @throws Exception If failed. + */ + public void testRemoveAll() throws Exception { + for (int i = 1; i <= 3; i++) + jcache().put(new Key(i), new Value(i)); + + jcache().removeAll(ImmutableSet.of(new Key(1), new Key(2))); + + checkMap(STORE.map(), 3); + } + + /** + * @param map Map. + * @param idxs Indexes. + */ + protected abstract void populateMap(Map<Object, Object> map, int... idxs); + + /** + * @param map Map. + * @param idxs Indexes. + */ + protected abstract void checkMap(Map<Object, Object> map, int... idxs); + + /** + */ + protected static class Key { + /** */ + private int idx; + + /** + * @param idx Index. + */ + public Key(int idx) { + this.idx = idx; + } + + /** + * @return Index. + */ + int index() { + return idx; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + Key key = (Key)o; + + return idx == key.idx; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return idx; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "Key [idx=" + idx + ']'; + } + } + + /** + */ + protected static class Value { + /** */ + private int idx; + + /** + * @param idx Index. + */ + public Value(int idx) { + this.idx = idx; + } + + /** + * @return Index. + */ + int index() { + return idx; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "Value [idx=" + idx + ']'; + } + } + + /** + * + */ + private static class TestStore extends CacheStoreAdapter<Object, Object> { + /** */ + private final Map<Object, Object> map = new ConcurrentHashMap8<>(); + + /** {@inheritDoc} */ + @Nullable @Override public Object load(Object key) { + return map.get(key); + } + + /** {@inheritDoc} */ + @Override public void write(Cache.Entry<?, ?> e) { + map.put(e.getKey(), e.getValue()); + } + + /** {@inheritDoc} */ + @Override public void delete(Object key) { + map.remove(key); + } + + /** + * @return Map. + */ + Map<Object, Object> map() { + return map; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0b213587/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/GridCachePortableStoreObjectsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/GridCachePortableStoreObjectsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/GridCachePortableStoreObjectsSelfTest.java new file mode 100644 index 0000000..830978f --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/GridCachePortableStoreObjectsSelfTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.portable; + +import java.util.*; + +/** + * Tests for cache store with portables. + */ +public class GridCachePortableStoreObjectsSelfTest extends GridCachePortableStoreAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected boolean keepPortableInStore() { + return false; + } + + /** {@inheritDoc} */ + @Override protected void populateMap(Map<Object, Object> map, int... idxs) { + assert map != null; + assert idxs != null; + + for (int idx : idxs) + map.put(new Key(idx), new Value(idx)); + } + + /** {@inheritDoc} */ + @Override protected void checkMap(Map<Object, Object> map, int... idxs) { + assert map != null; + assert idxs != null; + + assertEquals(idxs.length, map.size()); + + for (int idx : idxs) { + Object val = map.get(new Key(idx)); + + assertTrue(String.valueOf(val), val instanceof Value); + + assertEquals(idx, ((Value)val).index()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0b213587/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/GridCachePortableStorePortablesSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/GridCachePortableStorePortablesSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/GridCachePortableStorePortablesSelfTest.java new file mode 100644 index 0000000..9df6d18 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/GridCachePortableStorePortablesSelfTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.internal.processors.cache.portable; + +import org.apache.ignite.portable.*; + +import java.util.*; + +/** + * Tests for cache store with portables. + */ +public class GridCachePortableStorePortablesSelfTest extends GridCachePortableStoreAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected boolean keepPortableInStore() { + return true; + } + + /** {@inheritDoc} */ + @Override protected void populateMap(Map<Object, Object> map, int... idxs) { + assert map != null; + assert idxs != null; + + for (int idx : idxs) + map.put(portable(new Key(idx)), portable(new Value(idx))); + } + + /** {@inheritDoc} */ + @Override protected void checkMap(Map<Object, Object> map, int... idxs) { + assert map != null; + assert idxs != null; + + assertEquals(idxs.length, map.size()); + + for (int idx : idxs) { + Object val = map.get(portable(new Key(idx))); + + assertTrue(String.valueOf(val), val instanceof PortableObject); + + PortableObject po = (PortableObject)val; + + assertEquals("Value", po.metaData().typeName()); + assertEquals(new Integer(idx), po.field("idx")); + } + } + + /** + * @param obj Object. + * @return Portable object. + */ + private Object portable(Object obj) { + return grid().portables().toPortable(obj); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0b213587/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/GridPortableDuplicateIndexObjectsAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/GridPortableDuplicateIndexObjectsAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/GridPortableDuplicateIndexObjectsAbstractSelfTest.java new file mode 100644 index 0000000..dcf9a62 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/GridPortableDuplicateIndexObjectsAbstractSelfTest.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.portable; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.marshaller.portable.*; +import org.apache.ignite.portable.*; + +import java.util.*; + +/** + * Tests that portable object is the same in cache entry and in index. + */ +public abstract class GridPortableDuplicateIndexObjectsAbstractSelfTest extends GridCacheAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 1; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + PortableMarshaller marsh = new PortableMarshaller(); + + marsh.setClassNames(Collections.singletonList(TestPortable.class.getName())); + + cfg.setMarshaller(marsh); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception { + CacheConfiguration ccfg = super.cacheConfiguration(gridName); + + ccfg.setCopyOnRead(false); + + CacheTypeMetadata meta = new CacheTypeMetadata(); + + meta.setKeyType(Integer.class); + meta.setValueType(TestPortable.class.getName()); + + Map<String, Class<?>> idx = new HashMap<>(); + + idx.put("fieldOne", String.class); + idx.put("fieldTwo", Integer.class); + + meta.setAscendingFields(idx); + + ccfg.setTypeMetadata(Collections.singletonList(meta)); + + return ccfg; + } + + /** {@inheritDoc} */ + @Override public abstract CacheAtomicityMode atomicityMode(); + + /** {@inheritDoc} */ + @Override public abstract CacheMode cacheMode(); + + /** + * @throws Exception If failed. + */ + public void testIndexReferences() throws Exception { + IgniteCache<Integer, TestPortable> cache = grid(0).cache(null); + + String fieldOneVal = "123"; + int fieldTwoVal = 123; + int key = 0; + + cache.put(key, new TestPortable(fieldOneVal, fieldTwoVal)); + + IgniteCache<Integer, PortableObject> prj = grid(0).cache(null).withKeepPortable(); + + PortableObject cacheVal = prj.get(key); + + assertEquals(fieldOneVal, cacheVal.field("fieldOne")); + assertEquals(new Integer(fieldTwoVal), cacheVal.field("fieldTwo")); + + List<?> row = F.first(prj.query(new SqlFieldsQuery("select _val from " + + "TestPortable where _key = ?").setArgs(key)).getAll()); + + assertEquals(1, row.size()); + + PortableObject qryVal = (PortableObject)row.get(0); + + assertEquals(fieldOneVal, qryVal.field("fieldOne")); + assertEquals(new Integer(fieldTwoVal), qryVal.field("fieldTwo")); + assertSame(cacheVal, qryVal); + } + + /** + * Test portable object. + */ + private static class TestPortable { + /** */ + private String fieldOne; + + /** */ + private int fieldTwo; + + /** + * + */ + private TestPortable() { + // No-op. + } + + /** + * @param fieldOne Field one. + * @param fieldTwo Field two. + */ + private TestPortable(String fieldOne, int fieldTwo) { + this.fieldOne = fieldOne; + this.fieldTwo = fieldTwo; + } + + /** + * @return Field one. + */ + public String fieldOne() { + return fieldOne; + } + + /** + * @return Field two. + */ + public int fieldTwo() { + return fieldTwo; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0b213587/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/datastreaming/DataStreamProcessorPortableSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/datastreaming/DataStreamProcessorPortableSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/datastreaming/DataStreamProcessorPortableSelfTest.java new file mode 100644 index 0000000..580ff13 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/datastreaming/DataStreamProcessorPortableSelfTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.portable.datastreaming; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.processors.datastreamer.*; +import org.apache.ignite.marshaller.portable.*; +import org.apache.ignite.portable.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.stream.*; + +import java.util.*; + +/** + * + */ +public class DataStreamProcessorPortableSelfTest extends DataStreamProcessorSelfTest { + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + PortableMarshaller marsh = new PortableMarshaller(); + + cfg.setMarshaller(marsh); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected StreamReceiver<String, TestObject> getStreamReceiver() { + return new TestDataReceiver(); + } + + /** + * + */ + private static class TestDataReceiver implements StreamReceiver<String, TestObject> { + /** {@inheritDoc} */ + @Override public void receive(IgniteCache<String, TestObject> cache, + Collection<Map.Entry<String, TestObject>> entries) { + for (Map.Entry<String, TestObject> e : entries) { + assertTrue(e.getKey() instanceof String); + assertTrue(e.getValue() instanceof PortableObject); + + TestObject obj = ((PortableObject)e.getValue()).deserialize(); + + cache.put(e.getKey(), new TestObject(obj.val + 1)); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0b213587/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/datastreaming/GridDataStreamerImplSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/datastreaming/GridDataStreamerImplSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/datastreaming/GridDataStreamerImplSelfTest.java new file mode 100644 index 0000000..2d8e34b --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/datastreaming/GridDataStreamerImplSelfTest.java @@ -0,0 +1,338 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.portable.datastreaming; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.marshaller.portable.*; +import org.apache.ignite.portable.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.io.*; +import java.util.*; + +import static org.apache.ignite.cache.CacheMode.*; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; + +/** + * Tests for {@code IgniteDataStreamerImpl}. + */ +public class GridDataStreamerImplSelfTest extends GridCommonAbstractTest { + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Number of keys to load via data streamer. */ + private static final int KEYS_COUNT = 1000; + + /** Flag indicating should be cache configured with portables or not. */ + private static boolean portables; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + discoSpi.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(discoSpi); + + if (portables) { + PortableMarshaller marsh = new PortableMarshaller(); + + cfg.setMarshaller(marsh); + } + + cfg.setCacheConfiguration(cacheConfiguration()); + + return cfg; + } + + /** + * Gets cache configuration. + * + * @return Cache configuration. + */ + private CacheConfiguration cacheConfiguration() { + CacheConfiguration cacheCfg = defaultCacheConfiguration(); + + cacheCfg.setCacheMode(PARTITIONED); + cacheCfg.setNearConfiguration(null); + cacheCfg.setBackups(0); + cacheCfg.setWriteSynchronizationMode(FULL_SYNC); + + return cacheCfg; + } + + /** + * Data streamer should correctly load entries from HashMap in case of grids with more than one node + * and with GridOptimizedMarshaller that requires serializable. + * + * @throws Exception If failed. + */ + public void testAddDataFromMap() throws Exception { + try { + portables = false; + + startGrids(2); + + awaitPartitionMapExchange(); + + Ignite g0 = grid(0); + + IgniteDataStreamer<Integer, String> dataLdr = g0.dataStreamer(null); + + Map<Integer, String> map = U.newHashMap(KEYS_COUNT); + + for (int i = 0; i < KEYS_COUNT; i ++) + map.put(i, String.valueOf(i)); + + dataLdr.addData(map); + + dataLdr.close(); + + checkDistribution(grid(0)); + + checkDistribution(grid(1)); + + // Check several random keys in cache. + Random rnd = new Random(); + + IgniteCache<Integer, String> c0 = g0.cache(null); + + for (int i = 0; i < 100; i ++) { + Integer k = rnd.nextInt(KEYS_COUNT); + + String v = c0.get(k); + + assertEquals(k.toString(), v); + } + } + finally { + G.stopAll(true); + } + } + + /** + * Data streamer should add portable object that weren't registered explicitly. + * + * @throws Exception If failed. + */ + public void testAddMissingPortable() throws Exception { + try { + portables = true; + + startGrids(2); + + awaitPartitionMapExchange(); + + Ignite g0 = grid(0); + + IgniteDataStreamer<Integer, TestObject2> dataLdr = g0.dataStreamer(null); + + dataLdr.perNodeBufferSize(1); + dataLdr.autoFlushFrequency(1L); + + Map<Integer, TestObject2> map = U.newHashMap(KEYS_COUNT); + + for (int i = 0; i < KEYS_COUNT; i ++) + map.put(i, new TestObject2(i)); + + dataLdr.addData(map).get(); + + dataLdr.close(); + } + finally { + G.stopAll(true); + } + } + + /** + * Data streamer should correctly load portable entries from HashMap in case of grids with more than one node + * and with GridOptimizedMarshaller that requires serializable. + * + * @throws Exception If failed. + */ + public void testAddPortableDataFromMap() throws Exception { + try { + portables = true; + + startGrids(2); + + awaitPartitionMapExchange(); + + Ignite g0 = grid(0); + + IgniteDataStreamer<Integer, TestObject> dataLdr = g0.dataStreamer(null); + + Map<Integer, TestObject> map = U.newHashMap(KEYS_COUNT); + + for (int i = 0; i < KEYS_COUNT; i ++) + map.put(i, new TestObject(i)); + + dataLdr.addData(map); + + dataLdr.close(false); + + checkDistribution(grid(0)); + + checkDistribution(grid(1)); + + // Read random keys. Take values as TestObject. + Random rnd = new Random(); + + IgniteCache<Integer, TestObject> c = g0.cache(null); + + for (int i = 0; i < 100; i ++) { + Integer k = rnd.nextInt(KEYS_COUNT); + + TestObject v = c.get(k); + + assertEquals(k, v.val()); + } + + // Read random keys. Take values as PortableObject. + IgniteCache<Integer, PortableObject> c2 = ((IgniteCacheProxy)c).keepPortable(); + + for (int i = 0; i < 100; i ++) { + Integer k = rnd.nextInt(KEYS_COUNT); + + PortableObject v = c2.get(k); + + assertEquals(k, v.field("val")); + } + } + finally { + G.stopAll(true); + } + } + + /** + * Check that keys correctly destributed by nodes after data streamer. + * + * @param g Grid to check. + */ + private void checkDistribution(Ignite g) { + ClusterNode n = g.cluster().localNode(); + IgniteCache c = g.cache(null); + + // Check that data streamer correctly split data by nodes. + for (int i = 0; i < KEYS_COUNT; i ++) { + if (g.affinity(null).isPrimary(n, i)) + assertNotNull(c.localPeek(i, CachePeekMode.ONHEAP)); + else + assertNull(c.localPeek(i, CachePeekMode.ONHEAP)); + } + } + + /** + */ + private static class TestObject implements PortableMarshalAware, Serializable { + /** */ + private int val; + + /** + * + */ + private TestObject() { + // No-op. + } + + /** + * @param val Value. + */ + private TestObject(int val) { + this.val = val; + } + + public Integer val() { + return val; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return val; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object obj) { + return obj instanceof TestObject && ((TestObject)obj).val == val; + } + + /** {@inheritDoc} */ + @Override public void writePortable(PortableWriter writer) throws PortableException { + writer.writeInt("val", val); + } + + /** {@inheritDoc} */ + @Override public void readPortable(PortableReader reader) throws PortableException { + val = reader.readInt("val"); + } + } + + /** + */ + private static class TestObject2 implements PortableMarshalAware, Serializable { + /** */ + private int val; + + /** + */ + private TestObject2() { + // No-op. + } + + /** + * @param val Value. + */ + private TestObject2(int val) { + this.val = val; + } + + public Integer val() { + return val; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return val; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object obj) { + return obj instanceof TestObject2 && ((TestObject2)obj).val == val; + } + + /** {@inheritDoc} */ + @Override public void writePortable(PortableWriter writer) throws PortableException { + writer.writeInt("val", val); + } + + /** {@inheritDoc} */ + @Override public void readPortable(PortableReader reader) throws PortableException { + val = reader.readInt("val"); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0b213587/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/distributed/dht/GridCacheAtomicNearEnabledPortableEnabledFullApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/distributed/dht/GridCacheAtomicNearEnabledPortableEnabledFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/distributed/dht/GridCacheAtomicNearEnabledPortableEnabledFullApiSelfTest.java new file mode 100644 index 0000000..e4fa5fa --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/distributed/dht/GridCacheAtomicNearEnabledPortableEnabledFullApiSelfTest.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.portable.distributed.dht; + +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.*; +import org.apache.ignite.marshaller.portable.*; + +/** + * Atomic cache with portables. + */ +public class GridCacheAtomicNearEnabledPortableEnabledFullApiSelfTest + extends GridCacheAtomicNearEnabledFullApiSelfTest { + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setMarshaller(new PortableMarshaller()); + + return cfg; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0b213587/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/distributed/dht/GridCacheAtomicPartitionedOnlyPortableDataStreamerMultiNodeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/distributed/dht/GridCacheAtomicPartitionedOnlyPortableDataStreamerMultiNodeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/distributed/dht/GridCacheAtomicPartitionedOnlyPortableDataStreamerMultiNodeSelfTest.java new file mode 100644 index 0000000..f69ede1 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/distributed/dht/GridCacheAtomicPartitionedOnlyPortableDataStreamerMultiNodeSelfTest.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.portable.distributed.dht; + +/** + * + */ +public class GridCacheAtomicPartitionedOnlyPortableDataStreamerMultiNodeSelfTest extends + GridCacheAtomicPartitionedOnlyPortableDataStreamerMultithreadedSelfTest { + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 4; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0b213587/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/distributed/dht/GridCacheAtomicPartitionedOnlyPortableDataStreamerMultithreadedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/distributed/dht/GridCacheAtomicPartitionedOnlyPortableDataStreamerMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/distributed/dht/GridCacheAtomicPartitionedOnlyPortableDataStreamerMultithreadedSelfTest.java new file mode 100644 index 0000000..3d14447 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/distributed/dht/GridCacheAtomicPartitionedOnlyPortableDataStreamerMultithreadedSelfTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.portable.distributed.dht; + +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.processors.cache.portable.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheMode.*; + +/** + * + */ +public class GridCacheAtomicPartitionedOnlyPortableDataStreamerMultithreadedSelfTest extends + GridCachePortableObjectsAbstractDataStreamerSelfTest { + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return ATOMIC; + } + + /** {@inheritDoc} */ + @Override protected NearCacheConfiguration nearConfiguration() { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0b213587/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/distributed/dht/GridCacheAtomicPartitionedOnlyPortableMultiNodeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/distributed/dht/GridCacheAtomicPartitionedOnlyPortableMultiNodeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/distributed/dht/GridCacheAtomicPartitionedOnlyPortableMultiNodeSelfTest.java new file mode 100644 index 0000000..94836db --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/distributed/dht/GridCacheAtomicPartitionedOnlyPortableMultiNodeSelfTest.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.internal.processors.cache.portable.distributed.dht; + +/** + * + */ +public class GridCacheAtomicPartitionedOnlyPortableMultiNodeSelfTest extends + GridCacheAtomicPartitionedOnlyPortableMultithreadedSelfTest { + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 4; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0b213587/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/distributed/dht/GridCacheAtomicPartitionedOnlyPortableMultithreadedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/distributed/dht/GridCacheAtomicPartitionedOnlyPortableMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/distributed/dht/GridCacheAtomicPartitionedOnlyPortableMultithreadedSelfTest.java new file mode 100644 index 0000000..9e0ebeb --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/distributed/dht/GridCacheAtomicPartitionedOnlyPortableMultithreadedSelfTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.portable.distributed.dht; + +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.processors.cache.portable.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheMode.*; + +/** + * + */ +public class GridCacheAtomicPartitionedOnlyPortableMultithreadedSelfTest extends + GridCachePortableObjectsAbstractMultiThreadedSelfTest { + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return ATOMIC; + } + + /** {@inheritDoc} */ + @Override protected NearCacheConfiguration nearConfiguration() { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0b213587/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/distributed/dht/GridCacheAtomicPortableEnabledFullApiMultiNodeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/distributed/dht/GridCacheAtomicPortableEnabledFullApiMultiNodeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/distributed/dht/GridCacheAtomicPortableEnabledFullApiMultiNodeSelfTest.java new file mode 100644 index 0000000..e0c8355 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/distributed/dht/GridCacheAtomicPortableEnabledFullApiMultiNodeSelfTest.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.portable.distributed.dht; + +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.processors.cache.distributed.near.*; +import org.apache.ignite.marshaller.portable.*; + +/** + * Atomic cache with portables multi-node test. + */ +public class GridCacheAtomicPortableEnabledFullApiMultiNodeSelfTest extends GridCacheAtomicMultiNodeFullApiSelfTest { + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setMarshaller(new PortableMarshaller()); + + return cfg; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0b213587/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/distributed/dht/GridCacheAtomicPortableEnabledFullApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/distributed/dht/GridCacheAtomicPortableEnabledFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/distributed/dht/GridCacheAtomicPortableEnabledFullApiSelfTest.java new file mode 100644 index 0000000..b3de8c7 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/distributed/dht/GridCacheAtomicPortableEnabledFullApiSelfTest.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.portable.distributed.dht; + +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.*; +import org.apache.ignite.marshaller.portable.*; + +/** + * Atomic cache with portables. + */ +public class GridCacheAtomicPortableEnabledFullApiSelfTest extends GridCacheAtomicFullApiSelfTest { + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setMarshaller(new PortableMarshaller()); + + return cfg; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0b213587/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/distributed/dht/GridCacheMemoryModePortableSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/distributed/dht/GridCacheMemoryModePortableSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/distributed/dht/GridCacheMemoryModePortableSelfTest.java new file mode 100644 index 0000000..0160fb1 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/distributed/dht/GridCacheMemoryModePortableSelfTest.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.portable.distributed.dht; + +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.marshaller.portable.*; + +/** + * Memory models test. + */ +public class GridCacheMemoryModePortableSelfTest extends GridCacheMemoryModeSelfTest { + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setMarshaller(new PortableMarshaller()); + + return cfg; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0b213587/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/distributed/dht/GridCacheOffHeapAtomicPortableMultiThreadedUpdateSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/distributed/dht/GridCacheOffHeapAtomicPortableMultiThreadedUpdateSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/distributed/dht/GridCacheOffHeapAtomicPortableMultiThreadedUpdateSelfTest.java new file mode 100644 index 0000000..0f30619 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/distributed/dht/GridCacheOffHeapAtomicPortableMultiThreadedUpdateSelfTest.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.portable.distributed.dht; + +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.marshaller.portable.*; + +/** + * + */ +public class GridCacheOffHeapAtomicPortableMultiThreadedUpdateSelfTest + extends GridCacheOffHeapAtomicMultiThreadedUpdateSelfTest { + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setMarshaller(new PortableMarshaller()); + + return cfg; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0b213587/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/distributed/dht/GridCacheOffHeapTieredAtomicNearEnabledPortableEnabledFullApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/distributed/dht/GridCacheOffHeapTieredAtomicNearEnabledPortableEnabledFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/distributed/dht/GridCacheOffHeapTieredAtomicNearEnabledPortableEnabledFullApiSelfTest.java new file mode 100644 index 0000000..24f29ce --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/distributed/dht/GridCacheOffHeapTieredAtomicNearEnabledPortableEnabledFullApiSelfTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.portable.distributed.dht; + +import org.apache.ignite.cache.*; + +import static org.apache.ignite.cache.CacheMemoryMode.*; + +/** + * Atomic cache with portables. + */ +public class GridCacheOffHeapTieredAtomicNearEnabledPortableEnabledFullApiSelfTest extends + GridCacheAtomicNearEnabledPortableEnabledFullApiSelfTest { + /** {@inheritDoc} */ + @Override protected CacheMemoryMode memoryMode() { + return OFFHEAP_TIERED; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0b213587/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/distributed/dht/GridCacheOffHeapTieredAtomicPortableEnabledFullApiMultiNodeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/distributed/dht/GridCacheOffHeapTieredAtomicPortableEnabledFullApiMultiNodeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/distributed/dht/GridCacheOffHeapTieredAtomicPortableEnabledFullApiMultiNodeSelfTest.java new file mode 100644 index 0000000..16e2685 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/portable/distributed/dht/GridCacheOffHeapTieredAtomicPortableEnabledFullApiMultiNodeSelfTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.portable.distributed.dht; + +import org.apache.ignite.cache.*; + +import static org.apache.ignite.cache.CacheMemoryMode.*; + +/** + * Atomic cache with portables multi-node test in off-heap tiered mode. + */ +public class GridCacheOffHeapTieredAtomicPortableEnabledFullApiMultiNodeSelfTest extends + GridCacheAtomicPortableEnabledFullApiMultiNodeSelfTest { + /** {@inheritDoc} */ + @Override protected CacheMemoryMode memoryMode() { + return OFFHEAP_TIERED; + } +}