ignite-389 Partition scan query fallback test
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/29dc7221 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/29dc7221 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/29dc7221 Branch: refs/heads/ignite-929 Commit: 29dc7221c12db1e39a17de4471a8c5ebed4b8709 Parents: 5d6bb53 Author: agura <ag...@gridgain.com> Authored: Fri May 29 16:28:34 2015 +0300 Committer: agura <ag...@gridgain.com> Committed: Fri May 29 16:28:34 2015 +0300 ---------------------------------------------------------------------- ...CacheScanPartitionQueryFallbackSelfTest.java | 335 ++++++++++++++----- 1 file changed, 259 insertions(+), 76 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29dc7221/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java index 31336e6..dfa7296 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java @@ -21,8 +21,10 @@ import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.processors.affinity.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.processors.cache.query.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.lang.*; @@ -32,15 +34,17 @@ import org.apache.ignite.spi.communication.tcp.*; import org.apache.ignite.testframework.junits.common.*; import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; /** * Tests partition scan query fallback. */ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractTest { /** Grid count. */ - private static final int GRID_CNT = 5; + private static final int GRID_CNT = 3; - /** Kys count. */ + /** Keys count. */ private static final int KEYS_CNT = 5000; /** Backups. */ @@ -49,20 +53,29 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT /** Cache mode. */ private CacheMode cacheMode; - /** Fallback. */ - private boolean fallback; + /** Client mode. */ + private volatile boolean clientMode; - /** Primary node id. */ - private static volatile UUID expNodeId; + /** Expected first node ID. */ + private static UUID expNodeId; - /** Fail node id. */ - private static volatile UUID failNodeId; + /** Expected fallback node ID. */ + private static UUID expFallbackNodeId; + + /** Communication SPI factory. */ + private CommunicationSpiFactory commSpiFactory; + + /** Latch. */ + private static CountDownLatch latch; + + /** Test entries. */ + private Map<Integer, Map<Integer, Integer>> entries = new HashMap<>(); /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); - cfg.setCommunicationSpi(new TestCommunicationSpi()); + cfg.setCommunicationSpi(commSpiFactory.create()); CacheConfiguration ccfg = defaultCacheConfiguration(); ccfg.setCacheMode(cacheMode); @@ -72,142 +85,312 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT cfg.setCacheConfiguration(ccfg); + cfg.setClientMode(clientMode); + return cfg; } /** + * Scan should perform on the local node. + * * @throws Exception If failed. */ - public void testPrimary() throws Exception { + public void testScanLocal() throws Exception { cacheMode = CacheMode.PARTITIONED; backups = 0; - failNodeId = null; - fallback = false; + commSpiFactory = new TestLocalCommunicationSpiFactory(); - doTestScanPartition(); + try { + Ignite ignite = startGrids(GRID_CNT); + + IgniteCacheProxy<Integer, Integer> cache = fillCache(ignite); + + int part = anyLocalPartition(cache.context()); + + CacheQuery<Map.Entry<Integer, Integer>> qry = cache.context().queries().createScanQuery(null, part, false); + + doTestScanQuery(qry); + } + finally { + stopAllGrids(); + } } /** + * Scan should perform on the remote node. + * * @throws Exception If failed. */ - public void testFallbackToBackup() throws Exception { + public void testScanRemote() throws Exception { cacheMode = CacheMode.PARTITIONED; - backups = 1; - failNodeId = null; - fallback = true; + backups = 0; + commSpiFactory = new TestRemoteCommunicationSpiFactory(); - doTestScanPartition(); + try { + Ignite ignite = startGrids(GRID_CNT); + + IgniteCacheProxy<Integer, Integer> cache = fillCache(ignite); + + IgniteBiTuple<Integer, UUID> tup = remotePartition(cache.context()); + + int part = tup.get1(); + + expNodeId = tup.get2(); + + CacheQuery<Map.Entry<Integer, Integer>> qry = cache.context().queries().createScanQuery(null, part, false); + + doTestScanQuery(qry); + } + finally { + stopAllGrids(); + } } /** + * Scan should try first remote node and fallbacks to second remote node. + * * @throws Exception If failed. */ - protected void doTestScanPartition() throws Exception { - try { - Ignite ignite = startGrids(GRID_CNT); + public void testScanFallback() throws Exception { + cacheMode = CacheMode.PARTITIONED; + backups = 1; + commSpiFactory = new TestFallbackCommunicationSpiFactory(); - IgniteCacheProxy<Integer, Integer> cache = - (IgniteCacheProxy<Integer, Integer>)ignite.<Integer, Integer>cache(null); + final Set<Integer> candidates = new TreeSet<>(); - Map<Integer, Map<Integer, Integer>> entries = new HashMap<>(); + final AtomicBoolean test = new AtomicBoolean(false); - for (int i = 0; i < KEYS_CNT; i++) { - cache.put(i, i); + for(int j = 0; j < 2; j++) { + clientMode = true; - int part = cache.context().affinity().partition(i); + latch = new CountDownLatch(1); - Map<Integer, Integer> partEntries = entries.get(part); + try { + final Ignite ignite0 = startGrid(0); - if (partEntries == null) - entries.put(part, partEntries = new HashMap<>()); + clientMode = false; - partEntries.put(i, i); - } + final IgniteEx ignite1 = startGrid(1); + final IgniteEx ignite2 = startGrid(2); + startGrid(3); - IgniteBiTuple<Integer, UUID> tup = remotePartition(cache.context(), true); + if (test.get()) { + expNodeId = ignite1.localNode().id(); + expFallbackNodeId = ignite2.localNode().id(); + } - int part = tup.get1(); + final IgniteCacheProxy<Integer, Integer> cache = fillCache(ignite0); - if (fallback) - failNodeId = tup.get2(); - else - expNodeId = tup.get2(); + if (!test.get()) { + candidates.addAll(localPartitions(ignite1)); + candidates.retainAll(localPartitions(ignite2)); + } - if (fallback) - expNodeId = remoteBackup(part, cache.context()); + Runnable run = new Runnable() { + @Override public void run() { + try { + startGrid(4); + startGrid(5); - CacheQuery<Map.Entry<Integer, Integer>> qry = cache.context().queries().createScanQuery(null, part, false); + awaitPartitionMapExchange(); + + if (!test.get()) { + Set<Integer> parts = localPartitions(ignite1); + candidates.removeAll(parts); + } - CacheQueryFuture<Map.Entry<Integer, Integer>> fut = qry.execute(); + latch.countDown(); + } + catch (Exception e) { + e.printStackTrace(); + } - Collection<Map.Entry<Integer, Integer>> expEntries = fut.get(); + } + }; - for (Map.Entry<Integer, Integer> e : expEntries) { - Map<Integer, Integer> map = entries.get(part); + int part; + CacheQuery<Map.Entry<Integer, Integer>> qry = null; - if(map == null) - assertTrue(expEntries.isEmpty()); + if (test.get()) { + part = F.first(candidates); + + qry = cache.context().queries().createScanQuery(null, part, false); + } + + new Thread(run).start(); + + if (test.get()) + doTestScanQuery(qry); else - assertEquals(map.get(e.getKey()), e.getValue()); + latch.await(); + } + finally { + test.set(true); + + stopAllGrids(); } } - finally { - stopAllGrids(); + } + + /** + * @param ignite Ignite. + */ + protected IgniteCacheProxy<Integer, Integer> fillCache(Ignite ignite) { + IgniteCacheProxy<Integer, Integer> cache = + (IgniteCacheProxy<Integer, Integer>)ignite.<Integer, Integer>cache(null); + + for (int i = 0; i < KEYS_CNT; i++) { + cache.put(i, i); + + int part = cache.context().affinity().partition(i); + + Map<Integer, Integer> partEntries = entries.get(part); + + if (partEntries == null) + entries.put(part, partEntries = new HashMap<>()); + + partEntries.put(i, i); } + + return cache; + } + + /** + * @param qry Query. + */ + protected void doTestScanQuery( + CacheQuery<Map.Entry<Integer, Integer>> qry) throws IgniteCheckedException { + CacheQueryFuture<Map.Entry<Integer, Integer>> fut = qry.execute(); + + Collection<Map.Entry<Integer, Integer>> expEntries = fut.get(); + + for (Map.Entry<Integer, Integer> e : expEntries) { + Map<Integer, Integer> map = entries.get(((GridCacheQueryAdapter)qry).partition()); + + if (map == null) + assertTrue(expEntries.isEmpty()); + else + assertEquals(map.get(e.getKey()), e.getValue()); + } + } + + /** + * @param cctx Cctx. + */ + private static int anyLocalPartition(GridCacheContext<?, ?> cctx) { + return F.first(cctx.topology().localPartitions()).id(); } /** * @param cctx Cctx. - * @param primary Primary. */ - private IgniteBiTuple<Integer, UUID> remotePartition(final GridCacheContext cctx, boolean primary) { + private IgniteBiTuple<Integer, UUID> remotePartition(final GridCacheContext cctx) { ClusterNode node = F.first(cctx.kernalContext().grid().cluster().forRemotes().nodes()); GridCacheAffinityManager affMgr = cctx.affinity(); AffinityTopologyVersion topVer = affMgr.affinityTopologyVersion(); - Set<Integer> parts = primary ? - affMgr.primaryPartitions(node.id(), topVer) : affMgr.backupPartitions(node.id(), topVer); + Set<Integer> parts = affMgr.primaryPartitions(node.id(), topVer); return new IgniteBiTuple<>(F.first(parts), node.id()); } /** - * @param part Partition. - * @param cctx Cctx. + * @param ignite Ignite. */ - private UUID remoteBackup(int part, final GridCacheContext cctx) { - final UUID locUuid = cctx.localNodeId(); + private Set<Integer> localPartitions(Ignite ignite) { + GridCacheContext cctx = ((IgniteCacheProxy)ignite.cache(null)).context(); + + Collection<GridDhtLocalPartition> owningParts = F.view(cctx.topology().localPartitions(), + new IgnitePredicate<GridDhtLocalPartition>() { + @Override public boolean apply(GridDhtLocalPartition part) { + return part.state() == GridDhtPartitionState.OWNING; + } + }); + + return new HashSet<>(F.transform(owningParts, new IgniteClosure<GridDhtLocalPartition, Integer>() { + @Override public Integer apply(GridDhtLocalPartition part) { + return part.id(); + } + })); + } - GridCacheAffinityManager affMgr = cctx.affinity(); + /** + * Factory for tests specific communication SPI. + */ + private interface CommunicationSpiFactory { + /** + * Creates communication SPI instance. + */ + TcpCommunicationSpi create(); + } - AffinityTopologyVersion topVer = affMgr.affinityTopologyVersion(); + /** + * + */ + private static class TestLocalCommunicationSpiFactory implements CommunicationSpiFactory { + /** {@inheritDoc} */ + @Override public TcpCommunicationSpi create() { + return new TcpCommunicationSpi() { + @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException { + Object origMsg = ((GridIoMessage)msg).message(); - return F.first(F.view(affMgr.backups(part, topVer), new IgnitePredicate<ClusterNode>() { - @Override public boolean apply(ClusterNode node) { - return !node.id().equals(locUuid); - } - })).id(); + if (origMsg instanceof GridCacheQueryRequest) + fail(); //should use local node + + super.sendMessage(node, msg); + } + }; + } } /** * */ - private static class TestCommunicationSpi extends TcpCommunicationSpi { + private static class TestRemoteCommunicationSpiFactory implements CommunicationSpiFactory { /** {@inheritDoc} */ - @Override public void sendMessage(ClusterNode node, Message msg) - throws IgniteSpiException { - Object origMsg = ((GridIoMessage)msg).message(); + @Override public TcpCommunicationSpi create() { + return new TcpCommunicationSpi() { + @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException { + Object origMsg = ((GridIoMessage)msg).message(); - if (origMsg instanceof GridCacheQueryRequest) { - if (node.id().equals(failNodeId)) - throw new IgniteSpiException(""); - else - assertEquals(expNodeId, node.id()); - } + if (origMsg instanceof GridCacheQueryRequest) + assertEquals(expNodeId, node.id()); + + super.sendMessage(node, msg); + } + }; + } + } - super.sendMessage(node, msg); + /** + * + */ + private static class TestFallbackCommunicationSpiFactory implements CommunicationSpiFactory { + /** {@inheritDoc} */ + @Override public TcpCommunicationSpi create() { + return new TcpCommunicationSpi() { + @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException { + Object origMsg = ((GridIoMessage)msg).message(); + + if (origMsg instanceof GridCacheQueryRequest) { + if (latch.getCount() > 0) + assertEquals(expNodeId, node.id()); + else + assertEquals(expFallbackNodeId, node.id()); + + try { + latch.await(); + } + catch (InterruptedException e) { + throw new IgniteSpiException(e); + } + } + + super.sendMessage(node, msg); + } + }; } } }