ignite-sql-tests - node failures
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/eacbac44 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/eacbac44 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/eacbac44 Branch: refs/heads/ignite-45 Commit: eacbac44d26c259fbd3c8cb5da57affbd7a7c769 Parents: 7c63170 Author: S.Vladykin <svlady...@gridgain.com> Authored: Fri Mar 13 22:04:55 2015 +0300 Committer: S.Vladykin <svlady...@gridgain.com> Committed: Fri Mar 13 22:04:55 2015 +0300 ---------------------------------------------------------------------- .../query/h2/twostep/GridMapQueryExecutor.java | 17 +++++++- .../query/h2/twostep/GridMergeIndex.java | 15 +++++++ .../h2/twostep/GridMergeIndexUnsorted.java | 34 ++++++--------- .../h2/twostep/GridReduceQueryExecutor.java | 45 +++++++++++++++++--- .../query/h2/twostep/GridResultPage.java | 4 +- 5 files changed, 84 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eacbac44/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index adade06..99cbaa8 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -22,6 +22,7 @@ import org.apache.ignite.cluster.*; import org.apache.ignite.events.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.managers.communication.*; +import org.apache.ignite.internal.managers.eventstorage.*; import org.apache.ignite.internal.processors.cache.query.*; import org.apache.ignite.internal.processors.query.h2.*; import org.apache.ignite.internal.processors.query.h2.twostep.messages.*; @@ -86,7 +87,19 @@ public class GridMapQueryExecutor implements GridMessageListener { log = ctx.log(GridMapQueryExecutor.class); - // TODO handle node failures. + ctx.event().addLocalEventListener(new GridLocalEventListener() { + @Override public void onEvent(final Event evt) { + UUID nodeId = ((DiscoveryEvent)evt).eventNode().id(); + + ConcurrentMap<Long,QueryResults> nodeRess = qryRess.remove(nodeId); + + if (nodeRess == null) + return; + + for (QueryResults ress : nodeRess.values()) + ress.cancel(); + } + }, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT); ctx.io().addMessageListener(GridTopic.TOPIC_QUERY, this); } @@ -171,7 +184,7 @@ public class GridMapQueryExecutor implements GridMessageListener { QueryResults qr = new QueryResults(req.requestId(), qrys.size()); - if (nodeRess.putIfAbsent(req.requestId(), qr) != null) + if (nodeRess.put(req.requestId(), qr) != null) throw new IllegalStateException(); h2.setFilters(h2.backupFilter()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eacbac44/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java index f6989ae..a6a3fea 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java @@ -61,6 +61,14 @@ public abstract class GridMergeIndex extends BaseIndex { initBaseIndex(tbl, 0, name, cols, type); } + /** + * @param nodeId Node ID. + * @return {@code true} If this index needs data from the given source node. + */ + public boolean hasSource(UUID nodeId) { + return remainingRows.containsKey(nodeId); + } + /** {@inheritDoc} */ @Override public long getRowCount(Session session) { return rowsCnt.get(); @@ -80,6 +88,13 @@ public abstract class GridMergeIndex extends BaseIndex { } /** + * @param nodeId Node ID. + */ + public void fail(UUID nodeId) { + addPage0(new GridResultPage(nodeId, null, false)); + } + + /** * @param page Page. */ public final void addPage(GridResultPage page) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eacbac44/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java index 44faea1..93c9482 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java @@ -17,13 +17,13 @@ package org.apache.ignite.internal.processors.query.h2.twostep; -import org.apache.ignite.*; import org.h2.index.*; import org.h2.result.*; import org.h2.table.*; import org.h2.value.*; import org.jetbrains.annotations.*; +import javax.cache.*; import java.util.*; import java.util.concurrent.*; @@ -44,7 +44,7 @@ public class GridMergeIndexUnsorted extends GridMergeIndex { /** {@inheritDoc} */ @Override protected void addPage0(GridResultPage page) { - if (page.rows() != null || page.isLast()) // We are not interested in terminating pages which are not last. + if (!page.rows().isEmpty() || page.isLast() || queue.isEmpty()) queue.add(page); } @@ -60,30 +60,24 @@ public class GridMergeIndexUnsorted extends GridMergeIndex { Iterator<Value[]> iter = Collections.emptyIterator(); @Override public boolean hasNext() { - if (iter.hasNext()) - return true; + while (!iter.hasNext()){ + GridResultPage page; - GridResultPage page; + try { + page = queue.take(); + } + catch (InterruptedException e) { + throw new CacheException("Query execution was interrupted.", e); + } - try { - page = queue.take(); - } - catch (InterruptedException e) { - throw new IgniteException("Query execution was interrupted.", e); - } + if (page.isLast()) + return false; // We are done. - if (page.isLast()) { - assert queue.isEmpty() : "It must be the last page: " + queue; + fetchNextPage(page); - return false; // We are done. + iter = page.rows().iterator(); } - fetchNextPage(page); - - iter = page.rows().iterator(); - - assert iter.hasNext(); - return true; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eacbac44/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index d1d8faa..1d6bb99 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -20,8 +20,10 @@ package org.apache.ignite.internal.processors.query.h2.twostep; import org.apache.ignite.*; import org.apache.ignite.cache.query.*; import org.apache.ignite.cluster.*; +import org.apache.ignite.events.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.managers.communication.*; +import org.apache.ignite.internal.managers.eventstorage.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.query.*; import org.apache.ignite.internal.processors.query.h2.*; @@ -107,10 +109,24 @@ public class GridReduceQueryExecutor implements GridMessageListener { log = ctx.log(GridReduceQueryExecutor.class); - // TODO handle node failure. - ctx.io().addMessageListener(GridTopic.TOPIC_QUERY, this); + ctx.event().addLocalEventListener(new GridLocalEventListener() { + @Override public void onEvent(final Event evt) { + UUID nodeId = ((DiscoveryEvent)evt).eventNode().id(); + + for (QueryRun r : runs.values()) { + for (GridMergeTable tbl : r.tbls) { + if (tbl.getScanIndex(null).hasSource(nodeId)) { + fail(r, nodeId, "Node left the topology."); + + break; + } + } + } + } + }, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT); + h2.executeStatement("PUBLIC", "CREATE ALIAS " + GridSqlQuerySplitter.TABLE_FUNC_NAME + " FOR \"" + GridReduceQueryExecutor.class.getName() + ".mergeTableFunction\""); } @@ -146,11 +162,23 @@ public class GridReduceQueryExecutor implements GridMessageListener { private void onFail(ClusterNode node, GridQueryFailResponse msg) { QueryRun r = runs.get(msg.queryRequestId()); - if (r != null && r.latch.getCount() != 0) { - r.rmtErr = new CacheException("Failed to execute map query on the node: " + node.id() + "\n " + msg.error()); + fail(r, node.id(), msg.error()); + } + + /** + * @param r Query run. + * @param nodeId Failed node ID. + * @param msg Error message. + */ + private void fail(QueryRun r, UUID nodeId, String msg) { + if (r != null) { + r.rmtErr = new CacheException("Failed to execute map query on the node: " + nodeId + ", " + msg); - while(r.latch.getCount() > 0) + while(r.latch.getCount() != 0) r.latch.countDown(); + + for (GridMergeTable tbl : r.tbls) + tbl.getScanIndex(null).fail(nodeId); } } @@ -173,6 +201,9 @@ public class GridReduceQueryExecutor implements GridMessageListener { idx.addPage(new GridResultPage(node.id(), msg, false) { @Override public void fetchNextPage() { + if (r.rmtErr != null) + throw new CacheException("Next page fetch failed.", r.rmtErr); + try { GridQueryNextPageRequest msg0 = new GridQueryNextPageRequest(qryReqId, qry, pageSize); @@ -182,7 +213,7 @@ public class GridReduceQueryExecutor implements GridMessageListener { ctx.io().send(node, GridTopic.TOPIC_QUERY, msg0, GridIoPolicy.PUBLIC_POOL); } catch (IgniteCheckedException e) { - throw new IgniteException(e); + throw new CacheException(e); } } }); @@ -435,7 +466,7 @@ public class GridReduceQueryExecutor implements GridMessageListener { private int pageSize; /** */ - private volatile Throwable rmtErr; + private volatile CacheException rmtErr; } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eacbac44/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java index a903f01..e31829d 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java @@ -44,7 +44,7 @@ public class GridResultPage { * @param res Response. * @param last If this is the globally last page. */ - protected GridResultPage(UUID src, GridQueryNextPageResponse res, boolean last) { + public GridResultPage(UUID src, GridQueryNextPageResponse res, boolean last) { assert src != null; this.src = src; @@ -55,7 +55,7 @@ public class GridResultPage { assert res == null : "The last page must be dummy."; // res == null means that it is a terminating dummy page for the given source node ID. - rows = res == null ? null : GridMapQueryExecutor.unmarshallRows(res.rows()); + rows = res == null ? Collections.<Value[]>emptySet() : GridMapQueryExecutor.unmarshallRows(res.rows()); } /**