Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-sprint-6 4cc376bed -> de5318960


ignite-484 - fixes + tests enabled


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5e877cc7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5e877cc7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5e877cc7

Branch: refs/heads/ignite-sprint-6
Commit: 5e877cc75d75c63240296e1e17ba517805a30827
Parents: f2b96e0
Author: S.Vladykin <svlady...@gridgain.com>
Authored: Tue May 19 22:43:43 2015 +0300
Committer: S.Vladykin <svlady...@gridgain.com>
Committed: Tue May 19 22:43:43 2015 +0300

----------------------------------------------------------------------
 .../h2/twostep/GridMergeIndexUnsorted.java      |   2 +-
 .../h2/twostep/GridReduceQueryExecutor.java     | 122 ++++++++++++++-----
 .../query/h2/twostep/GridResultPage.java        |   2 +-
 .../IgniteCacheQueryNodeRestartSelfTest.java    |  21 +++-
 .../IgniteCacheQuerySelfTestSuite.java          |   2 +-
 5 files changed, 111 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5e877cc7/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
index 76a52e9..fdee17a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
@@ -60,7 +60,7 @@ public class GridMergeIndexUnsorted extends GridMergeIndex {
             Iterator<Value[]> iter = Collections.emptyIterator();
 
             @Override public boolean hasNext() {
-                while (!iter.hasNext()){
+                while (!iter.hasNext()) {
                     GridResultPage page;
 
                     try {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5e877cc7/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 76de71b..bb6801c 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -145,7 +145,8 @@ public class GridReduceQueryExecutor {
                 for (QueryRun r : runs.values()) {
                     for (GridMergeTable tbl : r.tbls) {
                         if (tbl.getScanIndex(null).hasSource(nodeId)) {
-                            fail(r, nodeId, "Node left the topology.");
+                            // Will attempt to retry. If reduce query was started it will fail on next page fetching.
+                            retry(r, topologyVersion(), nodeId);
 
                             break;
                         }
@@ -201,15 +202,8 @@ public class GridReduceQueryExecutor {
      * @param msg Error message.
      */
     private void fail(QueryRun r, UUID nodeId, String msg) {
-        if (r != null) {
-            r.rmtErr = new CacheException("Failed to execute map query on the node: " + nodeId + ", " + msg);
-
-            while(r.latch.getCount() != 0)
-                r.latch.countDown();
-
-            for (GridMergeTable tbl : r.tbls)
-                tbl.getScanIndex(null).fail(nodeId);
-        }
+        if (r != null)
+            r.state(new CacheException("Failed to execute map query on the node: " + nodeId + ", " + msg), nodeId);
     }
 
     /**
@@ -234,8 +228,16 @@ public class GridReduceQueryExecutor {
         try {
             page = new GridResultPage(ctx, node.id(), msg, false) {
                 @Override public void fetchNextPage() {
-                    if (r.rmtErr != null)
-                        throw new CacheException("Next page fetch failed.", r.rmtErr);
+                    Object errState = r.state.get();
+
+                    if (errState != null) {
+                        CacheException e = new CacheException("Failed to fetch data from node: " + node.id());
+
+                        if (errState instanceof CacheException)
+                            e.addSuppressed((Throwable)errState);
+
+                        throw e;
+                    }
 
                     try {
                         GridQueryNextPageRequest msg0 = new GridQueryNextPageRequest(qryReqId, qry, pageSize);
@@ -261,17 +263,29 @@ public class GridReduceQueryExecutor {
 
         idx.addPage(page);
 
-        if (msg.retry() != null) {
-            r.retry = msg.retry();
-
-            while (r.latch.getCount() != 0)
-                r.latch.countDown();
-        }
+        if (msg.retry() != null)
+            retry(r, msg.retry(), node.id());
         else if (msg.allRows() != -1) // Only the first page contains row count.
             r.latch.countDown();
     }
 
     /**
+     * @param r Query run.
+     * @param retryVer Retry version.
+     * @param nodeId Node ID.
+     */
+    private void retry(QueryRun r, AffinityTopologyVersion retryVer, UUID nodeId) {
+        r.state(retryVer, nodeId);
+    }
+
+    /**
+     * @return Current topology version.
+     */
+    private AffinityTopologyVersion topologyVersion() {
+        return ctx.discovery().topologyVersionEx();
+    }
+
+    /**
      * @param cctx Cache context.
      * @param qry Query.
      * @return Cursor.
@@ -290,10 +304,13 @@ public class GridReduceQueryExecutor {
 
             r.conn = (JdbcConnection)h2.connectionForSpace(space);
 
-            AffinityTopologyVersion topVer = ctx.discovery().topologyVersionEx();
+            AffinityTopologyVersion topVer = topologyVersion();
 
             Collection<ClusterNode> nodes = ctx.discovery().cacheAffinityNodes(space, topVer);
 
+            if (F.isEmpty(nodes))
+                throw new CacheException("No data nodes found for cache: " + space);
+
             if (cctx.isReplicated() || qry.explain()) {
                 assert qry.explain() || !nodes.contains(ctx.cluster().get().localNode()) : "We must be on a client node.";
 
@@ -342,17 +359,37 @@ public class GridReduceQueryExecutor {
                         mapQry.marshallParams(m);
                 }
 
-                send(nodes, new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys, topVer,
-                    extraSpaces(space, qry.spaces())));
+                boolean ok = false;
+
+                try {
+                    send(nodes, new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys, topVer,
+                        extraSpaces(space, qry.spaces())));
+
+                    ok = true;
+                }
+                catch (IgniteCheckedException e) {
+                    U.warn(log, "Failed to send query request to nodes: " + nodes);
+                }
+
+                AffinityTopologyVersion retry = null;
 
-                U.await(r.latch);
+                if (ok) { // Sent successfully.
+                    U.await(r.latch);
 
-                if (r.rmtErr != null)
-                    throw new CacheException("Failed to run map query remotely.", r.rmtErr);
+                    Object state = r.state.get();
 
-                ResultSet res = null;
+                    if (state != null) {
+                        if (state instanceof CacheException)
+                            throw new CacheException("Failed to run map query remotely.", (CacheException)state);
 
-                AffinityTopologyVersion retry = r.retry;
+                        if (state instanceof AffinityTopologyVersion)
+                            retry = (AffinityTopologyVersion)state; // Remote nodes can ask us to retry.
+                    }
+                }
+                else  // Send failed -> retry.
+                    retry = topologyVersion();
+
+                ResultSet res = null;
 
                 if (retry == null) {
                     if (qry.explain())
@@ -364,8 +401,14 @@ public class GridReduceQueryExecutor {
                 }
 
                 for (GridMergeTable tbl : r.tbls) {
-                    if (!tbl.getScanIndex(null).fetchedAll()) // We have to explicitly cancel queries on remote nodes.
-                        send(nodes, new GridQueryCancelRequest(qryReqId));
+                    if (!tbl.getScanIndex(null).fetchedAll()) { // We have to explicitly cancel queries on remote nodes.
+                        try {
+                            send(nodes, new GridQueryCancelRequest(qryReqId));
+                        }
+                        catch (IgniteCheckedException e) {
+                            U.warn(log, "Failed to send cancel request to nodes: " + nodes);
+                        }
+                    }
 
 //                dropTable(r.conn, tbl.getName()); TODO
                 }
@@ -697,11 +740,26 @@ public class GridReduceQueryExecutor {
         /** */
         private int pageSize;
 
-        /** */
-        private volatile CacheException rmtErr;
+        /** Can be either CacheException in case of error or AffinityTopologyVersion to retry if needed. */
+        private final AtomicReference<Object> state = new AtomicReference<>();
 
-        /** */
-        private volatile AffinityTopologyVersion retry;
+        /**
+         * @param o Fail state object.
+         * @param nodeId Node ID.
+         */
+        void state(Object o, UUID nodeId) {
+            assert o != null;
+            assert o instanceof CacheException || o instanceof AffinityTopologyVersion : o.getClass();
+
+            if (!state.compareAndSet(null, o))
+                return;
+
+            while (latch.getCount() != 0) // We don't need to wait for all nodes to reply.
+                latch.countDown();
+
+            for (GridMergeTable tbl : tbls) // Fail all merge indexes.
+                tbl.getScanIndex(null).fail(nodeId);
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5e877cc7/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java
index 9392fd1..35bfab9 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java
@@ -161,7 +161,7 @@ public class GridResultPage {
      * Request next page.
      */
     public void fetchNextPage() {
-        throw new UnsupportedOperationException();
+        throw new CacheException("Failed to fetch data from node: " + src);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5e877cc7/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java
index 5dce126..4edef55 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java
@@ -78,6 +78,7 @@ public class IgniteCacheQueryNodeRestartSelfTest extends GridCacheAbstractSelfTe
         cc.setBackups(1);
         cc.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
         cc.setAtomicityMode(TRANSACTIONAL);
+        cc.setRebalanceMode(CacheRebalanceMode.SYNC);
         cc.setIndexedTypes(
             Integer.class, Integer.class
         );
@@ -106,7 +107,7 @@ public class IgniteCacheQueryNodeRestartSelfTest extends GridCacheAbstractSelfTe
         for (int i = 0; i < KEY_CNT; i++)
             cache.put(i, i);
 
-        assertEquals(KEY_CNT, cache.localSize());
+        assertEquals(KEY_CNT, cache.size());
 
         final AtomicInteger qryCnt = new AtomicInteger();
 
@@ -116,9 +117,23 @@ public class IgniteCacheQueryNodeRestartSelfTest extends GridCacheAbstractSelfTe
             @Override public void applyx() throws IgniteCheckedException {
                 while (!done.get()) {
                     Collection<Cache.Entry<Integer, Integer>> res =
-                        cache.query(new SqlQuery(Integer.class, "_val >= 0")).getAll();
+                        cache.query(new SqlQuery<Integer, Integer>(Integer.class, "true")).getAll();
 
-                    assertFalse(res.isEmpty());
+                    Set<Integer> keys = new HashSet<>();
+
+                    for (Cache.Entry<Integer,Integer> entry : res)
+                        keys.add(entry.getKey());
+
+                    if (KEY_CNT > keys.size()) {
+                        for (int i = 0; i < KEY_CNT; i++) {
+                            if (!keys.contains(i))
+                                assertEquals(Integer.valueOf(i), cache.get(i));
+                        }
+
+                        fail("res size: " + res.size());
+                    }
+
+                    assertEquals(KEY_CNT, keys.size());
 
                     int c = qryCnt.incrementAndGet();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5e877cc7/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index ce05980..915b118 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -64,7 +64,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
         suite.addTestSuite(IgniteCacheQueryOffheapEvictsMultiThreadedSelfTest.class);
         suite.addTestSuite(IgniteCacheSqlQueryMultiThreadedSelfTest.class);
         suite.addTestSuite(IgniteCacheOffheapTieredMultithreadedSelfTest.class);
-//        suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest.class); TODO IGNITE-484
+        suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest.class);
         suite.addTestSuite(GridCacheReduceQueryMultithreadedSelfTest.class);
         suite.addTestSuite(GridCacheCrossCacheQuerySelfTest.class);
         suite.addTestSuite(GridCacheQuerySerializationSelfTest.class);

Reply via email to