http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapMultiThreadedUpdateAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapMultiThreadedUpdateAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapMultiThreadedUpdateAbstractSelfTest.java new file mode 100644 index 0000000..ef98ea0 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapMultiThreadedUpdateAbstractSelfTest.java @@ -0,0 +1,398 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.testframework.*; + +import javax.cache.processor.*; +import java.io.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.GridCacheAtomicWriteOrderMode.*; +import static org.apache.ignite.cache.GridCacheMemoryMode.*; +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; + +/** + * Multithreaded update test with off heap enabled. + */ +public abstract class GridCacheOffHeapMultiThreadedUpdateAbstractSelfTest extends GridCacheAbstractSelfTest { + /** */ + protected static volatile boolean failed; + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 2; + } + + /** {@inheritDoc} */ + @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception { + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setAtomicityMode(atomicityMode()); + ccfg.setCacheMode(PARTITIONED); + ccfg.setBackups(1); + ccfg.setMemoryMode(OFFHEAP_TIERED); + ccfg.setOffHeapMaxMemory(1024 * 1024); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setAtomicWriteOrderMode(PRIMARY); + ccfg.setPortableEnabled(portableEnabled()); + + return ccfg; + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 3 * 60_000; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + failed = false; + } + + /** + * @throws Exception If failed. + */ + public void testTransform() throws Exception { + testTransform(keyForNode(0)); + + if (gridCount() > 1) + testTransform(keyForNode(1)); + } + + /** + * @param key Key. + * @throws Exception If failed. + */ + private void testTransform(final Integer key) throws Exception { + final IgniteCache<Integer, Integer> cache = grid(0).jcache(null); + + cache.put(key, 0); + + final int THREADS = 5; + final int ITERATIONS_PER_THREAD = 10_000; + + GridTestUtils.runMultiThreaded(new Callable<Void>() { + @Override public Void call() throws Exception { + for (int i = 0; i < ITERATIONS_PER_THREAD && !failed; i++) { + if (i % 500 == 0) + log.info("Iteration " + i); + + cache.invoke(key, new IncProcessor()); + } + + return null; + } + }, THREADS, "transform"); + + for (int i = 0; i < gridCount(); i++) { + Integer val = (Integer)grid(i).cache(null).get(key); + + assertEquals("Unexpected value for grid " + i, (Integer)(ITERATIONS_PER_THREAD * THREADS), val); + } + + assertFalse(failed); + } + + /** + * @throws Exception If failed. + */ + public void testPut() throws Exception { + testPut(keyForNode(0)); + + if (gridCount() > 1) + testPut(keyForNode(1)); + } + + /** + * @param key Key. + * @throws Exception If failed. + */ + private void testPut(final Integer key) throws Exception { + final GridCache<Integer, Integer> cache = grid(0).cache(null); + + cache.put(key, 0); + + final int THREADS = 5; + final int ITERATIONS_PER_THREAD = iterations(); + + GridTestUtils.runMultiThreaded(new Callable<Void>() { + @Override public Void call() throws Exception { + for (int i = 0; i < ITERATIONS_PER_THREAD && !failed; i++) { + if (i % 500 == 0) + log.info("Iteration " + i); + + Integer val = cache.put(key, i); + + assertNotNull(val); + } + + return null; + } + }, THREADS, "put"); + + for (int i = 0; i < gridCount(); i++) { + Integer val = (Integer)grid(i).cache(null).get(key); + + assertNotNull("Unexpected value for grid " + i, val); + } + + assertFalse(failed); + } + + /** + * @throws Exception If failed. + */ + public void testPutxIfAbsent() throws Exception { + testPutxIfAbsent(keyForNode(0)); + + if (gridCount() > 1) + testPutxIfAbsent(keyForNode(1)); + } + + /** + * @param key Key. + * @throws Exception If failed. + */ + private void testPutxIfAbsent(final Integer key) throws Exception { + final GridCache<Integer, Integer> cache = grid(0).cache(null); + + cache.put(key, 0); + + final int THREADS = 5; + final int ITERATIONS_PER_THREAD = iterations(); + + GridTestUtils.runMultiThreaded(new Callable<Void>() { + @Override public Void call() throws Exception { + for (int i = 0; i < ITERATIONS_PER_THREAD && !failed; i++) { + if (i % 500 == 0) + log.info("Iteration " + i); + + assertFalse(cache.putxIfAbsent(key, 100)); + } + + return null; + } + }, THREADS, "putxIfAbsent"); + + for (int i = 0; i < gridCount(); i++) { + Integer val = (Integer)grid(i).cache(null).get(key); + + assertEquals("Unexpected value for grid " + i, (Integer)0, val); + } + + assertFalse(failed); + } + + /** + * @throws Exception If failed. + */ + public void testPutWithFilter() throws Exception { + testPutWithFilter(keyForNode(0)); + + if (gridCount() > 1) + testPutWithFilter(keyForNode(1)); + } + + /** + * @param key Key. + * @throws Exception If failed. + */ + private void testPutWithFilter(final Integer key) throws Exception { + final GridCache<Integer, Integer> cache = grid(0).cache(null); + + cache.put(key, 0); + + final int THREADS = 5; + final int ITERATIONS_PER_THREAD = iterations(); + + GridTestUtils.runMultiThreaded(new Callable<Void>() { + @Override public Void call() throws Exception { + for (int i = 0; i < ITERATIONS_PER_THREAD && !failed; i++) { + if (i % 500 == 0) + log.info("Iteration " + i); + + assertTrue(cache.putx(key, i, new TestFilter())); + } + + return null; + } + }, THREADS, "putWithFilter"); + + for (int i = 0; i < gridCount(); i++) { + Integer val = (Integer)grid(i).cache(null).get(key); + + assertNotNull("Unexpected value for grid " + i, val); + } + + assertFalse(failed); + } + + /** + * @throws Exception If failed. + */ + public void testPutGet() throws Exception { + testPutGet(keyForNode(0)); + + if (gridCount() > 1) + testPutGet(keyForNode(1)); + } + + /** + * @param key Key. + * @throws Exception If failed. + */ + private void testPutGet(final Integer key) throws Exception { + final GridCache<Integer, Integer> cache = grid(0).cache(null); + + cache.put(key, 0); + + final int THREADS = 5; + final int ITERATIONS_PER_THREAD = iterations(); + + IgniteFuture<Long> putFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + for (int i = 0; i < ITERATIONS_PER_THREAD; i++) { + if (i % 1000 == 0) + log.info("Put iteration " + i); + + assertTrue(cache.putx(key, i)); + } + + return null; + } + }, THREADS, "put"); + + final AtomicBoolean stop = new AtomicBoolean(); + + IgniteFuture<Long> getFut; + + try { + getFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + int cnt = 0; + + while (!stop.get()) { + if (++cnt % 5000 == 0) + log.info("Get iteration " + cnt); + + assertNotNull(cache.get(key)); + } + + return null; + } + }, THREADS, "get"); + + putFut.get(); + } + finally { + stop.set(true); + } + + getFut.get(); + + for (int i = 0; i < gridCount(); i++) { + Integer val = (Integer)grid(i).cache(null).get(key); + + assertNotNull("Unexpected value for grid " + i, val); + } + } + + /** + * @param idx Node index. + * @return Primary key for node. + */ + protected Integer keyForNode(int idx) { + Integer key0 = null; + + GridCache<Integer, Integer> cache = grid(0).cache(null); + + for (int i = 0; i < 10_000; i++) { + if (cache.affinity().isPrimary(grid(idx).localNode(), i)) { + key0 = i; + + break; + } + } + + assertNotNull(key0); + + return key0; + } + + /** + * @return Number of iterations. + */ + protected int iterations() { + return 10_000; + } + + /** + * + */ + protected static class IncProcessor implements EntryProcessor<Integer, Integer, Void>, Serializable { + /** {@inheritDoc} */ + @Override public Void process(MutableEntry<Integer, Integer> e, Object... args) { + Integer val = e.getValue(); + + if (val == null) { + failed = true; + + System.out.println(Thread.currentThread() + " got null in processor: " + val); + + return null; + } + + e.setValue(val + 1); + + return null; + } + } + + /** + * + */ + protected static class TestFilter implements IgnitePredicate<GridCacheEntry<Integer, Integer>> { + /** {@inheritDoc} */ + @Override public boolean apply(GridCacheEntry<Integer, Integer> e) { + if (e == null) { + failed = true; + + System.out.println(Thread.currentThread() + " got null entry in filter: " + e); + + return false; + } + else if (e.peek() == null) { + failed = true; + + System.out.println(Thread.currentThread() + " got null value in filter: " + e); + + return false; + } + + return true; + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapMultiThreadedUpdateSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapMultiThreadedUpdateSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapMultiThreadedUpdateSelfTest.java new file mode 100644 index 0000000..718a406 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapMultiThreadedUpdateSelfTest.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.transactions.*; +import org.apache.ignite.testframework.*; + +import java.util.concurrent.*; + +import static org.apache.ignite.transactions.IgniteTxConcurrency.*; +import static org.apache.ignite.transactions.IgniteTxIsolation.*; + +/** + * Multithreaded update test with off heap enabled. + */ +public class GridCacheOffHeapMultiThreadedUpdateSelfTest extends GridCacheOffHeapMultiThreadedUpdateAbstractSelfTest { + /** + * @throws Exception If failed. + */ + public void testTransformTx() throws Exception { + info(">>> PESSIMISTIC node 0"); + + testTransformTx(keyForNode(0), PESSIMISTIC); + + info(">>> OPTIMISTIC node 0"); + testTransformTx(keyForNode(0), OPTIMISTIC); + + if (gridCount() > 1) { + info(">>> PESSIMISTIC node 1"); + testTransformTx(keyForNode(1), PESSIMISTIC); + + info(">>> OPTIMISTIC node 1"); + testTransformTx(keyForNode(1), OPTIMISTIC); + } + } + + /** + * @param key Key. + * @param txConcurrency Transaction concurrency. + * @throws Exception If failed. + */ + private void testTransformTx(final Integer key, final IgniteTxConcurrency txConcurrency) throws Exception { + final IgniteCache<Integer, Integer> cache = grid(0).jcache(null); + + cache.put(key, 0); + + final int THREADS = 5; + final int ITERATIONS_PER_THREAD = iterations(); + + GridTestUtils.runMultiThreaded(new Callable<Void>() { + @Override public Void call() throws Exception { + IgniteTransactions txs = ignite(0).transactions(); + + for (int i = 0; i < ITERATIONS_PER_THREAD && !failed; i++) { + if (i % 500 == 0) + log.info("Iteration " + i); + + try (IgniteTx tx = txs.txStart(txConcurrency, REPEATABLE_READ)) { + cache.invoke(key, new IncProcessor()); + + tx.commit(); + } + } + + return null; + } + }, THREADS, "transform"); + + for (int i = 0; i < gridCount(); i++) { + Integer val = (Integer)grid(i).cache(null).get(key); + + if (txConcurrency == PESSIMISTIC) + assertEquals("Unexpected value for grid " + i, (Integer)(ITERATIONS_PER_THREAD * THREADS), val); + else + assertNotNull("Unexpected value for grid " + i, val); + } + + if (failed) { + for (int g = 0; g < gridCount(); g++) + info("Value for cache [g=" + g + ", val=" + grid(g).cache(null).get(key) + ']'); + + assertFalse(failed); + } + } + + /** + * @throws Exception If failed. + */ + public void testPutTx() throws Exception { + testPutTx(keyForNode(0), PESSIMISTIC); + + // TODO GG-8118. + //testPutTx(keyForNode(0), OPTIMISTIC); + + if (gridCount() > 1) { + testPutTx(keyForNode(1), PESSIMISTIC); + + // TODO GG-8118. + //testPutTx(keyForNode(1), OPTIMISTIC); + } + } + + /** + * @param key Key. + * @param txConcurrency Transaction concurrency. + * @throws Exception If failed. + */ + private void testPutTx(final Integer key, final IgniteTxConcurrency txConcurrency) throws Exception { + final GridCache<Integer, Integer> cache = grid(0).cache(null); + + cache.put(key, 0); + + final int THREADS = 5; + final int ITERATIONS_PER_THREAD = iterations(); + + GridTestUtils.runMultiThreaded(new Callable<Void>() { + @Override public Void call() throws Exception { + for (int i = 0; i < ITERATIONS_PER_THREAD; i++) { + if (i % 500 == 0) + log.info("Iteration " + i); + + try (IgniteTx tx = cache.txStart(txConcurrency, REPEATABLE_READ)) { + Integer val = cache.put(key, i); + + assertNotNull(val); + + tx.commit(); + } + } + + return null; + } + }, THREADS, "put"); + + for (int i = 0; i < gridCount(); i++) { + Integer val = (Integer)grid(i).cache(null).get(key); + + assertNotNull("Unexpected value for grid " + i, val); + } + } + + /** + * @throws Exception If failed. + */ + public void testPutWithFilterTx() throws Exception { + testPutWithFilterTx(keyForNode(0), PESSIMISTIC); + + // TODO GG-8118. + //testPutWithFilterTx(keyForNode(0), OPTIMISTIC); + + if (gridCount() > 1) { + testPutWithFilterTx(keyForNode(1), PESSIMISTIC); + + // TODO GG-8118. + //testPutWithFilterTx(keyForNode(1), OPTIMISTIC); + } + } + + /** + * @param key Key. + * @param txConcurrency Transaction concurrency. + * @throws Exception If failed. + */ + private void testPutWithFilterTx(final Integer key, final IgniteTxConcurrency txConcurrency) throws Exception { + final GridCache<Integer, Integer> cache = grid(0).cache(null); + + cache.put(key, 0); + + final int THREADS = 5; + final int ITERATIONS_PER_THREAD = iterations(); + + GridTestUtils.runMultiThreaded(new Callable<Void>() { + @Override public Void call() throws Exception { + for (int i = 0; i < ITERATIONS_PER_THREAD && !failed; i++) { + if (i % 500 == 0) + log.info("Iteration " + i); + + try (IgniteTx tx = cache.txStart(txConcurrency, REPEATABLE_READ)) { + cache.putx(key, i, new TestFilter()); + + tx.commit(); + } + } + + return null; + } + }, THREADS, "putWithFilter"); + + for (int i = 0; i < gridCount(); i++) { + Integer val = (Integer)grid(i).cache(null).get(key); + + assertNotNull("Unexpected value for grid " + i, val); + } + + assertFalse(failed); + } + + /** + * @throws Exception If failed. + */ + public void testPutxIfAbsentTx() throws Exception { + testPutxIfAbsentTx(keyForNode(0), PESSIMISTIC); + + // TODO GG-8118. + //testPutxIfAbsentTx(keyForNode(0), OPTIMISTIC); + + if (gridCount() > 1) { + testPutxIfAbsentTx(keyForNode(1), PESSIMISTIC); + + // TODO GG-8118. + //testPutxIfAbsentTx(keyForNode(1), OPTIMISTIC); + } + } + + /** + * @param key Key. + * @param txConcurrency Transaction concurrency. + * @throws Exception If failed. + */ + private void testPutxIfAbsentTx(final Integer key, final IgniteTxConcurrency txConcurrency) throws Exception { + final GridCache<Integer, Integer> cache = grid(0).cache(null); + + cache.put(key, 0); + + final int THREADS = 5; + final int ITERATIONS_PER_THREAD = iterations(); + + GridTestUtils.runMultiThreaded(new Callable<Void>() { + @Override public Void call() throws Exception { + for (int i = 0; i < ITERATIONS_PER_THREAD && !failed; i++) { + if (i % 500 == 0) + log.info("Iteration " + i); + + try (IgniteTx tx = cache.txStart(txConcurrency, REPEATABLE_READ)) { + assertFalse(cache.putxIfAbsent(key, 100)); + + tx.commit(); + } + } + + return null; + } + }, THREADS, "putxIfAbsent"); + + for (int i = 0; i < gridCount(); i++) { + Integer val = (Integer)grid(i).cache(null).get(key); + + assertEquals("Unexpected value for grid " + i, (Integer)0, val); + } + + assertFalse(failed); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapSelfTest.java new file mode 100644 index 0000000..690568e --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapSelfTest.java @@ -0,0 +1,654 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.marshaller.optimized.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.spi.swapspace.file.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static java.util.concurrent.TimeUnit.*; +import static org.apache.ignite.configuration.IgniteDeploymentMode.*; +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCachePeekMode.*; +import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; +import static org.apache.ignite.events.IgniteEventType.*; + +/** + * Test for cache swap. + */ +public class GridCacheOffHeapSelfTest extends GridCommonAbstractTest { + /** Entry count. */ + private static final int ENTRY_CNT = 1000; + + /** Swap count. */ + private final AtomicInteger swapCnt = new AtomicInteger(); + + /** Unswap count. */ + private final AtomicInteger unswapCnt = new AtomicInteger(); + + /** Saved versions. */ + private final Map<Integer, Object> versions = new HashMap<>(); + + /** */ + private final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(disco); + + cfg.setNetworkTimeout(2000); + + cfg.setSwapSpaceSpi(new FileSwapSpaceSpi()); + + CacheConfiguration cacheCfg = defaultCacheConfiguration(); + + cacheCfg.setWriteSynchronizationMode(FULL_SYNC); + cacheCfg.setSwapEnabled(false); + cacheCfg.setCacheMode(REPLICATED); + cacheCfg.setOffHeapMaxMemory(1024L * 1024L * 1024L); + + cfg.setCacheConfiguration(cacheCfg); + + cfg.setMarshaller(new IgniteOptimizedMarshaller(false)); + cfg.setDeploymentMode(SHARED); + cfg.setPeerClassLoadingLocalClassPathExclude(GridCacheOffHeapSelfTest.class.getName(), + CacheValue.class.getName()); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + versions.clear(); + + super.afterTestsStopped(); + } + + /** + * @throws Exception If failed. + */ + @SuppressWarnings("BusyWait") + public void testOffHeapDeployment() throws Exception { + try { + Ignite ignite1 = startGrid(1); + Ignite ignite2 = startGrid(2); + + GridCache<Integer, Object> cache1 = ignite1.cache(null); + GridCache<Integer, Object> cache2 = ignite2.cache(null); + + Object v1 = new CacheValue(1); + + cache1.put(1, v1); + + info("Stored value in cache1 [v=" + v1 + ", ldr=" + v1.getClass().getClassLoader() + ']'); + + Object v2 = cache2.get(1); + + assert v2 != null; + + info("Read value from cache2 [v=" + v2 + ", ldr=" + v2.getClass().getClassLoader() + ']'); + + assert v2 != null; + assert !v2.getClass().getClassLoader().equals(getClass().getClassLoader()); + assert v2.getClass().getClassLoader().getClass().getName().contains("GridDeploymentClassLoader"); + + SwapListener lsnr = new SwapListener(); + + ignite2.events().localListen(lsnr, EVT_CACHE_OBJECT_TO_OFFHEAP, EVT_CACHE_OBJECT_FROM_OFFHEAP); + + cache2.evictAll(); + + assert lsnr.awaitSwap(); + + assert cache2.get(1) != null; + + assert lsnr.awaitUnswap(); + + ignite2.events().stopLocalListen(lsnr); + + lsnr = new SwapListener(); + + ignite2.events().localListen(lsnr, EVT_CACHE_OBJECT_TO_OFFHEAP, EVT_CACHE_OBJECT_FROM_OFFHEAP); + + cache2.evictAll(); + + assert lsnr.awaitSwap(); + + stopGrid(1); + + boolean success = false; + + for (int i = 0; i < 6; i++) { + success = cache2.get(1) == null; + + if (success) + break; + else if (i < 2) { + info("Sleeping to wait for cache clear."); + + Thread.sleep(500); + } + } + + assert success; + } + finally { + stopAllGrids(); + } + } + + /** + * @param cache Cache. + * @param timeout Timeout. + * @return {@code True} if success. + * @throws InterruptedException If thread was interrupted. + */ + @SuppressWarnings({"BusyWait"}) + private boolean waitCacheEmpty(GridCacheProjection<Integer,Object> cache, long timeout) + throws InterruptedException { + assert cache != null; + assert timeout >= 0; + + long end = System.currentTimeMillis() + timeout; + + while (end - System.currentTimeMillis() >= 0) { + if (cache.isEmpty()) + return true; + + Thread.sleep(500); + } + + return cache.isEmpty(); + } + + /** + * @throws Exception If failed. + */ + public void testOffHeap() throws Exception { + try { + startGrids(1); + + grid(0).events().localListen(new IgnitePredicate<IgniteEvent>() { + @Override public boolean apply(IgniteEvent evt) { + assert evt != null; + + switch (evt.type()) { + case EVT_CACHE_OBJECT_TO_OFFHEAP: + swapCnt.incrementAndGet(); + + break; + case EVT_CACHE_OBJECT_FROM_OFFHEAP: + unswapCnt.incrementAndGet(); + + break; + } + + return true; + } + }, EVT_CACHE_OBJECT_TO_OFFHEAP, EVT_CACHE_OBJECT_FROM_OFFHEAP); + + GridCache<Integer, CacheValue> cache = grid(0).cache(null); + + populate(cache); + evictAll(cache); + + // Check iterator. + Iterator<Map.Entry<Integer, CacheValue>> it = cache.offHeapIterator(); + + int cnt = 0; + + while (it.hasNext()) { + Map.Entry<Integer, CacheValue> e = it.next(); + + assertEquals(e.getKey().intValue(), e.getValue().value()); + + cnt++; + } + + assertEquals(ENTRY_CNT, cnt); + + query(cache, 0, 200); // Query swapped entries. + unswap(cache, 200, 400); // Check 'promote' method. + unswapAll(cache, 400, 600); // Check 'promoteAll' method. + get(cache, 600, 800); // Check 'get' method. + peek(cache, 800, ENTRY_CNT); // Check 'peek' method in 'SWAP' mode. + + // Check that all entries were unswapped. + for (int i = 0; i < ENTRY_CNT; i++) { + CacheValue val = cache.peek(i); + + assert val != null; + assert val.value() == i; + } + + // Query unswapped entries. + Collection<Map.Entry<Integer, CacheValue>> res = cache.queries(). + createSqlQuery(CacheValue.class, "val >= ? and val < ?"). + execute(0, ENTRY_CNT). + get(); + + assert res.size() == ENTRY_CNT; + + for (Map.Entry<Integer, CacheValue> entry : res) { + assert entry != null; + assert entry.getKey() != null; + assert entry.getValue() != null; + assert entry.getKey() == entry.getValue().value(); + } + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void testOffHeapIterator() throws Exception { + try { + startGrids(1); + + grid(0); + + GridCache<Integer, Integer> cache = grid(0).cache(null); + + for (int i = 0; i < 100; i++) { + info("Putting: " + i); + + cache.put(i, i); + + assert cache.evict(i); + } + + Iterator<Map.Entry<Integer, Integer>> iter = cache.offHeapIterator(); + + assert iter != null; + + int i = 0; + + while (iter.hasNext()) { + Map.Entry<Integer, Integer> e = iter.next(); + + Integer key = e.getKey(); + + info("Key: " + key); + + i++; + + iter.remove(); + + assertNull(cache.get(key)); + } + + assertEquals(100, i); + + assert cache.isEmpty(); + } + finally { + stopAllGrids(); + } + } + + /** + * Populates cache. + * + * @param cache Cache. + * @throws Exception In case of error. + */ + private void populate(GridCacheProjection<Integer, CacheValue> cache) throws Exception { + resetCounters(); + + for (int i = 0; i < ENTRY_CNT; i++) { + cache.put(i, new CacheValue(i)); + + CacheValue val = cache.peek(i); + + assert val != null; + assert val.value() == i; + + GridCacheEntry<Integer, CacheValue> entry = cache.entry(i); + + assert entry != null; + + versions.put(i, entry.version()); + } + + assert swapCnt.get() == 0; + assert unswapCnt.get() == 0; + } + + /** + * Evicts all entries in cache. + * + * @param cache Cache. + * @throws Exception In case of error. + */ + private void evictAll(GridCache<Integer, CacheValue> cache) throws Exception { + resetCounters(); + + assertEquals(ENTRY_CNT, cache.size()); + assertEquals(0, cache.offHeapEntriesCount()); + + for (int i = 0; i < ENTRY_CNT; i++) { + cache.evict(i); + + assertEquals(ENTRY_CNT - i - 1, cache.size()); + assertEquals(i + 1, cache.offHeapEntriesCount()); + } + // cache.evictAll(); + + assertEquals(0, cache.size()); + assertEquals(ENTRY_CNT, cache.offHeapEntriesCount()); + + for (int i = 0; i < ENTRY_CNT; i++) + assertNull(cache.peek(i)); + + assertEquals(ENTRY_CNT, swapCnt.get()); + assertEquals(0, unswapCnt.get()); + } + + /** + * Runs SQL query and checks result. + * + * @param cache Cache. + * @param lowerBound Lower key bound. + * @param upperBound Upper key bound. + * @throws Exception In case of error. + */ + private void query(GridCacheProjection<Integer, CacheValue> cache, + int lowerBound, int upperBound) throws Exception { + resetCounters(); + + Collection<Map.Entry<Integer, CacheValue>> res = cache.queries(). + createSqlQuery(CacheValue.class, "val >= ? and val < ?"). + execute(lowerBound, upperBound). + get(); + + assertEquals(res.size(), upperBound - lowerBound); + + for (Map.Entry<Integer, CacheValue> entry : res) { + assert entry != null; + assert entry.getKey() != null; + assert entry.getValue() != null; + assert entry.getKey() == entry.getValue().value(); + } + + assertEquals(0, swapCnt.get()); + assertEquals(0, unswapCnt.get()); + + checkEntries(cache, lowerBound, upperBound); + + assertEquals(0, swapCnt.get()); + assertEquals(unswapCnt.get(), upperBound - lowerBound); + } + + /** + * Unswaps entries and checks result. + * + * @param cache Cache. + * @param lowerBound Lower key bound. + * @param upperBound Upper key bound. + * @throws Exception In case of error. + */ + private void unswap(GridCacheProjection<Integer, CacheValue> cache, + int lowerBound, int upperBound) throws Exception { + resetCounters(); + + assertEquals(0, swapCnt.get()); + assertEquals(0, unswapCnt.get()); + + for (int i = lowerBound; i < upperBound; i++) { + assert cache.peek(i) == null; + + CacheValue val = cache.promote(i); + + assertNotNull(val); + assertEquals(i, val.value()); + + assertEquals(i - lowerBound + 1, unswapCnt.get()); + } + + assertEquals(0, swapCnt.get()); + assertEquals(unswapCnt.get(), upperBound - lowerBound); + + checkEntries(cache, lowerBound, upperBound); + + assertEquals(0, swapCnt.get()); + assertEquals(unswapCnt.get(), upperBound - lowerBound); + } + + /** + * Unswaps entries and checks result. + * + * @param cache Cache. + * @param lowerBound Lower key bound. + * @param upperBound Upper key bound. + * @throws Exception In case of error. + */ + private void unswapAll(GridCacheProjection<Integer, CacheValue> cache, + int lowerBound, int upperBound) throws Exception { + resetCounters(); + + Collection<Integer> keys = new HashSet<>(); + + for (int i = lowerBound; i < upperBound; i++) { + assert cache.peek(i) == null; + + keys.add(i); + } + + cache.promoteAll(keys); + + assert swapCnt.get() == 0; + assert unswapCnt.get() == upperBound - lowerBound; + + checkEntries(cache, lowerBound, upperBound); + + assert swapCnt.get() == 0; + assert unswapCnt.get() == upperBound - lowerBound; + } + + /** + * Unswaps entries via {@code get} method and checks result. + * + * @param cache Cache. + * @param lowerBound Lower key bound. + * @param upperBound Upper key bound. + * @throws Exception In case of error. + */ + private void get(GridCacheProjection<Integer, CacheValue> cache, + int lowerBound, int upperBound) throws Exception { + resetCounters(); + + for (int i = lowerBound; i < upperBound; i++) { + assert cache.peek(i) == null; + + CacheValue val = cache.get(i); + + assert val != null; + assert val.value() == i; + } + + assert swapCnt.get() == 0; + assert unswapCnt.get() == upperBound - lowerBound; + + checkEntries(cache, lowerBound, upperBound); + + assert swapCnt.get() == 0; + assert unswapCnt.get() == upperBound - lowerBound; + } + + /** + * Peeks entries in {@code SWAP} mode and checks result. + * + * @param cache Cache. + * @param lowerBound Lower key bound. + * @param upperBound Upper key bound. + * @throws Exception In case of error. + */ + private void peek(GridCacheProjection<Integer, CacheValue> cache, + int lowerBound, int upperBound) throws Exception { + resetCounters(); + + for (int i = lowerBound; i < upperBound; i++) { + assert cache.peek(i) == null; + + CacheValue val = cache.peek(i, F.asList(SWAP)); + + assert val != null; + assert val.value() == i; + } + + assert swapCnt.get() == 0; + assert unswapCnt.get() == 0; + + checkEntries(cache, lowerBound, upperBound); + + assert swapCnt.get() == 0; + assert unswapCnt.get() == upperBound - lowerBound; + } + + /** + * Resets event counters. + */ + private void resetCounters() { + swapCnt.set(0); + unswapCnt.set(0); + } + + /** + * Checks that entries in cache are correct after being unswapped. + * If entry is still swapped, it will be unswapped in this method. + * + * @param cache Cache. + * @param lowerBound Lower key bound. + * @param upperBound Upper key bound. + * @throws Exception In case of error. + */ + private void checkEntries(GridCacheProjection<Integer, CacheValue> cache, + int lowerBound, int upperBound) throws Exception { + for (int i = lowerBound; i < upperBound; i++) { + GridCacheEntry<Integer, CacheValue> entry = cache.entry(i); + + assert entry != null; + assert entry.getKey() != null; + + CacheValue val = entry.getValue(); + + assertNotNull("Value null for key: " + i, val); + assertEquals(entry.getKey(), (Integer)val.value()); + assertEquals(entry.version(), versions.get(i)); + } + } + + /** + * + */ + private static class CacheValue { + /** Value. */ + @GridCacheQuerySqlField + private final int val; + + /** + * @param val Value. + */ + private CacheValue(int val) { + this.val = val; + } + + /** + * @return Value. + */ + public int value() { + return val; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CacheValue.class, this); + } + } + + /** + * + */ + private class SwapListener implements IgnitePredicate<IgniteEvent> { + /** */ + private final CountDownLatch swapLatch = new CountDownLatch(1); + + /** */ + private final CountDownLatch unswapLatch = new CountDownLatch(1); + + /** {@inheritDoc} */ + @Override public boolean apply(IgniteEvent evt) { + assert evt != null; + + info("Received event: " + evt); + + switch (evt.type()) { + case EVT_CACHE_OBJECT_TO_OFFHEAP: + swapLatch.countDown(); + + break; + case EVT_CACHE_OBJECT_FROM_OFFHEAP: + unswapLatch.countDown(); + + break; + } + + return true; + } + + /** + * @return {@code True} if await succeeded. + * @throws InterruptedException If interrupted. + */ + boolean awaitSwap() throws InterruptedException { + return swapLatch.await(5000, MILLISECONDS); + } + + /** + * @return {@code True} if await succeeded. + * @throws InterruptedException If interrupted. + */ + boolean awaitUnswap() throws InterruptedException { + return unswapLatch.await(5000, MILLISECONDS); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapTest.java new file mode 100644 index 0000000..754a047 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapTest.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.eviction.fifo.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.spi.swapspace.noop.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCacheDistributionMode.*; +import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; + +/** + * Test for cache swap. + */ +public class GridCacheOffHeapTest extends GridCommonAbstractTest { + /** */ + private final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private GridCacheMode mode; + + /** */ + private int onheap; + + /** Start size. */ + private int startSize = 4 * 1024 * 1024; + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return Long.MAX_VALUE; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(disco); + + cfg.setSwapSpaceSpi(new NoopSwapSpaceSpi()); + + CacheConfiguration cacheCfg = defaultCacheConfiguration(); + + cacheCfg.setWriteSynchronizationMode(FULL_ASYNC); + cacheCfg.setSwapEnabled(false); + cacheCfg.setCacheMode(mode); + cacheCfg.setQueryIndexEnabled(false); + cacheCfg.setDistributionMode(PARTITIONED_ONLY); + cacheCfg.setStartSize(startSize); + + if (onheap > 0) { + cacheCfg.setEvictionPolicy(new GridCacheFifoEvictionPolicy(onheap)); + + cacheCfg.setOffHeapMaxMemory(80 * 1024L * 1024L * 1024L); // 10GB + } + + cfg.setCacheConfiguration(cacheCfg); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void _testOnHeapReplicatedPerformance() throws Exception { + mode = REPLICATED; + onheap = 0; + startSize = 18 * 1024 * 1024; + + performanceTest(); + } + + /** + * @throws Exception If failed. + */ + public void _testOnHeapPartitionedPerformance() throws Exception { + mode = PARTITIONED; + onheap = 0; + startSize = 18 * 1024 * 1024; + + performanceTest(); + } + + /** + * @throws Exception If failed. + */ + public void _testOffHeapReplicatedPerformance() throws Exception { + mode = REPLICATED; + onheap = 1024 * 1024; + startSize = onheap; + + performanceTest(); + } + + /** + * @throws Exception If failed. + */ + public void _testOffHeapPartitionedPerformance() throws Exception { + mode = PARTITIONED; + onheap = 4 * 1024 * 1024; + + performanceTest(); + } + + /** + * @throws Exception If failed. + */ + public void _testOnHeapReplicatedPerformanceMultithreaded() throws Exception { + mode = REPLICATED; + onheap = 0; + startSize = 18 * 1024 * 1024; + + performanceMultithreadedTest(); + } + + /** + * @throws Exception If failed. + */ + public void _testOnHeapPartitionedPerformanceMultithreaded() throws Exception { + mode = PARTITIONED; + onheap = 0; + startSize = 18 * 1024 * 1024; + + performanceMultithreadedTest(); + } + + /** + * @throws Exception If failed. + */ + public void testOffHeapReplicatedPerformanceMultithreaded() throws Exception { + mode = REPLICATED; + onheap = 1024 * 1024; + startSize = onheap; + + performanceMultithreadedTest(); + } + + /** + * @throws Exception If failed. + */ + public void _testOffHeapPartitionedPerformanceMultithreaded() throws Exception { + mode = PARTITIONED; + onheap = 4 * 1024 * 1024; + + performanceMultithreadedTest(); + } + + /** + * @throws Exception If failed. + */ + private void performanceTest() throws Exception { + Ignite g = startGrid(); + + try { + GridCache<Integer, Integer> cache = g.cache(null); + +// int max = 17 * 1024 * 1024; + int max = Integer.MAX_VALUE; + + long start = System.currentTimeMillis(); + + for (int i = 0; i < max; i++) { + cache.put(i, i); + + if (i % 100000 == 0) { + long cur = System.currentTimeMillis(); + + info("Stats [i=" + i + ", time=" + (cur - start) + ", throughput=" + (i * 1000d / (cur - start)) + + "ops/sec, onheapCnt=" + cache.size() + ", offheapCnt=" + cache.offHeapEntriesCount() + "]"); + } + } + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + private void performanceMultithreadedTest() throws Exception { + Ignite g = startGrid(); + + try { + final GridCache<Integer, Integer> c = g.cache(null); + + final long start = System.currentTimeMillis(); + + final AtomicInteger keyGen = new AtomicInteger(); + + final int reserveSize = 1024 * 1024; + + multithreaded(new Callable<Object>() { + @SuppressWarnings("InfiniteLoopStatement") + @Override public Object call() throws Exception { + while (true) { + int val = keyGen.addAndGet(reserveSize); // Reserve keys. + + for (int i = val - reserveSize; i < val; i++) { + c.put(i, i); + + if (i % 500000 == 0) { + long dur = System.currentTimeMillis() - start; + long keySize= c.size() + c.offHeapEntriesCount(); + + info("Stats [size=" + keySize + ", time=" + dur + ", throughput=" + + (keySize * 1000f / dur) + " ops/sec, onheapCnt=" + c.size() + + ", offheapCnt=" + c.offHeapEntriesCount() + "]"); + } + } + } + } + }, Runtime.getRuntime().availableProcessors()); + } + finally { + stopAllGrids(); + } + } + + /** + * Main method. + * + * @param args Parameters. + * @throws Exception If failed. + */ +// public static void main(String[] args) throws Exception { +// new GridCacheOffHeapTest().testOffHeapReplicatedPerformanceMultithreaded(); +// } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapTieredAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapTieredAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapTieredAbstractSelfTest.java new file mode 100644 index 0000000..4fafc9f --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapTieredAbstractSelfTest.java @@ -0,0 +1,682 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.portables.*; +import org.apache.ignite.transactions.*; +import org.jetbrains.annotations.*; +import org.junit.*; + +import javax.cache.processor.*; +import java.util.*; + +import static org.apache.ignite.cache.GridCacheAtomicWriteOrderMode.*; +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; +import static org.apache.ignite.cache.GridCacheDistributionMode.*; +import static org.apache.ignite.cache.GridCacheMemoryMode.*; +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.transactions.IgniteTxConcurrency.*; +import static org.apache.ignite.transactions.IgniteTxIsolation.*; + +/** + * + */ +public abstract class GridCacheOffHeapTieredAbstractSelfTest extends GridCacheAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 4; + } + + /** {@inheritDoc} */ + @Override protected GridCacheMode cacheMode() { + return PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected GridCacheDistributionMode distributionMode() { + return PARTITIONED_ONLY; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + if (portableEnabled()) { + PortableConfiguration pCfg = new PortableConfiguration(); + + pCfg.setClassNames(Arrays.asList(TestValue.class.getName())); + + cfg.setPortableConfiguration(pCfg); + } + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception { + CacheConfiguration ccfg = super.cacheConfiguration(gridName); + + ccfg.setAtomicWriteOrderMode(PRIMARY); + + ccfg.setMemoryMode(OFFHEAP_TIERED); + ccfg.setOffHeapMaxMemory(1024 * 1024); + + return ccfg; + } + + /** + * @throws Exception If failed. + */ + public void testTransform() throws Exception { + GridCache<Integer, Integer> cache = grid(0).cache(null); + + checkTransform(primaryKey(cache)); + + checkTransform(backupKey(cache)); + + checkTransform(nearKey(cache)); + } + + /** + * @param key Key. + * @throws Exception If failed. + */ + private void checkTransform(Integer key) throws Exception { + IgniteCache<Integer, Integer> c = grid(0).jcache(null); + + c.invoke(key, new EntryProcessor<Integer, Integer, Void>() { + @Override public Void process(MutableEntry<Integer, Integer> e, Object... args) { + Integer val = e.getValue(); + + assertNull("Unexpected value: " + val, val); + + return null; + } + }); + + c.put(key, 1); + + c.invoke(key, new EntryProcessor<Integer, Integer, Void>() { + @Override public Void process(MutableEntry<Integer, Integer> e, Object... args) { + Integer val = e.getValue(); + + assertNotNull("Unexpected value: " + val, val); + + assertEquals((Integer) 1, val); + + e.setValue(val + 1); + + return null; + } + }); + + assertEquals((Integer)2, c.get(key)); + + c.invoke(key, new EntryProcessor<Integer, Integer, Void>() { + @Override public Void process(MutableEntry<Integer, Integer> e, Object... args) { + Integer val = e.getValue(); + + assertNotNull("Unexpected value: " + val, val); + + assertEquals((Integer)2, val); + + e.setValue(val); + + return null; + } + }); + + assertEquals((Integer)2, c.get(key)); + + c.invoke(key, new EntryProcessor<Integer, Integer, Void>() { + @Override public Void process(MutableEntry<Integer, Integer> e, Object... args) { + Integer val = e.getValue(); + + assertNotNull("Unexpected value: " + val, val); + + assertEquals((Integer)2, val); + + e.remove(); + + return null; + } + }); + + assertNull(c.get(key)); + } + + /** + * @throws Exception If failed. + */ + public void testPutGetRemove() throws Exception { + GridCache<Integer, Integer> c = grid(0).cache(null); + + checkPutGetRemove(primaryKey(c)); + + checkPutGetRemove(backupKey(c)); + + checkPutGetRemove(nearKey(c)); + } + + /** + * @throws Exception If failed. + */ + public void testPutGetRemoveByteArray() throws Exception { + GridCache<Integer, Integer> c = grid(0).cache(null); + + checkPutGetRemoveByteArray(primaryKey(c)); + + checkPutGetRemoveByteArray(backupKey(c)); + + checkPutGetRemoveByteArray(nearKey(c)); + } + + /** + * @param key Key. + * @throws Exception If failed. + */ + private void checkPutGetRemove(Integer key) throws Exception { + GridCache<Integer, Integer> c = grid(0).cache(null); + + checkValue(key, null); + + assertNull(c.put(key, key)); + + checkValue(key, key); + + assertEquals(key, c.remove(key)); + + checkValue(key, null); + + if (atomicityMode() == TRANSACTIONAL) { + checkPutGetRemoveTx(key, PESSIMISTIC); + + checkPutGetRemoveTx(key, OPTIMISTIC); + } + } + + /** + * @param key Key. + * @throws Exception If failed. + */ + private void checkPutGetRemoveByteArray(Integer key) throws Exception { + GridCache<Integer, byte[]> c = grid(0).cache(null); + + checkValue(key, null); + + byte[] val = new byte[] {key.byteValue()}; + + assertNull(c.put(key, val)); + + checkValue(key, val); + + Assert.assertArrayEquals(val, c.remove(key)); + + checkValue(key, null); + + if (atomicityMode() == TRANSACTIONAL) { + checkPutGetRemoveTxByteArray(key, PESSIMISTIC); + + checkPutGetRemoveTxByteArray(key, OPTIMISTIC); + } + } + + /** + * @param key Key, + * @param txConcurrency Transaction concurrency. + * @throws Exception If failed. + */ + private void checkPutGetRemoveTx(Integer key, IgniteTxConcurrency txConcurrency) throws Exception { + GridCache<Integer, Integer> c = grid(0).cache(null); + + IgniteTx tx = c.txStart(txConcurrency, REPEATABLE_READ); + + assertNull(c.put(key, key)); + + tx.commit(); + + checkValue(key, key); + + tx = c.txStart(txConcurrency, REPEATABLE_READ); + + assertEquals(key, c.remove(key)); + + tx.commit(); + + checkValue(key, null); + } + + /** + * @param key Key, + * @param txConcurrency Transaction concurrency. + * @throws Exception If failed. + */ + private void checkPutGetRemoveTxByteArray(Integer key, IgniteTxConcurrency txConcurrency) throws Exception { + GridCache<Integer, byte[]> c = grid(0).cache(null); + + IgniteTx tx = c.txStart(txConcurrency, REPEATABLE_READ); + + byte[] val = new byte[] {key.byteValue()}; + + assertNull(c.put(key, val)); + + tx.commit(); + + checkValue(key, val); + + tx = c.txStart(txConcurrency, REPEATABLE_READ); + + Assert.assertArrayEquals(val, c.remove(key)); + + tx.commit(); + + checkValue(key, null); + } + + /** + * @throws Exception If failed. + */ + public void testPromote() throws Exception { + GridCache<Integer, TestValue> c = grid(0).cache(null); + + TestValue val = new TestValue(new byte[100 * 1024]); + + List<Integer> keys = primaryKeys(c, 200); + + for (Integer key : keys) + c.putx(key, val); + + for (int i = 0; i < 50; i++) { + TestValue val0 = c.promote(keys.get(i)); + + Assert.assertArrayEquals(val.val, val0.val); + } + + List<Integer> keys0 = keys.subList(50, 100); + + c.promoteAll(keys0); + + for (Integer key : keys0) { + TestValue val0 = c.get(key); + + Assert.assertArrayEquals(val.val, val0.val); + } + } + + /** + * @throws Exception If failed. + */ + public void testPutAllGetAllRemoveAll() throws Exception { + Map<Integer, Integer> map = new HashMap<>(); + + for (int i = 0; i < 100; i++) + map.put(i, i); + + IgniteCache<Integer, Integer> c = grid(0).jcache(null); + + Map<Integer, Integer> map0 = c.getAll(map.keySet()); + + assertTrue(map0.isEmpty()); + + c.putAll(map); + + map0 = c.getAll(map.keySet()); + + assertEquals(map, map0); + + for (Map.Entry<Integer, Integer> e : map.entrySet()) + checkValue(e.getKey(), e.getValue()); + + c.invokeAll(map.keySet(), new EntryProcessor<Integer, Integer, Void>() { + @Override public Void process(MutableEntry<Integer, Integer> e, Object... args) { + Integer val = e.getValue(); + + e.setValue(val + 1); + + return null; + } + }); + + map0 = c.getAll(map.keySet()); + + for (Map.Entry<Integer, Integer> e : map0.entrySet()) + assertEquals((Integer)(e.getKey() + 1), e.getValue()); + + for (Map.Entry<Integer, Integer> e : map.entrySet()) + checkValue(e.getKey(), e.getValue() + 1); + + c.removeAll(map.keySet()); + + map0 = c.getAll(map.keySet()); + + assertTrue(map0.isEmpty()); + + for (Map.Entry<Integer, Integer> e : map.entrySet()) + checkValue(e.getKey(), null); + + if (atomicityMode() == TRANSACTIONAL) { + checkPutAllGetAllRemoveAllTx(PESSIMISTIC); + + checkPutAllGetAllRemoveAllTx(OPTIMISTIC); + } + } + + /** + * @param txConcurrency Transaction concurrency. + * @throws Exception If failed. + */ + private void checkPutAllGetAllRemoveAllTx(IgniteTxConcurrency txConcurrency) throws Exception { + Map<Integer, Integer> map = new HashMap<>(); + + for (int i = 0; i < 100; i++) + map.put(i, i); + + GridCache<Integer, Integer> c = grid(0).cache(null); + + Map<Integer, Integer> map0 = c.getAll(map.keySet()); + + assertTrue(map0.isEmpty()); + + try (IgniteTx tx = c.txStart(txConcurrency, REPEATABLE_READ)) { + c.putAll(map); + + tx.commit(); + } + + map0 = c.getAll(map.keySet()); + + assertEquals(map, map0); + + for (Map.Entry<Integer, Integer> e : map.entrySet()) + checkValue(e.getKey(), e.getValue()); + + try (IgniteTx tx = c.txStart(txConcurrency, REPEATABLE_READ)) { + c.removeAll(map.keySet()); + + tx.commit(); + } + + map0 = c.getAll(map.keySet()); + + assertTrue(map0.isEmpty()); + + for (Map.Entry<Integer, Integer> e : map.entrySet()) + checkValue(e.getKey(), null); + } + + /** + * @throws Exception If failed. + */ + public void testPutGetRemoveObject() throws Exception { + GridCache<Integer, Integer> c = grid(0).cache(null); + + checkPutGetRemoveObject(primaryKey(c)); + + checkPutGetRemoveObject(backupKey(c)); + + checkPutGetRemoveObject(nearKey(c)); + } + + /** + * @param key Key. + * @throws Exception If failed. + */ + private void checkPutGetRemoveObject(Integer key) throws Exception { + GridCache<Integer, TestValue> c = grid(0).cache(null); + + checkValue(key, null); + + TestValue val = new TestValue(new byte[10]); + + assertNull(c.put(key, val)); + + checkValue(key, val); + + TestValue val2 = new TestValue(new byte[10]); + + if (portableEnabled()) // TODO: 9271, check return value when fixed. + c.put(key, val); + else + assertEquals(val, c.put(key, val)); + + checkValue(key, val2); + + if (portableEnabled()) // TODO: 9271, check return value when fixed. + c.remove(key); + else + assertEquals(val2, c.remove(key)); + + checkValue(key, null); + + if (atomicityMode() == TRANSACTIONAL) { + checkPutGetRemoveTx(key, PESSIMISTIC); + + checkPutGetRemoveTx(key, OPTIMISTIC); + } + } + + /** + * @param key Key, + * @param txConcurrency Transaction concurrency. + * @throws Exception If failed. + */ + private void checkPutGetRemoveObjectTx(Integer key, IgniteTxConcurrency txConcurrency) throws Exception { + GridCache<Integer, TestValue> c = grid(0).cache(null); + + TestValue val = new TestValue(new byte[10]); + + IgniteTx tx = c.txStart(txConcurrency, REPEATABLE_READ); + + assertNull(c.put(key, val)); + + tx.commit(); + + checkValue(key, val); + + tx = c.txStart(txConcurrency, REPEATABLE_READ); + + assertEquals(val, c.remove(key)); + + tx.commit(); + + checkValue(key, null); + } + + /** + * @throws Exception If failed. + */ + public void testLockUnlock() throws Exception { + if (atomicityMode() == ATOMIC) + return; + + GridCache<Integer, TestValue> c = grid(0).cache(null); + + checkLockUnlock(primaryKey(c)); + + checkLockUnlock(backupKey(c)); + + checkLockUnlock(nearKey(c)); + } + + /** + * @param key Key. + * @throws Exception If failed. + */ + @SuppressWarnings("UnnecessaryLocalVariable") + private void checkLockUnlock(Integer key) throws Exception { + IgniteCache<Integer, Integer> c = grid(0).jcache(null); + + Integer val = key; + + c.put(key, val); + + assertNull(c.localPeek(key)); + + c.lock(key).lock(); + + assertTrue(c.isLocked(key)); + + c.lock(key).unlock(); + + assertFalse(c.isLocked(key)); + + assertNull(c.localPeek(key)); + + checkValue(key, val); + } + + /** + * @param key Key. + * @param val Value. + * @throws Exception If failed. + */ + private void checkValue(Object key, @Nullable Object val) throws Exception { + for (int i = 0; i < gridCount(); i++) { + if (val != null && val.getClass() == byte[].class) { + Assert.assertArrayEquals("Unexpected value for grid: " + i, + (byte[])val, + (byte[])grid(i).cache(null).get(key)); + } + else + assertEquals("Unexpected value for grid: " + i, val, grid(i).cache(null).get(key)); + } + } + + /** + * @throws Exception If failed. + */ + public void testUnswap() throws Exception { + GridCache<Integer, Integer> c = grid(0).cache(null); + + checkUnswap(primaryKey(c)); + + checkUnswap(backupKey(c)); + + checkUnswap(nearKey(c)); + } + + /** + * @param key Key. + * @throws Exception If failed. + */ + private void checkUnswap(Integer key) throws Exception { + GridCache<Integer, Integer> c = grid(0).cache(null); + + for (int i = 0; i < gridCount(); i++) { + assertEquals("Unexpected entries for grid: " + i, 0, grid(i).cache(null).offHeapEntriesCount()); + + assertEquals("Unexpected offheap size for grid: " + i, 0, grid(i).cache(null).offHeapAllocatedSize()); + } + + assertNull(c.peek(key)); + + c.put(key, key); + + assertNull(c.peek(key)); + + assertEquals(key, c.get(key)); + + assertNull(c.peek(key)); + + assertTrue(c.removex(key)); + + assertNull(c.peek(key)); + + for (int i = 0; i < gridCount(); i++) { + assertEquals("Unexpected entries for grid: " + i, 0, grid(i).cache(null).offHeapEntriesCount()); + + assertEquals("Unexpected offheap size for grid: " + i, 0, grid(i).cache(null).offHeapAllocatedSize()); + } + } + + /** + * + */ + @SuppressWarnings("PublicInnerClass") + public static class TestEntryPredicate implements IgnitePredicate<GridCacheEntry<Integer, Integer>> { + /** */ + private Integer expVal; + + /** + * @param expVal Expected value. + */ + TestEntryPredicate(Integer expVal) { + this.expVal = expVal; + } + + /** {@inheritDoc} */ + @Override public boolean apply(GridCacheEntry<Integer, Integer> e) { + assertEquals(expVal, e.peek()); + + return true; + } + } + + /** + * + */ + @SuppressWarnings("PublicInnerClass") + public static class TestValue { + /** */ + @SuppressWarnings("PublicField") + public byte[] val; + + /** + * Default constructor. + */ + public TestValue() { + // No-op. + } + + /** + * @param val Value. + */ + public TestValue(byte[] val) { + this.val = val; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + TestValue other = (TestValue)o; + + return Arrays.equals(val, other.val); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return Arrays.hashCode(val); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "TestValue{" + + "val=" + Arrays.toString(val) + + '}'; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapTieredAtomicSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapTieredAtomicSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapTieredAtomicSelfTest.java new file mode 100644 index 0000000..9e21d3b --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapTieredAtomicSelfTest.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.cache.*; + +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; + +/** + * + */ +public class GridCacheOffHeapTieredAtomicSelfTest extends GridCacheOffHeapTieredAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected GridCacheAtomicityMode atomicityMode() { + return ATOMIC; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapTieredEvictionAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapTieredEvictionAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapTieredEvictionAbstractSelfTest.java new file mode 100644 index 0000000..5348eb6 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapTieredEvictionAbstractSelfTest.java @@ -0,0 +1,422 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.portables.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.testframework.*; + +import javax.cache.processor.*; +import java.io.*; +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.cache.GridCacheAtomicWriteOrderMode.*; +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; +import static org.apache.ignite.cache.GridCacheMemoryMode.*; + +/** + * Tests that offheap entry is not evicted while cache entry is in use. + */ +public abstract class GridCacheOffHeapTieredEvictionAbstractSelfTest extends GridCacheAbstractSelfTest { + /** */ + private static final int VALS = 100; + + /** */ + private static final int VAL_SIZE = 128; + + /** */ + private static final int KEYS = 100; + + /** */ + private List<TestValue> vals = new ArrayList<>(VALS); + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 1; + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 60 * 1000; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + if (portableEnabled()) { + PortableConfiguration pCfg = new PortableConfiguration(); + + pCfg.setClassNames(Arrays.asList(TestValue.class.getName())); + + cfg.setPortableConfiguration(pCfg); + } + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception { + CacheConfiguration ccfg = super.cacheConfiguration(gridName); + + ccfg.setAtomicWriteOrderMode(PRIMARY); + + ccfg.setMemoryMode(OFFHEAP_TIERED); + ccfg.setOffHeapMaxMemory(0); + + return ccfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + final GridCache<Integer, Object> cache = grid(0).cache(null); + + vals = new ArrayList<>(VALS); + + for (int i = 0; i < VALS; i++) { + SB sb = new SB(VAL_SIZE); + + char c = Character.forDigit(i, 10); + + for (int j = 0; j < VAL_SIZE; j++) + sb.a(c); + + vals.add(new TestValue(sb.toString())); + } + + for (int i = 0; i < KEYS; i++) + cache.put(i, vals.get(i % vals.size())); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + vals = null; + } + + /** + * @return Number of iterations per thread. + */ + private int iterations() { + return atomicityMode() == ATOMIC ? 100_000 : 50_000; + } + + /** + * @throws Exception If failed. + */ + public void testPut() throws Exception { + final GridCache<Integer, Object> cache = grid(0).cache(null); + + GridTestUtils.runMultiThreaded(new Callable<Void>() { + @Override public Void call() throws Exception { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + for (int i = 0; i < iterations(); i++) { + int key = rnd.nextInt(KEYS); + + final TestValue val = vals.get(key % VAL_SIZE); + + TestPredicate p = testPredicate(val.val, false); + + cache.putx(key, val, p); + } + + return null; + } + }, 16, "test"); + } + + /** + * @throws Exception If failed. + */ + public void testRemove() throws Exception { + final GridCache<Integer, Object> cache = grid(0).cache(null); + + GridTestUtils.runMultiThreaded(new Callable<Void>() { + @Override public Void call() throws Exception { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + for (int i = 0; i < iterations(); i++) { + int key = rnd.nextInt(KEYS); + + final TestValue val = vals.get(key % VAL_SIZE); + + TestPredicate p = testPredicate(val.val, true); + + if (rnd.nextBoolean()) + cache.removex(key, p); + else + cache.putx(key, val, p); + } + + return null; + } + }, 16, "test"); + } + + /** + * @throws Exception If failed. + */ + public void testTransform() throws Exception { + final IgniteCache<Integer, Object> cache = grid(0).jcache(null); + + GridTestUtils.runMultiThreaded(new Callable<Void>() { + @Override public Void call() throws Exception { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + for (int i = 0; i < iterations(); i++) { + int key = rnd.nextInt(KEYS); + + final TestValue val = vals.get(key % VAL_SIZE); + + TestProcessor c = testClosure(val.val, false); + + cache.invoke(key, c); + } + + return null; + } + }, 16, "test"); + } + + /** + * @param expVal Expected cache value. + * @param acceptNull If {@code true} value can be null; + * @return Predicate. + */ + private TestPredicate testPredicate(String expVal, boolean acceptNull) { + return portableEnabled() ? + new PortableValuePredicate(expVal, acceptNull) : + new TestValuePredicate(expVal, acceptNull); + } + + /** + * @param expVal Expected cache value. + * @param acceptNull If {@code true} value can be null; + * @return Predicate. + */ + private TestProcessor testClosure(String expVal, boolean acceptNull) { + return portableEnabled() ? + new PortableValueClosure(expVal, acceptNull) : + new TestValueClosure(expVal, acceptNull); + } + + /** + * + */ + @SuppressWarnings("PublicInnerClass") + public static class TestValue { + /** */ + @SuppressWarnings("PublicField") + public String val; + + /** + * + */ + public TestValue() { + // No-op. + } + + /** + * @param val Value. + */ + public TestValue(String val) { + this.val = val; + } + } + + /** + * + */ + protected abstract static class TestPredicate implements P1<GridCacheEntry<Integer, Object>> { + /** */ + protected String expVal; + + /** */ + protected boolean acceptNull; + + /** + * @param expVal Expected value. + * @param acceptNull If {@code true} value can be null; + */ + protected TestPredicate(String expVal, boolean acceptNull) { + this.expVal = expVal; + this.acceptNull = acceptNull; + } + + /** {@inheritDoc} */ + @Override public final boolean apply(GridCacheEntry<Integer, Object> e) { + assertNotNull(e); + + Object val = e.peek(); + + if (val == null) { + if (!acceptNull) + assertNotNull(val); + + return true; + } + + checkValue(val); + + return true; + } + + /** + * @param val Value. + */ + abstract void checkValue(Object val); + } + + /** + * + */ + @SuppressWarnings("PackageVisibleInnerClass") + static class PortableValuePredicate extends TestPredicate { + /** + * @param expVal Expected value. + * @param acceptNull If {@code true} value can be null; + */ + PortableValuePredicate(String expVal, boolean acceptNull) { + super(expVal, acceptNull); + } + + /** {@inheritDoc} */ + @Override void checkValue(Object val) { + PortableObject obj = (PortableObject)val; + + assertEquals(expVal, obj.field("val")); + } + } + + /** + * + */ + @SuppressWarnings("PackageVisibleInnerClass") + static class TestValuePredicate extends TestPredicate { + /** + * @param expVal Expected value. + * @param acceptNull If {@code true} value can be null; + */ + TestValuePredicate(String expVal, boolean acceptNull) { + super(expVal, acceptNull); + } + + /** {@inheritDoc} */ + @Override void checkValue(Object val) { + TestValue obj = (TestValue)val; + + assertEquals(expVal, obj.val); + } + } + + /** + * + */ + protected abstract static class TestProcessor implements EntryProcessor<Integer, Object, Void>, Serializable { + /** */ + protected String expVal; + + /** */ + protected boolean acceptNull; + + /** + * @param expVal Expected value. + * @param acceptNull If {@code true} value can be null; + */ + protected TestProcessor(String expVal, boolean acceptNull) { + this.expVal = expVal; + this.acceptNull = acceptNull; + } + + /** {@inheritDoc} */ + @Override public Void process(MutableEntry<Integer, Object> e, Object... args) { + Object val = e.getValue(); + + if (val == null) { + if (!acceptNull) + assertNotNull(val); + + e.setValue(true); + + return null; + } + + checkValue(val); + + e.setValue(val); + + return null; + } + + /** + * @param val Value. + */ + abstract void checkValue(Object val); + } + + /** + * + */ + @SuppressWarnings("PackageVisibleInnerClass") + static class PortableValueClosure extends TestProcessor { + /** + * @param expVal Expected value. + * @param acceptNull If {@code true} value can be null; + */ + PortableValueClosure(String expVal, boolean acceptNull) { + super(expVal, acceptNull); + } + + /** {@inheritDoc} */ + @Override void checkValue(Object val) { + PortableObject obj = (PortableObject)val; + + assertEquals(expVal, obj.field("val")); + } + } + + /** + * + */ + @SuppressWarnings("PackageVisibleInnerClass") + static class TestValueClosure extends TestProcessor { + /** + * @param expVal Expected value. + * @param acceptNull If {@code true} value can be null; + */ + TestValueClosure(String expVal, boolean acceptNull) { + super(expVal, acceptNull); + } + + /** {@inheritDoc} */ + @Override void checkValue(Object val) { + TestValue obj = (TestValue)val; + + assertEquals(expVal, obj.val); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapTieredEvictionAtomicSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapTieredEvictionAtomicSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapTieredEvictionAtomicSelfTest.java new file mode 100644 index 0000000..996372d --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapTieredEvictionAtomicSelfTest.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.cache.*; + +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; + +/** + * + */ +public class GridCacheOffHeapTieredEvictionAtomicSelfTest extends GridCacheOffHeapTieredEvictionAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected GridCacheAtomicityMode atomicityMode() { + return ATOMIC; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapTieredEvictionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapTieredEvictionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapTieredEvictionSelfTest.java new file mode 100644 index 0000000..95a2c5c --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapTieredEvictionSelfTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.cache.*; + +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; + +/** + * Test with TRANSACTIONAL cache. + */ +public class GridCacheOffHeapTieredEvictionSelfTest extends GridCacheOffHeapTieredEvictionAbstractSelfTest { + /** {@inheritDoc} */ + @SuppressWarnings("RedundantMethodOverride") + @Override protected GridCacheAtomicityMode atomicityMode() { + return TRANSACTIONAL; + } +}