#ignite-565: GridCacheOffHeapSelfTest should be in indexing module + Change localPeek mode.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a4567fa4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a4567fa4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a4567fa4 Branch: refs/heads/sprint-3 Commit: a4567fa4f04e764cc2c5c2353aed03d300b11ade Parents: eb0474e Author: ivasilinets <ivasilin...@gridgain.com> Authored: Wed Mar 25 15:31:29 2015 +0300 Committer: ivasilinets <ivasilin...@gridgain.com> Committed: Wed Mar 25 15:31:29 2015 +0300 ---------------------------------------------------------------------- .../cache/GridCacheOffHeapSelfTest.java | 627 ------------------- .../cache/GridCacheOffHeapSelfTest.java | 624 ++++++++++++++++++ 2 files changed, 624 insertions(+), 627 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a4567fa4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapSelfTest.java deleted file mode 100644 index 2e2b01e..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapSelfTest.java +++ /dev/null @@ -1,627 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.cache.query.*; -import org.apache.ignite.cache.query.annotations.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.events.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.marshaller.optimized.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.apache.ignite.spi.swapspace.file.*; -import org.apache.ignite.testframework.junits.common.*; - -import javax.cache.*; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -import static java.util.concurrent.TimeUnit.*; -import static org.apache.ignite.cache.CacheMode.*; -import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; -import static org.apache.ignite.configuration.DeploymentMode.*; -import static org.apache.ignite.events.EventType.*; - -/** - * Test for cache swap. - */ -public class GridCacheOffHeapSelfTest extends GridCommonAbstractTest { - /** Entry count. */ - private static final int ENTRY_CNT = 1000; - - /** Swap count. */ - private final AtomicInteger swapCnt = new AtomicInteger(); - - /** Unswap count. */ - private final AtomicInteger unswapCnt = new AtomicInteger(); - - /** Saved versions. */ - private final Map<Integer, Object> versions = new HashMap<>(); - - /** */ - private final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); - - /** PeerClassLoadingLocalClassPathExclude enable. */ - private boolean excluded; - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - TcpDiscoverySpi disco = new TcpDiscoverySpi(); - - disco.setIpFinder(ipFinder); - - cfg.setDiscoverySpi(disco); - - cfg.setNetworkTimeout(2000); - - cfg.setSwapSpaceSpi(new FileSwapSpaceSpi()); - - CacheConfiguration<?,?> cacheCfg = defaultCacheConfiguration(); - - cacheCfg.setWriteSynchronizationMode(FULL_SYNC); - cacheCfg.setSwapEnabled(false); - cacheCfg.setCacheMode(REPLICATED); - cacheCfg.setOffHeapMaxMemory(1024L * 1024L * 1024L); - cacheCfg.setIndexedTypes( - Integer.class, CacheValue.class - ); - - cfg.setCacheConfiguration(cacheCfg); - - cfg.setMarshaller(new OptimizedMarshaller(false)); - cfg.setDeploymentMode(SHARED); - - if (excluded) - cfg.setPeerClassLoadingLocalClassPathExclude(GridCacheOffHeapSelfTest.class.getName(), - CacheValue.class.getName()); - - return cfg; - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - versions.clear(); - - super.afterTestsStopped(); - } - - /** - * @throws Exception If failed. - */ - @SuppressWarnings("BusyWait") - public void testOffHeapDeployment() throws Exception { - try { - Ignite ignite1 = startGrid(1); - - excluded = true; - - Ignite ignite2 = startGrid(2); - - IgniteCache<Integer, Object> cache1 = ignite1.cache(null); - IgniteCache<Integer, Object> cache2 = ignite2.cache(null); - - Object v1 = new CacheValue(1); - - cache1.put(1, v1); - - info("Stored value in cache1 [v=" + v1 + ", ldr=" + v1.getClass().getClassLoader() + ']'); - - Object v2 = cache2.get(1); - - assert v2 != null; - - info("Read value from cache2 [v=" + v2 + ", ldr=" + v2.getClass().getClassLoader() + ']'); - - assert v2 != null; - assert !v2.getClass().getClassLoader().equals(getClass().getClassLoader()); - assert v2.getClass().getClassLoader().getClass().getName().contains("GridDeploymentClassLoader"); - - SwapListener lsnr = new SwapListener(); - - ignite2.events().localListen(lsnr, EVT_CACHE_OBJECT_TO_OFFHEAP, EVT_CACHE_OBJECT_FROM_OFFHEAP); - - cache2.localEvict(keySet(cache2)); - - assert lsnr.awaitSwap(); - - assert cache2.get(1) != null; - - assert lsnr.awaitUnswap(); - - ignite2.events().stopLocalListen(lsnr); - - lsnr = new SwapListener(); - - ignite2.events().localListen(lsnr, EVT_CACHE_OBJECT_TO_OFFHEAP, EVT_CACHE_OBJECT_FROM_OFFHEAP); - - cache2.localEvict(keySet(cache2)); - - assert lsnr.awaitSwap(); - - stopGrid(1); - - boolean success = false; - - for (int i = 0; i < 6; i++) { - success = cache2.get(1) == null; - - if (success) - break; - else if (i < 2) { - info("Sleeping to wait for cache clear."); - - Thread.sleep(500); - } - } - - assert success; - } - finally { - stopAllGrids(); - } - } - - /** - * @throws Exception If failed. - */ - public void testOffHeap() throws Exception { - try { - startGrids(1); - - grid(0).events().localListen(new IgnitePredicate<Event>() { - @Override public boolean apply(Event evt) { - assert evt != null; - - switch (evt.type()) { - case EVT_CACHE_OBJECT_TO_OFFHEAP: - swapCnt.incrementAndGet(); - - break; - case EVT_CACHE_OBJECT_FROM_OFFHEAP: - unswapCnt.incrementAndGet(); - - break; - } - - return true; - } - }, EVT_CACHE_OBJECT_TO_OFFHEAP, EVT_CACHE_OBJECT_FROM_OFFHEAP); - - IgniteCache<Integer, CacheValue> cache = grid(0).cache(null); - - populate(cache); - evictAll(cache); - - int cnt = 0; - - for (Cache.Entry<Integer, CacheValue> e : cache.localEntries(CachePeekMode.OFFHEAP)) { - assertEquals(e.getKey().intValue(), e.getValue().value()); - - cnt++; - } - - assertEquals(ENTRY_CNT, cnt); - - query(cache, 0, 200); // Query swapped entries. - unswap(cache, 200, 400); // Check 'promote' method. - unswapAll(cache, 400, 600); // Check 'promoteAll' method. - get(cache, 600, 800); // Check 'get' method. - peek(cache, 800, ENTRY_CNT); // Check 'peek' method in 'SWAP' mode. - - // Check that all entries were unswapped. - for (int i = 0; i < ENTRY_CNT; i++) { - CacheValue val = cache.localPeek(i); - - assert val != null; - assert val.value() == i; - } - - // Query unswapped entries. - Collection<Cache.Entry<Integer, CacheValue>> res = cache.query( - new SqlQuery<Integer, CacheValue>(CacheValue.class, "val >= ? and val < ?"). - setArgs(0, ENTRY_CNT)). - getAll(); - - assert res.size() == ENTRY_CNT; - - for (Cache.Entry<Integer, CacheValue> entry : res) { - assert entry != null; - assert entry.getKey() != null; - assert entry.getValue() != null; - assert entry.getKey() == entry.getValue().value(); - } - } - finally { - stopAllGrids(); - } - } - - /** - * @throws Exception If failed. - */ - public void testOffHeapIterator() throws Exception { - try { - startGrids(1); - - grid(0); - - IgniteCache<Integer, Integer> cache = grid(0).cache(null); - - for (int i = 0; i < 100; i++) { - info("Putting: " + i); - - cache.put(i, i); - - cache.localEvict(Collections.singleton(i)); - } - - int i = 0; - - for (Cache.Entry<Integer, Integer> e : cache.localEntries(CachePeekMode.OFFHEAP)) { - Integer key = e.getKey(); - - info("Key: " + key); - - i++; - - cache.remove(e.getKey()); - - assertNull(cache.get(key)); - } - - assertEquals(100, i); - - assert cache.localSize() == 0; - } - finally { - stopAllGrids(); - } - } - - /** - * Populates cache. - * - * @param cache Cache. - * @throws Exception In case of error. - */ - private void populate(IgniteCache<Integer, CacheValue> cache) throws Exception { - resetCounters(); - - for (int i = 0; i < ENTRY_CNT; i++) { - cache.put(i, new CacheValue(i)); - - CacheValue val = cache.localPeek(i); - - assert val != null; - assert val.value() == i; - - GridCacheEntryEx entry = dht(cache).peekEx(i); - - assert entry != null; - - versions.put(i, entry.version()); - } - - assert swapCnt.get() == 0; - assert unswapCnt.get() == 0; - } - - /** - * Evicts all entries in cache. - * - * @param cache Cache. - * @throws Exception In case of error. - */ - private void evictAll(IgniteCache<Integer, CacheValue> cache) throws Exception { - resetCounters(); - - assertEquals(ENTRY_CNT, cache.size()); - assertEquals(0, cache.localSize(CachePeekMode.OFFHEAP)); - - for (int i = 0; i < ENTRY_CNT; i++) { - cache.localEvict(Collections.singleton(i)); - - assertEquals(ENTRY_CNT - i - 1, cache.size()); - assertEquals(i + 1, cache.localSize(CachePeekMode.OFFHEAP)); - } - // cache.evictAll(); - - assertEquals(0, cache.size()); - assertEquals(ENTRY_CNT, cache.localSize(CachePeekMode.OFFHEAP)); - - for (int i = 0; i < ENTRY_CNT; i++) - assertNull(cache.localPeek(i)); - - assertEquals(ENTRY_CNT, swapCnt.get()); - assertEquals(0, unswapCnt.get()); - } - - /** - * Runs SQL query and checks result. - * - * @param cache Cache. - * @param lowerBound Lower key bound. - * @param upperBound Upper key bound. - * @throws Exception In case of error. - */ - private void query(IgniteCache<Integer, CacheValue> cache, int lowerBound, int upperBound) throws Exception { - resetCounters(); - - Collection<Cache.Entry<Integer, CacheValue>> res = cache.query(new SqlQuery<Integer, CacheValue>(CacheValue.class, "val >= ? and val < ?"). - setArgs(lowerBound, upperBound)). - getAll(); - - assertEquals(res.size(), upperBound - lowerBound); - - for (Cache.Entry<Integer, CacheValue> entry : res) { - assert entry != null; - assert entry.getKey() != null; - assert entry.getValue() != null; - assert entry.getKey() == entry.getValue().value(); - } - - assertEquals(0, swapCnt.get()); - assertEquals(0, unswapCnt.get()); - - checkEntries(cache, lowerBound, upperBound); - - assertEquals(0, swapCnt.get()); - assertEquals(unswapCnt.get(), upperBound - lowerBound); - } - - /** - * Unswaps entries and checks result. - * - * @param cache Cache. - * @param lowerBound Lower key bound. - * @param upperBound Upper key bound. - * @throws Exception In case of error. - */ - private void unswap(IgniteCache<Integer, CacheValue> cache, int lowerBound, int upperBound) throws Exception { - resetCounters(); - - assertEquals(0, swapCnt.get()); - assertEquals(0, unswapCnt.get()); - - for (int i = lowerBound; i < upperBound; i++) { - assert cache.localPeek(i) == null; - - cache.localPromote(Collections.singleton(i)); - CacheValue val = cache.localPeek(i); - - assertNotNull(val); - assertEquals(i, val.value()); - - assertEquals(i - lowerBound + 1, unswapCnt.get()); - } - - assertEquals(0, swapCnt.get()); - assertEquals(unswapCnt.get(), upperBound - lowerBound); - - checkEntries(cache, lowerBound, upperBound); - - assertEquals(0, swapCnt.get()); - assertEquals(unswapCnt.get(), upperBound - lowerBound); - } - - /** - * Unswaps entries and checks result. - * - * @param cache Cache. - * @param lowerBound Lower key bound. - * @param upperBound Upper key bound. - * @throws Exception In case of error. - */ - private void unswapAll(IgniteCache<Integer, CacheValue> cache, int lowerBound, int upperBound) throws Exception { - resetCounters(); - - Set<Integer> keys = new HashSet<>(); - - for (int i = lowerBound; i < upperBound; i++) { - assert cache.localPeek(i) == null; - - keys.add(i); - } - - cache.localPromote(keys); - - assert swapCnt.get() == 0; - assert unswapCnt.get() == upperBound - lowerBound; - - checkEntries(cache, lowerBound, upperBound); - - assert swapCnt.get() == 0; - assert unswapCnt.get() == upperBound - lowerBound; - } - - /** - * Unswaps entries via {@code get} method and checks result. - * - * @param cache Cache. - * @param lowerBound Lower key bound. - * @param upperBound Upper key bound. - * @throws Exception In case of error. - */ - private void get(IgniteCache<Integer, CacheValue> cache, int lowerBound, int upperBound) throws Exception { - resetCounters(); - - for (int i = lowerBound; i < upperBound; i++) { - assert cache.localPeek(i) == null; - - CacheValue val = cache.get(i); - - assert val != null; - assert val.value() == i; - } - - assert swapCnt.get() == 0; - assert unswapCnt.get() == upperBound - lowerBound; - - checkEntries(cache, lowerBound, upperBound); - - assert swapCnt.get() == 0; - assert unswapCnt.get() == upperBound - lowerBound; - } - - /** - * Peeks entries in {@code SWAP} mode and checks result. - * - * @param cache Cache. - * @param lowerBound Lower key bound. - * @param upperBound Upper key bound. - * @throws Exception In case of error. - */ - private void peek(IgniteCache<Integer, CacheValue> cache, int lowerBound, int upperBound) throws Exception { - resetCounters(); - - for (int i = lowerBound; i < upperBound; i++) { - assert cache.localPeek(i) == null; - - CacheValue val = cache.localPeek(i, CachePeekMode.SWAP); - - assert val != null; - assert val.value() == i; - } - - assert swapCnt.get() == 0; - assert unswapCnt.get() == 0; - - checkEntries(cache, lowerBound, upperBound); - - assert swapCnt.get() == 0; - assert unswapCnt.get() == upperBound - lowerBound; - } - - /** - * Resets event counters. - */ - private void resetCounters() { - swapCnt.set(0); - unswapCnt.set(0); - } - - /** - * Checks that entries in cache are correct after being unswapped. - * If entry is still swapped, it will be unswapped in this method. - * - * @param cache Cache. - * @param lowerBound Lower key bound. - * @param upperBound Upper key bound. - * @throws Exception In case of error. - */ - private void checkEntries(IgniteCache<Integer, CacheValue> cache, int lowerBound, int upperBound) throws Exception { - for (int i = lowerBound; i < upperBound; i++) { - cache.localPromote(Collections.singleton(i)); - - GridCacheEntryEx entry = dht(cache).entryEx(i); - - assert entry != null; - assert entry.key() != null; - - CacheValue val = CU.value(entry.rawGet(), entry.context(), false); - - assertNotNull("Value null for key: " + i, val); - assertEquals(entry.key().value(entry.context().cacheObjectContext(), false), (Integer)val.value()); - - assertEquals(entry.version(), versions.get(i)); - } - } - - /** - * - */ - private static class CacheValue { - /** Value. */ - @QuerySqlField - private final int val; - - /** - * @param val Value. - */ - private CacheValue(int val) { - this.val = val; - } - - /** - * @return Value. - */ - public int value() { - return val; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(CacheValue.class, this); - } - } - - /** - * - */ - private class SwapListener implements IgnitePredicate<Event> { - /** */ - private final CountDownLatch swapLatch = new CountDownLatch(1); - - /** */ - private final CountDownLatch unswapLatch = new CountDownLatch(1); - - /** {@inheritDoc} */ - @Override public boolean apply(Event evt) { - assert evt != null; - - info("Received event: " + evt); - - switch (evt.type()) { - case EVT_CACHE_OBJECT_TO_OFFHEAP: - swapLatch.countDown(); - - break; - case EVT_CACHE_OBJECT_FROM_OFFHEAP: - unswapLatch.countDown(); - - break; - } - - return true; - } - - /** - * @return {@code True} if await succeeded. - * @throws InterruptedException If interrupted. - */ - boolean awaitSwap() throws InterruptedException { - return swapLatch.await(5000, MILLISECONDS); - } - - /** - * @return {@code True} if await succeeded. - * @throws InterruptedException If interrupted. - */ - boolean awaitUnswap() throws InterruptedException { - return unswapLatch.await(5000, MILLISECONDS); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a4567fa4/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapSelfTest.java new file mode 100644 index 0000000..8b1c170 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapSelfTest.java @@ -0,0 +1,624 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.cache.query.annotations.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.marshaller.optimized.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.spi.swapspace.file.*; +import org.apache.ignite.testframework.junits.common.*; + +import javax.cache.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static java.util.concurrent.TimeUnit.*; +import static org.apache.ignite.cache.CacheMode.*; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; +import static org.apache.ignite.configuration.DeploymentMode.*; +import static org.apache.ignite.events.EventType.*; + +/** + * Test for cache swap. + */ +public class GridCacheOffHeapSelfTest extends GridCommonAbstractTest { + /** Entry count. */ + private static final int ENTRY_CNT = 1000; + + /** Swap count. */ + private final AtomicInteger swapCnt = new AtomicInteger(); + + /** Unswap count. */ + private final AtomicInteger unswapCnt = new AtomicInteger(); + + /** Saved versions. */ + private final Map<Integer, Object> versions = new HashMap<>(); + + /** */ + private final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** PeerClassLoadingLocalClassPathExclude enable. */ + private boolean excluded; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(disco); + + cfg.setNetworkTimeout(2000); + + cfg.setSwapSpaceSpi(new FileSwapSpaceSpi()); + + CacheConfiguration<?,?> cacheCfg = defaultCacheConfiguration(); + + cacheCfg.setWriteSynchronizationMode(FULL_SYNC); + cacheCfg.setSwapEnabled(false); + cacheCfg.setCacheMode(REPLICATED); + cacheCfg.setOffHeapMaxMemory(1024L * 1024L * 1024L); + cacheCfg.setIndexedTypes(Integer.class, CacheValue.class); + + cfg.setCacheConfiguration(cacheCfg); + + cfg.setMarshaller(new OptimizedMarshaller(false)); + cfg.setDeploymentMode(SHARED); + + if (excluded) + cfg.setPeerClassLoadingLocalClassPathExclude(GridCacheOffHeapSelfTest.class.getName(), + CacheValue.class.getName()); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + versions.clear(); + + super.afterTestsStopped(); + } + + /** + * @throws Exception If failed. + */ + @SuppressWarnings("BusyWait") + public void testOffHeapDeployment() throws Exception { + try { + Ignite ignite1 = startGrid(1); + + excluded = true; + + Ignite ignite2 = startGrid(2); + + IgniteCache<Integer, Object> cache1 = ignite1.cache(null); + IgniteCache<Integer, Object> cache2 = ignite2.cache(null); + + Object v1 = new CacheValue(1); + + cache1.put(1, v1); + + info("Stored value in cache1 [v=" + v1 + ", ldr=" + v1.getClass().getClassLoader() + ']'); + + Object v2 = cache2.get(1); + + assert v2 != null; + + info("Read value from cache2 [v=" + v2 + ", ldr=" + v2.getClass().getClassLoader() + ']'); + + assert !v2.getClass().getClassLoader().equals(getClass().getClassLoader()); + assert v2.getClass().getClassLoader().getClass().getName().contains("GridDeploymentClassLoader"); + + SwapListener lsnr = new SwapListener(); + + ignite2.events().localListen(lsnr, EVT_CACHE_OBJECT_TO_OFFHEAP, EVT_CACHE_OBJECT_FROM_OFFHEAP); + + cache2.localEvict(keySet(cache2)); + + assert lsnr.awaitSwap(); + + assert cache2.get(1) != null; + + assert lsnr.awaitUnswap(); + + ignite2.events().stopLocalListen(lsnr); + + lsnr = new SwapListener(); + + ignite2.events().localListen(lsnr, EVT_CACHE_OBJECT_TO_OFFHEAP, EVT_CACHE_OBJECT_FROM_OFFHEAP); + + cache2.localEvict(keySet(cache2)); + + assert lsnr.awaitSwap(); + + stopGrid(1); + + boolean success = false; + + for (int i = 0; i < 6; i++) { + success = cache2.get(1) == null; + + if (success) + break; + else if (i < 2) { + info("Sleeping to wait for cache clear."); + + Thread.sleep(500); + } + } + + assert success; + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void testOffHeap() throws Exception { + try { + startGrids(1); + + grid(0).events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + assert evt != null; + + switch (evt.type()) { + case EVT_CACHE_OBJECT_TO_OFFHEAP: + swapCnt.incrementAndGet(); + + break; + case EVT_CACHE_OBJECT_FROM_OFFHEAP: + unswapCnt.incrementAndGet(); + + break; + } + + return true; + } + }, EVT_CACHE_OBJECT_TO_OFFHEAP, EVT_CACHE_OBJECT_FROM_OFFHEAP); + + IgniteCache<Integer, CacheValue> cache = grid(0).cache(null); + + populate(cache); + evictAll(cache); + + int cnt = 0; + + for (Cache.Entry<Integer, CacheValue> e : cache.localEntries(CachePeekMode.OFFHEAP)) { + assertEquals(e.getKey().intValue(), e.getValue().value()); + + cnt++; + } + + assertEquals(ENTRY_CNT, cnt); + + query(cache, 0, 200); // Query swapped entries. + unswap(cache, 200, 400); // Check 'promote' method. + unswapAll(cache, 400, 600); // Check 'promoteAll' method. + get(cache, 600, 800); // Check 'get' method. + peek(cache, 800, ENTRY_CNT); // Check 'peek' method in 'SWAP' mode. + + // Check that all entries were unswapped. + for (int i = 0; i < ENTRY_CNT; i++) { + CacheValue val = cache.localPeek(i); + + assert val != null; + assert val.value() == i; + } + + // Query unswapped entries. + Collection<Cache.Entry<Integer, CacheValue>> res = cache.query( + new SqlQuery<Integer, CacheValue>(CacheValue.class, "val >= ? and val < ?"). + setArgs(0, ENTRY_CNT)). + getAll(); + + assert res.size() == ENTRY_CNT; + + for (Cache.Entry<Integer, CacheValue> entry : res) { + assert entry != null; + assert entry.getKey() != null; + assert entry.getValue() != null; + assert entry.getKey() == entry.getValue().value(); + } + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void testOffHeapIterator() throws Exception { + try { + startGrids(1); + + grid(0); + + IgniteCache<Integer, Integer> cache = grid(0).cache(null); + + for (int i = 0; i < 100; i++) { + info("Putting: " + i); + + cache.put(i, i); + + cache.localEvict(Collections.singleton(i)); + } + + int i = 0; + + for (Cache.Entry<Integer, Integer> e : cache.localEntries(CachePeekMode.OFFHEAP)) { + Integer key = e.getKey(); + + info("Key: " + key); + + i++; + + cache.remove(e.getKey()); + + assertNull(cache.get(key)); + } + + assertEquals(100, i); + + assert cache.localSize() == 0; + } + finally { + stopAllGrids(); + } + } + + /** + * Populates cache. + * + * @param cache Cache. + * @throws Exception In case of error. + */ + private void populate(IgniteCache<Integer, CacheValue> cache) throws Exception { + resetCounters(); + + for (int i = 0; i < ENTRY_CNT; i++) { + cache.put(i, new CacheValue(i)); + + CacheValue val = cache.localPeek(i); + + assert val != null; + assert val.value() == i; + + GridCacheEntryEx entry = dht(cache).peekEx(i); + + assert entry != null; + + versions.put(i, entry.version()); + } + + assert swapCnt.get() == 0; + assert unswapCnt.get() == 0; + } + + /** + * Evicts all entries in cache. + * + * @param cache Cache. + * @throws Exception In case of error. + */ + private void evictAll(IgniteCache<Integer, CacheValue> cache) throws Exception { + resetCounters(); + + assertEquals(ENTRY_CNT, cache.size()); + assertEquals(0, cache.localSize(CachePeekMode.OFFHEAP)); + + for (int i = 0; i < ENTRY_CNT; i++) { + cache.localEvict(Collections.singleton(i)); + + assertEquals(ENTRY_CNT - i - 1, cache.localSize(CachePeekMode.ONHEAP)); + assertEquals(i + 1, cache.localSize(CachePeekMode.OFFHEAP)); + } + // cache.evictAll(); + + assertEquals(0, cache.localSize(CachePeekMode.ONHEAP)); + assertEquals(ENTRY_CNT, cache.localSize(CachePeekMode.OFFHEAP)); + + for (int i = 0; i < ENTRY_CNT; i++) + assertNull(cache.localPeek(i, CachePeekMode.ONHEAP)); + + assertEquals(ENTRY_CNT, swapCnt.get()); + assertEquals(0, unswapCnt.get()); + } + + /** + * Runs SQL query and checks result. + * + * @param cache Cache. + * @param lowerBound Lower key bound. + * @param upperBound Upper key bound. + * @throws Exception In case of error. + */ + private void query(IgniteCache<Integer, CacheValue> cache, int lowerBound, int upperBound) throws Exception { + resetCounters(); + + Collection<Cache.Entry<Integer, CacheValue>> res = cache.query(new SqlQuery<Integer, CacheValue>(CacheValue.class, "val >= ? and val < ?"). + setArgs(lowerBound, upperBound)). + getAll(); + + assertEquals(res.size(), upperBound - lowerBound); + + for (Cache.Entry<Integer, CacheValue> entry : res) { + assert entry != null; + assert entry.getKey() != null; + assert entry.getValue() != null; + assert entry.getKey() == entry.getValue().value(); + } + + assertEquals(0, swapCnt.get()); + assertEquals(0, unswapCnt.get()); + + checkEntries(cache, lowerBound, upperBound); + + assertEquals(0, swapCnt.get()); + assertEquals(unswapCnt.get(), upperBound - lowerBound); + } + + /** + * Unswaps entries and checks result. + * + * @param cache Cache. + * @param lowerBound Lower key bound. + * @param upperBound Upper key bound. + * @throws Exception In case of error. + */ + private void unswap(IgniteCache<Integer, CacheValue> cache, int lowerBound, int upperBound) throws Exception { + resetCounters(); + + assertEquals(0, swapCnt.get()); + assertEquals(0, unswapCnt.get()); + + for (int i = lowerBound; i < upperBound; i++) { + assert cache.localPeek(i, CachePeekMode.ONHEAP) == null; + + cache.localPromote(Collections.singleton(i)); + CacheValue val = cache.localPeek(i); + + assertNotNull(val); + assertEquals(i, val.value()); + + assertEquals(i - lowerBound + 1, unswapCnt.get()); + } + + assertEquals(0, swapCnt.get()); + assertEquals(unswapCnt.get(), upperBound - lowerBound); + + checkEntries(cache, lowerBound, upperBound); + + assertEquals(0, swapCnt.get()); + assertEquals(unswapCnt.get(), upperBound - lowerBound); + } + + /** + * Unswaps entries and checks result. + * + * @param cache Cache. + * @param lowerBound Lower key bound. + * @param upperBound Upper key bound. + * @throws Exception In case of error. + */ + private void unswapAll(IgniteCache<Integer, CacheValue> cache, int lowerBound, int upperBound) throws Exception { + resetCounters(); + + Set<Integer> keys = new HashSet<>(); + + for (int i = lowerBound; i < upperBound; i++) { + assert cache.localPeek(i, CachePeekMode.ONHEAP) == null; + + keys.add(i); + } + + cache.localPromote(keys); + + assert swapCnt.get() == 0; + assert unswapCnt.get() == upperBound - lowerBound; + + checkEntries(cache, lowerBound, upperBound); + + assert swapCnt.get() == 0; + assert unswapCnt.get() == upperBound - lowerBound; + } + + /** + * Unswaps entries via {@code get} method and checks result. + * + * @param cache Cache. + * @param lowerBound Lower key bound. + * @param upperBound Upper key bound. + * @throws Exception In case of error. + */ + private void get(IgniteCache<Integer, CacheValue> cache, int lowerBound, int upperBound) throws Exception { + resetCounters(); + + for (int i = lowerBound; i < upperBound; i++) { + assert cache.localPeek(i, CachePeekMode.ONHEAP) == null; + + CacheValue val = cache.get(i); + + assert val != null; + assert val.value() == i; + } + + assert swapCnt.get() == 0; + assert unswapCnt.get() == upperBound - lowerBound; + + checkEntries(cache, lowerBound, upperBound); + + assert swapCnt.get() == 0; + assert unswapCnt.get() == upperBound - lowerBound; + } + + /** + * Peeks entries in {@code SWAP} mode and checks result. + * + * @param cache Cache. + * @param lowerBound Lower key bound. + * @param upperBound Upper key bound. + * @throws Exception In case of error. + */ + private void peek(IgniteCache<Integer, CacheValue> cache, int lowerBound, int upperBound) throws Exception { + resetCounters(); + + for (int i = lowerBound; i < upperBound; i++) { + assert cache.localPeek(i, CachePeekMode.ONHEAP) == null; + + CacheValue val = cache.localPeek(i, CachePeekMode.SWAP, CachePeekMode.OFFHEAP); + + assert val != null; + assert val.value() == i; + } + + assert swapCnt.get() == 0; + assert unswapCnt.get() == 0; + + checkEntries(cache, lowerBound, upperBound); + + assert swapCnt.get() == 0; + assert unswapCnt.get() == upperBound - lowerBound; + } + + /** + * Resets event counters. + */ + private void resetCounters() { + swapCnt.set(0); + unswapCnt.set(0); + } + + /** + * Checks that entries in cache are correct after being unswapped. + * If entry is still swapped, it will be unswapped in this method. + * + * @param cache Cache. + * @param lowerBound Lower key bound. + * @param upperBound Upper key bound. + * @throws Exception In case of error. + */ + private void checkEntries(IgniteCache<Integer, CacheValue> cache, int lowerBound, int upperBound) throws Exception { + for (int i = lowerBound; i < upperBound; i++) { + cache.localPromote(Collections.singleton(i)); + + GridCacheEntryEx entry = dht(cache).entryEx(i); + + assert entry != null; + assert entry.key() != null; + + CacheValue val = CU.value(entry.rawGet(), entry.context(), false); + + assertNotNull("Value null for key: " + i, val); + assertEquals(entry.key().value(entry.context().cacheObjectContext(), false), val.value()); + + assertEquals(entry.version(), versions.get(i)); + } + } + + /** + * + */ + private static class CacheValue { + /** Value. */ + @QuerySqlField + private final int val; + + /** + * @param val Value. + */ + private CacheValue(int val) { + this.val = val; + } + + /** + * @return Value. + */ + public int value() { + return val; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CacheValue.class, this); + } + } + + /** + * + */ + private class SwapListener implements IgnitePredicate<Event> { + /** */ + private final CountDownLatch swapLatch = new CountDownLatch(1); + + /** */ + private final CountDownLatch unswapLatch = new CountDownLatch(1); + + /** {@inheritDoc} */ + @Override public boolean apply(Event evt) { + assert evt != null; + + info("Received event: " + evt); + + switch (evt.type()) { + case EVT_CACHE_OBJECT_TO_OFFHEAP: + swapLatch.countDown(); + + break; + case EVT_CACHE_OBJECT_FROM_OFFHEAP: + unswapLatch.countDown(); + + break; + } + + return true; + } + + /** + * @return {@code True} if await succeeded. + * @throws InterruptedException If interrupted. + */ + boolean awaitSwap() throws InterruptedException { + return swapLatch.await(5000, MILLISECONDS); + } + + /** + * @return {@code True} if await succeeded. + * @throws InterruptedException If interrupted. + */ + boolean awaitUnswap() throws InterruptedException { + return unswapLatch.await(5000, MILLISECONDS); + } + } +}