http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheClientModesAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheClientModesAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheClientModesAbstractSelfTest.java index 85256b4..f996568 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheClientModesAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheClientModesAbstractSelfTest.java @@ -21,10 +21,11 @@ import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cache.affinity.rendezvous.*; import org.apache.ignite.configuration.*; -import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.spi.discovery.tcp.*; +import java.io.*; import java.util.concurrent.atomic.*; import static org.apache.ignite.cache.CacheMode.*; @@ -50,20 +51,24 @@ public abstract class GridCacheClientModesAbstractSelfTest extends GridCacheAbst super.beforeTestsStarted(); - if (!clientOnly()) - grid(nearOnlyGridName).createNearCache(null, new NearCacheConfiguration()); + if (nearEnabled()) + grid(nearOnlyGridName).createNearCache(null, nearConfiguration()); } /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); - if (gridCnt.getAndIncrement() == 0) { + int cnt = gridCnt.incrementAndGet(); + + if ((cnt == gridCount() && isClientStartedLast()) || (cnt == 1 && !isClientStartedLast())) { cfg.setClientMode(true); nearOnlyGridName = gridName; } + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setForceServerMode(true); + return cfg; } @@ -75,17 +80,14 @@ public abstract class GridCacheClientModesAbstractSelfTest extends GridCacheAbst cfg.setCacheStoreFactory(null); cfg.setReadThrough(false); cfg.setWriteThrough(false); - cfg.setAffinity(new RendezvousAffinityFunction(false, 32)); cfg.setBackups(1); - return cfg; - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - super.afterTestsStopped(); + if (cfg.getCacheMode() == REPLICATED) + cfg.setAffinity(null); + else + cfg.setAffinity(new RendezvousAffinityFunction(false, 32)); - gridCnt.set(0); + return cfg; } /** {@inheritDoc} */ @@ -94,9 +96,11 @@ public abstract class GridCacheClientModesAbstractSelfTest extends GridCacheAbst } /** - * @return If {@code true} then uses CLIENT_ONLY mode, otherwise NEAR_ONLY. + * @return boolean {@code True} if client's grid must be started last, {@code false} if it must be started first. */ - protected abstract boolean clientOnly(); + protected boolean isClientStartedLast() { + return false; + } /** * @throws Exception If failed. @@ -110,7 +114,7 @@ public abstract class GridCacheClientModesAbstractSelfTest extends GridCacheAbst nearOnly.putAll(F.asMap(5, 5, 6, 6, 7, 7, 8, 8, 9, 9)); for (int key = 0; key < 10; key++) { - for (int i = 1; i < gridCount(); i++) { + for (int i = 0; i < gridCount(); i++) { if (grid(i).affinity(null).isPrimaryOrBackup(grid(i).localNode(), key)) assertEquals(key, grid(i).cache(null).localPeek(key, CachePeekMode.ONHEAP)); } @@ -120,6 +124,24 @@ public abstract class GridCacheClientModesAbstractSelfTest extends GridCacheAbst assertNull(nearOnly.localPeek(key, CachePeekMode.PRIMARY, CachePeekMode.BACKUP)); } + + Integer key = 1000; + + nearOnly.put(key, new TestClass1(key)); + + if (nearEnabled()) + assertNotNull(nearOnly.localPeek(key, CachePeekMode.ALL)); + else + assertNull(nearOnly.localPeek(key, CachePeekMode.ALL)); + + for (int i = 0; i < gridCount(); i++) { + if (grid(i).affinity(null).isPrimaryOrBackup(grid(i).localNode(), key)) { + TestClass1 val = (TestClass1)grid(i).cache(null).localPeek(key, CachePeekMode.ONHEAP); + + assertNotNull(val); + assertEquals(key.intValue(), val.val); + } + } } /** @@ -147,6 +169,18 @@ public abstract class GridCacheClientModesAbstractSelfTest extends GridCacheAbst if (nearEnabled()) assertEquals(key, nearOnly.localPeek(key, CachePeekMode.ONHEAP)); } + + Integer key = 2000; + + dht.put(key, new TestClass2(key)); + + TestClass2 val = (TestClass2)nearOnly.get(key); + + assertNotNull(val); + assertEquals(key.intValue(), val.val); + + if (nearEnabled()) + assertNotNull(nearOnly.localPeek(key, CachePeekMode.ONHEAP)); } /** @@ -210,4 +244,34 @@ public abstract class GridCacheClientModesAbstractSelfTest extends GridCacheAbst return null; } + + /** + * + */ + static class TestClass1 implements Serializable { + /** */ + int val; + + /** + * @param val Value. + */ + public TestClass1(int val) { + this.val = val; + } + } + + /** + * + */ + static class TestClass2 implements Serializable { + /** */ + int val; + + /** + * @param val Value. + */ + public TestClass2(int val) { + this.val = val; + } + } }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheClientModesTcpClientDiscoveryAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheClientModesTcpClientDiscoveryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheClientModesTcpClientDiscoveryAbstractTest.java new file mode 100644 index 0000000..bbc9144 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheClientModesTcpClientDiscoveryAbstractTest.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.spi.discovery.tcp.*; + +import static org.apache.ignite.cache.CacheMode.*; + +/** + * Tests {@link org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi} in client mode. + */ +@SuppressWarnings("RedundantMethodOverride") +public abstract class GridCacheClientModesTcpClientDiscoveryAbstractTest extends GridCacheClientModesAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected boolean isClientStartedLast() { + return true; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setForceServerMode(false); + + return cfg; + } + + /** */ + public static class CaseNearReplicatedAtomic extends GridCacheClientModesTcpClientDiscoveryAbstractTest { + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return REPLICATED; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return CacheAtomicityMode.ATOMIC; + } + } + + /** */ + public static class CaseNearReplicatedTransactional extends GridCacheClientModesTcpClientDiscoveryAbstractTest { + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return REPLICATED; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return CacheAtomicityMode.TRANSACTIONAL; + } + } + + /** */ + public static class CaseNearPartitionedAtomic extends GridCacheClientModesTcpClientDiscoveryAbstractTest { + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return CacheAtomicityMode.ATOMIC; + } + } + + /** */ + public static class CaseNearPartitionedTransactional extends GridCacheClientModesTcpClientDiscoveryAbstractTest { + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return CacheAtomicityMode.TRANSACTIONAL; + } + } + + /** */ + public static class CaseClientReplicatedAtomic extends GridCacheClientModesTcpClientDiscoveryAbstractTest { + /** {@inheritDoc} */ + @Override protected NearCacheConfiguration nearConfiguration() { + return null; + } + + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return REPLICATED; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return CacheAtomicityMode.ATOMIC; + } + } + + /** */ + public static class CaseClientReplicatedTransactional extends GridCacheClientModesTcpClientDiscoveryAbstractTest { + /** {@inheritDoc} */ + @Override protected NearCacheConfiguration nearConfiguration() { + return null; + } + + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return REPLICATED; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return CacheAtomicityMode.TRANSACTIONAL; + } + } + + /** */ + public static class CaseClientPartitionedAtomic extends GridCacheClientModesTcpClientDiscoveryAbstractTest { + /** {@inheritDoc} */ + @Override protected NearCacheConfiguration nearConfiguration() { + return null; + } + + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return CacheAtomicityMode.ATOMIC; + } + } + + /** */ + public static class CaseClientPartitionedTransactional extends GridCacheClientModesTcpClientDiscoveryAbstractTest { + /** {@inheritDoc} */ + @Override protected NearCacheConfiguration nearConfiguration() { + return null; + } + + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return CacheAtomicityMode.TRANSACTIONAL; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheMixedModeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheMixedModeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheMixedModeSelfTest.java index e19442f..a3c977f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheMixedModeSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheMixedModeSelfTest.java @@ -21,6 +21,7 @@ import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.spi.discovery.tcp.*; import org.apache.ignite.testframework.junits.common.*; /** @@ -31,6 +32,8 @@ public class GridCacheMixedModeSelfTest extends GridCommonAbstractTest { @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setForceServerMode(true); + cfg.setCacheConfiguration(cacheConfiguration(gridName)); if (F.eq(gridName, getTestGridName(0))) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java new file mode 100644 index 0000000..6782ff4 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java @@ -0,0 +1,1803 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.managers.communication.*; +import org.apache.ignite.internal.processors.affinity.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*; +import org.apache.ignite.internal.processors.cache.distributed.near.*; +import org.apache.ignite.internal.processors.cache.version.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.plugin.extensions.communication.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.spi.*; +import org.apache.ignite.spi.communication.tcp.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.junits.common.*; +import org.apache.ignite.transactions.*; +import org.eclipse.jetty.util.*; +import org.jetbrains.annotations.*; + +import javax.cache.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; +import java.util.concurrent.locks.*; + +import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.*; +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheMode.*; +import static org.apache.ignite.cache.CacheRebalanceMode.*; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; +import static org.apache.ignite.transactions.TransactionConcurrency.*; +import static org.apache.ignite.transactions.TransactionIsolation.*; + +/** + * + */ +public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstractTest { + /** */ + protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private CacheConfiguration ccfg; + + /** */ + private boolean client; + + /** */ + private volatile CyclicBarrier updateBarrier; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder).setForceServerMode(true); + + cfg.setClientMode(client); + + cfg.setCommunicationSpi(new TestCommunicationSpi()); + + cfg.setCacheConfiguration(ccfg); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicPutAllClockMode() throws Exception { + atomicPut(CLOCK, true, null); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicPutAllPrimaryMode() throws Exception { + atomicPut(PRIMARY, true, null); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicPutAllNearEnabledClockMode() throws Exception { + atomicPut(CLOCK, true, new NearCacheConfiguration()); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicPutAllNearEnabledPrimaryMode() throws Exception { + atomicPut(PRIMARY, true, new NearCacheConfiguration()); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicPutClockMode() throws Exception { + atomicPut(CLOCK, false, null); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicPutPrimaryMode() throws Exception { + atomicPut(PRIMARY, false, null); + } + + /** + * @param writeOrder Write order. + * @param putAll If {@code true} executes putAll. + * @param nearCfg Near cache configuration. + * @throws Exception If failed. + */ + private void atomicPut(CacheAtomicWriteOrderMode writeOrder, + final boolean putAll, + @Nullable NearCacheConfiguration nearCfg) throws Exception { + ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); + ccfg.setBackups(1); + ccfg.setAtomicityMode(ATOMIC); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setAtomicWriteOrderMode(writeOrder); + ccfg.setRebalanceMode(SYNC); + + IgniteEx ignite0 = startGrid(0); + IgniteEx ignite1 = startGrid(1); + + ccfg.setNearConfiguration(nearCfg); + + client = true; + + ccfg.setNearConfiguration(null); + + Ignite ignite2 = startGrid(2); + + assertTrue(ignite2.configuration().isClientMode()); + + final Map<Integer, Integer> map = new HashMap<>(); + + final int KEYS = putAll ? 100 : 1; + + for (int i = 0; i < KEYS; i++) + map.put(i, i); + + TestCommunicationSpi spi = (TestCommunicationSpi)ignite2.configuration().getCommunicationSpi(); + + // Block messages requests for both nodes. + spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite0.localNode().id()); + spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite1.localNode().id()); + + final IgniteCache<Integer, Integer> cache = ignite2.cache(null); + + assertEquals(writeOrder, cache.getConfiguration(CacheConfiguration.class).getAtomicWriteOrderMode()); + + IgniteInternalFuture<?> putFut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + Thread.currentThread().setName("put-thread"); + + if (putAll) + cache.putAll(map); + else + cache.put(0, 0); + + return null; + } + }); + + assertFalse(putFut.isDone()); + + client = false; + + IgniteEx ignite3 = startGrid(3); + + log.info("Stop block1."); + + spi.stopBlock(); + + putFut.get(); + + checkData(map, null, cache, 4); + + ignite3.close(); + + map.clear(); + + for (int i = 0; i < KEYS; i++) + map.put(i, i + 1); + + // Block messages requests for single node. + spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite0.localNode().id()); + + putFut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + Thread.currentThread().setName("put-thread"); + + if (putAll) + cache.putAll(map); + else + cache.put(0, 1); + + return null; + } + }); + + assertFalse(putFut.isDone()); + + client = false; + + startGrid(3); + + log.info("Stop block2."); + + spi.stopBlock(); + + putFut.get(); + + checkData(map, null, cache, 4); + + for (int i = 0; i < KEYS; i++) + map.put(i, i + 2); + + if (putAll) + cache.putAll(map); + else + cache.put(0, 2); + + checkData(map, null, cache, 4); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicNoRemapClockMode() throws Exception { + atomicNoRemap(CLOCK); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicNoRemapPrimaryMode() throws Exception { + atomicNoRemap(PRIMARY); + } + + /** + * @param writeOrder Write order. + * @throws Exception If failed. + */ + private void atomicNoRemap(CacheAtomicWriteOrderMode writeOrder) throws Exception { + ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); + ccfg.setBackups(1); + ccfg.setAtomicityMode(ATOMIC); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setAtomicWriteOrderMode(writeOrder); + ccfg.setRebalanceMode(SYNC); + + IgniteEx ignite0 = startGrid(0); + IgniteEx ignite1 = startGrid(1); + IgniteEx ignite2 = startGrid(2); + + client = true; + + Ignite ignite3 = startGrid(3); + + assertTrue(ignite3.configuration().isClientMode()); + + final Map<Integer, Integer> map = new HashMap<>(); + + map.put(primaryKey(ignite0.cache(null)), 0); + map.put(primaryKey(ignite1.cache(null)), 1); + map.put(primaryKey(ignite2.cache(null)), 2); + + TestCommunicationSpi spi = (TestCommunicationSpi)ignite3.configuration().getCommunicationSpi(); + + // Block messages requests for both nodes. + spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite0.localNode().id()); + spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite1.localNode().id()); + spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite2.localNode().id()); + + spi.record(GridNearAtomicUpdateRequest.class); + + final IgniteCache<Integer, Integer> cache = ignite3.cache(null); + + assertEquals(writeOrder, cache.getConfiguration(CacheConfiguration.class).getAtomicWriteOrderMode()); + + IgniteInternalFuture<?> putFut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + Thread.currentThread().setName("put-thread"); + + cache.putAll(map); + + return null; + } + }); + + IgniteEx ignite4 = startGrid(4); + + assertTrue(ignite4.configuration().isClientMode()); + + assertFalse(putFut.isDone()); + + log.info("Stop block."); + + spi.stopBlock(); + + putFut.get(); + + spi.record(null); + + checkData(map, null, cache, 5); + + List<Object> msgs = spi.recordedMessages(); + + assertEquals(3, msgs.size()); + + for (Object msg : msgs) + assertTrue(((GridNearAtomicUpdateRequest)msg).clientRequest()); + + map.put(primaryKey(ignite0.cache(null)), 3); + map.put(primaryKey(ignite1.cache(null)), 4); + map.put(primaryKey(ignite2.cache(null)), 5); + + cache.putAll(map); + + checkData(map, null, cache, 5); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicGetAndPutClockMode() throws Exception { + atomicGetAndPut(CLOCK); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicGetAndPutPrimaryMode() throws Exception { + atomicGetAndPut(PRIMARY); + } + + /** + * @param writeOrder Write order. + * @throws Exception If failed. + */ + private void atomicGetAndPut(CacheAtomicWriteOrderMode writeOrder) throws Exception { + ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); + ccfg.setBackups(1); + ccfg.setAtomicityMode(ATOMIC); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setAtomicWriteOrderMode(writeOrder); + ccfg.setRebalanceMode(SYNC); + + IgniteEx ignite0 = startGrid(0); + IgniteEx ignite1 = startGrid(1); + + client = true; + + ignite0.cache(null).put(0, 0); + + Ignite ignite2 = startGrid(2); + + assertTrue(ignite2.configuration().isClientMode()); + + final Map<Integer, Integer> map = new HashMap<>(); + + map.put(0, 1); + + TestCommunicationSpi spi = (TestCommunicationSpi)ignite2.configuration().getCommunicationSpi(); + + // Block messages requests for both nodes. + spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite0.localNode().id()); + spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite1.localNode().id()); + + final IgniteCache<Integer, Integer> cache = ignite2.cache(null); + + assertEquals(writeOrder, cache.getConfiguration(CacheConfiguration.class).getAtomicWriteOrderMode()); + + IgniteInternalFuture<Integer> putFut = GridTestUtils.runAsync(new Callable<Integer>() { + @Override public Integer call() throws Exception { + Thread.currentThread().setName("put-thread"); + + return cache.getAndPut(0, 1); + } + }); + + assertFalse(putFut.isDone()); + + client = false; + + startGrid(3); + + log.info("Stop block."); + + spi.stopBlock(); + + Integer old = putFut.get(); + + checkData(map, null, cache, 4); + + assertEquals((Object)0, old); + } + + /** + * @throws Exception If failed. + */ + public void testTxPutAll() throws Exception { + ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); + ccfg.setBackups(1); + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setRebalanceMode(SYNC); + + IgniteEx ignite0 = startGrid(0); + IgniteEx ignite1 = startGrid(1); + + client = true; + + Ignite ignite2 = startGrid(2); + + assertTrue(ignite2.configuration().isClientMode()); + + final Map<Integer, Integer> map = new HashMap<>(); + + for (int i = 0; i < 100; i++) + map.put(i, i); + + TestCommunicationSpi spi = (TestCommunicationSpi)ignite2.configuration().getCommunicationSpi(); + + spi.blockMessages(GridNearTxPrepareRequest.class, ignite0.localNode().id()); + spi.blockMessages(GridNearTxPrepareRequest.class, ignite1.localNode().id()); + + final IgniteCache<Integer, Integer> cache = ignite2.cache(null); + + IgniteInternalFuture<?> putFut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + Thread.currentThread().setName("put-thread"); + + cache.putAll(map); + + return null; + } + }); + + assertFalse(putFut.isDone()); + + client = false; + + IgniteEx ignite3 = startGrid(3); + + log.info("Stop block."); + + spi.stopBlock(); + + putFut.get(); + + checkData(map, null, cache, 4); + + map.clear(); + + for (int i = 0; i < 100; i++) + map.put(i, i + 1); + + cache.putAll(map); + + checkData(map, null, cache, 4); + } + /** + * @throws Exception If failed. + */ + public void testPessimisticTx() throws Exception { + pessimisticTx(null); + } + + /** + * @throws Exception If failed. + */ + public void testPessimisticTxNearEnabled() throws Exception { + pessimisticTx(new NearCacheConfiguration()); + } + + /** + * @param nearCfg Near cache configuration. + * @throws Exception If failed. + */ + private void pessimisticTx(NearCacheConfiguration nearCfg) throws Exception { + ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); + ccfg.setBackups(1); + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setRebalanceMode(SYNC); + ccfg.setNearConfiguration(nearCfg); + + IgniteEx ignite0 = startGrid(0); + IgniteEx ignite1 = startGrid(1); + + client = true; + + final Ignite ignite2 = startGrid(2); + + assertTrue(ignite2.configuration().isClientMode()); + + final Map<Integer, Integer> map = new HashMap<>(); + + for (int i = 0; i < 100; i++) + map.put(i, i); + + TestCommunicationSpi spi = (TestCommunicationSpi)ignite2.configuration().getCommunicationSpi(); + + spi.blockMessages(GridNearLockRequest.class, ignite0.localNode().id()); + spi.blockMessages(GridNearLockRequest.class, ignite1.localNode().id()); + + spi.record(GridNearLockRequest.class); + + final IgniteCache<Integer, Integer> cache = ignite2.cache(null); + + IgniteInternalFuture<?> putFut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + Thread.currentThread().setName("put-thread"); + + try (Transaction tx = ignite2.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.putAll(map); + + tx.commit(); + } + + return null; + } + }); + + assertFalse(putFut.isDone()); + + client = false; + + IgniteEx ignite3 = startGrid(3); + + log.info("Stop block1."); + + spi.stopBlock(); + + putFut.get(); + + spi.record(null); + + checkData(map, null, cache, 4); + + List<Object> msgs = spi.recordedMessages(); + + assertTrue(((GridNearLockRequest)msgs.get(0)).firstClientRequest()); + assertTrue(((GridNearLockRequest)msgs.get(1)).firstClientRequest()); + + for (int i = 2; i < msgs.size(); i++) + assertFalse(((GridNearLockRequest)msgs.get(i)).firstClientRequest()); + + ignite3.close(); + + for (int i = 0; i < 100; i++) + map.put(i, i + 1); + + spi.blockMessages(GridNearLockRequest.class, ignite0.localNode().id()); + spi.blockMessages(GridNearLockRequest.class, ignite1.localNode().id()); + + putFut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + Thread.currentThread().setName("put-thread"); + + try (Transaction tx = ignite2.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + for (Map.Entry<Integer, Integer> e : map.entrySet()) + cache.put(e.getKey(), e.getValue()); + + tx.commit(); + } + + return null; + } + }); + + ignite3 = startGrid(3); + + log.info("Stop block2."); + + spi.stopBlock(); + + putFut.get(); + + checkData(map, null, cache, 4); + + for (int i = 0; i < 100; i++) + map.put(i, i + 2); + + try (Transaction tx = ignite2.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.putAll(map); + + tx.commit(); + } + + checkData(map, null, cache, 4); + } + + /** + * Tests specific scenario when mapping for first locked keys does not change, but changes for second one. + * + * @throws Exception If failed. + */ + public void testPessimisticTx2() throws Exception { + ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); + ccfg.setBackups(1); + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setRebalanceMode(SYNC); + + IgniteEx ignite0 = startGrid(0); + IgniteEx ignite1 = startGrid(1); + IgniteEx ignite2 = startGrid(2); + + client = true; + + final Ignite ignite3 = startGrid(3); + + assertTrue(ignite3.configuration().isClientMode()); + + AffinityTopologyVersion topVer1 = new AffinityTopologyVersion(4, 0); + + assertEquals(topVer1, ignite0.context().cache().internalCache(null).context().topology().topologyVersion()); + + TestCommunicationSpi spi = (TestCommunicationSpi)ignite3.configuration().getCommunicationSpi(); + + final Integer key1 = 0; + final Integer key2 = 7; + + spi.blockMessages(GridNearLockRequest.class, ignite0.localNode().id()); + spi.blockMessages(GridNearLockRequest.class, ignite1.localNode().id()); + spi.blockMessages(GridNearLockRequest.class, ignite2.localNode().id()); + + final IgniteCache<Integer, Integer> cache = ignite3.cache(null); + + IgniteInternalFuture<?> putFut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + Thread.currentThread().setName("put-thread"); + + try (Transaction tx = ignite3.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.put(key1, 1); + cache.put(key2, 2); + + tx.commit(); + } + + return null; + } + }); + + client = false; + + IgniteEx ignite4 = startGrid(4); + + AffinityTopologyVersion topVer2 = new AffinityTopologyVersion(5, 0); + + assertEquals(topVer2, ignite0.context().cache().internalCache(null).context().topology().topologyVersion()); + + GridCacheAffinityManager aff = ignite0.context().cache().internalCache(null).context().affinity(); + + List<ClusterNode> nodes1 = aff.nodes(key1, topVer1); + List<ClusterNode> nodes2 = aff.nodes(key1, topVer2); + + assertEquals(nodes1, nodes2); + + nodes1 = aff.nodes(key2, topVer1); + nodes2 = aff.nodes(key2, topVer2); + + assertFalse(nodes1.get(0).equals(nodes2.get(0))); + + assertFalse(putFut.isDone()); + + log.info("Stop block."); + + spi.stopBlock(); + + putFut.get(); + + checkData(F.asMap(key1, 1, key2, 2), null, cache, 5); + } + + /** + * @throws Exception If failed. + */ + public void testPessimisticTxNearEnabledNoRemap() throws Exception { + pessimisticTxNoRemap(new NearCacheConfiguration()); + } + + /** + * @throws Exception If failed. + */ + public void testPessimisticTxNoRemap() throws Exception { + pessimisticTxNoRemap(null); + } + + /** + * @param nearCfg Near cache configuration. + * @throws Exception If failed. + */ + private void pessimisticTxNoRemap(@Nullable NearCacheConfiguration nearCfg) throws Exception { + ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); + ccfg.setBackups(1); + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setRebalanceMode(SYNC); + ccfg.setNearConfiguration(nearCfg); + + IgniteEx ignite0 = startGrid(0); + IgniteEx ignite1 = startGrid(1); + IgniteEx ignite2 = startGrid(2); + + client = true; + + final Ignite ignite3 = startGrid(3); + + assertTrue(ignite3.configuration().isClientMode()); + + TestCommunicationSpi spi = (TestCommunicationSpi)ignite3.configuration().getCommunicationSpi(); + + final Map<Integer, Integer> map = new HashMap<>(); + + for (int i = 0; i < 100; i++) + map.put(i, i); + + spi.blockMessages(GridNearLockRequest.class, ignite0.localNode().id()); + spi.blockMessages(GridNearLockRequest.class, ignite1.localNode().id()); + spi.blockMessages(GridNearLockRequest.class, ignite2.localNode().id()); + + spi.record(GridNearLockRequest.class); + + final IgniteCache<Integer, Integer> cache = ignite3.cache(null); + + IgniteInternalFuture<?> putFut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + Thread.currentThread().setName("put-thread"); + + try (Transaction tx = ignite3.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + for (Map.Entry<Integer, Integer> e : map.entrySet()) + cache.put(e.getKey(), e.getValue()); + + tx.commit(); + } + + return null; + } + }); + + IgniteEx ignite4 = startGrid(4); + + assertTrue(ignite4.configuration().isClientMode()); + + assertFalse(putFut.isDone()); + + log.info("Stop block."); + + spi.stopBlock(); + + putFut.get(); + + spi.record(null); + + checkData(map, null, cache, 5); + + List<Object> msgs = spi.recordedMessages(); + + checkClientLockMessages(msgs, map.size()); + + for (int i = 0; i < 100; i++) + map.put(i, i + 1); + + try (Transaction tx = ignite3.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.putAll(map); + + tx.commit(); + } + + checkData(map, null, cache, 5); + } + + /** + * @throws Exception If failed. + */ + public void testLock() throws Exception { + lock(null); + } + + /** + * @throws Exception If failed. + */ + public void testLockNearEnabled() throws Exception { + lock(new NearCacheConfiguration()); + } + + /** + * @param nearCfg Near cache configuration. + * @throws Exception If failed. + */ + private void lock(NearCacheConfiguration nearCfg) throws Exception { + ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); + ccfg.setBackups(1); + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setRebalanceMode(SYNC); + ccfg.setNearConfiguration(nearCfg); + + final IgniteEx ignite0 = startGrid(0); + final IgniteEx ignite1 = startGrid(1); + + client = true; + + final Ignite ignite2 = startGrid(2); + + assertTrue(ignite2.configuration().isClientMode()); + + final List<Integer> keys = new ArrayList<>(); + + for (int i = 0; i < 100; i++) + keys.add(i); + + TestCommunicationSpi spi = (TestCommunicationSpi)ignite2.configuration().getCommunicationSpi(); + + spi.blockMessages(GridNearLockRequest.class, ignite0.localNode().id()); + spi.blockMessages(GridNearLockRequest.class, ignite1.localNode().id()); + + final IgniteCache<Integer, Integer> cache = ignite2.cache(null); + + final CountDownLatch lockedLatch = new CountDownLatch(1); + + final CountDownLatch unlockLatch = new CountDownLatch(1); + + IgniteInternalFuture<Lock> lockFut = GridTestUtils.runAsync(new Callable<Lock>() { + @Override public Lock call() throws Exception { + Thread.currentThread().setName("put-thread"); + + Lock lock = cache.lockAll(keys); + + lock.lock(); + + log.info("Locked"); + + lockedLatch.countDown(); + + unlockLatch.await(); + + lock.unlock(); + + return lock; + } + }); + + client = false; + + IgniteEx ignite3 = startGrid(3); + + log.info("Stop block."); + + assertEquals(1, lockedLatch.getCount()); + + spi.stopBlock(); + + assertTrue(lockedLatch.await(3000, TimeUnit.MILLISECONDS)); + + IgniteCache<Integer, Integer> cache0 = ignite0.cache(null); + + for (Integer key : keys) { + Lock lock = cache0.lock(key); + + assertFalse(lock.tryLock()); + } + + unlockLatch.countDown(); + + lockFut.get(); + + boolean wait = GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return unlocked(ignite0) && unlocked(ignite1); + } + + private boolean unlocked(Ignite ignite) { + IgniteCache<Integer, Integer> cache = ignite.cache(null); + + for (Integer key : keys) { + if (cache.isLocalLocked(key, false)) { + log.info("Key is locked [key=" + key + ", node=" + ignite.name() + ']'); + + return false; + } + } + + return true; + } + }, 10_000); + + assertTrue(wait); + + for (Integer key : keys) { + Lock lock = cache0.lock(key); + + assertTrue("Failed to lock: " + key, lock.tryLock()); + + lock.unlock(); + } + } + + /** + * @throws Exception If failed. + */ + public void testPessimisticTxMessageClientFirstFlag() throws Exception { + ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); + ccfg.setBackups(1); + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setRebalanceMode(SYNC); + + IgniteEx ignite0 = startGrid(0); + IgniteEx ignite1 = startGrid(1); + IgniteEx ignite2 = startGrid(2); + + client = true; + + Ignite ignite3 = startGrid(3); + + assertTrue(ignite3.configuration().isClientMode()); + + TestCommunicationSpi spi = (TestCommunicationSpi)ignite3.configuration().getCommunicationSpi(); + + spi.record(GridNearLockRequest.class); + + IgniteCache<Integer, Integer> cache = ignite3.cache(null); + + try (Transaction tx = ignite3.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.put(1, 1); + cache.put(2, 2); + cache.put(3, 3); + + tx.commit(); + } + + checkClientLockMessages(spi.recordedMessages(), 3); + + Map<Integer, Integer> map = new HashMap<>(); + + map.put(4, 4); + map.put(5, 5); + map.put(6, 6); + map.put(7, 7); + + try (Transaction tx = ignite3.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.putAll(map); + + tx.commit(); + } + + checkClientLockMessages(spi.recordedMessages(), 4); + + spi.record(null); + + TestCommunicationSpi spi0 = (TestCommunicationSpi)ignite0.configuration().getCommunicationSpi(); + + spi0.record(GridNearLockRequest.class); + + List<Integer> keys = primaryKeys(ignite1.cache(null), 3, 0); + + IgniteCache<Integer, Integer> cache0 = ignite0.cache(null); + + try (Transaction tx = ignite0.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache0.put(keys.get(0), 0); + cache0.put(keys.get(1), 1); + cache0.put(keys.get(2), 2); + + tx.commit(); + } + + List<Object> msgs = spi0.recordedMessages(); + + assertEquals(3, msgs.size()); + + for (Object msg : msgs) + assertFalse(((GridNearLockRequest)msg).firstClientRequest()); + } + + /** + * @param msgs Messages. + * @param expCnt Expected number of messages. + */ + private void checkClientLockMessages(List<Object> msgs, int expCnt) { + assertEquals(expCnt, msgs.size()); + + assertTrue(((GridNearLockRequest)msgs.get(0)).firstClientRequest()); + + for (int i = 1; i < msgs.size(); i++) + assertFalse(((GridNearLockRequest)msgs.get(i)).firstClientRequest()); + } + + /** + * @throws Exception If failed. + */ + public void testOptimisticTxMessageClientFirstFlag() throws Exception { + ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); + ccfg.setBackups(1); + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setRebalanceMode(SYNC); + + IgniteEx ignite0 = startGrid(0); + IgniteEx ignite1 = startGrid(1); + IgniteEx ignite2 = startGrid(2); + + client = true; + + Ignite ignite3 = startGrid(3); + + assertTrue(ignite3.configuration().isClientMode()); + + TestCommunicationSpi spi = (TestCommunicationSpi)ignite3.configuration().getCommunicationSpi(); + + IgniteCache<Integer, Integer> cache = ignite3.cache(null); + + List<Integer> keys0 = primaryKeys(ignite0.cache(null), 2, 0); + List<Integer> keys1 = primaryKeys(ignite1.cache(null), 2, 0); + List<Integer> keys2 = primaryKeys(ignite2.cache(null), 2, 0); + + LinkedHashMap<Integer, Integer> map = new LinkedHashMap<>(); + + map.put(keys0.get(0), 1); + map.put(keys1.get(0), 2); + map.put(keys2.get(0), 3); + map.put(keys0.get(1), 4); + map.put(keys1.get(1), 5); + map.put(keys2.get(1), 6); + + spi.record(GridNearTxPrepareRequest.class); + + try (Transaction tx = ignite3.transactions().txStart(OPTIMISTIC, REPEATABLE_READ)) { + for (Map.Entry<Integer, Integer> e : map.entrySet()) + cache.put(e.getKey(), e.getValue()); + + tx.commit(); + } + + checkClientPrepareMessages(spi.recordedMessages(), 6); + + checkData(map, null, cache, 4); + + cache.putAll(map); + + checkClientPrepareMessages(spi.recordedMessages(), 6); + + spi.record(null); + + checkData(map, null, cache, 4); + + IgniteCache<Integer, Integer> cache0 = ignite0.cache(null); + + TestCommunicationSpi spi0 = (TestCommunicationSpi)ignite0.configuration().getCommunicationSpi(); + + spi0.record(GridNearTxPrepareRequest.class); + + cache0.putAll(map); + + spi0.record(null); + + List<Object> msgs = spi0.recordedMessages(); + + assertEquals(4, msgs.size()); + + for (Object msg : msgs) + assertFalse(((GridNearTxPrepareRequest)msg).firstClientRequest()); + + checkData(map, null, cache, 4); + } + + /** + * @param msgs Messages. + * @param expCnt Expected number of messages. + */ + private void checkClientPrepareMessages(List<Object> msgs, int expCnt) { + assertEquals(expCnt, msgs.size()); + + assertTrue(((GridNearTxPrepareRequest)msgs.get(0)).firstClientRequest()); + + for (int i = 1; i < msgs.size(); i++) + assertFalse(((GridNearTxPrepareRequest) msgs.get(i)).firstClientRequest()); + } + + /** + * @throws Exception If failed. + */ + public void testLockRemoveAfterClientFailed() throws Exception { + ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); + ccfg.setBackups(1); + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setRebalanceMode(SYNC); + + IgniteEx ignite0 = startGrid(0); + IgniteEx ignite1 = startGrid(1); + + client = true; + + Ignite ignite2 = startGrid(2); + + assertTrue(ignite2.configuration().isClientMode()); + + IgniteCache<Integer, Integer> cache2 = ignite2.cache(null); + + final Integer key = 0; + + Lock lock2 = cache2.lock(key); + + lock2.lock(); + + ignite2.close(); + + IgniteCache<Integer, Integer> cache0 = ignite0.cache(null); + + assertFalse(cache0.isLocalLocked(key, false)); + + IgniteCache<Integer, Integer> cache1 = ignite1.cache(null); + + assertFalse(cache1.isLocalLocked(key, false)); + + Lock lock1 = cache1.lock(0); + + assertTrue(lock1.tryLock(5000, TimeUnit.MILLISECONDS)); + + lock1.unlock(); + + ignite2 = startGrid(2); + + assertTrue(ignite2.configuration().isClientMode()); + + cache2 = ignite2.cache(null); + + lock2 = cache2.lock(0); + + assertTrue(lock2.tryLock(5000, TimeUnit.MILLISECONDS)); + + lock2.unlock(); + } + + /** + * @throws Exception If failed. + */ + public void testLockFromClientBlocksExchange() throws Exception { + ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); + ccfg.setBackups(1); + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setRebalanceMode(SYNC); + + startGrid(0); + startGrid(1); + + client = true; + + Ignite ignite2 = startGrid(2); + + IgniteCache<Integer, Integer> cache = ignite2.cache(null); + + Lock lock = cache.lock(0); + + lock.lock(); + + IgniteInternalFuture<?> startFut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + client = false; + + startGrid(3); + + return null; + } + }); + + U.sleep(2000); + + assertFalse(startFut.isDone()); + + AffinityTopologyVersion ver = new AffinityTopologyVersion(4); + + List<IgniteInternalFuture<?>> futs = new ArrayList<>(); + + U.sleep(2000); + + for (int i = 0; i < 3; i++) { + Ignite ignite = ignite(i); + + IgniteInternalFuture<?> fut = + ((IgniteKernal)ignite).context().cache().context().exchange().affinityReadyFuture(ver); + + assertNotNull(fut); + + assertFalse(fut.isDone()); + + futs.add(fut); + } + + lock.unlock(); + + for (IgniteInternalFuture<?> fut : futs) + fut.get(10_000); + + startFut.get(10_000); + } + + /** + * @param map Expected data. + * @param keys Expected keys (if expected data is not specified). + * @param clientCache Client cache. + * @param expNodes Expected nodes number. + * @throws Exception If failed. + */ + private void checkData(final Map<Integer, Integer> map, + final Set<Integer> keys, + IgniteCache<?, ?> clientCache, + final int expNodes) + throws Exception + { + final List<Ignite> nodes = G.allGrids(); + + final Affinity<Integer> aff = nodes.get(0).affinity(null); + + assertEquals(expNodes, nodes.size()); + + boolean hasNearCache = clientCache.getConfiguration(CacheConfiguration.class).getNearConfiguration() != null; + + final Ignite nearCacheNode = hasNearCache ? clientCache.unwrap(Ignite.class) : null; + + boolean wait = GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + try { + Set<Integer> keys0 = map != null ? map.keySet() : keys; + + assertNotNull(keys0); + + for (Integer key : keys0) { + GridCacheVersion ver = null; + Object val = null; + + for (Ignite node : nodes) { + IgniteCache<Integer, Integer> cache = node.cache(null); + + boolean affNode = aff.isPrimaryOrBackup(node.cluster().localNode(), key); + + Object val0 = cache.localPeek(key); + + if (affNode || node == nearCacheNode) { + if (map != null) + assertEquals("Unexpected value for " + node.name(), map.get(key), val0); + else + assertNotNull("Unexpected value for " + node.name(), val0); + + GridCacheAdapter cache0 = ((IgniteKernal)node).internalCache(null); + + if (affNode && cache0.isNear()) + cache0 = ((GridNearCacheAdapter)cache0).dht(); + + GridCacheEntryEx entry = cache0.peekEx(key); + + assertNotNull("No entry [node=" + node.name() + ", key=" + key + ']', entry); + + GridCacheVersion ver0 = entry instanceof GridNearCacheEntry ? + ((GridNearCacheEntry)entry).dhtVersion() : entry.version(); + + assertNotNull("Null version [node=" + node.name() + ", key=" + key + ']', ver0); + + if (ver == null) { + ver = ver0; + val = val0; + } + else { + assertEquals("Version check failed [node=" + node.name() + + ", key=" + key + + ", affNode=" + affNode + + ", primary=" + aff.isPrimary(node.cluster().localNode(), key) + ']', + ver0, + ver); + + assertEquals("Value check failed [node=" + node.name() + + ", key=" + key + + ", affNode=" + affNode + + ", primary=" + aff.isPrimary(node.cluster().localNode(), key) + ']', + val0, + val); + } + } + else + assertNull("Unexpected non-null value for " + node.name(), val0); + } + } + } + catch (AssertionError e) { + log.info("Check failed, will retry: " + e); + + return false; + } + catch (Exception e) { + fail("Unexpected exception: " + e); + } + + return true; + } + }, 10_000); + + assertTrue("Data check failed.", wait); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicPrimaryPutAllMultinode() throws Exception { + multinode(PRIMARY, TestType.PUT_ALL); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicClockPutAllMultinode() throws Exception { + multinode(CLOCK, TestType.PUT_ALL); + } + + /** + * @throws Exception If failed. + */ + public void testOptimisticTxPutAllMultinode() throws Exception { + multinode(null, TestType.OPTIMISTIC_TX); + } + + /** + * @throws Exception If failed. + */ + public void testPessimisticTxPutAllMultinode() throws Exception { + multinode(null, TestType.PESSIMISTIC_TX); + } + + /** + * @throws Exception If failed. + */ + public void testLockAllMultinode() throws Exception { + multinode(null, TestType.LOCK); + } + + /** + * @param atomicWriteOrder Write order if test atomic cache. + * @param testType Test type. + * @throws Exception If failed. + */ + private void multinode(final CacheAtomicWriteOrderMode atomicWriteOrder, final TestType testType) + throws Exception { + ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); + ccfg.setBackups(1); + ccfg.setAtomicityMode(atomicWriteOrder != null ? ATOMIC : TRANSACTIONAL); + ccfg.setAtomicWriteOrderMode(atomicWriteOrder); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setRebalanceMode(SYNC); + + final int SRV_CNT = 4; + + for (int i = 0; i < SRV_CNT; i++) + startGrid(i); + + final int CLIENT_CNT = 4; + + final List<Ignite> clients = new ArrayList<>(); + + client = true; + + for (int i = 0; i < CLIENT_CNT; i++) { + Ignite ignite = startGrid(SRV_CNT + i); + + assertTrue(ignite.configuration().isClientMode()); + + clients.add(ignite); + } + + final AtomicBoolean stop = new AtomicBoolean(); + + final AtomicInteger threadIdx = new AtomicInteger(0); + + final int THREADS = CLIENT_CNT * 3; + + final ConcurrentHashSet<Integer> putKeys = new ConcurrentHashSet<>(); + + IgniteInternalFuture<?> fut; + + try { + fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + int clientIdx = threadIdx.getAndIncrement() % CLIENT_CNT; + + Ignite ignite = clients.get(clientIdx); + + assertTrue(ignite.configuration().isClientMode()); + + Thread.currentThread().setName("update-thread-" + ignite.name()); + + IgniteCache<Integer, Integer> cache = ignite.cache(null); + + boolean useTx = testType == TestType.OPTIMISTIC_TX || testType == TestType.PESSIMISTIC_TX; + + if (useTx || testType == TestType.LOCK) { + assertEquals(TRANSACTIONAL, + cache.getConfiguration(CacheConfiguration.class).getAtomicityMode()); + } + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + int cntr = 0; + + while (!stop.get()) { + TreeMap<Integer, Integer> map = new TreeMap<>(); + + for (int i = 0; i < 100; i++) { + Integer key = rnd.nextInt(0, 1000); + + map.put(key, rnd.nextInt()); + } + + try { + if (testType == TestType.LOCK) { + Lock lock = cache.lockAll(map.keySet()); + + lock.lock(); + + lock.unlock(); + } + else { + if (useTx) { + IgniteTransactions txs = ignite.transactions(); + + TransactionConcurrency concurrency = + testType == TestType.PESSIMISTIC_TX ? PESSIMISTIC : OPTIMISTIC; + + try (Transaction tx = txs.txStart(concurrency, REPEATABLE_READ)) { + cache.putAll(map); + + tx.commit(); + } + } + else + cache.putAll(map); + + putKeys.addAll(map.keySet()); + } + } + catch (CacheException | IgniteException e) { + log.info("Operation failed, ignore: " + e); + } + + if (++cntr % 100 == 0) + log.info("Iteration: " + cntr); + + if (updateBarrier != null) + updateBarrier.await(); + } + + return null; + } + }, THREADS, "update-thread"); + + long stopTime = System.currentTimeMillis() + 60_000; + + while (System.currentTimeMillis() < stopTime) { + boolean restartClient = ThreadLocalRandom.current().nextBoolean(); + + Integer idx = null; + + if (restartClient) { + log.info("Start client node."); + + client = true; + + IgniteEx ignite = startGrid(SRV_CNT + CLIENT_CNT); + + IgniteCache<Integer, Integer> cache = ignite.cache(null); + + assertNotNull(cache); + } + else { + idx = ThreadLocalRandom.current().nextInt(0, SRV_CNT); + + log.info("Stop server node: " + idx); + + stopGrid(idx); + } + + updateBarrier = new CyclicBarrier(THREADS + 1, new Runnable() { + @Override public void run() { + updateBarrier = null; + } + }); + + try { + updateBarrier.await(30_000, TimeUnit.MILLISECONDS); + } + catch (TimeoutException e) { + log.error("Failed to wait for update."); + + for (Ignite ignite : G.allGrids()) + dumpCacheDebugInfo(ignite); + + U.dumpThreads(log); + + CyclicBarrier barrier0 = updateBarrier; + + if (barrier0 != null) + barrier0.reset(); + + fail("Failed to wait for update."); + } + + U.sleep(500); + + if (restartClient) { + log.info("Stop client node."); + + stopGrid(SRV_CNT + CLIENT_CNT); + } + else { + log.info("Start server node: " + idx); + + client = false; + + startGrid(idx); + } + + updateBarrier = new CyclicBarrier(THREADS + 1, new Runnable() { + @Override public void run() { + updateBarrier = null; + } + }); + + try { + updateBarrier.await(30_000, TimeUnit.MILLISECONDS); + } + catch (TimeoutException e) { + log.error("Failed to wait for update."); + + for (Ignite ignite : G.allGrids()) + dumpCacheDebugInfo(ignite); + + U.dumpThreads(log); + + CyclicBarrier barrier0 = updateBarrier; + + if (barrier0 != null) + barrier0.reset(); + + fail("Failed to wait for update."); + } + + U.sleep(500); + } + } + finally { + stop.set(true); + } + + fut.get(30_000); + + if (testType != TestType.LOCK) + checkData(null, putKeys, grid(SRV_CNT).cache(null), SRV_CNT + CLIENT_CNT); + } + + /** + * @throws Exception If failed. + */ + public void testServersLeaveOnStart() throws Exception { + ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); + ccfg.setBackups(1); + ccfg.setAtomicityMode(ATOMIC); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setRebalanceMode(SYNC); + + Ignite ignite0 = startGrid(0); + + client = true; + + final AtomicInteger nodeIdx = new AtomicInteger(2); + + final int CLIENTS = 10; + + IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + int idx = nodeIdx.getAndIncrement(); + + startGrid(idx); + + return null; + } + }, CLIENTS, "start-client"); + + ignite0.close(); + + fut.get(); + + for (int i = 0; i < CLIENTS; i++) { + Ignite ignite = grid(i + 2); + + assertEquals(CLIENTS, ignite.cluster().nodes().size()); + } + + client = false; + + startGrid(0); + startGrid(1); + + awaitPartitionMapExchange(); + + for (int i = 0; i < CLIENTS; i++) { + Ignite ignite = grid(i + 2); + + IgniteCache<Integer, Integer> cache = ignite.cache(null); + + cache.put(i, i); + + assertEquals((Object)i, cache.get(i)); + } + } + + /** + * + */ + private static class TestCommunicationSpi extends TcpCommunicationSpi { + /** */ + @LoggerResource + private IgniteLogger log; + + /** */ + private List<T2<ClusterNode, GridIoMessage>> blockedMsgs = new ArrayList<>(); + + /** */ + private Map<Class<?>, Set<UUID>> blockCls = new HashMap<>(); + + /** */ + private Class<?> recordCls; + + /** */ + private List<Object> recordedMsgs = new ArrayList<>(); + + /** {@inheritDoc} */ + @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException { + if (msg instanceof GridIoMessage) { + Object msg0 = ((GridIoMessage)msg).message(); + + synchronized (this) { + if (recordCls != null && msg0.getClass().equals(recordCls)) + recordedMsgs.add(msg0); + + Set<UUID> blockNodes = blockCls.get(msg0.getClass()); + + if (F.contains(blockNodes, node.id())) { + log.info("Block message [node=" + node.attribute(IgniteNodeAttributes.ATTR_GRID_NAME) + + ", msg=" + msg0 + ']'); + + blockedMsgs.add(new T2<>(node, (GridIoMessage)msg)); + + return; + } + } + } + + super.sendMessage(node, msg); + } + + /** + * @param recordCls Message class to record. + */ + void record(@Nullable Class<?> recordCls) { + synchronized (this) { + this.recordCls = recordCls; + } + } + + /** + * @return Recorded messages. + */ + List<Object> recordedMessages() { + synchronized (this) { + List<Object> msgs = recordedMsgs; + + recordedMsgs = new ArrayList<>(); + + return msgs; + } + } + + /** + * @param cls Message class. + * @param nodeId Node ID. + */ + void blockMessages(Class<?> cls, UUID nodeId) { + synchronized (this) { + Set<UUID> set = blockCls.get(cls); + + if (set == null) { + set = new HashSet<>(); + + blockCls.put(cls, set); + } + + set.add(nodeId); + } + } + + /** + * + */ + void stopBlock() { + synchronized (this) { + blockCls.clear(); + + for (T2<ClusterNode, GridIoMessage> msg : blockedMsgs) { + ClusterNode node = msg.get1(); + + log.info("Send blocked message: [node=" + node.attribute(IgniteNodeAttributes.ATTR_GRID_NAME) + + ", msg=" + msg.get2().message() + ']'); + + super.sendMessage(msg.get1(), msg.get2()); + } + } + } + } + + /** + * + */ + enum TestType { + /** */ + PUT_ALL, + + /** */ + OPTIMISTIC_TX, + + /** */ + PESSIMISTIC_TX, + + /** */ + LOCK + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeConcurrentStart.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeConcurrentStart.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeConcurrentStart.java new file mode 100644 index 0000000..bd74ece --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeConcurrentStart.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.cache.CacheRebalanceMode.*; + +/** + * + */ +public class IgniteCacheClientNodeConcurrentStart extends GridCommonAbstractTest { + /** */ + protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final int NODES_CNT = 5; + + /** */ + private Set<Integer> clientNodes; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + assertNotNull(clientNodes); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + boolean client = false; + + for (Integer clientIdx : clientNodes) { + if (getTestGridName(clientIdx).equals(gridName)) { + client = true; + + break; + } + } + + cfg.setClientMode(client); + + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setBackups(0); + ccfg.setRebalanceMode(SYNC); + + cfg.setCacheConfiguration(ccfg); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testConcurrentStart() throws Exception { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + for (int i = 0; i < 3; i++) { + try { + clientNodes = new HashSet<>(); + + while (clientNodes.size() < 2) + clientNodes.add(rnd.nextInt(0, NODES_CNT)); + + clientNodes.add(NODES_CNT - 1); + + log.info("Test iteration [iter=" + i + ", clients=" + clientNodes + ']'); + + startGridsMultiThreaded(NODES_CNT, true); + + for (int node : clientNodes) { + Ignite ignite = grid(node); + + assertTrue(ignite.configuration().isClientMode()); + } + } + finally { + stopAllGrids(); + } + } + } +}