http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheIteratorPerformanceTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheIteratorPerformanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheIteratorPerformanceTest.java new file mode 100644 index 0000000..046561a --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheIteratorPerformanceTest.java @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.testframework.junits.common.*; + +import static org.apache.ignite.cache.GridCacheMode.*; + +/** + * Cache iterator performance test. + */ +public class GridCacheIteratorPerformanceTest extends GridCommonAbstractTest { + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Large entry count. */ + private static final int LARGE_ENTRY_CNT = 100000; + + /** Small entry count. */ + private static final int SMALL_ENTRY_CNT = 10000; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(disco); + + cfg.setCacheConfiguration(cacheConfiguration()); + + return cfg; + } + + /** + * @return Cache configuration. + */ + private CacheConfiguration cacheConfiguration() { + CacheConfiguration cfg = defaultCacheConfiguration(); + + cfg.setCacheMode(PARTITIONED); + cfg.setBackups(1); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + startGrid(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopGrid(); + } + + /** + * Iterates over cache. + * + * @param prj Projection. + * @param c Visitor closure. + */ + private void iterate(GridCacheProjection<Integer, Integer> prj, IgniteInClosure<GridCacheEntry<Integer, Integer>> c) { + prj.forEach(c); + } + + /** + * @return Empty filter. + */ + private IgniteInClosure<GridCacheEntry<Integer, Integer>> emptyFilter() { + return new CI1<GridCacheEntry<Integer, Integer>>() { + @Override public void apply(GridCacheEntry<Integer, Integer> e) { + // No-op + } + }; + } + + /** + * @throws Exception If failed. + */ + public void testSmall() throws Exception { + GridCacheProjection<Integer, Integer> cache = grid().cache(null); + + for (int i = 0; i < SMALL_ENTRY_CNT; i++) + assert cache.putx(i, i); + + assert cache.size() == SMALL_ENTRY_CNT; + + IgniteInClosure<GridCacheEntry<Integer, Integer>> c = emptyFilter(); + + // Warmup. + for (int i = 0; i < 10; i ++) + iterate(cache, c); + + long start = System.currentTimeMillis(); + + iterate(cache, c); + + long time = System.currentTimeMillis() - start; + + X.println(">>>"); + X.println(">>> Iterated over " + cache.size() + " entries."); + X.println(">>> Iteration time: " + time + "ms."); + X.println(">>>"); + } + + /** + * @throws Exception If failed. + */ + public void testLarge() throws Exception { + GridCacheProjection<Integer, Integer> cache = grid().cache(null); + + for (int i = 0; i < LARGE_ENTRY_CNT; i++) + assert cache.putx(i, i); + + assert cache.size() == LARGE_ENTRY_CNT; + + IgniteInClosure<GridCacheEntry<Integer, Integer>> c = emptyFilter(); + + // Warmup. + for (int i = 0; i < 3; i++) + iterate(cache, c); + + long start = System.currentTimeMillis(); + + iterate(cache, c); + + long time = System.currentTimeMillis() - start; + + X.println(">>>"); + X.println(">>> Iterated over " + cache.size() + " entries."); + X.println(">>> Iteration time: " + time + "ms."); + X.println(">>>"); + } + + /** + * @throws Exception If failed. + */ + public void testProjectionFiltered() throws Exception { + GridCache<Integer, Integer> cache = grid().cache(null); + + for (int i = 0; i < LARGE_ENTRY_CNT; i++) + assert cache.putx(i, i); + + assert cache.size() == LARGE_ENTRY_CNT; + + IgniteInClosure<GridCacheEntry<Integer, Integer>> c = emptyFilter(); + + GridCacheProjection<Integer, Integer> prj = cache.projection(new P2<Integer, Integer>() { + @Override public boolean apply(Integer key, Integer val) { + return val < SMALL_ENTRY_CNT; + } + }); + + assert prj.size() == SMALL_ENTRY_CNT; + + // Warmup. + for (int i = 0; i < 3; i++) + iterate(prj, c); + + long start = System.currentTimeMillis(); + + iterate(prj, c); + + long time = System.currentTimeMillis() - start; + + X.println(">>>"); + X.println(">>> Iterated over " + prj.size() + " entries."); + X.println(">>> Iteration time: " + time + "ms."); + X.println(">>>"); + } + + + /** + * @throws Exception If failed. + */ + public void testFiltered() throws Exception { + GridCache<Integer, Integer> cache = grid().cache(null); + + for (int i = 0; i < LARGE_ENTRY_CNT; i++) + assert cache.putx(i, i); + + assert cache.size() == LARGE_ENTRY_CNT; + + final BoxedInt cnt = new BoxedInt(); + + IgniteInClosure<GridCacheEntry<Integer, Integer>> c = new CI1<GridCacheEntry<Integer, Integer>>() { + @Override public void apply(GridCacheEntry<Integer, Integer> t) { + if (t.peek() < SMALL_ENTRY_CNT) + cnt.increment(); + } + }; + + assert cache.size() == LARGE_ENTRY_CNT; + + // Warmup. + for (int i = 0; i < 3; i++) + iterate(cache, c); + + cnt.reset(); + + long start = System.currentTimeMillis(); + + iterate(cache, c); + + long time = System.currentTimeMillis() - start; + + X.println(">>>"); + X.println(">>> Iterated over " + cache.size() + " entries, accepted " + cnt.get() + " entries."); + X.println(">>> Iteration time: " + time + "ms."); + X.println(">>>"); + } + + /** + * Boxed integer. + */ + private static class BoxedInt { + /** */ + private int i; + + /** + * @param i Integer. + */ + BoxedInt(int i) { + this.i = i; + } + + BoxedInt() { + // No-op. + } + + /** + * @return Integer. + */ + int increment() { + return ++i; + } + + /** + * @return Integer. + */ + int get() { + return i; + } + + /** + * Resets integer. + */ + void reset() { + i = 0; + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheKeyCheckNearEnabledSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheKeyCheckNearEnabledSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheKeyCheckNearEnabledSelfTest.java new file mode 100644 index 0000000..f7b07a6 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheKeyCheckNearEnabledSelfTest.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.cache.*; + +/** + * Tests for key check for near cache. + */ +public class GridCacheKeyCheckNearEnabledSelfTest extends GridCacheKeyCheckSelfTest { + /** {@inheritDoc} */ + @Override protected GridCacheDistributionMode distributionMode() { + return GridCacheDistributionMode.NEAR_PARTITIONED; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheKeyCheckSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheKeyCheckSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheKeyCheckSelfTest.java new file mode 100644 index 0000000..31c85ce --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheKeyCheckSelfTest.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; + +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; +import static org.apache.ignite.cache.GridCacheDistributionMode.*; +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; + +/** + * Tests for cache key check. + */ +public class GridCacheKeyCheckSelfTest extends GridCacheAbstractSelfTest { + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Atomicity mode. */ + private GridCacheAtomicityMode atomicityMode; + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 2; + } + + /** {@inheritDoc} */ + @Override protected GridCacheDistributionMode distributionMode() { + return PARTITIONED_ONLY; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(discoSpi); + + cfg.setCacheConfiguration(cacheConfiguration()); + + return cfg; + } + + /** + * @return Cache configuration. + */ + protected CacheConfiguration cacheConfiguration() { + CacheConfiguration cfg = defaultCacheConfiguration(); + + cfg.setCacheMode(PARTITIONED); + cfg.setBackups(1); + cfg.setDistributionMode(distributionMode()); + cfg.setWriteSynchronizationMode(FULL_SYNC); + cfg.setQueryIndexEnabled(false); + cfg.setAtomicityMode(atomicityMode); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testGetTransactional() throws Exception { + checkGet(TRANSACTIONAL); + } + + /** + * @throws Exception If failed. + */ + public void testGetAtomic() throws Exception { + checkGet(ATOMIC); + } + + /** + * @throws Exception If failed. + */ + public void testPutTransactional() throws Exception { + checkPut(TRANSACTIONAL); + } + + /** + * @throws Exception If failed. + */ + public void testPutAtomic() throws Exception { + checkPut(ATOMIC); + } + + /** + * @throws Exception If failed. + */ + public void testRemoveTransactional() throws Exception { + checkRemove(TRANSACTIONAL); + } + + /** + * @throws Exception If failed. + */ + public void testRemoveAtomic() throws Exception { + checkRemove(ATOMIC); + } + + /** + * @throws Exception If failed. + */ + private void checkGet(GridCacheAtomicityMode atomicityMode) throws Exception { + this.atomicityMode = atomicityMode; + + try { + GridCache<IncorrectCacheKey, String> cache = grid(0).cache(null); + + cache.get(new IncorrectCacheKey(0)); + + fail("Key without hashCode()/equals() was successfully retrieved from cache."); + } + catch (IllegalArgumentException e) { + info("Catched expected exception: " + e.getMessage()); + + assertTrue(e.getMessage().startsWith("Cache key must override hashCode() and equals() methods")); + } + } + + /** + * @throws Exception If failed. + */ + private void checkPut(GridCacheAtomicityMode atomicityMode) throws Exception { + this.atomicityMode = atomicityMode; + + try { + GridCache<IncorrectCacheKey, String> cache = grid(0).cache(null); + + cache.put(new IncorrectCacheKey(0), "test_value"); + + fail("Key without hashCode()/equals() was successfully inserted to cache."); + } + catch (IllegalArgumentException e) { + info("Catched expected exception: " + e.getMessage()); + + assertTrue(e.getMessage().startsWith("Cache key must override hashCode() and equals() methods")); + } + } + + /** + * @throws Exception If failed. + */ + private void checkRemove(GridCacheAtomicityMode atomicityMode) throws Exception { + this.atomicityMode = atomicityMode; + + try { + GridCache<IncorrectCacheKey, String> cache = grid(0).cache(null); + + cache.remove(new IncorrectCacheKey(0)); + + fail("Key without hashCode()/equals() was successfully used for remove operation."); + } + catch (IllegalArgumentException e) { + info("Catched expected exception: " + e.getMessage()); + + assertTrue(e.getMessage().startsWith("Cache key must override hashCode() and equals() methods")); + } + } + + /** + * Cache key that doesn't override hashCode()/equals(). + */ + private static final class IncorrectCacheKey { + /** */ + private int someVal; + + /** + * @param someVal Some test value. + */ + private IncorrectCacheKey(int someVal) { + this.someVal = someVal; + } + + /** + * @return Test value. + */ + public int getSomeVal() { + return someVal; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLeakTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLeakTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLeakTest.java new file mode 100644 index 0000000..8537a9f --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLeakTest.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.rendezvous.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.*; + +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; +import static org.apache.ignite.cache.GridCacheDistributionMode.*; +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; + +/** + * Leak test. + */ +public class GridCacheLeakTest extends GridCommonAbstractTest { + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Cache name. */ + private static final String CACHE_NAME = "ggfs-data"; + + /** Atomicity mode. */ + private GridCacheAtomicityMode atomicityMode; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(discoSpi); + + cfg.setCacheConfiguration(cacheConfiguration()); + + return cfg; + } + + /** + * Gets cache configuration. + * + * @return Data cache configuration. + */ + protected CacheConfiguration cacheConfiguration() { + CacheConfiguration cfg = defaultCacheConfiguration(); + + cfg.setName(CACHE_NAME); + + cfg.setAffinity(new GridCacheRendezvousAffinityFunction(false, 128)); + + cfg.setCacheMode(PARTITIONED); + cfg.setBackups(1); + cfg.setDistributionMode(PARTITIONED_ONLY); + cfg.setWriteSynchronizationMode(FULL_SYNC); + cfg.setQueryIndexEnabled(false); + cfg.setAtomicityMode(atomicityMode); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testLeakTransactional() throws Exception { + checkLeak(TRANSACTIONAL); + } + + /** + * @throws Exception If failed. + */ + public void testLeakAtomic() throws Exception { + checkLeak(ATOMIC); + } + + /** + * @throws Exception If failed. + */ + private void checkLeak(GridCacheAtomicityMode mode) throws Exception { + atomicityMode = mode; + + startGrids(3); + + try { + int i = 0; + + GridCache<Object, Object> cache = grid(0).cache(CACHE_NAME); + + while (!Thread.currentThread().isInterrupted()) { + UUID key = UUID.randomUUID(); + + cache.put(key, 0); + + cache.remove(key); + cache.remove(key); + + i++; + + if (i % 1000 == 0) + info("Put: " + i); + + if (i % 5000 == 0) { + for (int g = 0; g < 3; g++) { + GridCacheConcurrentMap<Object, Object> map = ((GridKernal)grid(g)).internalCache(CACHE_NAME).map(); + + info("Map size for cache [g=" + g + ", size=" + map.size() + + ", pubSize=" + map.publicSize() + ']'); + + assertTrue("Wrong map size: " + map.size(), map.size() <= 8192); + } + } + + if (i == 500_000) + break; + } + } + finally { + stopAllGrids(); + } + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return Long.MAX_VALUE; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLifecycleAwareSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLifecycleAwareSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLifecycleAwareSelfTest.java new file mode 100644 index 0000000..167c10b --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLifecycleAwareSelfTest.java @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.cache.cloner.*; +import org.apache.ignite.cache.eviction.*; +import org.apache.ignite.cache.store.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.lifecycle.LifecycleAware; +import org.apache.ignite.resources.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.testframework.junits.common.*; +import org.jetbrains.annotations.*; + +import javax.cache.*; +import javax.cache.configuration.*; +import javax.cache.integration.*; +import java.util.*; + +import static org.apache.ignite.cache.GridCacheDistributionMode.*; +import static org.apache.ignite.cache.GridCacheMode.*; + +/** + * Test for {@link LifecycleAware} support in {@link CacheConfiguration}. + */ +public class GridCacheLifecycleAwareSelfTest extends GridAbstractLifecycleAwareSelfTest { + /** */ + private static final String CACHE_NAME = "cache"; + + /** */ + private GridCacheDistributionMode distroMode; + + /** */ + private boolean writeBehind; + + /** + */ + private static class TestStore extends CacheStore implements LifecycleAware { + /** */ + private final TestLifecycleAware lifecycleAware = new TestLifecycleAware(CACHE_NAME); + + /** {@inheritDoc} */ + @Override public void start() throws IgniteCheckedException { + lifecycleAware.start(); + } + + /** {@inheritDoc} */ + @Override public void stop() throws IgniteCheckedException { + lifecycleAware.stop(); + } + + /** + * @param cacheName Cache name. + */ + @IgniteCacheNameResource + public void setCacheName(String cacheName) { + lifecycleAware.cacheName(cacheName); + } + + /** {@inheritDoc} */ + @Nullable @Override public Object load(Object key) { + return null; + } + + /** {@inheritDoc} */ + @Override public void loadCache(IgniteBiInClosure clo, @Nullable Object... args) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public Map loadAll(Iterable keys) throws CacheLoaderException { + return Collections.emptyMap(); + } + + /** {@inheritDoc} */ + @Override public void write(Cache.Entry entry) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void writeAll(Collection col) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void delete(Object key) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void deleteAll(Collection keys) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void txEnd(boolean commit) { + // No-op. + } + } + + /** + */ + private static class TestAffinityFunction extends TestLifecycleAware implements GridCacheAffinityFunction { + /** + */ + TestAffinityFunction() { + super(CACHE_NAME); + } + + /** {@inheritDoc} */ + @Override public void reset() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public int partitions() { + return 1; + } + + /** {@inheritDoc} */ + @Override public int partition(Object key) { + return 0; + } + + /** {@inheritDoc} */ + @Override public List<List<ClusterNode>> assignPartitions(GridCacheAffinityFunctionContext affCtx) { + List<List<ClusterNode>> res = new ArrayList<>(); + + res.add(nodes(0, affCtx.currentTopologySnapshot())); + + return res; + } + + /** {@inheritDoc} */ + public List<ClusterNode> nodes(int part, Collection<ClusterNode> nodes) { + return new ArrayList<>(nodes); + } + + /** {@inheritDoc} */ + @Override public void removeNode(UUID nodeId) { + // No-op. + } + } + + /** + */ + private static class TestEvictionPolicy extends TestLifecycleAware implements GridCacheEvictionPolicy { + /** + */ + TestEvictionPolicy() { + super(CACHE_NAME); + } + + /** {@inheritDoc} */ + @Override public void onEntryAccessed(boolean rmv, GridCacheEntry entry) { + // No-op. + } + } + + /** + */ + private static class TestEvictionFilter extends TestLifecycleAware implements GridCacheEvictionFilter { + /** + */ + TestEvictionFilter() { + super(CACHE_NAME); + } + + /** {@inheritDoc} */ + @Override public boolean evictAllowed(GridCacheEntry entry) { + return false; + } + } + + /** + */ + private static class TestCloner extends TestLifecycleAware implements GridCacheCloner { + /** + */ + TestCloner() { + super(CACHE_NAME); + } + + /** {@inheritDoc} */ + @Nullable @Override public <T> T cloneValue(T val) throws IgniteCheckedException { + return val; + } + } + + /** + */ + private static class TestAffinityKeyMapper extends TestLifecycleAware implements GridCacheAffinityKeyMapper { + /** + */ + TestAffinityKeyMapper() { + super(CACHE_NAME); + } + + /** {@inheritDoc} */ + @Override public Object affinityKey(Object key) { + return key; + } + + /** {@inheritDoc} */ + @Override public void reset() { + // No-op. + } + } + + /** + */ + private static class TestInterceptor extends TestLifecycleAware implements GridCacheInterceptor { + /** + */ + private TestInterceptor() { + super(CACHE_NAME); + } + + /** {@inheritDoc} */ + @Nullable @Override public Object onGet(Object key, @Nullable Object val) { + return val; + } + + /** {@inheritDoc} */ + @Nullable @Override public Object onBeforePut(Object key, @Nullable Object oldVal, Object newVal) { + return newVal; + } + + /** {@inheritDoc} */ + @Override public void onAfterPut(Object key, Object val) { + // No-op. + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") @Nullable @Override public IgniteBiTuple onBeforeRemove(Object key, @Nullable Object val) { + return new IgniteBiTuple(false, val); + } + + /** {@inheritDoc} */ + @Override public void onAfterRemove(Object key, Object val) { + // No-op. + } + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override protected final IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setDiscoverySpi(new TcpDiscoverySpi()); + + CacheConfiguration ccfg = defaultCacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); + + ccfg.setDistributionMode(distroMode); + + ccfg.setWriteBehindEnabled(writeBehind); + + ccfg.setCacheMode(GridCacheMode.PARTITIONED); + + ccfg.setName(CACHE_NAME); + + TestStore store = new TestStore(); + + ccfg.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store)); + ccfg.setReadThrough(true); + ccfg.setWriteThrough(true); + ccfg.setLoadPreviousValue(true); + + lifecycleAwares.add(store.lifecycleAware); + + TestAffinityFunction affinity = new TestAffinityFunction(); + + ccfg.setAffinity(affinity); + + lifecycleAwares.add(affinity); + + TestEvictionPolicy evictionPlc = new TestEvictionPolicy(); + + ccfg.setEvictionPolicy(evictionPlc); + + lifecycleAwares.add(evictionPlc); + + TestEvictionPolicy nearEvictionPlc = new TestEvictionPolicy(); + + ccfg.setNearEvictionPolicy(nearEvictionPlc); + + lifecycleAwares.add(nearEvictionPlc); + + TestEvictionFilter evictionFilter = new TestEvictionFilter(); + + ccfg.setEvictionFilter(evictionFilter); + + lifecycleAwares.add(evictionFilter); + + TestCloner cloner = new TestCloner(); + + ccfg.setCloner(cloner); + + lifecycleAwares.add(cloner); + + TestAffinityKeyMapper mapper = new TestAffinityKeyMapper(); + + ccfg.setAffinityMapper(mapper); + + lifecycleAwares.add(mapper); + + TestInterceptor interceptor = new TestInterceptor(); + + lifecycleAwares.add(interceptor); + + ccfg.setInterceptor(interceptor); + + cfg.setCacheConfiguration(ccfg); + + return cfg; + } + + /** {@inheritDoc} */ + @SuppressWarnings("ErrorNotRethrown") + @Override public void testLifecycleAware() throws Exception { + for (GridCacheDistributionMode mode : new GridCacheDistributionMode[] {PARTITIONED_ONLY, NEAR_PARTITIONED}) { + distroMode = mode; + + writeBehind = false; + + try { + super.testLifecycleAware(); + } + catch (AssertionError e) { + throw new AssertionError("Failed for [distroMode=" + distroMode + ", writeBehind=" + writeBehind + ']', + e); + } + + writeBehind = true; + + try { + super.testLifecycleAware(); + } + catch (AssertionError e) { + throw new AssertionError("Failed for [distroMode=" + distroMode + ", writeBehind=" + writeBehind + ']', + e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLocalTxStoreExceptionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLocalTxStoreExceptionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLocalTxStoreExceptionSelfTest.java new file mode 100644 index 0000000..5e3a4f9 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLocalTxStoreExceptionSelfTest.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.cache.*; + +import static org.apache.ignite.cache.GridCacheMode.*; + +/** + * + */ +public class GridCacheLocalTxStoreExceptionSelfTest extends IgniteTxStoreExceptionAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 1; + } + + /** {@inheritDoc} */ + @Override protected GridCacheMode cacheMode() { + return LOCAL; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLuceneQueryIndexTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLuceneQueryIndexTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLuceneQueryIndexTest.java new file mode 100644 index 0000000..c78a603 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheLuceneQueryIndexTest.java @@ -0,0 +1,465 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.testframework.junits.common.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; + +/** + * + */ +public class GridCacheLuceneQueryIndexTest extends GridCommonAbstractTest { + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** + * + */ + public GridCacheLuceneQueryIndexTest() { + super(false); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(disco); + + cfg.setIncludeEventTypes(); + cfg.setRestEnabled(false); + + CacheConfiguration cacheCfg1 = defaultCacheConfiguration(); + + cacheCfg1.setName("local1"); + cacheCfg1.setCacheMode(LOCAL); + cacheCfg1.setWriteSynchronizationMode(FULL_SYNC); + + CacheConfiguration cacheCfg2 = defaultCacheConfiguration(); + + cacheCfg2.setName("local2"); + cacheCfg2.setCacheMode(LOCAL); + cacheCfg2.setWriteSynchronizationMode(FULL_SYNC); + + cfg.setCacheConfiguration(cacheCfg1, cacheCfg2); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 10 * 60 * 1000; + } + + /** + * Tests puts one by one. + * + * @throws Exception In case of error. + */ + public void testLuceneIndex() throws Exception { + final Ignite g = startGrid(0); + + final GridCache<Integer, ObjectValue> cache1 = g.cache("local1"); + final GridCache<Integer, ObjectValue> cache2 = g.cache("local2"); + + final AtomicInteger threadIdxGen = new AtomicInteger(); + + final int keyCnt = 10000; + + final IgniteFuture<?> fut = multithreadedAsync( + new Callable<Object>() { + @Nullable @Override public Object call() throws Exception { + int threadIdx = threadIdxGen.getAndIncrement() % 2; + + for (int i = 0; i < keyCnt; i++) { + if (threadIdx == 0) + cache1.putx(i, new ObjectValue("test full text more" + i)); + else + cache2.putx(i, new ObjectValue("test full text more" + i)); + + if (i % 200 == 0) + info("Put entries count: " + i); + } + + return null; + } + }, + 10); + + IgniteFuture<?> fut1 = multithreadedAsync( + new Callable<Object>() { + @Nullable @Override public Object call() throws Exception { + while (!fut.isDone()) { + Thread.sleep(10000); + +// ((GridKernal)g).internalCache("local1").context().queries().index().printH2Stats(); +// ((GridKernal)g).internalCache("local2").context().queries().index().printH2Stats(); + } + + return null; + } + }, + 1); + + fut.get(); + fut1.get(); + + assert cache1.size() == keyCnt; + assert cache2.size() == keyCnt; + } + + /** + * Tests with putAll. + * + * @throws Exception In case of error. + */ + public void testLuceneIndex1() throws Exception { + final Ignite g = startGrid(0); + + final GridCache<Integer, ObjectValue> cache1 = g.cache("local1"); + final GridCache<Integer, ObjectValue> cache2 = g.cache("local2"); + + final AtomicInteger threadIdxGen = new AtomicInteger(); + + final int keyCnt = 10000; + + final IgniteFuture<?> fut = multithreadedAsync( + new Callable<Object>() { + @Nullable @Override public Object call() throws Exception { + int threadIdx = threadIdxGen.getAndIncrement() % 2; + + Map<Integer, ObjectValue> map = new HashMap<>(); + + for (int i = 0; i < keyCnt; i++) { + if (i % 200 == 0 && !map.isEmpty()) { + if (threadIdx == 0) + cache1.putAll(map); + else + cache2.putAll(map); + + info("Put entries count: " + i); + + map = new HashMap<>(); + } + + map.put(i, new ObjectValue("String value " + i)); + } + + if (!map.isEmpty()) { + if (threadIdx == 0) + cache1.putAll(map); + else + cache2.putAll(map); + } + + return null; + } + }, + 10); + + IgniteFuture<?> fut1 = multithreadedAsync( + new Callable<Object>() { + @Nullable @Override public Object call() throws Exception { + while (!fut.isDone()) { + Thread.sleep(10000); + +// ((GridKernal)g).internalCache("local1").context().queries().index().printH2Stats(); +// ((GridKernal)g).internalCache("local2").context().queries().index().printH2Stats(); + } + + return null; + } + }, + 1); + + fut.get(); + fut1.get(); + + assert cache1.size() == keyCnt; + assert cache2.size() == keyCnt; + } + + /** + * Test same value with putAll. + * + * @throws Exception In case of error. + */ + public void testLuceneIndex2() throws Exception { + final Ignite g = startGrid(0); + + final GridCache<Integer, ObjectValue> cache1 = g.cache("local1"); + final GridCache<Integer, ObjectValue> cache2 = g.cache("local2"); + + final AtomicInteger threadIdxGen = new AtomicInteger(); + + final int keyCnt = 10000; + + final ObjectValue val = new ObjectValue("String value"); + + final IgniteFuture<?> fut = multithreadedAsync( + new Callable<Object>() { + @Nullable @Override public Object call() throws Exception { + int threadIdx = threadIdxGen.getAndIncrement() % 2; + + Map<Integer, ObjectValue> map = new HashMap<>(); + + for (int i = 0; i < keyCnt; i++) { + if (i % 200 == 0 && !map.isEmpty()) { + if (threadIdx == 0) + cache1.putAll(map); + else + cache2.putAll(map); + + info("Put entries count: " + i); + + map = new HashMap<>(); + } + + map.put(i, val); + } + + if (!map.isEmpty()) { + if (threadIdx == 0) + cache1.putAll(map); + else + cache2.putAll(map); + } + + return null; + } + }, + 10); + + IgniteFuture<?> fut1 = multithreadedAsync( + new Callable<Object>() { + @Nullable @Override public Object call() throws Exception { + while (!fut.isDone()) { + Thread.sleep(10000); + +// ((GridKernal)g).internalCache("local1").context().queries().index().printH2Stats(); +// ((GridKernal)g).internalCache("local2").context().queries().index().printH2Stats(); + } + + return null; + } + }, + 1); + + fut.get(); + fut1.get(); + + assert cache1.size() == keyCnt; + assert cache2.size() == keyCnt; + } + + /** + * Test limited values set and custom keys with putAll. + * + * @throws Exception In case of error. + */ + public void testLuceneIndex3() throws Exception { + final Ignite g = startGrid(0); + + final GridCache<ObjectKey, ObjectValue> cache1 = g.cache("local1"); + final GridCache<ObjectKey, ObjectValue> cache2 = g.cache("local2"); + + final AtomicInteger threadIdxGen = new AtomicInteger(); + + final int keyCnt = 10000; + + final ObjectValue[] vals = new ObjectValue[10]; + + for (int i = 0; i < vals.length; i++) + vals[i] = new ObjectValue("Object value " + i); + + final IgniteFuture<?> fut = multithreadedAsync( + new Callable<Object>() { + @Nullable @Override public Object call() throws Exception { + int threadIdx = threadIdxGen.getAndIncrement() % 2; + + Map<ObjectKey, ObjectValue> map = new HashMap<>(); + + for (int i = 0; i < keyCnt; i++) { + if (i % 200 == 0 && !map.isEmpty()) { + if (threadIdx == 0) + cache1.putAll(map); + else + cache2.putAll(map); + + info("Put entries count: " + i); + + map = new HashMap<>(); + } + + map.put(new ObjectKey(String.valueOf(i)), F.rand(vals)); + } + + if (!map.isEmpty()) { + if (threadIdx == 0) + cache1.putAll(map); + else + cache2.putAll(map); + } + + return null; + } + }, + 1); + + IgniteFuture<?> fut1 = multithreadedAsync( + new Callable<Object>() { + @Nullable @Override public Object call() throws Exception { + while (!fut.isDone()) { + Thread.sleep(10000); + +// ((GridKernal)g).internalCache("local1").context().queries().index().printH2Stats(); +// ((GridKernal)g).internalCache("local2").context().queries().index().printH2Stats(); + } + + return null; + } + }, + 1); + + fut.get(); + fut1.get(); + + assert cache1.size() == keyCnt; + assert cache2.size() == keyCnt; + } + + /** + * Test value object. + */ + private static class ObjectValue implements Serializable { + /** String value. */ + @GridCacheQueryTextField + private String strVal; + + /** + * @param strVal String value. + */ + ObjectValue(String strVal) { + this.strVal = strVal; + } + + /** + * @return Value. + */ + public String stringValue() { + return strVal; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + ObjectValue other = (ObjectValue)o; + + return strVal == null ? other.strVal == null : strVal.equals(other.strVal); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return strVal != null ? strVal.hashCode() : 0; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ObjectValue.class, this); + } + } + + /** + * Test value key. + */ + private static class ObjectKey implements Serializable { + /** String key. */ + @GridCacheQueryTextField + private String strKey; + + /** + * @param strKey String key. + */ + ObjectKey(String strKey) { + this.strKey = strKey; + } + + /** + * @return Key. + */ + public String stringKey() { + return strKey; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + ObjectKey other = (ObjectKey)o; + + return strKey == null ? other.strKey == null : strKey.equals(other.strKey); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return strKey != null ? strKey.hashCode() : 0; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ObjectKey.class, this); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMarshallerTxAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMarshallerTxAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMarshallerTxAbstractTest.java new file mode 100644 index 0000000..af84e0c --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMarshallerTxAbstractTest.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.configuration.*; +import org.apache.ignite.transactions.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.io.*; +import java.util.*; + +import static org.apache.ignite.transactions.IgniteTxConcurrency.*; +import static org.apache.ignite.transactions.IgniteTxIsolation.*; + +/** + * Test transaction with wrong marshalling. + */ +public abstract class GridCacheMarshallerTxAbstractTest extends GridCommonAbstractTest { + /** + * Wrong Externalizable class. + */ + private static class GridCacheWrongValue implements Externalizable { + @Override public void writeExternal(ObjectOutput out) throws IOException { + throw new NullPointerException("Expected exception."); + } + + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + throw new NullPointerException("Expected exception."); + } + } + + /** + * Wrong Externalizable class. + */ + private static class GridCacheWrongValue1 { + private int val1 = 8; + private long val2 = 9; + } + + /** */ + protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** + * Constructs a test. + */ + protected GridCacheMarshallerTxAbstractTest() { + super(true /* start grid. */); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi spi = new TcpDiscoverySpi(); + + spi.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(spi); + + return cfg; + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + public void testValueMarshallerFail() throws Exception { + String key = UUID.randomUUID().toString(); + String value = UUID.randomUUID().toString(); + String newValue = UUID.randomUUID().toString(); + + String key2 = UUID.randomUUID().toString(); + GridCacheWrongValue1 wrongValue = new GridCacheWrongValue1(); + + IgniteTx tx = grid().cache(null).txStart(PESSIMISTIC, REPEATABLE_READ); + try { + grid().cache(null).put(key, value); + + tx.commit(); + } + finally { + tx.close(); + } + + tx = grid().cache(null).txStart(PESSIMISTIC, REPEATABLE_READ); + + try { + assert value.equals(grid().cache(null).get(key)); + + grid().cache(null).put(key, newValue); + + grid().cache(null).put(key2, wrongValue); + + tx.commit(); + } + finally { + tx.close(); + } + + tx = grid().cache(null).txStart(PESSIMISTIC, REPEATABLE_READ); + + try { + String locVal = (String)grid().cache(null).get(key); + + assert locVal != null; + + tx.commit(); + } + finally { + tx.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMemoryModeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMemoryModeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMemoryModeSelfTest.java new file mode 100644 index 0000000..ce50355 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMemoryModeSelfTest.java @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.eviction.lru.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.marshaller.optimized.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.spi.swapspace.file.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.testframework.junits.common.*; +import org.junit.*; + +import java.util.*; + +import static java.lang.String.*; +import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; + +/** + * Memory model self test. + */ +@SuppressWarnings("deprecation") +public class GridCacheMemoryModeSelfTest extends GridCommonAbstractTest { + /** */ + private TcpDiscoveryIpFinder ipFinder; + + /** */ + private boolean swapEnabled; + + /** */ + private GridCacheMode mode; + + /** */ + private GridCacheMemoryMode memoryMode; + + /** */ + private int maxOnheapSize; + + /** */ + private long offheapSize; + + /** */ + private GridCacheAtomicityMode atomicity; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(disco); + + cfg.setNetworkTimeout(2000); + + cfg.setSwapSpaceSpi(new FileSwapSpaceSpi()); + + CacheConfiguration cacheCfg = defaultCacheConfiguration(); + + cacheCfg.setWriteSynchronizationMode(FULL_SYNC); + + cacheCfg.setSwapEnabled(swapEnabled); + cacheCfg.setCacheMode(mode); + cacheCfg.setMemoryMode(memoryMode); + cacheCfg.setEvictionPolicy(maxOnheapSize == Integer.MAX_VALUE ? null : + new GridCacheLruEvictionPolicy(maxOnheapSize)); + cacheCfg.setAtomicityMode(atomicity); + cacheCfg.setOffHeapMaxMemory(offheapSize); + cacheCfg.setQueryIndexEnabled(memoryMode != GridCacheMemoryMode.OFFHEAP_VALUES); + cacheCfg.setPortableEnabled(portableEnabled()); + + cfg.setCacheConfiguration(cacheCfg); + cfg.setMarshaller(new IgniteOptimizedMarshaller(false)); + + return cfg; + } + + /** + * @return Portable enabled flag. + */ + protected boolean portableEnabled() { + return false; + } + + /** + * @throws Exception If failed. + */ + public void testOnheap() throws Exception { + mode = GridCacheMode.LOCAL; + memoryMode = GridCacheMemoryMode.ONHEAP_TIERED; + maxOnheapSize = Integer.MAX_VALUE; + swapEnabled = false; + atomicity = GridCacheAtomicityMode.ATOMIC; + offheapSize = -1; + + doTestPutAndPutAll(1000, 0, true, true); + } + + /** + * @throws Exception If failed. + */ + public void testOnheapSwap() throws Exception { + mode = GridCacheMode.LOCAL; + memoryMode = GridCacheMemoryMode.ONHEAP_TIERED; + maxOnheapSize = 330; + swapEnabled = true; + atomicity = GridCacheAtomicityMode.ATOMIC; + offheapSize = -1; + + doTestPutAndPutAll(330, 670, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testOffheap() throws Exception { + mode = GridCacheMode.LOCAL; + memoryMode = GridCacheMemoryMode.OFFHEAP_TIERED; + maxOnheapSize = Integer.MAX_VALUE; + swapEnabled = false; + atomicity = GridCacheAtomicityMode.ATOMIC; + offheapSize = -1; // Must be fixed in config validation. + + doTestPutAndPutAll(0, 1000, false, true); + } + + /** + * @throws Exception If failed. + */ + public void testOffheapSwap() throws Exception { + mode = GridCacheMode.LOCAL; + memoryMode = GridCacheMemoryMode.OFFHEAP_TIERED; + maxOnheapSize = Integer.MAX_VALUE; + swapEnabled = true; + atomicity = GridCacheAtomicityMode.ATOMIC; + offheapSize = 1000; // Small for evictions from offheap to swap. + + doTestPutAndPutAll(0, 1000, false, false); + } + + /** + * @throws Exception If failed. + */ + public void testTiered() throws Exception { + mode = GridCacheMode.LOCAL; + memoryMode = GridCacheMemoryMode.ONHEAP_TIERED; + maxOnheapSize = 24; + swapEnabled = true; + atomicity = GridCacheAtomicityMode.ATOMIC; + offheapSize = 1000; // Small for evictions from offheap to swap. + + doTestPutAndPutAll(24, 976, false, false); + } + + /** + * @throws Exception If failed. + */ + public void testOffheapValuesConfigFixBackward() throws Exception { + mode = GridCacheMode.LOCAL; + memoryMode = GridCacheMemoryMode.OFFHEAP_VALUES; + maxOnheapSize = 24; + swapEnabled = true; + atomicity = GridCacheAtomicityMode.ATOMIC; + offheapSize = -1; + + Ignite g = startGrid(); + + CacheConfiguration cfg = g.cache(null).configuration(); + + assertEquals(memoryMode, cfg.getMemoryMode()); + assertEquals(0, cfg.getOffHeapMaxMemory()); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + + /** + * @param cache In cache. + * @param offheapSwap In swap and offheap. + * @param offheapEmpty Offheap is empty. + * @param swapEmpty Swap is empty. + * @throws Exception If failed. + */ + private void doTestPutAndPutAll(int cache, int offheapSwap, boolean offheapEmpty, boolean swapEmpty) throws Exception { + final int all = cache + offheapSwap; + + // put + doTest(cache, offheapSwap, offheapEmpty, swapEmpty, new CIX1<GridCache<String, Integer>>() { + @Override public void applyx(GridCache<String, Integer> c) throws IgniteCheckedException { + for (int i = 0; i < all; i++) + c.put(valueOf(i), i); + } + }); + + //putAll + doTest(cache, offheapSwap, offheapEmpty, swapEmpty, new CIX1<GridCache<String, Integer>>() { + @Override public void applyx(GridCache<String, Integer> c) throws IgniteCheckedException { + Map<String, Integer> m = new HashMap<>(); + + for (int i = 0; i < all; i++) + m.put(valueOf(i), i); + + c.putAll(m); + } + }); + } + + /** + * @param cache Cache size. + * @param offheapSwap Offheap + swap size. + * @param offheapEmpty Offheap is empty. + * @param swapEmpty Swap is empty. + * @param x Cache modifier. + * @throws IgniteCheckedException If failed. + */ + void doTest(int cache, int offheapSwap, boolean offheapEmpty, boolean swapEmpty, CIX1<GridCache<String, Integer>> x) throws Exception { + ipFinder = new TcpDiscoveryVmIpFinder(true); + + startGrid(); + + final GridCache<String, Integer> c = cache(); + + x.applyx(c); + + assertEquals(cache, c.size()); + assertEquals(offheapSwap, c.offHeapEntriesCount() + c.swapKeys()); + + if (offheapEmpty) + Assert.assertEquals(0, c.offHeapEntriesCount()); + else + Assert.assertNotEquals(0, c.offHeapEntriesCount()); + + if (swapEmpty) + Assert.assertEquals(0, c.swapKeys()); + else + Assert.assertNotEquals(0, c.swapKeys()); + + info("size: " + c.size()); + info("offheap: " + c.offHeapEntriesCount()); + info("swap: " + c.swapKeys()); + + stopAllGrids(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMissingCommitVersionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMissingCommitVersionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMissingCommitVersionSelfTest.java new file mode 100644 index 0000000..fe1a689 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMissingCommitVersionSelfTest.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.IgniteSystemProperties.*; +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; +import static org.apache.ignite.cache.GridCacheMode.*; + +/** + * + */ +public class GridCacheMissingCommitVersionSelfTest extends GridCommonAbstractTest { + /** */ + private volatile Integer failedKey; + + /** */ + private String maxCompletedTxCount; + + /** + */ + public GridCacheMissingCommitVersionSelfTest() { + super(true); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration() throws Exception { + maxCompletedTxCount = System.getProperty(GG_MAX_COMPLETED_TX_COUNT); + + System.setProperty(GG_MAX_COMPLETED_TX_COUNT, String.valueOf(5)); + + IgniteConfiguration cfg = super.getConfiguration(); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true)); + + cfg.setDiscoverySpi(discoSpi); + + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); + ccfg.setAtomicityMode(TRANSACTIONAL); + + cfg.setCacheConfiguration(ccfg); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + System.setProperty(GG_MAX_COMPLETED_TX_COUNT, maxCompletedTxCount != null ? maxCompletedTxCount : ""); + + super.afterTest(); + } + + /** + * @throws Exception If failed. + */ + public void testMissingCommitVersion() throws Exception { + final GridCache<Integer, Integer> cache = cache(); + + final int KEYS_PER_THREAD = 10_000; + + final AtomicInteger keyStart = new AtomicInteger(); + + GridTestUtils.runMultiThreaded(new Callable<Object>() { + @Override public Object call() throws Exception { + int start = keyStart.getAndAdd(KEYS_PER_THREAD); + + for (int i = 0; i < KEYS_PER_THREAD && failedKey == null; i++) { + int key = start + i; + + try { + cache.put(key, 1); + } + catch (Exception e) { + log.info("Put failed: " + e); + + failedKey = key; + } + } + + + return null; + } + }, 10, "put-thread"); + + assertNotNull("Test failed to provoke 'missing commit version' error.", failedKey); + + log.info("Trying to update " + failedKey); + + IgniteFuture<?> fut = cache.putAsync(failedKey, 2); + + try { + fut.get(5000); + } + catch (IgniteFutureTimeoutException ignore) { + fail("Put failed to finish in 5s."); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMixedPartitionExchangeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMixedPartitionExchangeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMixedPartitionExchangeSelfTest.java new file mode 100644 index 0000000..4d64b3b --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMixedPartitionExchangeSelfTest.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.transactions.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; +import static org.apache.ignite.cache.GridCacheDistributionMode.*; +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.transactions.IgniteTxConcurrency.*; +import static org.apache.ignite.transactions.IgniteTxIsolation.*; + +/** + * Test case checks partition exchange when non-cache node joins topology (partition + * exchange should be skipped in this case). + */ +public class GridCacheMixedPartitionExchangeSelfTest extends GridCommonAbstractTest { + /** Flag indicating whether to include cache to the node configuration. */ + private boolean cache; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + if (cache) + cfg.setCacheConfiguration(cacheConfiguration()); + + return cfg; + } + + /** + * @return Cache configuration. + */ + private CacheConfiguration cacheConfiguration() { + CacheConfiguration ccfg = defaultCacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setDistributionMode(PARTITIONED_ONLY); + ccfg.setBackups(1); + + return ccfg; + } + + /** + * @throws Exception If failed. + */ + public void testNodeJoinLeave() throws Exception { + try { + cache = true; + + startGrids(4); + + awaitPartitionMapExchange(); + + final AtomicBoolean finished = new AtomicBoolean(); + + IgniteFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new IgniteCallable<Object>() { + @Override public Object call() throws Exception { + Random rnd = new Random(); + + int keys = 100; + + while (!finished.get()) { + int g = rnd.nextInt(4); + + int key = rnd.nextInt(keys); + + GridCache<Integer, Integer> prj = grid(g).cache(null); + + try (IgniteTx tx = prj.txStart(PESSIMISTIC, REPEATABLE_READ)) { + Integer val = prj.get(key); + + val = val == null ? 1 : val + 1; + + prj.put(key, val); + + tx.commit(); + } + } + + return null; + } + }, 4, "async-runner"); + + cache = false; + + for (int r = 0; r < 3; r++) { + for (int i = 4; i < 8; i++) + startGrid(i); + + for (int i = 4; i < 8; i++) + stopGrid(i); + } + + // Check we can start more cache nodes after non-cache ones. + cache = true; + + startGrid(4); + + U.sleep(500); + + finished.set(true); + + fut.get(); + + long topVer = grid(0).topologyVersion(); + + assertEquals(29, topVer); + + // Check all grids have all exchange futures completed. + for (int i = 0; i < 4; i++) { + GridKernal grid = (GridKernal)grid(i); + + GridCacheContext<Object, Object> cctx = grid.internalCache(null).context(); + + IgniteFuture<Long> verFut = cctx.affinity().affinityReadyFuture(topVer); + + assertEquals((Long)topVer, verFut.get()); + assertEquals((Long)topVer, cctx.topologyVersionFuture().get()); + } + } + finally { + stopAllGrids(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMultiUpdateLockSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMultiUpdateLockSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMultiUpdateLockSelfTest.java new file mode 100644 index 0000000..036a7a7 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMultiUpdateLockSelfTest.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.transactions.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.*; +import org.apache.ignite.spi.checkpoint.noop.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCacheDistributionMode.*; +import static org.apache.ignite.cache.GridCachePreloadMode.*; +import static org.apache.ignite.transactions.IgniteTxConcurrency.*; +import static org.apache.ignite.transactions.IgniteTxIsolation.*; +import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; + +/** + * Tests multi-update locks. + */ +public class GridCacheMultiUpdateLockSelfTest extends GridCommonAbstractTest { + /** Shared IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Near enabled flag. */ + private boolean nearEnabled; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String name) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(name); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(discoSpi); + + cfg.setCacheConfiguration(cacheConfiguration()); + + cfg.setCheckpointSpi(new NoopCheckpointSpi()); + + return cfg; + } + + /** + * @return Cache configuration. + */ + protected CacheConfiguration cacheConfiguration() { + CacheConfiguration cfg = defaultCacheConfiguration(); + + cfg.setCacheMode(PARTITIONED); + cfg.setBackups(1); + cfg.setDistributionMode(nearEnabled ? NEAR_PARTITIONED : PARTITIONED_ONLY); + + cfg.setWriteSynchronizationMode(FULL_SYNC); + cfg.setPreloadMode(SYNC); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testMultiUpdateLocksNear() throws Exception { + checkMultiUpdateLocks(true); + } + + /** + * @throws Exception If failed. + */ + public void testMultiUpdateLocksColocated() throws Exception { + checkMultiUpdateLocks(false); + } + + /** + * @param nearEnabled Near enabled flag. + * @throws Exception If failed. + */ + private void checkMultiUpdateLocks(boolean nearEnabled) throws Exception { + this.nearEnabled = nearEnabled; + + startGrids(3); + + try { + GridKernal g = (GridKernal)grid(0); + + GridCacheContext<Object, Object> cctx = g.internalCache().context(); + + GridDhtCacheAdapter cache = nearEnabled ? cctx.near().dht() : cctx.colocated(); + + long topVer = cache.beginMultiUpdate(); + + IgniteFuture<?> startFut; + + try { + assertEquals(3, topVer); + + final AtomicBoolean started = new AtomicBoolean(); + + startFut = multithreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + info(">>>> Starting grid."); + + Ignite g4 = startGrid(4); + + started.set(true); + + GridCache<Object, Object> c = g4.cache(null); + + info(">>>> Checking tx in new grid."); + + try (IgniteTx tx = c.txStart(PESSIMISTIC, REPEATABLE_READ)) { + assertEquals(2, c.get("a")); + assertEquals(4, c.get("b")); + assertEquals(6, c.get("c")); + } + + return null; + } + }, 1); + + U.sleep(200); + + info(">>>> Checking grid has not started yet."); + + assertFalse(started.get()); + + // Check we can proceed with transactions. + GridCache<Object, Object> cache0 = g.cache(null); + + info(">>>> Checking tx commit."); + + IgniteTx tx = cache0.txStart(PESSIMISTIC, REPEATABLE_READ); + + try { + cache0.put("a", 1); + cache0.put("b", 2); + cache0.put("c", 3); + + tx.commit(); + } + finally { + tx.close(); + } + + info(">>>> Checking grid still is not started"); + + assertFalse(started.get()); + + tx = cache0.txStart(PESSIMISTIC, REPEATABLE_READ); + + try { + cache0.put("a", 2); + cache0.put("b", 4); + cache0.put("c", 6); + + tx.commit(); + } + finally { + tx.close(); + } + } + finally { + info(">>>> Releasing multi update."); + + cache.endMultiUpdate(); + } + + info("Waiting for thread termination."); + + startFut.get(); + } + finally { + stopAllGrids(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMultinodeUpdateAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMultinodeUpdateAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMultinodeUpdateAbstractSelfTest.java new file mode 100644 index 0000000..39a3cb2 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMultinodeUpdateAbstractSelfTest.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.store.*; +import org.apache.ignite.testframework.*; +import org.jetbrains.annotations.*; + +import javax.cache.processor.*; +import java.io.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; +import static org.apache.ignite.cache.GridCacheMode.*; + +/** + * Multinode update test. + */ +public abstract class GridCacheMultinodeUpdateAbstractSelfTest extends GridCacheAbstractSelfTest { + /** */ + protected static volatile boolean failed; + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 3; + } + + /** {@inheritDoc} */ + @Nullable @Override protected CacheStore<?, ?> cacheStore() { + return null; + } + + /** {@inheritDoc} */ + @Override protected GridCacheMode cacheMode() { + return PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 3 * 60_000; + } + + /** + * @throws Exception If failed. + */ + public void testInvoke() throws Exception { + IgniteCache<Integer, Integer> cache = grid(0).jcache(null); + + final Integer key = primaryKey(cache); + + cache.put(key, 0); + + final int THREADS = gridCount(); + final int ITERATIONS_PER_THREAD = 1000; + + Integer expVal = 0; + + for (int i = 0; i < iterations(); i++) { + log.info("Iteration: " + i); + + final AtomicInteger gridIdx = new AtomicInteger(); + + GridTestUtils.runMultiThreaded(new Callable<Void>() { + @Override public Void call() throws Exception { + int idx = gridIdx.incrementAndGet() - 1; + + final IgniteCache<Integer, Integer> cache = grid(idx).jcache(null); + + for (int i = 0; i < ITERATIONS_PER_THREAD && !failed; i++) + cache.invoke(key, new IncProcessor()); + + return null; + } + }, THREADS, "invoke"); + + assertFalse("Got null in processor.", failed); + + expVal += ITERATIONS_PER_THREAD * THREADS; + + for (int j = 0; j < gridCount(); j++) { + Integer val = (Integer)grid(j).cache(null).get(key); + + assertEquals("Unexpected value for grid " + j, expVal, val); + } + } + } + + /** + * @return Number of iterations. + */ + protected int iterations() { + return atomicityMode() == ATOMIC ? 30 : 15; + } + + /** + * + */ + protected static class IncProcessor implements EntryProcessor<Integer, Integer, Void>, Serializable { + /** {@inheritDoc} */ + @Override public Void process(MutableEntry<Integer, Integer> e, Object... args) { + Integer val = e.getValue(); + + if (val == null) { + failed = true; + + System.out.println(Thread.currentThread() + " got null in processor: " + val); + + return null; + } + + e.setValue(val + 1); + + return null; + } + } +}