Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-sql-tests 334b66808 -> cef213040


ignite-sql-tests - last page flag removed


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d7cde2b8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d7cde2b8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d7cde2b8

Branch: refs/heads/ignite-sql-tests
Commit: d7cde2b8eaf65b6a5962c0c4bc69031d52a58a49
Parents: 334b668
Author: S.Vladykin <svlady...@gridgain.com>
Authored: Thu Mar 12 01:36:27 2015 +0300
Committer: S.Vladykin <svlady...@gridgain.com>
Committed: Thu Mar 12 01:36:27 2015 +0300

----------------------------------------------------------------------
 .../query/h2/twostep/GridMapQueryExecutor.java  |  2 +-
 .../query/h2/twostep/GridMergeIndex.java        | 85 +++++++++++++-------
 .../h2/twostep/GridMergeIndexUnsorted.java      |  8 +-
 .../h2/twostep/GridReduceQueryExecutor.java     | 19 ++---
 .../query/h2/twostep/GridResultPage.java        | 12 +--
 .../messages/GridQueryNextPageResponse.java     | 16 +---
 .../cache/GridCacheCrossCacheQuerySelfTest.java |  3 +-
 7 files changed, 80 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7cde2b8/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index ed0da58..44e8b3d 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -284,7 +284,7 @@ public class GridMapQueryExecutor {
 
         try {
             ctx.io().sendUserMessage(F.asList(node),
-                new GridQueryNextPageResponse(qr.qryReqId, qry, page, page == 0 ? res.rowCount : -1, last, rows),
+                new GridQueryNextPageResponse(qr.qryReqId, qry, page, page == 0 ? res.rowCount : -1, rows),
                 GridTopic.TOPIC_QUERY, false, 0);
         }
         catch (IgniteCheckedException e) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7cde2b8/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
index 4632e0b..b301622 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
@@ -23,9 +23,11 @@ import org.h2.index.*;
 import org.h2.message.*;
 import org.h2.result.*;
 import org.h2.table.*;
+import org.jdk8.backport.*;
 import org.jetbrains.annotations.*;
 
 import java.util.*;
+import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
 
 /**
@@ -33,16 +35,16 @@ import java.util.concurrent.atomic.*;
  */
 public abstract class GridMergeIndex extends BaseIndex {
     /** */
-    protected final GridResultPage<?> END = new GridResultPage<Object>(null, null);
+    protected static final GridResultPage END = new GridResultPage(null, null);
 
     /** */
-    private static final int MAX_FETCH_SIZE = 100000;
+    private static final int MAX_FETCH_SIZE = 100000; // TODO configure
 
-    /** */
-    private final AtomicInteger cnt = new AtomicInteger(0);
+    /** All rows number. */
+    private final AtomicInteger rowsCnt = new AtomicInteger(0);
 
-    /** Result sources. */
-    private final AtomicInteger srcs = new AtomicInteger(0);
+    /** Remaining rows per source node ID. */
+    private final ConcurrentMap<UUID, Counter> remainingRows = new ConcurrentHashMap8<>();
 
     /**
      * Will be r/w from query execution thread only, does not need to be threadsafe.
@@ -61,7 +63,7 @@ public abstract class GridMergeIndex extends BaseIndex {
 
     /** {@inheritDoc} */
     @Override public long getRowCount(Session session) {
-        return cnt.get();
+        return rowsCnt.get();
     }
 
     /** {@inheritDoc} */
@@ -70,42 +72,61 @@ public abstract class GridMergeIndex extends BaseIndex {
     }
 
     /**
-     * @param srcs Number of sources.
-     */
-    public void setNumberOfSources(int srcs) {
-        this.srcs.set(srcs);
-    }
-
-    /**
-     * @param cnt Count.
+     * @param nodeId Node ID.
      */
-    public void addCount(int cnt) {
-        this.cnt.addAndGet(cnt);
+    public void addSource(UUID nodeId) {
+        if (remainingRows.put(nodeId, new Counter()) != null)
+            throw new IllegalStateException();
     }
 
     /**
      * @param page Page.
      */
-    public final void addPage(GridResultPage<?> page) {
-        if (!page.response().rows().isEmpty())
+    public final void addPage(GridResultPage page) {
+        int pageRowsCnt = page.response().rows().size();
+
+        if (pageRowsCnt != 0)
             addPage0(page);
-        else
-            assert page.response().isLast();
 
-        if (page.response().isLast()) {
-            int srcs0 = srcs.decrementAndGet();
+        Counter cnt = remainingRows.get(page.source());
 
-            assert srcs0 >= 0 : srcs0;
+        int allRows = page.response().allRows();
+
+        if (allRows != -1) { // Only the first page contains allRows count and is allowed to init counter.
+            assert !cnt.initialized : "Counter is already initialized.";
+
+            cnt.addAndGet(allRows);
+            rowsCnt.addAndGet(allRows);
+
+            // We need this separate flag to handle case when the first source contains only one page
+            // and it will signal that all remaining counters are zero and fetch is finished.
+            cnt.initialized = true;
+        }
+
+        if (cnt.addAndGet(-pageRowsCnt) == 0) { // Result can be negative in case of race between messages, it is ok.
+            for (Counter c : remainingRows.values()) {
+                if (c.get() != 0 || !c.initialized)
+                    return;
+            }
 
-            if (srcs0 == 0)
-                addPage0(END); // We've fetched all.
+            addPage0(END); // We've fetched all.
         }
     }
 
     /**
      * @param page Page.
      */
-    protected abstract void addPage0(GridResultPage<?> page);
+    protected abstract void addPage0(GridResultPage page);
+
+    /**
+     * @param page Page.
+     */
+    protected void fetchNextPage(GridResultPage page) {
+        assert page != END;
+
+        if (remainingRows.get(page.source()).get() != 0)
+            page.fetchNextPage();
+    }
 
     /** {@inheritDoc} */
     @Override public Cursor find(Session session, SearchRow first, SearchRow last) {
@@ -122,7 +143,7 @@ public abstract class GridMergeIndex extends BaseIndex {
      * @return {@code true} If we have fetched all the remote rows.
      */
     public boolean fetchedAll() {
-        return fetched.size() == cnt.get();
+        return fetched.size() == rowsCnt.get();
     }
 
     /**
@@ -300,4 +321,12 @@ public abstract class GridMergeIndex extends BaseIndex {
             throw new UnsupportedOperationException();
         }
     }
+
+    /**
+     * Counter with initialization flag.
+     */
+    private static class Counter extends AtomicInteger {
+        /** */
+        volatile boolean initialized;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7cde2b8/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
index 81ba6c7..d36ea58 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
@@ -32,7 +32,7 @@ import java.util.concurrent.*;
  */
 public class GridMergeIndexUnsorted extends GridMergeIndex {
     /** */
-    private final BlockingQueue<GridResultPage<?>> queue = new LinkedBlockingQueue<>();
+    private final BlockingQueue<GridResultPage> queue = new LinkedBlockingQueue<>();
 
     /**
      * @param tbl  Table.
@@ -43,7 +43,7 @@ public class GridMergeIndexUnsorted extends GridMergeIndex {
     }
 
     /** {@inheritDoc} */
-    @Override public void addPage0(GridResultPage<?> page) {
+    @Override protected void addPage0(GridResultPage page) {
         queue.add(page);
     }
 
@@ -62,7 +62,7 @@ public class GridMergeIndexUnsorted extends GridMergeIndex {
                 if (iter.hasNext())
                     return true;
 
-                GridResultPage<?> page;
+                GridResultPage page;
 
                 try {
                     page = queue.take();
@@ -77,7 +77,7 @@ public class GridMergeIndexUnsorted extends GridMergeIndex {
                     return false; // We are done.
                 }
 
-                page.fetchNextPage();
+                fetchNextPage(page);
 
                 iter = page.response().rows().iterator();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7cde2b8/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 8101463..056cce2 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -170,17 +170,8 @@ public class GridReduceQueryExecutor {
 
         GridMergeIndex idx = r.tbls.get(msg.query()).getScanIndex(null);
 
-        if (msg.allRows() != -1) { // Only the first page contains row count.
-            idx.addCount(msg.allRows());
-
-            r.latch.countDown();
-        }
-
-        idx.addPage(new GridResultPage<UUID>(node.id(), msg) {
+        idx.addPage(new GridResultPage(node.id(), msg) {
             @Override public void fetchNextPage() {
-                if (res.isLast())
-                    return; // No-op if this message known to be the last.
-
                 try {
                     ctx.io().sendUserMessage(F.asList(node), new GridQueryNextPageRequest(qryReqId, qry, pageSize),
                         GridTopic.TOPIC_QUERY, false, 0);
@@ -190,6 +181,9 @@ public class GridReduceQueryExecutor {
                 }
             }
         });
+
+        if (msg.allRows() != -1) // Only the first page contains row count.
+            r.latch.countDown();
     }
 
     /**
@@ -219,7 +213,10 @@ public class GridReduceQueryExecutor {
                 throw new IgniteException(e);
             }
 
-            tbl.getScanIndex(null).setNumberOfSources(nodes.size());
+            GridMergeIndex idx = tbl.getScanIndex(null);
+
+            for (ClusterNode node : nodes)
+                idx.addSource(node.id());
 
             r.tbls.add(tbl);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7cde2b8/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java
index a7b3d6f..7f7d572 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java
@@ -20,12 +20,14 @@ package org.apache.ignite.internal.processors.query.h2.twostep;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 
+import java.util.*;
+
 /**
  * Page result.
  */
-public class GridResultPage<Z> {
+public class GridResultPage {
     /** */
-    private final Z src;
+    private final UUID src;
 
     /** */
     protected final GridQueryNextPageResponse res;
@@ -34,15 +36,15 @@ public class GridResultPage<Z> {
      * @param src Source.
      * @param res Response.
      */
-    protected GridResultPage(Z src, GridQueryNextPageResponse res) {
+    protected GridResultPage(UUID src, GridQueryNextPageResponse res) {
         this.src = src;
         this.res = res;
     }
 
     /**
-     * @return Result source.
+     * @return Result source node ID.
      */
-    public Z source() {
+    public UUID source() {
         return src;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7cde2b8/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java
index 6ea8d08..468691d 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java
@@ -46,9 +46,6 @@ public class GridQueryNextPageResponse implements Externalizable {
     /** */
     private Collection<Value[]> rows;
 
-    /** */
-    private boolean last;
-
     /**
      * For {@link Externalizable}.
      */
@@ -61,10 +58,9 @@ public class GridQueryNextPageResponse implements Externalizable {
      * @param qry Query.
      * @param page Page.
      * @param allRows All rows count.
-     * @param last Last row.
      * @param rows Rows.
      */
-    public GridQueryNextPageResponse(long qryReqId, int qry, int page, int allRows, boolean last,
+    public GridQueryNextPageResponse(long qryReqId, int qry, int page, int allRows,
         Collection<Value[]> rows) {
         assert rows != null;
 
@@ -72,7 +68,6 @@ public class GridQueryNextPageResponse implements Externalizable {
         this.qry = qry;
         this.page = page;
         this.allRows = allRows;
-        this.last = last;
         this.rows = rows;
     }
 
@@ -105,13 +100,6 @@ public class GridQueryNextPageResponse implements Externalizable {
     }
 
     /**
-     * @return {@code true} If this is the last page.
-     */
-    public boolean isLast() {
-        return last;
-    }
-
-    /**
      * @return Rows.
      */
     public Collection<Value[]> rows() {
@@ -123,7 +111,6 @@ public class GridQueryNextPageResponse implements Externalizable {
         out.writeLong(qryReqId);
         out.writeInt(qry);
         out.writeInt(page);
-        out.writeBoolean(last);
         out.writeInt(allRows);
 
         out.writeInt(rows.size());
@@ -158,7 +145,6 @@ public class GridQueryNextPageResponse implements Externalizable {
         qryReqId = in.readLong();
         qry = in.readInt();
         page = in.readInt();
-        last = in.readBoolean();
         allRows = in.readInt();
 
         int rowCnt = in.readInt();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d7cde2b8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
index b3db3bc..46b3b14 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheCrossCacheQuerySelfTest.java
@@ -157,6 +157,8 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest {
             set1.add((Integer)o.get(0));
         }
 
+        assertFalse(set1.isEmpty());
+
         Set<Integer> set0 = new HashSet<>();
 
         X.println("___ GROUP BY");
@@ -168,7 +170,6 @@ public class GridCacheCrossCacheQuerySelfTest extends GridCommonAbstractTest {
             assertTrue(set0.add((Integer) o.get(0)));
         }
 
-        assertFalse(set1.isEmpty());
         assertEquals(set0, set1);
 
         X.println("___ GROUP BY AVG MIN MAX SUM COUNT(*) COUNT(x)");

Reply via email to