ignite-gg-9933 - keep portable flag fix

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f16eadee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f16eadee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f16eadee

Branch: refs/heads/ignite-gg-9933
Commit: f16eadee1c230170768448114a05358542b1ab05
Parents: 89ac9dc
Author: S.Vladykin <svlady...@gridgain.com>
Authored: Fri Mar 20 02:03:15 2015 +0300
Committer: S.Vladykin <svlady...@gridgain.com>
Committed: Fri Mar 20 02:03:15 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/IgniteCacheProxy.java      | 10 ++-
 .../cache/query/GridCacheQueriesImpl.java       |  2 +-
 .../cache/query/GridCacheTwoStepQuery.java      | 20 ++++++
 .../processors/query/GridQueryIndexing.java     | 20 +++---
 .../query/GridQueryPortableFieldsIterator.java  | 70 ++++++++++++++++++++
 .../processors/query/GridQueryProcessor.java    | 48 ++++++++------
 .../processors/query/h2/IgniteH2Indexing.java   | 32 ++++++---
 .../h2/twostep/GridReduceQueryExecutor.java     | 13 ++--
 8 files changed, 160 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f16eadee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 3216ccc..53e9a00 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -422,7 +422,7 @@ public class IgniteCacheProxy<K, V> extends 
AsyncSupportAdapter<IgniteCache<K, V
                 if (isReplicatedDataNode() || ctx.isLocal())
                     return doLocalQuery(p);
 
-                return ctx.kernalContext().query().queryTwoStep(ctx.name(), 
p.getType(), p.getSql(), p.getArgs());
+                return ctx.kernalContext().query().queryTwoStep(ctx, p);
             }
 
             return query(qry, projection(false));
@@ -462,7 +462,7 @@ public class IgniteCacheProxy<K, V> extends 
AsyncSupportAdapter<IgniteCache<K, V
             if (isReplicatedDataNode() || ctx.isLocal())
                 return doLocalFieldsQuery(qry);
 
-            return ctx.kernalContext().query().queryTwoStep(ctx.name(), 
qry.getSql(), qry.getArgs());
+            return ctx.kernalContext().query().queryTwoStep(ctx, qry);
         }
         catch (Exception e) {
             if (e instanceof CacheException)
@@ -480,8 +480,7 @@ public class IgniteCacheProxy<K, V> extends 
AsyncSupportAdapter<IgniteCache<K, V
      * @return Cursor.
      */
     private QueryCursor<Entry<K,V>> doLocalQuery(SqlQuery p) {
-        return new 
QueryCursorImpl<>(ctx.kernalContext().query().<K,V>queryLocal(
-            ctx.name(), p.getType(), p.getSql(), p.getArgs()));
+        return new 
QueryCursorImpl<>(ctx.kernalContext().query().<K,V>queryLocal(ctx, p));
     }
 
     /**
@@ -489,8 +488,7 @@ public class IgniteCacheProxy<K, V> extends 
AsyncSupportAdapter<IgniteCache<K, V
      * @return Cursor.
      */
     private QueryCursor<List<?>> doLocalFieldsQuery(SqlFieldsQuery q) {
-        return ctx.kernalContext().query().queryLocalFields(
-            ctx.name(), q.getSql(), q.getArgs());
+        return ctx.kernalContext().query().queryLocalFields(ctx, q);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f16eadee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueriesImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueriesImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueriesImpl.java
index a018e98..ae25c1c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueriesImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueriesImpl.java
@@ -165,7 +165,7 @@ public class GridCacheQueriesImpl<K, V> implements 
GridCacheQueriesEx<K, V>, Ext
 
     /** {@inheritDoc} */
     @Override public QueryCursor<List<?>> executeTwoStepQuery(String space, 
String sqlQry, Object[] params) {
-        return ctx.kernalContext().query().queryTwoStep(space, sqlQry, params);
+        return ctx.kernalContext().query().queryTwoStep(ctx, new 
SqlFieldsQuery(sqlQry).setArgs(params));
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f16eadee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
index 585d78e..11b2057 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
@@ -34,6 +34,9 @@ public class GridCacheTwoStepQuery implements Serializable {
     private static final long serialVersionUID = 0L;
 
     /** */
+    public static final int DFLT_PAGE_SIZE = 1000;
+
+    /** */
     @GridToStringInclude
     private Map<String, GridCacheSqlQuery> mapQrys;
 
@@ -41,6 +44,9 @@ public class GridCacheTwoStepQuery implements Serializable {
     @GridToStringInclude
     private GridCacheSqlQuery reduce;
 
+    /** */
+    private int pageSize = DFLT_PAGE_SIZE;
+
     /**
      * @param qry Reduce query.
      * @param params Reduce query parameters.
@@ -50,6 +56,20 @@ public class GridCacheTwoStepQuery implements Serializable {
     }
 
     /**
+     * @param pageSize Page size.
+     */
+    public void pageSize(int pageSize) {
+        this.pageSize = pageSize;
+    }
+
+    /**
+     * @return Page size.
+     */
+    public int pageSize() {
+        return pageSize;
+    }
+
+    /**
      * @param alias Alias.
      * @param qry SQL Query.
      * @param params Query parameters.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f16eadee/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index fb8f4b8..2caae45 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@ -21,6 +21,7 @@ import org.apache.ignite.*;
 import org.apache.ignite.cache.query.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.query.*;
 import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.lang.*;
@@ -53,32 +54,29 @@ public interface GridQueryIndexing {
     /**
      * Runs two step query.
      *
-     * @param space Space name.
+     * @param cctx Cache context.
      * @param qry Query.
      * @return Cursor.
      */
-    public QueryCursor<List<?>> queryTwoStep(String space, 
GridCacheTwoStepQuery qry);
+    public QueryCursor<List<?>> queryTwoStep(GridCacheContext<?,?> cctx, 
GridCacheTwoStepQuery qry);
 
     /**
      * Parses SQL query into two step query and executes it.
      *
-     * @param space Space.
-     * @param sqlQry Query.
-     * @param params Parameters.
+     * @param cctx Cache context.
+     * @param qry Query.
      * @return Cursor.
      */
-    public QueryCursor<List<?>> queryTwoStep(String space, String sqlQry, 
Object[] params);
+    public QueryCursor<List<?>> queryTwoStep(GridCacheContext<?,?> cctx, 
SqlFieldsQuery qry);
 
     /**
      * Parses SQL query into two step query and executes it.
      *
-     * @param space Space.
-     * @param type Type name.
-     * @param sqlQry Query.
-     * @param params Parameters.
+     * @param cctx Cache context.
+     * @param qry Query.
      * @return Cursor.
      */
-    public <K,V> QueryCursor<Cache.Entry<K,V>> queryTwoStep(String space, 
String type, String sqlQry, Object[] params);
+    public <K,V> QueryCursor<Cache.Entry<K,V>> 
queryTwoStep(GridCacheContext<?,?> cctx, SqlQuery qry);
 
     /**
      * Queries individual fields (generally used by JDBC drivers).

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f16eadee/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryPortableFieldsIterator.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryPortableFieldsIterator.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryPortableFieldsIterator.java
new file mode 100644
index 0000000..8ccebe2
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryPortableFieldsIterator.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.util.*;
+
+/**
+ * Deserializes portable objects if needed.
+ */
+public class GridQueryPortableFieldsIterator implements Iterator<List<?>>, 
AutoCloseable {
+    /** */
+    private final Iterator<List<?>> iter;
+
+    /** */
+    private final GridCacheContext<?,?> cctx;
+
+    /** */
+    private final boolean keepPortable;
+
+    /**
+     * @param iter Iterator.
+     * @param cctx Cache context.
+     * @param keepPortable Keep portable.
+     */
+    public GridQueryPortableFieldsIterator(Iterator<List<?>> iter, 
GridCacheContext<?,?> cctx, boolean keepPortable) {
+        this.iter = iter;
+        this.cctx = cctx;
+        this.keepPortable = keepPortable;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() throws Exception {
+        if (iter instanceof AutoCloseable)
+            U.closeQuiet((AutoCloseable)iter);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean hasNext() {
+        return iter.hasNext();
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public List<?> next() {
+        return 
(List<?>)cctx.unwrapPortablesIfNeeded((Collection<Object>)iter.next(), 
keepPortable);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void remove() {
+        iter.remove();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f16eadee/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 068ebe5..547b707 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -433,7 +433,7 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
             throw new IllegalStateException("Failed to execute query (grid is 
stopping).");
 
         try {
-            return idx.queryTwoStep(space, qry);
+            return 
idx.queryTwoStep(ctx.cache().internalCache(space).context(), qry);
         }
         finally {
             busyLock.leaveBusy();
@@ -441,17 +441,16 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
     }
 
     /**
-     * @param space Space.
-     * @param sqlQry Query.
-     * @param params Parameters.
+     * @param cctx Cache context.
+     * @param qry Query.
      * @return Cursor.
      */
-    public QueryCursor<List<?>> queryTwoStep(String space, String sqlQry, 
Object[] params) {
+    public QueryCursor<List<?>> queryTwoStep(GridCacheContext<?,?> cctx, 
SqlFieldsQuery qry) {
         if (!busyLock.enterBusy())
             throw new IllegalStateException("Failed to execute query (grid is 
stopping).");
 
         try {
-            return idx.queryTwoStep(space, sqlQry, params);
+            return idx.queryTwoStep(cctx, qry);
         }
         finally {
             busyLock.leaveBusy();
@@ -459,18 +458,16 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
     }
 
     /**
-     * @param space Space.
-     * @param type Type.
-     * @param sqlQry Query.
-     * @param params Parameters.
+     * @param cctx Cache context.
+     * @param qry Query.
      * @return Cursor.
      */
-    public <K,V> QueryCursor<Cache.Entry<K,V>> queryTwoStep(String space, 
String type, String sqlQry, Object[] params) {
+    public <K,V> QueryCursor<Cache.Entry<K,V>> 
queryTwoStep(GridCacheContext<?,?> cctx, SqlQuery qry) {
         if (!busyLock.enterBusy())
             throw new IllegalStateException("Failed to execute query (grid is 
stopping).");
 
         try {
-            return idx.queryTwoStep(space, type, sqlQry, params);
+            return idx.queryTwoStep(cctx, qry);
         }
         finally {
             busyLock.leaveBusy();
@@ -478,17 +475,20 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
     }
 
     /**
-     * @param space Space.
-     * @param type Type.
-     * @param sqlQry Query.
-     * @param params Parameters.
+     * @param cctx Cache context.
+     * @param qry Query.
      * @return Cursor.
      */
-    public <K,V> Iterator<Cache.Entry<K,V>> queryLocal(String space, String 
type, String sqlQry, Object[] params) {
+    public <K,V> Iterator<Cache.Entry<K,V>> queryLocal(GridCacheContext<?,?> 
cctx, SqlQuery qry) {
         if (!busyLock.enterBusy())
             throw new IllegalStateException("Failed to execute query (grid is 
stopping).");
 
         try {
+            String space = cctx.name();
+            String type = qry.getType();
+            String sqlQry = qry.getSql();
+            Object[] params = qry.getArgs();
+
             TypeDescriptor typeDesc = typesByName.get(new TypeName(space, 
type));
 
             if (typeDesc == null || !typeDesc.registered())
@@ -549,16 +549,19 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
     }
 
     /**
-     * @param space Space.
-     * @param sql SQL Query.
-     * @param args Arguments.
+     * @param cctx Cache context.
+     * @param qry Query.
      * @return Iterator.
      */
-    public QueryCursor<List<?>> queryLocalFields(String space, String sql, 
Object[] args) {
+    public QueryCursor<List<?>> queryLocalFields(GridCacheContext<?,?> cctx, 
SqlFieldsQuery qry) {
         if (!busyLock.enterBusy())
             throw new IllegalStateException("Failed to execute query (grid is 
stopping).");
 
         try {
+            String space = cctx.name();
+            String sql = qry.getSql();
+            Object[] args = qry.getArgs();
+
             GridQueryFieldsResult res = idx.queryFields(space, sql, 
F.asList(args), idx.backupFilter());
 
             if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
@@ -577,7 +580,8 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
                         null));
             }
 
-            QueryCursorImpl<List<?>> cursor = new 
QueryCursorImpl<>(res.iterator());
+            QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(
+                new GridQueryPortableFieldsIterator(res.iterator(), cctx, 
cctx.keepPortable()));
 
             cursor.fieldsMeta(res.metaData());
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f16eadee/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 1e431db..ee6aedf 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -694,29 +694,36 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
     }
 
     /** {@inheritDoc} */
-    @Override public QueryCursor<List<?>> queryTwoStep(String space, 
GridCacheTwoStepQuery qry) {
-        return rdcQryExec.query(space, qry);
+    @Override public QueryCursor<List<?>> queryTwoStep(GridCacheContext<?,?> 
cctx, GridCacheTwoStepQuery qry) {
+        return rdcQryExec.query(cctx, qry);
     }
 
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
-    @Override public <K, V> QueryCursor<Cache.Entry<K,V>> queryTwoStep(String 
space, String type, String sqlQry,
-        Object[] params) {
+    @Override public <K, V> QueryCursor<Cache.Entry<K,V>> 
queryTwoStep(GridCacheContext<?,?> cctx, SqlQuery qry) {
+        String type = qry.getType();
+        String space = cctx.name();
+
         TableDescriptor tblDesc = tableDescriptor(type, space);
 
         if (tblDesc == null)
             throw new CacheException("Failed to find SQL table for type: " + 
type);
 
-        String qry;
+        String sql;
 
         try {
-            qry = generateQuery(sqlQry, tblDesc);
+            sql = generateQuery(qry.getSql(), tblDesc);
         }
         catch (IgniteCheckedException e) {
             throw new IgniteException(e);
         }
 
-        final QueryCursor<List<?>> res = queryTwoStep(space, qry, params);
+        SqlFieldsQuery fqry = new SqlFieldsQuery(sql);
+
+        fqry.setArgs(qry.getArgs());
+        fqry.setPageSize(qry.getPageSize());
+
+        final QueryCursor<List<?>> res = queryTwoStep(cctx, fqry);
 
         final Iterator<List<?>> iter0 = res.iterator();
 
@@ -744,7 +751,10 @@ public class IgniteH2Indexing implements GridQueryIndexing 
{
     }
 
     /** {@inheritDoc} */
-    @Override public QueryCursor<List<?>> queryTwoStep(String space, String 
sqlQry, Object[] params) {
+    @Override public QueryCursor<List<?>> queryTwoStep(GridCacheContext<?,?> 
cctx, SqlFieldsQuery qry) {
+        String space = cctx.name();
+        String sqlQry = qry.getSql();
+
         Connection c = connectionForSpace(space);
 
         PreparedStatement stmt;
@@ -760,7 +770,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         Collection<GridQueryFieldMetadata> meta;
 
         try {
-            twoStepQry = 
GridSqlQuerySplitter.split((JdbcPreparedStatement)stmt, params);
+            twoStepQry = 
GridSqlQuerySplitter.split((JdbcPreparedStatement)stmt, qry.getArgs());
 
             meta = meta(stmt.getMetaData());
         }
@@ -774,7 +784,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         if (log.isDebugEnabled())
             log.debug("Parsed query: `" + sqlQry + "` into two step query: " + 
twoStepQry);
 
-        QueryCursorImpl<List<?>> cursor = 
(QueryCursorImpl<List<?>>)queryTwoStep(space, twoStepQry);
+        twoStepQry.pageSize(qry.getPageSize());
+
+        QueryCursorImpl<List<?>> cursor = 
(QueryCursorImpl<List<?>>)queryTwoStep(cctx, twoStepQry);
 
         cursor.fieldsMeta(meta);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f16eadee/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 4c1dde7..9195153 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.managers.communication.*;
 import org.apache.ignite.internal.managers.eventstorage.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.query.*;
+import org.apache.ignite.internal.processors.query.*;
 import org.apache.ignite.internal.processors.query.h2.*;
 import org.apache.ignite.internal.processors.query.h2.sql.*;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.*;
@@ -237,25 +238,27 @@ public class GridReduceQueryExecutor implements 
GridMessageListener {
     }
 
     /**
-     * @param space Space name.
+     * @param cctx Cache context.
      * @param qry Query.
      * @return Cursor.
      */
-    public QueryCursor<List<?>> query(String space, GridCacheTwoStepQuery qry) 
{
+    public QueryCursor<List<?>> query(GridCacheContext<?,?> cctx, 
GridCacheTwoStepQuery qry) {
         long qryReqId = reqIdGen.incrementAndGet();
 
         QueryRun r = new QueryRun();
 
-        r.pageSize = 1000; // TODO configure correctly page size
+        r.pageSize = qry.pageSize() <= 0 ? 
GridCacheTwoStepQuery.DFLT_PAGE_SIZE : qry.pageSize();
 
         r.tbls = new ArrayList<>(qry.mapQueries().size());
 
+        String space = cctx.name();
+
         r.conn = h2.connectionForSpace(space);
 
         // TODO Add topology version.
         ClusterGroup dataNodes = ctx.grid().cluster().forDataNodes(space);
 
-        if (ctx.cache().internalCache(space).context().isReplicated()) {
+        if (cctx.isReplicated()) {
             assert dataNodes.node(ctx.localNodeId()) == null : "We must be on 
a client node.";
 
             dataNodes = dataNodes.forRandom(); // Select random data node to 
run query on a replicated data.
@@ -307,7 +310,7 @@ public class GridReduceQueryExecutor implements 
GridMessageListener {
 //                dropTable(r.conn, tbl.getName()); TODO
             }
 
-            return new QueryCursorImpl<>(new Iter(res));
+            return new QueryCursorImpl<>(new 
GridQueryPortableFieldsIterator(new Iter(res), cctx, cctx.keepPortable()));
         }
         catch (IgniteCheckedException | InterruptedException | 
RuntimeException e) {
             U.closeQuiet(r.conn);

Reply via email to