ignite-sql-tests - moved messages to core
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c459f306
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c459f306
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c459f306
Branch: refs/heads/ignite-sql-tests
Commit: c459f306e9f9d4646065d38bf6fd421bd59d65b1
Parents: d7cde2b
Author: S.Vladykin <svlady...@gridgain.com>
Authored: Thu Mar 12 03:13:46 2015 +0300
Committer: S.Vladykin <svlady...@gridgain.com>
Committed: Thu Mar 12 03:13:46 2015 +0300
----------------------------------------------------------------------
.../messages/GridQueryCancelRequest.java | 49 +++++
.../twostep/messages/GridQueryFailResponse.java | 64 +++++++
.../messages/GridQueryNextPageRequest.java | 77 ++++++++
.../messages/GridQueryNextPageResponse.java | 128 +++++++++++++
.../h2/twostep/messages/GridQueryRequest.java | 95 ++++++++++
.../query/h2/twostep/GridMapQueryExecutor.java | 60 +++++-
.../query/h2/twostep/GridMergeIndex.java | 26 ++-
.../h2/twostep/GridMergeIndexUnsorted.java | 7 +-
.../h2/twostep/GridReduceQueryExecutor.java | 16 +-
.../query/h2/twostep/GridResultPage.java | 33 +++-
.../messages/GridQueryCancelRequest.java | 49 -----
.../twostep/messages/GridQueryFailResponse.java | 64 -------
.../messages/GridQueryNextPageRequest.java | 77 --------
.../messages/GridQueryNextPageResponse.java | 181 -------------------
.../h2/twostep/messages/GridQueryRequest.java | 95 ----------
15 files changed, 535 insertions(+), 486 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c459f306/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryCancelRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryCancelRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryCancelRequest.java new file mode 100644 index 0000000..9bcb410 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryCancelRequest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.twostep.messages; + +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; + +/** + * Cancel request. + */ +public class GridQueryCancelRequest implements Serializable { + /** */ + private long qryReqId; + + /** + * @param qryReqId Query request ID. + */ + public GridQueryCancelRequest(long qryReqId) { + this.qryReqId = qryReqId; + } + + /** + * @return Query request ID. 
+ */ + public long queryRequestId() { + return qryReqId; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridQueryCancelRequest.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c459f306/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java new file mode 100644 index 0000000..f551ab8 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.twostep.messages; + +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; + +/** + * Error message. 
+ */ +public class GridQueryFailResponse implements Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private long qryReqId; + + /** */ + private String errMsg; + + /** + * @param qryReqId Query request ID. + * @param err Error. + */ + public GridQueryFailResponse(long qryReqId, Throwable err) { + this.qryReqId = qryReqId; + this.errMsg = err.getClass() + ":" + err.getMessage(); + } + + /** + * @return Query request ID. + */ + public long queryRequestId() { + return qryReqId; + } + + /** + * @return Error. + */ + public String error() { + return errMsg; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridQueryFailResponse.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c459f306/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java new file mode 100644 index 0000000..ae7e1e3 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.twostep.messages; + + +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; + +/** + * Request to fetch next page. + */ +public class GridQueryNextPageRequest implements Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private long qryReqId; + + /** */ + private int qry; + + /** */ + private int pageSize; + + /** + * @param qryReqId Query request ID. + * @param qry Query. + * @param pageSize Page size. + */ + public GridQueryNextPageRequest(long qryReqId, int qry, int pageSize) { + this.qryReqId = qryReqId; + this.qry = qry; + this.pageSize = pageSize; + } + + /** + * @return Query request ID. + */ + public long queryRequestId() { + return qryReqId; + } + + /** + * @return Query. + */ + public int query() { + return qry; + } + + /** + * @return Page size. 
+ */ + public int pageSize() { + return pageSize; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridQueryNextPageRequest.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c459f306/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java new file mode 100644 index 0000000..3c4cc94 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.twostep.messages; + +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; + +/** + * Next page response. 
+ */ +public class GridQueryNextPageResponse implements Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private long qryReqId; + + /** */ + private int qry; + + /** */ + private int page; + + /** */ + private int allRows; + + /** */ + private byte[] rows; + + /** + * For {@link Externalizable}. + */ + public GridQueryNextPageResponse() { + // No-op. + } + + /** + * @param qryReqId Query request ID. + * @param qry Query. + * @param page Page. + * @param allRows All rows count. + * @param rows Rows. + */ + public GridQueryNextPageResponse(long qryReqId, int qry, int page, int allRows, + byte[] rows) { + assert rows != null; + + this.qryReqId = qryReqId; + this.qry = qry; + this.page = page; + this.allRows = allRows; + this.rows = rows; + } + + /** + * @return Query request ID. + */ + public long queryRequestId() { + return qryReqId; + } + + /** + * @return Query. + */ + public int query() { + return qry; + } + + /** + * @return Page. + */ + public int page() { + return page; + } + + /** + * @return All rows. + */ + public int allRows() { + return allRows; + } + + /** + * @return Rows. 
+ */ + public byte[] rows() { + return rows; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeLong(qryReqId); + out.writeInt(qry); + out.writeInt(page); + out.writeInt(allRows); + U.writeByteArray(out, rows); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + qryReqId = in.readLong(); + qry = in.readInt(); + page = in.readInt(); + allRows = in.readInt(); + rows = U.readByteArray(in); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridQueryNextPageResponse.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c459f306/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java new file mode 100644 index 0000000..2834b84 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.twostep.messages; + +import org.apache.ignite.internal.processors.cache.query.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; +import java.util.*; + +/** + * Query request. + */ +public class GridQueryRequest implements Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private long reqId; + + /** */ + private int pageSize; + + /** */ + private String space; + + /** */ + @GridToStringInclude + private Collection<GridCacheSqlQuery> qrys; + + /** + * @param reqId Request ID. + * @param pageSize Page size. + * @param space Space. + * @param qrys Queries. + */ + public GridQueryRequest(long reqId, int pageSize, String space, Collection<GridCacheSqlQuery> qrys) { + this.reqId = reqId; + this.pageSize = pageSize; + this.space = space; + + assert qrys instanceof Serializable; + + this.qrys = qrys; + } + + /** + * @return Request ID. + */ + public long requestId() { + return reqId; + } + + /** + * @return Page size. + */ + public int pageSize() { + return pageSize; + } + + /** + * @return Space. + */ + public String space() { + return space; + } + + /** + * @return Queries. 
+ */ + public Collection<GridCacheSqlQuery> queries() { + return qrys; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridQueryRequest.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c459f306/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index 44e8b3d..5b2a0ab 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -29,6 +29,7 @@ import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.h2.jdbc.*; import org.h2.result.*; +import org.h2.store.*; import org.h2.value.*; import org.jdk8.backport.*; @@ -284,7 +285,8 @@ public class GridMapQueryExecutor { try { ctx.io().sendUserMessage(F.asList(node), - new GridQueryNextPageResponse(qr.qryReqId, qry, page, page == 0 ? res.rowCount : -1, rows), + new GridQueryNextPageResponse(qr.qryReqId, qry, page, page == 0 ? res.rowCount : -1, + marshallRows(rows)), GridTopic.TOPIC_QUERY, false, 0); } catch (IgniteCheckedException e) { @@ -295,6 +297,62 @@ public class GridMapQueryExecutor { } /** + * @param bytes Bytes. + * @return Rows. 
+ */ + public static List<Value[]> unmarshallRows(byte[] bytes) { + Data data = Data.create(null, bytes); + + int rowCnt = data.readVarInt(); + + if (rowCnt == 0) + return Collections.emptyList(); + + ArrayList<Value[]> rows = new ArrayList<>(rowCnt); + + int cols = data.readVarInt(); + + for (int r = 0; r < rowCnt; r++) { + Value[] row = new Value[cols]; + + for (int c = 0; c < cols; c++) + row[c] = data.readValue(); + + rows.add(row); + } + + return rows; + } + + /** + * @param rows Rows. + * @return Bytes. + */ + public static byte[] marshallRows(Collection<Value[]> rows) { + Data data = Data.create(null, 256); + + data.writeVarInt(rows.size()); + + boolean first = true; + + for (Value[] row : rows) { + if (first) { + data.writeVarInt(row.length); + + first = false; + } + + for (Value val : row) { + data.checkCapacity(data.getValueLen(val)); + + data.writeValue(val); + } + } + + return Arrays.copyOf(data.getBytes(), data.length()); + } + + /** * */ private class QueryResults { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c459f306/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java index b301622..f6989ae 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java @@ -35,9 +35,6 @@ import java.util.concurrent.atomic.*; */ public abstract class GridMergeIndex extends BaseIndex { /** */ - protected static final GridResultPage END = new GridResultPage(null, null); - - /** */ private static final int MAX_FETCH_SIZE = 100000; // TODO 
configure /** All rows number. */ @@ -46,6 +43,9 @@ public abstract class GridMergeIndex extends BaseIndex { /** Remaining rows per source node ID. */ private final ConcurrentMap<UUID, Counter> remainingRows = new ConcurrentHashMap8<>(); + /** */ + private final AtomicBoolean lastSubmitted = new AtomicBoolean(); + /** * Will be r/w from query execution thread only, does not need to be threadsafe. */ @@ -83,7 +83,7 @@ public abstract class GridMergeIndex extends BaseIndex { * @param page Page. */ public final void addPage(GridResultPage page) { - int pageRowsCnt = page.response().rows().size(); + int pageRowsCnt = page.rows().size(); if (pageRowsCnt != 0) addPage0(page); @@ -104,12 +104,20 @@ public abstract class GridMergeIndex extends BaseIndex { } if (cnt.addAndGet(-pageRowsCnt) == 0) { // Result can be negative in case of race between messages, it is ok. - for (Counter c : remainingRows.values()) { - if (c.get() != 0 || !c.initialized) - return; + boolean last = true; + + for (Counter c : remainingRows.values()) { // Check all the sources. + if (c.get() != 0 || !c.initialized) { + last = false; + + break; + } } - addPage0(END); // We've fetched all. + if (last) + last = lastSubmitted.compareAndSet(false, true); + + addPage0(new GridResultPage(page.source(), null, last)); } } @@ -122,8 +130,6 @@ public abstract class GridMergeIndex extends BaseIndex { * @param page Page. 
*/ protected void fetchNextPage(GridResultPage page) { - assert page != END; - if (remainingRows.get(page.source()).get() != 0) page.fetchNextPage(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c459f306/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java index d36ea58..44faea1 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java @@ -44,7 +44,8 @@ public class GridMergeIndexUnsorted extends GridMergeIndex { /** {@inheritDoc} */ @Override protected void addPage0(GridResultPage page) { - queue.add(page); + if (page.rows() != null || page.isLast()) // We are not interested in terminating pages which are not last. + queue.add(page); } /** {@inheritDoc} */ @@ -71,7 +72,7 @@ public class GridMergeIndexUnsorted extends GridMergeIndex { throw new IgniteException("Query execution was interrupted.", e); } - if (page == END) { + if (page.isLast()) { assert queue.isEmpty() : "It must be the last page: " + queue; return false; // We are done. 
@@ -79,7 +80,7 @@ public class GridMergeIndexUnsorted extends GridMergeIndex { fetchNextPage(page); - iter = page.response().rows().iterator(); + iter = page.rows().iterator(); assert iter.hasNext(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c459f306/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index 056cce2..4dc9e59 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -147,7 +147,7 @@ public class GridReduceQueryExecutor { QueryRun r = runs.get(msg.queryRequestId()); if (r != null && r.latch.getCount() != 0) { - r.rmtErr = new CacheException("Failed to execute map query on the node: " + node.id(), msg.error()); + r.rmtErr = new CacheException("Failed to execute map query on the node: " + node.id() + "\n " + msg.error()); while(r.latch.getCount() > 0) r.latch.countDown(); @@ -161,16 +161,17 @@ public class GridReduceQueryExecutor { private void onNextPage(final ClusterNode node, GridQueryNextPageResponse msg) { final long qryReqId = msg.queryRequestId(); final int qry = msg.query(); - final int pageSize = msg.rows().size(); - QueryRun r = runs.get(qryReqId); + final QueryRun r = runs.get(qryReqId); if (r == null) // Already finished with error or canceled. 
return; + final int pageSize = r.pageSize; + GridMergeIndex idx = r.tbls.get(msg.query()).getScanIndex(null); - idx.addPage(new GridResultPage(node.id(), msg) { + idx.addPage(new GridResultPage(node.id(), msg, false) { @Override public void fetchNextPage() { try { ctx.io().sendUserMessage(F.asList(node), new GridQueryNextPageRequest(qryReqId, qry, pageSize), @@ -196,6 +197,8 @@ public class GridReduceQueryExecutor { QueryRun r = new QueryRun(); + r.pageSize = 1000; // TODO configure correctly page size + r.tbls = new ArrayList<>(qry.mapQueries().size()); r.conn = h2.connectionForSpace(space); @@ -228,7 +231,7 @@ public class GridReduceQueryExecutor { runs.put(qryReqId, r); try { - ctx.io().sendUserMessage(nodes, new GridQueryRequest(qryReqId, 1000, space, qry.mapQueries()), // TODO conf page size + ctx.io().sendUserMessage(nodes, new GridQueryRequest(qryReqId, r.pageSize, space, qry.mapQueries()), GridTopic.TOPIC_QUERY, false, 0); r.latch.await(); @@ -393,6 +396,9 @@ public class GridReduceQueryExecutor { private Connection conn; /** */ + private int pageSize; + + /** */ private volatile Throwable rmtErr; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c459f306/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java index 7f7d572..a903f01 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.query.h2.twostep; import 
org.apache.ignite.internal.processors.query.h2.twostep.messages.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.h2.value.*; import java.util.*; @@ -32,13 +33,43 @@ public class GridResultPage { /** */ protected final GridQueryNextPageResponse res; + /** */ + private final Collection<Value[]> rows; + + /** */ + private final boolean last; + /** * @param src Source. * @param res Response. + * @param last If this is the globally last page. */ - protected GridResultPage(UUID src, GridQueryNextPageResponse res) { + protected GridResultPage(UUID src, GridQueryNextPageResponse res, boolean last) { + assert src != null; + this.src = src; this.res = res; + this.last = last; + + if (last) + assert res == null : "The last page must be dummy."; + + // res == null means that it is a terminating dummy page for the given source node ID. + rows = res == null ? null : GridMapQueryExecutor.unmarshallRows(res.rows()); + } + + /** + * @return {@code true} If this is a dummy last page for all the sources. + */ + public boolean isLast() { + return last; + } + + /** + * @return Rows. 
+ */ + public Collection<Value[]> rows() { + return rows; } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c459f306/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryCancelRequest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryCancelRequest.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryCancelRequest.java deleted file mode 100644 index 9bcb410..0000000 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryCancelRequest.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.query.h2.twostep.messages; - -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.io.*; - -/** - * Cancel request. - */ -public class GridQueryCancelRequest implements Serializable { - /** */ - private long qryReqId; - - /** - * @param qryReqId Query request ID. 
- */ - public GridQueryCancelRequest(long qryReqId) { - this.qryReqId = qryReqId; - } - - /** - * @return Query request ID. - */ - public long queryRequestId() { - return qryReqId; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridQueryCancelRequest.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c459f306/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java deleted file mode 100644 index 3251677..0000000 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryFailResponse.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.query.h2.twostep.messages; - -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.io.*; - -/** - * Error message. - */ -public class GridQueryFailResponse implements Serializable { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private long qryReqId; - - /** */ - private Throwable err; - - /** - * @param qryReqId Query request ID. - * @param err Error. - */ - public GridQueryFailResponse(long qryReqId, Throwable err) { - this.qryReqId = qryReqId; - this.err = err; - } - - /** - * @return Query request ID. - */ - public long queryRequestId() { - return qryReqId; - } - - /** - * @return Error. - */ - public Throwable error() { - return err; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridQueryFailResponse.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c459f306/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java deleted file mode 100644 index ae7e1e3..0000000 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.query.h2.twostep.messages; - - -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.io.*; - -/** - * Request to fetch next page. - */ -public class GridQueryNextPageRequest implements Serializable { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private long qryReqId; - - /** */ - private int qry; - - /** */ - private int pageSize; - - /** - * @param qryReqId Query request ID. - * @param qry Query. - * @param pageSize Page size. - */ - public GridQueryNextPageRequest(long qryReqId, int qry, int pageSize) { - this.qryReqId = qryReqId; - this.qry = qry; - this.pageSize = pageSize; - } - - /** - * @return Query request ID. - */ - public long queryRequestId() { - return qryReqId; - } - - /** - * @return Query. - */ - public int query() { - return qry; - } - - /** - * @return Page size. 
- */ - public int pageSize() { - return pageSize; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridQueryNextPageRequest.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c459f306/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java deleted file mode 100644 index 468691d..0000000 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java +++ /dev/null @@ -1,181 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.query.h2.twostep.messages; - -import org.apache.ignite.internal.util.typedef.internal.*; -import org.h2.store.*; -import org.h2.value.*; - -import java.io.*; -import java.util.*; - -/** - * Next page response. - */ -public class GridQueryNextPageResponse implements Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private long qryReqId; - - /** */ - private int qry; - - /** */ - private int page; - - /** */ - private int allRows; - - /** */ - private Collection<Value[]> rows; - - /** - * For {@link Externalizable}. - */ - public GridQueryNextPageResponse() { - // No-op. - } - - /** - * @param qryReqId Query request ID. - * @param qry Query. - * @param page Page. - * @param allRows All rows count. - * @param rows Rows. - */ - public GridQueryNextPageResponse(long qryReqId, int qry, int page, int allRows, - Collection<Value[]> rows) { - assert rows != null; - - this.qryReqId = qryReqId; - this.qry = qry; - this.page = page; - this.allRows = allRows; - this.rows = rows; - } - - /** - * @return Query request ID. - */ - public long queryRequestId() { - return qryReqId; - } - - /** - * @return Query. - */ - public int query() { - return qry; - } - - /** - * @return Page. - */ - public int page() { - return page; - } - - /** - * @return All rows. - */ - public int allRows() { - return allRows; - } - - /** - * @return Rows. 
- */ - public Collection<Value[]> rows() { - return rows; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeLong(qryReqId); - out.writeInt(qry); - out.writeInt(page); - out.writeInt(allRows); - - out.writeInt(rows.size()); - - if (rows.isEmpty()) - return; - - Data data = Data.create(null, 512); - - boolean first = true; - - for (Value[] row : rows) { - if (first) { - out.writeInt(row.length); - - first = false; - } - - for (Value val : row) { - data.checkCapacity(data.getValueLen(val)); - - data.writeValue(val); - } - } - - out.writeInt(data.length()); - out.write(data.getBytes(), 0, data.length()); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - qryReqId = in.readLong(); - qry = in.readInt(); - page = in.readInt(); - allRows = in.readInt(); - - int rowCnt = in.readInt(); - - if (rowCnt == 0) - rows = Collections.emptyList(); - else { - rows = new ArrayList<>(rowCnt); - - int cols = in.readInt(); - int dataSize = in.readInt(); - - byte[] dataBytes = new byte[dataSize]; - - in.readFully(dataBytes); - - Data data = Data.create(null, dataBytes); - - for (int r = 0; r < rowCnt; r++) { - Value[] row = new Value[cols]; - - for (int c = 0; c < cols; c++) - row[c] = data.readValue(); - - rows.add(row); - } - } - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridQueryNextPageResponse.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c459f306/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java deleted file mode 100644 index 2834b84..0000000 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.query.h2.twostep.messages; - -import org.apache.ignite.internal.processors.cache.query.*; -import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.io.*; -import java.util.*; - -/** - * Query request. - */ -public class GridQueryRequest implements Serializable { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private long reqId; - - /** */ - private int pageSize; - - /** */ - private String space; - - /** */ - @GridToStringInclude - private Collection<GridCacheSqlQuery> qrys; - - /** - * @param reqId Request ID. - * @param pageSize Page size. - * @param space Space. - * @param qrys Queries. 
- */ - public GridQueryRequest(long reqId, int pageSize, String space, Collection<GridCacheSqlQuery> qrys) { - this.reqId = reqId; - this.pageSize = pageSize; - this.space = space; - - assert qrys instanceof Serializable; - - this.qrys = qrys; - } - - /** - * @return Request ID. - */ - public long requestId() { - return reqId; - } - - /** - * @return Page size. - */ - public int pageSize() { - return pageSize; - } - - /** - * @return Space. - */ - public String space() { - return space; - } - - /** - * @return Queries. - */ - public Collection<GridCacheSqlQuery> queries() { - return qrys; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridQueryRequest.class, this); - } -}