ignite-sql-tests - make two-step query messages directly marshallable
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/cef21304 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/cef21304 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/cef21304 Branch: refs/heads/ignite-sql-tests Commit: cef213040fb2201e25fefbeedb5bf2ea8e18df78 Parents: c459f30 Author: S.Vladykin <svlady...@gridgain.com> Authored: Thu Mar 12 04:44:05 2015 +0300 Committer: S.Vladykin <svlady...@gridgain.com> Committed: Thu Mar 12 04:44:05 2015 +0300 ---------------------------------------------------------------------- .../ignite/codegen/MessageCodeGenerator.java | 8 ++ .../communication/GridIoMessageFactory.java | 26 ++++ .../messages/GridQueryCancelRequest.java | 66 +++++++++- .../twostep/messages/GridQueryFailResponse.java | 80 +++++++++++- .../messages/GridQueryNextPageRequest.java | 94 +++++++++++++- .../messages/GridQueryNextPageResponse.java | 114 ++++++++++++++++- .../h2/twostep/messages/GridQueryRequest.java | 126 ++++++++++++++++++- .../processors/query/h2/IgniteH2Indexing.java | 14 +++ .../query/h2/twostep/GridMapQueryExecutor.java | 81 +++++++----- .../h2/twostep/GridReduceQueryExecutor.java | 92 +++++++++----- 10 files changed, 626 insertions(+), 75 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cef21304/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java ---------------------------------------------------------------------- diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java index 7962a4b..76fed7e 100644 --- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java +++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java @@ 
-18,6 +18,7 @@ package org.apache.ignite.codegen; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.query.h2.twostep.messages.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.apache.ignite.plugin.extensions.communication.*; @@ -164,6 +165,13 @@ public class MessageCodeGenerator { // // gen.generateAndWrite(GridCacheOptimisticCheckPreparedTxRequest.class); // gen.generateAndWrite(GridCacheOptimisticCheckPreparedTxResponse.class); + +// gen.generateAndWrite(GridQueryCancelRequest.class); +// gen.generateAndWrite(GridQueryFailResponse.class); +// gen.generateAndWrite(GridQueryNextPageRequest.class); +// gen.generateAndWrite(GridQueryNextPageResponse.class); +// gen.generateAndWrite(GridQueryRequest.class); +// gen.generateAndWrite(GridCacheSqlQuery.class); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cef21304/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 6109d74..4eb0c1d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -34,6 +34,7 @@ import org.apache.ignite.internal.processors.clock.*; import org.apache.ignite.internal.processors.continuous.*; import org.apache.ignite.internal.processors.dataload.*; import org.apache.ignite.internal.processors.igfs.*; +import org.apache.ignite.internal.processors.query.h2.twostep.messages.*; import org.apache.ignite.internal.processors.rest.handlers.task.*; import 
org.apache.ignite.internal.processors.streamer.*; import org.apache.ignite.internal.util.*; @@ -492,6 +493,31 @@ public class GridIoMessageFactory implements MessageFactory { break; + case 90: + msg = new GridQueryCancelRequest(); + + break; + + case 91: + msg = new GridQueryFailResponse(); + + break; + + case 92: + msg = new GridQueryNextPageRequest(); + + break; + + case 93: + msg = new GridQueryNextPageResponse(); + + break; + + case 94: + msg = new GridQueryRequest(); + + break; + default: if (ext != null) { for (MessageFactory factory : ext) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cef21304/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryCancelRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryCancelRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryCancelRequest.java index 9bcb410..85371b8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryCancelRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryCancelRequest.java @@ -18,17 +18,25 @@ package org.apache.ignite.internal.processors.query.h2.twostep.messages; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.plugin.extensions.communication.*; -import java.io.*; +import java.nio.*; /** * Cancel request. */ -public class GridQueryCancelRequest implements Serializable { +public class GridQueryCancelRequest implements Message { /** */ private long qryReqId; /** + * Default constructor. + */ + public GridQueryCancelRequest() { + // No-op. + } + + /** * @param qryReqId Query request ID. 
*/ public GridQueryCancelRequest(long qryReqId) { @@ -46,4 +54,58 @@ public class GridQueryCancelRequest implements Serializable { @Override public String toString() { return S.toString(GridQueryCancelRequest.class, this); } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeLong("qryReqId", qryReqId)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + qryReqId = reader.readLong("qryReqId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 90; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 1; + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cef21304/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java index f551ab8..7a2f22e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java @@ -18,13 +18,14 @@ package 
org.apache.ignite.internal.processors.query.h2.twostep.messages; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.plugin.extensions.communication.*; -import java.io.*; +import java.nio.*; /** * Error message. */ -public class GridQueryFailResponse implements Serializable { +public class GridQueryFailResponse implements Message { /** */ private static final long serialVersionUID = 0L; @@ -35,6 +36,13 @@ public class GridQueryFailResponse implements Serializable { private String errMsg; /** + * Default constructor. + */ + public GridQueryFailResponse() { + // No-op. + } + + /** * @param qryReqId Query request ID. * @param err Error. */ @@ -61,4 +69,72 @@ public class GridQueryFailResponse implements Serializable { @Override public String toString() { return S.toString(GridQueryFailResponse.class, this); } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeString("errMsg", errMsg)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeLong("qryReqId", qryReqId)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + errMsg = reader.readString("errMsg"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + qryReqId = reader.readLong("qryReqId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 91; + } + + /** {@inheritDoc} */ + @Override public byte 
fieldsCount() { + return 2; + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cef21304/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java index ae7e1e3..2e49683 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java @@ -19,13 +19,14 @@ package org.apache.ignite.internal.processors.query.h2.twostep.messages; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.plugin.extensions.communication.*; -import java.io.*; +import java.nio.*; /** * Request to fetch next page. */ -public class GridQueryNextPageRequest implements Serializable { +public class GridQueryNextPageRequest implements Message { /** */ private static final long serialVersionUID = 0L; @@ -39,6 +40,13 @@ public class GridQueryNextPageRequest implements Serializable { private int pageSize; /** + * Default constructor. + */ + public GridQueryNextPageRequest() { + // No-op. + } + + /** * @param qryReqId Query request ID. * @param qry Query. * @param pageSize Page size. 
@@ -74,4 +82,86 @@ public class GridQueryNextPageRequest implements Serializable { @Override public String toString() { return S.toString(GridQueryNextPageRequest.class, this); } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeInt("pageSize", pageSize)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeInt("qry", qry)) + return false; + + writer.incrementState(); + + case 2: + if (!writer.writeLong("qryReqId", qryReqId)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + pageSize = reader.readInt("pageSize"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + qry = reader.readInt("qry"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 2: + qryReqId = reader.readLong("qryReqId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 92; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 3; + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cef21304/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java index 3c4cc94..b2cdea6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java @@ -18,13 +18,15 @@ package org.apache.ignite.internal.processors.query.h2.twostep.messages; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.plugin.extensions.communication.*; import java.io.*; +import java.nio.*; /** * Next page response. */ -public class GridQueryNextPageResponse implements Externalizable { +public class GridQueryNextPageResponse implements Externalizable, Message { /** */ private static final long serialVersionUID = 0L; @@ -125,4 +127,114 @@ public class GridQueryNextPageResponse implements Externalizable { @Override public String toString() { return S.toString(GridQueryNextPageResponse.class, this); } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeInt("allRows", allRows)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeInt("page", page)) + return false; + + writer.incrementState(); + + case 2: + if (!writer.writeInt("qry", qry)) + return false; + + writer.incrementState(); + + case 3: + if (!writer.writeLong("qryReqId", qryReqId)) + return false; + + writer.incrementState(); + + case 4: + if (!writer.writeByteArray("rows", rows)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + 
reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + allRows = reader.readInt("allRows"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + page = reader.readInt("page"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 2: + qry = reader.readInt("qry"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 3: + qryReqId = reader.readLong("qryReqId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 4: + rows = reader.readByteArray("rows"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 93; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 5; + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cef21304/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java index 2834b84..bc929dd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java @@ -17,17 +17,21 @@ package org.apache.ignite.internal.processors.query.h2.twostep.messages; +import org.apache.ignite.*; +import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.query.*; import org.apache.ignite.internal.util.tostring.*; import 
org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.marshaller.*; +import org.apache.ignite.plugin.extensions.communication.*; -import java.io.*; +import java.nio.*; import java.util.*; /** * Query request. */ -public class GridQueryRequest implements Serializable { +public class GridQueryRequest implements Message { /** */ private static final long serialVersionUID = 0L; @@ -42,22 +46,35 @@ public class GridQueryRequest implements Serializable { /** */ @GridToStringInclude + @GridDirectTransient private Collection<GridCacheSqlQuery> qrys; + /** */ + private byte[] qrysBytes; + + /** + * Default constructor. + */ + public GridQueryRequest() { + // No-op. + } + /** * @param reqId Request ID. * @param pageSize Page size. * @param space Space. * @param qrys Queries. + * @param qrysBytes Marshalled queries. */ - public GridQueryRequest(long reqId, int pageSize, String space, Collection<GridCacheSqlQuery> qrys) { + public GridQueryRequest(long reqId, int pageSize, String space, Collection<GridCacheSqlQuery> qrys, byte[] qrysBytes) { this.reqId = reqId; this.pageSize = pageSize; this.space = space; - assert qrys instanceof Serializable; + assert qrysBytes != null; this.qrys = qrys; + this.qrysBytes = qrysBytes; } /** @@ -84,7 +101,10 @@ public class GridQueryRequest implements Serializable { /** * @return Queries. 
*/ - public Collection<GridCacheSqlQuery> queries() { + public Collection<GridCacheSqlQuery> queries(Marshaller m) throws IgniteCheckedException { + if (qrys == null && qrysBytes != null) + qrys = m.unmarshal(qrysBytes, null); + return qrys; } @@ -92,4 +112,100 @@ public class GridQueryRequest implements Serializable { @Override public String toString() { return S.toString(GridQueryRequest.class, this); } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeInt("pageSize", pageSize)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeByteArray("qrysBytes", qrysBytes)) + return false; + + writer.incrementState(); + + case 2: + if (!writer.writeLong("reqId", reqId)) + return false; + + writer.incrementState(); + + case 3: + if (!writer.writeString("space", space)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + pageSize = reader.readInt("pageSize"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + qrysBytes = reader.readByteArray("qrysBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 2: + reqId = reader.readLong("reqId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 3: + space = reader.readString("space"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 94; + } + + /** {@inheritDoc} */ + 
@Override public byte fieldsCount() { + return 4; + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cef21304/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index 5fd837e..b210f4d 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -1017,6 +1017,20 @@ public class IgniteH2Indexing implements GridQueryIndexing { return ((Number)iter.next().get(0)).longValue(); } + /** + * @return Map query executor. + */ + public GridMapQueryExecutor mapQueryExecutor() { + return mapQryExec; + } + + /** + * @return Reduce query executor. 
+ */ + public GridReduceQueryExecutor reduceQueryExecutor() { + return rdcQryExec; + } + /** {@inheritDoc} */ @SuppressWarnings("NonThreadSafeLazyInitialization") @Override public void start(GridKernalContext ctx) throws IgniteCheckedException { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cef21304/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index 5b2a0ab..adade06 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -21,12 +21,12 @@ import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.events.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.processors.cache.query.*; import org.apache.ignite.internal.processors.query.h2.*; import org.apache.ignite.internal.processors.query.h2.twostep.messages.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; import org.h2.jdbc.*; import org.h2.result.*; import org.h2.store.*; @@ -45,7 +45,7 @@ import static org.apache.ignite.events.EventType.*; /** * Map query executor. */ -public class GridMapQueryExecutor { +public class GridMapQueryExecutor implements GridMessageListener { /** */ private static final Field RESULT_FIELD; @@ -88,34 +88,33 @@ public class GridMapQueryExecutor { // TODO handle node failures. 
- ctx.io().addUserMessageListener(GridTopic.TOPIC_QUERY, new IgniteBiPredicate<UUID, Object>() { - @Override public boolean apply(UUID nodeId, Object msg) { - try { - assert msg != null; + ctx.io().addMessageListener(GridTopic.TOPIC_QUERY, this); + } - ClusterNode node = ctx.discovery().node(nodeId); + /** {@inheritDoc} */ + @Override public void onMessage(UUID nodeId, Object msg) { + try { + assert msg != null; - boolean processed = true; + ClusterNode node = ctx.discovery().node(nodeId); - if (msg instanceof GridQueryRequest) - onQueryRequest(node, (GridQueryRequest)msg); - else if (msg instanceof GridQueryNextPageRequest) - onNextPageRequest(node, (GridQueryNextPageRequest)msg); - else if (msg instanceof GridQueryCancelRequest) - onCancel(node, (GridQueryCancelRequest)msg); - else - processed = false; + boolean processed = true; - if (processed && log.isDebugEnabled()) - log.debug("Processed request: " + nodeId + "->" + ctx.localNodeId() + " " + msg); - } - catch(Throwable th) { - U.error(log, "Failed to process message: " + msg, th); - } + if (msg instanceof GridQueryRequest) + onQueryRequest(node, (GridQueryRequest)msg); + else if (msg instanceof GridQueryNextPageRequest) + onNextPageRequest(node, (GridQueryNextPageRequest)msg); + else if (msg instanceof GridQueryCancelRequest) + onCancel(node, (GridQueryCancelRequest)msg); + else + processed = false; - return true; - } - }); + if (processed && log.isDebugEnabled()) + log.debug("Processed request: " + nodeId + "->" + ctx.localNodeId() + " " + msg); + } + catch(Throwable th) { + U.error(log, "Failed to process message: " + msg, th); + } } /** @@ -161,7 +160,16 @@ public class GridMapQueryExecutor { private void onQueryRequest(ClusterNode node, GridQueryRequest req) { ConcurrentMap<Long,QueryResults> nodeRess = resultsForNode(node.id()); - QueryResults qr = new QueryResults(req.requestId(), req.queries().size()); + Collection<GridCacheSqlQuery> qrys; + + try { + qrys = req.queries(ctx.config().getMarshaller()); 
+ } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + + QueryResults qr = new QueryResults(req.requestId(), qrys.size()); if (nodeRess.putIfAbsent(req.requestId(), qr) != null) throw new IllegalStateException(); @@ -176,7 +184,7 @@ public class GridMapQueryExecutor { String space = req.space(); - for (GridCacheSqlQuery qry : req.queries()) { + for (GridCacheSqlQuery qry : qrys) { ResultSet rs = h2.executeSqlQueryWithTimer(space, h2.connectionForSpace(space), qry.query(), F.asList(qry.parameters())); @@ -233,8 +241,12 @@ public class GridMapQueryExecutor { */ private void sendError(ClusterNode node, long qryReqId, Throwable err) { try { - ctx.io().sendUserMessage(F.asList(node), new GridQueryFailResponse(qryReqId, err), - GridTopic.TOPIC_QUERY, false, 0); + GridQueryFailResponse msg = new GridQueryFailResponse(qryReqId, err); + + if (node.isLocal()) + h2.reduceQueryExecutor().onMessage(ctx.localNodeId(), msg); + else + ctx.io().send(node, GridTopic.TOPIC_QUERY, msg, GridIoPolicy.PUBLIC_POOL); } catch (Exception e) { e.addSuppressed(err); @@ -284,10 +296,13 @@ public class GridMapQueryExecutor { } try { - ctx.io().sendUserMessage(F.asList(node), - new GridQueryNextPageResponse(qr.qryReqId, qry, page, page == 0 ? res.rowCount : -1, - marshallRows(rows)), - GridTopic.TOPIC_QUERY, false, 0); + GridQueryNextPageResponse msg = new GridQueryNextPageResponse(qr.qryReqId, qry, page, + page == 0 ? 
res.rowCount : -1, marshallRows(rows)); + + if (node.isLocal()) + h2.reduceQueryExecutor().onMessage(ctx.localNodeId(), msg); + else + ctx.io().send(node, GridTopic.TOPIC_QUERY, msg, GridIoPolicy.PUBLIC_POOL); } catch (IgniteCheckedException e) { log.error("Failed to send message.", e); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cef21304/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index 4dc9e59..642a5ba 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -21,6 +21,7 @@ import org.apache.ignite.*; import org.apache.ignite.cache.query.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.query.*; import org.apache.ignite.internal.processors.query.h2.*; @@ -28,7 +29,7 @@ import org.apache.ignite.internal.processors.query.h2.sql.*; import org.apache.ignite.internal.processors.query.h2.twostep.messages.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; +import org.apache.ignite.plugin.extensions.communication.*; import org.h2.command.ddl.*; import org.h2.command.dml.Query; import org.h2.engine.*; @@ -51,7 +52,7 @@ import java.util.concurrent.atomic.*; /** * Reduce query executor. 
*/ -public class GridReduceQueryExecutor { +public class GridReduceQueryExecutor implements GridMessageListener { /** */ private GridKernalContext ctx; @@ -108,35 +109,34 @@ public class GridReduceQueryExecutor { // TODO handle node failure. - ctx.io().addUserMessageListener(GridTopic.TOPIC_QUERY, new IgniteBiPredicate<UUID, Object>() { - @Override public boolean apply(UUID nodeId, Object msg) { - try { - assert msg != null; + ctx.io().addMessageListener(GridTopic.TOPIC_QUERY, this); - ClusterNode node = ctx.discovery().node(nodeId); + h2.executeStatement("PUBLIC", "CREATE ALIAS " + GridSqlQuerySplitter.TABLE_FUNC_NAME + + " FOR \"" + GridReduceQueryExecutor.class.getName() + ".mergeTableFunction\""); + } - boolean processed = true; + /** {@inheritDoc} */ + @Override public void onMessage(UUID nodeId, Object msg) { + try { + assert msg != null; - if (msg instanceof GridQueryNextPageResponse) - onNextPage(node, (GridQueryNextPageResponse)msg); - else if (msg instanceof GridQueryFailResponse) - onFail(node, (GridQueryFailResponse)msg); - else - processed = false; + ClusterNode node = ctx.discovery().node(nodeId); - if (processed && log.isDebugEnabled()) - log.debug("Processed response: " + nodeId + "->" + ctx.localNodeId() + " " + msg); - } - catch(Throwable th) { - U.error(log, "Failed to process message: " + msg, th); - } + boolean processed = true; - return true; - } - }); + if (msg instanceof GridQueryNextPageResponse) + onNextPage(node, (GridQueryNextPageResponse)msg); + else if (msg instanceof GridQueryFailResponse) + onFail(node, (GridQueryFailResponse)msg); + else + processed = false; - h2.executeStatement("PUBLIC", "CREATE ALIAS " + GridSqlQuerySplitter.TABLE_FUNC_NAME + - " FOR \"" + GridReduceQueryExecutor.class.getName() + ".mergeTableFunction\""); + if (processed && log.isDebugEnabled()) + log.debug("Processed response: " + nodeId + "->" + ctx.localNodeId() + " " + msg); + } + catch(Throwable th) { + U.error(log, "Failed to process message: " + msg, th); 
+ } } /** @@ -174,8 +174,12 @@ public class GridReduceQueryExecutor { idx.addPage(new GridResultPage(node.id(), msg, false) { @Override public void fetchNextPage() { try { - ctx.io().sendUserMessage(F.asList(node), new GridQueryNextPageRequest(qryReqId, qry, pageSize), - GridTopic.TOPIC_QUERY, false, 0); + GridQueryNextPageRequest msg0 = new GridQueryNextPageRequest(qryReqId, qry, pageSize); + + if (node.isLocal()) + h2.mapQueryExecutor().onMessage(ctx.localNodeId(), msg0); + else + ctx.io().send(node, GridTopic.TOPIC_QUERY, msg0, GridIoPolicy.PUBLIC_POOL); } catch (IgniteCheckedException e) { throw new IgniteException(e); @@ -231,8 +235,8 @@ public class GridReduceQueryExecutor { runs.put(qryReqId, r); try { - ctx.io().sendUserMessage(nodes, new GridQueryRequest(qryReqId, r.pageSize, space, qry.mapQueries()), - GridTopic.TOPIC_QUERY, false, 0); + send(nodes, new GridQueryRequest(qryReqId, r.pageSize, space, qry.mapQueries(), + ctx.config().getMarshaller().marshal(qry.mapQueries()))); r.latch.await(); @@ -245,7 +249,7 @@ public class GridReduceQueryExecutor { for (GridMergeTable tbl : r.tbls) { if (!tbl.getScanIndex(null).fetchedAll()) // We have to explicitly cancel queries on remote nodes. - ctx.io().sendUserMessage(nodes, new GridQueryCancelRequest(qryReqId), GridTopic.TOPIC_QUERY, false, 0); + send(nodes, new GridQueryCancelRequest(qryReqId)); // dropTable(r.conn, tbl.getName()); TODO } @@ -269,6 +273,34 @@ public class GridReduceQueryExecutor { } /** + * @param nodes Nodes. + * @param msg Message. + * @throws IgniteCheckedException If failed. 
+ */ + private void send(Collection<ClusterNode> nodes, Message msg) throws IgniteCheckedException { + for (ClusterNode node : nodes) { + if (node.isLocal()) { + ArrayList<ClusterNode> remotes = new ArrayList<>(nodes.size() - 1); + + for (ClusterNode node0 : nodes) { + if (node0 != node) + remotes.add(node0); + } + + ctx.io().send(remotes, GridTopic.TOPIC_QUERY, msg, GridIoPolicy.PUBLIC_POOL); + + // Local node goes the last to allow parallel execution. + h2.mapQueryExecutor().onMessage(ctx.localNodeId(), msg); + + return; + } + } + + // All the given nodes are remotes. + ctx.io().send(nodes, GridTopic.TOPIC_QUERY, msg, GridIoPolicy.PUBLIC_POOL); + } + + /** * @param conn Connection. * @param tblName Table name. * @throws SQLException If failed.