# ignite-23
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/00eadd62 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/00eadd62 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/00eadd62 Branch: refs/heads/ignite-929 Commit: 00eadd62ea451922e9fe3396085f5f003f36c32e Parents: e4e54ba Author: sboikov <sboi...@gridgain.com> Authored: Mon May 25 13:34:13 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Mon May 25 13:34:13 2015 +0300 ---------------------------------------------------------------------- .../dht/atomic/GridNearAtomicUpdateFuture.java | 49 +++++++++--- ...niteCacheClientNodeChangingTopologyTest.java | 78 ++++++++++++++++++++ 2 files changed, 115 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00eadd62/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 50c3d56..64a4882 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -143,6 +143,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> /** Skip store flag. */ private final boolean skipStore; + /** */ + private boolean fastMapRemap; + /** * @param cctx Cache context. * @param cache Cache instance. @@ -345,7 +348,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> if (res.remapKeys() != null) { assert !fastMap || cctx.kernalContext().clientNode(); - Collection<?> remapKeys = fastMap && cctx.kernalContext().clientNode() ? null : res.remapKeys(); + Collection<KeyCacheObject> remapKeys = fastMap ? null : res.remapKeys(); mapOnTopology(remapKeys, true, nodeId, true); @@ -456,8 +459,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> else { if (waitTopFut) { fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { - @Override - public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { + @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { mapOnTopology(keys, remap, oldNodeId, waitTopFut); } }); @@ -478,15 +480,29 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> /** * Checks if future is ready to be completed. */ - private synchronized void checkComplete() { - if ((syncMode == FULL_ASYNC && cctx.config().getAtomicWriteOrderMode() == PRIMARY) || mappings.isEmpty()) { - CachePartialUpdateCheckedException err0 = err; + private void checkComplete() { + boolean remap = false; - if (err0 != null) - onDone(err0); - else - onDone(opRes); + synchronized (this) { + if ((syncMode == FULL_ASYNC && cctx.config().getAtomicWriteOrderMode() == PRIMARY) || mappings.isEmpty()) { + CachePartialUpdateCheckedException err0 = err; + + if (err0 != null) + onDone(err0); + else { + if (fastMapRemap) { + assert cctx.kernalContext().clientNode(); + + remap = true; + } + else + onDone(opRes); + } + } } + + if (remap) + mapOnTopology(null, true, null, true); } /** @@ -500,7 +516,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> @Nullable Collection<?> remapKeys, boolean remap, @Nullable UUID oldNodeId) { - assert oldNodeId == null || remap; + assert oldNodeId == null || remap || fastMapRemap; Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer); @@ -652,9 +668,16 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> // Must do this in synchronized block because we need to atomically remove and add mapping. // Otherwise checkComplete() may see empty intermediate state. synchronized (this) { - if (remap) + if (oldNodeId != null) removeMapping(oldNodeId); + // For fastMap mode wait for all responses before remapping. + if (remap && fastMap && !mappings.isEmpty()) { + fastMapRemap = true; + + return; + } + // Create mappings first, then send messages. for (Object key : keys) { if (key == null) { @@ -772,6 +795,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> i++; } } + + fastMapRemap = false; } if ((single == null || single) && pendingMappings.size() == 1) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00eadd62/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java index 4603aaf..5a5a648 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java @@ -264,6 +264,84 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac /** * @throws Exception If failed. */ + public void testAtomicGetAndPutClockMode() throws Exception { + atomicGetAndPut(CLOCK); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicGetAndPutPrimaryMode() throws Exception { + atomicGetAndPut(PRIMARY); + } + + /** + * @param writeOrder Write order. + * @throws Exception If failed. + */ + private void atomicGetAndPut(CacheAtomicWriteOrderMode writeOrder) throws Exception { + ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); + ccfg.setBackups(1); + ccfg.setAtomicityMode(ATOMIC); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setAtomicWriteOrderMode(writeOrder); + ccfg.setRebalanceMode(SYNC); + + IgniteEx ignite0 = startGrid(0); + IgniteEx ignite1 = startGrid(1); + + client = true; + + ignite0.cache(null).put(0, 0); + + Ignite ignite2 = startGrid(2); + + assertTrue(ignite2.configuration().isClientMode()); + + final Map<Integer, Integer> map = new HashMap<>(); + + map.put(0, 1); + + TestCommunicationSpi spi = (TestCommunicationSpi)ignite2.configuration().getCommunicationSpi(); + + // Block messages requests for both nodes. + spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite0.localNode().id()); + spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite1.localNode().id()); + + final IgniteCache<Integer, Integer> cache = ignite2.cache(null); + + assertEquals(writeOrder, cache.getConfiguration(CacheConfiguration.class).getAtomicWriteOrderMode()); + + IgniteInternalFuture<Integer> putFut = GridTestUtils.runAsync(new Callable<Integer>() { + @Override public Integer call() throws Exception { + Thread.currentThread().setName("put-thread"); + + return cache.getAndPut(0, 1); + } + }); + + assertFalse(putFut.isDone()); + + client = false; + + startGrid(3); + + log.info("Stop block."); + + spi.stopBlock(); + + Integer old = putFut.get(); + + checkData(map, cache, 4); + + assertEquals((Object)0, old); + } + + /** + * @throws Exception If failed. + */ public void testTxPutAll() throws Exception { ccfg = new CacheConfiguration();