Repository: incubator-ignite Updated Branches: refs/heads/ignite-sprint-6 4cc376bed -> de5318960
ignite-484 - fixes + tests enabled Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5e877cc7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5e877cc7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5e877cc7 Branch: refs/heads/ignite-sprint-6 Commit: 5e877cc75d75c63240296e1e17ba517805a30827 Parents: f2b96e0 Author: S.Vladykin <svlady...@gridgain.com> Authored: Tue May 19 22:43:43 2015 +0300 Committer: S.Vladykin <svlady...@gridgain.com> Committed: Tue May 19 22:43:43 2015 +0300 ---------------------------------------------------------------------- .../h2/twostep/GridMergeIndexUnsorted.java | 2 +- .../h2/twostep/GridReduceQueryExecutor.java | 122 ++++++++++++++----- .../query/h2/twostep/GridResultPage.java | 2 +- .../IgniteCacheQueryNodeRestartSelfTest.java | 21 +++- .../IgniteCacheQuerySelfTestSuite.java | 2 +- 5 files changed, 111 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5e877cc7/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java index 76a52e9..fdee17a 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java @@ -60,7 +60,7 @@ public class GridMergeIndexUnsorted extends GridMergeIndex { Iterator<Value[]> iter = Collections.emptyIterator(); @Override public boolean hasNext() { - while (!iter.hasNext()){ + while (!iter.hasNext()) { GridResultPage page; try { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5e877cc7/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index 76de71b..bb6801c 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -145,7 +145,8 @@ public class GridReduceQueryExecutor { for (QueryRun r : runs.values()) { for (GridMergeTable tbl : r.tbls) { if (tbl.getScanIndex(null).hasSource(nodeId)) { - fail(r, nodeId, "Node left the topology."); + // Will attempt to retry. If reduce query was started it will fail on next page fetching. + retry(r, topologyVersion(), nodeId); break; } @@ -201,15 +202,8 @@ public class GridReduceQueryExecutor { * @param msg Error message. */ private void fail(QueryRun r, UUID nodeId, String msg) { - if (r != null) { - r.rmtErr = new CacheException("Failed to execute map query on the node: " + nodeId + ", " + msg); - - while(r.latch.getCount() != 0) - r.latch.countDown(); - - for (GridMergeTable tbl : r.tbls) - tbl.getScanIndex(null).fail(nodeId); - } + if (r != null) + r.state(new CacheException("Failed to execute map query on the node: " + nodeId + ", " + msg), nodeId); } /** @@ -234,8 +228,16 @@ public class GridReduceQueryExecutor { try { page = new GridResultPage(ctx, node.id(), msg, false) { @Override public void fetchNextPage() { - if (r.rmtErr != null) - throw new CacheException("Next page fetch failed.", r.rmtErr); + Object errState = r.state.get(); + + if (errState != null) { + CacheException e = new CacheException("Failed to fetch data from node: " + node.id()); + + if (errState instanceof CacheException) + e.addSuppressed((Throwable)errState); + + throw e; + } try { GridQueryNextPageRequest msg0 = new GridQueryNextPageRequest(qryReqId, qry, pageSize); @@ -261,17 +263,29 @@ public class GridReduceQueryExecutor { idx.addPage(page); - if (msg.retry() != null) { - r.retry = msg.retry(); - - while (r.latch.getCount() != 0) - r.latch.countDown(); - } + if (msg.retry() != null) + retry(r, msg.retry(), node.id()); else if (msg.allRows() != -1) // Only the first page contains row count. r.latch.countDown(); } /** + * @param r Query run. + * @param retryVer Retry version. + * @param nodeId Node ID. + */ + private void retry(QueryRun r, AffinityTopologyVersion retryVer, UUID nodeId) { + r.state(retryVer, nodeId); + } + + /** + * @return Current topology version. + */ + private AffinityTopologyVersion topologyVersion() { + return ctx.discovery().topologyVersionEx(); + } + + /** * @param cctx Cache context. * @param qry Query. * @return Cursor. @@ -290,10 +304,13 @@ public class GridReduceQueryExecutor { r.conn = (JdbcConnection)h2.connectionForSpace(space); - AffinityTopologyVersion topVer = ctx.discovery().topologyVersionEx(); + AffinityTopologyVersion topVer = topologyVersion(); Collection<ClusterNode> nodes = ctx.discovery().cacheAffinityNodes(space, topVer); + if (F.isEmpty(nodes)) + throw new CacheException("No data nodes found for cache: " + space); + if (cctx.isReplicated() || qry.explain()) { assert qry.explain() || !nodes.contains(ctx.cluster().get().localNode()) : "We must be on a client node."; @@ -342,17 +359,37 @@ public class GridReduceQueryExecutor { mapQry.marshallParams(m); } - send(nodes, new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys, topVer, - extraSpaces(space, qry.spaces()))); + boolean ok = false; + + try { + send(nodes, new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys, topVer, + extraSpaces(space, qry.spaces()))); + + ok = true; + } + catch (IgniteCheckedException e) { + U.warn(log, "Failed to send query request to nodes: " + nodes); + } + + AffinityTopologyVersion retry = null; - U.await(r.latch); + if (ok) { // Sent successfully. + U.await(r.latch); - if (r.rmtErr != null) - throw new CacheException("Failed to run map query remotely.", r.rmtErr); + Object state = r.state.get(); - ResultSet res = null; + if (state != null) { + if (state instanceof CacheException) + throw new CacheException("Failed to run map query remotely.", (CacheException)state); - AffinityTopologyVersion retry = r.retry; + if (state instanceof AffinityTopologyVersion) + retry = (AffinityTopologyVersion)state; // Remote nodes can ask us to retry. + } + } + else // Send failed -> retry. + retry = topologyVersion(); + + ResultSet res = null; if (retry == null) { if (qry.explain()) @@ -364,8 +401,14 @@ public class GridReduceQueryExecutor { } for (GridMergeTable tbl : r.tbls) { - if (!tbl.getScanIndex(null).fetchedAll()) // We have to explicitly cancel queries on remote nodes. - send(nodes, new GridQueryCancelRequest(qryReqId)); + if (!tbl.getScanIndex(null).fetchedAll()) { // We have to explicitly cancel queries on remote nodes. + try { + send(nodes, new GridQueryCancelRequest(qryReqId)); + } + catch (IgniteCheckedException e) { + U.warn(log, "Failed to send cancel request to nodes: " + nodes); + } + } // dropTable(r.conn, tbl.getName()); TODO } @@ -697,11 +740,26 @@ public class GridReduceQueryExecutor { /** */ private int pageSize; - /** */ - private volatile CacheException rmtErr; + /** Can be either CacheException in case of error or AffinityTopologyVersion to retry if needed. */ + private final AtomicReference<Object> state = new AtomicReference<>(); - /** */ - private volatile AffinityTopologyVersion retry; + /** + * @param o Fail state object. + * @param nodeId Node ID. + */ + void state(Object o, UUID nodeId) { + assert o != null; + assert o instanceof CacheException || o instanceof AffinityTopologyVersion : o.getClass(); + + if (!state.compareAndSet(null, o)) + return; + + while (latch.getCount() != 0) // We don't need to wait for all nodes to reply. + latch.countDown(); + + for (GridMergeTable tbl : tbls) // Fail all merge indexes. + tbl.getScanIndex(null).fail(nodeId); + } } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5e877cc7/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java index 9392fd1..35bfab9 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java @@ -161,7 +161,7 @@ public class GridResultPage { * Request next page. */ public void fetchNextPage() { - throw new UnsupportedOperationException(); + throw new CacheException("Failed to fetch data from node: " + src); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5e877cc7/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java index 5dce126..4edef55 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java @@ -78,6 +78,7 @@ public class IgniteCacheQueryNodeRestartSelfTest extends GridCacheAbstractSelfTe cc.setBackups(1); cc.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); cc.setAtomicityMode(TRANSACTIONAL); + cc.setRebalanceMode(CacheRebalanceMode.SYNC); cc.setIndexedTypes( Integer.class, Integer.class ); @@ -106,7 +107,7 @@ public class IgniteCacheQueryNodeRestartSelfTest extends GridCacheAbstractSelfTe for (int i = 0; i < KEY_CNT; i++) cache.put(i, i); - assertEquals(KEY_CNT, cache.localSize()); + assertEquals(KEY_CNT, cache.size()); final AtomicInteger qryCnt = new AtomicInteger(); @@ -116,9 +117,23 @@ public class IgniteCacheQueryNodeRestartSelfTest extends GridCacheAbstractSelfTe @Override public void applyx() throws IgniteCheckedException { while (!done.get()) { Collection<Cache.Entry<Integer, Integer>> res = - cache.query(new SqlQuery(Integer.class, "_val >= 0")).getAll(); + cache.query(new SqlQuery<Integer, Integer>(Integer.class, "true")).getAll(); - assertFalse(res.isEmpty()); + Set<Integer> keys = new HashSet<>(); + + for (Cache.Entry<Integer,Integer> entry : res) + keys.add(entry.getKey()); + + if (KEY_CNT > keys.size()) { + for (int i = 0; i < KEY_CNT; i++) { + if (!keys.contains(i)) + assertEquals(Integer.valueOf(i), cache.get(i)); + } + + fail("res size: " + res.size()); + } + + assertEquals(KEY_CNT, keys.size()); int c = qryCnt.incrementAndGet(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5e877cc7/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java index ce05980..915b118 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java @@ -64,7 +64,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite { suite.addTestSuite(IgniteCacheQueryOffheapEvictsMultiThreadedSelfTest.class); suite.addTestSuite(IgniteCacheSqlQueryMultiThreadedSelfTest.class); suite.addTestSuite(IgniteCacheOffheapTieredMultithreadedSelfTest.class); -// suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest.class); TODO IGNITE-484 + suite.addTestSuite(IgniteCacheQueryNodeRestartSelfTest.class); suite.addTestSuite(GridCacheReduceQueryMultithreadedSelfTest.class); suite.addTestSuite(GridCacheCrossCacheQuerySelfTest.class); suite.addTestSuite(GridCacheQuerySerializationSelfTest.class);