ignite-946: fixing version retrieval for transactions
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/2e7799d4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/2e7799d4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/2e7799d4 Branch: refs/heads/master Commit: 2e7799d446653bba379cc231628ba2b02c993e5e Parents: f0fe076 Author: Denis Magda <dma...@gridgain.com> Authored: Fri Jul 31 15:32:45 2015 +0300 Committer: nikolay_tikhonov <ntikho...@gridgain.com> Committed: Fri Jul 31 15:49:14 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/CacheInvokeEntry.java | 2 +- .../processors/cache/GridCacheMapEntry.java | 4 +- .../cache/transactions/IgniteTxAdapter.java | 14 ++- .../cache/transactions/IgniteTxEntry.java | 11 ++- .../transactions/IgniteTxLocalAdapter.java | 90 ++++++++++++++++---- 5 files changed, 98 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2e7799d4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeEntry.java index e6f8d4e..2d8f738 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeEntry.java @@ -121,7 +121,7 @@ public class CacheInvokeEntry<K, V> extends CacheLazyEntry<K, V> implements Muta /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public <T> T unwrap(Class<T> cls) { - if (cls.isAssignableFrom(VersionedEntry.class)) + if (cls.isAssignableFrom(VersionedEntry.class) && ver != null) return (T)new CacheVersionedEntryImpl<>(getKey(), getValue(), ver); return super.unwrap(cls); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2e7799d4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index ebcb908..43cf2fe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -1653,7 +1653,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme oldVal = rawGetOrUnmarshalUnlocked(true); - CacheInvokeEntry<Object, Object> entry = new CacheInvokeEntry(cctx, key, oldVal, this.ver); + CacheInvokeEntry<Object, Object> entry = new CacheInvokeEntry(cctx, key, oldVal, version()); try { Object computed = entryProcessor.process(entry, invokeArgs); @@ -1878,7 +1878,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme EntryProcessor<Object, Object, ?> entryProcessor = (EntryProcessor<Object, Object, ?>)writeObj; - CacheInvokeEntry<Object, Object> entry = new CacheInvokeEntry(cctx, key, oldVal, this.ver); + CacheInvokeEntry<Object, Object> entry = new CacheInvokeEntry(cctx, key, oldVal, version()); try { Object computed = entryProcessor.process(entry, invokeArgs); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2e7799d4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index 0d14012..797f75e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -1228,9 +1228,21 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter Object key = null; + GridCacheVersion ver; + + try { + ver = txEntry.cached().version(); + } + catch (GridCacheEntryRemovedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to get entry version: [msg=" + e.getMessage() + ']'); + + ver = null; + } + for (T2<EntryProcessor<Object, Object, Object>, Object[]> t : txEntry.entryProcessors()) { CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry(txEntry.context(), - txEntry.key(), key, cacheVal, val, txEntry.cached().version()); + txEntry.key(), key, cacheVal, val, ver); try { EntryProcessor<Object, Object, Object> processor = t.get1(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2e7799d4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java index 7f06380..ed57bf2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java @@ -571,10 +571,19 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { Object val = null; Object keyVal = null; + GridCacheVersion ver; + + try { + ver = entry.version(); + } + catch (GridCacheEntryRemovedException e) { + ver = null; + } + for (T2<EntryProcessor<Object, Object, Object>, Object[]> t : entryProcessors()) { try { CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry(ctx, key, keyVal, cacheVal, val, - entry.version()); + ver); EntryProcessor processor = t.get1(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2e7799d4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index d8797fe..7f171c2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -1938,13 +1938,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter Map<KeyCacheObject, GridCacheDrInfo> drMap ) { return this.<Object, Object>putAllAsync0(cacheCtx, - null, - null, - null, - drMap, - false, - null, - null); + null, + null, + null, + drMap, + false, + null, + null); } /** {@inheritDoc} */ @@ -2229,8 +2229,22 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter if (retval && !transform) ret.set(cacheCtx, old, true); else { - if (txEntry.op() == TRANSFORM) - addInvokeResult(txEntry, old, ret); + if (txEntry.op() == TRANSFORM) { + GridCacheVersion ver; + + try { + ver = entry.version(); + } + catch (GridCacheEntryRemovedException ex) { + if (log.isDebugEnabled()) + log.debug("Failed to get entry version: [msg=" + + ex.getMessage() + ']'); + + ver = null; + } + + addInvokeResult(txEntry, old, ret, ver); + } else ret.success(true); } @@ -2290,8 +2304,21 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter enlisted.add(cacheKey); - if (txEntry.op() == TRANSFORM) - addInvokeResult(txEntry, txEntry.value(), ret); + if (txEntry.op() == TRANSFORM) { + GridCacheVersion ver; + + try { + ver = entry.version(); + } + catch (GridCacheEntryRemovedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to get entry version: [msg=" + e.getMessage() + ']'); + + ver = null; + } + + addInvokeResult(txEntry, txEntry.value(), ret, ver); + } } if (!pessimistic()) { @@ -2328,8 +2355,21 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter CacheObject cacheVal = cacheCtx.toCacheObject(val); - if (e.op() == TRANSFORM) - addInvokeResult(e, cacheVal, ret); + if (e.op() == TRANSFORM) { + GridCacheVersion ver; + + try { + ver = e.cached().version(); + } + catch (GridCacheEntryRemovedException ex) { + if (log.isDebugEnabled()) + log.debug("Failed to get entry version: [msg=" + ex.getMessage() + ']'); + + ver = null; + } + + addInvokeResult(e, cacheVal, ret, ver); + } else ret.set(cacheCtx, cacheVal, true); } @@ -2442,8 +2482,21 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } if (txEntry.op() == TRANSFORM) { - if (computeInvoke) - addInvokeResult(txEntry, v, ret); + if (computeInvoke) { + GridCacheVersion ver; + + try { + ver = cached.version(); + } + catch (GridCacheEntryRemovedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to get entry version: [msg=" + e.getMessage() + ']'); + + ver = null; + } + + addInvokeResult(txEntry, v, ret, ver); + } } else ret.value(cacheCtx, v); @@ -2510,8 +2563,10 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter * @param txEntry Entry. * @param cacheVal Value. * @param ret Return value to update. + * @param ver Entry version. */ - private void addInvokeResult(IgniteTxEntry txEntry, CacheObject cacheVal, GridCacheReturn ret) { + private void addInvokeResult(IgniteTxEntry txEntry, CacheObject cacheVal, GridCacheReturn ret, + GridCacheVersion ver) { GridCacheContext ctx = txEntry.context(); Object key0 = null; @@ -2522,8 +2577,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter for (T2<EntryProcessor<Object, Object, Object>, Object[]> t : txEntry.entryProcessors()) { CacheInvokeEntry<Object, Object> invokeEntry = - new CacheInvokeEntry(txEntry.context(), txEntry.key(), key0, cacheVal, val0, - txEntry.cached().version()); + new CacheInvokeEntry(txEntry.context(), txEntry.key(), key0, cacheVal, val0, ver); EntryProcessor<Object, Object, ?> entryProcessor = t.get1();