ignite-484-1 - more fixes
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/3da82e18 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/3da82e18 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/3da82e18 Branch: refs/heads/ignite-sprint-6 Commit: 3da82e18322fc6f1d3cfb7946dd0e87893cd9b4d Parents: d389ada Author: S.Vladykin <svlady...@gridgain.com> Authored: Mon Jun 8 07:14:24 2015 +0300 Committer: S.Vladykin <svlady...@gridgain.com> Committed: Mon Jun 8 07:14:24 2015 +0300 ---------------------------------------------------------------------- .../query/h2/twostep/GridMapQueryExecutor.java | 44 +++-- .../h2/twostep/GridReduceQueryExecutor.java | 187 ++++++++++++++----- .../IgniteCacheQueryNodeRestartSelfTest.java | 2 +- 3 files changed, 169 insertions(+), 64 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3da82e18/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index ede0e2e..b4d895f 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -229,29 +229,45 @@ public class GridMapQueryExecutor { for (String cacheName : cacheNames) { GridCacheContext<?,?> cctx = cacheContext(cacheName, topVer); - int partsCnt = cctx.affinity().partitions(); + if (cctx.isLocal()) + continue; - if (parts == null) - partIds = 
cctx.affinity().primaryPartitions(ctx.localNodeId(), topVer); + int partsCnt = cctx.affinity().partitions(); - for (int partId : partIds) { - GridDhtLocalPartition part = cctx.topology().localPartition(partId, topVer, false); + if (cctx.isReplicated()) { // Check all the partitions are in owning state for replicated cache. + for (int p = 0; p < partsCnt; p++) { + GridDhtLocalPartition part = cctx.topology().localPartition(p, topVer, false); - if (partId >= partsCnt) - break; // We can have more partitions because `parts` array is shared for all caches. + if (part == null) + return false; - if (part != null) { // Await for owning state. part.owningFuture().get(); + } + } + else { // Reserve primary partitions for partitioned cache. + if (parts == null) + partIds = cctx.affinity().primaryPartitions(ctx.localNodeId(), topVer); + + for (int partId : partIds) { + if (partId >= partsCnt) + break; // We can have more partitions because `parts` array is shared for all caches. + + GridDhtLocalPartition part = cctx.topology().localPartition(partId, topVer, false); - if (part.reserve()) { - reserved.add(part); + if (part != null) { + // Await for owning state. + part.owningFuture().get(); - continue; + if (part.reserve()) { + reserved.add(part); + + continue; + } } - } - return false; + return false; + } } } @@ -382,7 +398,7 @@ public class GridMapQueryExecutor { if (qr.canceled) { qr.result(i).close(); - throw new IgniteException("Query was canceled."); + return; } // Send the first page. 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3da82e18/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index 87ac2f4..80f0a18 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -306,9 +306,9 @@ public class GridReduceQueryExecutor { * @param topVer Topology version. * @param cctx Cache context for main space. * @param extraSpaces Extra spaces. - * @return Data nodes. + * @return Data nodes or {@code null} if repartitioning started and we need to retry. */ - private Collection<ClusterNode> dataNodes( + private Collection<ClusterNode> stableDataNodes( AffinityTopologyVersion topVer, final GridCacheContext<?,?> cctx, List<String> extraSpaces @@ -325,7 +325,7 @@ public class GridReduceQueryExecutor { GridCacheContext<?,?> extraCctx = cacheContext(extraSpace); if (extraCctx.isLocal()) - continue; + continue; // No consistency guarantees for local caches. 
if (cctx.isReplicated() && !extraCctx.isReplicated()) throw new CacheException("Queries running on replicated cache should not contain JOINs " + @@ -339,20 +339,29 @@ public class GridReduceQueryExecutor { if (cctx.isReplicated() && extraCctx.isReplicated()) { nodes.retainAll(extraNodes); - if (nodes.isEmpty() && !isPreloadingActive(topVer)) - throw new CacheException("Caches '" + cctx.name() + "' and '" + extraSpace + - "' have distinct set of data nodes."); + if (nodes.isEmpty()) { + if (isPreloadingActive(topVer)) + return null; // Retry. + else + throw new CacheException("Caches '" + cctx.name() + "' and '" + extraSpace + + "' have distinct set of data nodes."); + } } else if (!cctx.isReplicated() && extraCctx.isReplicated()) { - if (!extraNodes.containsAll(nodes) && !isPreloadingActive(topVer)) - throw new CacheException("Caches '" + cctx.name() + "' and '" + extraSpace + - "' have distinct set of data nodes."); + if (!extraNodes.containsAll(nodes)) + if (isPreloadingActive(topVer)) + return null; // Retry. + else + throw new CacheException("Caches '" + cctx.name() + "' and '" + extraSpace + + "' have distinct set of data nodes."); } else if (!cctx.isReplicated() && !extraCctx.isReplicated()) { - if ((extraNodes.size() != nodes.size() || !nodes.containsAll(extraNodes)) && - !isPreloadingActive(topVer)) - throw new CacheException("Caches '" + cctx.name() + "' and '" + extraSpace + - "' have distinct set of data nodes."); + if (extraNodes.size() != nodes.size() || !nodes.containsAll(extraNodes)) + if (isPreloadingActive(topVer)) + return null; // Retry. 
+ else + throw new CacheException("Caches '" + cctx.name() + "' and '" + extraSpace + + "' have distinct set of data nodes."); } else throw new IllegalStateException(); @@ -385,10 +394,27 @@ public class GridReduceQueryExecutor { List<String> extraSpaces = extraSpaces(space, qry.spaces()); - Collection<ClusterNode> nodes = dataNodes(topVer, cctx, extraSpaces); + Collection<ClusterNode> nodes; // Explicit partition mapping for unstable topology. - Map<ClusterNode, IntArray> gridPartsMap = null; + Map<ClusterNode, IntArray> partsMap = null; + + if (isPreloadingActive(topVer)) { + if (cctx.isReplicated()) + nodes = replicatedDataNodes(cctx, extraSpaces); + else { + partsMap = partitionLocations(cctx, extraSpaces); + + nodes = partsMap == null ? null : partsMap.keySet(); + } + } + else + nodes = stableDataNodes(topVer, cctx, extraSpaces); + + if (nodes == null) + continue; // Retry. + + assert !nodes.isEmpty(); if (cctx.isReplicated() || qry.explain()) { assert qry.explain() || !nodes.contains(ctx.cluster().get().localNode()) : "We must be on a client node."; @@ -396,14 +422,6 @@ public class GridReduceQueryExecutor { // Select random data node to run query on a replicated data or get EXPLAIN PLAN from a single node. nodes = Collections.singleton(F.rand(nodes)); } - else if (isPreloadingActive(topVer)) { - gridPartsMap = partitionLocations(cctx, extraSpaces); - - if (gridPartsMap == null) - continue; // Retry. 
- - nodes = gridPartsMap.keySet(); - } for (GridCacheSqlQuery mapQry : qry.mapQueries()) { GridMergeTable tbl; @@ -446,23 +464,24 @@ public class GridReduceQueryExecutor { mapQry.marshallParams(m); } - send(nodes, - new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys, topVer, extraSpaces, null), - gridPartsMap); - - U.await(r.latch); - AffinityTopologyVersion retry = null; - Object state = r.state.get(); + if (send(nodes, + new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys, topVer, extraSpaces, null), partsMap)) { + U.await(r.latch); - if (state != null) { - if (state instanceof CacheException) - throw new CacheException("Failed to run map query remotely.", (CacheException)state); + Object state = r.state.get(); - if (state instanceof AffinityTopologyVersion) - retry = (AffinityTopologyVersion)state; // Remote nodes can ask us to retry. + if (state != null) { + if (state instanceof CacheException) + throw new CacheException("Failed to run map query remotely.", (CacheException)state); + + if (state instanceof AffinityTopologyVersion) + retry = (AffinityTopologyVersion)state; // Remote nodes can ask us to retry. + } } + else // Send failed. + retry = topVer; ResultSet res = null; @@ -507,6 +526,80 @@ public class GridReduceQueryExecutor { } } + /** + * Calculates data nodes for replicated caches on unstable topology. + * + * @param cctx Cache context for main space. + * @param extraSpaces Extra spaces. + * @return Collection of all data nodes owning all the caches or {@code null} for retry. 
+ */ + private Collection<ClusterNode> replicatedDataNodes(final GridCacheContext<?,?> cctx, List<String> extraSpaces) { + assert cctx.isReplicated() : cctx.name() + " must be replicated"; + + Set<ClusterNode> nodes = owningReplicatedDataNodes(cctx); + + if (!F.isEmpty(extraSpaces)) { + for (String extraSpace : extraSpaces) { + GridCacheContext<?,?> extraCctx = cacheContext(extraSpace); + + if (extraCctx.isLocal()) + continue; + + if (!extraCctx.isReplicated()) + throw new CacheException("Queries running on replicated cache should not contain JOINs " + + "with partitioned tables."); + + nodes.retainAll(owningReplicatedDataNodes(extraCctx)); + + if (nodes.isEmpty()) + return null; // Retry. + } + } + + return nodes; + } + + /** + * Collects all the nodes owning all the partitions for the given replicated cache. + * + * @param cctx Cache context. + * @return Owning nodes. + */ + private Set<ClusterNode> owningReplicatedDataNodes(GridCacheContext<?,?> cctx) { + assert cctx.isReplicated() : cctx.name() + " must be replicated"; + + String space = cctx.name(); + + Set<ClusterNode> dataNodes = new HashSet<>(ctx.discovery().cacheAffinityNodes(space, NONE)); + + if (dataNodes.isEmpty()) + throw new CacheException("No data nodes found for cache '" + space + "'"); + + // Find all the nodes owning all the partitions for replicated cache. + for (int p = 0, extraParts = cctx.affinity().partitions(); p < extraParts; p++) { + List<ClusterNode> owners = cctx.topology().owners(p); + + if (owners.isEmpty()) + throw new CacheException("No data nodes found for cache '" + space + + "' for partition " + p); + + dataNodes.retainAll(owners); + + if (dataNodes.isEmpty()) + throw new CacheException("No data nodes found for cache '" + space + + "' owning all the partitions."); + } + + return dataNodes; + } + + /** + * Calculates partition mapping for partitioned cache on unstable topology. + * + * @param cctx Cache context for main space. + * @param extraSpaces Extra spaces. 
+ * @return Partition mapping or {@code null} if we can't calculate it due to repartitioning and we need to retry. + */ @SuppressWarnings("unchecked") private Map<ClusterNode, IntArray> partitionLocations(final GridCacheContext<?,?> cctx, List<String> extraSpaces) { assert !cctx.isReplicated() && !cctx.isLocal() : cctx.name() + " must be partitioned"; @@ -573,24 +666,13 @@ public class GridReduceQueryExecutor { if (!extraCctx.isReplicated()) continue; - Set<ClusterNode> dataNodes = new HashSet<>(ctx.discovery().cacheAffinityNodes(extraSpace, NONE)); - - if (dataNodes.isEmpty()) - throw new CacheException("No data nodes found for cache '" + extraSpace + "'"); - - for (int p = 0, extraParts = extraCctx.affinity().partitions(); p < extraParts; p++) { - dataNodes.retainAll(extraCctx.topology().owners(p)); - - if (dataNodes.isEmpty()) - throw new CacheException("No data nodes found for cache '" + extraSpace + - "' for partition " + p); - } + Set<ClusterNode> dataNodes = owningReplicatedDataNodes(extraCctx); for (Set<ClusterNode> partLoc : partLocs) { partLoc.retainAll(dataNodes); if (partLoc.isEmpty()) - return null; // Intersection is empty -> retry. + return null; // Retry. } } } @@ -689,14 +771,17 @@ public class GridReduceQueryExecutor { * @param nodes Nodes. * @param msg Message. * @param gridPartsMap Partitions. + * @return {@code true} If all messages sent successfully. */ - private void send( + private boolean send( Collection<ClusterNode> nodes, Message msg, Map<ClusterNode,IntArray> gridPartsMap ) { boolean locNodeFound = false; + boolean ok = true; + for (ClusterNode node : nodes) { if (node.isLocal()) { locNodeFound = true; @@ -708,12 +793,16 @@ public class GridReduceQueryExecutor { ctx.io().send(node, GridTopic.TOPIC_QUERY, copy(msg, node, gridPartsMap), GridIoPolicy.PUBLIC_POOL); } catch (IgniteCheckedException e) { + ok = false; + U.warn(log, e.getMessage()); } } if (locNodeFound) // Local node goes the last to allow parallel execution. 
h2.mapQueryExecutor().onMessage(ctx.localNodeId(), copy(msg, ctx.cluster().get().localNode(), gridPartsMap)); + + return ok; } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3da82e18/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java index edba352..035554e 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java @@ -100,7 +100,7 @@ public class IgniteCacheQueryNodeRestartSelfTest extends GridCacheAbstractSelfTe int duration = 60 * 1000; int qryThreadNum = 10; final long nodeLifeTime = 2 * 1000; - final int logFreq = 20; + final int logFreq = 50; final IgniteCache<Integer, Integer> cache = grid(0).cache(null);