Repository: incubator-ignite Updated Branches: refs/heads/ignite-1239 be97bdee1 -> a235c1ebb
ignite-1239: fallback retries using a topology version when unreserved exception happened Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a235c1eb Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a235c1eb Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a235c1eb Branch: refs/heads/ignite-1239 Commit: a235c1ebb4e222d17dc48e3b728e256d5e77c32c Parents: be97bde Author: Denis Magda <dma...@gridgain.com> Authored: Fri Aug 14 12:32:37 2015 +0300 Committer: Denis Magda <dma...@gridgain.com> Committed: Fri Aug 14 12:32:37 2015 +0300 ---------------------------------------------------------------------- .../GridDhtUnreservedPartitionException.java | 15 +++++- .../cache/query/GridCacheQueryAdapter.java | 50 +++++++++++++------- .../cache/query/GridCacheQueryManager.java | 3 +- 3 files changed, 49 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a235c1eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtUnreservedPartitionException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtUnreservedPartitionException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtUnreservedPartitionException.java index e64019b..d824a47 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtUnreservedPartitionException.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtUnreservedPartitionException.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import org.apache.ignite.*; +import org.apache.ignite.internal.processors.affinity.*; /** * Exception that is thrown when a partition reservation failed. @@ -29,14 +30,19 @@ public class GridDhtUnreservedPartitionException extends IgniteCheckedException /** Partition. */ private final int part; + /** Topology version. */ + private final AffinityTopologyVersion topVer; + /** * @param part Partition. + * @param topVer Affinity topology version. * @param msg Message. */ - public GridDhtUnreservedPartitionException(int part, String msg) { + public GridDhtUnreservedPartitionException(int part, AffinityTopologyVersion topVer, String msg) { super(msg); this.part = part; + this.topVer = topVer; } /** @@ -46,6 +52,13 @@ public class GridDhtUnreservedPartitionException extends IgniteCheckedException return part; } + /** + * @return Affinity topology version. + */ + public AffinityTopologyVersion topologyVersion() { + return topVer; + } + /** {@inheritDoc} */ @Override public String toString() { return getClass() + " [part=" + part + ", msg=" + getMessage() + ']'; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a235c1eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java index d4d9d00..5253835 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java @@ -458,7 +458,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { return (CacheQueryFuture<R>)(loc ? qryMgr.queryFieldsLocal(bean) : qryMgr.queryFieldsDistributed(bean, nodes)); else if (type == SCAN && part != null && nodes.size() > 1) - return new CacheQueryFallbackFuture<>(nodes, bean, qryMgr); + return new CacheQueryFallbackFuture<>(nodes, part, bean, qryMgr, cctx); else return (CacheQueryFuture<R>)(loc ? qryMgr.queryLocal(bean) : qryMgr.queryDistributed(bean, nodes)); } @@ -557,8 +557,11 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { /** Backups. */ private volatile Queue<ClusterNode> nodes; - /** Backups that failed with {@link GridDhtUnreservedPartitionException}. */ - private volatile Collection<ClusterNode> unreservedNodes; + /** Topology version of the last detected {@link GridDhtUnreservedPartitionException}. */ + private volatile AffinityTopologyVersion unreservedTopVer; + + /** Number of times to retry the query on the nodes failed with {@link GridDhtUnreservedPartitionException}. */ + private volatile int unreservedNodesRetryCnt = 5; /** Bean. */ private final GridCacheQueryBean bean; @@ -566,19 +569,26 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { /** Query manager. */ private final GridCacheQueryManager qryMgr; - /** Number of times to retry the query on the nodes failed with {@link GridDhtUnreservedPartitionException}. */ - private volatile int unreservedNodesRetryCnt = 5; + /** Cache context. */ + private final GridCacheContext cctx; + + /** Partition. */ + private final int part; /** * @param nodes Backups. + * @param part Partition. * @param bean Bean. * @param qryMgr Query manager. + * @param cctx Cache context. */ - public CacheQueryFallbackFuture(Collection<ClusterNode> nodes, GridCacheQueryBean bean, - GridCacheQueryManager qryMgr) { + public CacheQueryFallbackFuture(Collection<ClusterNode> nodes, int part, GridCacheQueryBean bean, + GridCacheQueryManager qryMgr, GridCacheContext cctx) { this.nodes = fallbacks(nodes); this.bean = bean; this.qryMgr = qryMgr; + this.cctx = cctx; + this.part = part; init(); } @@ -622,23 +632,29 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { catch (IgniteCheckedException e) { if (e.getCause() != null && e.getCause().getClass() == GridDhtUnreservedPartitionException.class) { - // The race is impossible here because fallback queries are executed one by one. - // Volatile guarantees visibility. - if (unreservedNodes == null) - unreservedNodes = new ArrayList<>(nodes.size() + 1); + unreservedTopVer = ((GridDhtUnreservedPartitionException)e.getCause()).topologyVersion(); - unreservedNodes.add(node); + assert unreservedTopVer != null; } if (F.isEmpty(nodes)) { - if (unreservedNodes != null && --unreservedNodesRetryCnt > 0) { - assert unreservedNodes.size() > 0; + final AffinityTopologyVersion topVer = unreservedTopVer; + + if (topVer != null && --unreservedNodesRetryCnt > 0) { + + cctx.affinity().affinityReadyFuture(topVer).listen( + new IgniteInClosure<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply( + IgniteInternalFuture<AffinityTopologyVersion> future) { - nodes = fallbacks(unreservedNodes); + nodes = fallbacks(cctx.topology().owners(part, topVer)); - unreservedNodes = null; + // Race is impossible here because query retries are executed one by one. + unreservedTopVer = null; - init(); + init(); + } + }); } else onDone(e); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a235c1eb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index 5c1aa4e..bfe5ecc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -795,7 +795,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte // double check for owning state if (locPart == null || locPart.state() != OWNING || !locPart.reserve() || locPart.state() != OWNING) - throw new GridDhtUnreservedPartitionException(part, "Partition can not be reserved"); + throw new GridDhtUnreservedPartitionException(part, + cctx.affinity().affinityTopologyVersion(), "Partition can not be reserved"); iter = new Iterator<K>() { private Iterator<KeyCacheObject> iter0 = locPart.keySet().iterator();