ignite-sql-tests - direct marshallable

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/cef21304
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/cef21304
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/cef21304

Branch: refs/heads/ignite-sql-tests
Commit: cef213040fb2201e25fefbeedb5bf2ea8e18df78
Parents: c459f30
Author: S.Vladykin <svlady...@gridgain.com>
Authored: Thu Mar 12 04:44:05 2015 +0300
Committer: S.Vladykin <svlady...@gridgain.com>
Committed: Thu Mar 12 04:44:05 2015 +0300

----------------------------------------------------------------------
 .../ignite/codegen/MessageCodeGenerator.java    |   8 ++
 .../communication/GridIoMessageFactory.java     |  26 ++++
 .../messages/GridQueryCancelRequest.java        |  66 +++++++++-
 .../twostep/messages/GridQueryFailResponse.java |  80 +++++++++++-
 .../messages/GridQueryNextPageRequest.java      |  94 +++++++++++++-
 .../messages/GridQueryNextPageResponse.java     | 114 ++++++++++++++++-
 .../h2/twostep/messages/GridQueryRequest.java   | 126 ++++++++++++++++++-
 .../processors/query/h2/IgniteH2Indexing.java   |  14 +++
 .../query/h2/twostep/GridMapQueryExecutor.java  |  81 +++++++-----
 .../h2/twostep/GridReduceQueryExecutor.java     |  92 +++++++++-----
 10 files changed, 626 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cef21304/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
----------------------------------------------------------------------
diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
index 7962a4b..76fed7e 100644
--- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
+++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.codegen;
 
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.query.h2.twostep.messages.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.plugin.extensions.communication.*;
@@ -164,6 +165,13 @@ public class MessageCodeGenerator {
 //
 //        gen.generateAndWrite(GridCacheOptimisticCheckPreparedTxRequest.class);
 //        gen.generateAndWrite(GridCacheOptimisticCheckPreparedTxResponse.class);
+
+//        gen.generateAndWrite(GridQueryCancelRequest.class);
+//        gen.generateAndWrite(GridQueryFailResponse.class);
+//        gen.generateAndWrite(GridQueryNextPageRequest.class);
+//        gen.generateAndWrite(GridQueryNextPageResponse.class);
+//        gen.generateAndWrite(GridQueryRequest.class);
+//        gen.generateAndWrite(GridCacheSqlQuery.class);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cef21304/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 6109d74..4eb0c1d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.processors.clock.*;
 import org.apache.ignite.internal.processors.continuous.*;
 import org.apache.ignite.internal.processors.dataload.*;
 import org.apache.ignite.internal.processors.igfs.*;
+import org.apache.ignite.internal.processors.query.h2.twostep.messages.*;
 import org.apache.ignite.internal.processors.rest.handlers.task.*;
 import org.apache.ignite.internal.processors.streamer.*;
 import org.apache.ignite.internal.util.*;
@@ -492,6 +493,31 @@ public class GridIoMessageFactory implements MessageFactory {
 
                 break;
 
+            case 90:
+                msg = new GridQueryCancelRequest();
+
+                break;
+
+            case 91:
+                msg = new GridQueryFailResponse();
+
+                break;
+
+            case 92:
+                msg = new GridQueryNextPageRequest();
+
+                break;
+
+            case 93:
+                msg = new GridQueryNextPageResponse();
+
+                break;
+
+            case 94:
+                msg = new GridQueryRequest();
+
+                break;
+
             default:
                 if (ext != null) {
                     for (MessageFactory factory : ext) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cef21304/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryCancelRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryCancelRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryCancelRequest.java
index 9bcb410..85371b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryCancelRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryCancelRequest.java
@@ -18,17 +18,25 @@
 package org.apache.ignite.internal.processors.query.h2.twostep.messages;
 
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.plugin.extensions.communication.*;
 
-import java.io.*;
+import java.nio.*;
 
 /**
  * Cancel request.
  */
-public class GridQueryCancelRequest implements Serializable {
+public class GridQueryCancelRequest implements Message {
     /** */
     private long qryReqId;
 
     /**
+     * Default constructor.
+     */
+    public GridQueryCancelRequest() {
+        // No-op.
+    }
+
+    /**
      * @param qryReqId Query request ID.
      */
     public GridQueryCancelRequest(long qryReqId) {
@@ -46,4 +54,58 @@ public class GridQueryCancelRequest implements Serializable {
     @Override public String toString() {
         return S.toString(GridQueryCancelRequest.class, this);
     }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeLong("qryReqId", qryReqId))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                qryReqId = reader.readLong("qryReqId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 90;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 1;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cef21304/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java
index f551ab8..7a2f22e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java
@@ -18,13 +18,14 @@
 package org.apache.ignite.internal.processors.query.h2.twostep.messages;
 
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.plugin.extensions.communication.*;
 
-import java.io.*;
+import java.nio.*;
 
 /**
  * Error message.
  */
-public class GridQueryFailResponse implements Serializable {
+public class GridQueryFailResponse implements Message {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -35,6 +36,13 @@ public class GridQueryFailResponse implements Serializable {
     private String errMsg;
 
     /**
+     * Default constructor.
+     */
+    public GridQueryFailResponse() {
+        // No-op.
+    }
+
+    /**
      * @param qryReqId Query request ID.
      * @param err Error.
      */
@@ -61,4 +69,72 @@ public class GridQueryFailResponse implements Serializable {
     @Override public String toString() {
         return S.toString(GridQueryFailResponse.class, this);
     }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeString("errMsg", errMsg))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeLong("qryReqId", qryReqId))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                errMsg = reader.readString("errMsg");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                qryReqId = reader.readLong("qryReqId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 91;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 2;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cef21304/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java
index ae7e1e3..2e49683 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java
@@ -19,13 +19,14 @@ package org.apache.ignite.internal.processors.query.h2.twostep.messages;
 
 
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.plugin.extensions.communication.*;
 
-import java.io.*;
+import java.nio.*;
 
 /**
  * Request to fetch next page.
  */
-public class GridQueryNextPageRequest implements Serializable {
+public class GridQueryNextPageRequest implements Message {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -39,6 +40,13 @@ public class GridQueryNextPageRequest implements Serializable {
     private int pageSize;
 
     /**
+     * Default constructor.
+     */
+    public GridQueryNextPageRequest() {
+        // No-op.
+    }
+
+    /**
      * @param qryReqId Query request ID.
      * @param qry Query.
      * @param pageSize Page size.
@@ -74,4 +82,86 @@ public class GridQueryNextPageRequest implements Serializable {
     @Override public String toString() {
         return S.toString(GridQueryNextPageRequest.class, this);
     }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeInt("pageSize", pageSize))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeInt("qry", qry))
+                    return false;
+
+                writer.incrementState();
+
+            case 2:
+                if (!writer.writeLong("qryReqId", qryReqId))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                pageSize = reader.readInt("pageSize");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                qry = reader.readInt("qry");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 2:
+                qryReqId = reader.readLong("qryReqId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 92;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 3;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cef21304/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java
index 3c4cc94..b2cdea6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java
@@ -18,13 +18,15 @@
 package org.apache.ignite.internal.processors.query.h2.twostep.messages;
 
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.plugin.extensions.communication.*;
 
 import java.io.*;
+import java.nio.*;
 
 /**
  * Next page response.
  */
-public class GridQueryNextPageResponse implements Externalizable {
+public class GridQueryNextPageResponse implements Externalizable, Message {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -125,4 +127,114 @@ public class GridQueryNextPageResponse implements Externalizable {
     @Override public String toString() {
         return S.toString(GridQueryNextPageResponse.class, this);
     }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeInt("allRows", allRows))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeInt("page", page))
+                    return false;
+
+                writer.incrementState();
+
+            case 2:
+                if (!writer.writeInt("qry", qry))
+                    return false;
+
+                writer.incrementState();
+
+            case 3:
+                if (!writer.writeLong("qryReqId", qryReqId))
+                    return false;
+
+                writer.incrementState();
+
+            case 4:
+                if (!writer.writeByteArray("rows", rows))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                allRows = reader.readInt("allRows");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                page = reader.readInt("page");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 2:
+                qry = reader.readInt("qry");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 3:
+                qryReqId = reader.readLong("qryReqId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 4:
+                rows = reader.readByteArray("rows");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 93;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 5;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cef21304/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
index 2834b84..bc929dd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
@@ -17,17 +17,21 @@
 
 package org.apache.ignite.internal.processors.query.h2.twostep.messages;
 
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.cache.query.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.marshaller.*;
+import org.apache.ignite.plugin.extensions.communication.*;
 
-import java.io.*;
+import java.nio.*;
 import java.util.*;
 
 /**
  * Query request.
  */
-public class GridQueryRequest implements Serializable {
+public class GridQueryRequest implements Message {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -42,22 +46,35 @@ public class GridQueryRequest implements Serializable {
 
     /** */
     @GridToStringInclude
+    @GridDirectTransient
     private Collection<GridCacheSqlQuery> qrys;
 
+    /** */
+    private byte[] qrysBytes;
+
+    /**
+     * Default constructor.
+     */
+    public GridQueryRequest() {
+        // No-op.
+    }
+
     /**
      * @param reqId Request ID.
      * @param pageSize Page size.
      * @param space Space.
      * @param qrys Queries.
+     * @param qrysBytes Marshalled queries.
      */
-    public GridQueryRequest(long reqId, int pageSize, String space, Collection<GridCacheSqlQuery> qrys) {
+    public GridQueryRequest(long reqId, int pageSize, String space, Collection<GridCacheSqlQuery> qrys, byte[] qrysBytes) {
         this.reqId = reqId;
         this.pageSize = pageSize;
         this.space = space;
 
-        assert qrys instanceof Serializable;
+        assert qrysBytes != null;
 
         this.qrys = qrys;
+        this.qrysBytes = qrysBytes;
     }
 
     /**
@@ -84,7 +101,10 @@ public class GridQueryRequest implements Serializable {
     /**
      * @return Queries.
      */
-    public Collection<GridCacheSqlQuery> queries() {
+    public Collection<GridCacheSqlQuery> queries(Marshaller m) throws IgniteCheckedException {
+        if (qrys == null && qrysBytes != null)
+            qrys = m.unmarshal(qrysBytes, null);
+
         return qrys;
     }
 
@@ -92,4 +112,100 @@ public class GridQueryRequest implements Serializable {
     @Override public String toString() {
         return S.toString(GridQueryRequest.class, this);
     }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeInt("pageSize", pageSize))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeByteArray("qrysBytes", qrysBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 2:
+                if (!writer.writeLong("reqId", reqId))
+                    return false;
+
+                writer.incrementState();
+
+            case 3:
+                if (!writer.writeString("space", space))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                pageSize = reader.readInt("pageSize");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                qrysBytes = reader.readByteArray("qrysBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 2:
+                reqId = reader.readLong("reqId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 3:
+                space = reader.readString("space");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 94;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 4;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cef21304/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 5fd837e..b210f4d 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -1017,6 +1017,20 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         return ((Number)iter.next().get(0)).longValue();
     }
 
+    /**
+     * @return Map query executor.
+     */
+    public GridMapQueryExecutor mapQueryExecutor() {
+        return mapQryExec;
+    }
+
+    /**
+     * @return Reduce query executor.
+     */
+    public GridReduceQueryExecutor reduceQueryExecutor() {
+        return rdcQryExec;
+    }
+
     /** {@inheritDoc} */
     @SuppressWarnings("NonThreadSafeLazyInitialization")
     @Override public void start(GridKernalContext ctx) throws IgniteCheckedException {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cef21304/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 5b2a0ab..adade06 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -21,12 +21,12 @@ import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.managers.communication.*;
 import org.apache.ignite.internal.processors.cache.query.*;
 import org.apache.ignite.internal.processors.query.h2.*;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
 import org.h2.jdbc.*;
 import org.h2.result.*;
 import org.h2.store.*;
@@ -45,7 +45,7 @@ import static org.apache.ignite.events.EventType.*;
 /**
  * Map query executor.
  */
-public class GridMapQueryExecutor {
+public class GridMapQueryExecutor implements GridMessageListener {
     /** */
     private static final Field RESULT_FIELD;
 
@@ -88,34 +88,33 @@ public class GridMapQueryExecutor {
 
         // TODO handle node failures.
 
-        ctx.io().addUserMessageListener(GridTopic.TOPIC_QUERY, new IgniteBiPredicate<UUID, Object>() {
-            @Override public boolean apply(UUID nodeId, Object msg) {
-                try {
-                    assert msg != null;
+        ctx.io().addMessageListener(GridTopic.TOPIC_QUERY, this);
+    }
 
-                    ClusterNode node = ctx.discovery().node(nodeId);
+    /** {@inheritDoc} */
+    @Override public void onMessage(UUID nodeId, Object msg) {
+        try {
+            assert msg != null;
 
-                    boolean processed = true;
+            ClusterNode node = ctx.discovery().node(nodeId);
 
-                    if (msg instanceof GridQueryRequest)
-                        onQueryRequest(node, (GridQueryRequest)msg);
-                    else if (msg instanceof GridQueryNextPageRequest)
-                        onNextPageRequest(node, (GridQueryNextPageRequest)msg);
-                    else if (msg instanceof GridQueryCancelRequest)
-                        onCancel(node, (GridQueryCancelRequest)msg);
-                    else
-                        processed = false;
+            boolean processed = true;
 
-                    if (processed && log.isDebugEnabled())
-                        log.debug("Processed request: " + nodeId + "->" + ctx.localNodeId() + " " + msg);
-                }
-                catch(Throwable th) {
-                    U.error(log, "Failed to process message: " + msg, th);
-                }
+            if (msg instanceof GridQueryRequest)
+                onQueryRequest(node, (GridQueryRequest)msg);
+            else if (msg instanceof GridQueryNextPageRequest)
+                onNextPageRequest(node, (GridQueryNextPageRequest)msg);
+            else if (msg instanceof GridQueryCancelRequest)
+                onCancel(node, (GridQueryCancelRequest)msg);
+            else
+                processed = false;
 
-                return true;
-            }
-        });
+            if (processed && log.isDebugEnabled())
+                log.debug("Processed request: " + nodeId + "->" + ctx.localNodeId() + " " + msg);
+        }
+        catch(Throwable th) {
+            U.error(log, "Failed to process message: " + msg, th);
+        }
     }
 
     /**
@@ -161,7 +160,16 @@ public class GridMapQueryExecutor {
     private void onQueryRequest(ClusterNode node, GridQueryRequest req) {
         ConcurrentMap<Long,QueryResults> nodeRess = resultsForNode(node.id());
 
-        QueryResults qr = new QueryResults(req.requestId(), req.queries().size());
+        Collection<GridCacheSqlQuery> qrys;
+
+        try {
+            qrys = req.queries(ctx.config().getMarshaller());
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+
+        QueryResults qr = new QueryResults(req.requestId(), qrys.size());
 
         if (nodeRess.putIfAbsent(req.requestId(), qr) != null)
             throw new IllegalStateException();
@@ -176,7 +184,7 @@ public class GridMapQueryExecutor {
 
             String space = req.space();
 
-            for (GridCacheSqlQuery qry : req.queries()) {
+            for (GridCacheSqlQuery qry : qrys) {
                 ResultSet rs = h2.executeSqlQueryWithTimer(space, h2.connectionForSpace(space), qry.query(),
                     F.asList(qry.parameters()));
 
@@ -233,8 +241,12 @@ public class GridMapQueryExecutor {
      */
     private void sendError(ClusterNode node, long qryReqId, Throwable err) {
         try {
-            ctx.io().sendUserMessage(F.asList(node), new GridQueryFailResponse(qryReqId, err),
-                GridTopic.TOPIC_QUERY, false, 0);
+            GridQueryFailResponse msg = new GridQueryFailResponse(qryReqId, err);
+
+            if (node.isLocal())
+                h2.reduceQueryExecutor().onMessage(ctx.localNodeId(), msg);
+            else
+                ctx.io().send(node, GridTopic.TOPIC_QUERY, msg, GridIoPolicy.PUBLIC_POOL);
         }
         catch (Exception e) {
             e.addSuppressed(err);
@@ -284,10 +296,13 @@ public class GridMapQueryExecutor {
         }
 
         try {
-            ctx.io().sendUserMessage(F.asList(node),
-                new GridQueryNextPageResponse(qr.qryReqId, qry, page, page == 0 ? res.rowCount : -1,
-                    marshallRows(rows)),
-                GridTopic.TOPIC_QUERY, false, 0);
+            GridQueryNextPageResponse msg = new GridQueryNextPageResponse(qr.qryReqId, qry, page,
+                page == 0 ? res.rowCount : -1, marshallRows(rows));
+
+            if (node.isLocal())
+                h2.reduceQueryExecutor().onMessage(ctx.localNodeId(), msg);
+            else
+                ctx.io().send(node, GridTopic.TOPIC_QUERY, msg, GridIoPolicy.PUBLIC_POOL);
         }
         catch (IgniteCheckedException e) {
             log.error("Failed to send message.", e);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cef21304/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 4dc9e59..642a5ba 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -21,6 +21,7 @@ import org.apache.ignite.*;
 import org.apache.ignite.cache.query.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.managers.communication.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.query.*;
 import org.apache.ignite.internal.processors.query.h2.*;
@@ -28,7 +29,7 @@ import org.apache.ignite.internal.processors.query.h2.sql.*;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
+import org.apache.ignite.plugin.extensions.communication.*;
 import org.h2.command.ddl.*;
 import org.h2.command.dml.Query;
 import org.h2.engine.*;
@@ -51,7 +52,7 @@ import java.util.concurrent.atomic.*;
 /**
  * Reduce query executor.
  */
-public class GridReduceQueryExecutor {
+public class GridReduceQueryExecutor implements GridMessageListener {
     /** */
     private GridKernalContext ctx;
 
@@ -108,35 +109,34 @@ public class GridReduceQueryExecutor {
 
         // TODO handle node failure.
 
-        ctx.io().addUserMessageListener(GridTopic.TOPIC_QUERY, new IgniteBiPredicate<UUID, Object>() {
-            @Override public boolean apply(UUID nodeId, Object msg) {
-                try {
-                    assert msg != null;
+        ctx.io().addMessageListener(GridTopic.TOPIC_QUERY, this);
 
-                    ClusterNode node = ctx.discovery().node(nodeId);
+        h2.executeStatement("PUBLIC", "CREATE ALIAS " + GridSqlQuerySplitter.TABLE_FUNC_NAME +
+            " FOR \"" + GridReduceQueryExecutor.class.getName() + ".mergeTableFunction\"");
+    }
 
-                    boolean processed = true;
+    /** {@inheritDoc} */
+    @Override public void onMessage(UUID nodeId, Object msg) {
+        try {
+            assert msg != null;
 
-                    if (msg instanceof GridQueryNextPageResponse)
-                        onNextPage(node, (GridQueryNextPageResponse)msg);
-                    else if (msg instanceof GridQueryFailResponse)
-                        onFail(node, (GridQueryFailResponse)msg);
-                    else
-                        processed = false;
+            ClusterNode node = ctx.discovery().node(nodeId);
 
-                    if (processed && log.isDebugEnabled())
-                        log.debug("Processed response: " + nodeId + "->" + ctx.localNodeId() + " " + msg);
-                }
-                catch(Throwable th) {
-                    U.error(log, "Failed to process message: " + msg, th);
-                }
+            boolean processed = true;
 
-                return true;
-            }
-        });
+            if (msg instanceof GridQueryNextPageResponse)
+                onNextPage(node, (GridQueryNextPageResponse)msg);
+            else if (msg instanceof GridQueryFailResponse)
+                onFail(node, (GridQueryFailResponse)msg);
+            else
+                processed = false;
 
-        h2.executeStatement("PUBLIC", "CREATE ALIAS " + GridSqlQuerySplitter.TABLE_FUNC_NAME +
-            " FOR \"" + GridReduceQueryExecutor.class.getName() + ".mergeTableFunction\"");
+            if (processed && log.isDebugEnabled())
+                log.debug("Processed response: " + nodeId + "->" + ctx.localNodeId() + " " + msg);
+        }
+        catch(Throwable th) {
+            U.error(log, "Failed to process message: " + msg, th);
+        }
     }
 
     /**
@@ -174,8 +174,12 @@ public class GridReduceQueryExecutor {
         idx.addPage(new GridResultPage(node.id(), msg, false) {
             @Override public void fetchNextPage() {
                 try {
-                    ctx.io().sendUserMessage(F.asList(node), new GridQueryNextPageRequest(qryReqId, qry, pageSize),
-                        GridTopic.TOPIC_QUERY, false, 0);
+                    GridQueryNextPageRequest msg0 = new GridQueryNextPageRequest(qryReqId, qry, pageSize);
+
+                    if (node.isLocal())
+                        h2.mapQueryExecutor().onMessage(ctx.localNodeId(), msg0);
+                    else
+                        ctx.io().send(node, GridTopic.TOPIC_QUERY, msg0, GridIoPolicy.PUBLIC_POOL);
                 }
                 catch (IgniteCheckedException e) {
                     throw new IgniteException(e);
@@ -231,8 +235,8 @@ public class GridReduceQueryExecutor {
         runs.put(qryReqId, r);
 
         try {
-            ctx.io().sendUserMessage(nodes, new GridQueryRequest(qryReqId, r.pageSize, space, qry.mapQueries()),
-                GridTopic.TOPIC_QUERY, false, 0);
+            send(nodes, new GridQueryRequest(qryReqId, r.pageSize, space, qry.mapQueries(),
+                ctx.config().getMarshaller().marshal(qry.mapQueries())));
 
             r.latch.await();
 
@@ -245,7 +249,7 @@ public class GridReduceQueryExecutor {
 
             for (GridMergeTable tbl : r.tbls) {
                 if (!tbl.getScanIndex(null).fetchedAll()) // We have to explicitly cancel queries on remote nodes.
-                    ctx.io().sendUserMessage(nodes, new GridQueryCancelRequest(qryReqId), GridTopic.TOPIC_QUERY, false, 0);
+                    send(nodes, new GridQueryCancelRequest(qryReqId));
 
 //                dropTable(r.conn, tbl.getName()); TODO
             }
@@ -269,6 +273,34 @@ public class GridReduceQueryExecutor {
     }
 
     /**
+     * @param nodes Nodes.
+     * @param msg Message.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void send(Collection<ClusterNode> nodes, Message msg) throws IgniteCheckedException {
+        for (ClusterNode node : nodes) {
+            if (node.isLocal()) {
+                ArrayList<ClusterNode> remotes = new ArrayList<>(nodes.size() - 1);
+
+                for (ClusterNode node0 : nodes) {
+                    if (node0 != node)
+                        remotes.add(node0);
+                }
+
+                ctx.io().send(remotes, GridTopic.TOPIC_QUERY, msg, GridIoPolicy.PUBLIC_POOL);
+
+                // Local node goes the last to allow parallel execution.
+                h2.mapQueryExecutor().onMessage(ctx.localNodeId(), msg);
+
+                return;
+            }
+        }
+
+        // All the given nodes are remotes.
+        ctx.io().send(nodes, GridTopic.TOPIC_QUERY, msg, GridIoPolicy.PUBLIC_POOL);
+    }
+
+    /**
      * @param conn Connection.
      * @param tblName Table name.
      * @throws SQLException If failed.

Reply via email to