ignite-sql-tests - remote error propagation

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8558fc88
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8558fc88
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8558fc88

Branch: refs/heads/ignite-sql-tests
Commit: 8558fc880805e381fef48e398139d7c67e39f648
Parents: 1c11784
Author: S.Vladykin <svlady...@gridgain.com>
Authored: Tue Feb 10 01:51:56 2015 +0300
Committer: S.Vladykin <svlady...@gridgain.com>
Committed: Tue Feb 10 01:51:56 2015 +0300

----------------------------------------------------------------------
 .../h2/twostep/GridReduceQueryExecutor.java     | 37 +++++++++++++++++---
 1 file changed, 33 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8558fc88/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index acafddb..23c1775 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
 import org.jdk8.backport.*;
 
+import javax.cache.*;
 import java.sql.*;
 import java.util.*;
 import java.util.concurrent.*;
@@ -95,8 +96,19 @@ public class GridReduceQueryExecutor {
         });
     }
 
+    /**
+     * @param node Node.
+     * @param msg Message.
+     */
     private void onFail(ClusterNode node, GridQueryFailResponse msg) {
-        U.error(log, "Failed to execute query.", msg.error());
+        QueryRun r = runs.get(msg.queryRequestId());
+
+        if (r != null && r.latch.getCount() != 0) {
+            r.rmtErr = msg.error();
+
+            while(r.latch.getCount() > 0)
+                r.latch.countDown();
+        }
     }
 
     /**
@@ -110,6 +122,9 @@ public class GridReduceQueryExecutor {
 
         QueryRun r = runs.get(qryReqId);
 
+        if (r == null) // Already finished with error or canceled.
+            return;
+
         GridMergeIndex idx = r.tbls.get(msg.query()).getScanIndex(null);
 
         if (msg.allRows() != -1) { // Only the first page contains row count.
@@ -145,7 +160,8 @@ public class GridReduceQueryExecutor {
 
         r.conn = h2.connectionForSpace(space);
 
-        Collection<ClusterNode> nodes = ctx.grid().cluster().nodes(); // TODO 
filter nodes somehow?
+        // TODO Add topology version.
+        Collection<ClusterNode> nodes = 
ctx.grid().cluster().forCacheNodes(space).nodes();
 
         for (GridCacheSqlQuery mapQry : qry.mapQueries()) {
             GridMergeTable tbl;
@@ -172,6 +188,9 @@ public class GridReduceQueryExecutor {
 
             r.latch.await();
 
+            if (r.rmtErr != null)
+                throw new CacheException("Failed to run map query remotely.", 
r.rmtErr);
+
             GridCacheSqlQuery rdc = qry.reduceQuery();
 
             final ResultSet res = h2.executeSqlQueryWithTimer(r.conn, 
rdc.query(), F.asList(rdc.parameters()));
@@ -181,10 +200,17 @@ public class GridReduceQueryExecutor {
 
             return new QueryCursorImpl<>(new Iter(res));
         }
-        catch (IgniteCheckedException | InterruptedException | SQLException e) 
{
+        catch (IgniteCheckedException | InterruptedException | SQLException | 
RuntimeException e) {
             U.closeQuiet(r.conn);
 
-            throw new IgniteException(e);
+            if (e instanceof CacheException)
+                throw (CacheException)e;
+
+            throw new CacheException("Failed to run reduce query locally.", e);
+        }
+        finally {
+            if (!runs.remove(qryReqId, r))
+                U.warn(log, "Query run was removed: " + qryReqId);
         }
     }
 
@@ -237,6 +263,9 @@ public class GridReduceQueryExecutor {
 
         /** */
         private Connection conn;
+
+        /** */
+        private volatile Throwable rmtErr;
     }
 
     /**

Reply via email to