# ignite-51
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/69fecdc0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/69fecdc0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/69fecdc0 Branch: refs/heads/ignite-user-req Commit: 69fecdc045e1fd53ebbb6d04bce9affb1f9720f3 Parents: 4c7f456 Author: sboikov <sboi...@gridgain.com> Authored: Wed Mar 4 14:36:49 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Wed Mar 4 15:55:06 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheContext.java | 4 +-- .../processors/cache/GridCacheMapEntry.java | 34 +++++++++++--------- .../processors/cache/KeyCacheObjectImpl.java | 4 +-- .../dht/atomic/GridDhtAtomicCache.java | 8 ++--- .../dht/atomic/GridNearAtomicUpdateFuture.java | 3 +- .../local/atomic/GridLocalAtomicCache.java | 8 ++--- .../dataload/IgniteDataLoaderImpl.java | 6 ++-- .../portable/GridPortableProcessor.java | 5 ++- .../portable/os/GridOsPortableProcessor.java | 4 +-- 9 files changed, 43 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/69fecdc0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 16a403e..be22b85 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -1701,10 +1701,10 @@ public class GridCacheContext<K, V> implements Externalizable { * @return 
Heap-based object. */ @Nullable public <T> T unwrapTemporary(@Nullable Object obj) { - if (!offheapTiered() || !portableEnabled()) + if (!offheapTiered()) return (T)obj; - return (T)portable().unwrapTemporary(obj); + return (T)portable().unwrapTemporary(this, obj); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/69fecdc0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 52f9659..b2a2e04 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -159,7 +159,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { ttlAndExpireTimeExtras(ttl, CU.toExpireTime(ttl)); - val = (CacheObject)cctx.kernalContext().portable().prepareForCache(val, cctx); + val = cctx.kernalContext().portable().prepareForCache(val, cctx); synchronized (this) { value(val); @@ -499,7 +499,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { if (delta >= 0) { CacheObject val = e.value(); - val = (CacheObject)cctx.kernalContext().portable().prepareForCache(val, cctx); + val = cctx.kernalContext().portable().prepareForCache(val, cctx); // Set unswapped value. update(val, e.expireTime(), e.ttl(), e.version()); @@ -846,7 +846,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { if (startVer.equals(ver)) { if (ret != null) { // Detach value before index update. 
- ret = (CacheObject)cctx.kernalContext().portable().prepareForCache(ret, cctx); + ret = cctx.kernalContext().portable().prepareForCache(ret, cctx); GridCacheVersion nextVer = nextVersion(); @@ -929,7 +929,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { long expTime = CU.toExpireTime(ttl); // Detach value before index update. - ret = (CacheObject)cctx.kernalContext().portable().prepareForCache(ret, cctx); + ret = cctx.kernalContext().portable().prepareForCache(ret, cctx); // Update indexes. if (ret != null) { @@ -1066,7 +1066,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { assert expireTime >= 0 : expireTime; // Detach value before index update. - val = (CacheObject)cctx.kernalContext().portable().prepareForCache(val, cctx); + val = cctx.kernalContext().portable().prepareForCache(val, cctx); // Update index inside synchronization since it can be updated // in load methods without actually holding entry lock. @@ -1378,7 +1378,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { } // Detach value before index update. - old = (CacheObject)cctx.kernalContext().portable().prepareForCache(old, cctx); + old = cctx.kernalContext().portable().prepareForCache(old, cctx); if (old != null) updateIndex(old, expireTime, ver, null); @@ -1514,7 +1514,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { // Try write-through. if (op == GridCacheOperation.UPDATE) { // Detach value before index update. - updated = (CacheObject)cctx.kernalContext().portable().prepareForCache(updated, cctx); + updated = cctx.kernalContext().portable().prepareForCache(updated, cctx); if (writeThrough) // Must persist inside synchronization in non-tx mode. @@ -1554,12 +1554,20 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { // Must persist inside synchronization in non-tx mode. 
cctx.store().removeFromStore(null, key); + boolean hasValPtr = valPtr != 0; + // Update index inside synchronization since it can be updated // in load methods without actually holding entry lock. clearIndex(old); update(null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, ver); + if (cctx.offheapTiered() && hasValPtr) { + boolean rmv = cctx.swap().removeOffheap(key); + + assert rmv; + } + if (evt) { CacheObject evtOld = null; @@ -1803,7 +1811,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { readThrough = true; // Detach value before index update. - oldVal = (CacheObject)cctx.kernalContext().portable().prepareForCache(oldVal, cctx); + oldVal = cctx.kernalContext().portable().prepareForCache(oldVal, cctx); // Calculate initial TTL and expire time. long initTtl; @@ -2036,7 +2044,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { // Do not change size. } - updated = (CacheObject)cctx.kernalContext().portable().prepareForCache(updated, cctx); + updated = cctx.kernalContext().portable().prepareForCache(updated, cctx); // Update index inside synchronization since it can be updated // in load methods without actually holding entry lock. @@ -2957,7 +2965,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { // in load methods without actually holding entry lock. long expireTime = expireTimeExtras(); - val = (CacheObject)cctx.kernalContext().portable().prepareForCache(val, cctx); + val = cctx.kernalContext().portable().prepareForCache(val, cctx); updateIndex(val, expireTime, nextVer, old); @@ -3227,17 +3235,13 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { long topVer, GridDrType drType) throws IgniteCheckedException, GridCacheEntryRemovedException { -// TODO IGNITE-51. 
-// if (cctx.isUnmarshalValues() && valBytes != null && val == null && isNewLocked()) -// val = cctx.marshaller().<V>unmarshal(valBytes, cctx.deploy().globalLoader()); - synchronized (this) { checkObsolete(); if (isNew() || (!preload && deletedUnlocked())) { long expTime = expireTime < 0 ? CU.toExpireTime(ttl) : expireTime; - val = (CacheObject)cctx.kernalContext().portable().prepareForCache(val, cctx); + val = cctx.kernalContext().portable().prepareForCache(val, cctx); if (val != null) updateIndex(val, expTime, ver, null); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/69fecdc0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java index 20852be..981da1f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java @@ -166,7 +166,7 @@ public class KeyCacheObjectImpl extends CacheObjectAdapter implements KeyCacheOb /** {@inheritDoc} */ @Override public void prepareMarshal(CacheObjectContext ctx) throws IgniteCheckedException { if (valBytes == null) - valBytes = CU.marshal(ctx.kernalContext().cache().context(), val); + valBytes = ctx.kernalContext().portable().marshal(ctx, val); } /** {@inheritDoc} */ @@ -174,7 +174,7 @@ public class KeyCacheObjectImpl extends CacheObjectAdapter implements KeyCacheOb if (val == null) { assert valBytes != null; - val = ctx.marshaller().unmarshal(valBytes, ldr); + val = ctx.kernalContext().portable().unmarshal(ctx.cacheObjectContext(), valBytes, ldr); } } 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/69fecdc0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 8737ac3..103f3a7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -670,7 +670,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { args, null, null, - true, + false, false, null, null); @@ -714,7 +714,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { args, null, null, - true, + false, false, null, null); @@ -744,7 +744,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { args, null, null, - true, + false, false, null, null); @@ -1784,7 +1784,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } if (op == TRANSFORM) { - assert req.returnValue(); + assert !req.returnValue(); if (updRes.computedResult() != null) { if (retVal == null) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/69fecdc0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 9756544..e1d6afb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -327,7 +327,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem GridCacheReturn ret = (GridCacheReturn)res; - Object retval = res == null ? null : rawRetval ? ret : this.retval ? ret.value() : ret.success(); + Object retval = + res == null ? null : rawRetval ? ret : (this.retval || op == TRANSFORM) ? ret.value() : ret.success(); if (op == TRANSFORM && retval == null) retval = Collections.emptyMap(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/69fecdc0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java index b47891f..f0a17f2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java @@ -696,7 +696,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { invokeMap.values(), args, expiryPerCall(), - true, + false, false, null, ctx.writeThrough()); @@ -720,7 +720,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> fut = updateAllAsync0(null, invokeMap, args, - true, + 
false, false, null); @@ -784,7 +784,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { map.values(), args, expiryPerCall(), - true, + false, false, null, ctx.writeThrough()); @@ -1039,7 +1039,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { throw err; Object ret = res == null ? null : rawRetval ? - new GridCacheReturn<>(res.get2(), res.get1()) : retval ? res.get2() : res.get1(); + new GridCacheReturn<>(res.get2(), res.get1()) : (retval || op == TRANSFORM) ? res.get2() : res.get1(); if (op == TRANSFORM && ret == null) ret = Collections.emptyMap(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/69fecdc0/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java index 953bcb3..35a79be 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java @@ -426,9 +426,9 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay public IgniteFuture<?> addDataInternal(Collection<? 
extends IgniteDataLoaderEntry> entries) { enterBusy(); - try { - GridFutureAdapter<Object> resFut = new GridFutureAdapter<>(ctx); + GridFutureAdapter<Object> resFut = new GridFutureAdapter<>(ctx); + try { resFut.listenAsync(rmvActiveFut); activeFuts.add(resFut); @@ -447,6 +447,8 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay return new IgniteFutureImpl<>(resFut); } catch (IgniteException e) { + resFut.onDone(e); + return new IgniteFinishedFutureImpl<>(ctx, e); } finally { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/69fecdc0/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/GridPortableProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/GridPortableProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/GridPortableProcessor.java index f564c32..7636891 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/GridPortableProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/GridPortableProcessor.java @@ -59,11 +59,12 @@ public interface GridPortableProcessor extends GridProcessor { /** * Converts temporary offheap object to heap-based. * + * @param ctx Context. * @param obj Object. * @return Heap-based object. * @throws IgniteException In case of error. */ - @Nullable public Object unwrapTemporary(@Nullable Object obj) throws IgniteException; + @Nullable public Object unwrapTemporary(GridCacheContext ctx, @Nullable Object obj) throws IgniteException; /** * @param obj Object to marshal. @@ -141,6 +142,7 @@ public interface GridPortableProcessor extends GridProcessor { public byte[] marshal(CacheObjectContext ctx, Object val) throws IgniteCheckedException; /** + * @param ctx Context. * @param bytes Bytes. * @param clsLdr Class loader. * @return Unmarshalled object. 
@@ -175,6 +177,7 @@ public interface GridPortableProcessor extends GridProcessor {
     /**
      * @param ctx Cache context.
      * @param obj Key value.
+     * @param bytes Optional key bytes.
      * @return Cache key object.
      */
     public KeyCacheObject toCacheKeyObject(CacheObjectContext ctx, Object obj, byte[] bytes);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/69fecdc0/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/os/GridOsPortableProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/os/GridOsPortableProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/os/GridOsPortableProcessor.java
index e982ad4..1d0acb4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/os/GridOsPortableProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/os/GridOsPortableProcessor.java
@@ -55,8 +55,8 @@ public class GridOsPortableProcessor extends IgniteCacheObjectProcessorAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public Object unwrapTemporary(Object obj) throws IgniteException {
-        return null;
+    @Override public Object unwrapTemporary(GridCacheContext ctx, Object obj) throws IgniteException {
+        return obj;
     }
 
     /** {@inheritDoc} */