Repository: incubator-ignite Updated Branches: refs/heads/sprint-2 3c884b5d5 -> e0a087ce5
# ignite-297 Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e855d9a7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e855d9a7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e855d9a7 Branch: refs/heads/sprint-2 Commit: e855d9a7c92bad81fb1494963bbf1fd1dffbfb1c Parents: f139706 Author: sboikov <sboi...@gridgain.com> Authored: Wed Feb 18 11:00:51 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Wed Feb 18 12:28:48 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 102 +++++++++++++++---- .../cache/GridCacheEvictionManager.java | 21 ++-- .../processors/cache/GridCacheMapEntry.java | 6 ++ .../processors/cache/GridCacheUtils.java | 8 +- .../dht/GridPartitionedGetFuture.java | 8 +- .../dht/atomic/GridDhtAtomicCache.java | 15 +++ .../distributed/near/GridNearGetFuture.java | 15 ++- .../local/atomic/GridLocalAtomicCache.java | 9 +- .../cache/transactions/IgniteTxAdapter.java | 11 +- .../cache/transactions/IgniteTxEntry.java | 3 + .../transactions/IgniteTxLocalAdapter.java | 2 +- .../junits/common/GridCommonAbstractTest.java | 2 + 12 files changed, 159 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e855d9a7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 9d8f243..46e15f4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -647,9 +647,12 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<Boolean> containsKeyAsync(final K key) { + @Override public IgniteInternalFuture<Boolean> containsKeyAsync(K key) { A.notNull(key, "key"); + if (ctx.portableEnabled()) + key = (K)ctx.marshalToPortable(key); + return getAllAsync( Collections.singletonList(key), /*force primary*/false, @@ -660,10 +663,14 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, /*deserialize portable*/false, /*skip values*/true ).chain(new CX1<IgniteInternalFuture<Map<K, V>>, Boolean>() { - @Override public Boolean applyx(IgniteInternalFuture<Map<K, V>> fut) throws IgniteCheckedException { - return fut.get().get(key) != null; - } - }); + @Override public Boolean applyx(IgniteInternalFuture<Map<K, V>> fut) throws IgniteCheckedException { + Map<K, V> map = fut.get(); + + assert map.isEmpty() || map.size() == 1 : map.size(); + + return map.isEmpty() ? false : map.values().iterator().next() != null; + } + }); } /** {@inheritDoc} */ @@ -680,6 +687,14 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, @Override public IgniteInternalFuture<Boolean> containsKeysAsync(Collection<? extends K> keys) { A.notNull(keys, "keys"); + if (ctx.portableEnabled() && !F.isEmpty(keys)) { + keys = F.viewReadOnly(keys, new C1<K, K>() { + @Override public K apply(K k) { + return (K)ctx.marshalToPortable(k); + } + }); + } + return getAllAsync( keys, /*force primary*/false, @@ -693,9 +708,10 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, @Override public Boolean applyx(IgniteInternalFuture<Map<K, V>> fut) throws IgniteCheckedException { Map<K, V> kvMap = fut.get(); - for (Map.Entry<K, V> entry : kvMap.entrySet()) + for (Map.Entry<K, V> entry : kvMap.entrySet()) { if (entry.getValue() == null) return false; + } return true; } @@ -2157,10 +2173,14 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, else { val = ctx.cloneOnFlag(val); - if (ctx.portableEnabled() && deserializePortable) + K key0 = key; + + if (ctx.portableEnabled() && deserializePortable) { val = (V)ctx.unwrapPortableIfNeeded(val, false); + key0 = (K)ctx.unwrapPortableIfNeeded(key, false); + } - map.put(key, val); + map.put(key0, val); if (tx == null || (!tx.implicit() && tx.isolation() == READ_COMMITTED)) ctx.evicts().touch(entry, topVer); @@ -3001,7 +3021,12 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, if (ctx.deploymentEnabled()) ctx.deploy().registerClass(oldVal); - return tx.putAllAsync(ctx, F.t(key, newVal), false, null, -1, ctx.equalsPeekArray(oldVal)).get() + V oldVal0 = oldVal; + + if (ctx.portableEnabled()) + oldVal0 = (V)ctx.marshalToPortable(oldVal); + + return tx.putAllAsync(ctx, F.t(key, newVal), false, null, -1, ctx.equalsPeekArray(oldVal0)).get() .success(); } @@ -3040,7 +3065,12 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, } } - return tx.putAllAsync(ctx, F.t(key, newVal), false, null, -1, ctx.equalsPeekArray(oldVal)).chain( + V oldVal0 = oldVal; + + if (ctx.portableEnabled()) + oldVal0 = (V)ctx.marshalToPortable(oldVal); + + return tx.putAllAsync(ctx, F.t(key, newVal), false, null, -1, ctx.equalsPeekArray(oldVal0)).chain( (IgniteClosure<IgniteInternalFuture<GridCacheReturn<V>>, Boolean>)RET2FLAG); } @@ -3517,19 +3547,22 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, validateCacheValue(val); - boolean removed = syncOp(new SyncOp<Boolean>(true) { + boolean rmv = syncOp(new SyncOp<Boolean>(true) { @Override public Boolean op(IgniteTxLocalAdapter<K, V> tx) throws IgniteCheckedException { // Register before hiding in the filter. if (ctx.deploymentEnabled()) ctx.deploy().registerClass(val); K key0 = key; + V val0 = val; - if (ctx.portableEnabled()) + if (ctx.portableEnabled()) { key0 = (K)ctx.marshalToPortable(key); + val0 = (V)ctx.marshalToPortable(val); + } return tx.removeAllAsync(ctx, Collections.singletonList(key0), null, false, - ctx.vararg(F.<K, V>cacheContainsPeek(val))).get().success(); + ctx.vararg(F.<K, V>cacheContainsPeek(val0))).get().success(); } @Override public String toString() { @@ -3537,10 +3570,10 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, } }); - if (statsEnabled && removed) + if (statsEnabled && rmv) metrics0().addRemoveTimeNanos(System.nanoTime() - start); - return removed; + return rmv; } /** {@inheritDoc} */ @@ -3571,10 +3604,12 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, } K key0 = key; + V val0 = val; if (ctx.portableEnabled()) { try { key0 = (K)ctx.marshalToPortable(key); + val0 = (V)ctx.marshalToPortable(val); } catch (IgniteException e) { return new GridFinishedFuture<>(ctx.kernalContext(), e); @@ -3582,7 +3617,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, } return tx.removeAllAsync(ctx, Collections.singletonList(key0), null, false, - ctx.vararg(F.<K, V>cacheContainsPeek(val))).chain( + ctx.vararg(F.<K, V>cacheContainsPeek(val0))).chain( (IgniteClosure<IgniteInternalFuture<GridCacheReturn<V>>, Boolean>)RET2FLAG); } @@ -3946,6 +3981,18 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, ExpiryPolicy plc = prj != null ? prj.expiry() : null; + final Collection<? extends K> keys0; + + if (ctx.portableEnabled() && !ctx.store().convertPortable()) { + keys0 = F.viewReadOnly(keys, new C1<K, K>() { + @Override public K apply(K k) { + return (K)ctx.marshalToPortable(k); + } + }); + } + else + keys0 = keys; + if (replaceExisting) { if (ctx.store().isLocalStore()) { Collection<ClusterNode> nodes = ctx.grid().forDataNodes(name()).nodes(); @@ -3954,14 +4001,14 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, return new GridFinishedFuture<>(ctx.kernalContext()); return ctx.closures().callAsyncNoFailover(BROADCAST, - new LoadKeysCallable<>(ctx.name(), keys, true, plc), + new LoadKeysCallable<>(ctx.name(), keys0, true, plc), nodes, true); } else { return ctx.closures().callLocalSafe(new Callable<Void>() { @Override public Void call() throws Exception { - localLoadAndUpdate(keys); + localLoadAndUpdate(keys0); return null; } @@ -5029,16 +5076,22 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, /** * @param key Key. + * @param deserializePortable Deserialize portable flag. * @return Cached value. * @throws IgniteCheckedException If failed. */ @Nullable public V get(K key, boolean deserializePortable) throws IgniteCheckedException { - return getAllAsync(F.asList(key), deserializePortable).get().get(key); + Map<K, V> map = getAllAsync(F.asList(key), deserializePortable).get(); + + assert map.isEmpty() || map.size() == 1 : map.size(); + + return map.isEmpty() ? null : map.values().iterator().next(); } /** * @param key Key. + * @param deserializePortable Deserialize portable flag. * @return Read operation future. */ public final IgniteInternalFuture<V> getAsync(final K key, boolean deserializePortable) { @@ -5053,15 +5106,19 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, return getAllAsync(Collections.singletonList(key), deserializePortable).chain( new CX1<IgniteInternalFuture<Map<K, V>>, V>() { - @Override - public V applyx(IgniteInternalFuture<Map<K, V>> e) throws IgniteCheckedException { - return e.get().get(key); + @Override public V applyx(IgniteInternalFuture<Map<K, V>> e) throws IgniteCheckedException { + Map<K, V> map = e.get(); + + assert map.isEmpty() || map.size() == 1 : map.size(); + + return map.isEmpty() ? null : map.values().iterator().next(); } }); } /** * @param keys Keys. + * @param deserializePortable Deserialize portable flag. * @return Map of cached values. * @throws IgniteCheckedException If read failed. */ @@ -5994,6 +6051,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>, * @param keys Keys. * @param update If {@code true} calls {@link #localLoadAndUpdate(Collection)} * otherwise {@link #localLoad(Collection, ExpiryPolicy)}. + * @param plc Expiry policy. */ LoadKeysCallable(String cacheName, Collection<? extends K> keys, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e855d9a7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java index a53ac74..382eb61 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEvictionManager.java @@ -920,18 +920,23 @@ public class GridCacheEvictionManager<K, V> extends GridCacheManagerAdapter<K, V GridCacheAdapter<K, V> cache = cctx.cache(); - Map<K, GridCacheEntryEx<K, V>> cached = U.newHashMap(keys.size()); + Map<K, GridCacheEntryEx<K, V>> cached = U.newLinkedHashMap(keys.size()); // Get all participating entries to avoid deadlock. - for (K k : keys) - cached.put(k, cache.peekEx(k)); + for (K k : keys) { + if (cctx.portableEnabled()) + k = (K)cctx.marshalToPortable(k); - try { - for (K key : keys) { - GridCacheEntryEx<K, V> entry = cached.get(key); + GridCacheEntryEx<K, V> e = cache.peekEx(k); + + if (e != null) + cached.put(k, e); + } + try { + for (GridCacheEntryEx<K, V> entry : cached.values()) { // Do not evict internal entries. - if (entry == null || entry.key() instanceof GridCacheInternal) + if (entry.key() instanceof GridCacheInternal) continue; // Lock entry. @@ -953,7 +958,7 @@ public class GridCacheEvictionManager<K, V> extends GridCacheManagerAdapter<K, V } // Batch write to swap. - if (swapped != null) + if (!swapped.isEmpty()) cctx.swap().writeAll(swapped); } finally { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e855d9a7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index dc0fa59..6af0606 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -1397,6 +1397,9 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> updated = cctx.unwrapTemporary(entry.getValue()); + if (cctx.portableEnabled() && entry.modified()) + updated = (V)cctx.marshalToPortable(updated); + invokeRes = computed != null ? new CacheInvokeResult<>(cctx.unwrapTemporary(computed)) : null; } catch (Exception e) { @@ -1843,6 +1846,9 @@ public abstract class GridCacheMapEntry<K, V> implements GridCacheEntryEx<K, V> updated = cctx.unwrapTemporary(entry.getValue()); + if (cctx.portableEnabled() && entry.modified()) + updated = (V)cctx.marshalToPortable(updated); + if (computed != null) invokeRes = new CacheInvokeResult<>(cctx.unwrapTemporary(computed)); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e855d9a7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index a3a5b20..e78577a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -54,6 +54,7 @@ import static org.apache.ignite.internal.GridTopic.*; import static org.apache.ignite.internal.IgniteNodeAttributes.*; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.*; import static org.apache.ignite.internal.processors.cache.GridCachePeekMode.*; +import static org.apache.ignite.transactions.TransactionState.*; /** * Cache utility methods. @@ -1166,7 +1167,12 @@ public class GridCacheUtils { * @throws IgniteCheckedException If execution failed. */ public static <T> T outTx(Callable<T> cmd, GridCacheContext ctx) throws IgniteCheckedException { - if (ctx.tm().inUserTx()) + IgniteInternalTx<?, ?> tx = ctx.tm().txx(); + + boolean inTx = tx != null && tx.user() && + (tx.state() != UNKNOWN && tx.state() != ROLLED_BACK && tx.state() != COMMITTED); + + if (inTx) return ctx.closures().callLocalSafe(cmd, false).get(); else { try { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e855d9a7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index 79f6b4d..980389c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -458,10 +458,14 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M colocated.removeIfObsolete(key); } else { - if (cctx.portableEnabled()) + K key0 = key; + + if (cctx.portableEnabled()) { v = (V)cctx.unwrapPortableIfNeeded(v, !deserializePortable); + key0 = (K)cctx.unwrapPortableIfNeeded(key, !deserializePortable); + } - locVals.put(key, v); + locVals.put(key0, v); return false; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e855d9a7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index b90d78e..918f2e6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -397,6 +397,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { @Override public IgniteInternalFuture<Boolean> replaceAsync(K key, V oldVal, V newVal) { A.notNull(key, "key", oldVal, "oldVal", newVal, "newVal"); + if (ctx.portableEnabled()) + oldVal = (V)ctx.marshalToPortable(oldVal); + return putxAsync(key, newVal, ctx.equalsPeekArray(oldVal)); } @@ -415,12 +418,18 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { @Override public IgniteInternalFuture<GridCacheReturn<V>> removexAsync(K key, V val) { A.notNull(key, "key", val, "val"); + if (ctx.portableEnabled()) + val = (V)ctx.marshalToPortable(val); + return removeAllAsync0(F.asList(key), null, null, true, true, ctx.equalsPeekArray(val)); } /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public IgniteInternalFuture<GridCacheReturn<V>> replacexAsync(K key, V oldVal, V newVal) { + if (ctx.portableEnabled()) + oldVal = (V)ctx.marshalToPortable(oldVal); + return updateAllAsync0(F.asMap(key, newVal), null, null, @@ -525,6 +534,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { @Override public IgniteInternalFuture<Boolean> removeAsync(K key, V val) { A.notNull(key, "key", val, "val"); + if (ctx.portableEnabled()) + val = (V)ctx.marshalToPortable(val); + return removexAsync(key, ctx.equalsPeekArray(val)); } @@ -1330,6 +1342,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { updated = ctx.unwrapTemporary(invokeEntry.getValue()); + if (ctx.portableEnabled()) + updated = (V)ctx.marshalToPortable(updated); + if (computed != null) invokeRes = new CacheInvokeResult<>(ctx.unwrapTemporary(computed)); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e855d9a7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java index 3ae2fc6..9628f3b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java @@ -504,10 +504,14 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma } if (v != null && !reload) { - if (cctx.portableEnabled()) + K key0 = key; + + if (cctx.portableEnabled()) { v = (V)cctx.unwrapPortableIfNeeded(v, !deserializePortable); + key0 = (K)cctx.unwrapPortableIfNeeded(key, !deserializePortable); + } - add(new GridFinishedFuture<>(cctx.kernalContext(), Collections.singletonMap(key, v))); + add(new GridFinishedFuture<>(cctx.kernalContext(), Collections.singletonMap(key0, v))); } else { if (primary == null) @@ -637,11 +641,14 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma } V val = info.value(); + K key = info.key(); - if (cctx.portableEnabled()) + if (cctx.portableEnabled()) { val = (V)cctx.unwrapPortableIfNeeded(val, !deserializePortable); + key = (K)cctx.unwrapPortableIfNeeded(key, !deserializePortable); + } - map.put(info.key(), val); + map.put(key, val); } catch (GridCacheEntryRemovedException ignore) { if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e855d9a7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java index 2d81681..6a8a6cb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java @@ -408,7 +408,7 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { ctx.denyOnLocalRead(); - Boolean removed = (Boolean)updateAllInternal(DELETE, + Boolean rmv = (Boolean)updateAllInternal(DELETE, Collections.singleton(key), null, null, @@ -418,10 +418,10 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { filter, ctx.writeThrough()); - if (statsEnabled && removed) + if (statsEnabled && rmv) metrics0().addRemoveTimeNanos(System.nanoTime() - start); - return removed; + return rmv; } /** {@inheritDoc} */ @@ -1133,6 +1133,9 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> { updated = ctx.unwrapTemporary(invokeEntry.getValue()); + if (ctx.portableEnabled()) + updated = (V)ctx.marshalToPortable(updated); + if (computed != null) invokeRes = new CacheInvokeResult<>(ctx.unwrapTemporary(computed)); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e855d9a7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index 62d4e3b..c5c242d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -1228,7 +1228,7 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter CacheInvokeEntry<K, V> invokeEntry = new CacheInvokeEntry<>(txEntry.context(), txEntry.key(), val); try { - EntryProcessor processor = t.get1(); + EntryProcessor<K, V, ?> processor = t.get1(); processor.process(invokeEntry, t.get2()); @@ -1241,6 +1241,13 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter modified |= invokeEntry.modified(); } + if (modified) { + val = (V)cacheCtx.<V>unwrapTemporary(val); + + if (cacheCtx.portableEnabled()) + val = (V)cacheCtx.marshalToPortable(val); + } + GridCacheOperation op = modified ? (val == null ? DELETE : UPDATE) : NOOP; if (op == NOOP) { @@ -1259,7 +1266,7 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter } } - return F.t(op, (V)cacheCtx.<V>unwrapTemporary(val), null); + return F.t(op, val, null); } catch (GridCacheFilterFailedException e) { assert false : "Empty filter failed for innerGet: " + e; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e855d9a7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java index eac07be..5468de2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java @@ -643,6 +643,9 @@ public class IgniteTxEntry<K, V> implements GridPeerDeployAware, Externalizable, } } + if (ctx.portableEnabled()) + val = (V)ctx.marshalToPortable(val); + return val; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e855d9a7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 8ffe224..35c89d1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -2412,7 +2412,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> * @param val Value. * @param ret Return value to update. */ - private void addInvokeResult(IgniteTxEntry<K, V> txEntry, V val, GridCacheReturn ret) { + private void addInvokeResult(IgniteTxEntry<K, V> txEntry, V val, GridCacheReturn<?> ret) { try { Object res = null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e855d9a7/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java index 5472aa9..f1e813f 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java @@ -254,6 +254,7 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest { * @param cache Cache. * @param keys Keys. * @param replaceExistingValues Replace existing values. + * @throws Exception If failed. */ protected static <K> void loadAll(Cache<K, ?> cache, Set<K> keys, boolean replaceExistingValues) throws Exception { final AtomicReference<Exception> ex = new AtomicReference<>(); @@ -282,6 +283,7 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest { * @param cache Cache. * @param key Keys. * @param replaceExistingValues Replace existing values. + * @throws Exception If failed. */ protected static <K> void load(Cache<K, ?> cache, K key, boolean replaceExistingValues) throws Exception { loadAll(cache, Collections.singleton(key), replaceExistingValues);