# ignite-51
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/2ff54f23 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/2ff54f23 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/2ff54f23 Branch: refs/heads/ignite-user-req Commit: 2ff54f230fdf646ddbe290a50b6942a5d252bbfe Parents: a06c150 Author: sboikov <semen.boi...@inria.fr> Authored: Thu Mar 5 20:08:33 2015 +0300 Committer: sboikov <semen.boi...@inria.fr> Committed: Fri Mar 6 07:21:05 2015 +0300 ---------------------------------------------------------------------- .../cache/CacheEntryPredicateAdapter.java | 33 +----- .../cache/CacheEntryPredicateContainsValue.java | 13 +-- .../cache/CacheEntryPredicateHasValue.java | 2 +- .../cache/CacheEntryPredicateNoValue.java | 2 +- .../processors/cache/CacheInvokeEntry.java | 16 ++- .../internal/processors/cache/CacheObject.java | 10 +- .../processors/cache/CacheObjectAdapter.java | 17 ++- .../cache/GridCacheBatchSwapEntry.java | 6 +- .../processors/cache/GridCacheContext.java | 53 +++------ .../processors/cache/GridCacheEntryEx.java | 9 +- .../processors/cache/GridCacheEntryInfo.java | 95 ++++++++++------ .../processors/cache/GridCacheMapEntry.java | 101 ++++++++-------- .../cache/GridCacheOffheapSwapEntry.java | 8 +- .../processors/cache/GridCacheProcessor.java | 2 +- .../processors/cache/GridCacheSwapEntry.java | 12 +- .../cache/GridCacheSwapEntryImpl.java | 45 ++++---- .../processors/cache/GridCacheSwapManager.java | 103 ++++++----------- .../processors/cache/GridCacheUtils.java | 2 +- .../cache/KeyCacheObjectTransferImpl.java | 114 ------------------- .../processors/cache/UserCacheObjectImpl.java | 7 +- .../cache/UserKeyCacheObjectImpl.java | 9 +- .../distributed/dht/GridDhtLocalPartition.java | 4 +- .../GridDhtPartitionSupplyMessage.java | 16 +-- .../preloader/GridDhtPartitionSupplyPool.java | 10 +- .../cache/query/GridCacheQueryManager.java | 19 +--- .../transactions/IgniteTxLocalAdapter.java | 22 ++-- .../dataload/IgniteDataLoaderImpl.java | 8 +- .../datastructures/DataStructuresProcessor.java | 17 +-- .../portable/GridPortableProcessor.java | 11 +- .../IgniteCacheObjectProcessorAdapter.java | 37 +++--- .../util/offheap/unsafe/GridUnsafeMemory.java | 16 ++- .../processors/cache/GridCacheTestEntryEx.java | 10 +- .../query/GridCacheSwapScanQuerySelfTest.java | 1 + .../cache/GridCacheOffHeapAndSwapSelfTest.java | 2 +- .../cache/GridCacheQueryLoadSelfTest.java | 3 +- 35 files changed, 340 insertions(+), 495 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2ff54f23/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateAdapter.java index 3214e10..ef4cd3b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateAdapter.java @@ -18,16 +18,14 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; -import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.plugin.extensions.communication.*; -import java.io.*; import java.nio.*; /** * */ -public abstract class CacheEntryPredicateAdapter implements CacheEntryPredicate, Serializable { +public abstract class CacheEntryPredicateAdapter implements CacheEntryPredicate { /** {@inheritDoc} */ @Override public void finishUnmarshal(GridCacheContext ctx, ClassLoader ldr) throws IgniteCheckedException { // No-op. @@ -65,33 +63,4 @@ public abstract class CacheEntryPredicateAdapter implements CacheEntryPredicate, return false; } - - /** - * @param e Entry. - * @return {@code True} if given entry has value. - */ - protected boolean hasValue(GridCacheEntryEx e) { - try { - if (e.hasValue()) - return true; - - GridCacheContext cctx = e.context(); - - if (cctx.transactional()) { - IgniteInternalTx tx = cctx.tm().userTx(); - - if (tx != null) - return tx.peek(cctx, false, e.key(), null) != null; - } - - return false; - } - catch (GridCacheFilterFailedException err) { - assert false; - - err.printStackTrace(); - - return false; - } - } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2ff54f23/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateContainsValue.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateContainsValue.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateContainsValue.java index 0765930..ec23735 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateContainsValue.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateContainsValue.java @@ -51,16 +51,11 @@ public class CacheEntryPredicateContainsValue implements CacheEntryPredicate { } /** {@inheritDoc} */ - @Override public boolean apply(GridCacheEntryEx entry) { - try { - CacheObject val = entry.rawGetOrUnmarshal(true); + @Override public boolean apply(GridCacheEntryEx e) { + CacheObject val = e.peekVisibleValue(); - return F.eq(this.val.value(entry.context().cacheObjectContext(), false), - CU.value(val, entry.context(), false)); - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } + return F.eq(this.val.value(e.context().cacheObjectContext(), false), + CU.value(val, e.context(), false)); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2ff54f23/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateHasValue.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateHasValue.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateHasValue.java index 3b921ca..fe28f92 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateHasValue.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateHasValue.java @@ -23,6 +23,6 @@ package org.apache.ignite.internal.processors.cache; public class CacheEntryPredicateHasValue extends CacheEntryPredicateAdapter { /** {@inheritDoc} */ @Override public boolean apply(GridCacheEntryEx e) { - return hasValue(e); + return e.peekVisibleValue() != null; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2ff54f23/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateNoValue.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateNoValue.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateNoValue.java index 13f022d..f32f345 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateNoValue.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateNoValue.java @@ -23,6 +23,6 @@ package org.apache.ignite.internal.processors.cache; public class CacheEntryPredicateNoValue extends CacheEntryPredicateAdapter { /** {@inheritDoc} */ @Override public boolean apply(GridCacheEntryEx e) { - return !hasValue(e); + return e.peekVisibleValue() == null; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2ff54f23/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeEntry.java index a0297a9..2817748 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeEntry.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; import javax.cache.processor.*; @@ -39,7 +40,9 @@ public class CacheInvokeEntry<K, V> extends CacheLazyEntry<K, V> implements Muta * @param keyObj Key cache object. * @param valObj Cache object value. */ - public CacheInvokeEntry(GridCacheContext cctx, KeyCacheObject keyObj, CacheObject valObj) { + public CacheInvokeEntry(GridCacheContext cctx, + KeyCacheObject keyObj, + @Nullable CacheObject valObj) { super(cctx, keyObj, valObj); this.hadVal = valObj != null; @@ -47,14 +50,17 @@ public class CacheInvokeEntry<K, V> extends CacheLazyEntry<K, V> implements Muta /** * @param ctx Cache context. - * @param keyObject Key cache object. + * @param keyObj Key cache object. * @param key Key value. * @param valObj Value cache object. * @param val Value. */ - public CacheInvokeEntry(GridCacheContext<K, V> ctx, KeyCacheObject keyObject, K key, CacheObject valObj, - V val) { - super(ctx, keyObject, key, valObj, val); + public CacheInvokeEntry(GridCacheContext<K, V> ctx, + KeyCacheObject keyObj, + @Nullable K key, + @Nullable CacheObject valObj, + @Nullable V val) { + super(ctx, keyObj, key, valObj, val); this.hadVal = valObj != null || val != null; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2ff54f23/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java index 686f7b0..6db4c84 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java @@ -33,11 +33,6 @@ public interface CacheObject extends Message { @Nullable public <T> T value(CacheObjectContext ctx, boolean cpy); /** - * @return {@code True} if value is byte array. - */ - public boolean byteArray(); - - /** * @param ctx Context. * @return Value bytes. * @throws IgniteCheckedException If failed. @@ -45,6 +40,11 @@ public interface CacheObject extends Message { public byte[] valueBytes(CacheObjectContext ctx) throws IgniteCheckedException; /** + * @return Object type. + */ + public byte type(); + + /** * Prepares cache object for cache (e.g. copies user-provided object if needed). * * @param ctx Cache context. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2ff54f23/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java index 0e63506..4094489 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java @@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.internal.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; import java.io.*; @@ -29,6 +28,12 @@ import java.io.*; */ public abstract class CacheObjectAdapter implements CacheObject, Externalizable { /** */ + public static final byte TYPE_REGULAR = 1; + + /** */ + public static final byte TYPE_BYTE_ARR = 2; + + /** */ @GridToStringInclude @GridDirectTransient protected Object val; @@ -44,6 +49,16 @@ public abstract class CacheObjectAdapter implements CacheObject, Externalizable return ctx.copyOnGet() && val != null && !ctx.processor().immutable(val); } + /** + * @return {@code True} if value is byte array. + */ + protected abstract boolean byteArray(); + + /** {@inheritDoc} */ + @Override public byte type() { + return byteArray() ? TYPE_BYTE_ARR : TYPE_REGULAR; + } + /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { byte[] valBytes = byteArray() ? (byte[])val : this.valBytes; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2ff54f23/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheBatchSwapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheBatchSwapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheBatchSwapEntry.java index 0d54465..90d19ef 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheBatchSwapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheBatchSwapEntry.java @@ -39,7 +39,7 @@ public class GridCacheBatchSwapEntry extends GridCacheSwapEntryImpl { * @param key Key. * @param part Partition id. * @param valBytes Value bytes. - * @param valIsByteArr Whether value is byte array. + * @param type Value type. * @param ver Version. * @param ttl Time to live. * @param expireTime Expire time. @@ -49,13 +49,13 @@ public class GridCacheBatchSwapEntry extends GridCacheSwapEntryImpl { public GridCacheBatchSwapEntry(KeyCacheObject key, int part, ByteBuffer valBytes, - boolean valIsByteArr, + byte type, GridCacheVersion ver, long ttl, long expireTime, IgniteUuid keyClsLdrId, @Nullable IgniteUuid valClsLdrId) { - super(valBytes, valIsByteArr, ver, ttl, expireTime, keyClsLdrId, valClsLdrId); + super(valBytes, type, ver, ttl, expireTime, keyClsLdrId, valClsLdrId); this.key = key; this.part = part; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2ff54f23/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 78c0800..05ae513 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -1753,66 +1753,47 @@ public class GridCacheContext<K, V> implements Externalizable { * @return Cache object. */ @Nullable public CacheObject toCacheObject(@Nullable Object obj) { - return portable().toCacheObject(cacheObjCtx, obj, null); + return portable().toCacheObject(cacheObjCtx, obj); } /** * @param obj Object. - * @param bytes Optional value bytes. - * @return Cache object. - */ - @Nullable public CacheObject toCacheObject(@Nullable Object obj, byte[] bytes) { - return portable().toCacheObject(cacheObjCtx, obj, bytes); - } - - /** - * @param obj Object. - * @return Cache object. + * @return Cache key object. */ public KeyCacheObject toCacheKeyObject(Object obj) { - return portable().toCacheKeyObject(cacheObjCtx, obj, null); + return portable().toCacheKeyObject(cacheObjCtx, obj); } /** - * @param obj Object. - * @param bytes Key bytes. - * @param transferOnly If {@code true} creates temporary object which is valid only for marshalling. - * @return Cache object. - * @throws IgniteCheckedException If failed. + * @param bytes Bytes. + * @return Cache key object. */ - public KeyCacheObject toCacheKeyObject(Object obj, byte[] bytes, boolean transferOnly) - throws IgniteCheckedException { - assert obj != null || bytes != null; - - if (obj == null) { - if (transferOnly) - return new KeyCacheObjectTransferImpl(bytes); + public KeyCacheObject toCacheKeyObject(byte[] bytes) throws IgniteCheckedException { + Object obj = ctx.portable().unmarshal(cacheObjCtx, bytes, deploy().localLoader()); - obj = ctx.portable().unmarshal(cacheObjCtx, bytes, deploy().globalLoader()); - } - - return ctx.portable().toCacheKeyObject(cacheObjCtx, obj, bytes); + return portable().toCacheKeyObject(cacheObjCtx, obj); } /** + * @param type Type. * @param bytes Bytes. - * @param valIsByteArr {@code True} if value is byte array. * @param clsLdrId Class loader ID. * @return Cache object. * @throws IgniteCheckedException If failed. */ - @Nullable public CacheObject unswapCacheObject(byte[] bytes, boolean valIsByteArr, @Nullable IgniteUuid clsLdrId) + @Nullable public CacheObject unswapCacheObject(byte type, byte[] bytes, @Nullable IgniteUuid clsLdrId) throws IgniteCheckedException { - if (valIsByteArr) - return new CacheObjectImpl(bytes, null); + if (ctx.config().isPeerClassLoadingEnabled() && type != CacheObjectAdapter.TYPE_BYTE_ARR) { + ClassLoader ldr = clsLdrId != null ? deploy().getClassLoader(clsLdrId) : deploy().localLoader(); - ClassLoader ldr = clsLdrId != null ? deploy().getClassLoader(clsLdrId) : deploy().localLoader(); + if (ldr == null) + return null; - if (ldr == null) - return null; + return ctx.portable().toCacheObject(cacheObjCtx, ctx.portable().unmarshal(cacheObjCtx, bytes, ldr)); + } - return new CacheObjectImpl(portable().unmarshal(cacheObjCtx, bytes, ldr), bytes); + return ctx.portable().toCacheObject(cacheObjCtx, type, bytes); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2ff54f23/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java index 5fb1346..75541eb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java @@ -148,12 +148,11 @@ public interface GridCacheEntryEx { public <K, V> Cache.Entry<K, V> wrapLazyValue(); /** - * Wraps this map entry into cache entry for filter evaluation inside entry lock. + * Peeks value provided to public API entries and to entry filters. * - * @return Wrapped entry. - * @throws IgniteCheckedException If failed. + * @return Value. */ - public <K, V> Cache.Entry<K, V> wrapFilterLocked() throws IgniteCheckedException; + @Nullable public CacheObject peekVisibleValue(); /** * @return Entry which is safe to pass into eviction policy. @@ -622,7 +621,7 @@ public interface GridCacheEntryEx { * @throws GridCacheFilterFailedException If filter failed. */ @SuppressWarnings({"RedundantTypeArguments"}) - @Nullable public <K, V> GridTuple<CacheObject> peek0(boolean failFast, + @Nullable public GridTuple<CacheObject> peek0(boolean failFast, GridCachePeekMode mode, @Nullable CacheEntryPredicate[] filter, @Nullable IgniteInternalTx tx) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2ff54f23/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java index 9b08714..00bc332 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java @@ -38,6 +38,9 @@ public class GridCacheEntryInfo implements Externalizable, Message { @GridToStringInclude private KeyCacheObject key; + /** */ + private byte[] keyBytes; + /** Cache ID. */ private int cacheId; @@ -83,6 +86,13 @@ public class GridCacheEntryInfo implements Externalizable, Message { } /** + * @param bytes Key bytes. + */ + public void keyBytes(byte[] bytes) { + this.keyBytes = bytes; + } + + /** * @return Entry key. */ public KeyCacheObject key() { @@ -175,7 +185,6 @@ public class GridCacheEntryInfo implements Externalizable, Message { /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - // TODO IGNITE-51: field 'remaining'. writer.setBuffer(buf); if (!writer.isHeaderWritten()) { @@ -205,18 +214,24 @@ public class GridCacheEntryInfo implements Externalizable, Message { writer.incrementState(); case 3: - if (!writer.writeLong("ttl", ttl)) + if (!writer.writeByteArray("keyBytes", keyBytes)) return false; writer.incrementState(); case 4: - if (!writer.writeMessage("val", val)) + if (!writer.writeLong("ttl", ttl)) return false; writer.incrementState(); case 5: + if (!writer.writeMessage("val", val)) + return false; + + writer.incrementState(); + + case 6: if (!writer.writeMessage("ver", ver)) return false; @@ -229,7 +244,6 @@ public class GridCacheEntryInfo implements Externalizable, Message { /** {@inheritDoc} */ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - // TODO IGNITE-51: field 'remaining'. reader.setBuffer(buf); if (!reader.beforeMessageRead()) @@ -261,7 +275,7 @@ public class GridCacheEntryInfo implements Externalizable, Message { reader.incrementState(); case 3: - ttl = reader.readLong("ttl"); + keyBytes = reader.readByteArray("keyBytes"); if (!reader.isLastRead()) return false; @@ -269,7 +283,7 @@ public class GridCacheEntryInfo implements Externalizable, Message { reader.incrementState(); case 4: - val = reader.readMessage("val"); + ttl = reader.readLong("ttl"); if (!reader.isLastRead()) return false; @@ -277,6 +291,14 @@ public class GridCacheEntryInfo implements Externalizable, Message { reader.incrementState(); case 5: + val = reader.readMessage("val"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 6: ver = reader.readMessage("ver"); if (!reader.isLastRead()) @@ -296,7 +318,7 @@ public class GridCacheEntryInfo implements Externalizable, Message { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 6; + return 7; } /** @@ -313,9 +335,24 @@ public class GridCacheEntryInfo implements Externalizable, Message { /** * @return Marshalled size. */ - public int marshalledSize() { + public int marshalledSize(GridCacheContext ctx) throws IgniteCheckedException { // TODO IGNITE-51. - return 0; + int size = 0; + + CacheObjectContext cacheObjCtx = ctx.cacheObjectContext(); + + if (val != null) + size += val.valueBytes(cacheObjCtx).length; + + if (key == null) { + assert keyBytes != null; + + size += keyBytes.length; + } + else + size += key.valueBytes(cacheObjCtx).length; + + return size; } /** @@ -323,24 +360,14 @@ public class GridCacheEntryInfo implements Externalizable, Message { * @throws IgniteCheckedException In case of error. */ public void marshal(GridCacheContext ctx) throws IgniteCheckedException { - key.prepareMarshal(ctx.cacheObjectContext()); + // TODO IGNITE-51: field 'remaining'. + assert key != null ^ keyBytes != null; + + if (key != null) + key.prepareMarshal(ctx.cacheObjectContext()); if (val != null) val.prepareMarshal(ctx.cacheObjectContext()); -// TODO IGNITE-51 -// boolean depEnabled = ctx.gridDeploy().enabled(); -// -// boolean valIsByteArr = val != null && val instanceof byte[]; -// -// if (keyBytes == null && depEnabled) -// keyBytes = CU.marshal(ctx, key); -// -// keyBytesSent = depEnabled || key == null; -// -// if (valBytes == null && val != null && !valIsByteArr) -// valBytes = CU.marshal(ctx, val); -// -// valBytesSent = (valBytes != null && !valIsByteArr) || val == null; } /** @@ -351,18 +378,20 @@ public class GridCacheEntryInfo implements Externalizable, Message { * @throws IgniteCheckedException If unmarshalling failed. */ public void unmarshal(GridCacheContext ctx, ClassLoader clsLdr) throws IgniteCheckedException { - key.finishUnmarshal(ctx.cacheObjectContext(), clsLdr); + if (key == null) { + assert keyBytes != null; + + CacheObjectContext cacheObjCtx = ctx.cacheObjectContext(); + + Object key0 = ctx.portable().unmarshal(cacheObjCtx, keyBytes, clsLdr); + + key = ctx.portable().toCacheKeyObject(cacheObjCtx, key0); + } + else + key.finishUnmarshal(ctx.cacheObjectContext(), clsLdr); if (val != null) val.finishUnmarshal(ctx.cacheObjectContext(), clsLdr); -// TODO IGNITE-51 -// Marshaller mrsh = ctx.marshaller(); -// -// if (key == null) -// key = mrsh.unmarshal(keyBytes, clsLdr); -// -// if (ctx.isUnmarshalValues() && val == null && valBytes != null) -// val = mrsh.unmarshal(valBytes, clsLdr); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2ff54f23/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 227ccf7..2cd0645 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -229,9 +229,9 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { assert mem != null; if (val != null) { - boolean valIsByteArr = val.byteArray(); + byte type = val.type(); - valPtr = mem.putOffHeap(valPtr, val.valueBytes(cctx.cacheObjectContext()), valIsByteArr); + valPtr = mem.putOffHeap(valPtr, val.valueBytes(cctx.cacheObjectContext()), type); } else { mem.removeOffHeap(valPtr); @@ -254,11 +254,16 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { * @param valBytes Value bytes. * @return Length of value. */ - private int valueLength0(@Nullable CacheObject val, @Nullable IgniteBiTuple<byte[], Boolean> valBytes) { + private int valueLength0(@Nullable CacheObject val, @Nullable IgniteBiTuple<byte[], Byte> valBytes) { byte[] bytes = val != null ? (byte[])val.value(cctx.cacheObjectContext(), false) : null; - return bytes != null ? bytes.length : - (valBytes == null) ? 0 : valBytes.get1().length - (valBytes.get2() ? 0 : 6); + if (bytes != null) + return bytes.length; + + if (valBytes == null) + return 0; + + return valBytes.get1().length - (((valBytes.get2() == CacheObjectAdapter.TYPE_BYTE_ARR) ? 0 : 6)); } /** @@ -270,12 +275,9 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { CacheObject val0 = val; if (val0 == null && valPtr != 0) { - IgniteBiTuple<byte[], Boolean> t = valueBytes0(); + IgniteBiTuple<byte[], Byte> t = valueBytes0(); - if (t.get2()) - val0 = cctx.toCacheObject(t.get1(), null); - else - val0 = cctx.toCacheObject(null, t.get1()); + return cctx.portable().toCacheObject(cctx.cacheObjectContext(), t.get2(), t.get1()); } return val0; @@ -526,7 +528,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { val.value(cctx.cacheObjectContext(), false).getClass().getClassLoader()); } - IgniteBiTuple<byte[], Boolean> valBytes = valueBytes0(); + IgniteBiTuple<byte[], Byte> valBytes = valueBytes0(); cctx.swap().write(key(), ByteBuffer.wrap(valBytes.get1()), @@ -545,7 +547,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { /** * @return Value bytes and flag indicating whether value is byte array. */ - protected IgniteBiTuple<byte[], Boolean> valueBytes0() { + protected IgniteBiTuple<byte[], Byte> valueBytes0() { assert Thread.holdsLock(this); if (valPtr != 0) { @@ -559,9 +561,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { try { byte[] bytes = val.valueBytes(cctx.cacheObjectContext()); - boolean plain = val.byteArray(); - - return new IgniteBiTuple<>(bytes, plain); + return new IgniteBiTuple<>(bytes, val.type()); } catch (IgniteCheckedException e) { throw new IgniteException(e); @@ -2855,7 +2855,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { * @throws GridCacheFilterFailedException If filter failed. */ @SuppressWarnings({"RedundantTypeArguments"}) - @Nullable @Override public <K, V> GridTuple<CacheObject> peek0(boolean failFast, GridCachePeekMode mode, + @Nullable @Override public GridTuple<CacheObject> peek0(boolean failFast, GridCachePeekMode mode, CacheEntryPredicate[] filter, @Nullable IgniteInternalTx tx) throws GridCacheEntryRemovedException, GridCacheFilterFailedException, IgniteCheckedException { assert tx == null || tx.local(); @@ -3763,12 +3763,38 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { return new LazyValueEntry(key); } - /** {@inheritDoc} */ - @Override public <K, V> Cache.Entry<K, V> wrapFilterLocked() throws IgniteCheckedException { - CacheObject val = rawGetOrUnmarshal(true); + /** {@inheritDoc} */ + @Nullable public CacheObject peekVisibleValue() { + try { + IgniteInternalTx tx = cctx.tm().userTx(); + + if (tx != null) { + GridTuple<CacheObject> peek = tx.peek(cctx, false, key, null); + + if (peek != null) + return peek.get(); + } + + if (detached()) + return rawGet(); + + for (;;) { + GridCacheEntryEx e = cctx.cache().peekEx(key); - return new CacheEntryImpl<>(key.<K>value(cctx.cacheObjectContext(), false), - CU.<V>value(val, cctx, false)); + if (e == null) + return null; + + try { + return e.peek(GridCachePeekMode.GLOBAL, CU.empty0()); + } + catch (GridCacheEntryRemovedException ignored) { + // No-op. + } + } + } + catch (GridCacheFilterFailedException ignored) { + throw new IgniteException("Should never happen."); + } } /** {@inheritDoc} */ @@ -3893,7 +3919,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { valClsLdrId = cctx.deploy().getClassLoaderId( U.detectObjectClassLoader(val.value(cctx.cacheObjectContext(), false))); - IgniteBiTuple<byte[], Boolean> valBytes = valueBytes0(); + IgniteBiTuple<byte[], Byte> valBytes = valueBytes0(); ret = new GridCacheBatchSwapEntry(key(), partition(), @@ -4313,36 +4339,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public V getValue() { - try { - IgniteInternalTx tx = cctx.tm().userTx(); - - if (tx != null) { - GridTuple<CacheObject> peek = tx.peek(cctx, false, key, null); - - if (peek != null) - return CU.value(peek.get(), cctx, false); - } - - if (detached()) - return CU.value(rawGet(), cctx, false); - - for (;;) { - GridCacheEntryEx e = cctx.cache().peekEx(key); - - if (e == null) - return null; - - try { - return CU.value(e.peek(GridCachePeekMode.GLOBAL, CU.empty0()), cctx, false); - } - catch (GridCacheEntryRemovedException ignored) { - // No-op. - } - } - } - catch (GridCacheFilterFailedException ignored) { - throw new IgniteException("Should never happen."); - } + return CU.value(peekVisibleValue(), cctx, true); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2ff54f23/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheOffheapSwapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheOffheapSwapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheOffheapSwapEntry.java index a260fca..d9ee44c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheOffheapSwapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheOffheapSwapEntry.java @@ -55,7 +55,7 @@ public class GridCacheOffheapSwapEntry implements GridCacheSwapEntry { private CacheObject val; /** */ - private final boolean valIsByteArr; + private final byte type; /** * @param ptr Value pointer. @@ -75,7 +75,7 @@ public class GridCacheOffheapSwapEntry implements GridCacheSwapEntry { readPtr += verEx ? GridCacheSwapEntryImpl.VERSION_EX_SIZE : GridCacheSwapEntryImpl.VERSION_SIZE; - valIsByteArr = UNSAFE.getByte(readPtr + 4) == 1; + type = UNSAFE.getByte(readPtr + 4); valPtr = readPtr; @@ -153,8 +153,8 @@ public class GridCacheOffheapSwapEntry implements GridCacheSwapEntry { } /** {@inheritDoc} */ - @Override public boolean valueIsByteArray() { - return valIsByteArr; + @Override public byte type() { + return type; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2ff54f23/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 0d148a6..a052c9d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -1482,7 +1482,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (qryMgr != null) { try { - KeyCacheObject key = cctx.toCacheKeyObject(null, keyBytes, false); + KeyCacheObject key = cctx.toCacheKeyObject(keyBytes); qryMgr.remove(key.value(cctx.cacheObjectContext(), false)); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2ff54f23/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntry.java index 72524dd..4391918 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntry.java @@ -31,6 +31,11 @@ public interface GridCacheSwapEntry { public byte[] valueBytes(); /** + * @return Object type. + */ + public byte type(); + + /** * @param valBytes Value bytes. */ public void valueBytes(@Nullable byte[] valBytes); @@ -43,12 +48,7 @@ public interface GridCacheSwapEntry { /** * @param val Value. */ - void value(CacheObject val); - - /** - * @return Whether value is byte array. - */ - public boolean valueIsByteArray(); + public void value(CacheObject val); /** * @return Version. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2ff54f23/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java index 590d15f..acf1961 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java @@ -57,8 +57,8 @@ public class GridCacheSwapEntryImpl implements GridCacheSwapEntry { /** Value. */ private CacheObject val; - /** Flag indicating that value is byte array, so valBytes should not be unmarshalled. */ - private boolean valIsByteArr; + /** Type. */ + private byte type; /** Class loader ID. */ private IgniteUuid keyClsLdrId; @@ -77,7 +77,7 @@ public class GridCacheSwapEntryImpl implements GridCacheSwapEntry { /** * @param valBytes Value. - * @param valIsByteArr Whether value of this entry is byte array. + * @param type Type. * @param ver Version. * @param ttl Entry time to live. * @param expireTime Expire time. @@ -86,7 +86,7 @@ public class GridCacheSwapEntryImpl implements GridCacheSwapEntry { */ public GridCacheSwapEntryImpl( ByteBuffer valBytes, - boolean valIsByteArr, + byte type, GridCacheVersion ver, long ttl, long expireTime, @@ -95,7 +95,7 @@ public class GridCacheSwapEntryImpl implements GridCacheSwapEntry { assert ver != null; this.valBytes = valBytes; - this.valIsByteArr = valIsByteArr; + this.type = type; this.ver = ver; this.ttl = ttl; this.expireTime = expireTime; @@ -135,27 +135,24 @@ public class GridCacheSwapEntryImpl implements GridCacheSwapEntry { * @param bytes Entry bytes. * @return Value if value is byte array, otherwise {@code null}. */ - @Nullable public static byte[] getValueIfByteArray(byte[] bytes) { - int off = VERSION_OFFSET; // Skip ttl, expire time. + @Nullable public static IgniteBiTuple<byte[], Byte> getValue(byte[] bytes) { + long off = BYTE_ARR_OFF + VERSION_OFFSET; // Skip ttl, expire time. - boolean verEx = bytes[off++] != 0; + boolean verEx = UNSAFE.getByte(bytes, off++) != 0; off += verEx ? VERSION_EX_SIZE : VERSION_SIZE; - if (bytes[off++] > 0) { - int size = UNSAFE.getInt(bytes, BYTE_ARR_OFF + off); + int arrLen = UNSAFE.getInt(bytes, off); - assert size >= 0; - assert bytes.length > size + off + 4; + off += 4; - byte[] res = new byte[size]; + byte type = UNSAFE.getByte(bytes, off++); - UNSAFE.copyMemory(bytes, BYTE_ARR_OFF + off + 4, res, BYTE_ARR_OFF, size); + byte[] valBytes = new byte[arrLen]; - return res; - } + UNSAFE.copyMemory(bytes, off, valBytes, BYTE_ARR_OFF, arrLen); - return null; + return new IgniteBiTuple<>(valBytes, type); } /** @@ -202,15 +199,11 @@ public class GridCacheSwapEntryImpl implements GridCacheSwapEntry { /** {@inheritDoc} */ @Override public void value(CacheObject val) { this.val = val; - -// TODO IGNITE-51. -// if (val instanceof byte[]) -// valBytes = null; } /** {@inheritDoc} */ - @Override public boolean valueIsByteArray() { - return valIsByteArr; + @Override public byte type() { + return type; } /** {@inheritDoc} */ @@ -278,7 +271,7 @@ public class GridCacheSwapEntryImpl implements GridCacheSwapEntry { off += 4; - UNSAFE.putBoolean(arr, off++, valIsByteArr); + UNSAFE.putByte(arr, off++, type); UNSAFE.copyMemory(valBytes.array(), BYTE_ARR_OFF, arr, off, len); @@ -316,7 +309,7 @@ public class GridCacheSwapEntryImpl implements GridCacheSwapEntry { off += 4; - boolean valIsByteArr = UNSAFE.getBoolean(arr, off++); + byte type = UNSAFE.getByte(arr, off++); byte[] valBytes = new byte[arrLen]; @@ -331,7 +324,7 @@ public class GridCacheSwapEntryImpl implements GridCacheSwapEntry { IgniteUuid keyClsLdrId = U.readGridUuid(arr, off); return new GridCacheSwapEntryImpl(ByteBuffer.wrap(valBytes), - valIsByteArr, + type, ver, ttl, expireTime, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2ff54f23/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java index e75ccd6..1e8e663 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java @@ -119,7 +119,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { if (!firstEvictWarn) warnFirstEvict(); - writeToSwap(part, cctx.toCacheKeyObject(null, kb, false), vb); + writeToSwap(part, cctx.toCacheKeyObject(kb), vb); } catch (IgniteCheckedException e) { log.error("Failed to unmarshal off-heap entry [part=" + part + ", hash=" + hash + ']', e); @@ -385,39 +385,24 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { } /** - * @param e Swap entry to reconstitute. - * @return Reconstituted swap entry or {@code null} if entry is obsolete. - * @throws IgniteCheckedException If failed. - */ - @Nullable private GridCacheSwapEntry swapEntry(GridCacheSwapEntry e) throws IgniteCheckedException { - return swapEntry(e, true); - } - - /** * Recreates raw swap entry (that just has been received from swap storage). * * @param e Swap entry to reconstitute. - * @param unmarshal If {@code true} then value is unmarshalled. * @return Reconstituted swap entry or {@code null} if entry is obsolete. * @throws IgniteCheckedException If failed. */ - @Nullable private GridCacheSwapEntry swapEntry(GridCacheSwapEntry e, boolean unmarshal) - throws IgniteCheckedException + @Nullable private GridCacheSwapEntry swapEntry(GridCacheSwapEntry e) throws IgniteCheckedException { assert e != null; checkIteratorQueue(); - if (e.valueIsByteArray()) - e.value(cctx.unswapCacheObject(e.valueBytes(), e.valueIsByteArray(), null)); - else if (unmarshal) { - CacheObject val = cctx.unswapCacheObject(e.valueBytes(), e.valueIsByteArray(), e.valueClassLoaderId()); + CacheObject val = cctx.unswapCacheObject(e.type(), e.valueBytes(), e.valueClassLoaderId()); - if (val == null) - return null; + if (val == null) + return null; - e.value(val); - } + e.value(val); return e; } @@ -767,8 +752,8 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { GridCacheBatchSwapEntry unswapped = new GridCacheBatchSwapEntry(key, part, - entry.valueIsByteArray() ? null : ByteBuffer.wrap(entry.valueBytes()), - entry.valueIsByteArray(), + ByteBuffer.wrap(entry.valueBytes()), + entry.type(), entry.version(), entry.ttl(), entry.expireTime(), entry.keyClassLoaderId(), @@ -826,12 +811,12 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { if (entry == null) return; - KeyCacheObject key = cctx.toCacheKeyObject(swapKey.key(), swapKey.keyBytes(), false); + KeyCacheObject key = cctx.toCacheKeyObject(swapKey.keyBytes()); GridCacheBatchSwapEntry unswapped = new GridCacheBatchSwapEntry(key, swapKey.partition(), - entry.valueIsByteArray() ? null : ByteBuffer.wrap(entry.valueBytes()), - entry.valueIsByteArray(), + ByteBuffer.wrap(entry.valueBytes()), + entry.type(), entry.version(), entry.ttl(), entry.expireTime(), @@ -983,7 +968,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { * * @param key Key. * @param val Value. - * @param valIsByteArr Whether value is byte array. + * @param type Value type. * @param ver Version. * @param ttl Entry time to live. * @param expireTime Swap entry expiration time. @@ -993,7 +978,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { */ void write(KeyCacheObject key, ByteBuffer val, - boolean valIsByteArr, + byte type, GridCacheVersion ver, long ttl, long expireTime, @@ -1008,7 +993,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { int part = cctx.affinity().partition(key); GridCacheSwapEntryImpl entry = new GridCacheSwapEntryImpl(val, - valIsByteArr, + type, ver, ttl, expireTime, @@ -1132,13 +1117,11 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { * Gets offheap and swap iterator over partition. * * @param part Partition to iterate over. - * @param unmarshal Unmarshal value flag. * @return Iterator over partition. * @throws IgniteCheckedException If failed. */ @Nullable public GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> iterator( - final int part, - final boolean unmarshal) + final int part) throws IgniteCheckedException { if (!swapEnabled() && !offHeapEnabled()) return null; @@ -1146,10 +1129,10 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { checkIteratorQueue(); if (offHeapEnabled() && !swapEnabled()) - return offHeapIterator(part, unmarshal); + return offHeapIterator(part); if (swapEnabled() && !offHeapEnabled()) - return swapIterator(part, unmarshal); + return swapIterator(part); // Both, swap and off-heap are enabled. return new GridCloseableIteratorAdapter<Map.Entry<byte[], GridCacheSwapEntry>>() { @@ -1160,7 +1143,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { private boolean done; { - it = offHeapIterator(part, unmarshal); + it = offHeapIterator(part); advance(); } @@ -1174,7 +1157,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { if (offheap) { offheap = false; - it = swapIterator(part, unmarshal); + it = swapIterator(part); assert it != null; @@ -1283,7 +1266,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { @Override protected void onRemove() throws IgniteCheckedException { if (offheapFlag) { - KeyCacheObject key = cctx.toCacheKeyObject(null, cur.getKey(), false); + KeyCacheObject key = cctx.toCacheKeyObject(cur.getKey()); int part = cctx.affinity().partition(key); @@ -1411,7 +1394,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { cur = new Map.Entry<K, V>() { @Override public K getKey() { try { - KeyCacheObject key = cctx.toCacheKeyObject(null, cur0.getKey(), false); + KeyCacheObject key = cctx.toCacheKeyObject(cur0.getKey()); return key.value(cctx.cacheObjectContext(), false); } @@ -1505,7 +1488,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { @Override protected KeyCacheObject onNext() { try { - cur = cctx.toCacheKeyObject(null, it.next().getKey(), false); + cur = cctx.toCacheKeyObject(it.next().getKey()); return cur; } @@ -1551,20 +1534,18 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { * Gets offheap iterator over partition. * * @param part Partition to iterate over. - * @param unmarshal Unmarshal value flag. * @return Iterator over partition. * @throws IgniteCheckedException If failed. */ @Nullable public GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> offHeapIterator( - int part, - boolean unmarshal) + int part) throws IgniteCheckedException { if (!offheapEnabled) return null; checkIteratorQueue(); - return new IteratorWrapper(offheap.iterator(spaceName, part), unmarshal); + return new IteratorWrapper(offheap.iterator(spaceName, part)); } /** @@ -1603,7 +1584,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { } @Override protected void onRemove() throws IgniteCheckedException { - KeyCacheObject key = cctx.toCacheKeyObject(cur.getKey(), cur.getKey(), false); + KeyCacheObject key = cctx.toCacheKeyObject(cur.getKey()); int part = cctx.affinity().partition(key); @@ -1620,20 +1601,18 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { * Gets swap space iterator over partition. * * @param part Partition to iterate over. - * @param unmarshal Unmarshal value flag. * @return Iterator over partition. * @throws IgniteCheckedException If failed. */ @Nullable public GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> swapIterator( - int part, - boolean unmarshal) + int part) throws IgniteCheckedException { if (!swapEnabled) return null; checkIteratorQueue(); - return new IteratorWrapper(swapMgr.rawIterator(spaceName, part), unmarshal); + return new IteratorWrapper(swapMgr.rawIterator(spaceName, part)); } /** @@ -1740,9 +1719,12 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { undeployCnt++; } else { - if (valLdrId == null && swapEntry.value() == null && !swapEntry.valueIsByteArray()) { + if (valLdrId == null && + swapEntry.value() == null && + swapEntry.type() != CacheObjectAdapter.TYPE_BYTE_ARR) { // We need value here only for classloading purposes. - Object val = cctx.marshaller().unmarshal(swapEntry.valueBytes(), + Object val = cctx.portable().unmarshal(cctx.cacheObjectContext(), + swapEntry.valueBytes(), cctx.deploy().globalLoader()); if (val != null) @@ -1791,16 +1773,6 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { } /** - * @param bytes Bytes to unmarshal. - * @param ldr Class loader. - * @return Unmarshalled value. - * @throws IgniteCheckedException If unmarshal failed. - */ - private <T> T unmarshalKey(byte[] bytes, ClassLoader ldr) throws IgniteCheckedException { - return (T)cctx.marshaller().unmarshal(bytes, ldr); - } - - /** * @return Size of internal weak iterator set. */ int iteratorSetSize() { @@ -1830,18 +1802,13 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { /** */ private final GridCloseableIterator<? extends Map.Entry<byte[], byte[]>> iter; - /** */ - private final boolean unmarshal; - /** * @param iter Iterator. - * @param unmarshal Unmarshal value flag. */ - private IteratorWrapper(GridCloseableIterator<? extends Map.Entry<byte[], byte[]>> iter, boolean unmarshal) { + private IteratorWrapper(GridCloseableIterator<? extends Map.Entry<byte[], byte[]>> iter) { assert iter != null; this.iter = iter; - this.unmarshal = unmarshal; } /** {@inheritDoc} */ @@ -1850,7 +1817,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { GridCacheSwapEntry unmarshalled = unmarshalSwapEntry(e.getValue()); - return F.t(e.getKey(), swapEntry(unmarshalled, unmarshal)); + return F.t(e.getKey(), swapEntry(unmarshalled)); } /** {@inheritDoc} */ @@ -1893,7 +1860,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { { if (this.key.equals(key)) { entry = new GridCacheSwapEntryImpl(ByteBuffer.wrap(e.valueBytes()), - e.valueIsByteArray(), + e.type(), e.version(), e.ttl(), e.expireTime(), http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2ff54f23/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index 7700fa4..16bc56f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -904,7 +904,7 @@ public class GridCacheUtils { try { Object val = CU.value(e.rawGetOrUnmarshal(true), e.context(), false); - return val != null && + return val == null || valType.isAssignableFrom(val.getClass()) && keyType.isAssignableFrom(e.key().value(e.context().cacheObjectContext(), false).getClass()); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2ff54f23/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectTransferImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectTransferImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectTransferImpl.java deleted file mode 100644 index 7122f66..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectTransferImpl.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache; - -import org.apache.ignite.*; -import org.apache.ignite.plugin.extensions.communication.*; -import org.jetbrains.annotations.*; - -import java.nio.*; - -/** - * - */ -public class KeyCacheObjectTransferImpl implements KeyCacheObject { - /** */ - private final byte[] valBytes; - - /** - * @param valBytes Value bytes. - */ - public KeyCacheObjectTransferImpl(byte[] valBytes) { - assert valBytes != null; - - this.valBytes = valBytes; - } - - /** {@inheritDoc} */ - @Override public boolean internal() { - throw new IllegalStateException(); - } - - /** {@inheritDoc} */ - @Nullable @Override public <T> T value(CacheObjectContext ctx, boolean cpy) { - throw new IllegalStateException(); - } - - /** {@inheritDoc} */ - @Override public void prepareMarshal(CacheObjectContext ctx) throws IgniteCheckedException { - // No-op. - } - - /** {@inheritDoc} */ - @Override public boolean byteArray() { - throw new IllegalStateException(); - } - - /** {@inheritDoc} */ - @Override public byte[] valueBytes(CacheObjectContext ctx) throws IgniteCheckedException { - throw new IllegalStateException(); - } - - /** {@inheritDoc} */ - @Override public void finishUnmarshal(CacheObjectContext ctx, ClassLoader ldr) throws IgniteCheckedException { - throw new IllegalStateException(); - } - - /** {@inheritDoc} */ - @Override public CacheObject prepareForCache(CacheObjectContext ctx) { - throw new IllegalStateException(); - } - - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - writer.setBuffer(buf); - - if (!writer.isHeaderWritten()) { - if (!writer.writeHeader(directType(), fieldsCount())) - return false; - - writer.onHeaderWritten(); - } - - switch (writer.state()) { - case 0: - if (!writer.writeByteArray("valBytes", valBytes)) - return false; - - writer.incrementState(); - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - throw new IllegalStateException(); - } - - /** {@inheritDoc} */ - @Override public byte directType() { - return KeyCacheObjectImpl.DIRECT_TYPE; - } - - /** {@inheritDoc} */ - @Override public byte fieldsCount() { - return 1; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2ff54f23/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/UserCacheObjectImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/UserCacheObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/UserCacheObjectImpl.java index 8026995..b7e90bb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/UserCacheObjectImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/UserCacheObjectImpl.java @@ -18,7 +18,7 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; -import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; import java.util.*; @@ -41,6 +41,11 @@ public class UserCacheObjectImpl extends CacheObjectImpl { } /** {@inheritDoc} */ + @Nullable @Override public <T> T value(CacheObjectContext ctx, boolean cpy) { + return super.value(ctx, false); + } + + /** {@inheritDoc} */ @Override public CacheObject prepareForCache(CacheObjectContext ctx) { if (needCopy(ctx)) { if (val instanceof byte[]) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2ff54f23/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/UserKeyCacheObjectImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/UserKeyCacheObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/UserKeyCacheObjectImpl.java index d052f94..e9fb75b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/UserKeyCacheObjectImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/UserKeyCacheObjectImpl.java @@ -18,7 +18,7 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; -import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; /** * Cache object wrapping key provided by user. Need to be copied before stored in cache. @@ -40,6 +40,11 @@ public class UserKeyCacheObjectImpl extends KeyCacheObjectImpl { } /** {@inheritDoc} */ + @Nullable @Override public <T> T value(CacheObjectContext ctx, boolean cpy) { + return super.value(ctx, false); + } + + /** {@inheritDoc} */ @Override public byte[] valueBytes(CacheObjectContext ctx) throws IgniteCheckedException { if (valBytes == null) valBytes = ctx.processor().marshal(ctx, val); @@ -56,7 +61,7 @@ public class UserKeyCacheObjectImpl extends KeyCacheObjectImpl { if (needCopy(ctx)) { Object val = ctx.processor().unmarshal(ctx, valBytes, - ctx.kernalContext().config().getClassLoader()); + this.val.getClass().getClassLoader()); return new KeyCacheObjectImpl(val, valBytes); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2ff54f23/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index 5f0e7b0..2af9a3f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -486,7 +486,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition> assert state() == EVICTED; try { - GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> it = cctx.swap().iterator(id, false); + GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> it = cctx.swap().iterator(id); boolean isLocStore = cctx.store().isLocalStore(); @@ -497,7 +497,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition> byte[] keyBytes = entry.getKey(); - KeyCacheObject key = cctx.toCacheKeyObject(null, keyBytes, false); + KeyCacheObject key = cctx.toCacheKeyObject(keyBytes); cctx.swap().remove(key); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2ff54f23/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java index 0348e8a..6129ee0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java @@ -185,14 +185,12 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G * @param ctx Cache context. * @throws IgniteCheckedException If failed. */ - void addEntry(int p, GridCacheEntryInfo info, GridCacheSharedContext ctx) throws IgniteCheckedException { + void addEntry(int p, GridCacheEntryInfo info, GridCacheContext ctx) throws IgniteCheckedException { assert info != null; - GridCacheContext cctx = ctx.cacheContext(cacheId); + marshalInfo(info, ctx); - marshalInfo(info, cctx); - - msgSize += info.marshalledSize(); + msgSize += info.marshalledSize(ctx); CacheEntryInfoCollection infoCol = infos.get(p); @@ -213,17 +211,15 @@ public class GridDhtPartitionSupplyMessage extends GridCacheMessage implements G * @param ctx Cache context. * @throws IgniteCheckedException If failed. */ - void addEntry0(int p, GridCacheEntryInfo info, GridCacheSharedContext ctx) throws IgniteCheckedException { + void addEntry0(int p, GridCacheEntryInfo info, GridCacheContext ctx) throws IgniteCheckedException { assert info != null; assert info.key() != null; assert info.value() != null; - GridCacheContext cctx = ctx.cacheContext(cacheId); - // Need to call this method to initialize info properly. - marshalInfo(info, cctx); + marshalInfo(info, ctx); - msgSize += info.marshalledSize(); + msgSize += info.marshalledSize(ctx); CacheEntryInfoCollection infoCol = infos.get(p); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2ff54f23/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java index 9ad221e..2ba161e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java @@ -314,7 +314,7 @@ class GridDhtPartitionSupplyPool<K, V> { if (info != null && !(info.key() instanceof GridPartitionLockKey) && !info.isNew()) { if (preloadPred == null || preloadPred.apply(info)) - s.addEntry(part, info, cctx.shared()); + s.addEntry(part, info, cctx); else if (log.isDebugEnabled()) log.debug("Preload predicate evaluated to false (will not sender cache entry): " + info); @@ -326,7 +326,7 @@ class GridDhtPartitionSupplyPool<K, V> { if (cctx.isSwapOrOffheapEnabled()) { GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> iter = - cctx.swap().iterator(part, false); + cctx.swap().iterator(part); // Iterator may be null if space does not exist. if (iter != null) { @@ -366,14 +366,14 @@ class GridDhtPartitionSupplyPool<K, V> { GridCacheEntryInfo info = new GridCacheEntryInfo(); - info.key(cctx.toCacheKeyObject(null, e.getKey(), true)); + info.keyBytes(e.getKey()); info.ttl(swapEntry.ttl()); info.expireTime(swapEntry.expireTime()); info.version(swapEntry.version()); info.value(swapEntry.value()); if (preloadPred == null || preloadPred.apply(info)) - s.addEntry0(part, info, cctx.shared()); + s.addEntry0(part, info, cctx); else { if (log.isDebugEnabled()) log.debug("Preload predicate evaluated to false (will not send " + @@ -447,7 +447,7 @@ class GridDhtPartitionSupplyPool<K, V> { } if (preloadPred == null || preloadPred.apply(info)) - s.addEntry(part, info, cctx.shared()); + s.addEntry(part, info, cctx); else if (log.isDebugEnabled()) log.debug("Preload predicate evaluated to false (will not sender cache entry): " + info); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2ff54f23/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index 2472867..9f79452 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -2334,7 +2334,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte if (key != null) return key; - key = cctx.toCacheKeyObject(null, keyBytes(), false).value(cctx.cacheObjectContext(), false); + key = cctx.toCacheKeyObject(keyBytes()).value(cctx.cacheObjectContext(), false); return key; } @@ -2398,22 +2398,11 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte /** {@inheritDoc} */ @SuppressWarnings("IfMayBeConditional") @Override protected V unmarshalValue() throws IgniteCheckedException { - byte[] bytes = e.getValue(); + IgniteBiTuple<byte[], Byte> t = GridCacheSwapEntryImpl.getValue(e.getValue()); - byte[] val = GridCacheSwapEntryImpl.getValueIfByteArray(bytes); + CacheObject obj = cctx.portable().toCacheObject(cctx.cacheObjectContext(), t.get2(), t.get1()); - if (val != null) - return (V)val; - - if (cctx.offheapTiered() && cctx.portableEnabled()) - return (V)cctx.portable().unmarshal(bytes, GridCacheSwapEntryImpl.valueOffset(bytes)); - else { - GridByteArrayInputStream in = new GridByteArrayInputStream(bytes, - GridCacheSwapEntryImpl.valueOffset(bytes), - bytes.length); - - return cctx.marshaller().unmarshal(in, cctx.deploy().globalLoader()); - } + return obj.value(cctx.cacheObjectContext(), false); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2ff54f23/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index d65618a..35673c3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -2459,37 +2459,37 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter private void addInvokeResult(IgniteTxEntry txEntry, CacheObject cacheVal, GridCacheReturn<?> ret) { GridCacheContext ctx = txEntry.context(); - Object keyVal = null; - Object val = null; + Object key0 = null; + Object val0 = null; try { Object res = null; for (T2<EntryProcessor<Object, Object, Object>, Object[]> t : txEntry.entryProcessors()) { CacheInvokeEntry<Object, Object> invokeEntry = - new CacheInvokeEntry(txEntry.context(), txEntry.key(), keyVal, cacheVal, val); + new CacheInvokeEntry(txEntry.context(), txEntry.key(), key0, cacheVal, val0); EntryProcessor<Object, Object, ?> entryProcessor = t.get1(); res = entryProcessor.process(invokeEntry, t.get2()); - val = invokeEntry.value(); + val0 = invokeEntry.value(); - keyVal = invokeEntry.key(); + key0 = invokeEntry.key(); } if (res != null) { - if (keyVal == null) - keyVal = txEntry.key().value(ctx.cacheObjectContext(), true); + if (key0 == null) + key0 = txEntry.key().value(ctx.cacheObjectContext(), true); - ret.addEntryProcessResult(keyVal, new CacheInvokeResult<>(res)); + ret.addEntryProcessResult(key0, new CacheInvokeResult<>(res)); } } catch (Exception e) { - if (keyVal == null) - keyVal = txEntry.key().value(ctx.cacheObjectContext(), true); + if (key0 == null) + key0 = txEntry.key().value(ctx.cacheObjectContext(), true); - ret.addEntryProcessResult(keyVal, new CacheInvokeResult(e)); + ret.addEntryProcessResult(key0, new CacheInvokeResult(e)); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2ff54f23/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java index 154e685..53b2c3a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java @@ -378,8 +378,8 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay // TODO IGNITE-51. Collection<? extends IgniteDataLoaderEntry> entries0 = F.viewReadOnly(entries, new C1<Entry<K, V>, IgniteDataLoaderEntry>() { @Override public IgniteDataLoaderEntry apply(Entry<K, V> e) { - KeyCacheObject key = cacheObjProc.toCacheKeyObject(cacheObjCtx, e.getKey(), null); - CacheObject val = cacheObjProc.toCacheObject(cacheObjCtx, e.getValue(), null); + KeyCacheObject key = cacheObjProc.toCacheKeyObject(cacheObjCtx, e.getKey()); + CacheObject val = cacheObjProc.toCacheObject(cacheObjCtx, e.getValue()); return new IgniteDataLoaderEntry(key, val); } @@ -486,8 +486,8 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay @Override public IgniteFuture<?> addData(K key, V val) { A.notNull(key, "key"); - KeyCacheObject key0 = cacheObjProc.toCacheKeyObject(cacheObjCtx, key, null); - CacheObject val0 = cacheObjProc.toCacheObject(cacheObjCtx, val, null); + KeyCacheObject key0 = cacheObjProc.toCacheKeyObject(cacheObjCtx, key); + CacheObject val0 = cacheObjProc.toCacheObject(cacheObjCtx, val); return addDataInternal(Collections.singleton(new IgniteDataLoaderEntry(key0, val0))); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2ff54f23/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java index 6befea6..b2475c6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java @@ -118,22 +118,17 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { assert atomicsCache != null; - dsView = atomicsCache.projection(GridCacheInternal.class, GridCacheInternal.class).flagsOn(CLONE); + dsView = atomicsCache.flagsOn(CLONE); - cntDownLatchView = atomicsCache.projection - (GridCacheInternalKey.class, GridCacheCountDownLatchValue.class).flagsOn(CLONE); + cntDownLatchView = atomicsCache.flagsOn(CLONE); - atomicLongView = atomicsCache.projection - (GridCacheInternalKey.class, GridCacheAtomicLongValue.class).flagsOn(CLONE); + atomicLongView = atomicsCache.flagsOn(CLONE); - atomicRefView = atomicsCache.projection - (GridCacheInternalKey.class, GridCacheAtomicReferenceValue.class).flagsOn(CLONE); + atomicRefView = atomicsCache.flagsOn(CLONE); - atomicStampedView = atomicsCache.projection - (GridCacheInternalKey.class, GridCacheAtomicStampedValue.class).flagsOn(CLONE); + atomicStampedView = atomicsCache.flagsOn(CLONE); - seqView = atomicsCache.projection - (GridCacheInternalKey.class, GridCacheAtomicSequenceValue.class).flagsOn(CLONE); + seqView = atomicsCache.flagsOn(CLONE); dsCacheCtx = ctx.cache().internalCache(CU.ATOMICS_CACHE_NAME).context(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2ff54f23/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/GridPortableProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/GridPortableProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/GridPortableProcessor.java index 7636891..f2d6287 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/GridPortableProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/GridPortableProcessor.java @@ -160,10 +160,17 @@ public interface GridPortableProcessor extends GridProcessor { /** * @param ctx Cache context. * @param obj Object. + * @return Cache object. + */ + @Nullable public CacheObject toCacheObject(CacheObjectContext ctx, @Nullable Object obj); + + /** + * @param ctx Cache context. + * @param type Object type. * @param bytes Object bytes. * @return Cache object. */ - @Nullable public CacheObject toCacheObject(CacheObjectContext ctx, @Nullable Object obj, byte[] bytes); + public CacheObject toCacheObject(CacheObjectContext ctx, byte type, byte[] bytes); /** * @param ctx Context. @@ -180,7 +187,7 @@ public interface GridPortableProcessor extends GridProcessor { * @param bytes Optional key bytes. * @return Cache key object. */ - public KeyCacheObject toCacheKeyObject(CacheObjectContext ctx, Object obj, byte[] bytes); + public KeyCacheObject toCacheKeyObject(CacheObjectContext ctx, Object obj); /** * @param obj Value. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2ff54f23/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/IgniteCacheObjectProcessorAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/IgniteCacheObjectProcessorAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/IgniteCacheObjectProcessorAdapter.java index 9500571..6e65c2f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/IgniteCacheObjectProcessorAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/IgniteCacheObjectProcessorAdapter.java @@ -93,11 +93,11 @@ public abstract class IgniteCacheObjectProcessorAdapter extends GridProcessorAda } /** {@inheritDoc} */ - @Nullable public KeyCacheObject toCacheKeyObject(CacheObjectContext ctx, Object obj, byte[] bytes) { + @Nullable public KeyCacheObject toCacheKeyObject(CacheObjectContext ctx, Object obj) { if (obj instanceof KeyCacheObject) return (KeyCacheObject)obj; - return new UserKeyCacheObjectImpl(obj, bytes); + return new UserKeyCacheObjectImpl(obj, null); } /** {@inheritDoc} */ @@ -110,32 +110,41 @@ public abstract class IgniteCacheObjectProcessorAdapter extends GridProcessorAda ptr += 4; - boolean plainByteArr = UNSAFE.getByte(ptr++) == 1; + byte type = UNSAFE.getByte(ptr++); byte[] bytes = U.copyMemory(ptr, size); - if (plainByteArr) - return new CacheObjectImpl(bytes, null); - - if (ctx.offheapTiered()) { + if (ctx.kernalContext().config().isPeerClassLoadingEnabled() && + ctx.offheapTiered() && + type != CacheObjectAdapter.TYPE_BYTE_ARR) { IgniteUuid valClsLdrId = U.readGridUuid(ptr + size); ClassLoader ldr = valClsLdrId != null ? ctx.deploy().getClassLoader(valClsLdrId) : ctx.deploy().localLoader(); - return new CacheObjectImpl(ctx.marshaller().unmarshal(bytes, ldr), bytes); + return toCacheObject(ctx.cacheObjectContext(), unmarshal(ctx.cacheObjectContext(), bytes, ldr)); } else - return new CacheObjectImpl(ctx.marshaller().unmarshal(bytes, U.gridClassLoader()), bytes); + return toCacheObject(ctx.cacheObjectContext(), type, bytes); } /** {@inheritDoc} */ - @Nullable @Override public CacheObject toCacheObject(CacheObjectContext ctx, @Nullable Object obj, byte[] bytes) { - if ((obj == null && bytes == null) || obj instanceof CacheObject) - return (CacheObject)obj; + @Override public CacheObject toCacheObject(CacheObjectContext ctx, byte type, byte[] bytes) { + switch (type) { + case CacheObjectAdapter.TYPE_BYTE_ARR: + return new CacheObjectImpl(bytes, null); + + case CacheObjectAdapter.TYPE_REGULAR: + return new CacheObjectImpl(null, bytes); + } - if (bytes != null) - return new CacheObjectImpl(obj, bytes); + throw new IllegalArgumentException("Invalid object type: " + type); + } + + /** {@inheritDoc} */ + @Nullable @Override public CacheObject toCacheObject(CacheObjectContext ctx, @Nullable Object obj) { + if (obj == null || obj instanceof CacheObject) + return (CacheObject)obj; return new UserCacheObjectImpl(obj); }