ignite-sql-tests - moved messages to core

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c459f306
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c459f306
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c459f306

Branch: refs/heads/ignite-sql-tests
Commit: c459f306e9f9d4646065d38bf6fd421bd59d65b1
Parents: d7cde2b
Author: S.Vladykin <svlady...@gridgain.com>
Authored: Thu Mar 12 03:13:46 2015 +0300
Committer: S.Vladykin <svlady...@gridgain.com>
Committed: Thu Mar 12 03:13:46 2015 +0300

----------------------------------------------------------------------
 .../messages/GridQueryCancelRequest.java        |  49 +++++
 .../twostep/messages/GridQueryFailResponse.java |  64 +++++++
 .../messages/GridQueryNextPageRequest.java      |  77 ++++++++
 .../messages/GridQueryNextPageResponse.java     | 128 +++++++++++++
 .../h2/twostep/messages/GridQueryRequest.java   |  95 ++++++++++
 .../query/h2/twostep/GridMapQueryExecutor.java  |  60 +++++-
 .../query/h2/twostep/GridMergeIndex.java        |  26 ++-
 .../h2/twostep/GridMergeIndexUnsorted.java      |   7 +-
 .../h2/twostep/GridReduceQueryExecutor.java     |  16 +-
 .../query/h2/twostep/GridResultPage.java        |  33 +++-
 .../messages/GridQueryCancelRequest.java        |  49 -----
 .../twostep/messages/GridQueryFailResponse.java |  64 -------
 .../messages/GridQueryNextPageRequest.java      |  77 --------
 .../messages/GridQueryNextPageResponse.java     | 181 -------------------
 .../h2/twostep/messages/GridQueryRequest.java   |  95 ----------
 15 files changed, 535 insertions(+), 486 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c459f306/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryCancelRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryCancelRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryCancelRequest.java
new file mode 100644
index 0000000..9bcb410
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryCancelRequest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep.messages;
+
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+
+/**
+ * Cancel request.
+ */
+public class GridQueryCancelRequest implements Serializable {
+    /** */
+    private long qryReqId;
+
+    /**
+     * @param qryReqId Query request ID.
+     */
+    public GridQueryCancelRequest(long qryReqId) {
+        this.qryReqId = qryReqId;
+    }
+
+    /**
+     * @return Query request ID.
+     */
+    public long queryRequestId() {
+        return qryReqId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridQueryCancelRequest.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c459f306/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java
new file mode 100644
index 0000000..f551ab8
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep.messages;
+
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+
+/**
+ * Error message.
+ */
+public class GridQueryFailResponse implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private long qryReqId;
+
+    /** */
+    private String errMsg;
+
+    /**
+     * @param qryReqId Query request ID.
+     * @param err Error.
+     */
+    public GridQueryFailResponse(long qryReqId, Throwable err) {
+        this.qryReqId = qryReqId;
+        this.errMsg = err.getClass() + ":" + err.getMessage();
+    }
+
+    /**
+     * @return Query request ID.
+     */
+    public long queryRequestId() {
+        return qryReqId;
+    }
+
+    /**
+     * @return Error.
+     */
+    public String error() {
+        return errMsg;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridQueryFailResponse.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c459f306/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java
new file mode 100644
index 0000000..ae7e1e3
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep.messages;
+
+
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+
+/**
+ * Request to fetch next page.
+ */
+public class GridQueryNextPageRequest implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private long qryReqId;
+
+    /** */
+    private int qry;
+
+    /** */
+    private int pageSize;
+
+    /**
+     * @param qryReqId Query request ID.
+     * @param qry Query.
+     * @param pageSize Page size.
+     */
+    public GridQueryNextPageRequest(long qryReqId, int qry, int pageSize) {
+        this.qryReqId = qryReqId;
+        this.qry = qry;
+        this.pageSize = pageSize;
+    }
+
+    /**
+     * @return Query request ID.
+     */
+    public long queryRequestId() {
+        return qryReqId;
+    }
+
+    /**
+     * @return Query.
+     */
+    public int query() {
+        return qry;
+    }
+
+    /**
+     * @return Page size.
+     */
+    public int pageSize() {
+        return pageSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridQueryNextPageRequest.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c459f306/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java
new file mode 100644
index 0000000..3c4cc94
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep.messages;
+
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+
+/**
+ * Next page response.
+ */
+public class GridQueryNextPageResponse implements Externalizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private long qryReqId;
+
+    /** */
+    private int qry;
+
+    /** */
+    private int page;
+
+    /** */
+    private int allRows;
+
+    /** */
+    private byte[] rows;
+
+    /**
+     * For {@link Externalizable}.
+     */
+    public GridQueryNextPageResponse() {
+        // No-op.
+    }
+
+    /**
+     * @param qryReqId Query request ID.
+     * @param qry Query.
+     * @param page Page.
+     * @param allRows All rows count.
+     * @param rows Rows.
+     */
+    public GridQueryNextPageResponse(long qryReqId, int qry, int page, int 
allRows,
+        byte[] rows) {
+        assert rows != null;
+
+        this.qryReqId = qryReqId;
+        this.qry = qry;
+        this.page = page;
+        this.allRows = allRows;
+        this.rows = rows;
+    }
+
+    /**
+     * @return Query request ID.
+     */
+    public long queryRequestId() {
+        return qryReqId;
+    }
+
+    /**
+     * @return Query.
+     */
+    public int query() {
+        return qry;
+    }
+
+    /**
+     * @return Page.
+     */
+    public int page() {
+        return page;
+    }
+
+    /**
+     * @return All rows.
+     */
+    public int allRows() {
+        return allRows;
+    }
+
+    /**
+     * @return Rows.
+     */
+    public byte[] rows() {
+        return rows;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeLong(qryReqId);
+        out.writeInt(qry);
+        out.writeInt(page);
+        out.writeInt(allRows);
+        U.writeByteArray(out, rows);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
+        qryReqId = in.readLong();
+        qry = in.readInt();
+        page = in.readInt();
+        allRows = in.readInt();
+        rows = U.readByteArray(in);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridQueryNextPageResponse.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c459f306/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
new file mode 100644
index 0000000..2834b84
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep.messages;
+
+import org.apache.ignite.internal.processors.cache.query.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Query request.
+ */
+public class GridQueryRequest implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private long reqId;
+
+    /** */
+    private int pageSize;
+
+    /** */
+    private String space;
+
+    /** */
+    @GridToStringInclude
+    private Collection<GridCacheSqlQuery> qrys;
+
+    /**
+     * @param reqId Request ID.
+     * @param pageSize Page size.
+     * @param space Space.
+     * @param qrys Queries.
+     */
+    public GridQueryRequest(long reqId, int pageSize, String space, 
Collection<GridCacheSqlQuery> qrys) {
+        this.reqId = reqId;
+        this.pageSize = pageSize;
+        this.space = space;
+
+        assert qrys instanceof Serializable;
+
+        this.qrys = qrys;
+    }
+
+    /**
+     * @return Request ID.
+     */
+    public long requestId() {
+        return reqId;
+    }
+
+    /**
+     * @return Page size.
+     */
+    public int pageSize() {
+        return pageSize;
+    }
+
+    /**
+     * @return Space.
+     */
+    public String space() {
+        return space;
+    }
+
+    /**
+     * @return Queries.
+     */
+    public Collection<GridCacheSqlQuery> queries() {
+        return qrys;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridQueryRequest.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c459f306/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 44e8b3d..5b2a0ab 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -29,6 +29,7 @@ import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
 import org.h2.jdbc.*;
 import org.h2.result.*;
+import org.h2.store.*;
 import org.h2.value.*;
 import org.jdk8.backport.*;
 
@@ -284,7 +285,8 @@ public class GridMapQueryExecutor {
 
         try {
             ctx.io().sendUserMessage(F.asList(node),
-                new GridQueryNextPageResponse(qr.qryReqId, qry, page, page == 
0 ? res.rowCount : -1, rows),
+                new GridQueryNextPageResponse(qr.qryReqId, qry, page, page == 
0 ? res.rowCount : -1,
+                    marshallRows(rows)),
                 GridTopic.TOPIC_QUERY, false, 0);
         }
         catch (IgniteCheckedException e) {
@@ -295,6 +297,62 @@ public class GridMapQueryExecutor {
     }
 
     /**
+     * @param bytes Bytes.
+     * @return Rows.
+     */
+    public static List<Value[]> unmarshallRows(byte[] bytes) {
+        Data data = Data.create(null, bytes);
+
+        int rowCnt = data.readVarInt();
+
+        if (rowCnt == 0)
+            return Collections.emptyList();
+
+        ArrayList<Value[]> rows = new ArrayList<>(rowCnt);
+
+        int cols = data.readVarInt();
+
+        for (int r = 0; r < rowCnt; r++) {
+            Value[] row = new Value[cols];
+
+            for (int c = 0; c < cols; c++)
+                row[c] = data.readValue();
+
+            rows.add(row);
+        }
+
+        return rows;
+    }
+
+    /**
+     * @param rows Rows.
+     * @return Bytes.
+     */
+    public static byte[] marshallRows(Collection<Value[]> rows) {
+        Data data = Data.create(null, 256);
+
+        data.writeVarInt(rows.size());
+
+        boolean first = true;
+
+        for (Value[] row : rows) {
+            if (first) {
+                data.writeVarInt(row.length);
+
+                first = false;
+            }
+
+            for (Value val : row) {
+                data.checkCapacity(data.getValueLen(val));
+
+                data.writeValue(val);
+            }
+        }
+
+        return Arrays.copyOf(data.getBytes(), data.length());
+    }
+
+    /**
      *
      */
     private class QueryResults {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c459f306/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
index b301622..f6989ae 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
@@ -35,9 +35,6 @@ import java.util.concurrent.atomic.*;
  */
 public abstract class GridMergeIndex extends BaseIndex {
     /** */
-    protected static final GridResultPage END = new GridResultPage(null, null);
-
-    /** */
     private static final int MAX_FETCH_SIZE = 100000; // TODO configure
 
     /** All rows number. */
@@ -46,6 +43,9 @@ public abstract class GridMergeIndex extends BaseIndex {
     /** Remaining rows per source node ID. */
     private final ConcurrentMap<UUID, Counter> remainingRows = new 
ConcurrentHashMap8<>();
 
+    /** */
+    private final AtomicBoolean lastSubmitted = new AtomicBoolean();
+
     /**
      * Will be r/w from query execution thread only, does not need to be 
threadsafe.
      */
@@ -83,7 +83,7 @@ public abstract class GridMergeIndex extends BaseIndex {
      * @param page Page.
      */
     public final void addPage(GridResultPage page) {
-        int pageRowsCnt = page.response().rows().size();
+        int pageRowsCnt = page.rows().size();
 
         if (pageRowsCnt != 0)
             addPage0(page);
@@ -104,12 +104,20 @@ public abstract class GridMergeIndex extends BaseIndex {
         }
 
         if (cnt.addAndGet(-pageRowsCnt) == 0) { // Result can be negative in 
case of race between messages, it is ok.
-            for (Counter c : remainingRows.values()) {
-                if (c.get() != 0 || !c.initialized)
-                    return;
+            boolean last = true;
+
+            for (Counter c : remainingRows.values()) { // Check all the 
sources.
+                if (c.get() != 0 || !c.initialized) {
+                    last = false;
+
+                    break;
+                }
             }
 
-            addPage0(END); // We've fetched all.
+            if (last)
+                last = lastSubmitted.compareAndSet(false, true);
+
+            addPage0(new GridResultPage(page.source(), null, last));
         }
     }
 
@@ -122,8 +130,6 @@ public abstract class GridMergeIndex extends BaseIndex {
      * @param page Page.
      */
     protected void fetchNextPage(GridResultPage page) {
-        assert page != END;
-
         if (remainingRows.get(page.source()).get() != 0)
             page.fetchNextPage();
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c459f306/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
index d36ea58..44faea1 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
@@ -44,7 +44,8 @@ public class GridMergeIndexUnsorted extends GridMergeIndex {
 
     /** {@inheritDoc} */
     @Override protected void addPage0(GridResultPage page) {
-        queue.add(page);
+        if (page.rows() != null || page.isLast()) // We are not interested in 
terminating pages which are not last.
+            queue.add(page);
     }
 
     /** {@inheritDoc} */
@@ -71,7 +72,7 @@ public class GridMergeIndexUnsorted extends GridMergeIndex {
                     throw new IgniteException("Query execution was 
interrupted.", e);
                 }
 
-                if (page == END) {
+                if (page.isLast()) {
                     assert queue.isEmpty() : "It must be the last page: " + 
queue;
 
                     return false; // We are done.
@@ -79,7 +80,7 @@ public class GridMergeIndexUnsorted extends GridMergeIndex {
 
                 fetchNextPage(page);
 
-                iter = page.response().rows().iterator();
+                iter = page.rows().iterator();
 
                 assert iter.hasNext();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c459f306/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 056cce2..4dc9e59 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -147,7 +147,7 @@ public class GridReduceQueryExecutor {
         QueryRun r = runs.get(msg.queryRequestId());
 
         if (r != null && r.latch.getCount() != 0) {
-            r.rmtErr = new CacheException("Failed to execute map query on the 
node: " + node.id(), msg.error());
+            r.rmtErr = new CacheException("Failed to execute map query on the 
node: " + node.id() + "\n " + msg.error());
 
             while(r.latch.getCount() > 0)
                 r.latch.countDown();
@@ -161,16 +161,17 @@ public class GridReduceQueryExecutor {
     private void onNextPage(final ClusterNode node, GridQueryNextPageResponse 
msg) {
         final long qryReqId = msg.queryRequestId();
         final int qry = msg.query();
-        final int pageSize = msg.rows().size();
 
-        QueryRun r = runs.get(qryReqId);
+        final QueryRun r = runs.get(qryReqId);
 
         if (r == null) // Already finished with error or canceled.
             return;
 
+        final int pageSize = r.pageSize;
+
         GridMergeIndex idx = r.tbls.get(msg.query()).getScanIndex(null);
 
-        idx.addPage(new GridResultPage(node.id(), msg) {
+        idx.addPage(new GridResultPage(node.id(), msg, false) {
             @Override public void fetchNextPage() {
                 try {
                     ctx.io().sendUserMessage(F.asList(node), new 
GridQueryNextPageRequest(qryReqId, qry, pageSize),
@@ -196,6 +197,8 @@ public class GridReduceQueryExecutor {
 
         QueryRun r = new QueryRun();
 
+        r.pageSize = 1000; // TODO configure correctly page size
+
         r.tbls = new ArrayList<>(qry.mapQueries().size());
 
         r.conn = h2.connectionForSpace(space);
@@ -228,7 +231,7 @@ public class GridReduceQueryExecutor {
         runs.put(qryReqId, r);
 
         try {
-            ctx.io().sendUserMessage(nodes, new GridQueryRequest(qryReqId, 
1000, space, qry.mapQueries()), // TODO conf page size
+            ctx.io().sendUserMessage(nodes, new GridQueryRequest(qryReqId, 
r.pageSize, space, qry.mapQueries()),
                 GridTopic.TOPIC_QUERY, false, 0);
 
             r.latch.await();
@@ -393,6 +396,9 @@ public class GridReduceQueryExecutor {
         private Connection conn;
 
         /** */
+        private int pageSize;
+
+        /** */
         private volatile Throwable rmtErr;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c459f306/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java
index 7f7d572..a903f01 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java
@@ -19,6 +19,7 @@ package 
org.apache.ignite.internal.processors.query.h2.twostep;
 
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.h2.value.*;
 
 import java.util.*;
 
@@ -32,13 +33,43 @@ public class GridResultPage {
     /** */
     protected final GridQueryNextPageResponse res;
 
+    /** */
+    private final Collection<Value[]> rows;
+
+    /** */
+    private final boolean last;
+
     /**
      * @param src Source.
      * @param res Response.
+     * @param last If this is the globally last page.
      */
-    protected GridResultPage(UUID src, GridQueryNextPageResponse res) {
+    protected GridResultPage(UUID src, GridQueryNextPageResponse res, boolean 
last) {
+        assert src != null;
+
         this.src = src;
         this.res = res;
+        this.last = last;
+
+        if (last)
+            assert res == null : "The last page must be dummy.";
+
+        // res == null means that it is a terminating dummy page for the given 
source node ID.
+        rows = res == null ? null : 
GridMapQueryExecutor.unmarshallRows(res.rows());
+    }
+
+    /**
+     * @return {@code true} If this is a dummy last page for all the sources.
+     */
+    public boolean isLast() {
+        return last;
+    }
+
+    /**
+     * @return Rows.
+     */
+    public Collection<Value[]> rows() {
+        return rows;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c459f306/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryCancelRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryCancelRequest.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryCancelRequest.java
deleted file mode 100644
index 9bcb410..0000000
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryCancelRequest.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.h2.twostep.messages;
-
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.io.*;
-
-/**
- * Cancel request.
- */
-public class GridQueryCancelRequest implements Serializable {
-    /** */
-    private long qryReqId;
-
-    /**
-     * @param qryReqId Query request ID.
-     */
-    public GridQueryCancelRequest(long qryReqId) {
-        this.qryReqId = qryReqId;
-    }
-
-    /**
-     * @return Query request ID.
-     */
-    public long queryRequestId() {
-        return qryReqId;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridQueryCancelRequest.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c459f306/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java
deleted file mode 100644
index 3251677..0000000
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.h2.twostep.messages;
-
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.io.*;
-
-/**
- * Error message.
- */
-public class GridQueryFailResponse implements Serializable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    private long qryReqId;
-
-    /** */
-    private Throwable err;
-
-    /**
-     * @param qryReqId Query request ID.
-     * @param err Error.
-     */
-    public GridQueryFailResponse(long qryReqId, Throwable err) {
-        this.qryReqId = qryReqId;
-        this.err = err;
-    }
-
-    /**
-     * @return Query request ID.
-     */
-    public long queryRequestId() {
-        return qryReqId;
-    }
-
-    /**
-     * @return Error.
-     */
-    public Throwable error() {
-        return err;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridQueryFailResponse.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c459f306/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java
deleted file mode 100644
index ae7e1e3..0000000
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.h2.twostep.messages;
-
-
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.io.*;
-
-/**
- * Request to fetch next page.
- */
-public class GridQueryNextPageRequest implements Serializable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    private long qryReqId;
-
-    /** */
-    private int qry;
-
-    /** */
-    private int pageSize;
-
-    /**
-     * @param qryReqId Query request ID.
-     * @param qry Query.
-     * @param pageSize Page size.
-     */
-    public GridQueryNextPageRequest(long qryReqId, int qry, int pageSize) {
-        this.qryReqId = qryReqId;
-        this.qry = qry;
-        this.pageSize = pageSize;
-    }
-
-    /**
-     * @return Query request ID.
-     */
-    public long queryRequestId() {
-        return qryReqId;
-    }
-
-    /**
-     * @return Query.
-     */
-    public int query() {
-        return qry;
-    }
-
-    /**
-     * @return Page size.
-     */
-    public int pageSize() {
-        return pageSize;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridQueryNextPageRequest.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c459f306/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java
deleted file mode 100644
index 468691d..0000000
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.h2.twostep.messages;
-
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.h2.store.*;
-import org.h2.value.*;
-
-import java.io.*;
-import java.util.*;
-
-/**
- * Next page response.
- */
-public class GridQueryNextPageResponse implements Externalizable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    private long qryReqId;
-
-    /** */
-    private int qry;
-
-    /** */
-    private int page;
-
-    /** */
-    private int allRows;
-
-    /** */
-    private Collection<Value[]> rows;
-
-    /**
-     * For {@link Externalizable}.
-     */
-    public GridQueryNextPageResponse() {
-        // No-op.
-    }
-
-    /**
-     * @param qryReqId Query request ID.
-     * @param qry Query.
-     * @param page Page.
-     * @param allRows All rows count.
-     * @param rows Rows.
-     */
-    public GridQueryNextPageResponse(long qryReqId, int qry, int page, int 
allRows,
-        Collection<Value[]> rows) {
-        assert rows != null;
-
-        this.qryReqId = qryReqId;
-        this.qry = qry;
-        this.page = page;
-        this.allRows = allRows;
-        this.rows = rows;
-    }
-
-    /**
-     * @return Query request ID.
-     */
-    public long queryRequestId() {
-        return qryReqId;
-    }
-
-    /**
-     * @return Query.
-     */
-    public int query() {
-        return qry;
-    }
-
-    /**
-     * @return Page.
-     */
-    public int page() {
-        return page;
-    }
-
-    /**
-     * @return All rows.
-     */
-    public int allRows() {
-        return allRows;
-    }
-
-    /**
-     * @return Rows.
-     */
-    public Collection<Value[]> rows() {
-        return rows;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        out.writeLong(qryReqId);
-        out.writeInt(qry);
-        out.writeInt(page);
-        out.writeInt(allRows);
-
-        out.writeInt(rows.size());
-
-        if (rows.isEmpty())
-            return;
-
-        Data data = Data.create(null, 512);
-
-        boolean first = true;
-
-        for (Value[] row : rows) {
-            if (first) {
-                out.writeInt(row.length);
-
-                first = false;
-            }
-
-            for (Value val : row) {
-                data.checkCapacity(data.getValueLen(val));
-
-                data.writeValue(val);
-            }
-        }
-
-        out.writeInt(data.length());
-        out.write(data.getBytes(), 0, data.length());
-    }
-
-    /** {@inheritDoc} */
-    @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
-        qryReqId = in.readLong();
-        qry = in.readInt();
-        page = in.readInt();
-        allRows = in.readInt();
-
-        int rowCnt = in.readInt();
-
-        if (rowCnt == 0)
-            rows = Collections.emptyList();
-        else {
-            rows = new ArrayList<>(rowCnt);
-
-            int cols = in.readInt();
-            int dataSize = in.readInt();
-
-            byte[] dataBytes = new byte[dataSize];
-
-            in.readFully(dataBytes);
-
-            Data data = Data.create(null, dataBytes);
-
-            for (int r = 0; r < rowCnt; r++) {
-                Value[] row = new Value[cols];
-
-                for (int c = 0; c < cols; c++)
-                    row[c] = data.readValue();
-
-                rows.add(row);
-            }
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridQueryNextPageResponse.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c459f306/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
deleted file mode 100644
index 2834b84..0000000
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.h2.twostep.messages;
-
-import org.apache.ignite.internal.processors.cache.query.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.io.*;
-import java.util.*;
-
-/**
- * Query request.
- */
-public class GridQueryRequest implements Serializable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    private long reqId;
-
-    /** */
-    private int pageSize;
-
-    /** */
-    private String space;
-
-    /** */
-    @GridToStringInclude
-    private Collection<GridCacheSqlQuery> qrys;
-
-    /**
-     * @param reqId Request ID.
-     * @param pageSize Page size.
-     * @param space Space.
-     * @param qrys Queries.
-     */
-    public GridQueryRequest(long reqId, int pageSize, String space, 
Collection<GridCacheSqlQuery> qrys) {
-        this.reqId = reqId;
-        this.pageSize = pageSize;
-        this.space = space;
-
-        assert qrys instanceof Serializable;
-
-        this.qrys = qrys;
-    }
-
-    /**
-     * @return Request ID.
-     */
-    public long requestId() {
-        return reqId;
-    }
-
-    /**
-     * @return Page size.
-     */
-    public int pageSize() {
-        return pageSize;
-    }
-
-    /**
-     * @return Space.
-     */
-    public String space() {
-        return space;
-    }
-
-    /**
-     * @return Queries.
-     */
-    public Collection<GridCacheSqlQuery> queries() {
-        return qrys;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridQueryRequest.class, this);
-    }
-}

Reply via email to