Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-sql-tests 334b66808 -> cef213040
ignite-sql-tests - last page flag removed Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d7cde2b8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d7cde2b8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d7cde2b8 Branch: refs/heads/ignite-sql-tests Commit: d7cde2b8eaf65b6a5962c0c4bc69031d52a58a49 Parents: 334b668 Author: S.Vladykin <svlady...@gridgain.com> Authored: Thu Mar 12 01:36:27 2015 +0300 Committer: S.Vladykin <svlady...@gridgain.com> Committed: Thu Mar 12 01:36:27 2015 +0300 ---------------------------------------------------------------------- .../query/h2/twostep/GridMapQueryExecutor.java | 2 +- .../query/h2/twostep/GridMergeIndex.java | 85 +++++++++++++------- .../h2/twostep/GridMergeIndexUnsorted.java | 8 +- .../h2/twostep/GridReduceQueryExecutor.java | 19 ++--- .../query/h2/twostep/GridResultPage.java | 12 +-- .../messages/GridQueryNextPageResponse.java | 16 +--- .../cache/GridCacheCrossCacheQuerySelfTest.java | 3 +- 7 files changed, 80 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7cde2b8/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index ed0da58..44e8b3d 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -284,7 +284,7 @@ public class 
GridMapQueryExecutor { try { ctx.io().sendUserMessage(F.asList(node), - new GridQueryNextPageResponse(qr.qryReqId, qry, page, page == 0 ? res.rowCount : -1, last, rows), + new GridQueryNextPageResponse(qr.qryReqId, qry, page, page == 0 ? res.rowCount : -1, rows), GridTopic.TOPIC_QUERY, false, 0); } catch (IgniteCheckedException e) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7cde2b8/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java index 4632e0b..b301622 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java @@ -23,9 +23,11 @@ import org.h2.index.*; import org.h2.message.*; import org.h2.result.*; import org.h2.table.*; +import org.jdk8.backport.*; import org.jetbrains.annotations.*; import java.util.*; +import java.util.concurrent.*; import java.util.concurrent.atomic.*; /** @@ -33,16 +35,16 @@ import java.util.concurrent.atomic.*; */ public abstract class GridMergeIndex extends BaseIndex { /** */ - protected final GridResultPage<?> END = new GridResultPage<Object>(null, null); + protected static final GridResultPage END = new GridResultPage(null, null); /** */ - private static final int MAX_FETCH_SIZE = 100000; + private static final int MAX_FETCH_SIZE = 100000; // TODO configure - /** */ - private final AtomicInteger cnt = new AtomicInteger(0); + /** All rows number. */ + private final AtomicInteger rowsCnt = new AtomicInteger(0); - /** Result sources. 
*/ - private final AtomicInteger srcs = new AtomicInteger(0); + /** Remaining rows per source node ID. */ + private final ConcurrentMap<UUID, Counter> remainingRows = new ConcurrentHashMap8<>(); /** * Will be r/w from query execution thread only, does not need to be threadsafe. @@ -61,7 +63,7 @@ public abstract class GridMergeIndex extends BaseIndex { /** {@inheritDoc} */ @Override public long getRowCount(Session session) { - return cnt.get(); + return rowsCnt.get(); } /** {@inheritDoc} */ @@ -70,42 +72,61 @@ public abstract class GridMergeIndex extends BaseIndex { } /** - * @param srcs Number of sources. - */ - public void setNumberOfSources(int srcs) { - this.srcs.set(srcs); - } - - /** - * @param cnt Count. + * @param nodeId Node ID. */ - public void addCount(int cnt) { - this.cnt.addAndGet(cnt); + public void addSource(UUID nodeId) { + if (remainingRows.put(nodeId, new Counter()) != null) + throw new IllegalStateException(); } /** * @param page Page. */ - public final void addPage(GridResultPage<?> page) { - if (!page.response().rows().isEmpty()) + public final void addPage(GridResultPage page) { + int pageRowsCnt = page.response().rows().size(); + + if (pageRowsCnt != 0) addPage0(page); - else - assert page.response().isLast(); - if (page.response().isLast()) { - int srcs0 = srcs.decrementAndGet(); + Counter cnt = remainingRows.get(page.source()); - assert srcs0 >= 0 : srcs0; + int allRows = page.response().allRows(); + + if (allRows != -1) { // Only the first page contains allRows count and is allowed to init counter. + assert !cnt.initialized : "Counter is already initialized."; + + cnt.addAndGet(allRows); + rowsCnt.addAndGet(allRows); + + // We need this separate flag to handle case when the first source contains only one page + // and it will signal that all remaining counters are zero and fetch is finished. + cnt.initialized = true; + } + + if (cnt.addAndGet(-pageRowsCnt) == 0) { // Result can be negative in case of race between messages, it is ok. 
+ for (Counter c : remainingRows.values()) { + if (c.get() != 0 || !c.initialized) + return; + } - if (srcs0 == 0) - addPage0(END); // We've fetched all. + addPage0(END); // We've fetched all. } } /** * @param page Page. */ - protected abstract void addPage0(GridResultPage<?> page); + protected abstract void addPage0(GridResultPage page); + + /** + * @param page Page. + */ + protected void fetchNextPage(GridResultPage page) { + assert page != END; + + if (remainingRows.get(page.source()).get() != 0) + page.fetchNextPage(); + } /** {@inheritDoc} */ @Override public Cursor find(Session session, SearchRow first, SearchRow last) { @@ -122,7 +143,7 @@ public abstract class GridMergeIndex extends BaseIndex { * @return {@code true} If we have fetched all the remote rows. */ public boolean fetchedAll() { - return fetched.size() == cnt.get(); + return fetched.size() == rowsCnt.get(); } /** @@ -300,4 +321,12 @@ public abstract class GridMergeIndex extends BaseIndex { throw new UnsupportedOperationException(); } } + + /** + * Counter with initialization flag. 
+ */ + private static class Counter extends AtomicInteger { + /** */ + volatile boolean initialized; + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7cde2b8/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java index 81ba6c7..d36ea58 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java @@ -32,7 +32,7 @@ import java.util.concurrent.*; */ public class GridMergeIndexUnsorted extends GridMergeIndex { /** */ - private final BlockingQueue<GridResultPage<?>> queue = new LinkedBlockingQueue<>(); + private final BlockingQueue<GridResultPage> queue = new LinkedBlockingQueue<>(); /** * @param tbl Table. @@ -43,7 +43,7 @@ public class GridMergeIndexUnsorted extends GridMergeIndex { } /** {@inheritDoc} */ - @Override public void addPage0(GridResultPage<?> page) { + @Override protected void addPage0(GridResultPage page) { queue.add(page); } @@ -62,7 +62,7 @@ public class GridMergeIndexUnsorted extends GridMergeIndex { if (iter.hasNext()) return true; - GridResultPage<?> page; + GridResultPage page; try { page = queue.take(); @@ -77,7 +77,7 @@ public class GridMergeIndexUnsorted extends GridMergeIndex { return false; // We are done. 
} - page.fetchNextPage(); + fetchNextPage(page); iter = page.response().rows().iterator(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7cde2b8/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index 8101463..056cce2 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -170,17 +170,8 @@ public class GridReduceQueryExecutor { GridMergeIndex idx = r.tbls.get(msg.query()).getScanIndex(null); - if (msg.allRows() != -1) { // Only the first page contains row count. - idx.addCount(msg.allRows()); - - r.latch.countDown(); - } - - idx.addPage(new GridResultPage<UUID>(node.id(), msg) { + idx.addPage(new GridResultPage(node.id(), msg) { @Override public void fetchNextPage() { - if (res.isLast()) - return; // No-op if this message known to be the last. - try { ctx.io().sendUserMessage(F.asList(node), new GridQueryNextPageRequest(qryReqId, qry, pageSize), GridTopic.TOPIC_QUERY, false, 0); @@ -190,6 +181,9 @@ public class GridReduceQueryExecutor { } } }); + + if (msg.allRows() != -1) // Only the first page contains row count. 
+ r.latch.countDown(); } /** @@ -219,7 +213,10 @@ public class GridReduceQueryExecutor { throw new IgniteException(e); } - tbl.getScanIndex(null).setNumberOfSources(nodes.size()); + GridMergeIndex idx = tbl.getScanIndex(null); + + for (ClusterNode node : nodes) + idx.addSource(node.id()); r.tbls.add(tbl); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7cde2b8/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java index a7b3d6f..7f7d572 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java @@ -20,12 +20,14 @@ package org.apache.ignite.internal.processors.query.h2.twostep; import org.apache.ignite.internal.processors.query.h2.twostep.messages.*; import org.apache.ignite.internal.util.typedef.internal.*; +import java.util.*; + /** * Page result. */ -public class GridResultPage<Z> { +public class GridResultPage { /** */ - private final Z src; + private final UUID src; /** */ protected final GridQueryNextPageResponse res; @@ -34,15 +36,15 @@ public class GridResultPage<Z> { * @param src Source. * @param res Response. */ - protected GridResultPage(Z src, GridQueryNextPageResponse res) { + protected GridResultPage(UUID src, GridQueryNextPageResponse res) { this.src = src; this.res = res; } /** - * @return Result source. + * @return Result source node ID. 
*/ - public Z source() { + public UUID source() { return src; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7cde2b8/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java index 6ea8d08..468691d 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java @@ -46,9 +46,6 @@ public class GridQueryNextPageResponse implements Externalizable { /** */ private Collection<Value[]> rows; - /** */ - private boolean last; - /** * For {@link Externalizable}. */ @@ -61,10 +58,9 @@ public class GridQueryNextPageResponse implements Externalizable { * @param qry Query. * @param page Page. * @param allRows All rows count. - * @param last Last row. * @param rows Rows. */ - public GridQueryNextPageResponse(long qryReqId, int qry, int page, int allRows, boolean last, + public GridQueryNextPageResponse(long qryReqId, int qry, int page, int allRows, Collection<Value[]> rows) { assert rows != null; @@ -72,7 +68,6 @@ public class GridQueryNextPageResponse implements Externalizable { this.qry = qry; this.page = page; this.allRows = allRows; - this.last = last; this.rows = rows; } @@ -105,13 +100,6 @@ public class GridQueryNextPageResponse implements Externalizable { } /** - * @return {@code true} If this is the last page. - */ - public boolean isLast() { - return last; - } - - /** * @return Rows. 
*/ public Collection<Value[]> rows() { @@ -123,7 +111,6 @@ public class GridQueryNextPageResponse implements Externalizable { out.writeLong(qryReqId); out.writeInt(qry); out.writeInt(page); - out.writeBoolean(last); out.writeInt(allRows); out.writeInt(rows.size()); @@ -158,7 +145,6 @@ public class GridQueryNextPageResponse implements Externalizable { qryReqId = in.readLong(); qry = in.readInt(); page = in.readInt(); - last = in.readBoolean(); allRows = in.readInt(); int rowCnt = in.readInt(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7cde2b8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java index b3db3bc..46b3b14 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java @@ -157,6 +157,8 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest { set1.add((Integer)o.get(0)); } + assertFalse(set1.isEmpty()); + Set<Integer> set0 = new HashSet<>(); X.println("___ GROUP BY"); @@ -168,7 +170,6 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest { assertTrue(set0.add((Integer) o.get(0))); } - assertFalse(set1.isEmpty()); assertEquals(set0, set1); X.println("___ GROUP BY AVG MIN MAX SUM COUNT(*) COUNT(x)");