ignite-sql-tests - remote error propagation
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8558fc88 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8558fc88 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8558fc88 Branch: refs/heads/ignite-sql-tests Commit: 8558fc880805e381fef48e398139d7c67e39f648 Parents: 1c11784 Author: S.Vladykin <svlady...@gridgain.com> Authored: Tue Feb 10 01:51:56 2015 +0300 Committer: S.Vladykin <svlady...@gridgain.com> Committed: Tue Feb 10 01:51:56 2015 +0300 ---------------------------------------------------------------------- .../h2/twostep/GridReduceQueryExecutor.java | 37 +++++++++++++++++--- 1 file changed, 33 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8558fc88/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index acafddb..23c1775 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -30,6 +30,7 @@ import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.jdk8.backport.*; +import javax.cache.*; import java.sql.*; import java.util.*; import java.util.concurrent.*; @@ -95,8 +96,19 @@ public class GridReduceQueryExecutor { }); } + /** + * @param node Node. + * @param msg Message. + */ private void onFail(ClusterNode node, GridQueryFailResponse msg) { - U.error(log, "Failed to execute query.", msg.error()); + QueryRun r = runs.get(msg.queryRequestId()); + + if (r != null && r.latch.getCount() != 0) { + r.rmtErr = msg.error(); + + while(r.latch.getCount() > 0) + r.latch.countDown(); + } } /** @@ -110,6 +122,9 @@ public class GridReduceQueryExecutor { QueryRun r = runs.get(qryReqId); + if (r == null) // Already finished with error or canceled. + return; + GridMergeIndex idx = r.tbls.get(msg.query()).getScanIndex(null); if (msg.allRows() != -1) { // Only the first page contains row count. @@ -145,7 +160,8 @@ public class GridReduceQueryExecutor { r.conn = h2.connectionForSpace(space); - Collection<ClusterNode> nodes = ctx.grid().cluster().nodes(); // TODO filter nodes somehow? + // TODO Add topology version. + Collection<ClusterNode> nodes = ctx.grid().cluster().forCacheNodes(space).nodes(); for (GridCacheSqlQuery mapQry : qry.mapQueries()) { GridMergeTable tbl; @@ -172,6 +188,9 @@ public class GridReduceQueryExecutor { r.latch.await(); + if (r.rmtErr != null) + throw new CacheException("Failed to run map query remotely.", r.rmtErr); + GridCacheSqlQuery rdc = qry.reduceQuery(); final ResultSet res = h2.executeSqlQueryWithTimer(r.conn, rdc.query(), F.asList(rdc.parameters())); @@ -181,10 +200,17 @@ public class GridReduceQueryExecutor { return new QueryCursorImpl<>(new Iter(res)); } - catch (IgniteCheckedException | InterruptedException | SQLException e) { + catch (IgniteCheckedException | InterruptedException | SQLException | RuntimeException e) { U.closeQuiet(r.conn); - throw new IgniteException(e); + if (e instanceof CacheException) + throw (CacheException)e; + + throw new CacheException("Failed to run reduce query locally.", e); + } + finally { + if (!runs.remove(qryReqId, r)) + U.warn(log, "Query run was removed: " + qryReqId); } } @@ -237,6 +263,9 @@ public class GridReduceQueryExecutor { /** */ private Connection conn; + + /** */ + private volatile Throwable rmtErr; } /**