# Fixing tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/7bc0d511 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/7bc0d511 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/7bc0d511 Branch: refs/heads/ingite-9655-merge Commit: 7bc0d511a879fa17a176b8a05bcbd1c7bd201ea2 Parents: 7236561 Author: Alexey Goncharuk <agoncha...@gridgain.com> Authored: Thu Jan 29 17:36:24 2015 -0800 Committer: Alexey Goncharuk <agoncha...@gridgain.com> Committed: Thu Jan 29 17:36:24 2015 -0800 ---------------------------------------------------------------------- .../distributed/dht/GridDhtTxPrepareFuture.java | 35 +++++++- .../near/GridNearTxPrepareFuture.java | 4 - .../transactions/IgniteTxLocalAdapter.java | 84 +++++--------------- .../cache/transactions/IgniteTxManager.java | 2 +- .../cache/GridCacheAbstractFullApiSelfTest.java | 16 +++- 5 files changed, 67 insertions(+), 74 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7bc0d511/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 461c0ea..407b8fe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -35,6 +35,7 @@ import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.transactions.*; import org.jetbrains.annotations.*; +import 
javax.cache.processor.*; import java.io.*; import java.util.*; import java.util.concurrent.atomic.*; @@ -322,8 +323,38 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu null, null); - if (retVal) - ret.value(val); + if (retVal) { + if (!F.isEmpty(txEntry.entryProcessors())) { + K key = txEntry.key(); + + Object procRes = null; + Exception err = null; + + + for (T2<EntryProcessor<K, V, ?>, Object[]> t : txEntry.entryProcessors()) { + try { + CacheInvokeEntry<K, V> invokeEntry = new CacheInvokeEntry<>(key, val); + + EntryProcessor<K, V, ?> processor = t.get1(); + + procRes = processor.process(invokeEntry, t.get2()); + + val = invokeEntry.getValue(); + } + catch (Exception e) { + err = e; + + break; + } + } + + if (err != null || procRes != null) + ret.addEntryProcessResult(key, + err == null ? new CacheInvokeResult<>(procRes) : new CacheInvokeResult<>(err)); + } + else + ret.value(val); + } if (hasFilters && !cacheCtx.isAll(cached, txEntry.filters())) { txEntry.op(GridCacheOperation.NOOP); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7bc0d511/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java index 59e90a7..e7b3601 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java @@ -618,10 +618,6 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut tx.taskNameHash()); for (IgniteTxEntry<K, V> txEntry : m.writes()) { - assert 
txEntry.cached().detached() : "Expected detached entry while preparing transaction " + - "[locNodeId=" + cctx.localNodeId() + - ", txEntry=" + txEntry + ']'; - if (txEntry.op() == TRANSFORM) req.addDhtVersion(txEntry.txKey(), null); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7bc0d511/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 7789023..f097833 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -1879,7 +1879,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> map, invokeArgs, null, - false, + true, null, null); } @@ -1956,13 +1956,15 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> boolean rmv = lookup == null && invokeMap == null; - Set<K> missedForInvoke = null; + Set<K> missedForLoad = null; try { // Set transform flag for transaction. if (invokeMap != null) transform = true; + assert !transform || retval; + groupLockSanityCheck(cacheCtx, keys); for (K key : keys) { @@ -2132,69 +2134,24 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> if (old == null) { boolean load = retval && !readThrough; - // Check for transform here to avoid map creation. - load |= (op == TRANSFORM && keys.size() == 1); - if (load) { - // If return value is required, then we know for sure that there is only - // one key in the keys collection. 
- assert keys.size() == 1; - - IgniteInternalFuture<Boolean> fut = loadMissing( - cacheCtx, - op == TRANSFORM || cacheCtx.loadPreviousValue(), - true, - F.asList(key), - deserializePortables(cacheCtx), - new CI2<K, V>() { - @Override public void apply(K k, V v) { - if (log.isDebugEnabled()) - log.debug("Loaded value from remote node [key=" + k + ", val=" + - v + ']'); - - if (op == TRANSFORM) { - IgniteTxEntry<K, V> e = - entry(new IgniteTxKey<>(k, cacheCtx.cacheId())); - - assert e != null && e.op() == TRANSFORM : e; - - addInvokeResult(e, v, ret); - } - else - ret.set(v, true); - } - }); - - return new GridEmbeddedFuture<>( - cctx.kernalContext(), - fut, - new C2<Boolean, Exception, Set<K>>() { - @Override public Set<K> apply(Boolean b, Exception e) { - if (e != null) - throw new GridClosureException(e); - - return Collections.emptySet(); - } - } - ); + if (missedForLoad == null) + missedForLoad = new HashSet<>(); + + missedForLoad.add(key); } else { + assert !transform; + assert txEntry.op() != TRANSFORM; + if (retval) ret.set(null, true); - else { - if (txEntry.op() == TRANSFORM) { - if (missedForInvoke == null) - missedForInvoke = new HashSet<>(); - - missedForInvoke.add(key); - } - else - ret.success(true); - } + else + ret.success(true); } } else { - if (retval) + if (retval && !transform) ret.set(old, true); else { if (txEntry.op() == TRANSFORM) @@ -2206,7 +2163,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> } // Pessimistic. 
else { - if (retval) + if (retval && !transform) ret.set(old, true); else ret.success(true); @@ -2264,7 +2221,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> if (!pessimistic()) { txEntry.markValid(); - if (retval) + if (retval && !transform) ret.set(v, true); else ret.success(true); @@ -2276,15 +2233,12 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> return new GridFinishedFuture<>(cctx.kernalContext(), e); } - if (missedForInvoke != null) { - assert optimistic(); - assert invokeMap != null; - + if (missedForLoad != null) { IgniteInternalFuture<Boolean> fut = loadMissing( cacheCtx, true, true, - missedForInvoke, + missedForLoad, deserializePortables(cacheCtx), new CI2<K, V>() { @Override public void apply(K key, V val) { @@ -2484,6 +2438,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V> EntryProcessor<K, V, ?> entryProcessor = t.get1(); res = entryProcessor.process(invokeEntry, t.get2()); + + val = invokeEntry.getValue(); } if (res != null) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7bc0d511/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index 993c08e..14447fc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -1162,7 +1162,7 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> { Boolean committed = completedVers.get(tx.xidVersion()); // 1. Make sure that committed version has been recorded. 
- if (!(committed != null && committed) || tx.writeSet().isEmpty() || tx.isSystemInvalidate()) { + if (!((committed != null && committed) || tx.writeSet().isEmpty() || tx.isSystemInvalidate())) { uncommitTx(tx); throw new IgniteException("Missing commit version (consider increasing " + http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7bc0d511/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java index 66bde26..237ee2f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java @@ -1080,10 +1080,20 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract try { if (startVal) cache.put("key", 2); + else + assertEquals(null, cache.get("key")); - cache.invoke("key", INCR_PROCESSOR); - cache.invoke("key", INCR_PROCESSOR); - cache.invoke("key", INCR_PROCESSOR); + Integer expectedRes = startVal ? 2 : null; + + assertEquals(String.valueOf(expectedRes), cache.invoke("key", INCR_PROCESSOR)); + + expectedRes = startVal ? 3 : 1; + + assertEquals(String.valueOf(expectedRes), cache.invoke("key", INCR_PROCESSOR)); + + expectedRes++; + + assertEquals(String.valueOf(expectedRes), cache.invoke("key", INCR_PROCESSOR)); if (tx != null) tx.commit();