master - query restart tests fix2
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/246b94a8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/246b94a8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/246b94a8 Branch: refs/heads/ignite-1093 Commit: 246b94a8bdc9901935db1865a0607a9fe48f5b23 Parents: 90adeae Author: S.Vladykin <svlady...@gridgain.com> Authored: Tue Aug 4 21:05:13 2015 +0300 Committer: S.Vladykin <svlady...@gridgain.com> Committed: Tue Aug 4 21:05:13 2015 +0300 ---------------------------------------------------------------------- .../query/h2/twostep/GridMergeIndex.java | 7 +++ .../h2/twostep/GridMergeIndexUnsorted.java | 23 +++++++-- .../query/h2/twostep/GridMergeTable.java | 51 ++++++++------------ .../h2/twostep/GridReduceQueryExecutor.java | 28 +---------- 4 files changed, 45 insertions(+), 64 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/246b94a8/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java index 2b2996d..71b207d 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java @@ -68,6 +68,13 @@ public abstract class GridMergeIndex extends BaseIndex { } /** + * @return Return source nodes for this merge index. + */ + public Set<UUID> sources() { + return remainingRows.keySet(); + } + + /** * @param nodeId Node ID. * @return {@code true} If this index needs data from the given source node. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/246b94a8/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java index e0a07ec..276d25b 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java @@ -64,11 +64,24 @@ public class GridMergeIndexUnsorted extends GridMergeIndex { while (!iter.hasNext()) { GridResultPage page; - try { - page = queue.take(); - } - catch (InterruptedException e) { - throw new CacheException("Query execution was interrupted.", e); + for (;;) { + try { + page = queue.poll(500, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) { + throw new CacheException("Query execution was interrupted.", e); + } + + if (page != null) + break; + + UUID nodeId = ((GridMergeTable)table).checkSourceNodesAlive(); + + if (nodeId != null) { + fail(nodeId); + + assert !queue.isEmpty(); + } } if (page.isLast()) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/246b94a8/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java index c9cdff2..fd9eec3 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java @@ -17,7 +17,7 @@ package org.apache.ignite.internal.processors.query.h2.twostep; -import org.h2.api.*; +import org.apache.ignite.internal.*; import org.h2.command.ddl.*; import org.h2.engine.*; import org.h2.index.*; @@ -32,6 +32,9 @@ import java.util.*; */ public class GridMergeTable extends TableBase { /** */ + private final GridKernalContext ctx; + + /** */ private final ArrayList<Index> idxs = new ArrayList<>(1); /** */ @@ -39,15 +42,29 @@ public class GridMergeTable extends TableBase { /** * @param data Data. + * @param ctx Kernal context. */ - public GridMergeTable(CreateTableData data) { + public GridMergeTable(CreateTableData data, GridKernalContext ctx) { super(data); + this.ctx = ctx; idx = new GridMergeIndexUnsorted(this, "merge_scan"); idxs.add(idx); } + /** + * @return Failed node or {@code null} if all alive. + */ + public UUID checkSourceNodesAlive() { + for (UUID nodeId : idx.sources()) { + if (!ctx.discovery().alive(nodeId)) + return nodeId; + } + + return null; + } + /** {@inheritDoc} */ @Override public void lock(Session session, boolean exclusive, boolean force) { // No-op. @@ -153,34 +170,4 @@ public class GridMergeTable extends TableBase { @Override public void checkRename() { throw DbException.getUnsupportedException("rename"); } - - /** - * Engine. - */ - public static class Engine implements TableEngine { - /** */ - private static ThreadLocal<GridMergeTable> createdTbl = new ThreadLocal<>(); - - /** - * @return Created table. - */ - public static GridMergeTable getCreated() { - GridMergeTable tbl = createdTbl.get(); - - assert tbl != null; - - createdTbl.remove(); - - return tbl; - } - - /** {@inheritDoc} */ - @Override public Table createTable(CreateTableData data) { - GridMergeTable tbl = new GridMergeTable(data); - - createdTbl.set(tbl); - - return tbl; - } - } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/246b94a8/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index ac269db..ad8ab34 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -1096,7 +1096,7 @@ public class GridReduceQueryExecutor { else data.columns = planColumns(); - return new GridMergeTable(data); + return new GridMergeTable(data, ctx); } catch (Exception e) { U.closeQuiet(conn); @@ -1117,32 +1117,6 @@ public class GridReduceQueryExecutor { } /** - * @param conn Connection. - * @param qry Query. - * @return Table. - * @throws IgniteCheckedException If failed. - */ - private GridMergeTable createTable(Connection conn, GridCacheSqlQuery qry) throws IgniteCheckedException { - try { - try (PreparedStatement s = conn.prepareStatement( - "CREATE LOCAL TEMPORARY TABLE " + qry.alias() + - " ENGINE \"" + GridMergeTable.Engine.class.getName() + "\" " + - " AS SELECT * FROM (" + qry.query() + ") WHERE FALSE")) { - h2.bindParameters(s, F.asList(qry.parameters())); - - s.execute(); - } - - return GridMergeTable.Engine.getCreated(); - } - catch (SQLException e) { - U.closeQuiet(conn); - - throw new IgniteCheckedException(e); - } - } - - /** * @param reconnectFut Reconnect future. */ public void onDisconnected(IgniteFuture<?> reconnectFut) {