ignite-sql-tests - jdbc

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a06a5575
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a06a5575
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a06a5575

Branch: refs/heads/ignite-sql-tests
Commit: a06a55750c42649acba4a27697809dde463542f1
Parents: cfcb9a4
Author: S.Vladykin <svlady...@gridgain.com>
Authored: Tue Mar 17 05:23:11 2015 +0300
Committer: S.Vladykin <svlady...@gridgain.com>
Committed: Tue Mar 17 05:23:11 2015 +0300

----------------------------------------------------------------------
 .../ignite/jdbc/JdbcEmptyCacheSelfTest.java     |   3 +
 .../processors/cache/IgniteCacheProxy.java      |   4 +-
 .../processors/cache/QueryCursorImpl.java       |  18 +++
 .../query/jdbc/GridCacheQueryJdbcTask.java      |   5 +-
 .../processors/query/GridQueryProcessor.java    |  14 +-
 .../processors/query/h2/IgniteH2Indexing.java   | 143 +++++++------------
 .../query/h2/sql/GridSqlQueryParser.java        |  33 +++--
 .../query/h2/sql/GridSqlQuerySplitter.java      |  19 ++-
 .../h2/twostep/GridReduceQueryExecutor.java     |  47 +++---
 9 files changed, 150 insertions(+), 136 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a06a5575/modules/clients/src/test/java/org/apache/ignite/jdbc/JdbcEmptyCacheSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/JdbcEmptyCacheSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/JdbcEmptyCacheSelfTest.java
index 9742999..3869ddd 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/JdbcEmptyCacheSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/JdbcEmptyCacheSelfTest.java
@@ -54,6 +54,9 @@ public class JdbcEmptyCacheSelfTest extends GridCommonAbstractTest {
         cache.setCacheMode(PARTITIONED);
         cache.setBackups(1);
         cache.setWriteSynchronizationMode(FULL_SYNC);
+        cache.setIndexedTypes(
+            Byte.class, Byte.class
+        );
 
         cfg.setCacheConfiguration(cache);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a06a5575/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index aaa63fd..3216ccc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -489,8 +489,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
      * @return Cursor.
      */
     private QueryCursor<List<?>> doLocalFieldsQuery(SqlFieldsQuery q) {
-        return new QueryCursorImpl<>(ctx.kernalContext().query().queryLocalFields(
-            ctx.name(), q.getSql(), q.getArgs()));
+        return ctx.kernalContext().query().queryLocalFields(
+            ctx.name(), q.getSql(), q.getArgs());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a06a5575/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java
index 62e7376..7cb9efc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache;
 
 import org.apache.ignite.*;
 import org.apache.ignite.internal.processors.cache.query.*;
+import org.apache.ignite.internal.processors.query.*;
 
 import java.util.*;
 
@@ -32,6 +33,9 @@ public class QueryCursorImpl<T> implements QueryCursorEx<T> {
     /** */
     private boolean iterTaken;
 
+    /** */
+    private Collection<GridQueryFieldMetadata> fieldsMeta;
+
     /**
      * @param iter Iterator.
      */
@@ -95,4 +99,18 @@ public class QueryCursorImpl<T> implements QueryCursorEx<T> {
             }
         }
     }
+
+    /**
+     * @param fieldsMeta SQL Fields query result metadata.
+     */
+    public void fieldsMeta(Collection<GridQueryFieldMetadata> fieldsMeta) {
+        this.fieldsMeta = fieldsMeta;
+    }
+
+    /**
+     * @return SQL Fields query result metadata.
+     */
+    public Collection<GridQueryFieldMetadata> fieldsMeta() {
+        return fieldsMeta;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a06a5575/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcTask.java
index b53a9e1..332c649 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/jdbc/GridCacheQueryJdbcTask.java
@@ -21,6 +21,7 @@ import org.apache.ignite.*;
 import org.apache.ignite.cache.query.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.compute.*;
+import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.query.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
@@ -194,7 +195,9 @@ public class GridCacheQueryJdbcTask extends ComputeTaskAdapter<byte[], byte[]> {
 
                 QueryCursor<List<?>> cursor = cache.queryFields(qry);
 
-                Collection<GridQueryFieldMetadata> meta = null; // TODO
+                Collection<GridQueryFieldMetadata> meta = ((QueryCursorImpl<List<?>>)cursor).fieldsMeta();
+
+                assert meta != null;
 
                 tbls = new ArrayList<>(meta.size());
                 cols = new ArrayList<>(meta.size());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a06a5575/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 11a9f2c..aa924c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -35,7 +35,6 @@ import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.internal.util.worker.*;
 import org.apache.ignite.lang.*;
-import org.apache.ignite.spi.*;
 import org.apache.ignite.spi.indexing.*;
 import org.jdk8.backport.*;
 import org.jetbrains.annotations.*;
@@ -555,13 +554,12 @@ public class GridQueryProcessor extends GridProcessorAdapter {
      * @param args Arguments.
      * @return Iterator.
      */
-    public Iterator<List<?>> queryLocalFields(String space, String sql, Object[] args) {
+    public QueryCursor<List<?>> queryLocalFields(String space, String sql, Object[] args) {
         if (!busyLock.enterBusy())
             throw new IllegalStateException("Failed to execute query (grid is stopping).");
 
         try {
-            IgniteSpiCloseableIterator<List<?>> iterator =
-                idx.queryFields(space, sql, F.asList(args), idx.backupFilter()).iterator();
+            GridQueryFieldsResult res = idx.queryFields(space, sql, F.asList(args), idx.backupFilter());
 
             if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
                 ctx.event().record(new CacheQueryExecutedEvent<>(
@@ -579,10 +577,14 @@ public class GridQueryProcessor extends GridProcessorAdapter {
                         null));
             }
 
-            return iterator;
+            QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(res.iterator());
+
+            cursor.fieldsMeta(res.metaData());
+
+            return cursor;
         }
         catch (IgniteCheckedException e) {
-            throw new IgniteException(e);
+            throw new CacheException(e);
         }
         finally {
             busyLock.leaveBusy();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a06a5575/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index c3a3da3..1e431db 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -65,7 +65,6 @@ import java.sql.*;
 import java.text.*;
 import java.util.*;
 import java.util.concurrent.*;
-import java.util.concurrent.locks.*;
 
 import static org.apache.ignite.IgniteSystemProperties.*;
 import static org.apache.ignite.internal.processors.query.GridQueryIndexType.*;
@@ -496,18 +495,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
             if (rs != null) {
                 try {
-                    ResultSetMetaData rsMeta = rs.getMetaData();
-
-                    meta = new ArrayList<>(rsMeta.getColumnCount());
-
-                    for (int i = 1; i <= rsMeta.getColumnCount(); i++) {
-                        String schemaName = rsMeta.getSchemaName(i);
-                        String typeName = rsMeta.getTableName(i);
-                        String name = rsMeta.getColumnLabel(i);
-                        String type = rsMeta.getColumnClassName(i);
-
-                        meta.add(new SqlFieldMetadata(schemaName, typeName, name, type));
-                    }
+                    meta = meta(rs.getMetaData());
                 }
                 catch (SQLException e) {
                     throw new IgniteSpiException("Failed to get meta data.", e);
@@ -522,6 +510,26 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /**
+     * @param rsMeta Metadata.
+     * @return List of fields metadata.
+     * @throws SQLException If failed.
+     */
+    private static List<GridQueryFieldMetadata> meta(ResultSetMetaData rsMeta) throws SQLException {
+        ArrayList<GridQueryFieldMetadata> meta = new ArrayList<>(rsMeta.getColumnCount());
+
+        for (int i = 1; i <= rsMeta.getColumnCount(); i++) {
+            String schemaName = rsMeta.getSchemaName(i);
+            String typeName = rsMeta.getTableName(i);
+            String name = rsMeta.getColumnLabel(i);
+            String type = rsMeta.getColumnClassName(i);
+
+            meta.add(new SqlFieldMetadata(schemaName, typeName, name, type));
+        }
+
+        return meta;
+    }
+
+    /**
      * @param stmt Prepared statement.
      * @return Command type.
      */
@@ -739,12 +747,38 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     @Override public QueryCursor<List<?>> queryTwoStep(String space, String sqlQry, Object[] params) {
         Connection c = connectionForSpace(space);
 
-        GridCacheTwoStepQuery twoStepQry = GridSqlQuerySplitter.split(c, sqlQry, params);
+        PreparedStatement stmt;
+
+        try {
+            stmt = c.prepareStatement(sqlQry);
+        }
+        catch (SQLException e) {
+            throw new CacheException("Failed to parse query: " + sqlQry, e);
+        }
+
+        GridCacheTwoStepQuery twoStepQry;
+        Collection<GridQueryFieldMetadata> meta;
+
+        try {
+            twoStepQry = GridSqlQuerySplitter.split((JdbcPreparedStatement)stmt, params);
+
+            meta = meta(stmt.getMetaData());
+        }
+        catch (SQLException e) {
+            throw new CacheException(e);
+        }
+        finally {
+            U.close(stmt, log);
+        }
 
         if (log.isDebugEnabled())
             log.debug("Parsed query: `" + sqlQry + "` into two step query: " + twoStepQry);
 
-        return queryTwoStep(space, twoStepQry);
+        QueryCursorImpl<List<?>> cursor = (QueryCursorImpl<List<?>>)queryTwoStep(space, twoStepQry);
+
+        cursor.fieldsMeta(meta);
+
+        return cursor;
     }
 
     /**
@@ -1049,7 +1083,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         if (Utils.serializer != null)
             U.warn(log, "Custom H2 serialization is already configured, will override.");
 
-        Utils.serializer = h2Serializer(ctx != null && ctx.deploy().enabled());
+        Utils.serializer = h2Serializer();
 
         String dbName = (ctx != null ? ctx.localNodeId() : UUID.randomUUID()).toString();
 
@@ -1097,83 +1131,10 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /**
-     * @param p2pEnabled If peer-deployment is enabled.
      * @return Serializer.
      */
-    protected JavaObjectSerializer h2Serializer(boolean p2pEnabled) {
-        return p2pEnabled ?
-            new JavaObjectSerializer() {
-                /** */
-                private volatile Map<ClassLoader, Byte> ldr2id = Collections.emptyMap();
-
-                /** */
-                private volatile Map<Byte, ClassLoader> id2ldr = Collections.emptyMap();
-
-                /** */
-                private byte ldrIdGen = Byte.MIN_VALUE;
-
-                /** */
-                private final Lock lock = new ReentrantLock();
-
-                @Override public byte[] serialize(Object obj) throws Exception {
-                    ClassLoader ldr = obj.getClass().getClassLoader();
-
-                    Byte ldrId = ldr2id.get(ldr);
-
-                    if (ldrId == null) {
-                        lock.lock();
-
-                        try {
-                            ldrId = ldr2id.get(ldr);
-
-                            if (ldrId == null) {
-                                ldrId = ldrIdGen++;
-
-                                if (id2ldr.containsKey(ldrId)) // Overflow.
-                                    throw new IgniteException("Failed to add new peer-to-peer class loader.");
-
-                                Map<Byte, ClassLoader> id2ldr0 = new HashMap<>(id2ldr);
-                                Map<ClassLoader, Byte> ldr2id0 = new IdentityHashMap<>(ldr2id);
-
-                                id2ldr0.put(ldrId, ldr);
-                                ldr2id0.put(ldr, ldrId);
-
-                                ldr2id = ldr2id0;
-                                id2ldr = id2ldr0;
-                            }
-                        }
-                        finally {
-                            lock.unlock();
-                        }
-                    }
-
-                    byte[] bytes = marshaller.marshal(obj);
-
-                    int len = bytes.length;
-
-                    bytes = Arrays.copyOf(bytes, len + 1); // The last byte is for ldrId.
-
-                    bytes[len] = ldrId;
-
-                    return bytes;
-                }
-
-                @Override public Object deserialize(byte[] bytes) throws Exception {
-                    int last = bytes.length - 1;
-
-                    byte ldrId = bytes[last];
-
-                    ClassLoader ldr = id2ldr.get(ldrId);
-
-                    if (ldr == null)
-                        throw new IllegalStateException("Class loader was not found: " + ldrId);
-
-                    bytes = Arrays.copyOf(bytes, last); // Trim the last byte.
-
-                    return marshaller.unmarshal(bytes, ldr);
-                }
-            } :
-            new JavaObjectSerializer() {
+    protected JavaObjectSerializer h2Serializer() {
+        return new JavaObjectSerializer() {
                 @Override public byte[] serialize(Object obj) throws Exception {
                     return marshaller.marshal(obj);
                 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a06a5575/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
index a8c83d6..2e2f9c3 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.query.h2.sql;
 
 import org.apache.ignite.*;
+import org.h2.command.*;
 import org.h2.command.dml.*;
 import org.h2.engine.*;
 import org.h2.expression.*;
@@ -28,7 +29,6 @@ import org.h2.value.*;
 import org.jetbrains.annotations.*;
 
 import java.lang.reflect.*;
-import java.sql.*;
 import java.util.*;
 import java.util.Set;
 
@@ -151,17 +151,34 @@ public class GridSqlQueryParser {
     private static final Getter<JavaFunction, FunctionAlias> FUNC_ALIAS = getter(JavaFunction.class, "functionAlias");
 
     /** */
+    private static final Getter<JdbcPreparedStatement,Command> COMMAND = getter(JdbcPreparedStatement.class, "command");
+
+    /** */
+    private static volatile Getter<Command,Prepared> prepared;
+
+    /** */
     private final IdentityHashMap<Object, Object> h2ObjToGridObj = new IdentityHashMap<>();
 
     /**
-     * @param conn Connection.
-     * @param select Select query.
-     * @return Parsed select query.
+     * @param stmt Prepared statement.
+     * @return Parsed select.
      */
-    public static GridSqlSelect parse(Connection conn, String select) {
-        Session ses = (Session)((JdbcConnection)conn).getSession();
+    public static GridSqlSelect parse(JdbcPreparedStatement stmt) {
+        Command cmd = COMMAND.get(stmt);
+
+        Getter<Command,Prepared> p = prepared;
+
+        if (p == null) {
+            Class<? extends Command> cls = cmd.getClass();
+
+            assert cls.getSimpleName().equals("CommandContainer");
+
+            prepared = p = getter(cls, "prepared");
+        }
+
+        Prepared select = p.get(cmd);
 
-        return new GridSqlQueryParser().parse((Select)ses.prepare(select));
+        return new GridSqlQueryParser().parse((Select)select);
     }
 
     /**
@@ -510,7 +527,7 @@ public class GridSqlQueryParser {
      * @param cls Class.
      * @param fldName Fld name.
      */
-    private static <T, R> Getter<T, R> getter(Class<T> cls, String fldName) {
+    private static <T, R> Getter<T, R> getter(Class<? extends T> cls, String fldName) {
         Field field;
 
         try {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a06a5575/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
index 019ed59..47e5e05 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
@@ -19,9 +19,9 @@ package org.apache.ignite.internal.processors.query.h2.sql;
 
 import org.apache.ignite.*;
 import org.apache.ignite.internal.processors.cache.query.*;
+import org.h2.jdbc.*;
 import org.h2.value.*;
 
-import java.sql.*;
 import java.util.*;
 
 import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlFunctionType.*;
@@ -56,16 +56,15 @@ public class GridSqlQuerySplitter {
     }
 
     /**
-     * @param conn Connection.
-     * @param query Query.
+     * @param stmt Prepared statement.
      * @param params Parameters.
      * @return Two step query.
      */
-    public static GridCacheTwoStepQuery split(Connection conn, String query, Object[] params) {
+    public static GridCacheTwoStepQuery split(JdbcPreparedStatement stmt, Object[] params) {
         if (params == null)
             params = GridCacheSqlQuery.EMPTY_PARAMS;
 
-        GridSqlSelect srcQry = GridSqlQueryParser.parse(conn, query);
+        GridSqlSelect srcQry = GridSqlQueryParser.parse(stmt);
 
         final String mergeTable = TABLE_FUNC_NAME + "()"; // table(0); TODO
 
@@ -299,16 +298,14 @@ public class GridSqlQuerySplitter {
             String mapColAlias = columnName(idx);
             String rdcColAlias;
 
-            if (alias == null) { // Wrap map column with generated alias if none.
+            if (alias == null)  // Original column name for reduce column.
                 rdcColAlias = el instanceof GridSqlColumn ? ((GridSqlColumn)el).columnName() : mapColAlias;
-
-                alias = alias(mapColAlias, el); // `el` is known not to be alias.
-
-                mapSelect.set(idx, alias);
-            }
             else // Set initial alias for reduce column.
                 rdcColAlias = alias.alias();
 
+            // Always wrap map column into generated alias.
+            mapSelect.set(idx, alias(mapColAlias, el)); // `el` is known not to be an alias.
+
             if (idx < rdcSelect.length) { // SELECT __C0 AS orginal_alias
                 GridSqlElement rdcEl = column(mapColAlias);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a06a5575/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index f3d6bfc..4c1dde7 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -200,24 +200,37 @@ public class GridReduceQueryExecutor implements GridMessageListener {
 
         GridMergeIndex idx = r.tbls.get(msg.query()).getScanIndex(null);
 
-        idx.addPage(new GridResultPage(node.id(), msg, false) {
-            @Override public void fetchNextPage() {
-                if (r.rmtErr != null)
-                    throw new CacheException("Next page fetch failed.", r.rmtErr);
-
-                try {
-                    GridQueryNextPageRequest msg0 = new GridQueryNextPageRequest(qryReqId, qry, pageSize);
-
-                    if (node.isLocal())
-                        h2.mapQueryExecutor().onMessage(ctx.localNodeId(), msg0);
-                    else
-                        ctx.io().send(node, GridTopic.TOPIC_QUERY, msg0, GridIoPolicy.PUBLIC_POOL);
-                }
-                catch (IgniteCheckedException e) {
-                    throw new CacheException(e);
+        GridResultPage page;
+
+        try {
+            page = new GridResultPage(node.id(), msg, false) {
+                @Override public void fetchNextPage() {
+                    if (r.rmtErr != null)
+                        throw new CacheException("Next page fetch failed.", r.rmtErr);
+
+                    try {
+                        GridQueryNextPageRequest msg0 = new GridQueryNextPageRequest(qryReqId, qry, pageSize);
+
+                        if (node.isLocal())
+                            h2.mapQueryExecutor().onMessage(ctx.localNodeId(), msg0);
+                        else
+                            ctx.io().send(node, GridTopic.TOPIC_QUERY, msg0, GridIoPolicy.PUBLIC_POOL);
+                    }
+                    catch (IgniteCheckedException e) {
+                        throw new CacheException(e);
+                    }
                 }
-            }
-        });
+            };
+        }
+        catch (Exception e) {
+            U.error(log, "Error in message.", e);
+
+            fail(r, node.id(), "Error in message.");
+
+            return;
+        }
+
+        idx.addPage(page);
 
         if (msg.allRows() != -1) // Only the first page contains row count.
             r.latch.countDown();

Reply via email to