ignite-484-1 - more fixes
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/02e8afa0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/02e8afa0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/02e8afa0 Branch: refs/heads/ignite-sprint-6 Commit: 02e8afa08521f053e785f8dfcd11e586542d04f7 Parents: 3da82e1 Author: S.Vladykin <svlady...@gridgain.com> Authored: Tue Jun 9 00:40:49 2015 +0300 Committer: S.Vladykin <svlady...@gridgain.com> Committed: Tue Jun 9 00:40:49 2015 +0300 ---------------------------------------------------------------------- .../h2/twostep/messages/GridQueryRequest.java | 6 +- .../apache/ignite/internal/util/GridDebug.java | 19 ++++++ .../processors/query/h2/IgniteH2Indexing.java | 13 ---- .../query/h2/twostep/GridMapQueryExecutor.java | 32 +++++---- .../h2/twostep/GridReduceQueryExecutor.java | 68 ++++++++++++++------ 5 files changed, 92 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/02e8afa0/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java index 6465bbc..47d1f44 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java @@ -53,10 +53,12 @@ public class GridQueryRequest implements Message { private AffinityTopologyVersion topVer; /** */ + @GridToStringInclude @GridDirectCollection(String.class) private List<String> extraSpaces; /** */ + @GridToStringInclude private int[] parts; /** @@ -216,7 +218,7 @@ public class GridQueryRequest implements Message { writer.incrementState(); case 6: - if (!writer.writeIntArray("partitions", parts)) + if (!writer.writeIntArray("parts", parts)) return false; writer.incrementState(); @@ -282,7 +284,7 @@ public class GridQueryRequest implements Message { reader.incrementState(); case 6: - parts = reader.readIntArray("partitions"); + parts = reader.readIntArray("parts"); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/02e8afa0/modules/core/src/main/java/org/apache/ignite/internal/util/GridDebug.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridDebug.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridDebug.java index d686ca6..aadec74 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridDebug.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridDebug.java @@ -180,6 +180,25 @@ public class GridDebug { } /** + * Dumps given number of last events. + * + * @param n Number of last elements to dump. + */ + public static void dumpLastAndStop(int n) { + ConcurrentLinkedQueue<Item> q = que.getAndSet(null); + + if (q == null) + return; + + int size = q.size(); + + while (size-- > n) + q.poll(); + + dump(q); + } + + /** * Dump given queue to stdout. * * @param que Queue. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/02e8afa0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index da497a2..2e6f3db 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -1410,19 +1410,6 @@ public class IgniteH2Indexing implements GridQueryIndexing { } /** - * @param topVer Topology version. - * @throws IgniteCheckedException If failed. - */ - public void awaitForCacheAffinity(AffinityTopologyVersion topVer) throws IgniteCheckedException { - assert topVer != null; - - IgniteInternalFuture<?> fut = ctx.cache().context().exchange().affinityReadyFuture(topVer); - - if (fut != null) - fut.get(); - } - - /** * @return Ready topology version. */ public AffinityTopologyVersion readyTopologyVersion() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/02e8afa0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index b4d895f..c2e9eba 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -48,6 +48,7 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.*; import static org.apache.ignite.events.EventType.*; +import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*; import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory.*; /** @@ -222,13 +223,16 @@ public class GridMapQueryExecutor { * @return {@code true} If all the needed partitions successfully reserved. * @throws IgniteCheckedException If failed. */ - private boolean reservePartitions(Collection<String> cacheNames, AffinityTopologyVersion topVer, int[] parts, + private boolean reservePartitions(Collection<String> cacheNames, AffinityTopologyVersion topVer, final int[] parts, List<GridDhtLocalPartition> reserved) throws IgniteCheckedException { Collection<Integer> partIds = parts == null ? null : wrap(parts); for (String cacheName : cacheNames) { GridCacheContext<?,?> cctx = cacheContext(cacheName, topVer); + if (cctx == null) // Cache was not found, probably was not deployed yet. + return false; + if (cctx.isLocal()) continue; @@ -243,6 +247,9 @@ public class GridMapQueryExecutor { // Await for owning state. part.owningFuture().get(); + + // We don't need to reserve partitions because they will not be evicted in replicated caches. + assert part.state() == OWNING : part.state(); } } else { // Reserve primary partitions for partitioned cache. @@ -255,18 +262,20 @@ public class GridMapQueryExecutor { GridDhtLocalPartition part = cctx.topology().localPartition(partId, topVer, false); - if (part != null) { - // Await for owning state. - part.owningFuture().get(); + if (part == null || part.state() == RENTING || !part.reserve()) + return false; - if (part.reserve()) { - reserved.add(part); + reserved.add(part); - continue; - } - } + // Await for owning state. + part.owningFuture().get(); - return false; + if (part.state() != OWNING) { + // We can't be MOVING since owningFuture is done and and can't be EVICTED since reserved. + assert part.state() == RENTING : part.state(); + + return false; + } } } } @@ -345,9 +354,6 @@ public class GridMapQueryExecutor { final AffinityTopologyVersion topVer = req.topologyVersion(); if (topVer != null) { - // Await all caches to be deployed on this node and all the needed topology changes to arrive. - h2.awaitForCacheAffinity(topVer); - // Reserve primary partitions. if (!reservePartitions(caches, topVer, req.partitions(), reserved)) { sendRetry(node, req.requestId()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/02e8afa0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index 80f0a18..605aa2f 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -26,6 +26,8 @@ import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.managers.eventstorage.*; import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*; import org.apache.ignite.internal.processors.cache.query.*; import org.apache.ignite.internal.processors.query.*; import org.apache.ignite.internal.processors.query.h2.*; @@ -53,6 +55,7 @@ import javax.cache.*; import java.lang.reflect.*; import java.sql.*; import java.util.*; +import java.util.Set; import java.util.concurrent.*; import java.util.concurrent.atomic.*; @@ -282,16 +285,48 @@ public class GridReduceQueryExecutor { /** * @param readyTop Latest ready topology. + * @param cctx Cache context for main space. + * @param extraSpaces Extra spaces. * @return {@code true} If preloading is active. */ - private boolean isPreloadingActive(AffinityTopologyVersion readyTop) { + private boolean isPreloadingActive( + AffinityTopologyVersion readyTop, + final GridCacheContext<?,?> cctx, + List<String> extraSpaces + ) { AffinityTopologyVersion freshTop = ctx.discovery().topologyVersionEx(); int res = readyTop.compareTo(freshTop); assert res <= 0 : readyTop + " " + freshTop; - return res < 0; + if (res < 0 || hasMovingPartitions(cctx)) + return true; + + if (extraSpaces != null) { + for (String extraSpace : extraSpaces) { + if (hasMovingPartitions(cacheContext(extraSpace))) + return true; + } + } + + return false; + } + + /** + * @return {@code true} If cache context + */ + private boolean hasMovingPartitions(GridCacheContext<?,?> cctx) { + GridDhtPartitionFullMap fullMap = cctx.topology().partitionMap(false); + + for (GridDhtPartitionMap map : fullMap.values()) { + for (GridDhtPartitionState state : map.map().values()) { + if (state == GridDhtPartitionState.MOVING) + return true; + } + } + + return false; } /** @@ -340,7 +375,7 @@ public class GridReduceQueryExecutor { nodes.retainAll(extraNodes); if (nodes.isEmpty()) { - if (isPreloadingActive(topVer)) + if (isPreloadingActive(topVer, cctx, extraSpaces)) return null; // Retry. else throw new CacheException("Caches '" + cctx.name() + "' and '" + extraSpace + @@ -349,7 +384,7 @@ public class GridReduceQueryExecutor { } else if (!cctx.isReplicated() && extraCctx.isReplicated()) { if (!extraNodes.containsAll(nodes)) - if (isPreloadingActive(topVer)) + if (isPreloadingActive(topVer, cctx, extraSpaces)) return null; // Retry. else throw new CacheException("Caches '" + cctx.name() + "' and '" + extraSpace + @@ -357,7 +392,7 @@ public class GridReduceQueryExecutor { } else if (!cctx.isReplicated() && !extraCctx.isReplicated()) { if (extraNodes.size() != nodes.size() || !nodes.containsAll(extraNodes)) - if (isPreloadingActive(topVer)) + if (isPreloadingActive(topVer, cctx, extraSpaces)) return null; // Retry. else throw new CacheException("Caches '" + cctx.name() + "' and '" + extraSpace + @@ -399,7 +434,7 @@ public class GridReduceQueryExecutor { // Explicit partition mapping for unstable topology. Map<ClusterNode, IntArray> partsMap = null; - if (isPreloadingActive(topVer)) { + if (isPreloadingActive(topVer, cctx, extraSpaces)) { if (cctx.isReplicated()) nodes = replicatedDataNodes(cctx, extraSpaces); else { @@ -501,11 +536,8 @@ public class GridReduceQueryExecutor { // dropTable(r.conn, tbl.getName()); TODO } - if (retry != null) { - h2.awaitForCacheAffinity(retry); - + if (retry != null) continue; - } return new QueryCursorImpl<>(new GridQueryCacheObjectsIterator(new Iter(res), cctx, cctx.keepPortable())); } @@ -770,13 +802,13 @@ public class GridReduceQueryExecutor { /** * @param nodes Nodes. * @param msg Message. - * @param gridPartsMap Partitions. + * @param partsMap Partitions. * @return {@code true} If all messages sent successfully. */ private boolean send( Collection<ClusterNode> nodes, Message msg, - Map<ClusterNode,IntArray> gridPartsMap + Map<ClusterNode,IntArray> partsMap ) { boolean locNodeFound = false; @@ -790,7 +822,7 @@ public class GridReduceQueryExecutor { } try { - ctx.io().send(node, GridTopic.TOPIC_QUERY, copy(msg, node, gridPartsMap), GridIoPolicy.PUBLIC_POOL); + ctx.io().send(node, GridTopic.TOPIC_QUERY, copy(msg, node, partsMap), GridIoPolicy.PUBLIC_POOL); } catch (IgniteCheckedException e) { ok = false; @@ -800,7 +832,7 @@ public class GridReduceQueryExecutor { } if (locNodeFound) // Local node goes the last to allow parallel execution. - h2.mapQueryExecutor().onMessage(ctx.localNodeId(), copy(msg, ctx.cluster().get().localNode(), gridPartsMap)); + h2.mapQueryExecutor().onMessage(ctx.localNodeId(), copy(msg, ctx.cluster().get().localNode(), partsMap)); return ok; } @@ -808,16 +840,16 @@ public class GridReduceQueryExecutor { /** * @param msg Message to copy. * @param node Node. - * @param gridPartsMap Partitions map. + * @param partsMap Partitions map. * @return Copy of message with partitions set. */ - private Message copy(Message msg, ClusterNode node, Map<ClusterNode,IntArray> gridPartsMap) { - if (gridPartsMap == null) + private Message copy(Message msg, ClusterNode node, Map<ClusterNode,IntArray> partsMap) { + if (partsMap == null) return msg; GridQueryRequest res = new GridQueryRequest((GridQueryRequest)msg); - IntArray parts = gridPartsMap.get(node); + IntArray parts = partsMap.get(node); assert parts != null : node;