# ignite-23 remap for atomic updates from client
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/942d7120 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/942d7120 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/942d7120 Branch: refs/heads/ignite-709_2 Commit: 942d71207043ba63fc1cc84b1e2e69f4a306a2dd Parents: 804e0d9 Author: sboikov <sboi...@gridgain.com> Authored: Wed May 20 11:58:06 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Thu May 21 09:29:38 2015 +0300 ---------------------------------------------------------------------- .../dht/atomic/GridDhtAtomicCache.java | 32 +- .../dht/atomic/GridNearAtomicUpdateFuture.java | 14 +- ...niteCacheClientNodeChangingTopologyTest.java | 599 +++++++++++++++++++ .../testsuites/IgniteCacheTestSuite2.java | 8 +- 4 files changed, 629 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/942d7120/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 19d88e0..a1c2a1f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1023,6 +1023,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { IgniteCacheExpiryPolicy expiry = null; + boolean clientReq = false; + try { // If batch store update is enabled, we need to lock all entries. // First, need to acquire locks on cache entries, then check filter. @@ -1042,17 +1044,19 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { return; } - // Do not check topology version for CLOCK versioning since - // partition exchange will wait for near update future. - if (topology().topologyVersion().equals(req.topologyVersion()) || - ctx.config().getAtomicWriteOrderMode() == CLOCK) { - ClusterNode node = ctx.discovery().node(nodeId); + ClusterNode node = ctx.discovery().node(nodeId); - if (node == null) { - U.warn(log, "Node originated update request left grid: " + nodeId); + if (node == null) { + U.warn(log, "Node originated update request left grid: " + nodeId); - return; - } + return; + } + + clientReq = CU.clientNode(node); + + // Do not check topology version for CLOCK versioning since + // partition exchange will wait for near update future. + if ((req.fastMap() && !clientReq) || topology().topologyVersion().equals(req.topologyVersion())) { boolean hasNear = ctx.discovery().cacheNearNode(node, name()); @@ -1103,7 +1107,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { retVal = updRes.invokeResults(); } else { - UpdateSingleResult<K, V> updRes = updateSingle(node, + UpdateSingleResult updRes = updateSingle(node, hasNear, req, res, @@ -1155,7 +1159,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } } catch (GridDhtInvalidPartitionException ignore) { - assert ctx.config().getAtomicWriteOrderMode() == PRIMARY; + assert !req.fastMap() || clientReq; if (log.isDebugEnabled()) log.debug("Caught invalid partition exception for cache entry (will remap update request): " + req); @@ -1603,7 +1607,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @return Return value. * @throws GridCacheEntryRemovedException Should be never thrown. */ - private UpdateSingleResult<K, V> updateSingle( + private UpdateSingleResult updateSingle( ClusterNode node, boolean hasNear, GridNearAtomicUpdateRequest req, @@ -1797,7 +1801,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } } - return new UpdateSingleResult<>(retVal, deleted, dhtFut); + return new UpdateSingleResult(retVal, deleted, dhtFut); } /** @@ -2570,7 +2574,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** * Result of {@link GridDhtAtomicCache#updateSingle} execution. */ - private static class UpdateSingleResult<K, V> { + private static class UpdateSingleResult { /** */ private final GridCacheReturn retVal; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/942d7120/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index d138936..476487e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -289,7 +289,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> * @param waitTopFut Whether to wait for topology future. */ public void map(boolean waitTopFut) { - mapOnTopology(keys, false, null, waitTopFut); + mapOnTopology(null, false, null, waitTopFut); } /** {@inheritDoc} */ @@ -323,7 +323,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> */ public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res) { if (res.remapKeys() != null) { - assert cctx.config().getAtomicWriteOrderMode() == PRIMARY; + assert cctx.config().getAtomicWriteOrderMode() == PRIMARY || cctx.kernalContext().clientNode(); mapOnTopology(res.remapKeys(), true, nodeId, true); @@ -474,13 +474,14 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> } /** - * @param keys Keys to map. + * @param topVer Topology version. + * @param remapKeys Keys to remap or {@code null} to map all keys. * @param remap Flag indicating if this is partial remap for this future. * @param oldNodeId Old node ID if was remap. */ private void map0( AffinityTopologyVersion topVer, - Collection<?> keys, + @Nullable Collection<?> remapKeys, boolean remap, @Nullable UUID oldNodeId) { assert oldNodeId == null || remap; @@ -503,6 +504,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> log.debug("Assigned fast-map version for update on near node: " + updVer); if (keys.size() == 1 && !fastMap && (single == null || single)) { + assert remapKeys == null || remapKeys.size() == 1 : remapKeys; + Object key = F.first(keys); Object val; @@ -682,6 +685,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> KeyCacheObject cacheKey = cctx.toCacheKeyObject(key); + if (remapKeys != null && !remapKeys.contains(cacheKey)) + continue; + if (op != TRANSFORM) val = cctx.toCacheObject(val); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/942d7120/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java index bb2e458..f964d39 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java @@ -17,11 +17,41 @@ package org.apache.ignite.internal.processors.cache.distributed; +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.managers.communication.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*; +import org.apache.ignite.internal.processors.cache.distributed.near.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.plugin.extensions.communication.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.spi.*; +import org.apache.ignite.spi.communication.tcp.*; import org.apache.ignite.spi.discovery.tcp.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.*; import org.apache.ignite.testframework.junits.common.*; +import org.apache.ignite.transactions.*; + +import javax.cache.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; +import java.util.concurrent.locks.*; + +import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.*; +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheMode.*; +import static org.apache.ignite.cache.CacheRebalanceMode.*; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; +import static org.apache.ignite.transactions.TransactionConcurrency.*; +import static org.apache.ignite.transactions.TransactionIsolation.*; /** * @@ -30,12 +60,581 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac /** */ protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + /** */ + private CacheConfiguration ccfg; + + /** */ + private boolean client; + + /** */ + private volatile CyclicBarrier updateBarrier; + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + cfg.setClientMode(client); + + cfg.setCommunicationSpi(new TestCommunicationSpi()); + + cfg.setCacheConfiguration(ccfg); + return cfg; } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicPutAllClockMode() throws Exception { + atomicPutAll(CLOCK); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicPutAllPrimaryMode() throws Exception { + atomicPutAll(PRIMARY); + } + + /** + * @param writeOrder Write order. + * @throws Exception If failed. + */ + private void atomicPutAll(CacheAtomicWriteOrderMode writeOrder) throws Exception { + ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); + ccfg.setBackups(1); + ccfg.setAtomicityMode(ATOMIC); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setAtomicWriteOrderMode(writeOrder); + ccfg.setRebalanceMode(SYNC); + + IgniteEx ignite0 = startGrid(0); + IgniteEx ignite1 = startGrid(1); + + client = true; + + Ignite ignite2 = startGrid(2); + + assertTrue(ignite2.configuration().isClientMode()); + + final Map<Integer, Integer> map = new HashMap<>(); + + for (int i = 0; i < 100; i++) + map.put(i, i); + + TestCommunicationSpi spi = (TestCommunicationSpi)ignite2.configuration().getCommunicationSpi(); + + // Block messages requests for both nodes. + spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite0.localNode().id()); + spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite1.localNode().id()); + + final IgniteCache<Integer, Integer> cache = ignite2.cache(null); + + assertEquals(writeOrder, cache.getConfiguration(CacheConfiguration.class).getAtomicWriteOrderMode()); + + IgniteInternalFuture<?> putFut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + cache.putAll(map); + + return null; + } + }); + + assertFalse(putFut.isDone()); + + client = false; + + IgniteEx ignite3 = startGrid(3); + + log.info("Stop block1."); + + spi.stopBlock(); + + putFut.get(); + + checkData(map, 4); + + ignite3.close(); + + map.clear(); + + for (int i = 0; i < 100; i++) + map.put(i, i + 1); + + // Block messages requests for single node. + spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite0.localNode().id()); + + putFut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + cache.putAll(map); + + return null; + } + }); + + assertFalse(putFut.isDone()); + + client = false; + + startGrid(3); + + log.info("Stop block2."); + + spi.stopBlock(); + + putFut.get(); + + checkData(map, 4); + + for (int i = 0; i < 100; i++) + map.put(i, i + 2); + + cache.putAll(map); + + checkData(map, 4); + } + + /** + * @throws Exception If failed. + */ + public void testTxPutAll() throws Exception { + ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); + ccfg.setBackups(1); + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setRebalanceMode(SYNC); + + IgniteEx ignite0 = startGrid(0); + IgniteEx ignite1 = startGrid(1); + + client = true; + + Ignite ignite2 = startGrid(2); + + assertTrue(ignite2.configuration().isClientMode()); + + final Map<Integer, Integer> map = new HashMap<>(); + + for (int i = 0; i < 1; i++) + map.put(i, i); + + TestCommunicationSpi spi = (TestCommunicationSpi)ignite2.configuration().getCommunicationSpi(); + + spi.blockMessages(GridNearTxPrepareRequest.class, ignite0.localNode().id()); + spi.blockMessages(GridNearTxPrepareRequest.class, ignite1.localNode().id()); + + final IgniteCache<Integer, Integer> cache = ignite2.cache(null); + + IgniteInternalFuture<?> putFut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + cache.putAll(map); + + return null; + } + }); + + assertFalse(putFut.isDone()); + + client = false; + + IgniteEx ignite3 = startGrid(3); + + log.info("Stop block1."); + + spi.stopBlock(); + + putFut.get(); + + checkData(map, 4); + } + + /** + * @throws Exception If failed. + */ + public void testLockRemoveAfterClientFailed() throws Exception { + ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); + ccfg.setBackups(1); + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setRebalanceMode(SYNC); + + IgniteEx ignite0 = startGrid(0); + IgniteEx ignite1 = startGrid(1); + + client = true; + + Ignite ignite2 = startGrid(2); + + assertTrue(ignite2.configuration().isClientMode()); + + IgniteCache<Integer, Integer> cache2 = ignite2.cache(null); + + Lock lock2 = cache2.lock(0); + + lock2.lock(); + + ignite2.close(); + + IgniteCache<Integer, Integer> cache1 = ignite1.cache(null); + + Lock lock1 = cache1.lock(0); + + assertTrue(lock1.tryLock(5000, TimeUnit.MILLISECONDS)); + + lock1.unlock(); + + ignite2 = startGrid(2); + + assertTrue(ignite2.configuration().isClientMode()); + + cache2 = ignite2.cache(null); + + lock2 = cache2.lock(0); + + assertTrue(lock2.tryLock(5000, TimeUnit.MILLISECONDS)); + + lock2.unlock(); + } + + /** + * @param map Expected data. + * @param expNodes Expected nodes number. + * @throws Exception If failed. + */ + private void checkData(final Map<Integer, Integer> map, final int expNodes) throws Exception { + final List<Ignite> nodes = G.allGrids(); + + final Affinity<Integer> aff = nodes.get(0).affinity(null); + + assertEquals(expNodes, nodes.size()); + + boolean wait = GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + try { + for (Map.Entry<Integer, Integer> e : map.entrySet()) { + Integer key = e.getKey(); + + for (Ignite node : nodes) { + IgniteCache<Integer, Integer> cache = node.cache(null); + + if (aff.isPrimaryOrBackup(node.cluster().localNode(), key)) + assertEquals("Unexpected value for " + node.name(), e.getValue(), cache.localPeek(key)); + else + assertNull("Unexpected non-null value for " + node.name(), cache.localPeek(key)); + } + } + } + catch (AssertionError e) { + log.info("Check failed, will retry: " + e); + + return false; + } + + return true; + } + }, 10_000); + + assertTrue("Data check failed.", wait); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicPrimaryPutAllMultinode() throws Exception { + putAllMultinode(PRIMARY, false); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicClockPutAllMultinode() throws Exception { + putAllMultinode(CLOCK ,false); + } + + /** + * @throws Exception If failed. + */ + public void testOptimisticTxPutAllMultinode() throws Exception { + putAllMultinode(null, false); + } + + /** + * @throws Exception If failed. + */ + public void testPessimisticTxPutAllMultinode() throws Exception { + putAllMultinode(null, true); + } + + /** + * @param atomicWriteOrder Write order if test atomic cache. + * @param pessimisticTx {@code True} if use pessimistic tx. + * @throws Exception If failed. + */ + private void putAllMultinode(final CacheAtomicWriteOrderMode atomicWriteOrder, final boolean pessimisticTx) + throws Exception { + ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(PARTITIONED); + ccfg.setBackups(1); + ccfg.setAtomicityMode(atomicWriteOrder != null ? ATOMIC : TRANSACTIONAL); + ccfg.setAtomicWriteOrderMode(atomicWriteOrder); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setRebalanceMode(SYNC); + + final int SRV_CNT = 4; + + for (int i = 0; i < SRV_CNT; i++) + startGrid(i); + + final int CLIENT_CNT = 4; + + final List<Ignite> clients = new ArrayList<>(); + + client = true; + + for (int i = 0; i < CLIENT_CNT; i++) { + Ignite ignite = startGrid(SRV_CNT + i); + + assertTrue(ignite.configuration().isClientMode()); + + clients.add(ignite); + } + + client = false; + + List<IgniteInternalFuture<?>> futs = new ArrayList<>(); + + final AtomicBoolean stop = new AtomicBoolean(); + + final AtomicInteger threadIdx = new AtomicInteger(0); + + final int THREADS = CLIENT_CNT * 3; + + try { + GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + int clientIdx = threadIdx.getAndIncrement() % CLIENT_CNT; + + Ignite ignite = clients.get(clientIdx); + + assertTrue(ignite.configuration().isClientMode()); + + Thread.currentThread().setName("update-thread-" + ignite.name()); + + IgniteCache<Integer, Integer> cache = ignite.cache(null); + + boolean useTx = atomicWriteOrder == null; + + if (useTx) { + assertEquals(TRANSACTIONAL, + cache.getConfiguration(CacheConfiguration.class).getAtomicityMode()); + } + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + int cntr = 0; + + while (!stop.get()) { + TreeMap<Integer, Integer> map = new TreeMap<>(); + + for (int i = 0; i < 100; i++) + map.put(rnd.nextInt(0, 1000), i); + + try { + if (useTx) { + IgniteTransactions txs = ignite.transactions(); + + TransactionConcurrency concurrency = pessimisticTx ? PESSIMISTIC : OPTIMISTIC; + + try (Transaction tx = txs.txStart(concurrency, REPEATABLE_READ)) { + cache.putAll(map); + + tx.commit(); + } + } + else + cache.putAll(map); + } + catch (CacheException | IgniteException e) { + log.info("Update failed, ignore: " + e); + } + + if (++cntr % 100 == 0) + log.info("Iteration: " + cntr); + + if (updateBarrier != null) + updateBarrier.await(); + } + + return null; + } + }, THREADS, "update-thread"); + + for (final Ignite ignite : clients) { + + futs.add(GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + Thread.currentThread().setName("update-" + ignite.name()); + + log.info("Start updates from node: " + ignite.name()); + + + return null; + } + })); + } + + long stopTime = System.currentTimeMillis() + 2 * 60_000; + + while (System.currentTimeMillis() < stopTime) { + int idx = ThreadLocalRandom.current().nextInt(0, SRV_CNT); + + log.info("Stop node: " + idx); + + stopGrid(idx); + + updateBarrier = new CyclicBarrier(THREADS + 1, new Runnable() { + @Override public void run() { + updateBarrier = null; + } + }); + + updateBarrier.await(15_000, TimeUnit.MILLISECONDS); + + CyclicBarrier barrier0 = updateBarrier; + + if (barrier0 != null) { + log.info("Failed to wait for update."); + + U.dumpThreads(log); + + barrier0.reset(); + + fail("Failed to wait for update."); + } + + U.sleep(500); + + log.info("Start node: " + idx); + + startGrid(idx); + + updateBarrier = new CyclicBarrier(THREADS + 1, new Runnable() { + @Override public void run() { + updateBarrier = null; + } + }); + + updateBarrier.await(15_000, TimeUnit.MILLISECONDS); + + barrier0 = updateBarrier; + + if (barrier0 != null) { + log.info("Failed to wait for update."); + + U.dumpThreads(log); + + barrier0.reset(); + + fail("Failed to wait for update."); + } + + U.sleep(500); + } + } + finally { + stop.set(true); + } + + for (IgniteInternalFuture<?> fut : futs) + fut.get(); + } + + /** + * + */ + private static class TestCommunicationSpi extends TcpCommunicationSpi { + /** */ + @LoggerResource + private IgniteLogger log; + + /** */ + private List<T2<ClusterNode, GridIoMessage>> blockedMsgs = new ArrayList<>(); + + /** */ + private Map<Class<?>, Set<UUID>> blockCls = new HashMap<>(); + + /** {@inheritDoc} */ + @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException { + if (msg instanceof GridIoMessage) { + Object msg0 = ((GridIoMessage)msg).message(); + + synchronized (this) { + Set<UUID> blockNodes = blockCls.get(msg0.getClass()); + + if (F.contains(blockNodes, node.id())) { + log.info("Block message [node=" + node.attribute(IgniteNodeAttributes.ATTR_GRID_NAME) + + ", msg=" + msg0 + ']'); + + blockedMsgs.add(new T2<>(node, (GridIoMessage)msg)); + + return; + } + } + } + + super.sendMessage(node, msg); + } + + /** + * @param cls Message class. + * @param nodeId Node ID. + */ + void blockMessages(Class<?> cls, UUID nodeId) { + synchronized (this) { + Set<UUID> set = blockCls.get(cls); + + if (set == null) { + set = new HashSet<>(); + + blockCls.put(cls, set); + } + + set.add(nodeId); + } + } + + /** + * + */ + void stopBlock() { + synchronized (this) { + blockCls.clear(); + + for (T2<ClusterNode, GridIoMessage> msg : blockedMsgs) { + ClusterNode node = msg.get1(); + + log.info("Send blocked message: [node=" + node.attribute(IgniteNodeAttributes.ATTR_GRID_NAME) + + ", msg=" + msg.get2().message() + ']'); + + super.sendMessage(msg.get1(), msg.get2()); + } + } + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/942d7120/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java index 80c1f4e..4664c66 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java @@ -130,16 +130,12 @@ public class IgniteCacheTestSuite2 extends TestSuite { suite.addTest(new TestSuite(GridCacheOffheapUpdateSelfTest.class)); - // TODO: GG-7242, GG-7243: Enabled when fixed. -// suite.addTest(new TestSuite(GridCacheDhtRemoveFailureTest.class)); -// suite.addTest(new TestSuite(GridCacheNearRemoveFailureTest.class)); - // TODO: GG-7201: Enable when fixed. - //suite.addTest(new TestSuite(GridCacheDhtAtomicRemoveFailureTest.class)); - suite.addTest(new TestSuite(GridCacheNearPrimarySyncSelfTest.class)); suite.addTest(new TestSuite(GridCacheColocatedPrimarySyncSelfTest.class)); + suite.addTest(new TestSuite(IgniteCachePartitionMapUpdateTest.class)); suite.addTest(new TestSuite(IgniteCacheClientNodePartitionsExchangeTest.class)); + suite.addTest(new TestSuite(IgniteCacheClientNodeChangingTopologyTest.class)); return suite; }