ignite-gg9499 - group by
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f941e415 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f941e415 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f941e415 Branch: refs/heads/sprint-1 Commit: f941e415f2dc597435d9974a9f704bddf69fbf29 Parents: af14b52 Author: S.Vladykin <svlady...@gridgain.com> Authored: Wed Jan 14 12:54:08 2015 +0300 Committer: S.Vladykin <svlady...@gridgain.com> Committed: Wed Jan 14 12:54:08 2015 +0300 ---------------------------------------------------------------------- .../cache/query/GridCacheQueriesEx.java | 11 ++- .../cache/query/GridCacheQueriesImpl.java | 14 +++- .../cache/query/GridCacheQueriesProxy.java | 16 +++- .../cache/query/GridCacheSqlQuery.java | 8 ++ .../cache/query/GridCacheTwoStepQuery.java | 8 ++ .../processors/query/GridQueryIndexing.java | 11 ++- .../processors/query/GridQueryProcessor.java | 23 +++++- .../processors/query/h2/GridH2Indexing.java | 20 ++++- .../processors/query/h2/sql/GridSqlColumn.java | 2 + .../query/h2/sql/GridSqlQueryParser.java | 5 ++ .../query/h2/sql/GridSqlQuerySplitter.java | 85 +++++++++++++++----- .../processors/query/h2/sql/GridSqlSelect.java | 50 +++++++++++- .../h2/twostep/GridReduceQueryExecutor.java | 54 ++++++++++--- .../cache/GridCacheCrossCacheQuerySelfTest.java | 39 ++++++++- 14 files changed, 303 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f941e415/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesEx.java index e854367..a936a8b 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesEx.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesEx.java @@ -44,8 +44,17 @@ public interface GridCacheQueriesEx<K, V> extends GridCacheQueries<K, V> { public <R> GridCacheQuery<R> createSpiQuery(); /** + * @param space Space name. * @param qry Query. * @return Future. */ - public IgniteFuture<GridCacheSqlResult> execute(GridCacheTwoStepQuery qry); + public IgniteFuture<GridCacheSqlResult> execute(String space, GridCacheTwoStepQuery qry); + + /** + * @param space Space. + * @param sqlQry Query. + * @param params Parameters. + * @return Result. + */ + public IgniteFuture<GridCacheSqlResult> executeTwoStepQuery(String space, String sqlQry, Object... params); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f941e415/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesImpl.java index f643cb2..93a091a 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesImpl.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesImpl.java @@ -158,8 +158,18 @@ public class GridCacheQueriesImpl<K, V> implements GridCacheQueriesEx<K, V>, Ext } /** {@inheritDoc} */ - @Override public IgniteFuture<GridCacheSqlResult> execute(GridCacheTwoStepQuery qry) { - return ctx.kernalContext().query().queryTwoStep(qry); + @Override public IgniteFuture<GridCacheSqlResult> execute(String space, GridCacheTwoStepQuery qry) { + return ctx.kernalContext().query().queryTwoStep(space, qry); + } + + /** + * @param space Space. + * @param sqlQry Query. + * @param params Parameters. + * @return Result. + */ + public IgniteFuture<GridCacheSqlResult> executeTwoStepQuery(String space, String sqlQry, Object[] params) { + return ctx.kernalContext().query().queryTwoStep(space, sqlQry, params); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f941e415/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesProxy.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesProxy.java index 61f7ac7..1df4763 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesProxy.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesProxy.java @@ -166,11 +166,23 @@ public class GridCacheQueriesProxy<K, V> implements GridCacheQueriesEx<K, V>, Ex } /** {@inheritDoc} */ - @Override public IgniteFuture<GridCacheSqlResult> execute(GridCacheTwoStepQuery qry) { + @Override public IgniteFuture<GridCacheSqlResult> execute(String space, GridCacheTwoStepQuery qry) { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); try { - return delegate.execute(qry); + return delegate.execute(space, qry); + } + finally { + gate.leave(prev); + } + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<GridCacheSqlResult> executeTwoStepQuery(String space, String sqlQry, Object[] params) { + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + + try { + return delegate.executeTwoStepQuery(space, sqlQry, params); } finally { gate.leave(prev); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f941e415/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheSqlQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheSqlQuery.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheSqlQuery.java index 025ea29..926f575 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheSqlQuery.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheSqlQuery.java @@ -9,6 +9,7 @@ package org.gridgain.grid.kernal.processors.cache.query; +import org.gridgain.grid.util.tostring.*; import org.gridgain.grid.util.typedef.*; import org.gridgain.grid.util.typedef.internal.*; @@ -25,9 +26,11 @@ public class GridCacheSqlQuery implements Externalizable { String alias; /** */ + @GridToStringInclude String qry; /** */ + @GridToStringInclude Object[] params; /** @@ -88,4 +91,9 @@ public class GridCacheSqlQuery implements Externalizable { if (F.isEmpty(params)) params = EMPTY_PARAMS; } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridCacheSqlQuery.class, this); + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f941e415/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheTwoStepQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheTwoStepQuery.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheTwoStepQuery.java index a7c9a02..271b3b7 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheTwoStepQuery.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheTwoStepQuery.java @@ -11,6 +11,7 @@ package org.gridgain.grid.kernal.processors.cache.query; import org.apache.ignite.*; import org.gridgain.grid.util.*; +import org.gridgain.grid.util.tostring.*; import org.gridgain.grid.util.typedef.*; import org.gridgain.grid.util.typedef.internal.*; @@ -22,9 +23,11 @@ import java.util.*; */ public class GridCacheTwoStepQuery implements Serializable { /** */ + @GridToStringInclude private Map<String, GridCacheSqlQuery> mapQrys; /** */ + @GridToStringInclude private GridCacheSqlQuery reduce; /** @@ -63,4 +66,9 @@ public class GridCacheTwoStepQuery implements Serializable { public Collection<GridCacheSqlQuery> mapQueries() { return mapQrys.values(); } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridCacheTwoStepQuery.class, this); + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f941e415/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryIndexing.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryIndexing.java index 1b9ec6a..72604fc 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryIndexing.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryIndexing.java @@ -42,10 +42,19 @@ public interface GridQueryIndexing { /** * Runs two step query. * + * @param space Space name. * @param qry Query. * @return Future. */ - public IgniteFuture<GridCacheSqlResult> queryTwoStep(GridCacheTwoStepQuery qry); + public IgniteFuture<GridCacheSqlResult> queryTwoStep(String space, GridCacheTwoStepQuery qry); + + /** + * @param space Space. + * @param sqlQry Query. + * @param params Parameters. + * @return Result. + */ + public IgniteFuture<GridCacheSqlResult> queryTwoStep(String space, String sqlQry, Object[] params); /** * Queries individual fields (generally used by JDBC drivers). http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f941e415/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryProcessor.java index e05c425..dd48633 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryProcessor.java @@ -429,15 +429,34 @@ public class GridQueryProcessor extends GridProcessorAdapter { } /** + * @param space Space name. * @param qry Query. * @return Future. */ - public IgniteFuture<GridCacheSqlResult> queryTwoStep(GridCacheTwoStepQuery qry) { + public IgniteFuture<GridCacheSqlResult> queryTwoStep(String space, GridCacheTwoStepQuery qry) { + if (!busyLock.enterBusy()) + throw new IllegalStateException("Failed to execute query (grid is stopping)."); + + try { + return idx.queryTwoStep(space, qry); + } + finally { + busyLock.leaveBusy(); + } + } + + /** + * @param space Space. + * @param sqlQry Query. + * @param params Parameters. + * @return Result. + */ + public IgniteFuture<GridCacheSqlResult> queryTwoStep(String space, String sqlQry, Object[] params) { if (!busyLock.enterBusy()) throw new IllegalStateException("Failed to execute query (grid is stopping)."); try { - return idx.queryTwoStep(qry); + return idx.queryTwoStep(space, sqlQry, params); } finally { busyLock.leaveBusy(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f941e415/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2Indexing.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2Indexing.java index 7ee84f8..76cbe4f 100644 --- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2Indexing.java +++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2Indexing.java @@ -23,8 +23,10 @@ import org.gridgain.grid.kernal.processors.cache.*; import org.gridgain.grid.kernal.processors.cache.query.*; import org.gridgain.grid.kernal.processors.query.*; import org.gridgain.grid.kernal.processors.query.h2.opt.*; +import org.gridgain.grid.kernal.processors.query.h2.sql.*; import org.gridgain.grid.kernal.processors.query.h2.twostep.*; import org.gridgain.grid.util.*; +import org.gridgain.grid.util.future.*; import org.gridgain.grid.util.lang.*; import org.gridgain.grid.util.offheap.unsafe.*; import org.gridgain.grid.util.typedef.*; @@ -743,8 +745,22 @@ public class GridH2Indexing implements GridQueryIndexing { } /** {@inheritDoc} */ - @Override public IgniteFuture<GridCacheSqlResult> queryTwoStep(GridCacheTwoStepQuery qry) { - return rdcQryExec.query(qry); + @Override public IgniteFuture<GridCacheSqlResult> queryTwoStep(String space, GridCacheTwoStepQuery qry) { + return rdcQryExec.query(space, qry); + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<GridCacheSqlResult> queryTwoStep(String space, String sqlQry, Object[] params) { + Connection c; + + try { + c = connectionForSpace(space); + } + catch (IgniteCheckedException e) { + return new GridFinishedFutureEx<>(e); + } + + return queryTwoStep(space, GridSqlQuerySplitter.split(c, sqlQry, params)); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f941e415/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/sql/GridSqlColumn.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/sql/GridSqlColumn.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/sql/GridSqlColumn.java index ef9b70c..460ce1c 100644 --- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/sql/GridSqlColumn.java +++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/sql/GridSqlColumn.java @@ -28,6 +28,8 @@ public class GridSqlColumn extends GridSqlElement implements GridSqlValue { * @param sqlText Text. */ public GridSqlColumn(GridSqlElement from, String name, String sqlText) { + assert sqlText != null; + expressionInFrom = from; colName = name; this.sqlText = sqlText; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f941e415/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/sql/GridSqlQueryParser.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/sql/GridSqlQueryParser.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/sql/GridSqlQueryParser.java index 549983e..cbfca7a 100644 --- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/sql/GridSqlQueryParser.java +++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/sql/GridSqlQueryParser.java @@ -221,9 +221,14 @@ public class GridSqlQueryParser { ArrayList<Expression> expressions = select.getExpressions(); + for (Expression exp : expressions) + res.addExpression(parseExpression(exp)); + int[] grpIdx = GROUP_INDEXES.get(select); if (grpIdx != null) { + res.groupColumns(grpIdx); + for (int idx : grpIdx) res.addGroupExpression(parseExpression(expressions.get(idx))); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f941e415/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/sql/GridSqlQuerySplitter.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/sql/GridSqlQuerySplitter.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/sql/GridSqlQuerySplitter.java index bef0ce9..c9815cc 100644 --- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/sql/GridSqlQuerySplitter.java +++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/sql/GridSqlQuerySplitter.java @@ -18,48 +18,93 @@ import java.util.*; * Splits a single SQL query into two step map-reduce query. */ public class GridSqlQuerySplitter { + /** */ + private static final String TABLE_PREFIX = "__T"; + + /** */ + private static final String COLUMN_PREFIX = "__C"; + + /** + * @param idx Index of table. + * @return Table name. + */ + private static String table(int idx) { + return TABLE_PREFIX + idx; + } + + /** + * @param idx Index of column. + * @return Column alias. + */ + private static String column(int idx) { + return COLUMN_PREFIX + idx; + } + /** * @param conn Connection. * @param query Query. * @param params Parameters. * @return Two step query. */ - public GridCacheTwoStepQuery split(Connection conn, String query, Collection<?> params) { - GridSqlSelect qry = GridSqlQueryParser.parse(conn, query); + public static GridCacheTwoStepQuery split(Connection conn, String query, Object[] params) { + GridSqlSelect srcQry = GridSqlQueryParser.parse(conn, query); -// GridSqlSelect rdcQry = qry.clone(); + if (srcQry.groups().isEmpty()) { // Simple case. + String tbl0 = table(0); - for (GridSqlElement el : qry.select()) { + GridCacheTwoStepQuery res = new GridCacheTwoStepQuery("select * from " + tbl0); + res.addMapQuery(tbl0, srcQry.getSQL(), params); + + return res; } - if (qry.distinct()) { + // Map query. + GridSqlSelect mapQry = srcQry.clone(); - } + mapQry.clearSelect(); - qry.from(); + List<GridSqlAlias> aliases = new ArrayList<>(srcQry.allExpressions().size()); - qry.where(); + int idx = 0; - qry.groups(); + for (GridSqlElement exp : srcQry.allExpressions()) { // Add all expressions to select clause. + if (exp instanceof GridSqlColumn) + exp = new GridSqlAlias(((GridSqlColumn)exp).columnName(), exp); + else if (!(exp instanceof GridSqlAlias)) + exp = new GridSqlAlias(column(idx), exp); - qry.having(); + aliases.add((GridSqlAlias)exp); - qry.sort(); - } + mapQry.addSelectExpression(exp); + + idx++; + + assert aliases.size() == idx; + } + + mapQry.clearGroups(); + + for (int col : srcQry.groupColumns()) + mapQry.addGroupExpression(new GridSqlColumn(null, null, aliases.get(col).alias())); + + mapQry.clearSort(); // TODO sort support + + // Reduce query. + GridSqlSelect rdcQry = new GridSqlSelect(); - private boolean checkGroup(GridSqlSelect qry) { - if (qry.distinct()) - return true; + for (int i = 0; i < srcQry.select().size(); i++) + rdcQry.addSelectExpression(new GridSqlColumn(null, null, aliases.get(i).alias())); - qry.from(); + rdcQry.from(new GridSqlTable(null, table(0))); - qry.where(); + for (int col : srcQry.groupColumns()) + rdcQry.addGroupExpression(new GridSqlColumn(null, null, aliases.get(col).alias())); - qry.groups(); + GridCacheTwoStepQuery res = new GridCacheTwoStepQuery(rdcQry.getSQL()); - qry.having(); + res.addMapQuery(table(0), mapQry.getSQL(), params); - qry.sort(); + return res; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f941e415/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/sql/GridSqlSelect.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/sql/GridSqlSelect.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/sql/GridSqlSelect.java index 535c3d1..500c90c 100644 --- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/sql/GridSqlSelect.java +++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/sql/GridSqlSelect.java @@ -22,12 +22,18 @@ public class GridSqlSelect implements Cloneable { private boolean distinct; /** */ + private List<GridSqlElement> allExprs; + + /** */ private List<GridSqlElement> select = new ArrayList<>(); /** */ private List<GridSqlElement> groups = new ArrayList<>(); /** */ + private int[] grpCols; + + /** */ private GridSqlElement from; /** */ @@ -124,6 +130,23 @@ public class GridSqlSelect implements Cloneable { } /** + * @param expression Expression. + */ + public void addExpression(GridSqlElement expression) { + if (allExprs == null) + allExprs = new ArrayList<>(); + + allExprs.add(expression); + } + + /** + * @return All expressions in select, group by, order by. + */ + public List<GridSqlElement> allExpressions() { + return allExprs; + } + + /** * @return Expressions. */ public List<GridSqlElement> select() { @@ -131,6 +154,13 @@ public class GridSqlSelect implements Cloneable { } /** + * Clears select list. + */ + public void clearSelect() { + select = new ArrayList<>(); + } + + /** * @param expression Expression. */ public void addSelectExpression(GridSqlElement expression) { @@ -148,7 +178,8 @@ public class GridSqlSelect implements Cloneable { * */ public void clearGroups() { - groups.clear(); + groups = new ArrayList<>(); + grpCols = null; } /** @@ -159,6 +190,20 @@ public class GridSqlSelect implements Cloneable { } /** + * @return Group columns. + */ + public int[] groupColumns() { + return grpCols; + } + + /** + * @param grpCols Group columns. + */ + public void groupColumns(int[] grpCols) { + this.grpCols = grpCols; + } + + /** * @return Tables. */ public GridSqlElement from() { @@ -211,7 +256,7 @@ public class GridSqlSelect implements Cloneable { * */ public void clearSort() { - sort.clear(); + sort = new LinkedHashMap<>(); } /** @@ -231,6 +276,7 @@ public class GridSqlSelect implements Cloneable { res.select = new ArrayList<>(select); res.groups = new ArrayList<>(groups); res.sort = new LinkedHashMap<>(sort); + res.allExprs = null; return res; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f941e415/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridReduceQueryExecutor.java index a12b4f9..41da200 100644 --- a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -106,24 +106,36 @@ public class GridReduceQueryExecutor { }); } - public IgniteFuture<GridCacheSqlResult> query(GridCacheTwoStepQuery qry) { + /** + * @param space Space name. + * @param qry Query. + * @return Future. + */ + public IgniteFuture<GridCacheSqlResult> query(String space, GridCacheTwoStepQuery qry) { long qryReqId = reqIdGen.incrementAndGet(); QueryRun r = new QueryRun(); - r.tbls = new ArrayList<>(); + r.tbls = new ArrayList<>(qry.mapQueries().size()); try { - r.conn = h2.connectionForSpace(null); + r.conn = h2.connectionForSpace(space); } catch (IgniteCheckedException e) { - throw new IgniteException(e); + return new GridFinishedFutureEx<>(e); } Collection<ClusterNode> nodes = ctx.grid().cluster().nodes(); // TODO filter nodes somehow? for (GridCacheSqlQuery mapQry : qry.mapQueries()) { - GridMergeTable tbl = createTable(r.conn, mapQry); + GridMergeTable tbl; + + try { + tbl = createTable(r.conn, mapQry); + } + catch (IgniteCheckedException e) { + return new GridFinishedFutureEx<>(e); + } tbl.getScanIndex(null).setNumberOfSources(nodes.size()); @@ -144,14 +156,36 @@ public class GridReduceQueryExecutor { final ResultSet res = h2.executeSqlQueryWithTimer(r.conn, rdc.query(), F.asList(rdc.parameters())); + for (GridMergeTable tbl : r.tbls) + dropTable(r.conn, tbl.getName()); + return new GridFinishedFuture(ctx, new Iter(res)); } - catch (IgniteCheckedException | InterruptedException e) { + catch (IgniteCheckedException | InterruptedException | SQLException e) { + U.closeQuiet(r.conn); + return new GridFinishedFuture<>(ctx, e); } } - private GridMergeTable createTable(Connection conn, GridCacheSqlQuery qry) { + /** + * @param conn Connection. + * @param tblName Table name. + * @throws SQLException If failed. + */ + private void dropTable(Connection conn, String tblName) throws SQLException { + try (Statement s = conn.createStatement()) { + s.execute("DROP TABLE " + tblName); + } + } + + /** + * @param conn Connection. + * @param qry Query. + * @return Table. + * @throws IgniteCheckedException If failed. + */ + private GridMergeTable createTable(Connection conn, GridCacheSqlQuery qry) throws IgniteCheckedException { try { try (PreparedStatement s = conn.prepareStatement( "CREATE LOCAL TEMPORARY TABLE " + qry.alias() + @@ -164,8 +198,10 @@ public class GridReduceQueryExecutor { return GridMergeTable.Engine.getCreated(); } - catch (SQLException|IgniteCheckedException e) { - throw new IgniteException(e); + catch (SQLException e) { + U.closeQuiet(conn); + + throw new IgniteCheckedException(e); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f941e415/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheCrossCacheQuerySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheCrossCacheQuerySelfTest.java b/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheCrossCacheQuerySelfTest.java index b4d6595..af49b43 100644 --- a/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheCrossCacheQuerySelfTest.java +++ b/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheCrossCacheQuerySelfTest.java @@ -96,8 +96,10 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest { public void testTwoStep() throws Exception { fillCaches(); + String cache = "partitioned"; + GridCacheQueriesEx<Integer, FactPurchase> qx = - (GridCacheQueriesEx<Integer, FactPurchase>)ignite.<Integer, FactPurchase>cache("partitioned").queries(); + (GridCacheQueriesEx<Integer, FactPurchase>)ignite.<Integer, FactPurchase>cache(cache).queries(); // for (Map.Entry<Integer, FactPurchase> e : qx.createSqlQuery(FactPurchase.class, "1 = 1").execute().get()) // X.println("___ " + e); @@ -106,11 +108,44 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest { q.addMapQuery("_cnts_", "select count(*) x from \"partitioned\".FactPurchase where ? = ?", 2 ,2); - Object cnt = qx.execute(q).get().iterator().next().get(0); + Object cnt = qx.execute(cache, q).get().iterator().next().get(0); assertEquals(10L, cnt); } + /** + * @throws Exception If failed. + */ + public void testTwoStepGroup() throws Exception { + fillCaches(); + + GridCacheQueriesEx<Integer, FactPurchase> qx = + (GridCacheQueriesEx<Integer, FactPurchase>)ignite.<Integer, FactPurchase>cache("partitioned").queries(); + + Set<Integer> set0 = new HashSet<>(); + + for (List<?> o : qx.executeTwoStepQuery("partitioned", "select productId from FactPurchase group by productId") + .get()) { + X.println("___ -> " + o); + + assertTrue(set0.add((Integer) o.get(0))); + } + + X.println("___ "); + + Set<Integer> set1 = new HashSet<>(); + + for (List<?> o : qx.executeTwoStepQuery("partitioned", "select productId from FactPurchase") + .get()) { + X.println("___ -> " + o); + + set1.add((Integer)o.get(0)); + } + + assertFalse(set1.isEmpty()); + assertEquals(set0, set1); + } + /** @throws Exception If failed. */ public void testOnProjection() throws Exception { fillCaches();