ignite-sql-tests - node failures

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/eacbac44
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/eacbac44
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/eacbac44

Branch: refs/heads/ignite-45
Commit: eacbac44d26c259fbd3c8cb5da57affbd7a7c769
Parents: 7c63170
Author: S.Vladykin <svlady...@gridgain.com>
Authored: Fri Mar 13 22:04:55 2015 +0300
Committer: S.Vladykin <svlady...@gridgain.com>
Committed: Fri Mar 13 22:04:55 2015 +0300

----------------------------------------------------------------------
 .../query/h2/twostep/GridMapQueryExecutor.java  | 17 +++++++-
 .../query/h2/twostep/GridMergeIndex.java        | 15 +++++++
 .../h2/twostep/GridMergeIndexUnsorted.java      | 34 ++++++---------
 .../h2/twostep/GridReduceQueryExecutor.java     | 45 +++++++++++++++++---
 .../query/h2/twostep/GridResultPage.java        |  4 +-
 5 files changed, 84 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eacbac44/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index adade06..99cbaa8 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -22,6 +22,7 @@ import org.apache.ignite.cluster.*;
 import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.managers.eventstorage.*;
 import org.apache.ignite.internal.processors.cache.query.*;
 import org.apache.ignite.internal.processors.query.h2.*;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.*;
@@ -86,7 +87,19 @@ public class GridMapQueryExecutor implements GridMessageListener {
 
         log = ctx.log(GridMapQueryExecutor.class);
 
-        // TODO handle node failures.
+        ctx.event().addLocalEventListener(new GridLocalEventListener() {
+            @Override public void onEvent(final Event evt) {
+                UUID nodeId = ((DiscoveryEvent)evt).eventNode().id();
+
+                ConcurrentMap<Long,QueryResults> nodeRess = qryRess.remove(nodeId);
+
+                if (nodeRess == null)
+                    return;
+
+                for (QueryResults ress : nodeRess.values())
+                    ress.cancel();
+            }
+        }, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT);
 
         ctx.io().addMessageListener(GridTopic.TOPIC_QUERY, this);
     }
@@ -171,7 +184,7 @@ public class GridMapQueryExecutor implements GridMessageListener {
 
         QueryResults qr = new QueryResults(req.requestId(), qrys.size());
 
-        if (nodeRess.putIfAbsent(req.requestId(), qr) != null)
+        if (nodeRess.put(req.requestId(), qr) != null)
             throw new IllegalStateException();
 
         h2.setFilters(h2.backupFilter());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eacbac44/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
index f6989ae..a6a3fea 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
@@ -61,6 +61,14 @@ public abstract class GridMergeIndex extends BaseIndex {
         initBaseIndex(tbl, 0, name, cols, type);
     }
 
+    /**
+     * @param nodeId Node ID.
+     * @return {@code true} If this index needs data from the given source node.
+     */
+    public boolean hasSource(UUID nodeId) {
+        return remainingRows.containsKey(nodeId);
+    }
+
     /** {@inheritDoc} */
     @Override public long getRowCount(Session session) {
         return rowsCnt.get();
@@ -80,6 +88,13 @@ public abstract class GridMergeIndex extends BaseIndex {
     }
 
     /**
+     * @param nodeId Node ID.
+     */
+    public void fail(UUID nodeId) {
+        addPage0(new GridResultPage(nodeId, null, false));
+    }
+
+    /**
      * @param page Page.
      */
     public final void addPage(GridResultPage page) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eacbac44/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
index 44faea1..93c9482 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
@@ -17,13 +17,13 @@
 
 package org.apache.ignite.internal.processors.query.h2.twostep;
 
-import org.apache.ignite.*;
 import org.h2.index.*;
 import org.h2.result.*;
 import org.h2.table.*;
 import org.h2.value.*;
 import org.jetbrains.annotations.*;
 
+import javax.cache.*;
 import java.util.*;
 import java.util.concurrent.*;
 
@@ -44,7 +44,7 @@ public class GridMergeIndexUnsorted extends GridMergeIndex {
 
     /** {@inheritDoc} */
     @Override protected void addPage0(GridResultPage page) {
-        if (page.rows() != null || page.isLast()) // We are not interested in terminating pages which are not last.
+        if (!page.rows().isEmpty() || page.isLast() || queue.isEmpty())
             queue.add(page);
     }
 
@@ -60,30 +60,24 @@ public class GridMergeIndexUnsorted extends GridMergeIndex {
             Iterator<Value[]> iter = Collections.emptyIterator();
 
             @Override public boolean hasNext() {
-                if (iter.hasNext())
-                    return true;
+                while (!iter.hasNext()){
+                    GridResultPage page;
 
-                GridResultPage page;
+                    try {
+                        page = queue.take();
+                    }
+                    catch (InterruptedException e) {
+                        throw new CacheException("Query execution was interrupted.", e);
+                    }
 
-                try {
-                    page = queue.take();
-                }
-                catch (InterruptedException e) {
-                    throw new IgniteException("Query execution was interrupted.", e);
-                }
+                    if (page.isLast())
+                        return false; // We are done.
 
-                if (page.isLast()) {
-                    assert queue.isEmpty() : "It must be the last page: " + queue;
+                    fetchNextPage(page);
 
-                    return false; // We are done.
+                    iter = page.rows().iterator();
                 }
 
-                fetchNextPage(page);
-
-                iter = page.rows().iterator();
-
-                assert iter.hasNext();
-
                 return true;
             }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eacbac44/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index d1d8faa..1d6bb99 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -20,8 +20,10 @@ package org.apache.ignite.internal.processors.query.h2.twostep;
 import org.apache.ignite.*;
 import org.apache.ignite.cache.query.*;
 import org.apache.ignite.cluster.*;
+import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.managers.eventstorage.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.query.*;
 import org.apache.ignite.internal.processors.query.h2.*;
@@ -107,10 +109,24 @@ public class GridReduceQueryExecutor implements GridMessageListener {
 
         log = ctx.log(GridReduceQueryExecutor.class);
 
-        // TODO handle node failure.
-
         ctx.io().addMessageListener(GridTopic.TOPIC_QUERY, this);
 
+        ctx.event().addLocalEventListener(new GridLocalEventListener() {
+            @Override public void onEvent(final Event evt) {
+                UUID nodeId = ((DiscoveryEvent)evt).eventNode().id();
+
+                for (QueryRun r : runs.values()) {
+                    for (GridMergeTable tbl : r.tbls) {
+                        if (tbl.getScanIndex(null).hasSource(nodeId)) {
+                            fail(r, nodeId, "Node left the topology.");
+
+                            break;
+                        }
+                    }
+                }
+            }
+        }, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT);
+
         h2.executeStatement("PUBLIC", "CREATE ALIAS " + GridSqlQuerySplitter.TABLE_FUNC_NAME +
             " FOR \"" + GridReduceQueryExecutor.class.getName() + ".mergeTableFunction\"");
     }
@@ -146,11 +162,23 @@ public class GridReduceQueryExecutor implements GridMessageListener {
     private void onFail(ClusterNode node, GridQueryFailResponse msg) {
         QueryRun r = runs.get(msg.queryRequestId());
 
-        if (r != null && r.latch.getCount() != 0) {
-            r.rmtErr = new CacheException("Failed to execute map query on the node: " + node.id() + "\n " + msg.error());
+        fail(r, node.id(), msg.error());
+    }
+
+    /**
+     * @param r Query run.
+     * @param nodeId Failed node ID.
+     * @param msg Error message.
+     */
+    private void fail(QueryRun r, UUID nodeId, String msg) {
+        if (r != null) {
+            r.rmtErr = new CacheException("Failed to execute map query on the node: " + nodeId + ", " + msg);
 
-            while(r.latch.getCount() > 0)
+            while(r.latch.getCount() != 0)
                 r.latch.countDown();
+
+            for (GridMergeTable tbl : r.tbls)
+                tbl.getScanIndex(null).fail(nodeId);
         }
     }
 
@@ -173,6 +201,9 @@ public class GridReduceQueryExecutor implements GridMessageListener {
 
         idx.addPage(new GridResultPage(node.id(), msg, false) {
             @Override public void fetchNextPage() {
+                if (r.rmtErr != null)
+                    throw new CacheException("Next page fetch failed.", r.rmtErr);
+
                 try {
                     GridQueryNextPageRequest msg0 = new GridQueryNextPageRequest(qryReqId, qry, pageSize);
 
@@ -182,7 +213,7 @@ public class GridReduceQueryExecutor implements GridMessageListener {
                         ctx.io().send(node, GridTopic.TOPIC_QUERY, msg0, GridIoPolicy.PUBLIC_POOL);
                 }
                 catch (IgniteCheckedException e) {
-                    throw new IgniteException(e);
+                    throw new CacheException(e);
                 }
             }
         });
@@ -435,7 +466,7 @@ public class GridReduceQueryExecutor implements GridMessageListener {
         private int pageSize;
 
         /** */
-        private volatile Throwable rmtErr;
+        private volatile CacheException rmtErr;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eacbac44/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java
index a903f01..e31829d 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java
@@ -44,7 +44,7 @@ public class GridResultPage {
      * @param res Response.
      * @param last If this is the globally last page.
      */
-    protected GridResultPage(UUID src, GridQueryNextPageResponse res, boolean last) {
+    public GridResultPage(UUID src, GridQueryNextPageResponse res, boolean last) {
         assert src != null;
 
         this.src = src;
@@ -55,7 +55,7 @@ public class GridResultPage {
             assert res == null : "The last page must be dummy.";
 
         // res == null means that it is a terminating dummy page for the given source node ID.
-        rows = res == null ? null : GridMapQueryExecutor.unmarshallRows(res.rows());
+        rows = res == null ? Collections.<Value[]>emptySet() : GridMapQueryExecutor.unmarshallRows(res.rows());
     }
 
     /**

Reply via email to