Repository: incubator-ignite Updated Branches: refs/heads/ignite-51 bf2b5ac28 -> 4c7f4566f
# ignite-51 Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/4c7f4566 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/4c7f4566 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/4c7f4566 Branch: refs/heads/ignite-51 Commit: 4c7f4566fe60fedbfb76ee164388b8e56dba50d8 Parents: bf2b5ac Author: sboikov <sboi...@gridgain.com> Authored: Wed Mar 4 11:55:01 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Wed Mar 4 13:32:22 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/CacheObjectAdapter.java | 1 + .../processors/cache/CacheObjectImpl.java | 10 +-- .../processors/cache/GridCacheContext.java | 43 ++-------- .../processors/cache/GridCacheMapEntry.java | 83 ++++---------------- .../processors/cache/KeyCacheObjectImpl.java | 4 +- .../processors/cache/UserCacheObjectImpl.java | 2 +- .../cache/UserKeyCacheObjectImpl.java | 13 +-- .../local/atomic/GridLocalAtomicCache.java | 11 --- .../cache/query/GridCacheQueryManager.java | 4 +- .../transactions/IgniteTxLocalAdapter.java | 55 ------------- .../dataload/IgniteDataLoaderImpl.java | 4 +- .../portable/GridPortableProcessor.java | 55 ++++++++----- .../IgniteCacheObjectProcessorAdapter.java | 81 +++++++++++++++++++ .../portable/os/GridOsPortableProcessor.java | 44 ----------- 14 files changed, 160 insertions(+), 250 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c7f4566/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java index 89a2ac1..b8ca690 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java @@ -57,6 +57,7 @@ public abstract class CacheObjectAdapter implements CacheObject, Externalizable assert valBytes != null; out.writeBoolean(byteArray()); + U.writeByteArray(out, valBytes); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c7f4566/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectImpl.java index aef1978..51cb487 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectImpl.java @@ -59,7 +59,7 @@ public class CacheObjectImpl extends CacheObjectAdapter { if (byteArray()) return (T)Arrays.copyOf(bytes, bytes.length); else - return ctx.marshaller().unmarshal(valBytes, U.gridClassLoader()); + return (T)ctx.portable().unmarshal(ctx.cacheObjectContext(), valBytes, U.gridClassLoader()); } if (val != null) @@ -67,7 +67,7 @@ public class CacheObjectImpl extends CacheObjectAdapter { assert valBytes != null; - val = ctx.marshaller().unmarshal(valBytes, U.gridClassLoader()); + val = ctx.portable().unmarshal(ctx.cacheObjectContext(), valBytes, U.gridClassLoader()); } catch (IgniteCheckedException e) { throw new IgniteException("Failed to unmarshal object.", e); @@ -87,7 +87,7 @@ public class CacheObjectImpl extends CacheObjectAdapter { return (byte[])val; if (valBytes == null) - valBytes = CU.marshal(ctx.shared(), val); + valBytes = ctx.portable().marshal(ctx.cacheObjectContext(), val); return valBytes; } @@ -95,7 +95,7 @@ public class CacheObjectImpl extends CacheObjectAdapter { /** {@inheritDoc} */ @Override public void prepareMarshal(CacheObjectContext ctx) throws IgniteCheckedException { if (valBytes == null && !byteArray()) - valBytes = CU.marshal(ctx.kernalContext().cache().context(), val); + valBytes = ctx.kernalContext().portable().marshal(ctx, val); } /** {@inheritDoc} */ @@ -103,7 +103,7 @@ public class CacheObjectImpl extends CacheObjectAdapter { assert val != null || valBytes != null; if (val == null && ctx.isUnmarshalValues()) - val = ctx.marshaller().unmarshal(valBytes, ldr); + val = ctx.portable().unmarshal(ctx.cacheObjectContext(), valBytes, ldr); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c7f4566/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 577a978..16a403e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -1780,6 +1780,7 @@ public class GridCacheContext<K, V> implements Externalizable { /** * @param obj Object. + * @param bytes Optional value bytes. * @return Cache object. */ @Nullable public CacheObject toCacheObject(@Nullable Object obj, byte[] bytes) { @@ -1791,7 +1792,7 @@ public class GridCacheContext<K, V> implements Externalizable { * @return Cache object. */ public KeyCacheObject toCacheKeyObject(Object obj) { - return portable().toCacheKeyObject(cacheObjCtx, obj); + return portable().toCacheKeyObject(cacheObjCtx, obj, null); } /** @@ -1803,17 +1804,16 @@ public class GridCacheContext<K, V> implements Externalizable { */ public KeyCacheObject toCacheKeyObject(Object obj, byte[] bytes, boolean transferOnly) throws IgniteCheckedException { - // TODO IGNITE-51 move to processor. assert obj != null || bytes != null; if (obj == null) { if (transferOnly) return new KeyCacheObjectTransferImpl(bytes); - obj = marshaller().unmarshal(bytes, deploy().globalLoader()); + obj = ctx.portable().unmarshal(cacheObjCtx, bytes, deploy().globalLoader()); } - return new KeyCacheObjectImpl(obj, bytes); + return ctx.portable().toCacheKeyObject(cacheObjCtx, obj, bytes); } /** @@ -1826,7 +1826,6 @@ public class GridCacheContext<K, V> implements Externalizable { @Nullable public CacheObject unswapCacheObject(byte[] bytes, boolean valIsByteArr, @Nullable IgniteUuid clsLdrId) throws IgniteCheckedException { - // TODO IGNITE-51 move to processor. if (valIsByteArr) return new CacheObjectImpl(bytes, null); @@ -1835,12 +1834,9 @@ public class GridCacheContext<K, V> implements Externalizable { if (ldr == null) return null; - return new CacheObjectImpl(marshaller().unmarshal(bytes, ldr), bytes); + return new CacheObjectImpl(portable().unmarshal(cacheObjCtx, bytes, ldr), bytes); } - /** */ - private static final sun.misc.Unsafe UNSAFE = GridUnsafe.unsafe(); - /** * @param valPtr Value pointer. * @param tmp If {@code true} can return temporary instance which is valid while entry lock is held. @@ -1849,33 +1845,9 @@ public class GridCacheContext<K, V> implements Externalizable { */ public CacheObject fromOffheap(long valPtr, boolean tmp) throws IgniteCheckedException { assert config().getMemoryMode() == OFFHEAP_TIERED || config().getMemoryMode() == OFFHEAP_VALUES; + assert valPtr != 0; - // TODO IGNITE-51. - if (portableEnabled()) - return (CacheObject)portable().unmarshal(valPtr, !tmp); - - long ptr = valPtr; - - int size = UNSAFE.getInt(ptr); - - ptr += 4; - - boolean plainByteArr = UNSAFE.getByte(ptr++) == 1; - - byte[] bytes = U.copyMemory(ptr, size); - - if (plainByteArr) - return new CacheObjectImpl(bytes, null); - - if (offheapTiered()) { - IgniteUuid valClsLdrId = U.readGridUuid(ptr + size); - - ClassLoader ldr = valClsLdrId != null ? deploy().getClassLoader(valClsLdrId) : deploy().localLoader(); - - return new CacheObjectImpl(marshaller().unmarshal(bytes, ldr), bytes); - } - else - return new CacheObjectImpl(marshaller().unmarshal(bytes, U.gridClassLoader()), bytes); + return ctx.portable().toCacheObject(this, valPtr, tmp); } /** @@ -1885,6 +1857,7 @@ public class GridCacheContext<K, V> implements Externalizable { * @param skipVals Skip values flag. * @param keepCacheObjects Keep cache objects flag. * @param deserializePortable Deserialize portable flag. + * @param cpy Copy flag. */ @SuppressWarnings("unchecked") public <K1, V1> void addResult(Map<K1, V1> map, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c7f4566/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 2adc34a..52f9659 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -149,7 +149,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { GridCacheMapEntry next, long ttl, int hdrId) { log = U.logger(cctx.kernalContext(), logRef, GridCacheMapEntry.class); - key = (KeyCacheObject)cctx.kernalContext().portable().detachPortable(key, cctx); + key = (KeyCacheObject)cctx.kernalContext().portable().prepareForCache(key, cctx); assert key != null; @@ -159,7 +159,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { ttlAndExpireTimeExtras(ttl, CU.toExpireTime(ttl)); - val = (CacheObject)cctx.kernalContext().portable().detachPortable(val, cctx); + val = (CacheObject)cctx.kernalContext().portable().prepareForCache(val, cctx); synchronized (this) { value(val); @@ -499,7 +499,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { if (delta >= 0) { CacheObject val = e.value(); - val = (CacheObject)cctx.kernalContext().portable().detachPortable(val, cctx); + val = (CacheObject)cctx.kernalContext().portable().prepareForCache(val, cctx); // Set unswapped value. update(val, e.expireTime(), e.ttl(), e.version()); @@ -846,7 +846,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { if (startVer.equals(ver)) { if (ret != null) { // Detach value before index update. - ret = (CacheObject)cctx.kernalContext().portable().detachPortable(ret, cctx); + ret = (CacheObject)cctx.kernalContext().portable().prepareForCache(ret, cctx); GridCacheVersion nextVer = nextVersion(); @@ -929,7 +929,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { long expTime = CU.toExpireTime(ttl); // Detach value before index update. - ret = (CacheObject)cctx.kernalContext().portable().detachPortable(ret, cctx); + ret = (CacheObject)cctx.kernalContext().portable().prepareForCache(ret, cctx); // Update indexes. if (ret != null) { @@ -1066,7 +1066,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { assert expireTime >= 0 : expireTime; // Detach value before index update. - val = (CacheObject)cctx.kernalContext().portable().detachPortable(val, cctx); + val = (CacheObject)cctx.kernalContext().portable().prepareForCache(val, cctx); // Update index inside synchronization since it can be updated // in load methods without actually holding entry lock. @@ -1378,7 +1378,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { } // Detach value before index update. - old = (CacheObject)cctx.kernalContext().portable().detachPortable(old, cctx); + old = (CacheObject)cctx.kernalContext().portable().prepareForCache(old, cctx); if (old != null) updateIndex(old, expireTime, ver, null); @@ -1514,7 +1514,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { // Try write-through. if (op == GridCacheOperation.UPDATE) { // Detach value before index update. - updated = (CacheObject)cctx.kernalContext().portable().detachPortable(updated, cctx); + updated = (CacheObject)cctx.kernalContext().portable().prepareForCache(updated, cctx); if (writeThrough) // Must persist inside synchronization in non-tx mode. @@ -1803,7 +1803,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { readThrough = true; // Detach value before index update. - oldVal = (CacheObject)cctx.kernalContext().portable().detachPortable(oldVal, cctx); + oldVal = (CacheObject)cctx.kernalContext().portable().prepareForCache(oldVal, cctx); // Calculate initial TTL and expire time. long initTtl; @@ -2036,7 +2036,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { // Do not change size. } - updated = (CacheObject)cctx.kernalContext().portable().detachPortable(updated, cctx); + updated = (CacheObject)cctx.kernalContext().portable().prepareForCache(updated, cctx); // Update index inside synchronization since it can be updated // in load methods without actually holding entry lock. @@ -2957,7 +2957,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { // in load methods without actually holding entry lock. long expireTime = expireTimeExtras(); - val = (CacheObject)cctx.kernalContext().portable().detachPortable(val, cctx); + val = (CacheObject)cctx.kernalContext().portable().prepareForCache(val, cctx); updateIndex(val, expireTime, nextVer, old); @@ -3237,7 +3237,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { if (isNew() || (!preload && deletedUnlocked())) { long expTime = expireTime < 0 ? CU.toExpireTime(ttl) : expireTime; - val = (CacheObject)cctx.kernalContext().portable().detachPortable(val, cctx); + val = (CacheObject)cctx.kernalContext().portable().prepareForCache(val, cctx); if (val != null) updateIndex(val, expTime, ver, null); @@ -3290,7 +3290,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { CacheObject val = unswapped.value(); if (cctx.portableEnabled()) { - val = (CacheObject)cctx.kernalContext().portable().detachPortable(val, cctx); + val = (CacheObject)cctx.kernalContext().portable().prepareForCache(val, cctx); if (cctx.offheapTiered() && !unswapped.valueIsByteArray()) unswapped.valueBytes(cctx.convertPortableBytes(unswapped.valueBytes())); @@ -3337,7 +3337,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { long expTime = CU.toExpireTime(ttl); // Detach value before index update. - val = (CacheObject)cctx.kernalContext().portable().detachPortable(val, cctx); + val = (CacheObject)cctx.kernalContext().portable().prepareForCache(val, cctx); if (val != null) { updateIndex(val, expTime, newVer, old); @@ -4303,61 +4303,6 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { return extras != null ? extras.size() : 0; } - /** - * @return Value bytes read from offheap. - * @throws IgniteCheckedException If failed. - */ - private GridCacheValueBytes offheapValueBytes() throws IgniteCheckedException { - assert cctx.offheapTiered() && valPtr != 0; - - long ptr = valPtr; - - boolean plainByteArr = UNSAFE.getByte(ptr++) != 0; - - if (plainByteArr || !cctx.portableEnabled()) { - int size = UNSAFE.getInt(ptr); - - byte[] bytes = U.copyMemory(ptr + 4, size); - - return plainByteArr ? GridCacheValueBytes.plain(bytes) : GridCacheValueBytes.marshaled(bytes); - } - - assert cctx.portableEnabled(); - - return GridCacheValueBytes.marshaled(CU.marshal(cctx.shared(), cctx.portable().unmarshal(valPtr, true))); - } - - /** - * @param tmp If {@code true} can return temporary object. - * @return Unmarshalled value. - * @throws IgniteCheckedException If unmarshalling failed. - */ - private CacheObject unmarshalOffheap(boolean tmp) throws IgniteCheckedException { - assert cctx.offheapTiered() && valPtr != 0; - - if (cctx.portableEnabled()) - return (CacheObject)cctx.portable().unmarshal(valPtr, !tmp); - - long ptr = valPtr; - - boolean plainByteArr = UNSAFE.getByte(ptr++) != 0; - - int size = UNSAFE.getInt(ptr); - - byte[] res = U.copyMemory(ptr + 4, size); - -// TODO IGNITE-51. -// if (plainByteArr) -// return (V)res; - - IgniteUuid valClsLdrId = U.readGridUuid(ptr + 4 + size); - - ClassLoader ldr = valClsLdrId != null ? cctx.deploy().getClassLoader(valClsLdrId) : - cctx.deploy().localLoader(); - - return cctx.marshaller().unmarshal(res, ldr); - } - /** {@inheritDoc} */ @Override public boolean equals(Object o) { // Identity comparison left on purpose. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c7f4566/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java index 9dff5d8..20852be 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java @@ -85,7 +85,9 @@ public class KeyCacheObjectImpl extends CacheObjectAdapter implements KeyCacheOb if (cpy) { try { - return (T)ctx.marshaller().unmarshal(valBytes, ctx.deploy().globalLoader()); + return (T)ctx.portable().unmarshal(ctx.cacheObjectContext(), + valBytes, + ctx.deploy().globalLoader()); } catch (IgniteCheckedException e) { throw new IgniteException("Failed to unmarshal object.", e); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c7f4566/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/UserCacheObjectImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/UserCacheObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/UserCacheObjectImpl.java index 2453b33..41d47db 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/UserCacheObjectImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/UserCacheObjectImpl.java @@ -51,7 +51,7 @@ public class UserCacheObjectImpl extends CacheObjectImpl { else { try { if (valBytes == null) - valBytes = CU.marshal(ctx.shared(), val); + valBytes = ctx.portable().marshal(ctx.cacheObjectContext(), val); return new CacheObjectImpl(null, valBytes); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c7f4566/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/UserKeyCacheObjectImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/UserKeyCacheObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/UserKeyCacheObjectImpl.java index 0b7bc2e..eb5d748 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/UserKeyCacheObjectImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/UserKeyCacheObjectImpl.java @@ -26,9 +26,10 @@ import org.apache.ignite.internal.util.typedef.internal.*; public class UserKeyCacheObjectImpl extends KeyCacheObjectImpl { /** * @param val Key value. + * @param bytes Bytes. */ - public UserKeyCacheObjectImpl(Object val) { - super(val, null); + public UserKeyCacheObjectImpl(Object val, byte[] bytes) { + super(val, bytes); } /** @@ -41,7 +42,7 @@ public class UserKeyCacheObjectImpl extends KeyCacheObjectImpl { /** {@inheritDoc} */ @Override public byte[] valueBytes(GridCacheContext ctx) throws IgniteCheckedException { if (valBytes == null) - valBytes = CU.marshal(ctx.shared(), val); + valBytes = ctx.portable().marshal(ctx.cacheObjectContext(), val); return valBytes; } @@ -50,10 +51,12 @@ public class UserKeyCacheObjectImpl extends KeyCacheObjectImpl { @Override public CacheObject prepareForCache(GridCacheContext ctx) { try { if (valBytes == null) - valBytes = ctx.marshaller().marshal(val); + valBytes = ctx.portable().marshal(ctx.cacheObjectContext(), val); if (needCopy(ctx)) { - Object val = ctx.marshaller().unmarshal(valBytes, ctx.deploy().globalLoader()); + Object val = ctx.portable().unmarshal(ctx.cacheObjectContext(), + valBytes, + ctx.deploy().globalLoader()); return new KeyCacheObjectImpl(val, valBytes); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c7f4566/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java index a4a586f..b47891f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java @@ -490,9 +490,6 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { String taskName = ctx.kernalContext().job().currentTaskName(); - if (ctx.portableEnabled()) - key = (K)ctx.marshalToPortable(key); - Map<K, V> m = getAllInternal(Collections.singleton(key), ctx.isSwapOrOffheapEnabled(), ctx.readThrough(), @@ -519,14 +516,6 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { String taskName = ctx.kernalContext().job().currentTaskName(); - if (ctx.portableEnabled() && !F.isEmpty(keys)) { - keys = F.viewReadOnly(keys, new C1<K, K>() { - @Override public K apply(K k) { - return (K)ctx.marshalToPortable(k); - } - }); - } - return getAllInternal(keys, ctx.isSwapOrOffheapEnabled(), ctx.readThrough(), http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c7f4566/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index fa74020..260cff1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -2463,7 +2463,9 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte @Override protected V unmarshalValue() throws IgniteCheckedException { long ptr = GridCacheOffheapSwapEntry.valueAddress(valPtr.get1(), valPtr.get2()); - V val = (V)cctx.portable().unmarshal(ptr, false); + CacheObject obj = cctx.fromOffheap(ptr, false); + + V val = CU.value(obj, cctx, false); assert val != null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c7f4566/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index a0898f6..5651d0f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -2535,43 +2535,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter invokeMap0 = null; } -// TODO IGNITE-51. -// else if (cacheCtx.portableEnabled()) { -// if (map != null) { -// map0 = U.newHashMap(map.size()); -// -// try { -// for (Map.Entry<? extends K, ? extends V> e : map.entrySet()) { -// K key = (K)cacheCtx.marshalToPortable(e.getKey()); -// V val = (V)cacheCtx.marshalToPortable(e.getValue()); -// -// map0.put(key, val); -// } -// } -// catch (IgniteException e) { -// return new GridFinishedFuture<>(cctx.kernalContext(), e); -// } -// } -// else -// map0 = null; -// -// if (invokeMap != null) { -// invokeMap0 = U.newHashMap(invokeMap.size()); -// -// try { -// for (Map.Entry<? extends K, ? extends EntryProcessor<K, V, Object>> e : invokeMap.entrySet()) { -// K key = (K)cacheCtx.marshalToPortable(e.getKey()); -// -// invokeMap0.put(key, e.getValue()); -// } -// } -// catch (IgniteException e) { -// return new GridFinishedFuture<>(cctx.kernalContext(), e); -// } -// } -// else -// invokeMap0 = null; -// } else { map0 = (Map<K, V>)map; invokeMap0 = (Map<K, EntryProcessor<K, V, Object>>)invokeMap; @@ -2786,24 +2749,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter keys0 = drMap.keySet(); } -// TODO IGNITE-51. -// else if (cacheCtx.portableEnabled()) { -// try { -// if (keys != null) { -// Collection<K> pKeys = new ArrayList<>(keys.size()); -// -// for (K key : keys) -// pKeys.add((K)cacheCtx.marshalToPortable(key)); -// -// keys0 = pKeys; -// } -// else -// keys0 = null; -// } -// catch (IgniteException e) { -// return new GridFinishedFuture<>(cctx.kernalContext(), e); -// } -// } else keys0 = keys; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c7f4566/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java index 4b4621d..953bcb3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataLoaderImpl.java @@ -371,7 +371,7 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay // TODO IGNITE-51. Collection<? extends IgniteDataLoaderEntry> entries0 = F.viewReadOnly(entries, new C1<Entry<K, V>, IgniteDataLoaderEntry>() { @Override public IgniteDataLoaderEntry apply(Entry<K, V> e) { - KeyCacheObject key = cacheObjProc.toCacheKeyObject(cacheObjCtx, e.getKey()); + KeyCacheObject key = cacheObjProc.toCacheKeyObject(cacheObjCtx, e.getKey(), null); CacheObject val = cacheObjProc.toCacheObject(cacheObjCtx, e.getValue(), null); return new IgniteDataLoaderEntry(key, val); @@ -465,7 +465,7 @@ public class IgniteDataLoaderImpl<K, V> implements IgniteDataLoader<K, V>, Delay @Override public IgniteFuture<?> addData(K key, V val) { A.notNull(key, "key"); - KeyCacheObject key0 = cacheObjProc.toCacheKeyObject(cacheObjCtx, key); + KeyCacheObject key0 = cacheObjProc.toCacheKeyObject(cacheObjCtx, key, null); CacheObject val0 = cacheObjProc.toCacheObject(cacheObjCtx, val, null); return addDataInternal(Collections.singleton(new IgniteDataLoaderEntry(key0, val0))); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c7f4566/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/GridPortableProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/GridPortableProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/GridPortableProcessor.java index 35f6760..f564c32 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/GridPortableProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/GridPortableProcessor.java @@ -49,14 +49,6 @@ public interface GridPortableProcessor extends GridProcessor { public int typeId(Object obj); /** - * @param obj Object to marshal. - * @param trim If {@code true} trims result byte buffer. - * @return Object bytes. - * @throws IgniteException In case of error. - */ - public ByteBuffer marshal(@Nullable Object obj, boolean trim) throws IgniteException; - - /** * @param arr Byte array. * @param off Offset. * @return Unmarshalled object. @@ -65,14 +57,6 @@ public interface GridPortableProcessor extends GridProcessor { public Object unmarshal(byte[] arr, int off) throws IgniteException; /** - * @param ptr Offheap pointer. - * @param forceHeap If {@code true} creates heap-based object. - * @return Unmarshalled object. - * @throws IgniteException In case of error. - */ - public Object unmarshal(long ptr, boolean forceHeap) throws IgniteException; - - /** * Converts temporary offheap object to heap-based. * * @param obj Object. @@ -89,13 +73,13 @@ public interface GridPortableProcessor extends GridProcessor { public Object marshalToPortable(@Nullable Object obj) throws IgniteException; /** - * TODO IGNITE-51: rename. + * Prepares cache object for cache (e.g. copies user-provided object if needed). * - * @param obj Object (portable or not). + * @param obj Cache object. * @param cctx Cache context. - * @return Detached portable object or original object. + * @return Object to be store in cache. */ - public Object detachPortable(@Nullable Object obj, GridCacheContext cctx); + @Nullable public CacheObject prepareForCache(@Nullable CacheObject obj, GridCacheContext cctx); /** * @return Portable marshaller for client connectivity or {@code null} if it's not @@ -149,22 +133,51 @@ public interface GridPortableProcessor extends GridProcessor { public boolean hasField(Object obj, String fieldName); /** + * @param ctx Cache object context. + * @param val Value. + * @return Value bytes. + * @throws IgniteCheckedException If failed. + */ + public byte[] marshal(CacheObjectContext ctx, Object val) throws IgniteCheckedException; + + /** + * @param bytes Bytes. + * @param clsLdr Class loader. + * @return Unmarshalled object. + * @throws IgniteCheckedException If failed. + */ + public Object unmarshal(CacheObjectContext ctx, byte[] bytes, ClassLoader clsLdr) throws IgniteCheckedException; + + /** + * @param node Node. * @param cacheName Cache name. * @return Cache object context. */ public CacheObjectContext contextForCache(ClusterNode node, @Nullable String cacheName); /** + * @param ctx Cache context. * @param obj Object. + * @param bytes Object bytes. * @return Cache object. */ @Nullable public CacheObject toCacheObject(CacheObjectContext ctx, @Nullable Object obj, byte[] bytes); /** + * @param ctx Context. + * @param valPtr Value pointer. + * @param tmp If {@code true} can return temporary instance which is valid while entry lock is held. + * @return Cache object. + * @throws IgniteCheckedException If failed. + */ + public CacheObject toCacheObject(GridCacheContext ctx, long valPtr, boolean tmp) throws IgniteCheckedException; + + /** + * @param ctx Cache context. * @param obj Key value. * @return Cache key object. */ - public KeyCacheObject toCacheKeyObject(CacheObjectContext ctx, Object obj); + public KeyCacheObject toCacheKeyObject(CacheObjectContext ctx, Object obj, byte[] bytes); /** * @param obj Value. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c7f4566/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/IgniteCacheObjectProcessorAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/IgniteCacheObjectProcessorAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/IgniteCacheObjectProcessorAdapter.java index 55b56b6..bca8127 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/IgniteCacheObjectProcessorAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/IgniteCacheObjectProcessorAdapter.java @@ -17,11 +17,16 @@ package org.apache.ignite.internal.processors.portable; +import org.apache.ignite.*; import org.apache.ignite.cache.*; +import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.*; +import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; +import org.jetbrains.annotations.*; import java.math.*; import java.util.*; @@ -30,6 +35,9 @@ import java.util.*; * */ public abstract class IgniteCacheObjectProcessorAdapter extends GridProcessorAdapter implements GridPortableProcessor { + /** */ + private static final sun.misc.Unsafe UNSAFE = GridUnsafe.unsafe(); + /** Immutable classes. */ private static final Collection<Class<?>> IMMUTABLE_CLS = new HashSet<>(); @@ -63,6 +71,79 @@ public abstract class IgniteCacheObjectProcessorAdapter extends GridProcessorAda } /** {@inheritDoc} */ + @Nullable @Override public CacheObject prepareForCache(@Nullable CacheObject obj, GridCacheContext cctx) { + if (obj == null) + return null; + + return obj.prepareForCache(cctx); + } + + /** {@inheritDoc} */ + @Override public byte[] marshal(CacheObjectContext ctx, Object val) throws IgniteCheckedException { + return CU.marshal(ctx.kernalContext().cache().context(), val); + } + + /** {@inheritDoc} */ + @Override public Object unmarshal(CacheObjectContext ctx, byte[] bytes, ClassLoader clsLdr) + throws IgniteCheckedException + { + return ctx.kernalContext().cache().context().marshaller().unmarshal(bytes, clsLdr); + } + + /** {@inheritDoc} */ + @Nullable public KeyCacheObject toCacheKeyObject(CacheObjectContext ctx, Object obj, byte[] bytes) { + if (obj instanceof KeyCacheObject) + return (KeyCacheObject)obj; + + return new UserKeyCacheObjectImpl(obj, bytes); + } + + /** {@inheritDoc} */ + @Override public CacheObject toCacheObject(GridCacheContext ctx, long valPtr, boolean tmp) + throws IgniteCheckedException + { + long ptr = valPtr; + + int size = UNSAFE.getInt(ptr); + + ptr += 4; + + boolean plainByteArr = UNSAFE.getByte(ptr++) == 1; + + byte[] bytes = U.copyMemory(ptr, size); + + if (plainByteArr) + return new CacheObjectImpl(bytes, null); + + if (ctx.offheapTiered()) { + IgniteUuid valClsLdrId = U.readGridUuid(ptr + size); + + ClassLoader ldr = + valClsLdrId != null ? ctx.deploy().getClassLoader(valClsLdrId) : ctx.deploy().localLoader(); + + return new CacheObjectImpl(ctx.marshaller().unmarshal(bytes, ldr), bytes); + } + else + return new CacheObjectImpl(ctx.marshaller().unmarshal(bytes, U.gridClassLoader()), bytes); + } + + /** {@inheritDoc} */ + @Nullable @Override public CacheObject toCacheObject(CacheObjectContext ctx, @Nullable Object obj, byte[] bytes) { + if ((obj == null && bytes == null) || obj instanceof CacheObject) + return (CacheObject)obj; + + if (bytes != null) + return new CacheObjectImpl(obj, bytes); + + return new UserCacheObjectImpl(obj); + } + + /** {@inheritDoc} */ + @Override public CacheObjectContext contextForCache(ClusterNode node, @Nullable String cacheName) { + return new CacheObjectContext(ctx); + } + + /** {@inheritDoc} */ @Override public boolean immutable(Object obj) { assert obj != null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4c7f4566/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/os/GridOsPortableProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/os/GridOsPortableProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/os/GridOsPortableProcessor.java index 77ba7ef..e982ad4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/os/GridOsPortableProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/os/GridOsPortableProcessor.java @@ -50,21 +50,11 @@ public class GridOsPortableProcessor extends IgniteCacheObjectProcessorAdapter { } /** {@inheritDoc} */ - @Override public ByteBuffer marshal(@Nullable Object obj, boolean trim) throws IgniteException { - return null; - } - - /** {@inheritDoc} */ @Nullable @Override public Object unmarshal(byte[] arr, int off) throws IgniteException { return null; } /** {@inheritDoc} */ - @Override public Object unmarshal(long ptr, boolean forceHeap) throws IgniteException { - return null; - } - - /** {@inheritDoc} */ @Override public Object unwrapTemporary(Object obj) throws IgniteException { return null; } @@ -75,16 +65,6 @@ public class GridOsPortableProcessor extends IgniteCacheObjectProcessorAdapter { } /** {@inheritDoc} */ - @Override public Object detachPortable(@Nullable Object obj, GridCacheContext cctx) { - if (obj == null) - return obj; - - assert obj instanceof CacheObject : obj; - - return ((CacheObject)obj).prepareForCache(cctx); - } - - /** {@inheritDoc} */ @Nullable @Override public GridClientMarshaller portableMarshaller() { return null; } @@ -123,28 +103,4 @@ public class GridOsPortableProcessor extends IgniteCacheObjectProcessorAdapter { @Override public boolean hasField(Object obj, String fieldName) { return false; } - - /** {@inheritDoc} */ - @Override public CacheObjectContext contextForCache(ClusterNode node, @Nullable String cacheName) { - return new CacheObjectContext(ctx); - } - - /** {@inheritDoc} */ - @Nullable public KeyCacheObject toCacheKeyObject(CacheObjectContext ctx, Object obj) { - if (obj instanceof KeyCacheObject) - return (KeyCacheObject)obj; - - return new UserKeyCacheObjectImpl(obj); - } - - /** {@inheritDoc} */ - @Nullable @Override public CacheObject toCacheObject(CacheObjectContext ctx, @Nullable Object obj, byte[] bytes) { - if ((obj == null && bytes == null) || obj instanceof CacheObject) - return (CacheObject)obj; - - if (bytes != null) - return new CacheObjectImpl(obj, bytes); - - return new UserCacheObjectImpl(obj); - } }