Repository: incubator-ignite Updated Branches: refs/heads/ignite-876 [created] b16fd589c
# ignite-876 Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b16fd589 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b16fd589 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b16fd589 Branch: refs/heads/ignite-876 Commit: b16fd589c7ab493f60ffb77a9fa2930fbedaabd5 Parents: bbc21a6 Author: sboikov <[email protected]> Authored: Wed May 13 12:51:34 2015 +0300 Committer: sboikov <[email protected]> Committed: Wed May 13 12:51:34 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheMapEntry.java | 149 ++++++++++++------- 1 file changed, 96 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b16fd589/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 71858d1..bfff2d6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -488,34 +488,34 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { if (cctx.isSwapOrOffheapEnabled() && !deletedUnlocked() && hasValueUnlocked() && !detached()) { assert Thread.holdsLock(this); - long expireTime = expireTimeExtras(); - - if (expireTime > 0 && U.currentTimeMillis() >= expireTime) { // Don't swap entry if it's expired. - // Entry might have been updated. - if (cctx.offheapTiered()) { - cctx.swap().removeOffheap(key); + if (cctx.offheapTiered()) { + if (val == null && valPtr != 0) { + if (log.isDebugEnabled()) + log.debug("Value did not change, skip write swap entry: " + this); - valPtr = 0; + if (cctx.swap().offheapEvictionEnabled()) + cctx.swap().enableOffheapEviction(key()); } return; } - if (val == null && cctx.offheapTiered() && valPtr != 0) { - if (log.isDebugEnabled()) - log.debug("Value did not change, skip write swap entry: " + this); - - if (cctx.swap().offheapEvictionEnabled()) - cctx.swap().enableOffheapEviction(key()); + long expireTime = expireTimeExtras(); + if (expireTime > 0 && U.currentTimeMillis() >= expireTime) // Don't swap entry if it's expired. return; - } IgniteUuid valClsLdrId = null; + IgniteUuid keyClsLdrId = null; - if (val != null) { - valClsLdrId = cctx.deploy().getClassLoaderId( - val.value(cctx.cacheObjectContext(), false).getClass().getClassLoader()); + if (cctx.kernalContext().config().isPeerClassLoadingEnabled()) { + if (val != null) { + valClsLdrId = cctx.deploy().getClassLoaderId( + U.detectObjectClassLoader(val.value(cctx.cacheObjectContext(), false))); + } + + keyClsLdrId = cctx.deploy().getClassLoaderId( + U.detectObjectClassLoader(key.value(cctx.cacheObjectContext(), false))); } IgniteBiTuple<byte[], Byte> valBytes = valueBytes0(); @@ -526,7 +526,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { ver, ttlExtras(), expireTime, - cctx.deploy().getClassLoaderId(U.detectObjectClassLoader(key.value(cctx.cacheObjectContext(), false))), + keyClsLdrId, valClsLdrId); if (log.isDebugEnabled()) @@ -816,13 +816,10 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { // Update indexes before actual write to entry. updateIndex(ret, expTime, nextVer, prevVal); - boolean hadValPtr = valPtr != 0; - // Don't change version for read-through. update(ret, expTime, ttl, nextVer); - if (hadValPtr && cctx.offheapTiered()) - cctx.swap().removeOffheap(key); + updateOffheap(ret, expTime, ttl); if (cctx.deferredDelete() && deletedUnlocked() && !isInternal() && !detached()) deletedUnlocked(false); @@ -903,6 +900,8 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { update(ret, expTime, ttl, nextVer); + updateOffheap(ret, expTime, ttl); + touch = true; // If value was set - return, otherwise try again. @@ -1037,6 +1036,8 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { update(val, expireTime, ttl, newVer); + updateOffheap(val, expireTime, ttl); + drReplicate(drType, val, newVer); recordNodeId(affNodeId, topVer); @@ -1164,15 +1165,10 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { // can be updated without actually holding entry lock. clearIndex(old); - boolean hadValPtr = valPtr != 0; - update(null, 0, 0, newVer); - if (cctx.offheapTiered() && hadValPtr) { - boolean rmv = cctx.swap().removeOffheap(key); - - assert rmv; - } + if (cctx.offheapTiered()) + cctx.swap().removeOffheap(key); if (cctx.deferredDelete() && !detached() && !isInternal()) { if (!deletedUnlocked()) { @@ -1351,6 +1347,8 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { clearIndex(null); update(old, expireTime, ttl, ver); + + updateOffheap(old, expireTime, ttl); } // Apply metrics. @@ -1495,6 +1493,8 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { update(updated, expireTime, ttl, ver); + updateOffheap(updated, expireTime, ttl); + if (evt) { CacheObject evtOld = null; @@ -1521,19 +1521,14 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { // Must persist inside synchronization in non-tx mode. cctx.store().remove(null, keyValue(false)); - boolean hasValPtr = valPtr != 0; - // Update index inside synchronization since it can be updated // in load methods without actually holding entry lock. clearIndex(old); update(null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, ver); - if (cctx.offheapTiered() && hasValPtr) { - boolean rmv = cctx.swap().removeOffheap(key); - - assert rmv; - } + if (cctx.offheapTiered()) + cctx.swap().removeOffheap(key); if (evt) { CacheObject evtOld = null; @@ -1841,6 +1836,8 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { update(oldVal, initExpireTime, initTtl, ver); + updateOffheap(oldVal, initExpireTime, initTtl); + if (deletedUnlocked() && oldVal != null && !isInternal()) deletedUnlocked(false); } @@ -2052,6 +2049,8 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { update(updated, newExpireTime, newTtl, newVer); + updateOffheap(updated, newExpireTime, newTtl); + drReplicate(drType, updated, newVer); recordNodeId(affNodeId, topVer); @@ -2122,19 +2121,14 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { enqueueVer = newVer; - boolean hasValPtr = valPtr != 0; - // Clear value on backup. Entry will be removed from cache when it got evicted from queue. update(null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, newVer); assert newSysTtl == CU.TTL_NOT_CHANGED; assert newSysExpireTime == CU.EXPIRE_TIME_CALCULATE; - if (cctx.offheapTiered() && hasValPtr) { - boolean rmv = cctx.swap().removeOffheap(key); - - assert rmv; - } + if (cctx.offheapTiered()) + cctx.swap().removeOffheap(key); clearReaders(); @@ -2989,6 +2983,8 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { // Version does not change for load ops. update(val, expTime, ttl, ver); + updateOffheap(val, expTime, ttl); + boolean skipQryNtf = false; if (val == null) { @@ -3036,11 +3032,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx); // Version does not change for load ops. - update(val, - unswapped.expireTime(), - unswapped.ttl(), - unswapped.version() - ); + update(val, unswapped.expireTime(), unswapped.ttl(), unswapped.version()); return true; } @@ -3093,6 +3085,8 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { // Version does not change for load ops. update(val, expTime, ttl, newVer); + + updateOffheap(val, expTime, ttl); } return true; @@ -3647,6 +3641,8 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { return true; } + else if (cctx.offheapTiered()) + cctx.swap().removeOffheap(key); } } else { @@ -3690,8 +3686,12 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { return true; } - else + else { + if (cctx.offheapTiered()) + cctx.swap().removeOffheap(key); + return false; + } } } } @@ -3722,10 +3722,17 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { if (!hasReaders() && markObsolete0(obsoleteVer, false)) { if (!isStartVersion() && hasValueUnlocked()) { IgniteUuid valClsLdrId = null; + IgniteUuid keyClsLdrId = null; - if (val != null) - valClsLdrId = cctx.deploy().getClassLoaderId( - U.detectObjectClassLoader(val.value(cctx.cacheObjectContext(), false))); + if (cctx.kernalContext().config().isPeerClassLoadingEnabled()) { + if (val != null) { + valClsLdrId = cctx.deploy().getClassLoaderId( + U.detectObjectClassLoader(val.value(cctx.cacheObjectContext(), false))); + } + + keyClsLdrId = cctx.deploy().getClassLoaderId( + U.detectObjectClassLoader(key.value(cctx.cacheObjectContext(), false))); + } IgniteBiTuple<byte[], Byte> valBytes = valueBytes0(); @@ -3736,7 +3743,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { ver, ttlExtras(), expireTimeExtras(), - cctx.deploy().getClassLoaderId(U.detectObjectClassLoader(key.value(cctx.cacheObjectContext(), false))), + keyClsLdrId, valClsLdrId); } @@ -4108,6 +4115,42 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { return extras != null ? extras.size() : 0; } + /** + * @param val Value. + * @param expireTime Expire time. + * @param ttl TTL. + * @throws IgniteCheckedException If failed. + */ + private void updateOffheap(@Nullable CacheObject val, long expireTime, long ttl) throws IgniteCheckedException { + if (cctx.offheapTiered() && val != null) { + IgniteUuid valClsLdrId; + IgniteUuid keyClsLdrId; + + if (cctx.kernalContext().config().isPeerClassLoadingEnabled()) { + valClsLdrId = cctx.deploy().getClassLoaderId( + U.detectObjectClassLoader(val.value(cctx.cacheObjectContext(), false))); + + keyClsLdrId = cctx.deploy().getClassLoaderId( + U.detectObjectClassLoader(key.value(cctx.cacheObjectContext(), false))); + } + else { + valClsLdrId = null; + keyClsLdrId = null; + } + + byte[] valBytes = val.valueBytes(cctx.cacheObjectContext()); + + cctx.swap().write(key(), + ByteBuffer.wrap(valBytes), + val.type(), + ver, + ttl, + expireTime, + keyClsLdrId, + valClsLdrId); + } + } + /** {@inheritDoc} */ @Override public boolean equals(Object o) { // Identity comparison left on purpose.
