ignite-484-1 - more fixes
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ae3279a3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ae3279a3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ae3279a3 Branch: refs/heads/ignite-484-1 Commit: ae3279a37011a72d16af76d5e8f78cec0671cd3c Parents: 02e8afa Author: S.Vladykin <svlady...@gridgain.com> Authored: Tue Jun 9 02:18:04 2015 +0300 Committer: S.Vladykin <svlady...@gridgain.com> Committed: Tue Jun 9 02:18:04 2015 +0300 ---------------------------------------------------------------------- .../dht/preloader/GridDhtPartitionMap.java | 26 ++++++++-- .../processors/query/h2/IgniteH2Indexing.java | 11 +++++ .../query/h2/twostep/GridMapQueryExecutor.java | 22 +++------ .../h2/twostep/GridReduceQueryExecutor.java | 51 +++++++++----------- 4 files changed, 64 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ae3279a3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java index facf7e3..7b720a4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java @@ -23,6 +23,8 @@ import org.apache.ignite.internal.util.typedef.internal.*; import java.io.*; import java.util.*; +import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*; + /** * Partition map. */ @@ -39,6 +41,9 @@ public class GridDhtPartitionMap implements Comparable<GridDhtPartitionMap>, Ext /** */ private Map<Integer, GridDhtPartitionState> map; + /** */ + private volatile int moving; + /** * @param nodeId Node ID. * @param updateSeq Update sequence number. @@ -72,7 +77,7 @@ public class GridDhtPartitionMap implements Comparable<GridDhtPartitionMap>, Ext GridDhtPartitionState state = e.getValue(); if (!onlyActive || state.active()) - map.put(e.getKey(), state); + put(e.getKey(), state); } } @@ -88,7 +93,22 @@ public class GridDhtPartitionMap implements Comparable<GridDhtPartitionMap>, Ext * @param state Partition state. */ public void put(Integer part, GridDhtPartitionState state) { - map.put(part, state); + GridDhtPartitionState old = map.put(part, state); + + if (old == MOVING) + moving--; + + if (state == MOVING) + moving++; + } + + /** + * @return {@code true} If partition map contains moving partitions. + */ + public boolean hasMovingPartitions() { + assert moving >= 0 : moving; + + return moving != 0; } /** @@ -214,7 +234,7 @@ public class GridDhtPartitionMap implements Comparable<GridDhtPartitionMap>, Ext int part = entry & 0x3FFF; int ordinal = entry >> 14; - map.put(part, GridDhtPartitionState.fromOrdinal(ordinal)); + put(part, GridDhtPartitionState.fromOrdinal(ordinal)); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ae3279a3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index 2e6f3db..a476d9e 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -1417,6 +1417,17 @@ public class IgniteH2Indexing implements GridQueryIndexing { } /** + * @param topVer Topology version. + * @throws IgniteCheckedException If failed. + */ + public void awaitForReadyTopologyVersion(AffinityTopologyVersion topVer) throws IgniteCheckedException { + IgniteInternalFuture<?> fut = ctx.cache().context().exchange().affinityReadyFuture(topVer); + + if (fut != null) + fut.get(); + } + + /** * Wrapper to store connection and flag is schema set or not. */ private static class ConnectionWrapper { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ae3279a3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index c2e9eba..153cb13 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -242,14 +242,10 @@ public class GridMapQueryExecutor { for (int p = 0; p < partsCnt; p++) { GridDhtLocalPartition part = cctx.topology().localPartition(p, topVer, false); - if (part == null) + if (part == null || part.state() != OWNING) return false; - // Await for owning state. - part.owningFuture().get(); - // We don't need to reserve partitions because they will not be evicted in replicated caches. - assert part.state() == OWNING : part.state(); } } else { // Reserve primary partitions for partitioned cache. @@ -262,20 +258,13 @@ public class GridMapQueryExecutor { GridDhtLocalPartition part = cctx.topology().localPartition(partId, topVer, false); - if (part == null || part.state() == RENTING || !part.reserve()) + if (part == null || part.state() != OWNING || !part.reserve()) return false; reserved.add(part); - // Await for owning state. - part.owningFuture().get(); - - if (part.state() != OWNING) { - // We can't be MOVING since owningFuture is done and and can't be EVICTED since reserved. - assert part.state() == RENTING : part.state(); - + if (part.state() != OWNING) return false; - } } } } @@ -533,7 +522,10 @@ public class GridMapQueryExecutor { msg.retry(h2.readyTopologyVersion()); - ctx.io().send(node, GridTopic.TOPIC_QUERY, msg, GridIoPolicy.PUBLIC_POOL); + if (loc) + h2.reduceQueryExecutor().onMessage(ctx.localNodeId(), msg); + else + ctx.io().send(node, GridTopic.TOPIC_QUERY, msg, GridIoPolicy.PUBLIC_POOL); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ae3279a3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index 605aa2f..d059d93 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -26,7 +26,6 @@ import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.managers.eventstorage.*; import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*; import org.apache.ignite.internal.processors.cache.query.*; import org.apache.ignite.internal.processors.query.*; @@ -284,23 +283,12 @@ public class GridReduceQueryExecutor { } /** - * @param readyTop Latest ready topology. * @param cctx Cache context for main space. * @param extraSpaces Extra spaces. * @return {@code true} If preloading is active. */ - private boolean isPreloadingActive( - AffinityTopologyVersion readyTop, - final GridCacheContext<?,?> cctx, - List<String> extraSpaces - ) { - AffinityTopologyVersion freshTop = ctx.discovery().topologyVersionEx(); - - int res = readyTop.compareTo(freshTop); - - assert res <= 0 : readyTop + " " + freshTop; - - if (res < 0 || hasMovingPartitions(cctx)) + private boolean isPreloadingActive(final GridCacheContext<?,?> cctx, List<String> extraSpaces) { + if (hasMovingPartitions(cctx)) return true; if (extraSpaces != null) { @@ -320,10 +308,8 @@ public class GridReduceQueryExecutor { GridDhtPartitionFullMap fullMap = cctx.topology().partitionMap(false); for (GridDhtPartitionMap map : fullMap.values()) { - for (GridDhtPartitionState state : map.map().values()) { - if (state == GridDhtPartitionState.MOVING) - return true; - } + if (map.hasMovingPartitions()) + return true; } return false; @@ -375,7 +361,7 @@ public class GridReduceQueryExecutor { nodes.retainAll(extraNodes); if (nodes.isEmpty()) { - if (isPreloadingActive(topVer, cctx, extraSpaces)) + if (isPreloadingActive(cctx, extraSpaces)) return null; // Retry. else throw new CacheException("Caches '" + cctx.name() + "' and '" + extraSpace + @@ -384,7 +370,7 @@ public class GridReduceQueryExecutor { } else if (!cctx.isReplicated() && extraCctx.isReplicated()) { if (!extraNodes.containsAll(nodes)) - if (isPreloadingActive(topVer, cctx, extraSpaces)) + if (isPreloadingActive(cctx, extraSpaces)) return null; // Retry. else throw new CacheException("Caches '" + cctx.name() + "' and '" + extraSpace + @@ -392,7 +378,7 @@ public class GridReduceQueryExecutor { } else if (!cctx.isReplicated() && !extraCctx.isReplicated()) { if (extraNodes.size() != nodes.size() || !nodes.containsAll(extraNodes)) - if (isPreloadingActive(topVer, cctx, extraSpaces)) + if (isPreloadingActive(cctx, extraSpaces)) return null; // Retry. else throw new CacheException("Caches '" + cctx.name() + "' and '" + extraSpace + @@ -434,7 +420,7 @@ public class GridReduceQueryExecutor { // Explicit partition mapping for unstable topology. Map<ClusterNode, IntArray> partsMap = null; - if (isPreloadingActive(topVer, cctx, extraSpaces)) { + if (isPreloadingActive(cctx, extraSpaces)) { if (cctx.isReplicated()) nodes = replicatedDataNodes(cctx, extraSpaces); else { @@ -499,7 +485,7 @@ public class GridReduceQueryExecutor { mapQry.marshallParams(m); } - AffinityTopologyVersion retry = null; + boolean retry = false; if (send(nodes, new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys, topVer, extraSpaces, null), partsMap)) { @@ -511,16 +497,21 @@ public class GridReduceQueryExecutor { if (state instanceof CacheException) throw new CacheException("Failed to run map query remotely.", (CacheException)state); - if (state instanceof AffinityTopologyVersion) - retry = (AffinityTopologyVersion)state; // Remote nodes can ask us to retry. + if (state instanceof AffinityTopologyVersion) { + retry = true; + + // If remote node asks us to retry then we have outdated full partition map. + // TODO is this correct way to wait for a new map?? + h2.awaitForReadyTopologyVersion((AffinityTopologyVersion)state); + } } } else // Send failed. - retry = topVer; + retry = true; ResultSet res = null; - if (retry == null) { + if (!retry) { if (qry.explain()) return explainPlan(r.conn, space, qry); @@ -536,8 +527,12 @@ public class GridReduceQueryExecutor { // dropTable(r.conn, tbl.getName()); TODO } - if (retry != null) + if (retry) { + if (Thread.currentThread().isInterrupted()) + throw new IgniteInterruptedCheckedException("Query was interrupted."); + continue; + } return new QueryCursorImpl<>(new GridQueryCacheObjectsIterator(new Iter(res), cctx, cctx.keepPortable())); }