# ignite-23
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/32cb360f Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/32cb360f Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/32cb360f Branch: refs/heads/ignite-23 Commit: 32cb360f736cdc84af8d461ad4c49cc2806cf996 Parents: 6432ec0 Author: sboikov <sboi...@gridgain.com> Authored: Tue May 26 15:49:12 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Tue May 26 17:46:53 2015 +0300 ---------------------------------------------------------------------- .../dht/atomic/GridDhtAtomicCache.java | 2 +- .../GridDhtPartitionsExchangeFuture.java | 2 - ...niteCacheClientNodeChangingTopologyTest.java | 431 ++++++++++++++++--- 3 files changed, 380 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/32cb360f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 9135f0a..69f5501 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1062,7 +1062,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (ver == null) { // Assign next version for update inside entries lock. - ver = ctx.versions().next(req.topologyVersion()); + ver = ctx.versions().next(topology().topologyVersion()); if (hasNear) res.nearVersion(ver); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/32cb360f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index e04432f..1c1ebd5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -514,8 +514,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT if (exchId.isLeft()) cctx.mvcc().removeExplicitNodeLocks(exchId.nodeId(), exchId.topologyVersion()); - cctx.mvcc().finishLocks(exchId.topologyVersion()).get(); - onDone(exchId.topologyVersion()); skipPreload = true; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/32cb360f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java index b067797..45fa275 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java @@ -211,7 +211,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac putFut.get(); - checkData(map, cache, 4); + checkData(map, null, cache, 4); ignite3.close(); @@ -248,7 +248,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac putFut.get(); - checkData(map, cache, 4); + checkData(map, null, cache, 4); for (int i = 0; i < KEYS; i++) map.put(i, i + 2); @@ -258,7 +258,106 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac else cache.put(0, 2); - checkData(map, cache, 4); + checkData(map, null, cache, 4); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicNoRemapClockMode() throws Exception { + atomicNoRemap(CLOCK); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicNoRemapPrimaryMode() throws Exception { + atomicNoRemap(PRIMARY); + } + + /** + * @param writeOrder Write order. + * @throws Exception If failed. + */ + private void atomicNoRemap(CacheAtomicWriteOrderMode writeOrder) throws Exception { + ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); + ccfg.setBackups(1); + ccfg.setAtomicityMode(ATOMIC); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setAtomicWriteOrderMode(writeOrder); + ccfg.setRebalanceMode(SYNC); + + IgniteEx ignite0 = startGrid(0); + IgniteEx ignite1 = startGrid(1); + IgniteEx ignite2 = startGrid(2); + + client = true; + + Ignite ignite3 = startGrid(3); + + assertTrue(ignite3.configuration().isClientMode()); + + final Map<Integer, Integer> map = new HashMap<>(); + + map.put(primaryKey(ignite0.cache(null)), 0); + map.put(primaryKey(ignite1.cache(null)), 1); + map.put(primaryKey(ignite2.cache(null)), 2); + + TestCommunicationSpi spi = (TestCommunicationSpi)ignite3.configuration().getCommunicationSpi(); + + // Block messages requests for both nodes. + spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite0.localNode().id()); + spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite1.localNode().id()); + spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite2.localNode().id()); + + spi.record(GridNearAtomicUpdateRequest.class); + + final IgniteCache<Integer, Integer> cache = ignite3.cache(null); + + assertEquals(writeOrder, cache.getConfiguration(CacheConfiguration.class).getAtomicWriteOrderMode()); + + IgniteInternalFuture<?> putFut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + Thread.currentThread().setName("put-thread"); + + cache.putAll(map); + + return null; + } + }); + + IgniteEx ignite4 = startGrid(4); + + assertTrue(ignite4.configuration().isClientMode()); + + assertFalse(putFut.isDone()); + + log.info("Stop block."); + + spi.stopBlock(); + + putFut.get(); + + spi.record(null); + + checkData(map, null, cache, 5); + + List<Object> msgs = spi.recordedMessages(); + + assertEquals(3, msgs.size()); + + for (Object msg : msgs) + assertTrue(((GridNearAtomicUpdateRequest)msg).clientRequest()); + + map.put(primaryKey(ignite0.cache(null)), 3); + map.put(primaryKey(ignite1.cache(null)), 4); + map.put(primaryKey(ignite2.cache(null)), 5); + + cache.putAll(map); + + checkData(map, null, cache, 5); } /** @@ -334,7 +433,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac Integer old = putFut.get(); - checkData(map, cache, 4); + checkData(map, null, cache, 4); assertEquals((Object)0, old); } @@ -394,7 +493,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac putFut.get(); - checkData(map, cache, 4); + checkData(map, null, cache, 4); map.clear(); @@ -403,7 +502,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac cache.putAll(map); - checkData(map, cache, 4); + checkData(map, null, cache, 4); } /** * @throws Exception If failed. @@ -452,6 +551,8 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac spi.blockMessages(GridNearLockRequest.class, ignite0.localNode().id()); spi.blockMessages(GridNearLockRequest.class, ignite1.localNode().id()); + spi.record(GridNearLockRequest.class); + final IgniteCache<Integer, Integer> cache = ignite2.cache(null); IgniteInternalFuture<?> putFut = GridTestUtils.runAsync(new Callable<Object>() { @@ -480,7 +581,17 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac putFut.get(); - checkData(map, cache, 4); + spi.record(null); + + checkData(map, null, cache, 4); + + List<Object> msgs = spi.recordedMessages(); + + assertTrue(((GridNearLockRequest)msgs.get(0)).firstClientRequest()); + assertTrue(((GridNearLockRequest)msgs.get(1)).firstClientRequest()); + + for (int i = 2; i < msgs.size(); i++) + assertFalse(((GridNearLockRequest)msgs.get(i)).firstClientRequest()); ignite3.close(); @@ -513,7 +624,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac putFut.get(); - checkData(map, cache, 4); + checkData(map, null, cache, 4); for (int i = 0; i < 100; i++) map.put(i, i + 2); @@ -524,7 +635,192 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac tx.commit(); } - checkData(map, cache, 4); + checkData(map, null, cache, 4); + } + + /** + * Tests specific scenario when mapping for first locked keys does not change, but changes for second one. + * + * @throws Exception If failed. + */ + public void testPessimisticTx2() throws Exception { + ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); + ccfg.setBackups(1); + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setRebalanceMode(SYNC); + + IgniteEx ignite0 = startGrid(0); + IgniteEx ignite1 = startGrid(1); + IgniteEx ignite2 = startGrid(2); + + client = true; + + final Ignite ignite3 = startGrid(3); + + assertTrue(ignite3.configuration().isClientMode()); + + AffinityTopologyVersion topVer1 = new AffinityTopologyVersion(4, 0); + + assertEquals(topVer1, ignite0.context().cache().internalCache(null).context().topology().topologyVersion()); + + TestCommunicationSpi spi = (TestCommunicationSpi)ignite3.configuration().getCommunicationSpi(); + + final Integer key1 = 0; + final Integer key2 = 7; + + spi.blockMessages(GridNearLockRequest.class, ignite0.localNode().id()); + spi.blockMessages(GridNearLockRequest.class, ignite1.localNode().id()); + spi.blockMessages(GridNearLockRequest.class, ignite2.localNode().id()); + + final IgniteCache<Integer, Integer> cache = ignite3.cache(null); + + IgniteInternalFuture<?> putFut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + Thread.currentThread().setName("put-thread"); + + try (Transaction tx = ignite3.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.put(key1, 1); + cache.put(key2, 2); + + tx.commit(); + } + + return null; + } + }); + + client = false; + + IgniteEx ignite4 = startGrid(4); + + AffinityTopologyVersion topVer2 = new AffinityTopologyVersion(5, 0); + + assertEquals(topVer2, ignite0.context().cache().internalCache(null).context().topology().topologyVersion()); + + GridCacheAffinityManager aff = ignite0.context().cache().internalCache(null).context().affinity(); + + List<ClusterNode> nodes1 = aff.nodes(key1, topVer1); + List<ClusterNode> nodes2 = aff.nodes(key1, topVer2); + + assertEquals(nodes1, nodes2); + + nodes1 = aff.nodes(key2, topVer1); + nodes2 = aff.nodes(key2, topVer2); + + assertFalse(nodes1.get(0).equals(nodes2.get(0))); + + assertFalse(putFut.isDone()); + + log.info("Stop block."); + + spi.stopBlock(); + + putFut.get(); + + checkData(F.asMap(key1, 1, key2, 2), null, cache, 5); + } + + /** + * @throws Exception If failed. + */ + public void testPessimisticTxNearEnabledNoRemap() throws Exception { + pessimisticTxNoRemap(new NearCacheConfiguration()); + } + + /** + * @throws Exception If failed. + */ + public void testPessimisticTxNoRemap() throws Exception { + pessimisticTxNoRemap(null); + } + + /** + * @param nearCfg Near cache configuration. + * @throws Exception If failed. + */ + private void pessimisticTxNoRemap(@Nullable NearCacheConfiguration nearCfg) throws Exception { + ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); + ccfg.setBackups(1); + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setRebalanceMode(SYNC); + ccfg.setNearConfiguration(nearCfg); + + IgniteEx ignite0 = startGrid(0); + IgniteEx ignite1 = startGrid(1); + IgniteEx ignite2 = startGrid(2); + + client = true; + + final Ignite ignite3 = startGrid(3); + + assertTrue(ignite3.configuration().isClientMode()); + + TestCommunicationSpi spi = (TestCommunicationSpi)ignite3.configuration().getCommunicationSpi(); + + final Map<Integer, Integer> map = new HashMap<>(); + + for (int i = 0; i < 100; i++) + map.put(i, i); + + spi.blockMessages(GridNearLockRequest.class, ignite0.localNode().id()); + spi.blockMessages(GridNearLockRequest.class, ignite1.localNode().id()); + spi.blockMessages(GridNearLockRequest.class, ignite2.localNode().id()); + + spi.record(GridNearLockRequest.class); + + final IgniteCache<Integer, Integer> cache = ignite3.cache(null); + + IgniteInternalFuture<?> putFut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + Thread.currentThread().setName("put-thread"); + + try (Transaction tx = ignite3.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + for (Map.Entry<Integer, Integer> e : map.entrySet()) + cache.put(e.getKey(), e.getValue()); + + tx.commit(); + } + + return null; + } + }); + + IgniteEx ignite4 = startGrid(4); + + assertTrue(ignite4.configuration().isClientMode()); + + assertFalse(putFut.isDone()); + + log.info("Stop block."); + + spi.stopBlock(); + + putFut.get(); + + spi.record(null); + + checkData(map, null, cache, 5); + + List<Object> msgs = spi.recordedMessages(); + + checkClientLockMessages(msgs, map.size()); + + for (int i = 0; i < 100; i++) + map.put(i, i + 1); + + try (Transaction tx = ignite3.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.putAll(map); + + tx.commit(); + } + + checkData(map, null, cache, 5); } /** @@ -775,7 +1071,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac checkClientPrepareMessages(spi.recordedMessages(), 6); - checkData(map, cache, 4); + checkData(map, null, cache, 4); cache.putAll(map); @@ -783,7 +1079,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac spi.record(null); - checkData(map, cache, 4); + checkData(map, null, cache, 4); IgniteCache<Integer, Integer> cache0 = ignite0.cache(null); @@ -802,7 +1098,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac for (Object msg : msgs) assertFalse(((GridNearTxPrepareRequest)msg).firstClientRequest()); - checkData(map, cache, 4); + checkData(map, null, cache, 4); } /** @@ -944,11 +1240,15 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac /** * @param map Expected data. + * @param keys Expected keys (if expected data is not specified). * @param clientCache Client cache. * @param expNodes Expected nodes number. * @throws Exception If failed. */ - private void checkData(final Map<Integer, Integer> map, IgniteCache<?, ?> clientCache, final int expNodes) + private void checkData(final Map<Integer, Integer> map, + final Set<Integer> keys, + IgniteCache<?, ?> clientCache, + final int expNodes) throws Exception { final List<Ignite> nodes = G.allGrids(); @@ -964,18 +1264,26 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac boolean wait = GridTestUtils.waitForCondition(new PA() { @Override public boolean apply() { try { - for (Map.Entry<Integer, Integer> e : map.entrySet()) { - Integer key = e.getKey(); + Set<Integer> keys0 = map != null ? map.keySet() : keys; + assertNotNull(keys0); + + for (Integer key : keys0) { GridCacheVersion ver = null; + Object val = null; for (Ignite node : nodes) { IgniteCache<Integer, Integer> cache = node.cache(null); boolean affNode = aff.isPrimaryOrBackup(node.cluster().localNode(), key); + Object val0 = cache.localPeek(key); + if (affNode || node == nearCacheNode) { - assertEquals("Unexpected value for " + node.name(), e.getValue(), cache.localPeek(key)); + if (map != null) + assertEquals("Unexpected value for " + node.name(), map.get(key), val0); + else + assertNotNull("Unexpected value for " + node.name(), val0); GridCacheAdapter cache0 = ((IgniteKernal)node).internalCache(null); @@ -991,13 +1299,28 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac assertNotNull("Null version [node=" + node.name() + ", key=" + key + ']', ver0); - if (ver == null) + if (ver == null) { ver = ver0; - else - assertEquals(ver0, ver); + val = val0; + } + else { + assertEquals("Version check failed [node=" + node.name() + + ", key=" + key + + ", affNode=" + affNode + + ", primary=" + aff.isPrimary(node.cluster().localNode(), key) + ']', + ver0, + ver); + + assertEquals("Value check failed [node=" + node.name() + + ", key=" + key + + ", affNode=" + affNode + + ", primary=" + aff.isPrimary(node.cluster().localNode(), key) + ']', + val0, + val); + } } else - assertNull("Unexpected non-null value for " + node.name(), cache.localPeek(key)); + assertNull("Unexpected non-null value for " + node.name(), val0); } } } @@ -1080,10 +1403,6 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac clients.add(ignite); } - client = false; - - List<IgniteInternalFuture<?>> futs = new ArrayList<>(); - final AtomicBoolean stop = new AtomicBoolean(); final AtomicInteger threadIdx = new AtomicInteger(0); @@ -1092,8 +1411,10 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac final ConcurrentHashSet<Integer> putKeys = new ConcurrentHashSet<>(); + IgniteInternalFuture<?> fut; + try { - GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { + fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { @Override public Object call() throws Exception { int clientIdx = threadIdx.getAndIncrement() % CLIENT_CNT; @@ -1122,7 +1443,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac for (int i = 0; i < 100; i++) { Integer key = rnd.nextInt(0, 1000); - map.put(key, key); + map.put(key, rnd.nextInt()); } try { @@ -1157,28 +1478,31 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac } }, THREADS, "update-thread"); - for (final Ignite ignite : clients) { + long stopTime = System.currentTimeMillis() + 60_000; - futs.add(GridTestUtils.runAsync(new Callable<Object>() { - @Override public Object call() throws Exception { - Thread.currentThread().setName("update-" + ignite.name()); + while (System.currentTimeMillis() < stopTime) { + boolean restartClient = ThreadLocalRandom.current().nextBoolean(); - log.info("Start updates from node: " + ignite.name()); + Integer idx = null; + if (restartClient) { + log.info("Start client node."); - return null; - } - })); - } + client = true; - long stopTime = System.currentTimeMillis() + 60_000; + IgniteEx ignite = startGrid(SRV_CNT + CLIENT_CNT); - while (System.currentTimeMillis() < stopTime) { - int idx = ThreadLocalRandom.current().nextInt(0, SRV_CNT); + IgniteCache<Integer, Integer> cache = ignite.cache(null); - log.info("Stop node: " + idx); + assertNotNull(cache); + } + else { + idx = ThreadLocalRandom.current().nextInt(0, SRV_CNT); + + log.info("Stop server node: " + idx); - stopGrid(idx); + stopGrid(idx); + } updateBarrier = new CyclicBarrier(THREADS + 1, new Runnable() { @Override public void run() { @@ -1190,7 +1514,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac updateBarrier.await(30_000, TimeUnit.MILLISECONDS); } catch (TimeoutException e) { - log.info("Failed to wait for update."); + log.error("Failed to wait for update."); U.dumpThreads(log); @@ -1204,9 +1528,18 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac U.sleep(500); - log.info("Start node: " + idx); + if (restartClient) { + log.info("Stop client node."); - startGrid(idx); + stopGrid(SRV_CNT + CLIENT_CNT); + } + else { + log.info("Start server node: " + idx); + + client = false; + + startGrid(idx); + } updateBarrier = new CyclicBarrier(THREADS + 1, new Runnable() { @Override public void run() { @@ -1218,7 +1551,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac updateBarrier.await(30_000, TimeUnit.MILLISECONDS); } catch (TimeoutException e) { - log.info("Failed to wait for update."); + log.error("Failed to wait for update."); U.dumpThreads(log); @@ -1237,15 +1570,9 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac stop.set(true); } - for (IgniteInternalFuture<?> fut : futs) - fut.get(); - - Map<Integer, Integer> map = new HashMap<>(); - - for (Integer key : putKeys) - map.put(key, key); + fut.get(30_000); - checkData(map, grid(SRV_CNT).cache(null), SRV_CNT + CLIENT_CNT); + checkData(null, putKeys, grid(SRV_CNT).cache(null), SRV_CNT + CLIENT_CNT); } /**