Repository: incubator-ignite Updated Branches: refs/heads/ignite-23 f3c685575 -> f903ff3cb
# ignite-23 Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f903ff3c Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f903ff3c Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f903ff3c Branch: refs/heads/ignite-23 Commit: f903ff3cbf837e5c5e1c86d114bea250419bdbfe Parents: f3c6855 Author: sboikov <semen.boi...@inria.fr> Authored: Fri May 22 06:51:42 2015 +0300 Committer: sboikov <semen.boi...@inria.fr> Committed: Fri May 22 06:59:50 2015 +0300 ---------------------------------------------------------------------- .../dht/atomic/GridNearAtomicUpdateFuture.java | 2 +- .../colocated/GridDhtColocatedLockFuture.java | 11 ++-- .../distributed/near/GridNearLockFuture.java | 11 ++-- ...niteCacheClientNodeChangingTopologyTest.java | 62 ++++++++++++-------- 4 files changed, 52 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f903ff3c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index b7c4719..f78ced3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -343,7 +343,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> */ public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res) { if (res.remapKeys() != null) { - assert cctx.config().getAtomicWriteOrderMode() == PRIMARY || cctx.kernalContext().clientNode(); + assert !fastMap || cctx.kernalContext().clientNode(); mapOnTopology(res.remapKeys(), true, nodeId, true); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f903ff3c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index a90c6e4..788a101 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -326,13 +326,14 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture * Undoes all locks. * * @param dist If {@code true}, then remove locks from remote nodes as well. + * @param rollback {@code True} if should rollback tx. */ - private void undoLocks(boolean dist) { + private void undoLocks(boolean dist, boolean rollback) { // Transactions will undo during rollback. if (dist && tx == null) cctx.colocated().removeLocks(threadId, lockVer, keys); else { - if (tx != null) { + if (rollback && tx != null) { if (tx.setRollbackOnly()) { if (log.isDebugEnabled()) log.debug("Marked transaction as rollback only because locks could not be acquired: " + tx); @@ -350,7 +351,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture * @param dist {@code True} if need to distribute lock release. */ private void onFailed(boolean dist) { - undoLocks(dist); + undoLocks(dist, true); complete(false); } @@ -475,7 +476,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture ", fut=" + this + ']'); if (!success) - undoLocks(distribute); + undoLocks(distribute, true); if (tx != null) cctx.tm().txContext(tx); @@ -1369,6 +1370,8 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture * */ private void remap() { + undoLocks(false, false); + mapOnTopology(true); onDone(true); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f903ff3c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index 92498f0..001c78c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@ -349,13 +349,14 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean * Undoes all locks. * * @param dist If {@code true}, then remove locks from remote nodes as well. + * @param rollback {@code True} if should rollback tx. */ - private void undoLocks(boolean dist) { + private void undoLocks(boolean dist, boolean rollback) { // Transactions will undo during rollback. if (dist && tx == null) cctx.nearTx().removeLocks(lockVer, keys); else { - if (tx != null) { + if (rollback && tx != null) { if (tx.setRollbackOnly()) { if (log.isDebugEnabled()) log.debug("Marked transaction as rollback only because locks could not be acquired: " + tx); @@ -396,7 +397,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean * @param dist {@code True} if need to distribute lock release. */ private void onFailed(boolean dist) { - undoLocks(dist); + undoLocks(dist, true); complete(false); } @@ -606,7 +607,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean ", fut=" + this + ']'); if (!success) - undoLocks(distribute); + undoLocks(distribute, true); if (tx != null) cctx.tm().txContext(tx); @@ -1512,6 +1513,8 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean * */ private void remap() { + undoLocks(false, false); + mapOnTopology(true); onDone(true); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f903ff3c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java index 20f3d58..0236446 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java @@ -166,7 +166,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac putFut.get(); - checkData(map, 4); + checkData(map, cache, 4); ignite3.close(); @@ -200,14 +200,14 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac putFut.get(); - checkData(map, 4); + checkData(map, cache, 4); for (int i = 0; i < 100; i++) map.put(i, i + 2); cache.putAll(map); - checkData(map, 4); + checkData(map, cache, 4); } /** @@ -265,7 +265,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac putFut.get(); - checkData(map, 4); + checkData(map, cache, 4); map.clear(); @@ -274,7 +274,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac cache.putAll(map); - checkData(map, 4); + checkData(map, cache, 4); } /** * @throws Exception If failed. @@ -286,7 +286,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac /** * @throws Exception If failed. */ - public void _testPessimisticTxNearEnabled() throws Exception { + public void testPessimisticTxNearEnabled() throws Exception { pessimisticTx(new NearCacheConfiguration()); } @@ -351,7 +351,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac putFut.get(); - checkData(map, 4); + checkData(map, cache, 4); ignite3.close(); @@ -384,7 +384,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac putFut.get(); - checkData(map, 4); + checkData(map, cache, 4); for (int i = 0; i < 100; i++) map.put(i, i + 2); @@ -395,7 +395,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac tx.commit(); } - checkData(map, 4); + checkData(map, cache, 4); } /** @@ -448,7 +448,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac checkClientPrepareMessages(spi.recordedMessages(), 6); - checkData(map, 4); + checkData(map, cache, 4); cache.putAll(map); @@ -456,7 +456,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac spi.record(null); - checkData(map, 4); + checkData(map, cache, 4); IgniteCache<Integer, Integer> cache0 = ignite0.cache(null); @@ -475,7 +475,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac for (Object msg : msgs) assertFalse(((GridNearTxPrepareRequest)msg).firstClientRequest()); - checkData(map, 4); + checkData(map, cache, 4); } /** @@ -546,13 +546,19 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac * @param expNodes Expected nodes number. * @throws Exception If failed. */ - private void checkData(final Map<Integer, Integer> map, final int expNodes) throws Exception { + private void checkData(final Map<Integer, Integer> map, IgniteCache<?, ?> clientCache, final int expNodes) + throws Exception + { final List<Ignite> nodes = G.allGrids(); final Affinity<Integer> aff = nodes.get(0).affinity(null); assertEquals(expNodes, nodes.size()); + boolean hasNearCache = clientCache.getConfiguration(CacheConfiguration.class).getNearConfiguration() != null; + + final Ignite nearCacheNode = hasNearCache ? clientCache.unwrap(Ignite.class) : null; + boolean wait = GridTestUtils.waitForCondition(new PA() { @Override public boolean apply() { try { @@ -562,7 +568,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac for (Ignite node : nodes) { IgniteCache<Integer, Integer> cache = node.cache(null); - if (aff.isPrimaryOrBackup(node.cluster().localNode(), key)) + if (aff.isPrimaryOrBackup(node.cluster().localNode(), key) || node == nearCacheNode) assertEquals("Unexpected value for " + node.name(), e.getValue(), cache.localPeek(key)); else assertNull("Unexpected non-null value for " + node.name(), cache.localPeek(key)); @@ -744,16 +750,18 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac } }); - updateBarrier.await(15_000, TimeUnit.MILLISECONDS); - - CyclicBarrier barrier0 = updateBarrier; - - if (barrier0 != null) { + try { + updateBarrier.await(30_000, TimeUnit.MILLISECONDS); + } + catch (TimeoutException e) { log.info("Failed to wait for update."); U.dumpThreads(log); - barrier0.reset(); + CyclicBarrier barrier0 = updateBarrier; + + if (barrier0 != null) + barrier0.reset(); fail("Failed to wait for update."); } @@ -770,16 +778,20 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac } }); - updateBarrier.await(15_000, TimeUnit.MILLISECONDS); - - barrier0 = updateBarrier; + updateBarrier.await(30_000, TimeUnit.MILLISECONDS); - if (barrier0 != null) { + try { + updateBarrier.await(30_000, TimeUnit.MILLISECONDS); + } + catch (TimeoutException e) { log.info("Failed to wait for update."); U.dumpThreads(log); - barrier0.reset(); + CyclicBarrier barrier0 = updateBarrier; + + if (barrier0 != null) + barrier0.reset(); fail("Failed to wait for update."); }