master - query restart tests fix

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/90adeae9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/90adeae9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/90adeae9

Branch: refs/heads/ignite-1108
Commit: 90adeae9dd57f0aaaabe5f244d5167853a0b48dc
Parents: 38810b6
Author: S.Vladykin <svlady...@gridgain.com>
Authored: Tue Aug 4 20:30:00 2015 +0300
Committer: S.Vladykin <svlady...@gridgain.com>
Committed: Tue Aug 4 20:30:00 2015 +0300

----------------------------------------------------------------------
 .../h2/twostep/GridReduceQueryExecutor.java | 34 ++++++++++++++++++--
 1 file changed, 31 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/90adeae9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index cde3288..ac269db 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -150,8 +150,7 @@ public class GridReduceQueryExecutor {
                 for (QueryRun r : runs.values()) {
                     for (GridMergeTable tbl : r.tbls) {
                         if (tbl.getScanIndex(null).hasSource(nodeId)) {
-                            // Will attempt to retry. If reduce query was started it will fail on next page fetching.
-                            retry(r, h2.readyTopologyVersion(), nodeId);
+                            handleNodeLeft(r, nodeId);
 
                             break;
                         }
@@ -162,6 +161,15 @@ public class GridReduceQueryExecutor {
     }
 
     /**
+     * @param r Query run.
+     * @param nodeId Left node ID.
+     */
+    private void handleNodeLeft(QueryRun r, UUID nodeId) {
+        // Will attempt to retry. If reduce query was started it will fail on next page fetching.
+        retry(r, h2.readyTopologyVersion(), nodeId);
+    }
+
+    /**
      * @param nodeId Node ID.
      * @param msg Message.
      */
@@ -515,7 +523,7 @@ public class GridReduceQueryExecutor {
 
                 if (send(nodes,
                     new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys, topVer, extraSpaces, null), partsMap)) {
-                    U.await(r.latch);
+                    awaitAllReplies(r, nodes);
 
                     Object state = r.state.get();
 
@@ -595,6 +603,26 @@ public class GridReduceQueryExecutor {
     }
 
     /**
+     * @param r Query run.
+     * @param nodes Nodes to check periodically if they alive.
+     * @throws IgniteInterruptedCheckedException If interrupted.
+     */
+    private void awaitAllReplies(QueryRun r, Collection<ClusterNode> nodes)
+        throws IgniteInterruptedCheckedException {
+        while (!U.await(r.latch, 500, TimeUnit.MILLISECONDS)) {
+            for (ClusterNode node : nodes) {
+                if (!ctx.discovery().alive(node)) {
+                    handleNodeLeft(r, node.id());
+
+                    assert r.latch.getCount() == 0;
+
+                    return;
+                }
+            }
+        }
+    }
+
+    /**
      * Calculates data nodes for replicated caches on unstable topology.
      *
      * @param cctx Cache context for main space.