# ignite-51
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/eb251dda Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/eb251dda Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/eb251dda Branch: refs/heads/ignite-51 Commit: eb251dda1aee48582d2a17bd61ddf5977b0aa608 Parents: 3cdc1fa Author: sboikov <sboi...@gridgain.com> Authored: Fri Feb 27 15:32:31 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Fri Feb 27 17:53:06 2015 +0300 ---------------------------------------------------------------------- .../internal/managers/GridManagerAdapter.java | 31 --- .../cache/CacheInvokeDirectResult.java | 4 +- .../internal/processors/cache/CacheObject.java | 19 +- .../processors/cache/CacheObjectImpl.java | 26 ++- .../processors/cache/EvictableEntryImpl.java | 10 +- .../processors/cache/GridCacheAdapter.java | 42 ++-- .../cache/GridCacheBatchSwapEntry.java | 13 -- .../cache/GridCacheConcurrentMap.java | 2 +- .../processors/cache/GridCacheContext.java | 89 +++++++- .../processors/cache/GridCacheEntryInfo.java | 6 +- .../GridCacheEntryInfoCollectSwapListener.java | 1 - .../processors/cache/GridCacheEventManager.java | 6 +- .../cache/GridCacheEvictionManager.java | 2 +- .../processors/cache/GridCacheMapEntry.java | 213 +++++++++---------- .../processors/cache/GridCacheMessage.java | 16 +- .../cache/GridCacheOffheapSwapEntry.java | 6 +- .../cache/GridCacheSwapEntryImpl.java | 10 +- .../processors/cache/GridCacheSwapListener.java | 3 +- .../processors/cache/GridCacheSwapManager.java | 150 ++++++------- .../processors/cache/GridCacheUtils.java | 17 +- .../processors/cache/KeyCacheObjectImpl.java | 22 +- .../processors/cache/UserCacheObjectImpl.java | 3 +- .../cache/UserKeyCacheObjectImpl.java | 3 +- .../distributed/dht/GridDhtLocalPartition.java | 5 +- .../distributed/dht/GridDhtLockResponse.java | 4 +- .../dht/atomic/GridDhtAtomicUpdateRequest.java | 20 +- .../dht/atomic/GridDhtAtomicUpdateResponse.java | 12 +- .../dht/atomic/GridNearAtomicUpdateRequest.java | 12 +- .../atomic/GridNearAtomicUpdateResponse.java | 24 ++- .../dht/preloader/GridDhtForceKeysRequest.java | 8 +- .../dht/preloader/GridDhtForceKeysResponse.java | 12 +- .../GridDhtPartitionSupplyMessage.java | 8 +- .../distributed/near/GridNearGetRequest.java | 8 +- .../distributed/near/GridNearGetResponse.java | 4 +- .../transactions/IgniteTxLocalAdapter.java | 4 +- .../offheap/GridOffHeapProcessor.java | 9 +- .../util/offheap/unsafe/GridUnsafeMemory.java | 19 ++ .../org/apache/ignite/spi/IgniteSpiAdapter.java | 18 -- .../org/apache/ignite/spi/IgniteSpiContext.java | 43 ---- .../testframework/GridSpiTestContext.java | 18 -- 40 files changed, 469 insertions(+), 453 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb251dda/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java index 5a92de7..2159976 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java @@ -430,37 +430,6 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan } } - @Nullable @Override public <T> T readFromOffheap(String spaceName, int part, Object key, - byte[] keyBytes, @Nullable ClassLoader ldr) { - try { - return ctx.offheap().getValue(spaceName, part, key, keyBytes, ldr); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } - } - - @Override public boolean removeFromOffheap(@Nullable String spaceName, int part, Object key, - @Nullable byte[] keyBytes) { - try { - return ctx.offheap().removex(spaceName, part, key, keyBytes); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } - } - - @Override public void writeToOffheap(@Nullable String spaceName, int part, Object key, - @Nullable byte[] keyBytes, Object val, @Nullable byte[] valBytes, @Nullable ClassLoader ldr) { - try { - ctx.offheap().put(spaceName, part, key, keyBytes, valBytes != null ? valBytes : - ctx.config().getMarshaller().marshal(val)); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } - } - @SuppressWarnings({"unchecked"}) @Nullable @Override public <T> T readFromSwap(String spaceName, SwapKey key, @Nullable ClassLoader ldr) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb251dda/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java index a67abce..327df48 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java @@ -93,7 +93,7 @@ public class CacheInvokeDirectResult implements Message { } /** {@inheritDoc} */ - public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { + public void prepareMarshal(GridCacheContext ctx) throws IgniteCheckedException { key.prepareMarshal(ctx); if (err != null) @@ -104,7 +104,7 @@ public class CacheInvokeDirectResult implements Message { } /** {@inheritDoc} */ - public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { + public void finishUnmarshal(GridCacheContext ctx, ClassLoader ldr) throws IgniteCheckedException { key.finishUnmarshal(ctx, ldr); if (errBytes != null) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb251dda/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java index 425cf09..45b5ee1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java @@ -27,9 +27,10 @@ import org.jetbrains.annotations.*; public interface CacheObject extends Message { /** * @param ctx Context. + * @param cpy If {@code true} need to copy value. * @return Value. */ - @Nullable public <T> T value(GridCacheContext ctx); + @Nullable public <T> T value(GridCacheContext ctx, boolean cpy); /** * @param name Field name. @@ -41,14 +42,26 @@ public interface CacheObject extends Message { * @param ctx Context. * @throws IgniteCheckedException If failed. */ - public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException; + public void prepareMarshal(GridCacheContext ctx) throws IgniteCheckedException; + + /** + * @return {@code True} if value is byte array. + */ + public boolean byteArray(); + + /** + * @param ctx Context. + * @return Value bytes. + * @throws IgniteCheckedException If failed. + */ + public byte[] valueBytes(GridCacheContext ctx) throws IgniteCheckedException; /** * @param ctx Context. * @param ldr Class loader. * @throws IgniteCheckedException If failed. */ - public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException; + public void finishUnmarshal(GridCacheContext ctx, ClassLoader ldr) throws IgniteCheckedException; /** * @param ctx Cache context. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb251dda/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectImpl.java index 9f6d8c1..fbc2bde 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectImpl.java @@ -82,13 +82,29 @@ public class CacheObjectImpl implements CacheObject, Externalizable { } /** {@inheritDoc} */ - @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { - if (valBytes == null && !(val instanceof byte[])) - valBytes = CU.marshal(ctx, val); + @Override public boolean byteArray() { + return val instanceof byte[]; } /** {@inheritDoc} */ - @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { + @Override public byte[] valueBytes(GridCacheContext ctx) throws IgniteCheckedException { + if (byteArray()) + return (byte[])val; + + if (valBytes == null) + valBytes = CU.marshal(ctx.shared(), val); + + return valBytes; + } + + /** {@inheritDoc} */ + @Override public void prepareMarshal(GridCacheContext ctx) throws IgniteCheckedException { + if (valBytes == null && !byteArray()) + valBytes = CU.marshal(ctx.shared(), val); + } + + /** {@inheritDoc} */ + @Override public void finishUnmarshal(GridCacheContext ctx, ClassLoader ldr) throws IgniteCheckedException { assert val != null || valBytes != null; } @@ -105,7 +121,7 @@ public class CacheObjectImpl implements CacheObject, Externalizable { writer.onHeaderWritten(); } - boolean byteArr = val instanceof byte[]; + boolean byteArr = byteArray(); switch (writer.state()) { case 0: http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb251dda/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/EvictableEntryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/EvictableEntryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/EvictableEntryImpl.java index e72b9d0..d4706b7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/EvictableEntryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/EvictableEntryImpl.java @@ -46,7 +46,7 @@ public class EvictableEntryImpl<K, V> implements EvictableEntry<K, V> { /** {@inheritDoc} */ @Override public K getKey() throws IllegalStateException { - return cached.key().value(cached.context()); + return cached.key().value(cached.context(), false); } /** {@inheritDoc} */ @@ -79,7 +79,7 @@ public class EvictableEntryImpl<K, V> implements EvictableEntry<K, V> { try { CacheObject val = cached.peek(GridCachePeekMode.GLOBAL); - return val != null ? val.<V>value(cached.context()) : null; + return val != null ? val.<V>value(cached.context(), false) : null; } catch (GridCacheEntryRemovedException e) { return null; @@ -96,13 +96,13 @@ public class EvictableEntryImpl<K, V> implements EvictableEntry<K, V> { GridTuple<CacheObject> peek = tx.peek(cached.context(), false, cached.key(), null); if (peek != null) - return peek.get().value(cached.context()); + return peek.get().value(cached.context(), false); } if (cached.detached()) { CacheObject val = cached.rawGet(); - return val != null ? val.<V>value(cached.context()) : null; + return val != null ? val.<V>value(cached.context(), false) : null; } for (;;) { @@ -114,7 +114,7 @@ public class EvictableEntryImpl<K, V> implements EvictableEntry<K, V> { try { CacheObject val = e.peek(GridCachePeekMode.GLOBAL, CU.<K, V>empty()); - return val != null ? val.<V>value(cached.context()) : null; + return val != null ? val.<V>value(cached.context(), false) : null; } catch (GridCacheEntryRemovedException ignored) { // No-op. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb251dda/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 7121ac7..a641e23 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -871,7 +871,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, else cacheVal = localCachePeek0(cacheKey, modes.heap, modes.offheap, modes.swap, plc); - Object val = cacheVal != null ? cacheVal.value(ctx) : null; + Object val = cacheVal != null ? cacheVal.value(ctx, true) : null; if (ctx.portableEnabled()) val = ctx.unwrapPortableIfNeeded(val, ctx.keepPortable()); @@ -963,7 +963,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, if (peek != null) { CacheObject v = peek.get(); - return v.value(ctx); + return v.value(ctx, true); // TODO IGNITE-51 // if (ctx.portableEnabled()) // v = (V)ctx.unwrapPortableIfNeeded(v, ctx.keepPortable()); @@ -980,7 +980,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, if (peek != null) { CacheObject v = peek.get(); - return v.value(ctx); + return v.value(ctx, true); // TODO IGNITE-51 // if (ctx.portableEnabled()) // v = (V)ctx.unwrapPortableIfNeeded(v, ctx.keepPortable()); @@ -1070,7 +1070,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, } if (val != null) - return F.t((V)val.get().value(ctx)); + return F.t((V)val.get().value(ctx, true)); } } catch (GridCacheEntryRemovedException ignore) { @@ -1311,22 +1311,22 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, /** {@inheritDoc} */ @Override public Set<Cache.Entry<K, V>> primaryEntrySet() { - return primaryEntrySet((IgnitePredicate<Cache.Entry<K, V>>[])null); + return primaryEntrySet((IgnitePredicate<Cache.Entry<K, V>>[]) null); } /** {@inheritDoc} */ @Override public Set<K> keySet() { - return keySet((IgnitePredicate<Cache.Entry<K, V>>[])null); + return keySet((IgnitePredicate<Cache.Entry<K, V>>[]) null); } /** {@inheritDoc} */ @Override public Set<K> primaryKeySet() { - return primaryKeySet((IgnitePredicate<Cache.Entry<K, V>>[])null); + return primaryKeySet((IgnitePredicate<Cache.Entry<K, V>>[]) null); } /** {@inheritDoc} */ @Override public Collection<V> values() { - return values((IgnitePredicate<Cache.Entry<K, V>>[])null); + return values((IgnitePredicate<Cache.Entry<K, V>>[]) null); } /** {@inheritDoc} */ @@ -1620,7 +1620,8 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, return getAllAsync(Collections.singletonList(key), /*force primary*/true, /*skip tx*/false, null, null, taskName, true, false).chain(new CX1<IgniteInternalFuture<Map<K, V>>, V>() { - @Override public V applyx(IgniteInternalFuture<Map<K, V>> e) throws IgniteCheckedException { + @Override + public V applyx(IgniteInternalFuture<Map<K, V>> e) throws IgniteCheckedException { return e.get().get(key); } }); @@ -1677,7 +1678,9 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, String taskName, final IgniteBiInClosure<KeyCacheObject, Object> vis) { return ctx.closures().callLocalSafe(new GPC<Object>() { - @Nullable @Override public Object call() { + @Nullable + @Override + public Object call() { try { ctx.store().loadAllFromStore(tx, keys, vis); } @@ -1806,11 +1809,11 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, if (map != null) { if (set || wasNew) - map.put(key.<K>value(ctx), (V)val); + map.put(key.<K>value(ctx, false), (V)val); else { try { // TODO IGNITE-51. - K k = key.<K>value(ctx); + K k = key.<K>value(ctx, false); GridTuple<V> v = peek0(false, k, GLOBAL); @@ -2089,8 +2092,9 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, if (ctx.config().getInterceptor() != null) fut = fut.chain(new CX1<IgniteInternalFuture<V>, V>() { - @Override public V applyx(IgniteInternalFuture<V> f) throws IgniteCheckedException { - return (V)ctx.config().getInterceptor().onGet(key, f.get()); + @Override + public V applyx(IgniteInternalFuture<V> f) throws IgniteCheckedException { + return (V) ctx.config().getInterceptor().onGet(key, f.get()); } }); @@ -4583,7 +4587,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, CacheObject val = unswapped.value(); - Object val0 = val != null ? val.value(ctx) : null; + Object val0 = val != null ? val.value(ctx, true) : null; if (ctx.portableEnabled()) return (V)ctx.unwrapPortableIfNeeded(val0, !deserializePortable); @@ -5412,7 +5416,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, CacheObject val = entryEx(cacheKey).innerReload(); - return (V)(val != null ? val.value(ctx) : null); + return (V)(val != null ? val.value(ctx, true) : null); } catch (GridCacheEntryRemovedException ignored) { if (log.isDebugEnabled()) @@ -5645,8 +5649,8 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, KeyCacheObject key = entry.key(); - Object key0 = key.value(ctx); - Object val0 = val.value(ctx); + Object key0 = key.value(ctx, true); + Object val0 = val.value(ctx, true); if (deserializePortable && ctx.portableEnabled()) { key0 = ctx.unwrapPortableIfNeeded(key0, true); @@ -6408,7 +6412,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, { assert ver != null; - if (p != null && !p.apply(key.<K>value(ctx), (V)val)) + if (p != null && !p.apply(key.<K>value(ctx, false), (V)val)) return; long ttl = 0; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb251dda/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheBatchSwapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheBatchSwapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheBatchSwapEntry.java index e29868e..0d54465 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheBatchSwapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheBatchSwapEntry.java @@ -30,9 +30,6 @@ public class GridCacheBatchSwapEntry extends GridCacheSwapEntryImpl { /** Key. */ private KeyCacheObject key; - /** Key bytes. */ - private byte[] keyBytes; - /** Partition. */ private int part; @@ -40,7 +37,6 @@ public class GridCacheBatchSwapEntry extends GridCacheSwapEntryImpl { * Creates batch swap entry. * * @param key Key. - * @param keyBytes Key bytes. * @param part Partition id. * @param valBytes Value bytes. * @param valIsByteArr Whether value is byte array. @@ -51,7 +47,6 @@ public class GridCacheBatchSwapEntry extends GridCacheSwapEntryImpl { * @param valClsLdrId Optional value class loader ID. */ public GridCacheBatchSwapEntry(KeyCacheObject key, - byte[] keyBytes, int part, ByteBuffer valBytes, boolean valIsByteArr, @@ -63,7 +58,6 @@ public class GridCacheBatchSwapEntry extends GridCacheSwapEntryImpl { super(valBytes, valIsByteArr, ver, ttl, expireTime, keyClsLdrId, valClsLdrId); this.key = key; - this.keyBytes = keyBytes; this.part = part; } @@ -75,13 +69,6 @@ public class GridCacheBatchSwapEntry extends GridCacheSwapEntryImpl { } /** - * @return Key bytes. - */ - public byte[] keyBytes() { - return keyBytes; - } - - /** * @return Partition id. */ public int partition() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb251dda/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java index 71c5045..70b41c2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java @@ -2174,7 +2174,7 @@ public class GridCacheConcurrentMap { /** {@inheritDoc} */ @Override public K next() { - return it.next().key().value(it.ctx); + return it.next().key().value(it.ctx, true); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb251dda/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 3a1692e..084c984 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -290,7 +290,7 @@ public class GridCacheContext<K, V> implements Externalizable { trueArr = new IgnitePredicate[]{F.alwaysTrue()}; // Create unsafe memory only if writing values - unsafeMemory = cacheCfg.getMemoryMode() == OFFHEAP_VALUES ? + unsafeMemory = (cacheCfg.getMemoryMode() == OFFHEAP_VALUES || cacheCfg.getMemoryMode() == OFFHEAP_TIERED) ? new GridUnsafeMemory(cacheCfg.getOffHeapMaxMemory()) : null; gate = new GridCacheGateway<>(this); @@ -1777,7 +1777,7 @@ public class GridCacheContext<K, V> implements Externalizable { * @param obj Object. * @return Cache object. */ - @Nullable public KeyCacheObject toCacheKeyObject(@Nullable Object obj) { + public KeyCacheObject toCacheKeyObject(Object obj) { if (obj instanceof KeyCacheObject) return (KeyCacheObject)obj; @@ -1785,6 +1785,84 @@ public class GridCacheContext<K, V> implements Externalizable { } /** + * @param obj Object. + * @param bytes Key bytes. + * @return Cache object. + * @throws IgniteCheckedException If failed. + */ + public KeyCacheObject toCacheKeyObject(Object obj, byte[] bytes) throws IgniteCheckedException { + // TODO IGNITE-51 move to processor. + assert obj != null || bytes != null; + + if (obj == null) + obj = marshaller().unmarshal(bytes, deploy().globalLoader()); + + return new KeyCacheObjectImpl(obj, bytes); + } + + /** + * @param bytes Bytes. + * @param valIsByteArr {@code True} if value is byte array. + * @param clsLdrId Class loader ID. + * @return Cache object. + * @throws IgniteCheckedException If failed. + */ + @Nullable public CacheObject unswapCacheObject(byte[] bytes, boolean valIsByteArr, @Nullable IgniteUuid clsLdrId) + throws IgniteCheckedException + { + // TODO IGNITE-51 move to processor. + if (valIsByteArr) + return new CacheObjectImpl(bytes, null); + + ClassLoader ldr = clsLdrId != null ? deploy().getClassLoader(clsLdrId) : deploy().localLoader(); + + if (ldr == null) + return null; + + return new CacheObjectImpl(marshaller().unmarshal(bytes, ldr), bytes); + } + + /** */ + private static final sun.misc.Unsafe UNSAFE = GridUnsafe.unsafe(); + + /** + * @param valPtr Value pointer. + * @param tmp If {@code true} can return temporary instance which is valid while entry lock is held. + * @return Cache object. + * @throws IgniteCheckedException If failed. + */ + public CacheObject fromOffheap(long valPtr, boolean tmp) throws IgniteCheckedException { + assert config().getMemoryMode() == OFFHEAP_TIERED || config().getMemoryMode() == OFFHEAP_VALUES; + + // TODO IGNITE-51. + if (portableEnabled()) + return (CacheObject)portable().unmarshal(valPtr, !tmp); + + long ptr = valPtr; + + int size = UNSAFE.getInt(ptr); + + ptr += 4; + + boolean plainByteArr = UNSAFE.getByte(ptr++) == 1; + + byte[] bytes = U.copyMemory(ptr, size); + + if (plainByteArr) + return new CacheObjectImpl(bytes, null); + + if (offheapTiered()) { + IgniteUuid valClsLdrId = U.readGridUuid(ptr + size); + + ClassLoader ldr = valClsLdrId != null ? deploy().getClassLoader(valClsLdrId) : deploy().localLoader(); + + return new CacheObjectImpl(marshaller().unmarshal(bytes, ldr), bytes); + } + else + return new CacheObjectImpl(marshaller().unmarshal(bytes, U.gridClassLoader()), bytes); + } + + /** * @param map Map. * @param key Key. * @param val Value. @@ -1798,13 +1876,14 @@ public class GridCacheContext<K, V> implements Externalizable { CacheObject val, boolean skipVals, boolean keepCacheObjects, - boolean deserializePortable) { + boolean deserializePortable, + boolean cpy) { assert key != null; assert val != null; if (!keepCacheObjects) { - Object key0 = key.value(this); - Object val0 = skipVals ? true : val.value(this); + Object key0 = key.value(this, cpy); + Object val0 = skipVals ? true : val.value(this, cpy); if (portableEnabled() && deserializePortable) { key0 = unwrapPortableIfNeeded(key0, false); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb251dda/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java index 12fac3b..6f0e626 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java @@ -322,7 +322,7 @@ public class GridCacheEntryInfo implements Externalizable, Message { * @param ctx Cache context. * @throws IgniteCheckedException In case of error. */ - public void marshal(GridCacheSharedContext<?, ?> ctx) throws IgniteCheckedException { + public void marshal(GridCacheContext ctx) throws IgniteCheckedException { key.prepareMarshal(ctx); val.prepareMarshal(ctx); @@ -350,9 +350,9 @@ public class GridCacheEntryInfo implements Externalizable, Message { * @throws IgniteCheckedException If unmarshalling failed. */ public void unmarshal(GridCacheContext ctx, ClassLoader clsLdr) throws IgniteCheckedException { - key.finishUnmarshal(ctx.shared(), clsLdr); + key.finishUnmarshal(ctx, clsLdr); - val.finishUnmarshal(ctx.shared(), clsLdr); + val.finishUnmarshal(ctx, clsLdr); // TODO IGNITE-51 // Marshaller mrsh = ctx.marshaller(); // http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb251dda/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfoCollectSwapListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfoCollectSwapListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfoCollectSwapListener.java index 0c953d7..9a43797 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfoCollectSwapListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfoCollectSwapListener.java @@ -48,7 +48,6 @@ public class GridCacheEntryInfoCollectSwapListener implements GridCacheSwapListe /** {@inheritDoc} */ @Override public void onEntryUnswapped(int part, KeyCacheObject key, - byte[] keyBytes, GridCacheSwapEntry swapEntry) { // TODO IGNITE-51. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb251dda/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java index fb8d124..d87a224 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java @@ -244,12 +244,12 @@ public class GridCacheEventManager extends GridCacheManagerAdapter { type, part, cctx.isNear(), - key.value(cctx), + key.value(cctx, false), xid, lockId, - newVal != null ? newVal.value(cctx) : null, + newVal != null ? newVal.value(cctx, false) : null, hasNewVal, - oldVal != null ? oldVal.value(cctx) : null, + oldVal != null ? oldVal.value(cctx, false) : null, hasOldVal, subjId, cloClsName, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb251dda/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java index 96b6402..e3a9b1f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java @@ -940,7 +940,7 @@ public class GridCacheEvictionManager<K, V> extends GridCacheManagerAdapter<K, V try { for (GridCacheEntryEx entry : cached.values()) { // Do not evict internal entries. - if (entry.key() instanceof GridCacheInternal) + if (entry.key().internal()) continue; // Lock entry. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb251dda/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 9a8f9f0..f30e0c1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -47,7 +47,6 @@ import java.util.concurrent.atomic.*; import static org.apache.ignite.events.EventType.*; import static org.apache.ignite.internal.processors.cache.CacheFlag.*; -import static org.apache.ignite.internal.processors.dr.GridDrType.*; import static org.apache.ignite.transactions.TransactionState.*; /** @@ -227,14 +226,10 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { assert mem != null; - if (val != null || valBytes != null) { -// TODO IGNITE-51. -// boolean valIsByteArr = val instanceof byte[]; -// -// if (valBytes == null && !valIsByteArr) -// valBytes = CU.marshal(cctx.shared(), val); -// -// valPtr = mem.putOffHeap(valPtr, valIsByteArr ? (byte[])val : valBytes, valIsByteArr); + if (val != null) { + boolean valIsByteArr = val.byteArray(); + + valPtr = mem.putOffHeap(valPtr, val.valueBytes(cctx), valIsByteArr); } else { mem.removeOffHeap(valPtr); @@ -489,7 +484,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { valPtr = e.offheapPointer(); if (needVal) { - CacheObject val = unmarshalOffheap(false); + CacheObject val = cctx.fromOffheap(valPtr, false); e.value(val); } @@ -497,6 +492,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { else { // Read from swap. valPtr = 0; + // TODO IGNITE-51. if (cctx.portableEnabled() && !e.valueIsByteArray()) e.valueBytes(null); // Clear bytes marshalled with portable marshaller. } @@ -517,8 +513,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { if (delta >= 0) { CacheObject val = e.value(); - if (cctx.portableEnabled()) - val = (CacheObject)cctx.kernalContext().portable().detachPortable(val, cctx); + val = (CacheObject)cctx.kernalContext().portable().detachPortable(val, cctx); // Set unswapped value. update(val, e.valueBytes(), e.expireTime(), e.ttl(), e.version()); @@ -550,7 +545,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { if (expireTime > 0 && U.currentTimeMillis() >= expireTime) { // Don't swap entry if it's expired. // Entry might have been updated. if (cctx.offheapTiered()) { - cctx.swap().removeOffheap(key, getOrMarshalKeyBytes()); + cctx.swap().removeOffheap(key); valPtr = 0; } @@ -563,28 +558,26 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { log.debug("Value did not change, skip write swap entry: " + this); if (cctx.swap().offheapEvictionEnabled()) - cctx.swap().enableOffheapEviction(key(), getOrMarshalKeyBytes()); + cctx.swap().enableOffheapEviction(key()); return; } -// TODO IGNITE-51. -// boolean plain = val instanceof byte[]; -// -// IgniteUuid valClsLdrId = null; -// -// if (val != null) -// valClsLdrId = cctx.deploy().getClassLoaderId(val.getClass().getClassLoader()); -// -// cctx.swap().write(key(), -// getOrMarshalKeyBytes(), -// plain ? ByteBuffer.wrap((byte[])val) : swapValueBytes(), -// plain, -// ver, -// ttlExtras(), -// expireTime, -// cctx.deploy().getClassLoaderId(U.detectObjectClassLoader(key)), -// valClsLdrId); + IgniteUuid valClsLdrId = null; + + if (val != null) + valClsLdrId = cctx.deploy().getClassLoaderId(val.value(cctx, false).getClass().getClassLoader()); + + IgniteBiTuple<byte[], Boolean> valBytes = swapValueBytes(); + + cctx.swap().write(key(), + ByteBuffer.wrap(valBytes.get1()), + valBytes.get2(), + ver, + ttlExtras(), + expireTime, + cctx.deploy().getClassLoaderId(U.detectObjectClassLoader(key.value(cctx, false))), + valClsLdrId); if (log.isDebugEnabled()) log.debug("Wrote swap entry: " + this); @@ -592,12 +585,33 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { } /** + * @return Value bytes and flag indicating whether value is byte array. + * @throws IgniteCheckedException If failed. + */ + private IgniteBiTuple<byte[], Boolean> swapValueBytes() throws IgniteCheckedException { + if (valPtr != 0) { + assert isOffHeapValuesOnly() || cctx.offheapTiered(); + + return cctx.unsafeMemory().get(valPtr); + } + else { + assert val != null; + + byte[] bytes = val.valueBytes(cctx); + + boolean plain = val.byteArray(); + + return new IgniteBiTuple<>(bytes, plain); + } + } + + /** * @throws IgniteCheckedException If failed. */ protected final void releaseSwap() throws IgniteCheckedException { if (cctx.isSwapOrOffheapEnabled()) { synchronized (this){ - cctx.swap().remove(key(), getOrMarshalKeyBytes()); + cctx.swap().remove(key()); } if (log.isDebugEnabled()) @@ -858,7 +872,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { update(ret, null, expTime, ttl, nextVer); if (hadValPtr && cctx.offheapTiered()) - cctx.swap().removeOffheap(key, getOrMarshalKeyBytes()); + cctx.swap().removeOffheap(key); if (cctx.deferredDelete() && deletedUnlocked() && !isInternal() && !detached()) deletedUnlocked(false); @@ -1183,7 +1197,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { update(null, null, 0, 0, newVer); if (cctx.offheapTiered() && hadValPtr) { - boolean rmv = cctx.swap().removeOffheap(key, getOrMarshalKeyBytes()); + boolean rmv = cctx.swap().removeOffheap(key); assert rmv; } @@ -1382,7 +1396,9 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { assert entryProcessor != null; // TODO IGNITE-51. - CacheInvokeEntry<Object, Object> entry = new CacheInvokeEntry<>(cctx, key.value(cctx), old.value(cctx)); + CacheInvokeEntry<Object, Object> entry = new CacheInvokeEntry<>(cctx, + key.value(cctx, false), + old.value(cctx, false)); try { Object computed = entryProcessor.process(entry, invokeArgs); @@ -1814,8 +1830,8 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { EntryProcessor<Object, Object, ?> entryProcessor = (EntryProcessor<Object, Object, ?>)writeObj; - key0 = key.value(cctx); - old0 = CU.value(old0, oldVal, cctx); + key0 = key.value(cctx, false); + old0 = value(old0, oldVal, false); CacheInvokeEntry<Object, Object> entry = new CacheInvokeEntry<>(cctx, key0, old0); @@ -1943,9 +1959,9 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { // Actual update. if (op == GridCacheOperation.UPDATE) { if (intercept) { - key0 = CU.value(key0, key, cctx); - old0 = CU.value(old0, oldVal, cctx); - updated0 = CU.value(updated0, updated, cctx); + key0 = value(key0, key, false); + old0 = value(old0, oldVal, false); + updated0 = value(updated0, updated, false); Object interceptorVal = cctx.config().getInterceptor().onBeforePut(key0, old0, updated0); @@ -2021,8 +2037,8 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { } else { if (intercept) { - key0 = CU.value(key0, key, cctx); - old0 = CU.value(old0, oldVal, cctx); + key0 = value(key0, key, false); + old0 = value(old0, oldVal, false); interceptRes = cctx.config().getInterceptor().onBeforeRemove(key0, old0); @@ -2075,7 +2091,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { assert newSysExpireTime == CU.EXPIRE_TIME_CALCULATE; if (cctx.offheapTiered() && hasValPtr) { - boolean rmv = cctx.swap().removeOffheap(key, getOrMarshalKeyBytes()); + boolean rmv = cctx.swap().removeOffheap(key); assert rmv; } @@ -2120,12 +2136,12 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE); if (intercept) { - key0 = CU.value(key0, key, cctx); + key0 = value(key0, key, false); if (op == GridCacheOperation.UPDATE) - cctx.config().getInterceptor().onAfterPut(key0, val.value(cctx)); + cctx.config().getInterceptor().onAfterPut(key0, val.value(cctx, false)); else { - old0 = CU.value(old0, oldVal, cctx); + old0 = value(old0, oldVal, false); cctx.config().getInterceptor().onAfterRemove(key0, old0); } @@ -2150,6 +2166,19 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { } /** + * @param val Value. + * @param cacheObj Cache object. + * @param cpy Copy flag. + * @return Cache object value. + */ + @Nullable private Object value(@Nullable Object val, @Nullable CacheObject cacheObj, boolean cpy) { + if (val != null) + return val; + + return cacheObj != null ? cacheObj.value(cctx, cpy) : null; + } + + /** * @param expiry Expiration policy. * @return Tuple holding initial TTL and expire time with the given expiry. */ @@ -3131,17 +3160,10 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { if (val != null) return val; - GridCacheValueBytes valBytes = valueBytesUnlocked(); - -// TODO IGNITE-51. -// if (!valBytes.isNull()) -// val = valBytes.isPlain() ? (V)valBytes.get() : cctx.marshaller().<V>unmarshal(valBytes.get(), -// cctx.deploy().globalLoader()); - - if (val == null && cctx.offheapTiered() && valPtr != 0) - val = unmarshalOffheap(tmp); + if (valPtr != 0) + return cctx.fromOffheap(valPtr, tmp); - return val; + return null; } /** {@inheritDoc} */ @@ -3786,7 +3808,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { else val = rawGetOrUnmarshal(false); - return new CacheEntryImpl<>(key.<K>value(cctx), val != null ? val.<V>value(cctx) : null); + return new CacheEntryImpl<>(key.<K>value(cctx, false), val != null ? val.<V>value(cctx, false) : null); } catch (GridCacheFilterFailedException ignored) { throw new IgniteException("Should never happen."); @@ -3805,7 +3827,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { @Override public <K, V> Cache.Entry<K, V> wrapFilterLocked() throws IgniteCheckedException { CacheObject val = rawGetOrUnmarshal(true); - return new CacheEntryImpl<>(key.<K>value(cctx), val != null ? val.<V>value(cctx) : null) ; + return new CacheEntryImpl<>(key.<K>value(cctx, false), val != null ? val.<V>value(cctx, false) : null) ; } /** {@inheritDoc} */ @@ -3815,7 +3837,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { /** {@inheritDoc} */ @Override public synchronized <K, V> CacheVersionedEntryImpl<K, V> wrapVersioned() { - return new CacheVersionedEntryImpl<>(key.<K>value(cctx), null, ver); + return new CacheVersionedEntryImpl<>(key.<K>value(cctx, false), null, ver); } /** {@inheritDoc} */ @@ -3924,24 +3946,22 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { try { if (!hasReaders() && markObsolete0(obsoleteVer, false)) { if (!isStartVersion() && hasValueUnlocked()) { -// TODO IGNITE-51. -// boolean plain = val instanceof byte[]; -// -// IgniteUuid valClsLdrId = null; -// -// if (val != null) -// valClsLdrId = cctx.deploy().getClassLoaderId(U.detectObjectClassLoader(val)); -// -// ret = new GridCacheBatchSwapEntry<>(key(), -// getOrMarshalKeyBytes(), -// partition(), -// plain ? ByteBuffer.wrap((byte[])val) : swapValueBytes(), -// plain, -// ver, -// ttlExtras(), -// expireTimeExtras(), -// cctx.deploy().getClassLoaderId(U.detectObjectClassLoader(key)), -// valClsLdrId); + IgniteUuid valClsLdrId = null; + + if (val != null) + valClsLdrId = cctx.deploy().getClassLoaderId(U.detectObjectClassLoader(val.value(cctx, false))); + + IgniteBiTuple<byte[], Boolean> valBytes = swapValueBytes(); + + ret = new GridCacheBatchSwapEntry(key(), + partition(), + ByteBuffer.wrap(valBytes.get1()), + valBytes.get2(), + ver, + ttlExtras(), + expireTimeExtras(), + cctx.deploy().getClassLoaderId(U.detectObjectClassLoader(key.value(cctx, false))), + valClsLdrId); } value(null, null); @@ -3956,37 +3976,6 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { } /** - * Create value bytes wrapper from the given object. - * - * @return Value bytes wrapper. - * @throws IgniteCheckedException If failed. - */ - private ByteBuffer swapValueBytes() throws IgniteCheckedException { - assert val != null || valPtr != 0; - - if (cctx.offheapTiered() && cctx.portableEnabled()) { - if (val != null) - return cctx.portable().marshal(val, false); - -// TODO IGNITE-51. -// V val0 = cctx.marshaller().unmarshal(valBytes, U.gridClassLoader()); -// -// return cctx.portable().marshal(val0, false); - return null; - } - else { - GridCacheValueBytes res = valueBytesUnlocked(); - - if (res.isNull()) - res = GridCacheValueBytes.marshaled(CU.marshal(cctx.shared(), val)); - - assert res.get() != null; - - return ByteBuffer.wrap(res.get()); - } - } - - /** * @param filter Entry filter. * @return {@code True} if entry is visitable. */ @@ -4431,7 +4420,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { /** {@inheritDoc} */ @Override public K getKey() { - return key.value(cctx); + return key.value(cctx, false); } /** {@inheritDoc} */ @@ -4444,11 +4433,11 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { GridTuple<CacheObject> peek = tx.peek(cctx, false, key, null); if (peek != null) - return peek.get().value(cctx); + return peek.get().value(cctx, false); } if (detached()) - return rawGet().value(cctx); + return rawGet().value(cctx, false); for (;;) { GridCacheEntryEx e = cctx.cache().peekEx(key); @@ -4457,7 +4446,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { return null; try { - return e.peek(GridCachePeekMode.GLOBAL, CU.<K, V>empty()).value(cctx); + return e.peek(GridCachePeekMode.GLOBAL, CU.<K, V>empty()).value(cctx, false); } catch (GridCacheEntryRemovedException ignored) { // No-op. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb251dda/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java index ad773ac..61b01d2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java @@ -271,15 +271,15 @@ public abstract class GridCacheMessage implements Message { * @param ctx Context. * @throws IgniteCheckedException If failed. */ - protected final void marshalInfo(GridCacheEntryInfo info, GridCacheSharedContext ctx) throws IgniteCheckedException { + protected final void marshalInfo(GridCacheEntryInfo info, GridCacheContext ctx) throws IgniteCheckedException { assert ctx != null; if (info != null) { info.marshal(ctx); if (ctx.deploymentEnabled()) { - prepareObject(info.key(), ctx); - prepareObject(info.value(), ctx); + prepareObject(info.key(), ctx.shared()); + prepareObject(info.value(), ctx.shared()); } } } @@ -306,7 +306,7 @@ public abstract class GridCacheMessage implements Message { */ protected final void marshalInfos( Iterable<? extends GridCacheEntryInfo> infos, - GridCacheSharedContext ctx + GridCacheContext ctx ) throws IgniteCheckedException { assert ctx != null; @@ -581,7 +581,7 @@ public abstract class GridCacheMessage implements Message { */ @SuppressWarnings("ForLoopReplaceableByForEach") protected final void prepareMarshalCacheObjects(@Nullable List<? extends CacheObject> col, - GridCacheSharedContext ctx) throws IgniteCheckedException { + GridCacheContext ctx) throws IgniteCheckedException { if (col == null) return; @@ -597,7 +597,7 @@ public abstract class GridCacheMessage implements Message { * @throws IgniteCheckedException If failed. */ protected final void prepareMarshalCacheObjects(@Nullable Collection<? extends CacheObject> col, - GridCacheSharedContext ctx) throws IgniteCheckedException { + GridCacheContext ctx) throws IgniteCheckedException { if (col == null) return; @@ -613,7 +613,7 @@ public abstract class GridCacheMessage implements Message { */ @SuppressWarnings("ForLoopReplaceableByForEach") protected final void finishUnmarshalCacheObjects(@Nullable List<? extends CacheObject> col, - GridCacheSharedContext ctx, + GridCacheContext ctx, ClassLoader ldr) throws IgniteCheckedException { @@ -633,7 +633,7 @@ public abstract class GridCacheMessage implements Message { * @throws IgniteCheckedException If failed. */ protected final void finishUnmarshalCacheObjects(@Nullable Collection<? extends CacheObject> col, - GridCacheSharedContext ctx, + GridCacheContext ctx, ClassLoader ldr) throws IgniteCheckedException { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb251dda/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheOffheapSwapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheOffheapSwapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheOffheapSwapEntry.java index 93879bd..a260fca 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheOffheapSwapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheOffheapSwapEntry.java @@ -75,11 +75,11 @@ public class GridCacheOffheapSwapEntry implements GridCacheSwapEntry { readPtr += verEx ? GridCacheSwapEntryImpl.VERSION_EX_SIZE : GridCacheSwapEntryImpl.VERSION_SIZE; - valIsByteArr = UNSAFE.getByte(readPtr) != 0; + valIsByteArr = UNSAFE.getByte(readPtr + 4) == 1; valPtr = readPtr; - assert (ptr + size) > (UNSAFE.getInt(valPtr + 1) + valPtr + 5); + assert (ptr + size) > (UNSAFE.getInt(valPtr) + valPtr + 5); } /** @@ -97,7 +97,7 @@ public class GridCacheOffheapSwapEntry implements GridCacheSwapEntry { ptr += verEx ? GridCacheSwapEntryImpl.VERSION_EX_SIZE : GridCacheSwapEntryImpl.VERSION_SIZE; - assert (ptr + size) > (UNSAFE.getInt(ptr + 1) + ptr + 5); + assert (ptr + size) > (UNSAFE.getInt(ptr) + ptr + 5); return ptr; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb251dda/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java index b3e085d..590d15f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java @@ -57,7 +57,7 @@ public class GridCacheSwapEntryImpl implements GridCacheSwapEntry { /** Value. */ private CacheObject val; - /** Falg indicating that value is byte array, so valBytes should not be unmarshalled. */ + /** Flag indicating that value is byte array, so valBytes should not be unmarshalled. */ private boolean valIsByteArr; /** Class loader ID. */ @@ -274,12 +274,12 @@ public class GridCacheSwapEntryImpl implements GridCacheSwapEntry { off = U.writeVersion(arr, off, ver); - UNSAFE.putBoolean(arr, off++, valIsByteArr); - UNSAFE.putInt(arr, off, len); off += 4; + UNSAFE.putBoolean(arr, off++, valIsByteArr); + UNSAFE.copyMemory(valBytes.array(), BYTE_ARR_OFF, arr, off, len); off += len; @@ -312,12 +312,12 @@ public class GridCacheSwapEntryImpl implements GridCacheSwapEntry { off += verEx ? VERSION_EX_SIZE : VERSION_SIZE; - boolean valIsByteArr = UNSAFE.getBoolean(arr, off++); - int arrLen = UNSAFE.getInt(arr, off); off += 4; + boolean valIsByteArr = UNSAFE.getBoolean(arr, off++); + byte[] valBytes = new byte[arrLen]; UNSAFE.copyMemory(arr, off, valBytes, BYTE_ARR_OFF, arrLen); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb251dda/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapListener.java index 5a707c8..ff5ad32 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapListener.java @@ -24,8 +24,7 @@ public interface GridCacheSwapListener { /** * @param part Partition. * @param key Cache key. - * @param keyBytes Key bytes. * @param e Entry. */ - public void onEntryUnswapped(int part, KeyCacheObject key, byte[] keyBytes, GridCacheSwapEntry e); + public void onEntryUnswapped(int part, KeyCacheObject key, GridCacheSwapEntry e); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb251dda/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java index e231254..26c5366 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java @@ -119,7 +119,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { if (!firstEvictWarn) warnFirstEvict(); - writeToSwap(part, null, kb, vb); + writeToSwap(part, cctx.toCacheKeyObject(null, kb), vb); } catch (IgniteCheckedException e) { log.error("Failed to unmarshal off-heap entry [part=" + part + ", hash=" + hash + ']', e); @@ -225,32 +225,29 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { /** * @param part Partition. * @param key Cache key. - * @param keyBytes Key bytes. * @param e Entry. */ - private void onUnswapped(int part, KeyCacheObject key, byte[] keyBytes, GridCacheSwapEntry e) { - onEntryUnswapped(swapLsnrs, part, key, keyBytes, e); + private void onUnswapped(int part, KeyCacheObject key, GridCacheSwapEntry e) { + onEntryUnswapped(swapLsnrs, part, key, e); } /** * @param part Partition. * @param key Cache key. - * @param keyBytes Key bytes. * @param e Entry. */ - private void onOffHeaped(int part, KeyCacheObject key, byte[] keyBytes, GridCacheSwapEntry e) { - onEntryUnswapped(offheapLsnrs, part, key, keyBytes, e); + private void onOffHeaped(int part, KeyCacheObject key, GridCacheSwapEntry e) { + onEntryUnswapped(offheapLsnrs, part, key, e); } /** * @param map Listeners. * @param part Partition. * @param key Cache key. - * @param keyBytes Key bytes. * @param e Entry. */ private void onEntryUnswapped(ConcurrentMap<Integer, Collection<GridCacheSwapListener>> map, - int part, KeyCacheObject key, byte[] keyBytes, GridCacheSwapEntry e) { + int part, KeyCacheObject key, GridCacheSwapEntry e) { Collection<GridCacheSwapListener> lsnrs = map.get(part); if (lsnrs == null) { @@ -261,7 +258,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { } for (GridCacheSwapListener lsnr : lsnrs) - lsnr.onEntryUnswapped(part, key, keyBytes, e); + lsnr.onEntryUnswapped(part, key, e); } /** @@ -404,12 +401,25 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { * @return Reconstituted swap entry or {@code null} if entry is obsolete. * @throws IgniteCheckedException If failed. */ - @Nullable private GridCacheSwapEntry swapEntry(GridCacheSwapEntry e, boolean unmarshal) throws IgniteCheckedException { + @Nullable private GridCacheSwapEntry swapEntry(GridCacheSwapEntry e, boolean unmarshal) + throws IgniteCheckedException + { assert e != null; checkIteratorQueue(); -// TODO IGNITE-51. + if (e.valueIsByteArray()) + e.value(cctx.unswapCacheObject(e.valueBytes(), e.valueIsByteArray(), null)); + else if (unmarshal) { + CacheObject val = cctx.unswapCacheObject(e.valueBytes(), e.valueIsByteArray(), e.valueClassLoaderId()); + + if (val == null) + return null; + + e.value(val); + } + +// TODO IGNITE-51 remove after tested with portables. // if (e.valueIsByteArray()) // e.value((V)e.valueBytes()); // else if (unmarshal) { @@ -435,11 +445,10 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { /** * @param key Key to check. - * @param keyBytes Key bytes to check. * @return {@code True} if key is contained. * @throws IgniteCheckedException If failed. */ - public boolean containsKey(KeyCacheObject key, byte[] keyBytes) throws IgniteCheckedException { + public boolean containsKey(KeyCacheObject key) throws IgniteCheckedException { if (!offheapEnabled && !swapEnabled) return false; @@ -449,13 +458,14 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { // First check off-heap store. if (offheapEnabled) - if (offheap.contains(spaceName, part, key, keyBytes)) + if (offheap.contains(spaceName, part, key, key.valueBytes(cctx))) return true; if (swapEnabled) { assert key != null; - byte[] valBytes = swapMgr.read(spaceName, new SwapKey(key, part, keyBytes), + byte[] valBytes = swapMgr.read(spaceName, + new SwapKey(key.value(cctx), part, key.valueBytes(cctx)), cctx.deploy().globalLoader()); return valBytes != null; @@ -501,7 +511,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { // First check off-heap store. if (readOffheap && offheapEnabled) { - byte[] bytes = offheap.get(spaceName, part, key, keyBytes); + byte[] bytes = offheap.get(spaceName, part, key, key.valueBytes(cctx)); if (bytes != null) return swapEntry(unmarshalSwapEntry(bytes)); @@ -527,12 +537,11 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { /** * @param key Key to remove. - * @param keyBytes Key bytes. * @return Value from swap or {@code null}. * @throws IgniteCheckedException If failed. */ @SuppressWarnings({"unchecked"}) - @Nullable GridCacheSwapEntry readAndRemove(final KeyCacheObject key, final byte[] keyBytes) + @Nullable GridCacheSwapEntry readAndRemove(final KeyCacheObject key) throws IgniteCheckedException { if (!offheapEnabled && !swapEnabled) return null; @@ -543,7 +552,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { // First try removing from offheap. if (offheapEnabled) { - byte[] entryBytes = offheap.remove(spaceName, part, key, keyBytes); + byte[] entryBytes = offheap.remove(spaceName, part, key, key.valueBytes(cctx)); if (entryBytes != null) { GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(entryBytes)); @@ -552,7 +561,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { return null; // Always fire this event, since preloading depends on it. - onOffHeaped(part, key, keyBytes, entry); + onOffHeaped(part, key, entry); if (cctx.events().isRecordable(EVT_CACHE_OBJECT_FROM_OFFHEAP)) cctx.events().addEvent( @@ -579,19 +588,17 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { } } - return readAndRemoveSwap(key, part, keyBytes); + return readAndRemoveSwap(key, part); } /** * @param key Key. * @param part Partition. - * @param keyBytes Key bytes. * @return Value from swap or {@code null}. * @throws IgniteCheckedException If failed. */ @Nullable private GridCacheSwapEntry readAndRemoveSwap(final KeyCacheObject key, - final int part, - final byte[] keyBytes) + final int part) throws IgniteCheckedException { if (!swapEnabled) return null; @@ -599,7 +606,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { final GridTuple<GridCacheSwapEntry> t = F.t1(); final GridTuple<IgniteCheckedException> err = F.t1(); - swapMgr.remove(spaceName, new SwapKey(key, part, keyBytes), new CI1<byte[]>() { + swapMgr.remove(spaceName, new SwapKey(key, part, key.valueBytes(cctx)), new CI1<byte[]>() { @Override public void apply(byte[] rmv) { if (rmv != null) { try { @@ -631,7 +638,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { } // Always fire this event, since preloading depends on it. - onUnswapped(part, key, keyBytes, entry); + onUnswapped(part, key, entry); GridCacheQueryManager qryMgr = cctx.queries(); @@ -691,7 +698,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { byte[] keyBytes = entry.getOrMarshalKeyBytes(); - IgniteBiTuple<Long, Integer> ptr = offheap.valuePointer(spaceName, part, key, keyBytes); + IgniteBiTuple<Long, Integer> ptr = offheap.valuePointer(spaceName, part, key, key.valueBytes(cctx)); if (ptr != null) { assert ptr.get1() != null; @@ -700,7 +707,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { return new GridCacheOffheapSwapEntry(ptr.get1(), ptr.get2()); } - return readAndRemoveSwap(key, part, keyBytes); + return readAndRemoveSwap(key, part); } /** @@ -732,7 +739,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { if (!offheapEnabled && !swapEnabled) return null; - return readAndRemove(entry.key(), entry.getOrMarshalKeyBytes()); + return readAndRemove(entry.key()); } /** @@ -757,16 +764,14 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { for (KeyCacheObject key : keys) { int part = cctx.affinity().partition(key); - byte[] keyBytes = CU.marshal(cctx.shared(), key); - - byte[] entryBytes = offheap.remove(spaceName, part, key, keyBytes); + byte[] entryBytes = offheap.remove(spaceName, part, key, key.valueBytes(cctx)); if (entryBytes != null) { GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(entryBytes)); if (entry != null) { // Always fire this event, since preloading depends on it. - onOffHeaped(part, key, keyBytes, entry); + onOffHeaped(part, key, entry); if (cctx.events().isRecordable(EVT_CACHE_OBJECT_FROM_OFFHEAP)) cctx.events().addEvent(part, key, cctx.nodeId(), (IgniteUuid)null, null, @@ -776,7 +781,6 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { qryMgr.onUnswap(key, entry.value(), entry.valueBytes()); GridCacheBatchSwapEntry unswapped = new GridCacheBatchSwapEntry(key, - keyBytes, part, entry.valueIsByteArray() ? null : ByteBuffer.wrap(entry.valueBytes()), entry.valueIsByteArray(), @@ -829,10 +833,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { if (entry == null) return; - KeyCacheObject key = (KeyCacheObject)swapKey.key(); + KeyCacheObject key = cctx.toCacheKeyObject(swapKey.key(), swapKey.keyBytes()); GridCacheBatchSwapEntry unswapped = new GridCacheBatchSwapEntry(key, - swapKey.keyBytes(), swapKey.partition(), entry.valueIsByteArray() ? null : ByteBuffer.wrap(entry.valueBytes()), entry.valueIsByteArray(), @@ -864,7 +867,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { } // Always fire this event, since preloading depends on it. - onUnswapped(swapKey.partition(), key, swapKey.keyBytes(), entry); + onUnswapped(swapKey.partition(), key, entry); if (qryMgr != null) qryMgr.onUnswap(key, entry.value(), entry.valueBytes()); @@ -884,31 +887,18 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { } /** - * @param key Key to read swap entry for. - * @return Read value. - * @throws IgniteCheckedException If read failed. - */ - @Nullable GridCacheSwapEntry readAndRemove(KeyCacheObject key) throws IgniteCheckedException { - if (!offheapEnabled && !swapEnabled) - return null; - - return readAndRemove(key, CU.marshal(cctx.shared(), key)); - } - - /** * @param key Key to remove. - * @param keyBytes Key bytes. * @return {@code True} If succeeded. * @throws IgniteCheckedException If failed. */ - boolean removeOffheap(final KeyCacheObject key, byte[] keyBytes) throws IgniteCheckedException { + boolean removeOffheap(final KeyCacheObject key) throws IgniteCheckedException { assert offheapEnabled; checkIteratorQueue(); int part = cctx.affinity().partition(key); - return offheap.removex(spaceName, part, key, keyBytes); + return offheap.removex(spaceName, part, key, key.valueBytes(cctx)); } /** @@ -922,10 +912,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { * Enables eviction for offheap entry after {@link #readOffheapPointer} was called. * * @param key Key. - * @param keyBytes Key bytes. * @throws IgniteCheckedException If failed. */ - void enableOffheapEviction(final KeyCacheObject key, byte[] keyBytes) throws IgniteCheckedException { + void enableOffheapEviction(final KeyCacheObject key) throws IgniteCheckedException { if (!offheapEnabled) return; @@ -933,15 +922,14 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { int part = cctx.affinity().partition(key); - offheap.enableEviction(spaceName, part, key, keyBytes); + offheap.enableEviction(spaceName, part, key, key.valueBytes(cctx)); } /** * @param key Key to remove. - * @param keyBytes Key bytes. * @throws IgniteCheckedException If failed. */ - public void remove(final KeyCacheObject key, byte[] keyBytes) throws IgniteCheckedException { + public void remove(final KeyCacheObject key) throws IgniteCheckedException { if (!offheapEnabled && !swapEnabled) return; @@ -972,7 +960,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { // First try offheap. if (offheapEnabled) { - byte[] val = offheap.remove(spaceName, part, key, keyBytes); + byte[] val = offheap.remove(spaceName, part, key, key.valueBytes(cctx)); if (val != null) { if (c != null) @@ -983,7 +971,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { } if (swapEnabled) - swapMgr.remove(spaceName, new SwapKey(key, part, keyBytes), c, + swapMgr.remove(spaceName, new SwapKey(key, part, key.valueBytes(cctx)), c, cctx.deploy().globalLoader()); } @@ -991,7 +979,6 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { * Writes a versioned value to swap. * * @param key Key. - * @param keyBytes Key bytes. * @param val Value. * @param valIsByteArr Whether value is byte array. * @param ver Version. @@ -1002,7 +989,6 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { * @throws IgniteCheckedException If failed. */ void write(KeyCacheObject key, - byte[] keyBytes, ByteBuffer val, boolean valIsByteArr, GridCacheVersion ver, @@ -1027,14 +1013,14 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { valClsLdrId); if (offheapEnabled) { - offheap.put(spaceName, part, key, keyBytes, entry.marshal()); + offheap.put(spaceName, part, key, key.valueBytes(cctx), entry.marshal()); if (cctx.events().isRecordable(EVT_CACHE_OBJECT_TO_OFFHEAP)) cctx.events().addEvent(part, key, cctx.nodeId(), (IgniteUuid)null, null, EVT_CACHE_OBJECT_TO_OFFHEAP, null, false, null, true, null, null, null); } else if (swapEnabled) - writeToSwap(part, key, keyBytes, entry.marshal()); + writeToSwap(part, key, entry.marshal()); GridCacheQueryManager qryMgr = cctx.queries(); @@ -1060,7 +1046,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { offheap.put(spaceName, swapEntry.partition(), swapEntry.key(), - swapEntry.keyBytes(), + swapEntry.key().valueBytes(cctx), swapEntry.marshal()); if (cctx.events().isRecordable(EVT_CACHE_OBJECT_TO_OFFHEAP)) @@ -1075,7 +1061,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { Map<SwapKey, byte[]> batch = new LinkedHashMap<>(); for (GridCacheBatchSwapEntry entry : swapped) - batch.put(new SwapKey(entry.key(), entry.partition(), entry.keyBytes()), entry.marshal()); + batch.put(new SwapKey(entry.key(), entry.partition(), entry.key().valueBytes(cctx)), entry.marshal()); swapMgr.writeAll(spaceName, batch, cctx.deploy().globalLoader()); @@ -1096,22 +1082,20 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { * * @param part Partition. * @param key Key. If {@code null} then it will be deserialized from {@code keyBytes}. - * @param keyBytes Key bytes. * @param entry Entry bytes. * @throws IgniteCheckedException If failed. */ private void writeToSwap(int part, - @Nullable KeyCacheObject key, - byte[] keyBytes, + KeyCacheObject key, byte[] entry) throws IgniteCheckedException { checkIteratorQueue(); - if (key == null) - key = unmarshalKey(keyBytes, cctx.deploy().globalLoader()); - - swapMgr.write(spaceName, new SwapKey(key, part, keyBytes), entry, cctx.deploy().globalLoader()); + swapMgr.write(spaceName, + new SwapKey(key.value(cctx), part, key.valueBytes(cctx)), + entry, + cctx.deploy().globalLoader()); if (cctx.events().isRecordable(EVT_CACHE_OBJECT_SWAPPED)) cctx.events().addEvent(part, key, cctx.nodeId(), (IgniteUuid)null, null, @@ -1291,11 +1275,11 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { @Override protected void onRemove() throws IgniteCheckedException { if (offheapFlag) { - KeyCacheObject key = unmarshalKey(cur.getKey(), cctx.deploy().globalLoader()); + KeyCacheObject key = cctx.toCacheKeyObject(null, cur.getKey()); int part = cctx.affinity().partition(key); - offheap.removex(spaceName, part, key, cur.getKey()); + offheap.removex(spaceName, part, key, key.valueBytes(cctx)); } else it.removeX(); @@ -1516,11 +1500,11 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { } @Override protected void onRemove() throws IgniteCheckedException { - KeyCacheObject key = unmarshalKey(cur.getKey(), cctx.deploy().globalLoader()); + KeyCacheObject key = cctx.toCacheKeyObject(cur.getKey(), cur.getKey()); int part = cctx.affinity().partition(key); - offheap.removex(spaceName, part, key, cur.getKey()); + offheap.removex(spaceName, part, key, key.valueBytes(cctx)); } @Override protected void onClose() throws IgniteCheckedException { @@ -1655,12 +1639,11 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { else { if (valLdrId == null && swapEntry.value() == null && !swapEntry.valueIsByteArray()) { // We need value here only for classloading purposes. -// TODO IGNITE-51. -// V val = cctx.marshaller().unmarshal(swapEntry.valueBytes(), -// cctx.deploy().globalLoader()); -// -// if (val != null) -// valLdrId = cctx.deploy().getClassLoaderId(val.getClass().getClassLoader()); + Object val = cctx.marshaller().unmarshal(swapEntry.valueBytes(), + cctx.deploy().globalLoader()); + + if (val != null) + valLdrId = cctx.deploy().getClassLoaderId(val.getClass().getClassLoader()); } if (ldrId.equals(valLdrId)) { @@ -1803,7 +1786,6 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { /** {@inheritDoc} */ @Override public void onEntryUnswapped(int part, KeyCacheObject key, - byte[] keyBytes, GridCacheSwapEntry e) { if (this.key.equals(key)) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb251dda/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index 5915e71..a60feb6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -1782,20 +1782,7 @@ public class GridCacheUtils { * @param ctx Cache context. * @return Cache object value. */ - @Nullable public static <T> T value(@Nullable CacheObject cacheObj, GridCacheContext ctx) { - return cacheObj != null ? cacheObj.<T>value(ctx) : null; - } - - /** - * @param val Value. - * @param cacheObj Cache object. - * @param ctx Cache context. - * @return Cache object value. - */ - @Nullable public static <T> T value(@Nullable T val, @Nullable CacheObject cacheObj, GridCacheContext ctx) { - if (val != null) - return val; - - return cacheObj != null ? cacheObj.<T>value(ctx) : null; + @Nullable public static <T> T value(@Nullable CacheObject cacheObj, GridCacheContext ctx, boolean cpy) { + return cacheObj != null ? cacheObj.<T>value(ctx, cpy) : null; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb251dda/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java index 9a0c971..72f8f3e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java @@ -58,7 +58,23 @@ public class KeyCacheObjectImpl implements KeyCacheObject, Externalizable { } /** {@inheritDoc} */ + @Override public boolean byteArray() { + assert false; + + return false; + } + + /** {@inheritDoc} */ + @Override public byte[] valueBytes(GridCacheContext ctx) { + assert valBytes != null; + + return valBytes; + } + + /** {@inheritDoc} */ @Override public boolean internal() { + assert val != null; + return val instanceof GridCacheInternal; } @@ -141,13 +157,13 @@ public class KeyCacheObjectImpl implements KeyCacheObject, Externalizable { } /** {@inheritDoc} */ - @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { + @Override public void prepareMarshal(GridCacheContext ctx) throws IgniteCheckedException { if (valBytes == null) - valBytes = CU.marshal(ctx, val); + valBytes = CU.marshal(ctx.shared(), val); } /** {@inheritDoc} */ - @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { + @Override public void finishUnmarshal(GridCacheContext ctx, ClassLoader ldr) throws IgniteCheckedException { assert valBytes != null; val = ctx.marshaller().unmarshal(valBytes, ldr); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb251dda/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/UserCacheObjectImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/UserCacheObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/UserCacheObjectImpl.java index 1631ae4..966e7ba 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/UserCacheObjectImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/UserCacheObjectImpl.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; +import org.apache.ignite.internal.util.typedef.internal.*; import java.util.*; @@ -42,7 +43,7 @@ public class UserCacheObjectImpl extends CacheObjectImpl { else { try { if (valBytes == null) - valBytes = ctx.marshaller().marshal(val); + valBytes = CU.marshal(ctx.shared(), val); return new CacheObjectImpl(null, valBytes); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eb251dda/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/UserKeyCacheObjectImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/UserKeyCacheObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/UserKeyCacheObjectImpl.java index a0216e0..b0fd9b2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/UserKeyCacheObjectImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/UserKeyCacheObjectImpl.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; -import org.apache.ignite.internal.util.typedef.internal.*; /** * Cache object wrapping key provided by user. Need to be copied before stored in cache. @@ -37,7 +36,7 @@ public class UserKeyCacheObjectImpl extends KeyCacheObjectImpl { if (valBytes == null) valBytes = ctx.marshaller().marshal(val); - return new KeyCacheObjectImpl(ctx.marshaller().unmarshal(valBytes, U.gridClassLoader()), valBytes); + return new KeyCacheObjectImpl(ctx.marshaller().unmarshal(valBytes, ctx.deploy().globalLoader()), valBytes); } catch (IgniteCheckedException e) { throw new IgniteException("Failed to marshal object: " + val, e);