Repository: incubator-ignite Updated Branches: refs/heads/ignite-484 35008b671 -> f2b96e0b5
ignite-484 - retry with topology version Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/9c7274b0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/9c7274b0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/9c7274b0 Branch: refs/heads/ignite-484 Commit: 9c7274b0ac33936893bd532b70b57df5c668eb3f Parents: 35008b6 Author: S.Vladykin <svlady...@gridgain.com> Authored: Wed May 13 12:41:51 2015 +0300 Committer: S.Vladykin <svlady...@gridgain.com> Committed: Wed May 13 12:41:51 2015 +0300 ---------------------------------------------------------------------- .../messages/GridQueryNextPageResponse.java | 43 +++++++++----------- .../processors/query/h2/IgniteH2Indexing.java | 13 ++++++ .../query/h2/twostep/GridMapQueryExecutor.java | 17 ++------ .../h2/twostep/GridReduceQueryExecutor.java | 22 ++++++---- 4 files changed, 48 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c7274b0/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java index c2cca75..b881f93 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.query.h2.twostep.messages; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.plugin.extensions.communication.*; @@ -33,12 +34,6 @@ public class GridQueryNextPageResponse implements Message { private static final long serialVersionUID = 0L; /** */ - public static final byte CODE_OK = 0; - - /** */ - public static final byte CODE_RETRY = -1; - - /** */ private long qryReqId; /** */ @@ -61,8 +56,8 @@ public class GridQueryNextPageResponse implements Message { @GridDirectTransient private transient Collection<?> plainRows; - /** Response code. */ - private byte code = CODE_OK; + /** */ + private AffinityTopologyVersion retry; /** * For {@link Externalizable}. @@ -95,20 +90,6 @@ public class GridQueryNextPageResponse implements Message { } /** - * @return Response code. - */ - public byte code() { - return code; - } - - /** - * @param code Response code. - */ - public void code(byte code) { - this.code = code; - } - - /** * @return Query request ID. */ public long queryRequestId() { @@ -211,7 +192,7 @@ public class GridQueryNextPageResponse implements Message { writer.incrementState(); case 6: - if (!writer.writeByte("code", code)) + if (!writer.writeMessage("retry", retry)) return false; writer.incrementState(); @@ -277,7 +258,7 @@ public class GridQueryNextPageResponse implements Message { reader.incrementState(); case 6: - code = reader.readByte("code"); + retry = reader.readMessage("retry"); if (!reader.isLastRead()) return false; @@ -298,4 +279,18 @@ public class GridQueryNextPageResponse implements Message { @Override public byte fieldsCount() { return 7; } + + /** + * @return Retry topology version. + */ + public AffinityTopologyVersion retry() { + return retry; + } + + /** + * @param retry Retry topology version. + */ + public void retry(AffinityTopologyVersion retry) { + this.retry = retry; + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c7274b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index 44db280..31a33ef 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -1379,6 +1379,19 @@ public class IgniteH2Indexing implements GridQueryIndexing { } /** + * @param topVer Topology version. + * @throws IgniteCheckedException If failed. + */ + public void awaitForCacheAffinity(AffinityTopologyVersion topVer) throws IgniteCheckedException { + assert topVer != null; + + IgniteInternalFuture<?> fut = ctx.cache().context().exchange().affinityReadyFuture(topVer); + + if (fut != null) + fut.get(); + } + + /** * Wrapper to store connection and flag is schema set or not. */ private static class ConnectionWrapper { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c7274b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index d01a8a4..39595b1 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -216,21 +216,10 @@ public class GridMapQueryExecutor { } /** - * @param topVer Topology version. - * @throws IgniteCheckedException If failed. - */ - private void awaitForCacheAffinity(AffinityTopologyVersion topVer) throws IgniteCheckedException { - IgniteInternalFuture<?> fut = ctx.cache().context().exchange().affinityReadyFuture(topVer); - - if (fut != null) - fut.get(); - } - - /** * @param cacheNames Cache names. * @param topVer Topology version. * @param reserved Reserved list. - * @return {@code true} If all the needed partitions succesfuly reserved. + * @return {@code true} If all the needed partitions successfully reserved. * @throws IgniteCheckedException If failed. */ private boolean reservePartitions(Collection<String> cacheNames, AffinityTopologyVersion topVer, @@ -297,7 +286,7 @@ public class GridMapQueryExecutor { if (topVer != null) { // Await all caches to be deployed on this node and all the needed topology changes to arrive. - awaitForCacheAffinity(topVer); + h2.awaitForCacheAffinity(topVer); // Reserve primary partitions. if (!reservePartitions(F.concat(true, req.space(), req.extraSpaces()), topVer, reserved)) { @@ -476,7 +465,7 @@ public class GridMapQueryExecutor { loc ? null : Collections.<Message>emptyList(), loc ? Collections.<Value[]>emptyList() : null); - msg.code(GridQueryNextPageResponse.CODE_RETRY); + msg.retry(ctx.discovery().topologyVersionEx()); ctx.io().send(node, GridTopic.TOPIC_QUERY, msg, GridIoPolicy.PUBLIC_POOL); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c7274b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index 68c7048..76de71b 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -261,10 +261,13 @@ public class GridReduceQueryExecutor { idx.addPage(page); - if (msg.code() == GridQueryNextPageResponse.CODE_RETRY) - r.retry = true; + if (msg.retry() != null) { + r.retry = msg.retry(); - if (msg.allRows() != -1) // Only the first page contains row count. + while (r.latch.getCount() != 0) + r.latch.countDown(); + } + else if (msg.allRows() != -1) // Only the first page contains row count. r.latch.countDown(); } @@ -274,7 +277,7 @@ public class GridReduceQueryExecutor { * @return Cursor. */ public QueryCursor<List<?>> query(GridCacheContext<?,?> cctx, GridCacheTwoStepQuery qry) { - for (int attempt = 0;; attempt++) { + for (;;) { long qryReqId = reqIdGen.incrementAndGet(); QueryRun r = new QueryRun(); @@ -349,7 +352,9 @@ public class GridReduceQueryExecutor { ResultSet res = null; - if (!r.retry) { + AffinityTopologyVersion retry = r.retry; + + if (retry == null) { if (qry.explain()) return explainPlan(r.conn, space, qry); @@ -365,9 +370,8 @@ public class GridReduceQueryExecutor { // dropTable(r.conn, tbl.getName()); TODO } - if (r.retry) { - if (attempt > 0) - U.sleep(attempt * 10); + if (retry != null) { + h2.awaitForCacheAffinity(retry); continue; } @@ -697,7 +701,7 @@ public class GridReduceQueryExecutor { private volatile CacheException rmtErr; /** */ - private volatile boolean retry; + private volatile AffinityTopologyVersion retry; } /**