ignite-gg9499 - group by

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f941e415
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f941e415
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f941e415

Branch: refs/heads/sprint-1
Commit: f941e415f2dc597435d9974a9f704bddf69fbf29
Parents: af14b52
Author: S.Vladykin <svlady...@gridgain.com>
Authored: Wed Jan 14 12:54:08 2015 +0300
Committer: S.Vladykin <svlady...@gridgain.com>
Committed: Wed Jan 14 12:54:08 2015 +0300

----------------------------------------------------------------------
 .../cache/query/GridCacheQueriesEx.java         | 11 ++-
 .../cache/query/GridCacheQueriesImpl.java       | 14 +++-
 .../cache/query/GridCacheQueriesProxy.java      | 16 +++-
 .../cache/query/GridCacheSqlQuery.java          |  8 ++
 .../cache/query/GridCacheTwoStepQuery.java      |  8 ++
 .../processors/query/GridQueryIndexing.java     | 11 ++-
 .../processors/query/GridQueryProcessor.java    | 23 +++++-
 .../processors/query/h2/GridH2Indexing.java     | 20 ++++-
 .../processors/query/h2/sql/GridSqlColumn.java  |  2 +
 .../query/h2/sql/GridSqlQueryParser.java        |  5 ++
 .../query/h2/sql/GridSqlQuerySplitter.java      | 85 +++++++++++++++-----
 .../processors/query/h2/sql/GridSqlSelect.java  | 50 +++++++++++-
 .../h2/twostep/GridReduceQueryExecutor.java     | 54 ++++++++++---
 .../cache/GridCacheCrossCacheQuerySelfTest.java | 39 ++++++++-
 14 files changed, 303 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f941e415/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesEx.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesEx.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesEx.java
index e854367..a936a8b 100644
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesEx.java
+++ 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesEx.java
@@ -44,8 +44,17 @@ public interface GridCacheQueriesEx<K, V> extends 
GridCacheQueries<K, V> {
     public <R> GridCacheQuery<R> createSpiQuery();
 
     /**
+     * @param space Space name.
      * @param qry Query.
      * @return Future.
      */
-    public IgniteFuture<GridCacheSqlResult> execute(GridCacheTwoStepQuery qry);
+    public IgniteFuture<GridCacheSqlResult> execute(String space, 
GridCacheTwoStepQuery qry);
+
+    /**
+     * @param space Space.
+     * @param sqlQry Query.
+     * @param params Parameters.
+     * @return Result.
+     */
+    public IgniteFuture<GridCacheSqlResult> executeTwoStepQuery(String space, 
String sqlQry, Object... params);
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f941e415/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesImpl.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesImpl.java
index f643cb2..93a091a 100644
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesImpl.java
+++ 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesImpl.java
@@ -158,8 +158,18 @@ public class GridCacheQueriesImpl<K, V> implements 
GridCacheQueriesEx<K, V>, Ext
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteFuture<GridCacheSqlResult> 
execute(GridCacheTwoStepQuery qry) {
-        return ctx.kernalContext().query().queryTwoStep(qry);
+    @Override public IgniteFuture<GridCacheSqlResult> execute(String space, 
GridCacheTwoStepQuery qry) {
+        return ctx.kernalContext().query().queryTwoStep(space, qry);
+    }
+
+    /**
+     * @param space Space.
+     * @param sqlQry Query.
+     * @param params Parameters.
+     * @return Result.
+     */
+    public IgniteFuture<GridCacheSqlResult> executeTwoStepQuery(String space, 
String sqlQry, Object[] params) {
+        return ctx.kernalContext().query().queryTwoStep(space, sqlQry, params);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f941e415/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesProxy.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesProxy.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesProxy.java
index 61f7ac7..1df4763 100644
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesProxy.java
+++ 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheQueriesProxy.java
@@ -166,11 +166,23 @@ public class GridCacheQueriesProxy<K, V> implements 
GridCacheQueriesEx<K, V>, Ex
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteFuture<GridCacheSqlResult> 
execute(GridCacheTwoStepQuery qry) {
+    @Override public IgniteFuture<GridCacheSqlResult> execute(String space, 
GridCacheTwoStepQuery qry) {
         GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
         try {
-            return delegate.execute(qry);
+            return delegate.execute(space, qry);
+        }
+        finally {
+            gate.leave(prev);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<GridCacheSqlResult> 
executeTwoStepQuery(String space, String sqlQry, Object[] params) {
+        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+
+        try {
+            return delegate.executeTwoStepQuery(space, sqlQry, params);
         }
         finally {
             gate.leave(prev);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f941e415/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheSqlQuery.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheSqlQuery.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheSqlQuery.java
index 025ea29..926f575 100644
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheSqlQuery.java
+++ 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheSqlQuery.java
@@ -9,6 +9,7 @@
 
 package org.gridgain.grid.kernal.processors.cache.query;
 
+import org.gridgain.grid.util.tostring.*;
 import org.gridgain.grid.util.typedef.*;
 import org.gridgain.grid.util.typedef.internal.*;
 
@@ -25,9 +26,11 @@ public class GridCacheSqlQuery implements Externalizable {
     String alias;
 
     /** */
+    @GridToStringInclude
     String qry;
 
     /** */
+    @GridToStringInclude
     Object[] params;
 
     /**
@@ -88,4 +91,9 @@ public class GridCacheSqlQuery implements Externalizable {
         if (F.isEmpty(params))
             params = EMPTY_PARAMS;
     }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridCacheSqlQuery.class, this);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f941e415/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheTwoStepQuery.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheTwoStepQuery.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheTwoStepQuery.java
index a7c9a02..271b3b7 100644
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheTwoStepQuery.java
+++ 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/query/GridCacheTwoStepQuery.java
@@ -11,6 +11,7 @@ package org.gridgain.grid.kernal.processors.cache.query;
 
 import org.apache.ignite.*;
 import org.gridgain.grid.util.*;
+import org.gridgain.grid.util.tostring.*;
 import org.gridgain.grid.util.typedef.*;
 import org.gridgain.grid.util.typedef.internal.*;
 
@@ -22,9 +23,11 @@ import java.util.*;
  */
 public class GridCacheTwoStepQuery implements Serializable {
     /** */
+    @GridToStringInclude
     private Map<String, GridCacheSqlQuery> mapQrys;
 
     /** */
+    @GridToStringInclude
     private GridCacheSqlQuery reduce;
 
     /**
@@ -63,4 +66,9 @@ public class GridCacheTwoStepQuery implements Serializable {
     public Collection<GridCacheSqlQuery> mapQueries() {
         return mapQrys.values();
     }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridCacheTwoStepQuery.class, this);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f941e415/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryIndexing.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryIndexing.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryIndexing.java
index 1b9ec6a..72604fc 100644
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryIndexing.java
+++ 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryIndexing.java
@@ -42,10 +42,19 @@ public interface GridQueryIndexing {
     /**
      * Runs two step query.
      *
+     * @param space Space name.
      * @param qry Query.
      * @return Future.
      */
-    public IgniteFuture<GridCacheSqlResult> queryTwoStep(GridCacheTwoStepQuery 
qry);
+    public IgniteFuture<GridCacheSqlResult> queryTwoStep(String space,  
GridCacheTwoStepQuery qry);
+
+    /**
+     * @param space Space.
+     * @param sqlQry Query.
+     * @param params Parameters.
+     * @return Result.
+     */
+    public IgniteFuture<GridCacheSqlResult> queryTwoStep(String space, String 
sqlQry, Object[] params);
 
     /**
      * Queries individual fields (generally used by JDBC drivers).

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f941e415/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryProcessor.java
 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryProcessor.java
index e05c425..dd48633 100644
--- 
a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryProcessor.java
+++ 
b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/query/GridQueryProcessor.java
@@ -429,15 +429,34 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
     }
 
     /**
+     * @param space Space name.
      * @param qry Query.
      * @return Future.
      */
-    public IgniteFuture<GridCacheSqlResult> queryTwoStep(GridCacheTwoStepQuery 
qry) {
+    public IgniteFuture<GridCacheSqlResult> queryTwoStep(String space, 
GridCacheTwoStepQuery qry) {
+        if (!busyLock.enterBusy())
+            throw new IllegalStateException("Failed to execute query (grid is 
stopping).");
+
+        try {
+            return idx.queryTwoStep(space, qry);
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * @param space Space.
+     * @param sqlQry Query.
+     * @param params Parameters.
+     * @return Result.
+     */
+    public IgniteFuture<GridCacheSqlResult> queryTwoStep(String space, String 
sqlQry, Object[] params) {
         if (!busyLock.enterBusy())
             throw new IllegalStateException("Failed to execute query (grid is 
stopping).");
 
         try {
-            return idx.queryTwoStep(qry);
+            return idx.queryTwoStep(space, sqlQry, params);
         }
         finally {
             busyLock.leaveBusy();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f941e415/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2Indexing.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2Indexing.java
 
b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2Indexing.java
index 7ee84f8..76cbe4f 100644
--- 
a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2Indexing.java
+++ 
b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/GridH2Indexing.java
@@ -23,8 +23,10 @@ import org.gridgain.grid.kernal.processors.cache.*;
 import org.gridgain.grid.kernal.processors.cache.query.*;
 import org.gridgain.grid.kernal.processors.query.*;
 import org.gridgain.grid.kernal.processors.query.h2.opt.*;
+import org.gridgain.grid.kernal.processors.query.h2.sql.*;
 import org.gridgain.grid.kernal.processors.query.h2.twostep.*;
 import org.gridgain.grid.util.*;
+import org.gridgain.grid.util.future.*;
 import org.gridgain.grid.util.lang.*;
 import org.gridgain.grid.util.offheap.unsafe.*;
 import org.gridgain.grid.util.typedef.*;
@@ -743,8 +745,22 @@ public class GridH2Indexing implements GridQueryIndexing {
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteFuture<GridCacheSqlResult> 
queryTwoStep(GridCacheTwoStepQuery qry) {
-        return rdcQryExec.query(qry);
+    @Override public IgniteFuture<GridCacheSqlResult> queryTwoStep(String 
space, GridCacheTwoStepQuery qry) {
+        return rdcQryExec.query(space, qry);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<GridCacheSqlResult> queryTwoStep(String 
space, String sqlQry, Object[] params) {
+        Connection c;
+
+        try {
+            c = connectionForSpace(space);
+        }
+        catch (IgniteCheckedException e) {
+            return new GridFinishedFutureEx<>(e);
+        }
+
+        return queryTwoStep(space, GridSqlQuerySplitter.split(c, sqlQry, 
params));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f941e415/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/sql/GridSqlColumn.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/sql/GridSqlColumn.java
 
b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/sql/GridSqlColumn.java
index ef9b70c..460ce1c 100644
--- 
a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/sql/GridSqlColumn.java
+++ 
b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/sql/GridSqlColumn.java
@@ -28,6 +28,8 @@ public class GridSqlColumn extends GridSqlElement implements 
GridSqlValue {
      * @param sqlText Text.
      */
     public GridSqlColumn(GridSqlElement from, String name, String sqlText) {
+        assert sqlText != null;
+
         expressionInFrom = from;
         colName = name;
         this.sqlText = sqlText;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f941e415/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/sql/GridSqlQueryParser.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/sql/GridSqlQueryParser.java
 
b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/sql/GridSqlQueryParser.java
index 549983e..cbfca7a 100644
--- 
a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/sql/GridSqlQueryParser.java
+++ 
b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/sql/GridSqlQueryParser.java
@@ -221,9 +221,14 @@ public class GridSqlQueryParser {
 
         ArrayList<Expression> expressions = select.getExpressions();
 
+        for (Expression exp : expressions)
+            res.addExpression(parseExpression(exp));
+
         int[] grpIdx = GROUP_INDEXES.get(select);
 
         if (grpIdx != null) {
+            res.groupColumns(grpIdx);
+
             for (int idx : grpIdx)
                 res.addGroupExpression(parseExpression(expressions.get(idx)));
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f941e415/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/sql/GridSqlQuerySplitter.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/sql/GridSqlQuerySplitter.java
 
b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/sql/GridSqlQuerySplitter.java
index bef0ce9..c9815cc 100644
--- 
a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/sql/GridSqlQuerySplitter.java
+++ 
b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/sql/GridSqlQuerySplitter.java
@@ -18,48 +18,93 @@ import java.util.*;
  * Splits a single SQL query into two step map-reduce query.
  */
 public class GridSqlQuerySplitter {
+    /** */
+    private static final String TABLE_PREFIX = "__T";
+
+    /** */
+    private static final String COLUMN_PREFIX = "__C";
+
+    /**
+     * @param idx Index of table.
+     * @return Table name.
+     */
+    private static String table(int idx) {
+        return TABLE_PREFIX + idx;
+    }
+
+    /**
+     * @param idx Index of column.
+     * @return Column alias.
+     */
+    private static String column(int idx) {
+        return COLUMN_PREFIX + idx;
+    }
+
     /**
      * @param conn Connection.
      * @param query Query.
      * @param params Parameters.
      * @return Two step query.
      */
-    public GridCacheTwoStepQuery split(Connection conn, String query, 
Collection<?> params) {
-        GridSqlSelect qry = GridSqlQueryParser.parse(conn, query);
+    public static GridCacheTwoStepQuery split(Connection conn, String query, 
Object[] params) {
+        GridSqlSelect srcQry = GridSqlQueryParser.parse(conn, query);
 
-//        GridSqlSelect rdcQry = qry.clone();
+        if (srcQry.groups().isEmpty()) { // Simple case.
+            String tbl0 = table(0);
 
-        for (GridSqlElement el : qry.select()) {
+            GridCacheTwoStepQuery res = new GridCacheTwoStepQuery("select * 
from " + tbl0);
 
+            res.addMapQuery(tbl0, srcQry.getSQL(), params);
+
+            return res;
         }
 
-        if (qry.distinct()) {
+        // Map query.
+        GridSqlSelect mapQry = srcQry.clone();
 
-        }
+        mapQry.clearSelect();
 
-        qry.from();
+        List<GridSqlAlias> aliases = new 
ArrayList<>(srcQry.allExpressions().size());
 
-        qry.where();
+        int idx = 0;
 
-        qry.groups();
+        for (GridSqlElement exp : srcQry.allExpressions()) { // Add all 
expressions to select clause.
+            if (exp instanceof GridSqlColumn)
+                exp = new GridSqlAlias(((GridSqlColumn)exp).columnName(), exp);
+            else if (!(exp instanceof GridSqlAlias))
+                exp = new GridSqlAlias(column(idx), exp);
 
-        qry.having();
+            aliases.add((GridSqlAlias)exp);
 
-        qry.sort();
-    }
+            mapQry.addSelectExpression(exp);
+
+            idx++;
+
+            assert aliases.size() == idx;
+        }
+
+        mapQry.clearGroups();
+
+        for (int col : srcQry.groupColumns())
+            mapQry.addGroupExpression(new GridSqlColumn(null, null, 
aliases.get(col).alias()));
+
+        mapQry.clearSort(); // TODO sort support
+
+        // Reduce query.
+        GridSqlSelect rdcQry = new GridSqlSelect();
 
-    private boolean checkGroup(GridSqlSelect qry) {
-        if (qry.distinct())
-            return true;
+        for (int i = 0; i < srcQry.select().size(); i++)
+            rdcQry.addSelectExpression(new GridSqlColumn(null, null, 
aliases.get(i).alias()));
 
-        qry.from();
+        rdcQry.from(new GridSqlTable(null, table(0)));
 
-        qry.where();
+        for (int col : srcQry.groupColumns())
+            rdcQry.addGroupExpression(new GridSqlColumn(null, null, 
aliases.get(col).alias()));
 
-        qry.groups();
+        GridCacheTwoStepQuery res = new GridCacheTwoStepQuery(rdcQry.getSQL());
 
-        qry.having();
+        res.addMapQuery(table(0), mapQry.getSQL(), params);
 
-        qry.sort();
+        return res;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f941e415/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/sql/GridSqlSelect.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/sql/GridSqlSelect.java
 
b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/sql/GridSqlSelect.java
index 535c3d1..500c90c 100644
--- 
a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/sql/GridSqlSelect.java
+++ 
b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/sql/GridSqlSelect.java
@@ -22,12 +22,18 @@ public class GridSqlSelect implements Cloneable {
     private boolean distinct;
 
     /** */
+    private List<GridSqlElement> allExprs;
+
+    /** */
     private List<GridSqlElement> select = new ArrayList<>();
 
     /** */
     private List<GridSqlElement> groups = new ArrayList<>();
 
     /** */
+    private int[] grpCols;
+
+    /** */
     private GridSqlElement from;
 
     /** */
@@ -124,6 +130,23 @@ public class GridSqlSelect implements Cloneable {
     }
 
     /**
+     * @param expression Expression.
+     */
+    public void addExpression(GridSqlElement expression) {
+        if (allExprs == null)
+            allExprs = new ArrayList<>();
+
+        allExprs.add(expression);
+    }
+
+    /**
+     * @return All expressions in select, group by, order by.
+     */
+    public List<GridSqlElement> allExpressions() {
+        return allExprs;
+    }
+
+    /**
      * @return Expressions.
      */
     public List<GridSqlElement> select() {
@@ -131,6 +154,13 @@ public class GridSqlSelect implements Cloneable {
     }
 
     /**
+     * Clears select list.
+     */
+    public void clearSelect() {
+        select = new ArrayList<>();
+    }
+
+    /**
      * @param expression Expression.
      */
     public void addSelectExpression(GridSqlElement expression) {
@@ -148,7 +178,8 @@ public class GridSqlSelect implements Cloneable {
      *
      */
     public void clearGroups() {
-        groups.clear();
+        groups = new ArrayList<>();
+        grpCols = null;
     }
 
     /**
@@ -159,6 +190,20 @@ public class GridSqlSelect implements Cloneable {
     }
 
     /**
+     * @return Group columns.
+     */
+    public int[] groupColumns() {
+        return grpCols;
+    }
+
+    /**
+     * @param grpCols Group columns.
+     */
+    public void groupColumns(int[] grpCols) {
+        this.grpCols = grpCols;
+    }
+
+    /**
      * @return Tables.
      */
     public GridSqlElement from() {
@@ -211,7 +256,7 @@ public class GridSqlSelect implements Cloneable {
      *
      */
     public void clearSort() {
-        sort.clear();
+        sort = new LinkedHashMap<>();
     }
 
     /**
@@ -231,6 +276,7 @@ public class GridSqlSelect implements Cloneable {
             res.select = new ArrayList<>(select);
             res.groups = new ArrayList<>(groups);
             res.sort = new LinkedHashMap<>(sort);
+            res.allExprs = null;
 
             return res;
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f941e415/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridReduceQueryExecutor.java
 
b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index a12b4f9..41da200 100644
--- 
a/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ 
b/modules/indexing/src/main/java/org/gridgain/grid/kernal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -106,24 +106,36 @@ public class GridReduceQueryExecutor {
         });
     }
 
-    public IgniteFuture<GridCacheSqlResult> query(GridCacheTwoStepQuery qry) {
+    /**
+     * @param space Space name.
+     * @param qry Query.
+     * @return Future.
+     */
+    public IgniteFuture<GridCacheSqlResult> query(String space, 
GridCacheTwoStepQuery qry) {
         long qryReqId = reqIdGen.incrementAndGet();
 
         QueryRun r = new QueryRun();
 
-        r.tbls = new ArrayList<>();
+        r.tbls = new ArrayList<>(qry.mapQueries().size());
 
         try {
-            r.conn = h2.connectionForSpace(null);
+            r.conn = h2.connectionForSpace(space);
         }
         catch (IgniteCheckedException e) {
-            throw new IgniteException(e);
+            return new GridFinishedFutureEx<>(e);
         }
 
         Collection<ClusterNode> nodes = ctx.grid().cluster().nodes(); // TODO 
filter nodes somehow?
 
         for (GridCacheSqlQuery mapQry : qry.mapQueries()) {
-            GridMergeTable tbl = createTable(r.conn, mapQry);
+            GridMergeTable tbl;
+
+            try {
+                tbl = createTable(r.conn, mapQry);
+            }
+            catch (IgniteCheckedException e) {
+                return new GridFinishedFutureEx<>(e);
+            }
 
             tbl.getScanIndex(null).setNumberOfSources(nodes.size());
 
@@ -144,14 +156,36 @@ public class GridReduceQueryExecutor {
 
             final ResultSet res = h2.executeSqlQueryWithTimer(r.conn, 
rdc.query(), F.asList(rdc.parameters()));
 
+            for (GridMergeTable tbl : r.tbls)
+                dropTable(r.conn, tbl.getName());
+
             return new GridFinishedFuture(ctx, new Iter(res));
         }
-        catch (IgniteCheckedException | InterruptedException e) {
+        catch (IgniteCheckedException | InterruptedException | SQLException e) 
{
+            U.closeQuiet(r.conn);
+
             return new GridFinishedFuture<>(ctx, e);
         }
     }
 
-    private GridMergeTable createTable(Connection conn, GridCacheSqlQuery qry) 
{
+    /**
+     * @param conn Connection.
+     * @param tblName Table name.
+     * @throws SQLException If failed.
+     */
+    private void dropTable(Connection conn, String tblName) throws 
SQLException {
+        try (Statement s = conn.createStatement()) {
+            s.execute("DROP TABLE " + tblName);
+        }
+    }
+
+    /**
+     * @param conn Connection.
+     * @param qry Query.
+     * @return Table.
+     * @throws IgniteCheckedException If failed.
+     */
+    private GridMergeTable createTable(Connection conn, GridCacheSqlQuery qry) 
throws IgniteCheckedException {
         try {
             try (PreparedStatement s = conn.prepareStatement(
                 "CREATE LOCAL TEMPORARY TABLE " + qry.alias() +
@@ -164,8 +198,10 @@ public class GridReduceQueryExecutor {
 
             return GridMergeTable.Engine.getCreated();
         }
-        catch (SQLException|IgniteCheckedException e) {
-            throw new IgniteException(e);
+        catch (SQLException e) {
+            U.closeQuiet(conn);
+
+            throw new IgniteCheckedException(e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f941e415/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
 
b/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
index b4d6595..af49b43 100644
--- 
a/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
+++ 
b/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
@@ -96,8 +96,10 @@ public class GridCacheCrossCacheQuerySelfTest extends 
GridCommonAbstractTest {
     public void testTwoStep() throws Exception {
         fillCaches();
 
+        String cache = "partitioned";
+
         GridCacheQueriesEx<Integer, FactPurchase> qx =
-            (GridCacheQueriesEx<Integer, FactPurchase>)ignite.<Integer, 
FactPurchase>cache("partitioned").queries();
+            (GridCacheQueriesEx<Integer, FactPurchase>)ignite.<Integer, 
FactPurchase>cache(cache).queries();
 
 //        for (Map.Entry<Integer, FactPurchase> e : 
qx.createSqlQuery(FactPurchase.class, "1 = 1").execute().get())
 //            X.println("___ "  + e);
@@ -106,11 +108,44 @@ public class GridCacheCrossCacheQuerySelfTest extends 
GridCommonAbstractTest {
 
         q.addMapQuery("_cnts_", "select count(*) x from 
\"partitioned\".FactPurchase where ? = ?", 2 ,2);
 
-        Object cnt = qx.execute(q).get().iterator().next().get(0);
+        Object cnt = qx.execute(cache, q).get().iterator().next().get(0);
 
         assertEquals(10L, cnt);
     }
 
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTwoStepGroup() throws Exception {
+        fillCaches();
+
+        GridCacheQueriesEx<Integer, FactPurchase> qx =
+            (GridCacheQueriesEx<Integer, FactPurchase>)ignite.<Integer, 
FactPurchase>cache("partitioned").queries();
+
+        Set<Integer> set0 = new HashSet<>();
+
+        for (List<?> o : qx.executeTwoStepQuery("partitioned", "select 
productId from FactPurchase group by productId")
+            .get()) {
+            X.println("___ -> " + o);
+
+            assertTrue(set0.add((Integer) o.get(0)));
+        }
+
+        X.println("___ ");
+
+        Set<Integer> set1 = new HashSet<>();
+
+        for (List<?> o : qx.executeTwoStepQuery("partitioned", "select 
productId from FactPurchase")
+            .get()) {
+            X.println("___ -> " + o);
+
+            set1.add((Integer)o.get(0));
+        }
+
+        assertFalse(set1.isEmpty());
+        assertEquals(set0, set1);
+    }
+
     /** @throws Exception If failed. */
     public void testOnProjection() throws Exception {
         fillCaches();

Reply via email to