ignite-484-1 - partition number fix
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b38c2eba Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b38c2eba Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b38c2eba Branch: refs/heads/ignite-980 Commit: b38c2eba30324d15f49b2074880101cc9f079654 Parents: 0647999 Author: S.Vladykin <svlady...@gridgain.com> Authored: Wed Jun 10 01:11:36 2015 +0300 Committer: S.Vladykin <svlady...@gridgain.com> Committed: Wed Jun 10 01:11:36 2015 +0300 ---------------------------------------------------------------------- .../h2/twostep/GridReduceQueryExecutor.java | 11 +++---- .../IgniteCacheQueryNodeRestartSelfTest2.java | 30 +++++++++++--------- .../IgniteCacheQuerySelfTestSuite.java | 1 + 3 files changed, 24 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b38c2eba/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index d059d93..03da6d3 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -631,9 +631,9 @@ public class GridReduceQueryExecutor { private Map<ClusterNode, IntArray> partitionLocations(final GridCacheContext<?,?> cctx, List<String> extraSpaces) { assert !cctx.isReplicated() && !cctx.isLocal() : cctx.name() + " must be partitioned"; - int maxParts = cctx.affinity().partitions(); + final int partsCnt = cctx.affinity().partitions(); - if (extraSpaces != null) { // Find max number of partitions for partitioned caches. + if (extraSpaces != null) { // Check correct number of partitions for partitioned caches. for (String extraSpace : extraSpaces) { GridCacheContext<?,?> extraCctx = cacheContext(extraSpace); @@ -642,12 +642,13 @@ public class GridReduceQueryExecutor { int parts = extraCctx.affinity().partitions(); - if (parts > maxParts) - maxParts = parts; + if (parts != partsCnt) + throw new CacheException("Number of partitions must be the same for correct collocation in " + + "caches " + cctx.name() + " and " + extraSpace + "."); } } - Set<ClusterNode>[] partLocs = new Set[maxParts]; + Set<ClusterNode>[] partLocs = new Set[partsCnt]; // Fill partition locations for main cache. for (int p = 0, parts = cctx.affinity().partitions(); p < parts; p++) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b38c2eba/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java index 1f0a6e6..746cc45 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest2.java @@ -97,7 +97,7 @@ public class IgniteCacheQueryNodeRestartSelfTest2 extends GridCommonAbstractTest cc.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); cc.setAtomicityMode(TRANSACTIONAL); cc.setRebalanceMode(CacheRebalanceMode.SYNC); - cc.setAffinity(new RendezvousAffinityFunction(false, name.equals("pe") ? 50 : 60)); + cc.setAffinity(new RendezvousAffinityFunction(false, 60)); if (name.equals("pe")) { cc.setIndexedTypes( @@ -178,7 +178,7 @@ public class IgniteCacheQueryNodeRestartSelfTest2 extends GridCommonAbstractTest public void testRestarts() throws Exception { int duration = 150 * 1000; int qryThreadNum = 4; - int restartThreadsNum = 1; // 4 + 2 = 6 nodes + int restartThreadsNum = 2; // 4 + 2 = 6 nodes final int nodeLifeTime = 2 * 1000; final int logFreq = 50; @@ -198,13 +198,13 @@ public class IgniteCacheQueryNodeRestartSelfTest2 extends GridCommonAbstractTest final AtomicInteger qryCnt = new AtomicInteger(); - final AtomicBoolean done = new AtomicBoolean(); + final AtomicBoolean qrysDone = new AtomicBoolean(); IgniteInternalFuture<?> fut1 = multithreadedAsync(new CAX() { @Override public void applyx() throws IgniteCheckedException { GridRandom rnd = new GridRandom(); - while (!done.get()) { + while (!qrysDone.get()) { int g; do { @@ -212,16 +212,16 @@ public class IgniteCacheQueryNodeRestartSelfTest2 extends GridCommonAbstractTest } while (!locks.compareAndSet(g, 0, 1)); -// if (rnd.nextBoolean()) { // Partitioned query. + if (rnd.nextBoolean()) { // Partitioned query. IgniteCache<?,?> cache = grid(g).cache("pu"); assertEquals(pRes, cache.query(new SqlFieldsQuery(PARTITIONED_QRY)).getAll()); -// } -// else { // Replicated query. -// IgniteCache<?,?> cache = grid(g).cache("co"); -// -// assertEquals(rRes, cache.query(new SqlFieldsQuery(REPLICATED_QRY)).getAll()); -// } + } + else { // Replicated query. + IgniteCache<?,?> cache = grid(g).cache("co"); + + assertEquals(rRes, cache.query(new SqlFieldsQuery(REPLICATED_QRY)).getAll()); + } locks.set(g, 0); @@ -235,12 +235,14 @@ public class IgniteCacheQueryNodeRestartSelfTest2 extends GridCommonAbstractTest final AtomicInteger restartCnt = new AtomicInteger(); + final AtomicBoolean restartsDone = new AtomicBoolean(); + IgniteInternalFuture<?> fut2 = multithreadedAsync(new Callable<Object>() { @SuppressWarnings({"BusyWait"}) @Override public Object call() throws Exception { GridRandom rnd = new GridRandom(); - while (!done.get()) { + while (!restartsDone.get()) { int g; do { @@ -272,12 +274,14 @@ public class IgniteCacheQueryNodeRestartSelfTest2 extends GridCommonAbstractTest info("Stopping.."); - done.set(true); + restartsDone.set(true); fut2.get(); info("Restarts stopped."); + qrysDone.set(true); + fut1.get(); info("Queries stopped."); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b38c2eba/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java index cfc8f2c..a7523b6 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java @@ -66,6 +66,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite { suite.addTestSuite(IgniteCacheSqlQueryMultiThreadedSelfTest.class); suite.addTestSuite(IgniteCacheOffheapTieredMultithreadedSelfTest.class); suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest.class); + suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest2.class); suite.addTestSuite(GridCacheReduceQueryMultithreadedSelfTest.class); suite.addTestSuite(GridCacheCrossCacheQuerySelfTest.class); suite.addTestSuite(GridCacheQuerySerializationSelfTest.class);