// http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractIteratorsSelfTest.java
// ----------------------------------------------------------------------
// diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractIteratorsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractIteratorsSelfTest.java
// new file mode 100644
// index 0000000..b12d664
// --- /dev/null
// +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractIteratorsSelfTest.java
// @@ -0,0 +1,349 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.cache;

import org.apache.ignite.*;
import org.apache.ignite.cache.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.testframework.*;

import java.util.*;

/**
 * Tests for cache iterators.
 * <p>
 * Before every test the cache is filled with {@link #entryCount()} entries of the form
 * {@code KEY_PREFIX + i -> i}. The "filtered" tests go through {@code cache().projection(lt50)};
 * {@code lt50} is not declared in this class (presumably inherited from the parent test and
 * accepting only values below 50 - TODO confirm).
 */
public abstract class GridCacheAbstractIteratorsSelfTest extends GridCacheAbstractSelfTest {
    /** Key prefix. */
    protected static final String KEY_PREFIX = "testKey";

    /** Number of entries (and exclusive upper bound of values) the filtered tests expect to see. */
    private static final int FILTERED_CNT = 50;

    /** Number of threads iterating over the cache in multithreaded tests. */
    private static final int ITERATOR_THREADS = 3;

    /** {@inheritDoc} */
    @Override protected void beforeTest() throws Exception {
        super.beforeTest();

        for (int i = 0; i < entryCount(); i++)
            cache().put(KEY_PREFIX + i, i);
    }

    /**
     * @return Entry count.
     */
    protected abstract int entryCount();

    /**
     * Checks that an entry returned by an iterator has a prefixed key and a value in {@code [0, maxVal)},
     * both through {@code getValue()} and through {@code get()}.
     *
     * @param entry Entry to check.
     * @param maxVal Exclusive upper bound for the entry value.
     * @throws Exception If failed.
     */
    private void checkEntry(GridCacheEntry<String, Integer> entry, int maxVal) throws Exception {
        assert entry != null;
        assert entry.getKey() != null;
        assert entry.getKey().contains(KEY_PREFIX);
        assert entry.getValue() != null;
        assert entry.getValue() >= 0 && entry.getValue() < maxVal;
        assert entry.get() != null;
        assert entry.get() >= 0 && entry.get() < maxVal;
    }

    /**
     * Clears all caches, repopulates the cache in a background thread and keeps running the given
     * iteration in several threads until the population is finished.
     *
     * @param iteration Single pass over the cache view under test.
     * @throws Exception If iteration or population failed.
     */
    private void checkIterationDuringPuts(final CA iteration) throws Exception {
        for (int i = 0; i < gridCount(); i++)
            cache(i).removeAll();

        final IgniteFuture<?> putFut = GridTestUtils.runMultiThreadedAsync(new CAX() {
            @Override public void applyx() throws IgniteCheckedException {
                for (int i = 0; i < entryCount(); i++)
                    cache().put(KEY_PREFIX + i, i);
            }
        }, 1, "put-thread");

        GridTestUtils.runMultiThreaded(new CA() {
            @Override public void apply() {
                while (!putFut.isDone())
                    iteration.apply();
            }
        }, ITERATOR_THREADS, "iterator-thread");

        // Without this call an exception thrown by the put thread would be silently lost.
        putFut.get();
    }

    /**
     * @throws Exception If failed.
     */
    public void testCacheIterator() throws Exception {
        int cnt = 0;

        for (GridCacheEntry<String, Integer> entry : cache()) {
            checkEntry(entry, entryCount());

            cnt++;
        }

        assertEquals(entryCount(), cnt);
    }

    /**
     * @throws Exception If failed.
     */
    public void testCacheProjectionIterator() throws Exception {
        int cnt = 0;

        for (GridCacheEntry<String, Integer> entry : cache().projection(lt50)) {
            checkEntry(entry, FILTERED_CNT);

            cnt++;
        }

        assert cnt == FILTERED_CNT;
    }

    /**
     * @throws Exception If failed.
     */
    public void testCacheIteratorMultithreaded() throws Exception {
        checkIterationDuringPuts(new CA() {
            @Override public void apply() {
                for (GridCacheEntry<String, Integer> entry : cache()) {
                    assert entry != null;
                    assert entry.getKey() != null;
                    assert entry.getKey().contains(KEY_PREFIX);
                }
            }
        });
    }

    /**
     * @throws Exception If failed.
     */
    public void testEntrySetIterator() throws Exception {
        Set<GridCacheEntry<String, Integer>> entries = cache().entrySet();

        assert entries != null;
        assert entries.size() == entryCount();

        int cnt = 0;

        for (GridCacheEntry<String, Integer> entry : entries) {
            checkEntry(entry, entryCount());

            cnt++;
        }

        assert cnt == entryCount();
    }

    /**
     * @throws Exception If failed.
     */
    public void testEntrySetIteratorFiltered() throws Exception {
        Set<GridCacheEntry<String, Integer>> entries = cache().projection(lt50).entrySet();

        assert entries != null;
        assert entries.size() == FILTERED_CNT;

        int cnt = 0;

        for (GridCacheEntry<String, Integer> entry : entries) {
            checkEntry(entry, FILTERED_CNT);

            cnt++;
        }

        assert cnt == FILTERED_CNT;
    }

    /**
     * @throws Exception If failed.
     */
    public void testEntrySetIteratorMultithreaded() throws Exception {
        checkIterationDuringPuts(new CA() {
            @Override public void apply() {
                for (GridCacheEntry<String, Integer> entry : cache().entrySet()) {
                    assert entry != null;
                    assert entry.getKey() != null;
                    assert entry.getKey().contains(KEY_PREFIX);
                }
            }
        });
    }

    /**
     * @throws Exception If failed.
     */
    public void testKeySetIterator() throws Exception {
        Set<String> keys = cache().keySet();

        assert keys != null;
        assert keys.size() == entryCount();

        List<Integer> values = new ArrayList<>(entryCount());

        int cnt = 0;

        for (String key : keys) {
            assert key != null;
            assert key.contains(KEY_PREFIX);

            values.add(cache().get(key));

            cnt++;
        }

        assert values.size() == entryCount();
        assert cnt == entryCount();

        // Values were stored as 0..entryCount()-1, so after sorting each one must equal its index.
        Collections.sort(values);

        for (int i = 0; i < values.size(); i++)
            assert values.get(i) == i;
    }

    /**
     * @throws Exception If failed.
     */
    public void testKeySetIteratorFiltered() throws Exception {
        Set<String> keys = cache().projection(lt50).keySet();

        assert keys != null;
        assert keys.size() == FILTERED_CNT;

        List<Integer> values = new ArrayList<>(FILTERED_CNT);

        int cnt = 0;

        for (String key : keys) {
            assert key != null;
            assert key.contains(KEY_PREFIX);

            values.add(cache().get(key));

            cnt++;
        }

        assert values.size() == FILTERED_CNT;
        assert cnt == FILTERED_CNT;

        // The filtered values are expected to be exactly 0..FILTERED_CNT-1.
        Collections.sort(values);

        for (int i = 0; i < values.size(); i++)
            assert values.get(i) == i;
    }

    /**
     * @throws Exception If failed.
     */
    public void testKeySetIteratorMultithreaded() throws Exception {
        checkIterationDuringPuts(new CA() {
            @Override public void apply() {
                for (String key : cache().keySet()) {
                    assert key != null;
                    assert key.contains(KEY_PREFIX);
                }
            }
        });
    }

    /**
     * @throws Exception If failed.
     */
    public void testValuesIterator() throws Exception {
        Collection<Integer> values = cache().values();

        assert values != null;
        assert values.size() == entryCount();

        int cnt = 0;

        for (Integer value : values) {
            assert value != null;
            assert value >= 0 && value < entryCount();

            cnt++;
        }

        assert cnt == entryCount();
    }

    /**
     * @throws Exception If failed.
     */
    public void testValuesIteratorFiltered() throws Exception {
        Collection<Integer> values = cache().projection(lt50).values();

        assert values != null;
        assert values.size() == FILTERED_CNT;

        int cnt = 0;

        for (Integer value : values) {
            assert value != null;
            assert value >= 0 && value < FILTERED_CNT;

            cnt++;
        }

        assert cnt == FILTERED_CNT;
    }

    /**
     * @throws Exception If failed.
     */
    public void testValuesIteratorMultithreaded() throws Exception {
        checkIterationDuringPuts(new CA() {
            @Override public void apply() {
                for (Integer value : cache().values())
                    assert value != null;
            }
        });
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractMetricsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractMetricsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractMetricsSelfTest.java new file mode 100644 index 0000000..3fcc1c3 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractMetricsSelfTest.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; + +/** + * Cache metrics test. + */ +public abstract class GridCacheAbstractMetricsSelfTest extends GridCacheAbstractSelfTest { + /** */ + private static final int KEY_CNT = 50; + + /** {@inheritDoc} */ + @Override protected boolean swapEnabled() { + return false; + } + + /** + * @return Key count. 
+ */ + protected int keyCount() { + return KEY_CNT; + } + + /** + * Gets number of inner reads per "put" operation. + * + * @param isPrimary {@code true} if local node is primary for current key, {@code false} otherwise. + * @return Expected number of inner reads. + */ + protected int expectedReadsPerPut(boolean isPrimary) { + return isPrimary ? 1 : 2; + } + + /** + * Gets number of missed per "put" operation. + * + * @param isPrimary {@code true} if local node is primary for current key, {@code false} otherwise. + * @return Expected number of misses. + */ + protected int expectedMissesPerPut(boolean isPrimary) { + return isPrimary ? 1 : 2; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + for (int i = 0; i < gridCount(); i++) { + Ignite g = grid(i); + + g.cache(null).removeAll(); + + assert g.cache(null).isEmpty(); + + g.cache(null).resetMetrics(); + + g.transactions().resetMetrics(); + } + } + + /** + * @throws Exception If failed. + */ + public void testWritesReads() throws Exception { + GridCache<Integer, Integer> cache0 = grid(0).cache(null); + + int keyCnt = keyCount(); + + int expReads = 0; + int expMisses = 0; + + // Put and get a few keys. + for (int i = 0; i < keyCnt; i++) { + cache0.put(i, i); // +1 read + + boolean isPrimary = cache0.affinity().isPrimary(grid(0).localNode(), i); + + expReads += expectedReadsPerPut(isPrimary); + expMisses += expectedMissesPerPut(isPrimary); + + info("Writes: " + cache0.metrics().writes()); + + for (int j = 0; j < gridCount(); j++) { + GridCache<Integer, Integer> cache = grid(j).cache(null); + + int cacheWrites = cache.metrics().writes(); + + assertEquals("Wrong cache metrics [i=" + i + ", grid=" + j + ']', i + 1, cacheWrites); + } + + assertEquals("Wrong value for key: " + i, Integer.valueOf(i), cache0.get(i)); // +1 read + + expReads++; + } + + // Check metrics for the whole cache. 
+ long writes = 0; + long reads = 0; + long hits = 0; + long misses = 0; + + for (int i = 0; i < gridCount(); i++) { + GridCacheMetrics m = grid(i).cache(null).metrics(); + + writes += m.writes(); + reads += m.reads(); + hits += m.hits(); + misses += m.misses(); + } + + info("Stats [reads=" + reads + ", hits=" + hits + ", misses=" + misses + ']'); + + assertEquals(keyCnt * gridCount(), writes); + assertEquals(expReads, reads); + assertEquals(keyCnt, hits); + assertEquals(expMisses, misses); + } + + /** + * @throws Exception If failed. + */ + public void testMisses() throws Exception { + GridCache<Integer, Integer> cache = grid(0).cache(null); + + // TODO: GG-7578. + if (cache.configuration().getCacheMode() == GridCacheMode.REPLICATED) + return; + + int keyCnt = keyCount(); + + int expReads = 0; + + // Get a few keys missed keys. + for (int i = 0; i < keyCnt; i++) { + assertNull("Value is not null for key: " + i, cache.get(i)); + + if (cache.affinity().isPrimary(grid(0).localNode(), i)) + expReads++; + else + expReads += 2; + } + + // Check metrics for the whole cache. + long writes = 0; + long reads = 0; + long hits = 0; + long misses = 0; + + for (int i = 0; i < gridCount(); i++) { + GridCacheMetrics m = grid(i).cache(null).metrics(); + + writes += m.writes(); + reads += m.reads(); + hits += m.hits(); + misses += m.misses(); + } + + assertEquals(0, writes); + assertEquals(expReads, reads); + assertEquals(0, hits); + assertEquals(expReads, misses); + } + + /** + * @throws Exception If failed. + */ + public void testMissesOnEmptyCache() throws Exception { + GridCache<Integer, Integer> cache = grid(0).cache(null); + + // TODO: GG-7578. 
+ if (cache.configuration().getCacheMode() == GridCacheMode.REPLICATED) + return; + + Integer key = null; + + for (int i = 0; i < 1000; i++) { + if (cache.affinity().isPrimary(grid(0).localNode(), i)) { + key = i; + + break; + } + } + + assertNotNull(key); + + cache.get(key); + + assertEquals("Expected 1 read", 1, cache.metrics().reads()); + assertEquals("Expected 1 miss", 1, cache.metrics().misses()); + + cache.put(key, key); // +1 read, +1 miss. + + cache.get(key); + + assertEquals("Expected 1 write", 1, cache.metrics().writes()); + assertEquals("Expected 3 reads", 3, cache.metrics().reads()); + assertEquals("Expected 2 misses", 2, cache.metrics().misses()); + assertEquals("Expected 1 hit", 1, cache.metrics().hits()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractProjectionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractProjectionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractProjectionSelfTest.java new file mode 100644 index 0000000..8015fed --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractProjectionSelfTest.java @@ -0,0 +1,834 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.transactions.*; +import org.apache.ignite.internal.util.typedef.*; + +import java.util.*; +import java.util.concurrent.*; + +import static java.util.concurrent.TimeUnit.*; +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; +import static org.apache.ignite.cache.GridCacheFlag.*; +import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; + +/** + * Tests for custom cache projection (with filters and flags). + */ +public abstract class GridCacheAbstractProjectionSelfTest extends GridCacheAbstractSelfTest { + /** Test timeout */ + private static final long TEST_TIMEOUT = 120 * 1000; + + /** Number of grids to start. */ + private static final int GRID_CNT = 1; + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return TEST_TIMEOUT; + } + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return GRID_CNT; + } + + /** {@inheritDoc} */ + @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception { + CacheConfiguration cfg = super.cacheConfiguration(gridName); + + cfg.setCacheMode(cacheMode()); + cfg.setWriteSynchronizationMode(FULL_SYNC); + cfg.setPreloadMode(GridCachePreloadMode.SYNC); + + return cfg; + } + + /** + * @return Cache mode. + */ + @Override protected abstract GridCacheMode cacheMode(); + + /** + * @return Cache instance. 
+ */ + @SuppressWarnings({"TypeMayBeWeakened"}) + private GridCache<String, TestCloneable> cacheCloneable() { + return grid(0).cache(null); + } + + /** + * Test cloneable. + */ + private static class TestCloneable implements Cloneable { + /** */ + private String str; + + /** + * Default constructor. + */ + private TestCloneable() { + // No-op. + } + + /** + * @param str String value. + */ + private TestCloneable(String str) { + this.str = str; + } + + /** + * @return str value. + */ + private String str() { + return str; + } + + /** {@inheritDoc} */ + @Override public Object clone() throws CloneNotSupportedException { + return super.clone(); + } + } + + /** */ + private IgniteBiPredicate<String, Integer> kvFilter = new P2<String, Integer>() { + @Override public boolean apply(String key, Integer val) { + return key.contains("key") && val >= 0; + } + }; + + /** */ + private IgnitePredicate<GridCacheEntry<String, Integer>> entryFilter = new P1<GridCacheEntry<String, Integer>>() { + @Override public boolean apply(GridCacheEntry<String, Integer> e) { + Integer val = e.peek(); + + // Let's assume that null values will be passed through, otherwise we won't be able + // to put any new values to cache using projection with this entry filter. + return e.getKey().contains("key") && (val == null || val >= 0); + } + }; + + /** + * Asserts that given runnable throws specified exception. + * + * @param exCls Expected exception. + * @param r Runnable to check. + * @throws Exception If check failed. + */ + private void assertException(Class<? extends Exception> exCls, Runnable r) throws Exception { + assert exCls != null; + assert r != null; + + try { + r.run(); + + fail(exCls.getSimpleName() + " must have been thrown."); + } + catch (Exception e) { + if (e.getClass() != exCls) + throw e; + + info("Caught expected exception: " + e); + } + } + + /** + * @param r Runnable. + * @throws Exception If check failed. 
+ */ + private void assertFlagException(Runnable r) throws Exception { + assertException(GridCacheFlagException.class, r); + } + + /** + * @throws Exception In case of error. + */ + public void testTypeProjection() throws Exception { + GridCache<String, Integer> cache = cache(); + + cache.putAll(F.asMap("k1", 1 , "k2", 2, "k3", 3)); + + GridCache<Double, Boolean> anotherCache = grid(0).cache(null); + + assert anotherCache != null; + + anotherCache.put(3.14, true); + + GridCacheProjection<String, Integer> prj = cache.projection(String.class, Integer.class); + + List<String> keys = F.asList("k1", "k2", "k3"); + + for (String key : keys) + assert prj.containsKey(key); + } + + /** + * @throws Exception In case of error. + */ + public void testSize() throws Exception { + GridCacheProjection<String, Integer> prj = cache().projection(kvFilter); + + assert prj.cache() != null; + + int size = 10; + + if (atomicityMode() == TRANSACTIONAL) { + IgniteTx tx = prj.txStart(); + + for (int i = 0; i < size; i++) + prj.put("key" + i, i); + + prj.put("k", 11); + prj.put("key", -1); + + tx.commit(); + } + else { + for (int i = 0; i < size; i++) + prj.put("key" + i, i); + + prj.put("k", 11); + prj.put("key", -1); + } + + assertEquals(size, cache().size()); + assertEquals(size, prj.size()); + } + + /** + * @throws Exception In case of error. 
+ */ + public void testContainsKey() throws Exception { + cache().put("key", 1); + cache().put("k", 2); + + assert cache().containsKey("key"); + assert cache().containsKey("k"); + assert !cache().containsKey("wrongKey"); + + GridCacheProjection<String, Integer> prj = cache().projection(kvFilter); + + assert prj.containsKey("key"); + assert !prj.containsKey("k"); + assert !prj.containsKey("wrongKey"); + + assert prj.projection(F.<GridCacheEntry<String, Integer>>alwaysTrue()).containsKey("key"); + assert !prj.projection(F.<GridCacheEntry<String, Integer>>alwaysFalse()).containsKey("key"); + assert !prj.projection(F.<GridCacheEntry<String, Integer>>alwaysFalse()).containsKey("k"); + } + + /** + * @throws Exception In case of error. + */ + public void testPut() throws Exception { + final GridCacheProjection<String, Integer> prj = cache().projection(kvFilter); + + prj.put("key", 1); + prj.put("k", 2); + + assert prj.containsKey("key"); + assert !prj.containsKey("k"); + + assertFlagException(new CAX() { + @Override public void applyx() throws IgniteCheckedException { + prj.flagsOn(LOCAL).put("key", 1); + } + }); + + assertFlagException(new CAX() { + @Override public void applyx() throws IgniteCheckedException { + prj.flagsOn(READ).put("key", 1); + } + }); + } + + /** + * @throws Exception In case of error. 
+ */ + public void testLocalFlag() throws Exception { + GridCacheProjection<String, Integer> prj = cache().projection(entryFilter); + + final GridCacheProjection<String, Integer> locPrj = prj.flagsOn(LOCAL); + + prj.put("key", 1); + + Integer one = 1; + + assertEquals(one, prj.get("key")); + + assertFlagException(new CAX() { + @Override public void applyx() throws IgniteCheckedException { + locPrj.put("key", 1); + } + }); + + prj.get("key"); + + assertFlagException(new CAX() { + @Override public void applyx() throws IgniteCheckedException { + locPrj.get("key"); + } + }); + + prj.getAll(F.asList("key", "key1")); + + assertFlagException(new CAX() { + @Override public void applyx() throws IgniteCheckedException { + locPrj.getAll(F.asList("key", "key1")); + } + }); + + prj.remove("key"); + + assertFlagException(new CAX() { + @Override public void applyx() throws IgniteCheckedException { + locPrj.remove("key"); + } + }); + + prj.put("key", 1); + + assertEquals(one, prj.replace("key", 2)); + + assertFlagException(new CAX() { + @Override public void applyx() throws IgniteCheckedException { + locPrj.replace("key", 3); + } + }); + + prj.removeAll(F.asList("key")); + + assert !prj.containsKey("key"); + + prj.put("key", 1); + + assertFlagException(new CAX() { + @Override public void applyx() throws IgniteCheckedException { + locPrj.removeAll(F.asList("key")); + } + }); + + assert prj.containsKey("key"); + + assert locPrj.containsKey("key"); + + assertFlagException(new CAX() { + @Override public void applyx() throws IgniteCheckedException { + locPrj.reload("key"); + } + }); + + assertEquals(one, locPrj.peek("key")); + + locPrj.evict("key"); + + assert !locPrj.containsKey("key"); + + locPrj.promote("key"); + + assert locPrj.containsKey("key"); + + locPrj.clear("key"); + + assert !locPrj.containsKey("key"); + } + + /** + * @throws Exception In case of error. 
+ */ + public void testEntryLocalFlag() throws Exception { + GridCacheProjection<String, Integer> prj = cache().projection(entryFilter); + + GridCacheProjection<String, Integer> loc = prj.flagsOn(LOCAL); + + prj.put("key", 1); + + GridCacheEntry<String, Integer> prjEntry = prj.entry("key"); + final GridCacheEntry<String, Integer> locEntry = loc.entry("key"); + + assert prjEntry != null; + assert locEntry != null; + + Integer one = 1; + + assertEquals(one, prjEntry.getValue()); + + assertFlagException(new CA() { + @Override public void apply() { + locEntry.setValue(1); + } + }); + + assertEquals(one, prjEntry.getValue()); + + assertFlagException(new CA() { + @Override public void apply() { + locEntry.getValue(); + } + }); + + prjEntry.remove(); + + assertFlagException(new CAX() { + @Override public void applyx() throws IgniteCheckedException { + locEntry.remove(); + } + }); + + prjEntry.set(1); + + assertEquals(one, prjEntry.replace(2)); + + assertFlagException(new CAX() { + @Override public void applyx() throws IgniteCheckedException { + locEntry.replace(3); + } + }); + + assertFlagException(new CAX() { + @Override public void applyx() throws IgniteCheckedException { + locEntry.reload(); + } + }); + + prj.put("key", 1); + + assertEquals(one, locEntry.peek()); + + locEntry.evict(); + + assert locEntry.peek() == null; + + loc.promote("key"); + + assert loc.containsKey("key"); + + locEntry.clear(); + + assert locEntry.peek() == null; + } + + /** + * @throws Exception In case of error. 
+ */ + public void testReadFlag() throws Exception { + GridCacheProjection<String, Integer> prj = cache().projection(entryFilter); + + final GridCacheProjection<String, Integer> readPrj = prj.flagsOn(READ); + + prj.put("key", 1); + + Integer one = 1; + + assertEquals(one, prj.get("key")); + + assertFlagException(new CAX() { + @Override public void applyx() throws IgniteCheckedException { + readPrj.put("key", 1); + } + }); + + prj.remove("key"); + + assertFlagException(new CAX() { + @Override public void applyx() throws IgniteCheckedException { + readPrj.remove("key"); + } + }); + + prj.put("key", 1); + + assertEquals(one, prj.replace("key", 2)); + + assertFlagException(new CAX() { + @Override public void applyx() throws IgniteCheckedException { + readPrj.replace("key", 3); + } + }); + + prj.removeAll(F.asList("key")); + + assert !prj.containsKey("key"); + + prj.put("key", 1); + + assertFlagException(new CAX() { + @Override public void applyx() throws IgniteCheckedException { + readPrj.removeAll(F.asList("key")); + } + }); + + assertFlagException(new CA() { + @Override public void apply() { + readPrj.evict("key"); + } + }); + + assert prj.containsKey("key"); + + assertFlagException(new CA() { + @Override public void apply() { + readPrj.clear("key"); + } + }); + + assert prj.containsKey("key"); + + assertFlagException(new CAX() { + @Override public void applyx() throws IgniteCheckedException { + readPrj.reload("key"); + } + }); + + assert prj.containsKey("key"); + + assertFlagException(new CAX() { + @Override public void applyx() throws IgniteCheckedException { + readPrj.promote("key"); + } + }); + + assert prj.containsKey("key"); + + readPrj.get("key"); + + readPrj.getAll(F.asList("key", "key1")); + + assertEquals(one, readPrj.peek("key")); + } + + /** + * @param clone Cloned value. + * @param original Original value. 
+ */ + private void checkClone(TestCloneable clone, TestCloneable original) { + assert original != null; + assert clone != null; + assert clone != original; + assertEquals(clone.str(), original.str()); + } + + /** + * @throws Exception In case of error. + */ + @SuppressWarnings({"UnnecessaryFinalOnLocalVariable"}) + public void testCloneFlag() throws Exception { + GridCacheProjection<String, TestCloneable> prj = cacheCloneable().flagsOn(CLONE); + + final TestCloneable val = new TestCloneable("val"); + + prj.put("key", val); + + checkClone(prj.get("key"), val); + + checkClone(prj.getAsync("key").get(), val); + + Map<String, TestCloneable> map = prj.getAll(F.asList("key")); + + assertEquals(1, map.size()); + + checkClone(map.get("key"), val); + + map = prj.getAllAsync(F.asList("key")).get(); + + assertEquals(1, map.size()); + + checkClone(map.get("key"), val); + + checkClone(prj.peek("key"), val); + + Collection<TestCloneable> vals = prj.values(); + + assert vals != null; + assertEquals(1, vals.size()); + + checkClone(vals.iterator().next(), val); + + Set<GridCacheEntry<String, TestCloneable>> entries = prj.entrySet(); + + assertEquals(1, entries.size()); + + checkClone(entries.iterator().next().getValue(), val); + + GridCacheEntry<String, TestCloneable> entry = prj.entry("key"); + + assert entry != null; + + checkClone(entry.peek(), val); + } + + /** + * @throws Exception In case of error. + */ + public void testEntryParent() throws Exception { + cache().put("key", 1); + + GridCacheProxyImpl<String, Integer> prj = (GridCacheProxyImpl<String, Integer>)cache(). + flagsOn(CLONE, INVALIDATE); + + GridCacheEntry<String, Integer> entry = prj.entry("key"); + + assert entry != null; + + GridCacheProxyImpl<String, Integer> entryPrj = (GridCacheProxyImpl<String, Integer>)entry.projection(); + + assert entryPrj.delegate() == prj.delegate(); + } + + /** + * @throws Exception if failed. 
+ */ + public void testSkipStoreFlag() throws Exception { + assertNull(cache().put("kk1", 100500)); + assertEquals(100500, map.get("kk1")); + + IgniteCache<String, Integer> c = jcache().withSkipStore(); + + assertNull(c.getAndPut("noStore", 123)); + assertEquals(123, (Object) c.get("noStore")); + assertNull(map.get("noStore")); + + assertTrue(c.remove("kk1", 100500)); + assertEquals(100500, map.get("kk1")); + assertNull(c.get("kk1")); + assertEquals(100500, (Object) cache().get("kk1")); + } + + /** + * @throws Exception if failed. + */ + // TODO: enable when GG-7579 is fixed. + public void _testSkipStoreFlagMultinode() throws Exception { + final int nGrids = 3; + + // Start additional grids. + for (int i = 1; i < nGrids; i++) + startGrid(i); + + try { + testSkipStoreFlag(); + } + finally { + for (int i = 1; i < nGrids; i++) + stopGrid(i); + } + } + + /** + * @throws Exception In case of error. + */ + public void testSkipSwapFlag() throws Exception { + cache().put("key", 1); + + cache().evict("key"); + + assert cache().peek("key") == null; + + Integer one = 1; + + assertEquals(one, cache().get("key")); + + cache().evict("key"); + + assertEquals(one, cache().reload("key")); + + cache().remove("key"); + + assertFalse(cache().containsKey("key")); + assertNull(cache().get("key")); + + GridCacheProjection<String, Integer> prj = cache().flagsOn(SKIP_SWAP, SKIP_STORE); + + prj.put("key", 1); + + assertEquals(one, prj.get("key")); + assertEquals(one, prj.peek("key")); + + assert prj.evict("key"); + + assert prj.peek("key") == null; + assert prj.get("key") == null; + } + + /** + * Checks that previous entry in update operations is taken + * from swap after eviction, even if SKIP_SWAP is enabled. + * + * @throws Exception If error happens. 
+ */ + public void testSkipSwapFlag2() throws Exception { + cache().put("key", 1); + + cache().evict("key"); + + GridCacheProjection<String, Integer> prj = cache().flagsOn(SKIP_SWAP, SKIP_STORE); + + assertNull(prj.get("key")); + + Integer old = prj.put("key", 2); + + assertEquals(Integer.valueOf(1), old); // Update operations on cache should not take into account SKIP_SWAP flag. + + prj.remove("key"); + } + + /** + * Tests {@link GridCacheFlag#SKIP_SWAP} flag on multiple nodes. + * + * @throws Exception If error occurs. + */ + public void testSkipSwapFlagMultinode() throws Exception { + final int nGrids = 3; + + // Start additional grids. + for (int i = 1; i < nGrids; i++) + startGrid(i); + + try { + final int nEntries = 100; + + // Put the values in cache. + for (int i = 1; i <= nEntries; i++) + grid(0).cache(null).put(i, i); + + // Evict values from cache. Values should go to swap. + for (int i = 0; i < nGrids; i++) { + grid(i).cache(null).evictAll(); + + assertTrue("Grid #" + i + " has empty swap.", grid(i).cache(null).swapIterator().hasNext()); + } + + // Set SKIP_SWAP flag. + GridCacheProjection<Object, Object> cachePrj = grid(0).cache(null).flagsOn(SKIP_SWAP, SKIP_STORE); + + // Put new values. + for (int i = 1; i <= nEntries; i++) + assertEquals(i, cachePrj.put(i, i + 1)); // We should get previous values from swap, disregarding SKIP_SWAP. + + // Swap should be empty now. + for (int i = 0; i < nGrids; i++) + assertFalse("Grid #" + i + " has non-empty swap.", grid(i).cache(null).swapIterator().hasNext()); + } + finally { + // Stop started grids. + for (int i = 1; i < nGrids; i++) + stopGrid(i); + } + } + + /** + * @throws Exception In case of error. 
+ */ + public void testTx() throws Exception { + if (atomicityMode() == ATOMIC) + return; + + IgniteTx tx = cache().txStart(); + + GridCacheProjection<String, Integer> typePrj = cache().projection(String.class, Integer.class); + + typePrj.put("key", 1); + typePrj.put("k", 2); + + GridCacheProjection<String, Integer> kvFilterPrj = cache().projection(kvFilter); + + Integer one = 1; + + assertEquals(one, kvFilterPrj.get("key")); + assert kvFilterPrj.get("k") == null; + + GridCacheProjection<String, Integer> entryFilterPrj = cache().projection(entryFilter); + + assertEquals(one, entryFilterPrj.get("key")); + assert entryFilterPrj.get("k") == null; + + // Now will check projection on projection. + kvFilterPrj = typePrj.projection(kvFilter); + + assertEquals(one, kvFilterPrj.get("key")); + assert kvFilterPrj.get("k") == null; + + entryFilterPrj = typePrj.projection(entryFilter); + + assertEquals(one, entryFilterPrj.get("key")); + assert entryFilterPrj.get("k") == null; + + typePrj = cache().projection(entryFilter).projection(String.class, Integer.class); + + assertEquals(one, typePrj.get("key")); + assertNull(typePrj.get("k")); + + tx.commit(); + + TransactionsConfiguration tCfg = grid(0).configuration().getTransactionsConfiguration(); + + tx = cache().txStart( + tCfg.getDefaultTxConcurrency(), + tCfg.getDefaultTxIsolation(), + tCfg.getDefaultTxTimeout(), + 0 + ); + + // Try to change tx property. + assertFlagException(new CA() { + @Override public void apply() { + cache().flagsOn(INVALIDATE); + } + }); + + assertFlagException(new CA() { + @Override public void apply() { + cache().projection(entryFilter).flagsOn(INVALIDATE); + } + }); + + tx.commit(); + } + + /** + * @throws IgniteCheckedException In case of error. 
+ */ + public void testTypedProjection() throws Exception { + GridCache<Object, Object> cache = grid(0).cache(null); + + cache.putx("1", "test string"); + cache.putx("2", 0); + + final GridCacheProjection<String, String> prj = cache.projection(String.class, String.class); + + final CountDownLatch latch = new CountDownLatch(1); + + prj.removeAll(new P1<GridCacheEntry<String, String>>() { + @Override + public boolean apply(GridCacheEntry<String, String> e) { + info(" --> " + e.peek().getClass()); + + latch.countDown(); + + return true; + } + }); + + assertTrue(latch.await(1, SECONDS)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractRemoveFailureTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractRemoveFailureTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractRemoveFailureTest.java new file mode 100644 index 0000000..8992d33 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractRemoveFailureTest.java @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.testframework.*; +import org.jdk8.backport.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; +import java.util.concurrent.locks.*; + +import static org.apache.ignite.IgniteSystemProperties.*; + +/** + * Tests that removes are not lost when topology changes. + */ +public abstract class GridCacheAbstractRemoveFailureTest extends GridCacheAbstractSelfTest { + /** */ + private static final int GRID_CNT = 3; + + /** Keys count. */ + private static final int KEYS_CNT = 10000; + + /** Test duration. */ + private static final long DUR = 90 * 1000L; + + /** Cache data assert frequency. */ + private static final long ASSERT_FREQ = 10_000; + + /** Kill delay. */ + private static final T2<Integer, Integer> KILL_DELAY = new T2<>(2000, 5000); + + /** Start delay. */ + private static final T2<Integer, Integer> START_DELAY = new T2<>(2000, 5000); + + /** Node kill lock (used to prevent killing while cache data is compared). */ + private final Lock killLock = new ReentrantLock(); + + /** */ + private CountDownLatch assertLatch; + + /** */ + private CountDownLatch updateLatch; + + /** Caches comparison request flag. 
*/ + private volatile boolean cmp; + + /** */ + private String sizePropVal; + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return GRID_CNT; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + // Need to increase value set in GridAbstractTest + sizePropVal = System.getProperty(GG_ATOMIC_CACHE_DELETE_HISTORY_SIZE); + + System.setProperty(GG_ATOMIC_CACHE_DELETE_HISTORY_SIZE, "100000"); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + System.setProperty(GG_ATOMIC_CACHE_DELETE_HISTORY_SIZE, sizePropVal != null ? sizePropVal : ""); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + startGrids(gridCount()); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return DUR + 60_000; + } + + /** + * @throws Exception If failed. + */ + public void testPutAndRemove() throws Exception { + final GridCache<Integer, Integer> sndCache0 = grid(0).cache(null); + + final AtomicBoolean stop = new AtomicBoolean(); + + final AtomicLong cntr = new AtomicLong(); + + final AtomicLong errCntr = new AtomicLong(); + + // Expected values in cache. 
+ final Map<Integer, GridTuple<Integer>> expVals = new ConcurrentHashMap8<>(); + + IgniteFuture<?> updateFut = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + while (!stop.get()) { + for (int i = 0; i < 100; i++) { + int key = rnd.nextInt(KEYS_CNT); + + boolean put = rnd.nextInt(0, 100) > 10; + + while (true) { + try { + if (put) { + sndCache0.put(key, i); + + expVals.put(key, F.t(i)); + } + else { + sndCache0.remove(key); + + expVals.put(key, F.<Integer>t(null)); + } + + break; + } + catch (IgniteCheckedException e) { + if (put) + log.error("Put failed [key=" + key + ", val=" + i + ']', e); + else + log.error("Remove failed [key=" + key + ']', e); + + errCntr.incrementAndGet(); + } + } + } + + cntr.addAndGet(100); + + if (cmp) { + assertLatch.countDown(); + + updateLatch.await(); + } + } + + return null; + } + }); + + IgniteFuture<?> killFut = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + while (!stop.get()) { + U.sleep(random(KILL_DELAY.get1(), KILL_DELAY.get2())); + + killLock.lock(); + + try { + killAndRestart(stop); + } + finally { + killLock.unlock(); + } + } + + return null; + } + }); + + try { + long stopTime = DUR + U.currentTimeMillis() ; + + long nextAssert = U.currentTimeMillis() + ASSERT_FREQ; + + while (U.currentTimeMillis() < stopTime) { + long start = System.nanoTime(); + + long ops = cntr.longValue(); + + U.sleep(1000); + + long diff = cntr.longValue() - ops; + + double time = (System.nanoTime() - start) / 1_000_000_000d; + + long opsPerSecond = (long)(diff / time); + + log.info("Operations/second: " + opsPerSecond); + + if (U.currentTimeMillis() >= nextAssert) { + updateLatch = new CountDownLatch(1); + + assertLatch = new CountDownLatch(1); + + cmp = true; + + killLock.lock(); + + try { + if (!assertLatch.await(60_000, TimeUnit.MILLISECONDS)) + throw new IgniteCheckedException("Failed to 
suspend thread executing updates."); + + log.info("Checking cache content."); + + assertCacheContent(expVals); + + nextAssert = System.currentTimeMillis() + ASSERT_FREQ; + } + finally { + killLock.unlock(); + + updateLatch.countDown(); + + U.sleep(500); + } + } + } + } + finally { + stop.set(true); + } + + killFut.get(); + + updateFut.get(); + + log.info("Test finished. Update errors: " + errCntr.get()); + + } + + /** + * @param stop Stop flag. + * @throws Exception If failed. + */ + void killAndRestart(AtomicBoolean stop) throws Exception { + if (stop.get()) + return; + + int idx = random(1, gridCount() + 1); + + log.info("Killing node " + idx); + + stopGrid(idx); + + U.sleep(random(START_DELAY.get1(), START_DELAY.get2())); + + if (stop.get()) + return; + + log.info("Restarting node " + idx); + + startGrid(idx); + + U.sleep(1000); + } + + /** + * @param expVals Expected values in cache. + * @throws Exception If failed. + */ + @SuppressWarnings({"TooBroadScope", "ConstantIfStatement"}) + private void assertCacheContent(Map<Integer, GridTuple<Integer>> expVals) throws Exception { + assert !expVals.isEmpty(); + + Collection<Integer> failedKeys = new HashSet<>(); + + for (int i = 0; i < GRID_CNT; i++) { + Ignite ignite = grid(i); + + GridCache<Integer, Integer> cache = ignite.cache(null); + + for (Map.Entry<Integer, GridTuple<Integer>> expVal : expVals.entrySet()) { + Integer val = cache.get(expVal.getKey()); + + if (!F.eq(expVal.getValue().get(), val)) { + failedKeys.add(expVal.getKey()); + + boolean primary = cache.affinity().isPrimary(ignite.cluster().localNode(), expVal.getKey()); + boolean backup = cache.affinity().isBackup(ignite.cluster().localNode(), expVal.getKey()); + + log.error("Unexpected cache data [exp=" + expVal + ", actual=" + val + ", nodePrimary=" + primary + + ", nodeBackup=" + backup + ", nodeId=" + ignite.cluster().localNode().id() + ']'); + } + } + } + + assertTrue("Unexpected data for keys: " + failedKeys, failedKeys.isEmpty()); + } + + /** + * 
@param min Min possible value. + * @param max Max possible value (exclusive). + * @return Random value. + */ + private static int random(int min, int max) { + if (max == min) + return max; + + return ThreadLocalRandom.current().nextInt(min, max); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java new file mode 100644 index 0000000..7eba9c5 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java @@ -0,0 +1,604 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.store.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.marshaller.optimized.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.transactions.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.junits.common.*; +import org.jdk8.backport.*; +import org.jetbrains.annotations.*; + +import javax.cache.*; +import javax.cache.configuration.*; +import java.util.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; +import static org.apache.ignite.cache.GridCacheDistributionMode.*; +import static org.apache.ignite.cache.GridCacheMemoryMode.*; +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; + +/** + * Abstract class for cache tests. + */ +public abstract class GridCacheAbstractSelfTest extends GridCommonAbstractTest { + /** Test timeout */ + private static final long TEST_TIMEOUT = 30 * 1000; + + /** Store map. */ + protected static final Map<Object, Object> map = new ConcurrentHashMap8<>(); + + /** VM ip finder for TCP discovery. */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** + * @return Grids count to start. 
+ */ + protected abstract int gridCount(); + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return TEST_TIMEOUT; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + int cnt = gridCount(); + + assert cnt >= 1 : "At least one grid must be started"; + + startGridsMultiThreaded(cnt); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + map.clear(); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + assert cache().tx() == null; + assert cache().isEmpty() : "Cache is not empty: " + cache().entrySet(); + assert cache().keySet().isEmpty() : "Key set is not empty: " + cache().keySet(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + IgniteTx tx = cache().tx(); + + if (tx != null) { + tx.close(); + + fail("Cache transaction remained after test completion: " + tx); + } + + for (int i = 0; i < gridCount(); i++) { + while (true) { + try { + final int fi = i; + + assertTrue( + "Cache is not empty: " + cache(i).entrySet(), + GridTestUtils.waitForCondition( + // Preloading may happen as nodes leave, so we need to wait. 
+ new GridAbsPredicateX() { + @Override public boolean applyx() throws IgniteCheckedException { + GridCache<String, Integer> cache = cache(fi); + + cache.removeAll(); + + if (offheapTiered(cache)) { + Iterator it = cache.offHeapIterator(); + + while (it.hasNext()) { + it.next(); + + it.remove(); + } + + if (cache.offHeapIterator().hasNext()) + return false; + } + + return cache.isEmpty(); + } + }, + getTestTimeout())); + + int primaryKeySize = cache(i).primarySize(); + int keySize = cache(i).size(); + int size = cache(i).size(); + int globalSize = cache(i).globalSize(); + int globalPrimarySize = cache(i).globalPrimarySize(); + + info("Size after [idx=" + i + + ", size=" + size + + ", keySize=" + keySize + + ", primarySize=" + primaryKeySize + + ", globalSize=" + globalSize + + ", globalPrimarySize=" + globalPrimarySize + + ", keySet=" + cache(i).keySet() + ']'); + + assertEquals("Cache is not empty [idx=" + i + ", entrySet=" + cache(i).entrySet() + ']', + 0, cache(i).size()); + + break; + } + catch (Exception e) { + if (X.hasCause(e, ClusterTopologyException.class)) { + info("Got topology exception while tear down (will retry in 1000ms)."); + + U.sleep(1000); + } + else + throw e; + } + } + + Iterator<Map.Entry<String, Integer>> it = cache(i).swapIterator(); + + while (it.hasNext()) { + Map.Entry<String, Integer> entry = it.next(); + + cache(i).remove(entry.getKey()); + } + } + + assert cache().tx() == null; + assert cache().isEmpty() : "Cache is not empty: " + cache().entrySet(); + assertEquals("Cache is not empty: " + cache().entrySet(), 0, cache().size()); + assert cache().keySet().isEmpty() : "Key set is not empty: " + cache().keySet(); + + resetStore(); + } + + /** + * Cleans up cache store. + */ + protected void resetStore() { + map.clear(); + } + + /** + * Put entry to cache store. + * + * @param key Key. + * @param val Value. 
+ */ + protected void putToStore(Object key, Object val) { + map.put(key, val); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setMaxMissedHeartbeats(Integer.MAX_VALUE); + + disco.setIpFinder(ipFinder); + + if (isDebug()) + disco.setAckTimeout(Integer.MAX_VALUE); + + cfg.setDiscoverySpi(disco); + + cfg.setCacheConfiguration(cacheConfiguration(gridName)); + + cfg.setMarshaller(new IgniteOptimizedMarshaller(false)); + + return cfg; + } + + /** + * @param gridName Grid name. + * @return Cache configuration. + * @throws Exception In case of error. + */ + @SuppressWarnings("unchecked") + protected CacheConfiguration cacheConfiguration(String gridName) throws Exception { + CacheConfiguration cfg = defaultCacheConfiguration(); + + CacheStore<?, ?> store = cacheStore(); + + if (store != null) { + cfg.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store)); + cfg.setReadThrough(true); + cfg.setWriteThrough(true); + cfg.setLoadPreviousValue(true); + } + + cfg.setSwapEnabled(swapEnabled()); + cfg.setCacheMode(cacheMode()); + cfg.setAtomicityMode(atomicityMode()); + cfg.setWriteSynchronizationMode(writeSynchronization()); + cfg.setDistributionMode(distributionMode()); + cfg.setPortableEnabled(portableEnabled()); + + if (cacheMode() == PARTITIONED) + cfg.setBackups(1); + + return cfg; + } + + /** + * @return Default cache mode. + */ + protected GridCacheMode cacheMode() { + return CacheConfiguration.DFLT_CACHE_MODE; + } + + /** + * @return Cache atomicity mode. + */ + protected GridCacheAtomicityMode atomicityMode() { + return TRANSACTIONAL; + } + + /** + * @return Partitioned mode. + */ + protected GridCacheDistributionMode distributionMode() { + return NEAR_PARTITIONED; + } + + /** + * @return Write synchronization. 
+ */ + protected GridCacheWriteSynchronizationMode writeSynchronization() { + return FULL_SYNC; + } + + /** + * @return Write through storage emulator. + */ + protected CacheStore<?, ?> cacheStore() { + return new CacheStoreAdapter<Object, Object>() { + @Override public void loadCache(IgniteBiInClosure<Object, Object> clo, + Object... args) { + for (Map.Entry<Object, Object> e : map.entrySet()) + clo.apply(e.getKey(), e.getValue()); + } + + @Override public Object load(Object key) { + return map.get(key); + } + + @Override public void write(Cache.Entry<? extends Object, ? extends Object> e) { + map.put(e.getKey(), e.getValue()); + } + + @Override public void delete(Object key) { + map.remove(key); + } + }; + } + + /** + * @return {@code true} if swap should be enabled. + */ + protected boolean swapEnabled() { + return true; + } + + /** + * @return {@code true} if near cache should be enabled. + */ + protected boolean nearEnabled() { + return distributionMode() == NEAR_ONLY || distributionMode() == NEAR_PARTITIONED; + } + + /** + * @return {@code True} if transactions are enabled. + */ + protected boolean txEnabled() { + return true; + } + + /** + * @return {@code True} if locking is enabled. + */ + protected boolean lockingEnabled() { + return true; + } + + /** + * @return Whether portable mode is enabled. + */ + protected boolean portableEnabled() { + return false; + } + + /** + * @return {@code True} for partitioned caches. + */ + protected final boolean partitionedMode() { + return cacheMode() == PARTITIONED; + } + + /** + * @param idx Index of grid. + * @return Cache instance casted to work with string and integer types for convenience. + */ + @SuppressWarnings({"unchecked"}) + @Override protected GridCache<String, Integer> cache(int idx) { + return grid(idx).cache(null); + } + + /** + * @return Default cache instance casted to work with string and integer types for convenience. 
+ */ + @SuppressWarnings({"unchecked"}) + @Override protected GridCache<String, Integer> cache() { + return cache(0); + } + + /** + * @return Default cache instance. + */ + @SuppressWarnings({"unchecked"}) + @Override protected IgniteCache<String, Integer> jcache() { + return jcache(0); + } + + /** + * @param idx Index of grid. + * @return Default cache. + */ + @SuppressWarnings({"unchecked"}) + protected IgniteCache<String, Integer> jcache(int idx) { + return ignite(idx).jcache(null); + } + + /** + * @param idx Index of grid. + * @return Cache context. + */ + protected GridCacheContext<String, Integer> context(int idx) { + return ((GridKernal)grid(idx)).<String, Integer>internalCache().context(); + } + + /** + * @param key Key. + * @param idx Node index. + * @return {@code True} if key belongs to node with index idx. + */ + protected boolean belongs(String key, int idx) { + return context(idx).cache().affinity().isPrimaryOrBackup(context(idx).localNode(), key); + } + + /** + * @param cache Cache. + * @return {@code True} if cache has OFFHEAP_TIERED memory mode. + */ + protected boolean offheapTiered(GridCache cache) { + return cache.configuration().getMemoryMode() == OFFHEAP_TIERED; + } + + /** + * Executes regular peek or peek from swap. + * + * @param prj Cache projection. + * @param key Key. + * @return Value. + * @throws Exception If failed. + */ + @Nullable protected <K, V> V peek(GridCacheProjection<K, V> prj, K key) throws Exception { + return offheapTiered(prj.cache()) ? prj.peek(key, F.asList(GridCachePeekMode.SWAP)) : prj.peek(key); + } + + /** + * @param cache Cache. + * @param key Key. + * @return {@code True} if cache contains given key. + * @throws Exception If failed. + */ + @SuppressWarnings("unchecked") + protected boolean containsKey(GridCache cache, Object key) throws Exception { + return offheapTiered(cache) ? containsOffheapKey(cache, key) : cache.containsKey(key); + } + + /** + * @param cache Cache. + * @param val Value. 
+ * @return {@code True} if cache contains given value. + * @throws Exception If failed. + */ + @SuppressWarnings("unchecked") + protected boolean containsValue(GridCache cache, Object val) throws Exception { + return offheapTiered(cache) ? containsOffheapValue(cache, val) : cache.containsValue(val); + } + + /** + * @param cache Cache. + * @param key Key. + * @return {@code True} if offheap contains given key. + * @throws Exception If failed. + */ + @SuppressWarnings("unchecked") + protected boolean containsOffheapKey(GridCache cache, Object key) throws Exception { + for (Iterator<Map.Entry> it = cache.offHeapIterator(); it.hasNext();) { + Map.Entry e = it.next(); + + if (key.equals(e.getKey())) + return true; + } + + return false; + } + + /** + * @param cache Cache. + * @param val Value. + * @return {@code True} if offheap contains given value. + * @throws Exception If failed. + */ + @SuppressWarnings("unchecked") + protected boolean containsOffheapValue(GridCache cache, Object val) throws Exception { + for (Iterator<Map.Entry> it = cache.offHeapIterator(); it.hasNext();) { + Map.Entry e = it.next(); + + if (val.equals(e.getValue())) + return true; + } + + return false; + } + + /** + * Filters cache entry projections leaving only ones with keys containing 'key'. + */ + protected static IgnitePredicate<GridCacheEntry<String, Integer>> entryKeyFilter = + new P1<GridCacheEntry<String, Integer>>() { + @Override public boolean apply(GridCacheEntry<String, Integer> entry) { + return entry.getKey().contains("key"); + } + }; + + /** + * Filters cache entry projections leaving only ones with keys not containing 'key'. + */ + protected static IgnitePredicate<GridCacheEntry<String, Integer>> entryKeyFilterInv = + new P1<GridCacheEntry<String, Integer>>() { + @Override public boolean apply(GridCacheEntry<String, Integer> entry) { + return !entry.getKey().contains("key"); + } + }; + + /** + * Filters cache entry projections leaving only ones with values less than 50. 
+ */ + protected static final IgnitePredicate<GridCacheEntry<String, Integer>> lt50 = + new P1<GridCacheEntry<String, Integer>>() { + @Override public boolean apply(GridCacheEntry<String, Integer> entry) { + Integer i = entry.peek(); + + return i != null && i < 50; + } + }; + + /** + * Filters cache entry projections leaving only ones with values greater or equal than 100. + */ + protected static final IgnitePredicate<GridCacheEntry<String, Integer>> gte100 = + new P1<GridCacheEntry<String, Integer>>() { + @Override public boolean apply(GridCacheEntry<String, Integer> entry) { + Integer i = entry.peek(); + + return i != null && i >= 100; + } + + @Override public String toString() { + return "gte100"; + } + }; + + /** + * Filters cache entry projections leaving only ones with values greater or equal than 200. + */ + protected static final IgnitePredicate<GridCacheEntry<String, Integer>> gte200 = + new P1<GridCacheEntry<String, Integer>>() { + @Override public boolean apply(GridCacheEntry<String, Integer> entry) { + Integer i = entry.peek(); + + return i != null && i >= 200; + } + + @Override public String toString() { + return "gte200"; + } + }; + + /** + * {@link org.apache.ignite.lang.IgniteInClosure} for calculating sum. + */ + @SuppressWarnings({"PublicConstructorInNonPublicClass"}) + protected static final class SumVisitor implements CI1<GridCacheEntry<String, Integer>> { + /** */ + private final AtomicInteger sum; + + /** + * @param sum {@link AtomicInteger} instance for accumulating sum. + */ + public SumVisitor(AtomicInteger sum) { + this.sum = sum; + } + + /** {@inheritDoc} */ + @Override public void apply(GridCacheEntry<String, Integer> entry) { + if (entry.getValue() != null) { + Integer i = entry.getValue(); + + assert i != null : "Value cannot be null for entry: " + entry; + + sum.addAndGet(i); + } + } + } + + /** + * {@link org.apache.ignite.lang.IgniteReducer} for calculating sum. 
+ */ + @SuppressWarnings({"PublicConstructorInNonPublicClass"}) + protected static final class SumReducer implements R1<GridCacheEntry<String, Integer>, Integer> { + /** */ + private int sum; + + /** */ + public SumReducer() { + // no-op + } + + /** {@inheritDoc} */ + @Override public boolean collect(GridCacheEntry<String, Integer> entry) { + if (entry.getValue() != null) { + Integer i = entry.getValue(); + + assert i != null; + + sum += i; + } + + return true; + } + + /** {@inheritDoc} */ + @Override public Integer reduce() { + return sum; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractTxReadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractTxReadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractTxReadTest.java new file mode 100644 index 0000000..0d68a57 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractTxReadTest.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.transactions.*; + +import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; + +/** + * Tests value read inside transaction. + */ +public abstract class GridCacheAbstractTxReadTest extends GridCacheAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.getTransactionsConfiguration().setTxSerializableEnabled(true); + + return cfg; + } + + /** {@inheritDoc} */ + @SuppressWarnings("NullableProblems") + @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception { + CacheConfiguration cfg = super.cacheConfiguration(gridName); + + cfg.setWriteSynchronizationMode(FULL_SYNC); + + cfg.setCacheStoreFactory(null); + + return cfg; + } + + /** + * @throws IgniteCheckedException If failed + */ + public void testTxReadOptimisticReadCommitted() throws IgniteCheckedException { + checkTransactionalRead(IgniteTxConcurrency.OPTIMISTIC, IgniteTxIsolation.READ_COMMITTED); + } + + /** + * @throws IgniteCheckedException If failed + */ + public void testTxReadOptimisticRepeatableRead() throws IgniteCheckedException { + checkTransactionalRead(IgniteTxConcurrency.OPTIMISTIC, IgniteTxIsolation.REPEATABLE_READ); + } + + /** + * @throws IgniteCheckedException If failed + */ + public void testTxReadOptimisticSerializable() throws IgniteCheckedException { + checkTransactionalRead(IgniteTxConcurrency.OPTIMISTIC, IgniteTxIsolation.SERIALIZABLE); + } + + /** + * @throws IgniteCheckedException If failed + */ + public void testTxReadPessimisticReadCommitted() throws IgniteCheckedException { + 
checkTransactionalRead(IgniteTxConcurrency.PESSIMISTIC, IgniteTxIsolation.READ_COMMITTED); + } + + /** + * @throws IgniteCheckedException If failed + */ + public void testTxReadPessimisticRepeatableRead() throws IgniteCheckedException { + checkTransactionalRead(IgniteTxConcurrency.PESSIMISTIC, IgniteTxIsolation.REPEATABLE_READ); + } + + /** + * @throws IgniteCheckedException If failed + */ + public void testTxReadPessimisticSerializable() throws IgniteCheckedException { + checkTransactionalRead(IgniteTxConcurrency.PESSIMISTIC, IgniteTxIsolation.SERIALIZABLE); + } + + /** + * Tests sequential value write and read inside transaction. + * @param concurrency Transaction concurrency. + * @param isolation Transaction isolation. + * @throws IgniteCheckedException If failed + */ + protected void checkTransactionalRead(IgniteTxConcurrency concurrency, IgniteTxIsolation isolation) + throws IgniteCheckedException { + GridCache<String, Integer> cache = cache(0); + + cache.clearAll(); + + IgniteTx tx = cache.txStart(concurrency, isolation); + + try { + cache.put("key", 1); + + assertEquals("Invalid value after put", 1, cache.get("key").intValue()); + + tx.commit(); + } + finally { + tx.close(); + } + + assertEquals("Invalid cache size after put", 1, cache.size()); + + try { + tx = cache.txStart(concurrency, isolation); + + assertEquals("Invalid value inside transactional read", Integer.valueOf(1), cache.get("key")); + + tx.commit(); + } + finally { + tx.close(); + } + } + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 1; + } +}