Repository: incubator-ignite Updated Branches: refs/heads/ignite-51 8a5bc2803 -> 3a4ede2f7
# ignite-51 Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/3a4ede2f Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/3a4ede2f Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/3a4ede2f Branch: refs/heads/ignite-51 Commit: 3a4ede2f7f307aaf4eb83bb0afc1d7efa374b310 Parents: 8a5bc28 Author: sboikov <sboi...@gridgain.com> Authored: Fri Mar 6 18:31:58 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Fri Mar 6 18:36:23 2015 +0300 ---------------------------------------------------------------------- .../communication/GridIoMessageFactory.java | 5 + .../cache/CacheInvokeDirectResult.java | 11 +- .../processors/cache/GridCacheAdapter.java | 95 ++---- .../processors/cache/GridCacheMapEntry.java | 7 +- .../cache/GridCacheProjectionImpl.java | 4 +- .../processors/cache/GridCacheReturn.java | 313 ++++++++++++++++--- .../cache/GridCacheUpdateAtomicResult.java | 7 +- .../cache/distributed/dht/GridDhtTxLocal.java | 2 + .../distributed/dht/GridDhtTxLocalAdapter.java | 2 +- .../distributed/dht/GridDhtTxPrepareFuture.java | 7 +- .../dht/atomic/GridDhtAtomicCache.java | 59 ++-- .../dht/atomic/GridNearAtomicUpdateFuture.java | 38 +-- .../atomic/GridNearAtomicUpdateResponse.java | 105 ++----- .../cache/distributed/near/GridNearTxLocal.java | 4 +- .../near/GridNearTxPrepareResponse.java | 26 +- .../processors/cache/local/GridLocalTx.java | 2 + .../local/atomic/GridLocalAtomicCache.java | 4 +- .../cache/transactions/IgniteInternalTx.java | 6 +- .../cache/transactions/IgniteTxAdapter.java | 13 +- .../transactions/IgniteTxLocalAdapter.java | 54 ++-- .../cache/transactions/IgniteTxLocalEx.java | 2 +- 21 files changed, 458 insertions(+), 308 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3a4ede2f/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index f7f0fc2..d702540 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -489,6 +489,11 @@ public class GridIoMessageFactory implements MessageFactory { break; + case 88: + msg = new GridCacheReturn(); + + break; + case 89: msg = new CacheObjectImpl(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3a4ede2f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java index 3c5c5c7..03a2d96 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java @@ -92,7 +92,10 @@ public class CacheInvokeDirectResult implements Message { return err; } - /** {@inheritDoc} */ + /** + * @param ctx Cache context. + * @throws IgniteCheckedException If failed. + */ public void prepareMarshal(GridCacheContext ctx) throws IgniteCheckedException { key.prepareMarshal(ctx.cacheObjectContext()); @@ -103,7 +106,11 @@ public class CacheInvokeDirectResult implements Message { res.prepareMarshal(ctx.cacheObjectContext()); } - /** {@inheritDoc} */ + /** + * @param ctx Cache context. + * @param ldr Class loader. + * @throws IgniteCheckedException If failed. + */ public void finishUnmarshal(GridCacheContext ctx, ClassLoader ldr) throws IgniteCheckedException { key.finishUnmarshal(ctx.cacheObjectContext(), ldr); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3a4ede2f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index d469a20..9c05499 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -96,7 +96,16 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, }; /** {@link GridCacheReturn}-to-value conversion. */ - private IgniteClosure RET2VAL; + private static final IgniteClosure RET2VAL = + new CX1<IgniteInternalFuture<GridCacheReturn<Object>>, Object>() { + @Nullable @Override public Object applyx(IgniteInternalFuture<GridCacheReturn<Object>> fut) throws IgniteCheckedException { + return fut.get().value(); + } + + @Override public String toString() { + return "Cache return value to value converter."; + } + }; /** {@link GridCacheReturn}-to-success conversion. */ private static final IgniteClosure RET2FLAG = @@ -212,18 +221,6 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, protected GridCacheAdapter(final GridCacheContext<K, V> ctx, GridCacheConcurrentMap map) { assert ctx != null; - RET2VAL = new CX1<IgniteInternalFuture<GridCacheReturn<CacheObject>>, Object>() { - @Nullable @Override public Object applyx(IgniteInternalFuture<GridCacheReturn<CacheObject>> fut) - throws IgniteCheckedException - { - return CU.value(fut.get().value(), ctx, true); - } - - @Override public String toString() { - return "Cache return value to value converter."; - } - }; - this.ctx = ctx; gridCfg = ctx.gridConfig(); @@ -2462,9 +2459,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, V prevVal = ctx.cloneOnFlag(syncOp(new SyncOp<V>(true) { @Override public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { - CacheObject prev = tx.putAllAsync(ctx, F.t(key, val), true, cached, ttl, filter).get().value(); - - return CU.value(prev, ctx, false); + return (V)tx.putAllAsync(ctx, F.t(key, val), true, cached, ttl, filter).get().value(); } @Override public String toString() { @@ -2882,9 +2877,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, return ctx.cloneOnFlag(syncOp(new SyncOp<V>(true) { @Override public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { - CacheObject prev = tx.putAllAsync(ctx, F.t(key, val), true, null, -1, ctx.noValArray()).get().value(); - - return CU.value(prev, ctx, true); + return (V)tx.putAllAsync(ctx, F.t(key, val), true, null, -1, ctx.noValArray()).get().value(); } @Override public String toString() { @@ -3001,9 +2994,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, return ctx.cloneOnFlag(syncOp(new SyncOp<V>(true) { @Override public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { - CacheObject prev = tx.putAllAsync(ctx, F.t(key, val), true, null, -1, ctx.hasValArray()).get().value(); - - return CU.value(prev, ctx, true); + return (V)tx.putAllAsync(ctx, F.t(key, val), true, null, -1, ctx.hasValArray()).get().value(); } @Override public String toString() { @@ -3239,14 +3230,12 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, V prevVal = ctx.cloneOnFlag(syncOp(new SyncOp<V>(true) { @Override public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { - CacheObject ret = tx.removeAllAsync(ctx, Collections.singletonList(key), entry, true, filter).get().value(); - - V retVal = CU.value(ret, ctx, true); + V ret = (V)tx.removeAllAsync(ctx, Collections.singletonList(key), entry, true, filter).get().value(); if (ctx.config().getInterceptor() != null) - return (V)ctx.config().getInterceptor().onBeforeRemove(new CacheEntryImpl(key, retVal)).get2(); + return (V)ctx.config().getInterceptor().onBeforeRemove(new CacheEntryImpl(key, ret)).get2(); - return retVal; + return ret; } @Override public String toString() { @@ -3466,17 +3455,11 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, if (ctx.deploymentEnabled()) ctx.deploy().registerClass(val); - GridCacheReturn ret = tx.removeAllAsync(ctx, + return (GridCacheReturn)tx.removeAllAsync(ctx, Collections.singletonList(key), null, true, ctx.equalsValArray(val)).get(); - - CacheObject val = (CacheObject)ret.value(); - - ret.value(CU.value(val, ctx, true)); - - return ret; } @Override public String toString() { @@ -3544,14 +3527,12 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, if (ctx.deploymentEnabled()) ctx.deploy().registerClass(oldVal); - GridCacheReturn ret = - tx.putAllAsync(ctx, F.t(key, newVal), true, null, -1, ctx.equalsValArray(oldVal)).get(); - - CacheObject val = (CacheObject)ret.value(); - - ret.value(CU.value(val, ctx, true)); - - return ret; + return (GridCacheReturn)tx.putAllAsync(ctx, + F.t(key, newVal), + true, + null, + -1, + ctx.equalsValArray(oldVal)).get(); } @Override public String toString() { @@ -3580,24 +3561,13 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, return new GridFinishedFuture<>(ctx.kernalContext(), e); } - IgniteInternalFuture<GridCacheReturn<CacheObject>> fut = tx.removeAllAsync(ctx, + IgniteInternalFuture<GridCacheReturn<V>> fut = (IgniteInternalFuture)tx.removeAllAsync(ctx, Collections.singletonList(key), null, true, ctx.equalsValArray(val)); - return fut.chain(new CX1<IgniteInternalFuture<GridCacheReturn<CacheObject>>, GridCacheReturn<V>>() { - @Override public GridCacheReturn<V> applyx(IgniteInternalFuture<GridCacheReturn<CacheObject>> fut) - throws IgniteCheckedException { - GridCacheReturn ret = fut.get(); - - CacheObject val = (CacheObject)ret.value(); - - ret.value(CU.value(val, ctx, true)); - - return ret; - } - }); + return fut; } @Override public String toString() { @@ -3629,25 +3599,14 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, return new GridFinishedFuture<>(ctx.kernalContext(), e); } - IgniteInternalFuture<GridCacheReturn<CacheObject>> fut = tx.putAllAsync(ctx, + IgniteInternalFuture<GridCacheReturn<V>> fut = (IgniteInternalFuture)tx.putAllAsync(ctx, F.t(key, newVal), true, null, -1, ctx.equalsValArray(oldVal)); - return fut.chain(new CX1<IgniteInternalFuture<GridCacheReturn<CacheObject>>, GridCacheReturn<V>>() { - @Override public GridCacheReturn<V> applyx(IgniteInternalFuture<GridCacheReturn<CacheObject>> fut) - throws IgniteCheckedException { - GridCacheReturn ret = fut.get(); - - CacheObject val = (CacheObject)ret.value(); - - ret.value(CU.value(val, ctx, true)); - - return ret; - } - }); + return fut; } @Override public String toString() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3a4ede2f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 972a7d9..b20504b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -1623,7 +1623,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { GridCacheVersionConflictContext<?, ?> conflictCtx = null; - CacheInvokeDirectResult invokeRes = null; + IgniteBiTuple<Object, Exception> invokeRes = null; // System TTL/ET which may have special values. long newSysTtl; @@ -1872,11 +1872,10 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx { key0 = entry.key(); if (computed != null) - invokeRes = new CacheInvokeDirectResult(key, - cctx.toCacheObject(cctx.unwrapTemporary(computed))); + invokeRes = new IgniteBiTuple(cctx.unwrapTemporary(computed), null); } catch (Exception e) { - invokeRes = new CacheInvokeDirectResult(key, e); + invokeRes = new IgniteBiTuple(null, e); updated = oldVal; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3a4ede2f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java index a91488f..8afe378 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java @@ -996,7 +996,7 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V // Check k-v predicate first. if (!isAll(key, newVal)) - return new GridFinishedFuture<>(cctx.kernalContext(), new GridCacheReturn<V>(false)); + return new GridFinishedFuture<>(cctx.kernalContext(), new GridCacheReturn<V>(true, false)); return cache.replacexAsync(key, oldVal, newVal); } @@ -1014,7 +1014,7 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K, V /** {@inheritDoc} */ @Override public IgniteInternalFuture<GridCacheReturn<V>> removexAsync(K key, V val) { return !isAll(key, val) ? new GridFinishedFuture<>(cctx.kernalContext(), - new GridCacheReturn<V>(false)) : cache.removexAsync(key, val); + new GridCacheReturn<V>(true, false)) : cache.removexAsync(key, val); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3a4ede2f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java index 2579c32..b536a09 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java @@ -17,29 +17,36 @@ package org.apache.ignite.internal.processors.cache; +import org.apache.ignite.*; +import org.apache.ignite.internal.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.marshaller.optimized.*; +import org.apache.ignite.plugin.extensions.communication.*; import org.jetbrains.annotations.*; import javax.cache.processor.*; import java.io.*; +import java.nio.*; import java.util.*; /** * Return value for cases where both, value and success flag need to be returned. */ -public class GridCacheReturn<V> implements Externalizable, OptimizedMarshallable { +public class GridCacheReturn<V> implements Externalizable, Message { /** */ private static final long serialVersionUID = 0L; - /** */ - @SuppressWarnings({"NonConstantFieldWithUpperCaseName", "JavaAbbreviationUsage", "UnusedDeclaration"}) - private static Object GG_CLASS_ID; - /** Value. */ @GridToStringInclude - private volatile V v; + @GridDirectTransient + private volatile Object v; + + /** */ + private CacheObject cacheObj; + + /** */ + @GridDirectCollection(CacheInvokeDirectResult.class) + private Collection<CacheInvokeDirectResult> invokeResCol; /** Success flag. */ private volatile boolean success; @@ -47,49 +54,63 @@ public class GridCacheReturn<V> implements Externalizable, OptimizedMarshallable /** */ private volatile boolean invokeRes; + /** Local result flag, if non local then do not need unwrap cache objects. */ + @GridDirectTransient + private transient boolean loc; + + /** */ + private int cacheId; + /** * Empty constructor. */ public GridCacheReturn() { - // No-op. + loc = true; } /** - * - * @param success Success flag. + * @param loc {@code True} if created on the node initiated cache operation. */ - public GridCacheReturn(boolean success) { - this.success = success; + public GridCacheReturn(boolean loc) { + this.loc = loc; } /** - * - * @param v Value. + * @param loc {@code True} if created on the node initiated cache operation. * @param success Success flag. */ - public GridCacheReturn(V v, boolean success) { - this.v = v; + public GridCacheReturn(boolean loc, boolean success) { + this.loc = loc; this.success = success; } /** - * + * @param cctx Cache context. + * @param loc {@code True} if created on the node initiated cache operation. * @param v Value. * @param success Success flag. */ - public GridCacheReturn(V v, boolean success, boolean invokeRes) { - assert !invokeRes || v instanceof Map; - - this.v = v; + public GridCacheReturn(GridCacheContext cctx, boolean loc, Object v, boolean success) { + this.loc = loc; this.success = success; - this.invokeRes = invokeRes; + + if (v != null) { + if (v instanceof CacheObject) + initValue(cctx, (CacheObject)v); + else { + assert loc; + + this.v = v; + } + } } /** * @return Value. */ + @SuppressWarnings("unchecked") @Nullable public V value() { - return v; + return (V)v; } /** @@ -116,11 +137,12 @@ public class GridCacheReturn<V> implements Externalizable, OptimizedMarshallable } /** + * @param cctx Cache context. * @param v Value. * @return This instance for chaining. */ - public GridCacheReturn<V> value(V v) { - this.v = v; + public GridCacheReturn value(GridCacheContext cctx, CacheObject v) { + initValue(cctx, v); return this; } @@ -133,18 +155,36 @@ public class GridCacheReturn<V> implements Externalizable, OptimizedMarshallable } /** - * @param v Value to set. + * @param cctx Cache context. + * @param cacheObj Value to set. * @param success Success flag to set. * @return This instance for chaining. */ - public GridCacheReturn<V> set(@Nullable V v, boolean success) { - this.v = v; + public GridCacheReturn set(GridCacheContext cctx, @Nullable CacheObject cacheObj, boolean success) { this.success = success; + initValue(cctx, cacheObj); + return this; } /** + * @param cctx Cache context. + * @param cacheObj Cache object. + */ + private void initValue(GridCacheContext cctx, @Nullable CacheObject cacheObj) { + if (loc) + v = CU.value(cacheObj, cctx, true); + else { + assert cacheId == 0 || cacheId == cctx.cacheId(); + + cacheId = cctx.cacheId(); + + this.cacheObj = cacheObj; + } + } + + /** * @param success Success flag. * @return This instance for chaining. */ @@ -155,34 +195,69 @@ public class GridCacheReturn<V> implements Externalizable, OptimizedMarshallable } /** + * @param cctx Context. * @param key Key. + * @param key0 Key value. * @param res Result. + * @param err Error. */ @SuppressWarnings("unchecked") - public synchronized void addEntryProcessResult(Object key, EntryProcessorResult<?> res) { + public synchronized void addEntryProcessResult( + GridCacheContext cctx, + KeyCacheObject key, + @Nullable Object key0, + @Nullable Object res, + @Nullable Exception err) { assert v == null || v instanceof Map : v; assert key != null; - assert res != null; + assert res != null || err != null; invokeRes = true; - HashMap<Object, EntryProcessorResult> resMap = (HashMap<Object, EntryProcessorResult>)v; + if (loc) { + HashMap<Object, EntryProcessorResult> resMap = (HashMap<Object, EntryProcessorResult>)v; - if (resMap == null) { - resMap = new HashMap<>(); + if (resMap == null) { + resMap = new HashMap<>(); + + v = resMap; + } + + CacheInvokeResult res0 = err == null ? new CacheInvokeResult(res) : new CacheInvokeResult(err); - v = (V)resMap; + resMap.put(key0 != null ? key0 : CU.value(key, cctx, true), res0); } + else { + assert v == null; + assert cacheId == 0 || cacheId == cctx.cacheId(); - resMap.put(key, res); + cacheId = cctx.cacheId(); + + if (invokeResCol == null) + invokeResCol = new ArrayList<>(); + + CacheInvokeDirectResult res0 = err == null ? + new CacheInvokeDirectResult(key, cctx.toCacheObject(res)) : new CacheInvokeDirectResult(key, err); + + invokeResCol.add(res0); + } + } + + /** + * @return Cache ID. + */ + public int cacheId() { + return cacheId; } /** * @param other Other result to merge with. */ - public synchronized void mergeEntryProcessResults(GridCacheReturn<Object> other) { + @SuppressWarnings("unchecked") + public synchronized void mergeEntryProcessResults(GridCacheReturn other) { assert invokeRes || v == null : "Invalid state to merge: " + this; assert other.invokeRes; + assert loc == other.loc : loc; if (other.v == null) return; @@ -194,30 +269,178 @@ public class GridCacheReturn<V> implements Externalizable, OptimizedMarshallable if (resMap == null) { resMap = new HashMap<>(); - v = (V)resMap; + v = resMap; } resMap.putAll((Map<Object, EntryProcessorResult>)other.v); } + /** + * @param ctx Cache context. + * @throws IgniteCheckedException If failed. + */ + public void prepareMarshal(GridCacheContext ctx) throws IgniteCheckedException { + assert !loc; + + if (cacheObj != null) + cacheObj.prepareMarshal(ctx.cacheObjectContext()); + + if (invokeRes && invokeResCol != null) { + for (CacheInvokeDirectResult res : invokeResCol) + res.prepareMarshal(ctx); + } + } + + /** + * @param ctx Cache context. + * @param ldr Class loader. + * @throws IgniteCheckedException If failed. + */ + public void finishUnmarshal(GridCacheContext ctx, ClassLoader ldr) throws IgniteCheckedException { + loc = true; + + if (cacheObj != null) { + cacheObj.finishUnmarshal(ctx.cacheObjectContext(), ldr); + + v = cacheObj.value(ctx.cacheObjectContext(), false); + } + + if (invokeRes && invokeResCol != null) { + for (CacheInvokeDirectResult res : invokeResCol) + res.finishUnmarshal(ctx, ldr); + + Map<Object, CacheInvokeResult> map0 = U.newHashMap(invokeResCol.size()); + + for (CacheInvokeDirectResult res : invokeResCol) { + CacheInvokeResult<?> res0 = res.error() == null ? + new CacheInvokeResult<>(CU.value(res.result(), ctx, false)) : new CacheInvokeResult<>(res.error()); + + map0.put(res.key().value(ctx.cacheObjectContext(), false), res0); + } + + v = map0; + } + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 88; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeInt("cacheId", cacheId)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeMessage("cacheObj", cacheObj)) + return false; + + writer.incrementState(); + + case 2: + if (!writer.writeBoolean("invokeRes", invokeRes)) + return false; + + writer.incrementState(); + + case 3: + if (!writer.writeCollection("invokeResCol", invokeResCol, MessageCollectionItemType.MSG)) + return false; + + writer.incrementState(); + + case 4: + if (!writer.writeBoolean("success", success)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + cacheId = reader.readInt("cacheId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + cacheObj = reader.readMessage("cacheObj"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 2: + invokeRes = reader.readBoolean("invokeRes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 3: + invokeResCol = reader.readCollection("invokeResCol", MessageCollectionItemType.MSG); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 4: + success = reader.readBoolean("success"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return true; + } + /** {@inheritDoc} */ - @Override public Object ggClassId() { - return GG_CLASS_ID; + @Override public byte fieldsCount() { + return 5; } /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeBoolean(success); - out.writeObject(v); - out.writeBoolean(invokeRes); + assert false; } /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - success = in.readBoolean(); - v = (V)in.readObject(); - invokeRes = in.readBoolean(); + assert false; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3a4ede2f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java index 45aeae5..c6bcccf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; import org.jetbrains.annotations.*; import javax.cache.processor.*; @@ -57,7 +58,7 @@ public class GridCacheUpdateAtomicResult { private final boolean sndToDht; /** Value computed by entry processor. */ - private CacheInvokeDirectResult res; + private IgniteBiTuple<Object, Exception> res; /** * Constructor. @@ -75,7 +76,7 @@ public class GridCacheUpdateAtomicResult { public GridCacheUpdateAtomicResult(boolean success, @Nullable CacheObject oldVal, @Nullable CacheObject newVal, - @Nullable CacheInvokeDirectResult res, + @Nullable IgniteBiTuple<Object, Exception> res, long newTtl, long conflictExpireTime, @Nullable GridCacheVersion rmvVer, @@ -95,7 +96,7 @@ public class GridCacheUpdateAtomicResult { /** * @return Value computed by the {@link EntryProcessor}. */ - @Nullable public CacheInvokeDirectResult computedResult() { + @Nullable public IgniteBiTuple<Object, Exception> computedResult() { return res; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3a4ede2f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java index a5e98ca..fd989bb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java @@ -149,6 +149,8 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa threadId = nearThreadId; assert !F.eq(xidVer, nearXidVer); + + initResult(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3a4ede2f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index bd11c0e..ce35b2e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -515,7 +515,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { return new GridFinishedFuture<>(cctx.kernalContext(), e); } - final GridCacheReturn<Object> ret = new GridCacheReturn<>(false); + final GridCacheReturn<Object> ret = new GridCacheReturn<>(localResult(), false); if (F.isEmpty(entries)) return new GridFinishedFuture<>(cctx.kernalContext(), ret); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3a4ede2f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 3d29e21..bb12cd8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -272,7 +272,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu * */ private void onEntriesLocked() { - ret = new GridCacheReturn<>(null, true); + ret = new GridCacheReturn<>(null, tx.localResult(), null, true); for (IgniteTxEntry txEntry : tx.optimisticLockEntries()) { GridCacheContext cacheCtx = txEntry.context(); @@ -337,13 +337,12 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu } if (err != null || procRes != null) - ret.addEntryProcessResult(key.value(cacheCtx.cacheObjectContext(), false), - err == null ? new CacheInvokeResult<>(procRes) : new CacheInvokeResult<>(err)); + ret.addEntryProcessResult(txEntry.context(), key, null, procRes, err); else ret.invokeResult(true); } else - ret.value(val); + ret.value(cacheCtx, val); } if (hasFilters && !cacheCtx.isAll(cached, txEntry.filters())) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3a4ede2f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index b49dfd8..a53638e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1128,7 +1128,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { dhtFut = updRes.dhtFuture(); if (req.operation() == TRANSFORM) - retVal = new GridCacheReturn<>((Object)updRes.invokeResults(), true); + retVal = updRes.invokeResults(); } else { UpdateSingleResult<K, V> updRes = updateSingle(node, @@ -1149,9 +1149,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } if (retVal == null) - retVal = new GridCacheReturn<>(null, true); + retVal = new GridCacheReturn<>(ctx, node.isLocal(), null, true); - res.returnValue(req.operation() == TRANSFORM, retVal); + res.returnValue(retVal); } else // Should remap all keys. @@ -1268,7 +1268,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { GridCacheOperation op = req.operation(); - Collection<CacheInvokeDirectResult> invokeResults = op == TRANSFORM ? new ArrayList(size) : null; + GridCacheReturn invokeRes = null; int firstEntryIdx = 0; @@ -1329,7 +1329,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry(ctx, entry.key(), old); CacheObject updated; - CacheInvokeDirectResult invokeRes = null; try { Object computed = entryProcessor.process(invokeEntry, req.invokeArguments()); @@ -1338,19 +1337,22 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { updated = ctx.toCacheObject(updatedVal); - if (computed != null) - invokeRes = new CacheInvokeDirectResult(entry.key(), - ctx.toCacheObject(ctx.unwrapTemporary(computed))); + if (computed != null) { + if (invokeRes == null) + invokeRes = new GridCacheReturn(node.isLocal()); + + invokeRes.addEntryProcessResult(ctx, entry.key(), invokeEntry.key(), computed, null); + } } catch (Exception e) { - invokeRes = new CacheInvokeDirectResult(entry.key(), e); + if (invokeRes == null) + invokeRes = new GridCacheReturn(node.isLocal()); + + invokeRes.addEntryProcessResult(ctx, entry.key(), invokeEntry.key(), null, e); updated = old; } - if (invokeRes != null) - invokeResults.add(invokeRes); - if (updated == null) { if (intercept) { CacheLazyEntry e = new CacheLazyEntry(ctx, entry.key(), invokeEntry.key(), old, oldVal); @@ -1554,7 +1556,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { updRes.dhtFuture(dhtFut); - updRes.invokeResult(invokeResults); + updRes.invokeResult(invokeRes); return updRes; } @@ -1652,8 +1654,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { boolean intercept = ctx.config().getInterceptor() != null; - Collection<CacheInvokeDirectResult> computed = null; - // Avoid iterator creation. for (int i = 0; i < keys.size(); i++) { KeyCacheObject k = keys.get(i); @@ -1795,22 +1795,28 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (op == TRANSFORM) { assert !req.returnValue(); - if (updRes.computedResult() != null) { - if (retVal == null) { - computed = new ArrayList(keys.size()); + IgniteBiTuple<Object, Exception> compRes = updRes.computedResult(); - retVal = new GridCacheReturn<>((Object)computed, updRes.success(), false); - } + if (compRes != null && (compRes.get1() != null || compRes.get2() != null)) { + if (retVal == null) + retVal = new GridCacheReturn<>(node.isLocal()); - computed.add(updRes.computedResult()); + retVal.addEntryProcessResult(ctx, + k, + null, + compRes.get1(), + compRes.get2()); } } else { // Create only once. if (retVal == null) { - Object ret = updRes.oldValue(); + CacheObject ret = updRes.oldValue(); - retVal = new GridCacheReturn<>(req.returnValue() ? ret : null, updRes.success()); + retVal = new GridCacheReturn<>(ctx, + node.isLocal(), + req.returnValue() ? ret : null, + updRes.success()); } } } @@ -1828,6 +1834,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param entries Entries to update. * @param ver Version to set. * @param node Originating node. + * @param writeVals Write values. * @param putMap Values to put. * @param rmvKeys Keys to remove. * @param entryProcessorMap Entry processors. @@ -2697,7 +2704,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { private boolean readersOnly; /** */ - private Collection<CacheInvokeDirectResult> invokeRes; + private GridCacheReturn invokeRes; /** * @param entry Entry. @@ -2732,14 +2739,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** * @param invokeRes Result for invoke operation. */ - private void invokeResult(Collection<CacheInvokeDirectResult> invokeRes) { + private void invokeResult(GridCacheReturn invokeRes) { this.invokeRes = invokeRes; } /** * @return Result for invoke operation. */ - Collection<CacheInvokeDirectResult> invokeResults() { + GridCacheReturn invokeResults() { return invokeRes; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3a4ede2f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index a53a730..b9b4d39 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -359,12 +359,6 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem GridCacheReturn ret = res.returnValue(); - if (op != TRANSFORM && ret != null) { - CacheObject val = (CacheObject)ret.value(); - - ret.value(CU.value(val, cctx, false)); - } - Boolean single0 = single; if (single0 != null && single0) { @@ -811,7 +805,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); if (syncMode == FULL_ASYNC && cctx.config().getAtomicWriteOrderMode() == PRIMARY) - onDone(new GridCacheReturn(null, true)); + onDone(new GridCacheReturn(cctx, true, null, true)); } catch (IgniteCheckedException e) { onDone(addFailedKeys(req.keys(), e)); @@ -857,7 +851,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem if (syncMode == FULL_ASYNC) // In FULL_ASYNC mode always return (null, true). - opRes = new GridCacheReturn<>(null, true); + opRes = new GridCacheReturn(cctx, true, null, true); if (locUpdate != null) { cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate, @@ -889,33 +883,13 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem @SuppressWarnings("unchecked") private synchronized void addInvokeResults(GridCacheReturn ret) { assert op == TRANSFORM : op; - assert ret.value() == null || ret.value() instanceof Collection : ret.value(); + assert ret.value() == null || ret.value() instanceof Map : ret.value(); if (ret.value() != null) { - Collection<CacheInvokeDirectResult> results = - (Collection<CacheInvokeDirectResult>)ret.value(); - - Map<Object, CacheInvokeResult> map0 = U.newHashMap(results.size()); - - for (CacheInvokeDirectResult res : results) { - CacheInvokeResult<?> res0 = res.error() == null ? - new CacheInvokeResult<>(CU.value(res.result(), cctx, false)) : new CacheInvokeResult<>(res.error()); - - map0.put(res.key().value(cctx.cacheObjectContext(), false), res0); - } - - if (opRes != null) { - Map<Object, CacheInvokeResult> oldMap = (Map<Object, CacheInvokeResult>)opRes.value(); - - assert oldMap != null; - - oldMap.putAll(map0); - } - else { - ret.value(map0); - + if (opRes != null) + opRes.mergeEntryProcessResults(ret); + else opRes = ret; - } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3a4ede2f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java index 5dfc075..aa8955d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java @@ -56,16 +56,9 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr /** Serialized error. */ private byte[] errBytes; - /** */ - private boolean success; - - /** */ + /** Return value. */ @GridToStringInclude - private CacheObject retVal; - - /** */ - @GridDirectCollection(CacheInvokeDirectResult.class) - private Collection<CacheInvokeDirectResult> invokeRes; + private GridCacheReturn ret; /** Failed keys. */ @GridToStringInclude @@ -161,21 +154,15 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr * @return Return value. */ public GridCacheReturn<?> returnValue() { - return invokeRes != null ? new GridCacheReturn<>(invokeRes, success) : new GridCacheReturn<>(retVal, success); + return ret; } /** - * @param invoke {@code True} if result for {@code invoke} operation. - * @param retVal Return value. + * @param ret Return value. */ @SuppressWarnings("unchecked") - public void returnValue(boolean invoke, GridCacheReturn<Object> retVal) { - success = retVal.success(); - - if (invoke) - invokeRes = (Collection<CacheInvokeDirectResult>)retVal.value(); - else - this.retVal = (CacheObject)retVal.value(); + public void returnValue(GridCacheReturn ret) { + this.ret = ret; } /** @@ -395,13 +382,8 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr prepareMarshalCacheObjects(nearVals, cctx); - if (retVal != null) - retVal.prepareMarshal(cctx.cacheObjectContext()); - - if (invokeRes != null) { - for (CacheInvokeDirectResult res : invokeRes) - res.prepareMarshal(cctx); - } + if (ret != null) + ret.prepareMarshal(cctx); } /** {@inheritDoc} */ @@ -419,13 +401,8 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr finishUnmarshalCacheObjects(nearVals, cctx, ldr); - if (retVal != null) - retVal.finishUnmarshal(cctx.cacheObjectContext(), ldr); - - if (invokeRes != null) { - for (CacheInvokeDirectResult res : invokeRes) - res.finishUnmarshal(cctx, ldr); - } + if (ret != null) + ret.finishUnmarshal(cctx, ldr); } /** {@inheritDoc} */ @@ -462,61 +439,49 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr writer.incrementState(); case 6: - if (!writer.writeCollection("invokeRes", invokeRes, MessageCollectionItemType.MSG)) - return false; - - writer.incrementState(); - - case 7: if (!writer.writeMessage("nearExpireTimes", nearExpireTimes)) return false; writer.incrementState(); - case 8: + case 7: if (!writer.writeCollection("nearSkipIdxs", nearSkipIdxs, MessageCollectionItemType.INT)) return false; writer.incrementState(); - case 9: + case 8: if (!writer.writeMessage("nearTtls", nearTtls)) return false; writer.incrementState(); - case 10: + case 9: if (!writer.writeCollection("nearVals", nearVals, MessageCollectionItemType.MSG)) return false; writer.incrementState(); - case 11: + case 10: if (!writer.writeCollection("nearValsIdxs", nearValsIdxs, MessageCollectionItemType.INT)) return false; writer.incrementState(); - case 12: + case 11: if (!writer.writeMessage("nearVer", nearVer)) return false; writer.incrementState(); - case 13: + case 12: if (!writer.writeCollection("remapKeys", remapKeys, MessageCollectionItemType.MSG)) return false; writer.incrementState(); - case 14: - if (!writer.writeMessage("retVal", retVal)) - return false; - - writer.incrementState(); - - case 15: - if (!writer.writeBoolean("success", success)) + case 13: + if (!writer.writeMessage("ret", ret)) return false; writer.incrementState(); @@ -562,14 +527,6 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr reader.incrementState(); case 6: - invokeRes = reader.readCollection("invokeRes", MessageCollectionItemType.MSG); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 7: nearExpireTimes = reader.readMessage("nearExpireTimes"); if (!reader.isLastRead()) @@ -577,7 +534,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr reader.incrementState(); - case 8: + case 7: nearSkipIdxs = reader.readCollection("nearSkipIdxs", MessageCollectionItemType.INT); if (!reader.isLastRead()) @@ -585,7 +542,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr reader.incrementState(); - case 9: + case 8: nearTtls = reader.readMessage("nearTtls"); if (!reader.isLastRead()) @@ -593,7 +550,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr reader.incrementState(); - case 10: + case 9: nearVals = reader.readCollection("nearVals", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -601,7 +558,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr reader.incrementState(); - case 11: + case 10: nearValsIdxs = reader.readCollection("nearValsIdxs", MessageCollectionItemType.INT); if (!reader.isLastRead()) @@ -609,7 +566,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr reader.incrementState(); - case 12: + case 11: nearVer = reader.readMessage("nearVer"); if (!reader.isLastRead()) @@ -617,7 +574,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr reader.incrementState(); - case 13: + case 12: remapKeys = reader.readCollection("remapKeys", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -625,16 +582,8 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr reader.incrementState(); - case 14: - retVal = reader.readMessage("retVal"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 15: - success = reader.readBoolean("success"); + case 13: + ret = reader.readMessage("ret"); if (!reader.isLastRead()) return false; @@ -653,7 +602,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 16; + return 14; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3a4ede2f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index 8057922..2da3f9d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -140,6 +140,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { partLock, subjId, taskNameHash); + + initResult(); } /** {@inheritDoc} */ @@ -1078,7 +1080,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { return new GridFinishedFuture<>(cctx.kernalContext(), e); } - final GridCacheReturn<Object> ret = new GridCacheReturn<>(false); + final GridCacheReturn<Object> ret = new GridCacheReturn<>(localResult(), false); if (F.isEmpty(keys)) return new GridFinishedFuture<>(cctx.kernalContext(), ret); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3a4ede2f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java index c7460d9..31b5f4a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java @@ -74,12 +74,8 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse private Collection<CacheVersionedValue> ownedValVals; /** Cache return value. */ - @GridDirectTransient private GridCacheReturn<Object> retVal; - /** Return value bytes. */ - private byte[] retValBytes; - /** Filter failed keys. */ @GridDirectCollection(IgniteTxKey.class) private Collection<IgniteTxKey> filterFailedKeys; @@ -244,8 +240,13 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse } } - if (retValBytes == null && retVal != null) - retValBytes = ctx.marshaller().marshal(retVal); + if (retVal != null && retVal.cacheId() != 0) { + GridCacheContext cctx = ctx.cacheContext(retVal.cacheId()); + + assert cctx != null : retVal.cacheId(); + + retVal.prepareMarshal(cctx); + } if (filterFailedKeys != null) { for (IgniteTxKey key :filterFailedKeys) { @@ -284,8 +285,13 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse } } - if (retVal == null && retValBytes != null) - retVal = ctx.marshaller().unmarshal(retValBytes, ldr); + if (retVal != null && retVal.cacheId() != 0) { + GridCacheContext cctx = ctx.cacheContext(retVal.cacheId()); + + assert cctx != null : retVal.cacheId(); + + retVal.finishUnmarshal(cctx, ldr); + } if (filterFailedKeys != null) { for (IgniteTxKey key :filterFailedKeys) { @@ -360,7 +366,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse writer.incrementState(); case 18: - if (!writer.writeByteArray("retValBytes", retValBytes)) + if (!writer.writeMessage("retVal", retVal)) return false; writer.incrementState(); @@ -446,7 +452,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse reader.incrementState(); case 18: - retValBytes = reader.readByteArray("retValBytes"); + retVal = reader.readMessage("retVal"); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3a4ede2f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalTx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalTx.java index 5c94223..35b0a12 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalTx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalTx.java @@ -71,6 +71,8 @@ class GridLocalTx extends IgniteTxLocalAdapter { ) { super(ctx, ctx.versions().next(), implicit, implicitSingle, false, concurrency, isolation, timeout, false, true, txSize, null, false, subjId, taskNameHash); + + initResult(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3a4ede2f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java index dee9447..792504d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java @@ -1034,8 +1034,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { if (err != null) throw err; - Object ret = res == null ? null : rawRetval ? - new GridCacheReturn<>(res.get2(), res.get1()) : (retval || op == TRANSFORM) ? res.get2() : res.get1(); + Object ret = res == null ? null : rawRetval ? new GridCacheReturn<>(ctx, true, res.get2(), res.get1()) : + (retval || op == TRANSFORM) ? res.get2() : res.get1(); if (op == TRANSFORM && ret == null) ret = Collections.emptyMap(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3a4ede2f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java index c072bfb..778c3ab 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java @@ -29,7 +29,6 @@ import org.apache.ignite.lang.*; import org.apache.ignite.transactions.*; import org.jetbrains.annotations.*; -import javax.cache.*; import java.util.*; /** @@ -55,6 +54,11 @@ public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject { } /** + * @return {@code True} if transaction started on the node initiated cache operation. + */ + public boolean localResult(); + + /** * Gets unique identifier for this transaction. * * @return Transaction UID. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3a4ede2f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index 69ab335..48fdf82 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -326,6 +326,13 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter log = U.logger(cctx.kernalContext(), logRef, this); } + /** {@inheritDoc} */ + @Override public boolean localResult() { + assert originatingNodeId() != null; + + return cctx.localNodeId().equals(originatingNodeId()); + } + /** * Acquires lock. */ @@ -1293,7 +1300,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter * @param op Initially proposed operation. * @param txEntry TX entry being updated. * @param newVal New value. - * @param newValBytes New value bytes. * @param newVer New version. * @param old Old entry. * @return Tuple with adjusted operation type and conflict context. @@ -1593,6 +1599,11 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter } /** {@inheritDoc} */ + @Override public boolean localResult() { + return false; + } + + /** {@inheritDoc} */ @Override public IgniteUuid xid() { return xid; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3a4ede2f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 21bde1d..bd43d5c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -100,7 +100,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter protected boolean needRetVal; /** Implicit transaction result. */ - protected GridCacheReturn<Object> implicitRes = new GridCacheReturn<>(false); + protected GridCacheReturn implicitRes; /** * Empty constructor required for {@link Externalizable}. @@ -150,6 +150,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter minVer = xidVer; } + /** + * Creates result instance. + */ + protected void initResult() { + implicitRes = new GridCacheReturn(localResult(), false); + } + /** {@inheritDoc} */ @Override public UUID eventNodeId() { return cctx.localNodeId(); @@ -274,14 +281,14 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } /** {@inheritDoc} */ - @Override public GridCacheReturn<Object> implicitSingleResult() { + @Override public GridCacheReturn implicitSingleResult() { return implicitRes; } /** * @param ret Result. */ - public void implicitSingleResult(GridCacheReturn<Object> ret) { + public void implicitSingleResult(GridCacheReturn ret) { if (ret.invokeResult()) implicitRes.mergeEntryProcessResults(ret); else @@ -2100,7 +2107,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter if (!filter(entry, filter)) { skipped = skip(skipped, cacheKey); - ret.set(old, false); + ret.set(cacheCtx, old, false); if (!readCommitted() && old != null) { // Enlist failed filters as reads for non-read-committed mode, @@ -2166,14 +2173,14 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter assert txEntry.op() != TRANSFORM; if (retval) - ret.set(null, true); + ret.set(cacheCtx, null, true); else ret.success(true); } } else { if (retval && !transform) - ret.set(old, true); + ret.set(cacheCtx, old, true); else { if (txEntry.op() == TRANSFORM) addInvokeResult(txEntry, old, ret); @@ -2185,7 +2192,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter // Pessimistic. else { if (retval && !transform) - ret.set(old, true); + ret.set(cacheCtx, old, true); else ret.success(true); } @@ -2213,7 +2220,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter if (!filter(entry, filter)) { skipped = skip(skipped, cacheKey); - ret.set(v, false); + ret.set(cacheCtx, v, false); continue; } @@ -2243,7 +2250,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter txEntry.markValid(); if (retval && !transform) - ret.set(v, true); + ret.set(cacheCtx, v, true); else ret.success(true); } @@ -2276,7 +2283,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter if (e.op() == TRANSFORM) addInvokeResult(e, cacheVal, ret); else - ret.set(cacheVal, true); + ret.set(cacheCtx, cacheVal, true); } }); @@ -2389,7 +2396,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter addInvokeResult(txEntry, v, ret); } else - ret.value(v); + ret.value(cacheCtx, v); } boolean pass = F.isEmpty(filter) || cacheCtx.isAll(cached, filter); @@ -2413,7 +2420,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter failed = skip(failed, k); // Revert operation to previous. (if no - NOOP, so entry will be unlocked). - txEntry.setAndMarkValid(txEntry.previousOperation(), (CacheObject)ret.value()); + txEntry.setAndMarkValid(txEntry.previousOperation(), cacheCtx.toCacheObject(ret.value())); txEntry.filters(CU.empty0()); txEntry.filtersSet(false); @@ -2454,7 +2461,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter * @param cacheVal Value. * @param ret Return value to update. */ - private void addInvokeResult(IgniteTxEntry txEntry, CacheObject cacheVal, GridCacheReturn<?> ret) { + private void addInvokeResult(IgniteTxEntry txEntry, CacheObject cacheVal, GridCacheReturn ret) { GridCacheContext ctx = txEntry.context(); Object key0 = null; @@ -2476,18 +2483,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter key0 = invokeEntry.key(); } - if (res != null) { - if (key0 == null) - key0 = txEntry.key().value(ctx.cacheObjectContext(), true); - - ret.addEntryProcessResult(key0, new CacheInvokeResult<>(res)); - } + if (res != null) + ret.addEntryProcessResult(ctx, txEntry.key(), key0, res, null); } catch (Exception e) { - if (key0 == null) - key0 = txEntry.key().value(ctx.cacheObjectContext(), true); - - ret.addEntryProcessResult(key0, new CacheInvokeResult(e)); + ret.addEntryProcessResult(ctx, txEntry.key(), key0, null, e); } } @@ -2559,7 +2559,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter init(); - final GridCacheReturn<CacheObject> ret = new GridCacheReturn<>(false); + final GridCacheReturn<CacheObject> ret = new GridCacheReturn<>(localResult(), false); if (F.isEmpty(map0) && F.isEmpty(invokeMap0)) { if (implicit()) @@ -2670,7 +2670,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter loadFut.get(); } catch (IgniteCheckedException e) { - return new GridFinishedFutureEx<>(new GridCacheReturn<V>(), e); + return new GridFinishedFutureEx<>(new GridCacheReturn<V>(localResult()), e); } return commitAsync().chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, GridCacheReturn<Object>>() { @@ -2755,7 +2755,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter return new GridFinishedFuture<>(cctx.kernalContext(), e); } - final GridCacheReturn<CacheObject> ret = new GridCacheReturn<>(false); + final GridCacheReturn<CacheObject> ret = new GridCacheReturn<>(localResult(), false); if (F.isEmpty(keys0)) { if (implicit()) { @@ -2974,7 +2974,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter try { init(); - GridCacheReturn<CacheObject> ret = new GridCacheReturn<>(false); + GridCacheReturn<CacheObject> ret = new GridCacheReturn<>(localResult(), false); Collection<KeyCacheObject> enlisted = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3a4ede2f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java index 936e4e8..05caac6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java @@ -159,7 +159,7 @@ public interface IgniteTxLocalEx extends IgniteInternalTx { /** * @return Return value for */ - public <V> GridCacheReturn<CacheObject> implicitSingleResult(); + public GridCacheReturn implicitSingleResult(); /** * Finishes transaction (either commit or rollback).