Repository: incubator-ignite Updated Branches: refs/heads/ignite-51 a265949e8 -> 832f114e8
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/832f114e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index eef10d9..d080e32 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -167,7 +167,21 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M public void init() { long topVer = this.topVer > 0 ? this.topVer : cctx.affinity().affinityTopologyVersion(); - map(keys, Collections.<ClusterNode, LinkedHashMap<K, Boolean>>emptyMap(), topVer); + Collection<KeyCacheObject> keys0 = F.viewReadOnly(keys, new C1<K, KeyCacheObject>() { + @Override public KeyCacheObject apply(K key) { + if (key == null) { + NullPointerException err = new NullPointerException("Null key."); + + onDone(err); + + throw err; + } + + return cctx.toCacheKeyObject(key); + } + }); + + map(keys0, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(), topVer); markInitialized(); } @@ -182,13 +196,6 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M // Should not flip trackable flag from true to false since get future can be remapped. } - /** - * @return Keys. - */ - Collection<? extends K> keys() { - return keys; - } - /** {@inheritDoc} */ @Override public IgniteUuid futureId() { return futId; @@ -274,14 +281,18 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M * @param mapped Mappings to check for duplicates. * @param topVer Topology version on which keys should be mapped. */ - private void map(Collection<? extends K> keys, Map<ClusterNode, LinkedHashMap<K, Boolean>> mapped, long topVer) { + private void map(Collection<KeyCacheObject> keys, + Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mapped, + long topVer) + { if (CU.affinityNodes(cctx, topVer).isEmpty()) { - onDone(new ClusterTopologyCheckedException("Failed to map keys for cache (all partition nodes left the grid).")); + onDone(new ClusterTopologyCheckedException("Failed to map keys for cache " + + "(all partition nodes left the grid).")); return; } - Map<ClusterNode, LinkedHashMap<K, Boolean>> mappings = + Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mappings = U.newHashMap(CU.affinityNodes(cctx, topVer).size()); final int keysSize = keys.size(); @@ -291,17 +302,8 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M boolean hasRmtNodes = false; // Assign keys to primary nodes. - for (K key : keys) { - if (key == null) { - NullPointerException err = new NullPointerException("Null key"); - - onDone(err); - - throw err; - } -// TODO IGNITE-51. -// hasRmtNodes |= map(key, mappings, locVals, topVer, mapped); - } + for (KeyCacheObject key : keys) + hasRmtNodes |= map(key, mappings, locVals, topVer, mapped); if (isDone()) return; @@ -316,10 +318,10 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M } // Create mini futures. - for (Map.Entry<ClusterNode, LinkedHashMap<K, Boolean>> entry : mappings.entrySet()) { + for (Map.Entry<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> entry : mappings.entrySet()) { final ClusterNode n = entry.getKey(); - final LinkedHashMap<K, Boolean> mappedKeys = entry.getValue(); + final LinkedHashMap<KeyCacheObject, Boolean> mappedKeys = entry.getValue(); assert !mappedKeys.isEmpty(); @@ -328,24 +330,21 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M final GridDhtFuture<Collection<GridCacheEntryInfo>> fut = cache().getDhtAsync(n.id(), -1, - // TODO IGNITE-51 - // mappedKeys, - null, + mappedKeys, readThrough, reload, topVer, subjId, taskName == null ? 0 : taskName.hashCode(), - deserializePortable, expiryPlc, skipVals); final Collection<Integer> invalidParts = fut.invalidPartitions(); if (!F.isEmpty(invalidParts)) { - Collection<K> remapKeys = new ArrayList<>(keysSize); + Collection<KeyCacheObject> remapKeys = new ArrayList<>(keysSize); - for (K key : keys) { + for (KeyCacheObject key : keys) { if (key != null && invalidParts.contains(cctx.affinity().partition(key))) remapKeys.add(key); } @@ -357,8 +356,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M ", invalidParts=" + invalidParts + ']'; // Remap recursively. - // TODO IGNITE-51 - // map(remapKeys, mappings, updTopVer); + map(remapKeys, mappings, updTopVer); } // Add new future. @@ -385,9 +383,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M futId, fut.futureId(), ver, - // TODO IGNITE-51 - // mappedKeys, - null, + mappedKeys, readThrough, reload, topVer, @@ -421,8 +417,10 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M * @return {@code True} if has remote nodes. */ @SuppressWarnings("ConstantConditions") - private boolean map(K key, Map<ClusterNode, LinkedHashMap<K, Boolean>> mappings, Map<K, V> locVals, - long topVer, Map<ClusterNode, LinkedHashMap<K, Boolean>> mapped) { + private boolean map(KeyCacheObject key, + Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mappings, Map<K, V> locVals, + long topVer, + Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mapped) { GridDhtCacheAdapter<K, V> colocated = cache(); boolean remote = false; @@ -433,14 +431,11 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M while (true) { GridCacheEntryEx entry = null; - // TODO IGNITE-51. - KeyCacheObject cacheKey = cctx.toCacheKeyObject(key); - try { if (!reload && allowLocRead) { try { - entry = colocated.context().isSwapOrOffheapEnabled() ? colocated.entryEx(cacheKey) : - colocated.peekEx(cacheKey); + entry = colocated.context().isSwapOrOffheapEnabled() ? colocated.entryEx(key) : + colocated.peekEx(key); // If our DHT cache do has value, then we peek it. if (entry != null) { @@ -464,18 +459,10 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M // Entry was not in memory or in swap, so we remove it from cache. if (v == null) { if (isNew && entry.markObsoleteIfEmpty(ver)) - colocated.removeIfObsolete(cacheKey); + colocated.removeIfObsolete(key); } else { - K key0 = key; - -// TODO IGNITE-51. -// if (cctx.portableEnabled()) { -// v = (V)cctx.unwrapPortableIfNeeded(v, !deserializePortable); -// key0 = (K)cctx.unwrapPortableIfNeeded(key, !deserializePortable); -// } -// -// locVals.put(key0, v); + cctx.addResult(locVals, key, v, skipVals, false, deserializePortable); return false; } @@ -490,19 +477,19 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M remote = !node.isLocal(); - LinkedHashMap<K, Boolean> keys = mapped.get(node); + LinkedHashMap<KeyCacheObject, Boolean> keys = mapped.get(node); if (keys != null && keys.containsKey(key)) { if (remapCnt.incrementAndGet() > MAX_REMAP_CNT) { - onDone(new ClusterTopologyCheckedException("Failed to remap key to a new node after " + MAX_REMAP_CNT - + " attempts (key got remapped to the same node) [key=" + key + ", node=" + + onDone(new ClusterTopologyCheckedException("Failed to remap key to a new node after " + + MAX_REMAP_CNT + " attempts (key got remapped to the same node) [key=" + key + ", node=" + U.toShortString(node) + ", mappings=" + mapped + ']')); return false; } } - LinkedHashMap<K, Boolean> old = mappings.get(node); + LinkedHashMap<KeyCacheObject, Boolean> old = mappings.get(node); if (old == null) mappings.put(node, old = new LinkedHashMap<>(3, 1f)); @@ -546,30 +533,13 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M private Map<K, V> createResultMap(Collection<GridCacheEntryInfo> infos) { int keysSize = infos.size(); - try { - if (keysSize != 0) { - Map<K, V> map = new GridLeanMap<>(keysSize); + if (keysSize != 0) { + Map<K, V> map = new GridLeanMap<>(keysSize); - for (GridCacheEntryInfo info : infos) { - info.unmarshalValue(cctx, cctx.deploy().globalLoader()); - - K key = info.key().value(cctx); - V val = info.value().value(cctx); - - if (cctx.portableEnabled()) { - key = (K)cctx.unwrapPortableIfNeeded(key, !deserializePortable); - val = (V)cctx.unwrapPortableIfNeeded(val, !deserializePortable); - } + for (GridCacheEntryInfo info : infos) + cctx.addResult(map, info.key(), info.value(), skipVals, false, deserializePortable); - map.put(key, val); - } - - return map; - } - } - catch (IgniteCheckedException e) { - // Fail. - onDone(e); + return map; } return Collections.emptyMap(); @@ -596,7 +566,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M /** Keys. */ @GridToStringInclude - private LinkedHashMap<K, Boolean> keys; + private LinkedHashMap<KeyCacheObject, Boolean> keys; /** Topology version on which this future was mapped. */ private long topVer; @@ -613,7 +583,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M * @param keys Keys. * @param topVer Topology version. */ - MiniFuture(ClusterNode node, LinkedHashMap<K, Boolean> keys, long topVer) { + MiniFuture(ClusterNode node, LinkedHashMap<KeyCacheObject, Boolean> keys, long topVer) { super(cctx.kernalContext()); this.node = node; @@ -638,7 +608,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M /** * @return Keys. */ - public Collection<K> keys() { + public Collection<KeyCacheObject> keys() { return keys.keySet(); } @@ -715,8 +685,8 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M long topVer = fut.get(); // This will append new futures to compound list. - map(F.view(keys.keySet(), new P1<K>() { - @Override public boolean apply(K key) { + map(F.view(keys.keySet(), new P1<KeyCacheObject>() { + @Override public boolean apply(KeyCacheObject key) { return invalidParts.contains(cctx.affinity().partition(key)); } }), F.t(node, keys), topVer); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/832f114e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 6a54177..ec06a70 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -414,18 +414,18 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override public GridCacheReturn<CacheObject> removex(K key, V val) throws IgniteCheckedException { + @Override public GridCacheReturn<V> removex(K key, V val) throws IgniteCheckedException { return removexAsync(key, val).get(); } /** {@inheritDoc} */ - @Override public GridCacheReturn<CacheObject> replacex(K key, V oldVal, V newVal) throws IgniteCheckedException { + @Override public GridCacheReturn<V> replacex(K key, V oldVal, V newVal) throws IgniteCheckedException { return replacexAsync(key, oldVal, newVal).get(); } /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public IgniteInternalFuture<GridCacheReturn<CacheObject>> removexAsync(K key, V val) { + @Override public IgniteInternalFuture<GridCacheReturn<V>> removexAsync(K key, V val) { A.notNull(key, "key", val, "val"); if (ctx.portableEnabled()) @@ -436,7 +436,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public IgniteInternalFuture<GridCacheReturn<CacheObject>> replacexAsync(K key, V oldVal, V newVal) { + @Override public IgniteInternalFuture<GridCacheReturn<V>> replacexAsync(K key, V oldVal, V newVal) { if (ctx.portableEnabled()) oldVal = (V)ctx.marshalToPortable(oldVal); @@ -917,7 +917,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { // Optimisation: try to resolve value locally and escape 'get future' creation. if (!reload && !forcePrimary) { - Map<K, V> locVals = new HashMap<>(keys.size(), 1.0f); + Map<K, V> locVals = U.newHashMap(keys.size()); boolean success = true; @@ -960,16 +960,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { success = false; } - else { - Object val = v.value(ctx); - - if (ctx.portableEnabled() && deserializePortable) { - key = (K)ctx.unwrapPortableIfNeeded(key, false); - val = ctx.unwrapPortableIfNeeded(val, false); - } - - locVals.put(key, (V)val); - } + else + ctx.addResult(locVals, cacheKey, v, skipVals, false, deserializePortable); } else success = false; @@ -1177,7 +1169,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (retVal == null) retVal = new GridCacheReturn<>(null, true); - res.returnValue(retVal); + res.returnValue(req.operation() == TRANSFORM, retVal); } else // Should remap all keys. @@ -1292,8 +1284,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { GridCacheOperation op = req.operation(); - Map<KeyCacheObject, EntryProcessorResult> invokeResMap = - op == TRANSFORM ? U.<KeyCacheObject, EntryProcessorResult>newHashMap(size) : null; + Collection<CacheInvokeDirectResult> invokeResults = op == TRANSFORM ? new ArrayList(size) : null; int firstEntryIdx = 0; @@ -1358,7 +1349,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { oldVal); CacheObject updated; - CacheInvokeResult invokeRes = null; + CacheInvokeDirectResult invokeRes = null; try { Object computed = entryProcessor.process(invokeEntry, req.invokeArguments()); @@ -1368,16 +1359,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { updated = ctx.toCacheObject(updatedVal); if (computed != null) - invokeRes = new CacheInvokeResult<>(ctx.unwrapTemporary(computed)); + invokeRes = new CacheInvokeDirectResult(entry.key(), + ctx.toCacheObject(ctx.unwrapTemporary(computed))); } catch (Exception e) { - invokeRes = new CacheInvokeResult<>(e); + invokeRes = new CacheInvokeDirectResult(entry.key(), e); updated = old; } if (invokeRes != null) - invokeResMap.put(entry.key(), invokeRes); + invokeResults.add(invokeRes); if (updated == null) { if (intercept) { @@ -1570,7 +1562,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { updRes.dhtFuture(dhtFut); - updRes.invokeResult(invokeResMap); + updRes.invokeResult(invokeResults); return updRes; } @@ -1668,7 +1660,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { boolean intercept = ctx.config().getInterceptor() != null; - Map<KeyCacheObject, EntryProcessorResult<?>> computedMap = null; + Collection<CacheInvokeDirectResult> computed = null; // Avoid iterator creation. for (int i = 0; i < keys.size(); i++) { @@ -1814,12 +1806,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (updRes.computedResult() != null) { if (retVal == null) { - computedMap = U.newHashMap(keys.size()); + computed = new ArrayList(keys.size()); - retVal = new GridCacheReturn<>((Object)computedMap, updRes.success(), true); + retVal = new GridCacheReturn<>((Object)computed, updRes.success(), false); } - computedMap.put(k, updRes.computedResult()); + computed.add(updRes.computedResult()); } } else { @@ -2710,7 +2702,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { private boolean readersOnly; /** */ - private Map<KeyCacheObject, EntryProcessorResult> invokeRes; + private Collection<CacheInvokeDirectResult> invokeRes; /** * @param entry Entry. @@ -2745,14 +2737,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** * @param invokeRes Result for invoke operation. */ - private void invokeResult(Map<KeyCacheObject, EntryProcessorResult> invokeRes) { + private void invokeResult(Collection<CacheInvokeDirectResult> invokeRes) { this.invokeRes = invokeRes; } /** * @return Result for invoke operation. */ - Map<KeyCacheObject, EntryProcessorResult> invokeResults() { + Collection<CacheInvokeDirectResult> invokeResults() { return invokeRes; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/832f114e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index d36a850..d34901f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -100,7 +100,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem private volatile CachePartialUpdateCheckedException err; /** Operation result. */ - private volatile GridCacheReturn<Object> opRes; + private volatile GridCacheReturn<?> opRes; /** Return value require flag. */ private final boolean retval; @@ -321,11 +321,18 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem } /** {@inheritDoc} */ + @SuppressWarnings("ConstantConditions") @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) { assert res == null || res instanceof GridCacheReturn; GridCacheReturn ret = (GridCacheReturn)res; + if (op != TRANSFORM) { + CacheObject val = (CacheObject)ret.value(); + + ret.value(CU.value(val, cctx)); + } + Object retval = res == null ? null : rawRetval ? ret : this.retval ? ret.value() : ret.success(); if (op == TRANSFORM && retval == null) @@ -367,9 +374,17 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem if (res.error() != null) onDone(addFailedKeys(res.failedKeys(), res.error())); else { - GridCacheReturn<Object> opRes0 = opRes = res.returnValue(); + if (op == TRANSFORM) { + if (res.returnValue() != null) + addInvokeResults(res.returnValue()); - onDone(opRes0); + onDone(opRes); + } + else { + GridCacheReturn<?> opRes0 = opRes = res.returnValue(); + + onDone(opRes0); + } } } else { @@ -869,18 +884,36 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem /** * @param ret Result from single node. */ - private synchronized void addInvokeResults(GridCacheReturn<Object> ret) { + @SuppressWarnings("unchecked") + private synchronized void addInvokeResults(GridCacheReturn ret) { assert op == TRANSFORM : op; - assert ret.value() == null || ret.value() instanceof Map : ret.value(); + assert ret.value() == null || ret.value() instanceof Collection : ret.value(); if (ret.value() != null) { + Collection<CacheInvokeDirectResult> results = + (Collection<CacheInvokeDirectResult>)ret.value(); + + Map<Object, CacheInvokeResult> map0 = U.newHashMap(results.size()); + + for (CacheInvokeDirectResult res : results) { + CacheInvokeResult<?> res0 = res.error() == null ? + new CacheInvokeResult<>(CU.value(res.result(), cctx)) : new CacheInvokeResult<>(res.error()); + + map0.put(res.key().value(cctx), res0); + } + if (opRes != null) { - Map<Object, Object> map = (Map<Object, Object>)opRes.value(); + Map<Object, CacheInvokeResult> oldMap = (Map<Object, CacheInvokeResult>)opRes.value(); + + assert oldMap != null; - map.putAll((Map<Object, Object>)ret.value()); + oldMap.putAll(map0); } - else + else { + ret.value(map0); + opRes = ret; + } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/832f114e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java index 1d6e896..83ceecb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java @@ -56,12 +56,16 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr /** Serialized error. */ private byte[] errBytes; - /** Return value. */ - @GridDirectTransient - private GridCacheReturn<Object> retVal; + /** */ + private boolean success; - /** Serialized return value. */ - private byte[] retValBytes; + /** */ + @GridToStringInclude + private CacheObject retVal; + + /** */ + @GridDirectCollection(CacheInvokeDirectResult.class) + private Collection<CacheInvokeDirectResult> invokeRes; /** Failed keys. */ @GridToStringInclude @@ -156,15 +160,22 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr /** * @return Return value. */ - public GridCacheReturn<Object> returnValue() { - return retVal; + public GridCacheReturn<?> returnValue() { + return invokeRes != null ? new GridCacheReturn<>(invokeRes, success) : new GridCacheReturn<>(retVal, success); } /** + * @param invoke {@code True} if result for {@code invoke} operation. * @param retVal Return value. */ - public void returnValue(GridCacheReturn<Object> retVal) { - this.retVal = retVal; + @SuppressWarnings("unchecked") + public void returnValue(boolean invoke, GridCacheReturn<Object> retVal) { + success = retVal.success(); + + if (invoke) + invokeRes = (Collection<CacheInvokeDirectResult>)retVal.value(); + else + this.retVal = (CacheObject)retVal.value(); } /** @@ -376,14 +387,19 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr if (err != null) errBytes = ctx.marshaller().marshal(err); - if (retVal != null) - retValBytes = ctx.marshaller().marshal(retVal); - prepareMarshalCacheObjects(failedKeys, ctx); prepareMarshalCacheObjects(remapKeys, ctx); prepareMarshalCacheObjects(nearVals, ctx); + + if (retVal != null) + retVal.prepareMarshal(ctx); + + if (invokeRes != null) { + for (CacheInvokeDirectResult res : invokeRes) + res.prepareMarshal(ctx); + } } /** {@inheritDoc} */ @@ -393,14 +409,19 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr if (errBytes != null) err = ctx.marshaller().unmarshal(errBytes, ldr); - if (retValBytes != null) - retVal = ctx.marshaller().unmarshal(retValBytes, ldr); - finishUnmarshalCacheObjects(failedKeys, ctx, ldr); finishUnmarshalCacheObjects(remapKeys, ctx, ldr); finishUnmarshalCacheObjects(nearVals, ctx, ldr); + + if (retVal != null) + retVal.finishUnmarshal(ctx, ldr); + + if (invokeRes != null) { + for (CacheInvokeDirectResult res : invokeRes) + res.finishUnmarshal(ctx, ldr); + } } /** {@inheritDoc} */ @@ -437,49 +458,61 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr writer.incrementState(); case 6: - if (!writer.writeMessage("nearExpireTimes", nearExpireTimes)) + if (!writer.writeCollection("invokeRes", invokeRes, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 7: - if (!writer.writeCollection("nearSkipIdxs", nearSkipIdxs, MessageCollectionItemType.INT)) + if (!writer.writeMessage("nearExpireTimes", nearExpireTimes)) return false; writer.incrementState(); case 8: - if (!writer.writeMessage("nearTtls", nearTtls)) + if (!writer.writeCollection("nearSkipIdxs", nearSkipIdxs, MessageCollectionItemType.INT)) return false; writer.incrementState(); case 9: - if (!writer.writeCollection("nearVals", nearVals, MessageCollectionItemType.MSG)) + if (!writer.writeMessage("nearTtls", nearTtls)) return false; writer.incrementState(); case 10: - if (!writer.writeCollection("nearValsIdxs", nearValsIdxs, MessageCollectionItemType.INT)) + if (!writer.writeCollection("nearVals", nearVals, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 11: - if (!writer.writeMessage("nearVer", nearVer)) + if (!writer.writeCollection("nearValsIdxs", nearValsIdxs, MessageCollectionItemType.INT)) return false; writer.incrementState(); case 12: - if (!writer.writeCollection("remapKeys", remapKeys, MessageCollectionItemType.MSG)) + if (!writer.writeMessage("nearVer", nearVer)) return false; writer.incrementState(); case 13: - if (!writer.writeByteArray("retValBytes", retValBytes)) + if (!writer.writeCollection("remapKeys", remapKeys, MessageCollectionItemType.MSG)) + return false; + + writer.incrementState(); + + case 14: + if (!writer.writeMessage("retVal", retVal)) + return false; + + writer.incrementState(); + + case 15: + if (!writer.writeBoolean("success", success)) return false; writer.incrementState(); @@ -525,7 +558,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr reader.incrementState(); case 6: - nearExpireTimes = reader.readMessage("nearExpireTimes"); + invokeRes = reader.readCollection("invokeRes", MessageCollectionItemType.MSG); if (!reader.isLastRead()) return false; @@ -533,7 +566,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr reader.incrementState(); case 7: - nearSkipIdxs = reader.readCollection("nearSkipIdxs", MessageCollectionItemType.INT); + nearExpireTimes = reader.readMessage("nearExpireTimes"); if (!reader.isLastRead()) return false; @@ -541,7 +574,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr reader.incrementState(); case 8: - nearTtls = reader.readMessage("nearTtls"); + nearSkipIdxs = reader.readCollection("nearSkipIdxs", MessageCollectionItemType.INT); if (!reader.isLastRead()) return false; @@ -549,7 +582,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr reader.incrementState(); case 9: - nearVals = reader.readCollection("nearVals", MessageCollectionItemType.MSG); + nearTtls = reader.readMessage("nearTtls"); if (!reader.isLastRead()) return false; @@ -557,7 +590,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr reader.incrementState(); case 10: - nearValsIdxs = reader.readCollection("nearValsIdxs", MessageCollectionItemType.INT); + nearVals = reader.readCollection("nearVals", MessageCollectionItemType.MSG); if (!reader.isLastRead()) return false; @@ -565,7 +598,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr reader.incrementState(); case 11: - nearVer = reader.readMessage("nearVer"); + nearValsIdxs = reader.readCollection("nearValsIdxs", MessageCollectionItemType.INT); if (!reader.isLastRead()) return false; @@ -573,7 +606,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr reader.incrementState(); case 12: - remapKeys = reader.readCollection("remapKeys", MessageCollectionItemType.MSG); + nearVer = reader.readMessage("nearVer"); if (!reader.isLastRead()) return false; @@ -581,7 +614,23 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr reader.incrementState(); case 13: - retValBytes = reader.readByteArray("retValBytes"); + remapKeys = reader.readCollection("remapKeys", MessageCollectionItemType.MSG); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 14: + retVal = reader.readMessage("retVal"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 15: + success = reader.readBoolean("success"); if (!reader.isLastRead()) return false; @@ -600,7 +649,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 14; + return 16; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/832f114e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index 3c09c56..15befd9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -299,14 +299,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte success = false; } - else { - Object val = v.value(ctx); - - if (ctx.portableEnabled() && !skipVals) - val = ctx.unwrapPortableIfNeeded(val, !deserializePortable); - - locVals.put(key, (V)CU.skipValue(val, skipVals)); - } + else + ctx.addResult(locVals, cacheKey, v, skipVals, false, deserializePortable); } else success = false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/832f114e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index 9af2767..a30f372 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -482,24 +482,24 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { } /** {@inheritDoc} */ - @Override public GridCacheReturn<CacheObject> removex(K key, V val) throws IgniteCheckedException { + @Override public GridCacheReturn<V> removex(K key, V val) throws IgniteCheckedException { return dht.removex(key, val); } /** {@inheritDoc} */ - @Override public GridCacheReturn<CacheObject> replacex(K key, V oldVal, V newVal) throws IgniteCheckedException { + @Override public GridCacheReturn<V> replacex(K key, V oldVal, V newVal) throws IgniteCheckedException { return dht.replacex(key, oldVal, newVal); } /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public IgniteInternalFuture<GridCacheReturn<CacheObject>> removexAsync(K key, V val) { + @Override public IgniteInternalFuture<GridCacheReturn<V>> removexAsync(K key, V val) { return dht.removexAsync(key, val); } /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public IgniteInternalFuture<GridCacheReturn<CacheObject>> replacexAsync(K key, V oldVal, V newVal) { + @Override public IgniteInternalFuture<GridCacheReturn<V>> replacexAsync(K key, V oldVal, V newVal) { return dht.replacexAsync(key, oldVal, newVal); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/832f114e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java index 20e5b87..93c6167 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java @@ -335,7 +335,6 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma topVer, subjId, taskName == null ? 0 : taskName.hashCode(), - deserializePortable, expiryPlc, skipVals); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/832f114e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java index cb939f4..4561db3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java @@ -51,7 +51,15 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep /** */ @GridToStringInclude @GridDirectTransient - private LinkedHashMap<KeyCacheObject, Boolean> keys; + private LinkedHashMap<KeyCacheObject, Boolean> keyMap; + + /** */ + @GridDirectCollection(KeyCacheObject.class) + private Collection<KeyCacheObject> keys; + + /** */ + @GridDirectCollection(boolean.class) + private Collection<Boolean> flags; /** Reload flag. */ private boolean reload; @@ -62,11 +70,6 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep /** Skip values flag. Used for {@code containsKey} method. */ private boolean skipVals; - /** */ - @GridToStringExclude - @GridDirectMap(keyType = byte[].class, valueType = boolean.class) - private LinkedHashMap<byte[], Boolean> keyBytes; - /** Topology version. */ private long topVer; @@ -124,7 +127,8 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep this.futId = futId; this.miniId = miniId; this.ver = ver; - this.keys = keys; + this.keys = keys.keySet(); + this.flags = keys.values(); this.readThrough = readThrough; this.reload = reload; this.topVer = topVer; @@ -173,7 +177,7 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep * @return Keys */ public LinkedHashMap<KeyCacheObject, Boolean> keys() { - return keys; + return keyMap; } /** @@ -221,9 +225,9 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep assert ctx != null; assert !F.isEmpty(keys); + assert keys.size() == flags.size(); - if (keyBytes == null) - keyBytes = marshalBooleanLinkedMap(keys, ctx); + prepareMarshalCacheObjects(keys, ctx); } /** @@ -234,8 +238,20 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { super.finishUnmarshal(ctx, ldr); - if (keys == null) - keys = unmarshalBooleanLinkedMap(keyBytes, ctx, ldr); + finishUnmarshalCacheObjects(keys, ctx, ldr); + + assert !F.isEmpty(keys); + assert keys.size() == flags.size(); + + if (keyMap == null) { + keyMap = U.newLinkedHashMap(keys.size()); + + Iterator<KeyCacheObject> keysIt = keys.iterator(); + Iterator<Boolean> flagsIt = flags.iterator(); + + while (keysIt.hasNext()) + keyMap.put(keysIt.next(), flagsIt.next()); + } } /** {@inheritDoc} */ @@ -260,60 +276,66 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep writer.incrementState(); case 4: - if (!writer.writeIgniteUuid("futId", futId)) + if (!writer.writeCollection("flags", flags, MessageCollectionItemType.BOOLEAN)) return false; writer.incrementState(); case 5: - if (!writer.writeMap("keyBytes", keyBytes, MessageCollectionItemType.BYTE_ARR, MessageCollectionItemType.BOOLEAN)) + if (!writer.writeIgniteUuid("futId", futId)) return false; writer.incrementState(); case 6: - if (!writer.writeIgniteUuid("miniId", miniId)) + if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 7: - if (!writer.writeBoolean("readThrough", readThrough)) + if (!writer.writeIgniteUuid("miniId", miniId)) return false; writer.incrementState(); case 8: - if (!writer.writeBoolean("reload", reload)) + if (!writer.writeBoolean("readThrough", readThrough)) return false; writer.incrementState(); case 9: - if (!writer.writeBoolean("skipVals", skipVals)) + if (!writer.writeBoolean("reload", reload)) return false; writer.incrementState(); case 10: - if (!writer.writeUuid("subjId", subjId)) + if (!writer.writeBoolean("skipVals", skipVals)) return false; writer.incrementState(); case 11: - if (!writer.writeInt("taskNameHash", taskNameHash)) + if (!writer.writeUuid("subjId", subjId)) return false; writer.incrementState(); case 12: - if (!writer.writeLong("topVer", topVer)) + if (!writer.writeInt("taskNameHash", taskNameHash)) return false; writer.incrementState(); case 13: + if (!writer.writeLong("topVer", topVer)) + return false; + + writer.incrementState(); + + case 14: if (!writer.writeMessage("ver", ver)) return false; @@ -344,7 +366,7 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep reader.incrementState(); case 4: - futId = reader.readIgniteUuid("futId"); + flags = reader.readCollection("flags", MessageCollectionItemType.BOOLEAN); if (!reader.isLastRead()) return false; @@ -352,7 +374,7 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep reader.incrementState(); case 5: - keyBytes = reader.readMap("keyBytes", MessageCollectionItemType.BYTE_ARR, MessageCollectionItemType.BOOLEAN, true); + futId = reader.readIgniteUuid("futId"); if (!reader.isLastRead()) return false; @@ -360,7 +382,7 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep reader.incrementState(); case 6: - miniId = reader.readIgniteUuid("miniId"); + keys = reader.readCollection("keys", MessageCollectionItemType.MSG); if (!reader.isLastRead()) return false; @@ -368,7 +390,7 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep reader.incrementState(); case 7: - readThrough = reader.readBoolean("readThrough"); + miniId = reader.readIgniteUuid("miniId"); if (!reader.isLastRead()) return false; @@ -376,7 +398,7 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep reader.incrementState(); case 8: - reload = reader.readBoolean("reload"); + readThrough = reader.readBoolean("readThrough"); if (!reader.isLastRead()) return false; @@ -384,7 +406,7 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep reader.incrementState(); case 9: - skipVals = reader.readBoolean("skipVals"); + reload = reader.readBoolean("reload"); if (!reader.isLastRead()) return false; @@ -392,7 +414,7 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep reader.incrementState(); case 10: - subjId = reader.readUuid("subjId"); + skipVals = reader.readBoolean("skipVals"); if (!reader.isLastRead()) return false; @@ -400,7 +422,7 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep reader.incrementState(); case 11: - taskNameHash = reader.readInt("taskNameHash"); + subjId = reader.readUuid("subjId"); if (!reader.isLastRead()) return false; @@ -408,7 +430,7 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep reader.incrementState(); case 12: - topVer = reader.readLong("topVer"); + taskNameHash = reader.readInt("taskNameHash"); if (!reader.isLastRead()) return false; @@ -416,6 +438,14 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep reader.incrementState(); case 13: + topVer = reader.readLong("topVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 14: ver = reader.readMessage("ver"); if (!reader.isLastRead()) @@ -436,7 +466,7 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 14; + return 15; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/832f114e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java index afb0ac8..560e1c4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java @@ -50,12 +50,9 @@ public class GridNearGetResponse extends GridCacheMessage implements GridCacheDe /** Result. */ @GridToStringInclude - @GridDirectTransient + @GridDirectCollection(GridCacheEntryInfo.class) private Collection<GridCacheEntryInfo> entries; - /** */ - private byte[] entriesBytes; - /** Keys to retry due to ownership shift. */ @GridToStringInclude @GridDirectCollection(int.class) @@ -176,9 +173,8 @@ public class GridNearGetResponse extends GridCacheMessage implements GridCacheDe super.prepareMarshal(ctx); if (entries != null) { - marshalInfos(entries, ctx); - - entriesBytes = ctx.marshaller().marshal(entries); + for (GridCacheEntryInfo info : entries) + info.marshal(ctx); } if (err != null) @@ -189,10 +185,11 @@ public class GridNearGetResponse extends GridCacheMessage implements GridCacheDe @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { super.finishUnmarshal(ctx, ldr); - if (entriesBytes != null) { - entries = ctx.marshaller().unmarshal(entriesBytes, ldr); + GridCacheContext cctx = ctx.cacheContext(cacheId()); - unmarshalInfos(entries, ctx.cacheContext(cacheId()), ldr); + if (entries != null) { + for (GridCacheEntryInfo info : entries) + info.unmarshal(cctx, ldr); } if (errBytes != null) @@ -215,7 +212,7 @@ public class GridNearGetResponse extends GridCacheMessage implements GridCacheDe switch (writer.state()) { case 3: - if (!writer.writeByteArray("entriesBytes", entriesBytes)) + if (!writer.writeCollection("entries", entries, MessageCollectionItemType.MSG)) return false; writer.incrementState(); @@ -273,7 +270,7 @@ public class GridNearGetResponse extends GridCacheMessage implements GridCacheDe switch (reader.state()) { case 3: - entriesBytes = reader.readByteArray("entriesBytes"); + entries = reader.readCollection("entries", MessageCollectionItemType.MSG); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/832f114e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java index 85bf527..8433d11 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java @@ -267,7 +267,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public GridCacheReturn<CacheObject> replacex(K key, V oldVal, V newVal) throws IgniteCheckedException { + @Override public GridCacheReturn<V> replacex(K key, V oldVal, V newVal) throws IgniteCheckedException { A.notNull(key, "key", oldVal, "oldVal", newVal, "newVal"); ctx.denyOnLocalRead(); @@ -275,7 +275,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { if (ctx.portableEnabled()) oldVal = (V)ctx.marshalToPortable(oldVal); - return (GridCacheReturn<CacheObject>)updateAllInternal(UPDATE, + return (GridCacheReturn<V>)updateAllInternal(UPDATE, Collections.singleton(key), Collections.singleton(newVal), null, @@ -288,7 +288,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public GridCacheReturn<CacheObject> removex(K key, V val) throws IgniteCheckedException { + @Override public GridCacheReturn<V> removex(K key, V val) throws IgniteCheckedException { A.notNull(key, "key", val, "val"); ctx.denyOnLocalRead(); @@ -296,7 +296,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { if (ctx.portableEnabled()) val = (V)ctx.marshalToPortable(val); - return (GridCacheReturn<CacheObject>)updateAllInternal(DELETE, + return (GridCacheReturn<V>)updateAllInternal(DELETE, Collections.singleton(key), null, null, @@ -309,7 +309,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public IgniteInternalFuture<GridCacheReturn<CacheObject>> removexAsync(K key, V val) { + @Override public IgniteInternalFuture<GridCacheReturn<V>> removexAsync(K key, V val) { A.notNull(key, "key", val, "val"); ctx.denyOnLocalRead(); @@ -322,7 +322,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public IgniteInternalFuture<GridCacheReturn<CacheObject>> replacexAsync(K key, V oldVal, V newVal) { + @Override public IgniteInternalFuture<GridCacheReturn<V>> replacexAsync(K key, V oldVal, V newVal) { A.notNull(key, "key", oldVal, "oldVal", newVal, "newVal"); ctx.denyOnLocalRead(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/832f114e/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/GridPortableProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/GridPortableProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/GridPortableProcessor.java index a460d4d..ac688a2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/GridPortableProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/GridPortableProcessor.java @@ -28,6 +28,8 @@ import java.nio.*; /** * Portable processor. + * + * TODO IGNITE-51: rename. */ public interface GridPortableProcessor extends GridProcessor { /** {@inheritDoc} */ @@ -86,10 +88,13 @@ public interface GridPortableProcessor extends GridProcessor { public Object marshalToPortable(@Nullable Object obj) throws IgniteException; /** + * TODO IGNITE-51: rename. + * * @param obj Object (portable or not). + * @param cctx Cache context. * @return Detached portable object or original object. */ - public Object detachPortable(@Nullable Object obj); + public Object detachPortable(@Nullable Object obj, GridCacheContext cctx); /** * @return Portable marshaller for client connectivity or {@code null} if it's not http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/832f114e/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/os/GridOsPortableProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/os/GridOsPortableProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/os/GridOsPortableProcessor.java index 5f0b3bf..ce81f31 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/os/GridOsPortableProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/os/GridOsPortableProcessor.java @@ -75,8 +75,13 @@ public class GridOsPortableProcessor extends GridProcessorAdapter implements Gri } /** {@inheritDoc} */ - @Override public Object detachPortable(@Nullable Object obj) { - return obj; + @Override public Object detachPortable(@Nullable Object obj, GridCacheContext cctx) { + if (obj == null) + return obj; + + assert obj instanceof CacheObject : obj; + + return ((CacheObject)obj).prepareForCache(cctx); } /** {@inheritDoc} */ @@ -121,11 +126,11 @@ public class GridOsPortableProcessor extends GridProcessorAdapter implements Gri /** {@inheritDoc} */ @Nullable @Override public KeyCacheObject toCacheKeyObject(@Nullable Object obj) { - return new KeyCacheObjectImpl(obj); + return new UserKeyCacheObjectImpl(obj); } /** {@inheritDoc} */ @Nullable @Override public CacheObject toCacheObject(@Nullable Object obj) { - return new CacheObjectImpl(obj); + return new UserCacheObjectImpl(obj); } }