ignite-484-1 - per partition mapping on unstable topology
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/357b4c06 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/357b4c06 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/357b4c06 Branch: refs/heads/ignite-484-1 Commit: 357b4c06a9ff7e3557eb765b5266b46fd9742bba Parents: ebde280 Author: S.Vladykin <svlady...@gridgain.com> Authored: Tue May 26 20:26:40 2015 +0300 Committer: S.Vladykin <svlady...@gridgain.com> Committed: Tue May 26 20:26:40 2015 +0300 ---------------------------------------------------------------------- .../h2/twostep/messages/GridQueryRequest.java | 20 ++ .../processors/query/h2/IgniteH2Indexing.java | 9 +- .../query/h2/twostep/GridMapQueryExecutor.java | 5 +- .../h2/twostep/GridReduceQueryExecutor.java | 183 +++++++++++++------ 4 files changed, 159 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/357b4c06/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java index 74b4392..99ef094 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java @@ -95,6 +95,19 @@ public class GridQueryRequest implements Message { } /** + * @param cp Copy from. 
+ */ + public GridQueryRequest(GridQueryRequest cp) { + this.reqId = cp.reqId; + this.pageSize = cp.pageSize; + this.space = cp.space; + this.qrys = cp.qrys; + this.topVer = cp.topVer; + this.extraSpaces = cp.extraSpaces; + this.parts = cp.parts; + } + + /** * @return All the needed partitions for {@link #space()} and {@link #extraSpaces()}. */ public List<int[]> partitions() { @@ -102,6 +115,13 @@ public class GridQueryRequest implements Message { } /** + * @param parts All the needed partitions for {@link #space()} and {@link #extraSpaces()}. + */ + public void partitions(List<int[]> parts) { + this.parts = parts; + } + + /** * @return All extra space names participating in query other than {@link #space()}. */ public List<String> extraSpaces() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/357b4c06/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index 67b4874..ffedfb3 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -1375,7 +1375,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { final int[] parts0 = parts.get(idx); - if (parts0.length < 64) { + if (parts0.length < 64) { // Fast scan for small arrays. return new IgniteBiPredicate<K,V>() { @Override public boolean apply(K k, V v) { int p = aff.partition(k); @@ -1383,6 +1383,9 @@ public class IgniteH2Indexing implements GridQueryIndexing { for (int p0 : parts0) { if (p0 == p) return true; + + if (p0 > p) // Array is sorted. 
+ return false; } return false; @@ -1424,9 +1427,9 @@ public class IgniteH2Indexing implements GridQueryIndexing { } /** - * @return Current topology version. + * @return Ready topology version. */ - public AffinityTopologyVersion topologyVersion() { + public AffinityTopologyVersion readyTopologyVersion() { return ctx.cache().context().exchange().readyAffinityVersion(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/357b4c06/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index 06bad76..9d9060a 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -260,6 +260,9 @@ public class GridMapQueryExecutor { * @return Collection wrapper. */ private static Collection<Integer> wrap(final int[] ints) { + if (F.isEmpty(ints)) + return Collections.emptySet(); + return new AbstractCollection<Integer>() { @Override public Iterator<Integer> iterator() { return new Iterator<Integer>() { @@ -503,7 +506,7 @@ public class GridMapQueryExecutor { loc ? null : Collections.<Message>emptyList(), loc ? 
Collections.<Value[]>emptyList() : null); - msg.retry(h2.topologyVersion()); + msg.retry(h2.readyTopologyVersion()); ctx.io().send(node, GridTopic.TOPIC_QUERY, msg, GridIoPolicy.PUBLIC_POOL); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/357b4c06/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index 0836a75..c445844 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -146,7 +146,7 @@ public class GridReduceQueryExecutor { for (GridMergeTable tbl : r.tbls) { if (tbl.getScanIndex(null).hasSource(nodeId)) { // Will attempt to retry. If reduce query was started it will fail on next page fetching. - retry(r, h2.topologyVersion(), nodeId); + retry(r, h2.readyTopologyVersion(), nodeId); break; } @@ -283,6 +283,9 @@ public class GridReduceQueryExecutor { * @return Array. */ private static int[] unbox(Set<Integer> set) { + if (set == null) + return null; + int[] arr = new int[set.size()]; int i = 0; @@ -294,6 +297,20 @@ public class GridReduceQueryExecutor { } /** + * @param readyTop Latest ready topology. + * @return {@code true} If preloading is active. + */ + private boolean isPreloadingActive(AffinityTopologyVersion readyTop) { + AffinityTopologyVersion freshTop = ctx.discovery().topologyVersionEx(); + + int res = readyTop.compareTo(freshTop); + + assert res <= 0 : readyTop + " " + freshTop; + + return res < 0; + } + + /** * @param cctx Cache context. * @param qry Query. 
* @return Cursor. @@ -312,7 +329,7 @@ public class GridReduceQueryExecutor { r.conn = (JdbcConnection)h2.connectionForSpace(space); - AffinityTopologyVersion topVer = h2.topologyVersion(); + AffinityTopologyVersion topVer = h2.readyTopologyVersion(); Collection<ClusterNode> nodes = ctx.discovery().cacheAffinityNodes(space, topVer); @@ -321,7 +338,8 @@ public class GridReduceQueryExecutor { List<String> extraSpaces = extraSpaces(space, qry.spaces()); - List<int[]> parts = null; + // Explicit partition mapping for unstable topology: {nodeId -> {cacheName -> {parts}}} + Map<ClusterNode, Map<String, Set<Integer>>> gridPartsMap = null; if (cctx.isReplicated() || qry.explain()) { assert qry.explain() || !nodes.contains(ctx.cluster().get().localNode()) : "We must be on a client node."; @@ -329,16 +347,17 @@ public class GridReduceQueryExecutor { // Select random data node to run query on a replicated data or get EXPLAIN PLAN from a single node. nodes = Collections.singleton(F.rand(nodes)); } - else if (ctx.cache().context().exchange().hasPendingExchange()) { // TODO isActive ?? - parts = new ArrayList<>(extraSpaces == null ? 
1 : extraSpaces.size() + 1); + else if (isPreloadingActive(topVer)) { + gridPartsMap = new HashMap<>(nodes.size(), 1f); - parts.add(unbox(cctx.affinity().primaryPartitions(ctx.localNodeId(), topVer))); + collectPartitionOwners(gridPartsMap, cctx); if (extraSpaces != null) { for (String extraSpace : extraSpaces) - parts.add(unbox(ctx.cache().internalCache(extraSpace).context() - .affinity().primaryPartitions(ctx.localNodeId(), topVer))); + collectPartitionOwners(gridPartsMap, ctx.cache().internalCache(extraSpace).context()); } + + nodes = gridPartsMap.keySet(); } for (GridCacheSqlQuery mapQry : qry.mapQueries()) { @@ -382,34 +401,23 @@ public class GridReduceQueryExecutor { mapQry.marshallParams(m); } - boolean ok = false; - - try { - send(nodes, new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys, topVer, extraSpaces, parts)); + send(nodes, + new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys, topVer, extraSpaces, null), + gridPartsMap); - ok = true; - } - catch (IgniteCheckedException e) { - U.warn(log, "Failed to send query request to nodes: " + nodes); - } + U.await(r.latch); AffinityTopologyVersion retry = null; - if (ok) { // Sent successfully. - U.await(r.latch); - - Object state = r.state.get(); + Object state = r.state.get(); - if (state != null) { - if (state instanceof CacheException) - throw new CacheException("Failed to run map query remotely.", (CacheException)state); + if (state != null) { + if (state instanceof CacheException) + throw new CacheException("Failed to run map query remotely.", (CacheException)state); - if (state instanceof AffinityTopologyVersion) - retry = (AffinityTopologyVersion)state; // Remote nodes can ask us to retry. - } + if (state instanceof AffinityTopologyVersion) + retry = (AffinityTopologyVersion)state; // Remote nodes can ask us to retry. } - else // Send failed -> retry. 
- retry = h2.topologyVersion(); ResultSet res = null; @@ -423,14 +431,8 @@ public class GridReduceQueryExecutor { } for (GridMergeTable tbl : r.tbls) { - if (!tbl.getScanIndex(null).fetchedAll()) { // We have to explicitly cancel queries on remote nodes. - try { - send(nodes, new GridQueryCancelRequest(qryReqId)); - } - catch (IgniteCheckedException e) { - U.warn(log, "Failed to send cancel request to nodes: " + nodes); - } - } + if (!tbl.getScanIndex(null).fetchedAll()) // We have to explicitly cancel queries on remote nodes. + send(nodes, new GridQueryCancelRequest(qryReqId), null); // dropTable(r.conn, tbl.getName()); TODO } @@ -461,6 +463,48 @@ public class GridReduceQueryExecutor { } /** + * Collects actual partition owners for the cache context into the given map. + * + * @param gridPartsMap Target map. + * @param cctx Cache context. + */ + private void collectPartitionOwners( + Map<ClusterNode,Map<String,Set<Integer>>> gridPartsMap, + GridCacheContext<?,?> cctx + ) { + int partsCnt = cctx.affinity().partitions(); + + for (int p = 0; p < partsCnt; p++) { + // We don't care about exact topology version here, we just need to get all the needed partition + // owners in actual state. + List<ClusterNode> owners = cctx.topology().owners(p); + + if (F.isEmpty(owners)) + continue; // All primary and backup nodes are dead now for this partition. We sorrow. + + ClusterNode owner = F.rand(owners); + + Map<String, Set<Integer>> nodePartsMap = gridPartsMap.get(owner); + + if (nodePartsMap == null) { + nodePartsMap = new HashMap<>(); + + gridPartsMap.put(owner, nodePartsMap); + } + + Set<Integer> parts = nodePartsMap.get(cctx.name()); + + if (parts == null) { + parts = new TreeSet<>(); // We need them sorted. + + nodePartsMap.put(cctx.name(), parts); + } + + parts.add(p); + } + } + + /** * @param mainSpace Main space. * @param allSpaces All spaces. * @return List of all extra spaces or {@code null} if none. 
@@ -531,33 +575,64 @@ public class GridReduceQueryExecutor { /** * @param nodes Nodes. * @param msg Message. - * @throws IgniteCheckedException If failed. + * @param gridPartsMap Partitions. */ - private void send(Collection<ClusterNode> nodes, Message msg) throws IgniteCheckedException { + private void send( + Collection<ClusterNode> nodes, + Message msg, + Map<ClusterNode, Map<String, Set<Integer>>> gridPartsMap + ) { + boolean locNodeFound = false; + for (ClusterNode node : nodes) { if (node.isLocal()) { - if (nodes.size() > 1) { - ArrayList<ClusterNode> remotes = new ArrayList<>(nodes.size() - 1); + locNodeFound = true; - for (ClusterNode node0 : nodes) { - if (!node0.isLocal()) - remotes.add(node0); - } + continue; + } - assert remotes.size() == nodes.size() - 1; + try { + ctx.io().send(node, GridTopic.TOPIC_QUERY, copy(msg, node, gridPartsMap), GridIoPolicy.PUBLIC_POOL); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send message to node: " + node, e); + } + } - ctx.io().send(remotes, GridTopic.TOPIC_QUERY, msg, GridIoPolicy.PUBLIC_POOL); - } + if (locNodeFound) // Local node goes the last to allow parallel execution. + h2.mapQueryExecutor().onMessage(ctx.localNodeId(), copy(msg, ctx.cluster().get().localNode(), gridPartsMap)); + } + + /** + * @param msg Message to copy. + * @param node Node. + * @param gridPartsMap Partitions map. + * @return Copy of message with partitions set. + */ + private Message copy(Message msg, ClusterNode node, Map<ClusterNode, Map<String, Set<Integer>>> gridPartsMap) { + if (gridPartsMap == null) + return msg; - // Local node goes the last to allow parallel execution. 
- h2.mapQueryExecutor().onMessage(ctx.localNodeId(), msg); + Map<String,Set<Integer>> nodeParts = gridPartsMap.get(node); - return; - } + assert nodeParts != null; + + GridQueryRequest req = (GridQueryRequest)msg; + + List<int[]> parts = new ArrayList<>(nodeParts.size()); + + parts.add(unbox(nodeParts.get(req.space()))); + + if (req.extraSpaces() != null) { + for (String extraSpace : req.extraSpaces()) + parts.add(unbox(nodeParts.get(extraSpace))); } - // All the given nodes are remotes. - ctx.io().send(nodes, GridTopic.TOPIC_QUERY, msg, GridIoPolicy.PUBLIC_POOL); + GridQueryRequest res = new GridQueryRequest(req); + + res.partitions(parts); + + return res; } /**