ignite-973-2 - swap read before remove
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e1243b40 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e1243b40 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e1243b40 Branch: refs/heads/ignite-950 Commit: e1243b40537ac883271eb7cab492e8ef86f7b330 Parents: e0b573b Author: S.Vladykin <svlady...@gridgain.com> Authored: Tue Jun 23 23:13:03 2015 +0300 Committer: S.Vladykin <svlady...@gridgain.com> Committed: Tue Jun 23 23:13:03 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheSwapManager.java | 98 +++++++++++++------- .../inmemory/GridTestSwapSpaceSpi.java | 3 +- .../processors/query/h2/opt/GridH2Table.java | 2 +- .../cache/IgniteCacheOffheapEvictQueryTest.java | 2 +- 4 files changed, 66 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1243b40/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java index e45ec2d..7595a1d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java @@ -582,6 +582,13 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { part, key.valueBytes(cctx.cacheObjectContext())); + ClassLoader ldr = cctx.deploy().globalLoader(); + + GridCacheQueryManager qryMgr = cctx.queries(); + + if (qryMgr != null && !readSwapBeforeRemove(key, swapKey, ldr)) + return null; // Not found. + swapMgr.remove(spaceName, swapKey, new CI1<byte[]>() { @Override public void apply(byte[] rmv) { if (cctx.config().isStatisticsEnabled()) @@ -597,7 +604,6 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { t.set(entry); CacheObject v = entry.value(); - byte[] valBytes = entry.valueBytes(); // Event notification. if (cctx.events().isRecordable(EVT_CACHE_OBJECT_UNSWAPPED)) { @@ -621,18 +627,13 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { // Always fire this event, since preloading depends on it. onUnswapped(part, key, entry); - - GridCacheQueryManager qryMgr = cctx.queries(); - - if (qryMgr != null) - qryMgr.onUnswap(key, v); } catch (IgniteCheckedException e) { err.set(e); } } } - }, cctx.deploy().globalLoader()); + }, ldr); if (err.get() != null) throw err.get(); @@ -839,7 +840,17 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { assert swapEnabled; assert unprocessedKeys != null; - // Swap is enabled. + ClassLoader ldr = cctx.deploy().globalLoader(); + + if (qryMgr != null) { // Unswap for indexing. + Iterator<SwapKey> iter = unprocessedKeys.iterator(); + + while (iter.hasNext()) { + if (!readSwapBeforeRemove(null, iter.next(), ldr)) + iter.remove(); // We will not do unswapping further -> need to skip the key. + } + } + final GridTuple<IgniteCheckedException> err = F.t1(); swapMgr.removeAll(spaceName, @@ -891,9 +902,6 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { // Always fire this event, since preloading depends on it. onUnswapped(swapKey.partition(), key, entry); - - if (qryMgr != null) - qryMgr.onUnswap(key, entry.value()); } catch (IgniteCheckedException e) { err.set(e); @@ -901,7 +909,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { } } }, - cctx.deploy().globalLoader()); + ldr); if (err.get() != null) throw err.get(); @@ -923,7 +931,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { boolean rmv = offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext())); - if(rmv && cctx.config().isStatisticsEnabled()) + if (rmv && cctx.config().isStatisticsEnabled()) cctx.cache().metrics0().onOffHeapRemove(); return rmv; @@ -982,6 +990,37 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { } /** + * Reads value from swap and unswaps it to indexing. + * + * @param key Key. + * @param swapKey Swap key. + * @param ldr Class loader. + * @return {@code true} If read and unswapped successfully. + * @throws IgniteCheckedException If failed. + */ + private boolean readSwapBeforeRemove(@Nullable KeyCacheObject key, SwapKey swapKey, ClassLoader ldr) + throws IgniteCheckedException { + assert cctx.queries() != null; + + byte[] entryBytes = swapMgr.read(spaceName, swapKey, ldr); + + if (entryBytes == null) + return false; + + GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(entryBytes)); + + if (entry == null) + return false; + + if (key == null) + key = cctx.toCacheKeyObject(swapKey.keyBytes()); + + cctx.queries().onUnswap(key, entry.value()); + + return true; + } + + /** * @param key Key to remove. * @throws IgniteCheckedException If failed. */ @@ -1013,33 +1052,20 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { part, key.valueBytes(cctx.cacheObjectContext())); + ClassLoader ldr = cctx.deploy().globalLoader(); + + if (qryMgr != null && !readSwapBeforeRemove(key, swapKey, ldr)) + return; // Not found. + swapMgr.remove(spaceName, swapKey, - new CI1<byte[]>() { + cctx.config().isStatisticsEnabled() ? new CI1<byte[]>() { @Override public void apply(byte[] rmv) { - if (rmv == null) - return; - - try { - if (cctx.config().isStatisticsEnabled()) - cctx.cache().metrics0().onSwapRemove(); - - if (qryMgr == null) - return; - - GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(rmv)); - - if (entry == null) - return; - - qryMgr.onUnswap(key, entry.value()); - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } + if (rmv != null) + cctx.cache().metrics0().onSwapRemove(); } - }, - cctx.deploy().globalLoader()); + } : null, + ldr); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1243b40/modules/core/src/test/java/org/apache/ignite/spi/swapspace/inmemory/GridTestSwapSpaceSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/swapspace/inmemory/GridTestSwapSpaceSpi.java b/modules/core/src/test/java/org/apache/ignite/spi/swapspace/inmemory/GridTestSwapSpaceSpi.java index d8303a4..2a3c940 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/swapspace/inmemory/GridTestSwapSpaceSpi.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/swapspace/inmemory/GridTestSwapSpaceSpi.java @@ -285,7 +285,8 @@ public class GridTestSwapSpaceSpi extends IgniteSpiAdapter implements SwapSpaceS byte[] val = data.remove(key); if (val != null) { - c.apply(val); + if (c != null) + c.apply(val); fireEvent(EVT_SWAP_SPACE_DATA_REMOVED, name, key.keyBytes()); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1243b40/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java index 92991af..86dbf06 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java @@ -393,7 +393,7 @@ public class GridH2Table extends TableBase { for (int i = 2, len = idxs.size(); i < len; i++) { Row res = index(i).remove(old); - assert eq(pk, res, old): "\n" + old + "\n" + res; + assert eq(pk, res, old): "\n" + old + "\n" + res + "\n" + i + " -> " + index(i).getName(); } } else http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e1243b40/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapEvictQueryTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapEvictQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapEvictQueryTest.java index 45d744e..f9ff69e 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapEvictQueryTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapEvictQueryTest.java @@ -86,7 +86,7 @@ public class IgniteCacheOffheapEvictQueryTest extends GridCommonAbstractTest { */ public void testEvictAndRemove() throws Exception { final int KEYS_CNT = 3000; - final int THREADS_CNT = 50; + final int THREADS_CNT = 250; final IgniteCache<Integer,Integer> c = startGrid().cache(null);