ignite-484 - v2
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/081e75bc Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/081e75bc Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/081e75bc Branch: refs/heads/ignite-484 Commit: 081e75bce608394937ef8e4d62e4aa12a2cc3b09 Parents: 7000722 Author: S.Vladykin <svlady...@gridgain.com> Authored: Wed May 13 10:29:36 2015 +0300 Committer: S.Vladykin <svlady...@gridgain.com> Committed: Wed May 13 10:29:36 2015 +0300 ---------------------------------------------------------------------- .../distributed/dht/GridDhtLocalPartition.java | 19 +++++ .../h2/twostep/messages/GridQueryRequest.java | 16 ++-- .../query/h2/twostep/GridMapQueryExecutor.java | 84 ++++++++++++-------- .../h2/twostep/GridReduceQueryExecutor.java | 12 ++- 4 files changed, 85 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/081e75bc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index 0749f66..dc4982e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -70,6 +70,10 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition> @GridToStringExclude private final GridFutureAdapter<?> rent; + /** Own future.
*/ + @GridToStringExclude + private final GridFutureAdapter<?> own; + /** Entries map. */ private final ConcurrentMap<KeyCacheObject, GridDhtCacheEntry> map; @@ -111,6 +115,12 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition> } }; + own = new GridFutureAdapter<Object>() { + @Override public String toString() { + return "PartitionOwnFuture [part=" + GridDhtLocalPartition.this + ", map=" + map + ']'; + } + }; + map = new ConcurrentHashMap8<>(cctx.config().getStartSize() / cctx.affinity().partitions()); @@ -385,6 +395,8 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition> // No need to keep history any more. evictHist = null; + own.onDone(); + return true; } } @@ -419,6 +431,13 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition> } /** + * @return The future which will be completed when partition will have state {@link GridDhtPartitionState#OWNING}. + */ + public IgniteInternalFuture<?> owningFuture() { + return own; + } + + /** * @param updateSeq Update sequence. * @return Future for evict attempt. 
*/ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/081e75bc/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java index 319a818..2d53944 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.query.h2.twostep.messages; import org.apache.ignite.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.query.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -49,7 +50,7 @@ public class GridQueryRequest implements Message { private Collection<GridCacheSqlQuery> qrys; /** Topology version. */ - private long topVer; + private AffinityTopologyVersion topVer; /** */ @GridDirectCollection(String.class) @@ -70,7 +71,12 @@ public class GridQueryRequest implements Message { * @param topVer Topology version. * @param extraSpaces All space names participating in query other than {@code space}. 
*/ - public GridQueryRequest(long reqId, int pageSize, String space, Collection<GridCacheSqlQuery> qrys, long topVer, + public GridQueryRequest( + long reqId, + int pageSize, + String space, + Collection<GridCacheSqlQuery> qrys, + AffinityTopologyVersion topVer, List<String> extraSpaces) { this.reqId = reqId; this.pageSize = pageSize; @@ -91,7 +97,7 @@ public class GridQueryRequest implements Message { /** * @return Topology version. */ - public long topologyVersion() { + public AffinityTopologyVersion topologyVersion() { return topVer; } @@ -165,7 +171,7 @@ public class GridQueryRequest implements Message { writer.incrementState(); case 4: - if (!writer.writeLong("topVer", topVer)) + if (!writer.writeMessage("topVer", topVer)) return false; writer.incrementState(); @@ -221,7 +227,7 @@ public class GridQueryRequest implements Message { reader.incrementState(); case 4: - topVer = reader.readLong("topVer"); + topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/081e75bc/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index 2483912..d4cdb4e 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -202,12 +202,31 @@ public class GridMapQueryExecutor { /** * @param cacheName Cache name. + * @param topVer Topology version. * @return Cache context or {@code null} if none. 
*/ - @Nullable private GridCacheContext<?,?> cacheContext(String cacheName) { + @Nullable private GridCacheContext<?,?> cacheContext(String cacheName, AffinityTopologyVersion topVer) { GridCacheAdapter<?,?> cache = ctx.cache().internalCache(cacheName); - return cache == null ? null : cache.context(); + if (cache == null) // Since we've waited for cache affinity updates, this must be a misconfiguration. + throw new CacheException("Cache does not exist on current node: [nodeId=" + ctx.localNodeId() + + ", cache=" + cacheName + ", topVer=" + topVer + "]"); + + return cache.context(); + } + + /** + * @param topVer Topology version. + * @throws IgniteCheckedException If failed. + */ + private void awaitForCacheAffinity(AffinityTopologyVersion topVer) throws IgniteCheckedException { + if (topVer == null) + return; // Backward compatibility. + + IgniteInternalFuture<?> fut = ctx.cache().context().exchange().affinityReadyFuture(topVer); + + if (fut != null) + fut.get(); } /** @@ -224,6 +243,13 @@ public class GridMapQueryExecutor { List<GridDhtLocalPartition> reserved = new ArrayList<>(); try { + // Topology version can be null during a rolling restart with nodes of a previous version! + final AffinityTopologyVersion topVer = req.topologyVersion(); + + // Wait for all caches to be deployed on this node and for all the needed topology changes to arrive. + awaitForCacheAffinity(topVer); + + // Unmarshal query params. Collection<GridCacheSqlQuery> qrys; try { @@ -240,48 +266,39 @@ public class GridMapQueryExecutor { throw new IgniteException(e); } - List<GridCacheContext<?,?>> cctxs = new ArrayList<>(); + // Reserve primary partitions.
+ if (topVer != null) { + for (String cacheName : F.concat(true, req.space(), req.extraSpaces())) { + GridCacheContext<?,?> cctx = cacheContext(cacheName, topVer); - for (String cacheName : F.concat(true, req.space(), req.extraSpaces())) { - GridCacheContext<?,?> cctx = cacheContext(cacheName); + Set<Integer> partIds = cctx.affinity().primaryPartitions(ctx.localNodeId(), topVer); - if (cctx == null) { // Cache was not deployed yet. - sendRetry(node, req.requestId()); + for (int partId : partIds) { + GridDhtLocalPartition part = cctx.topology().localPartition(partId, topVer, false); - return; - } - else - cctxs.add(cctx); - } - - for (GridCacheContext<?,?> cctx : cctxs) { // Lock primary partitions. - // TODO how to get all partitions for topology version consistently? - List<GridDhtLocalPartition> parts = cctx.topology().localPartitions(); - AffinityTopologyVersion affTopVer = cctx.topology().topologyVersion(); - - if (affTopVer.topologyVersion() != req.topologyVersion()) { - sendRetry(node, req.requestId()); + if (part != null) { + // Await for owning state. + part.owningFuture().get(); - return; - } + if (part.reserve()) { + reserved.add(part); - for (GridDhtLocalPartition part : parts) { - if (!part.primary(affTopVer)) - continue; + continue; + } + } - if (!part.reserve()) { + // Failed to reserve the partition. sendRetry(node, req.requestId()); return; } - - reserved.add(part); } } - GridCacheContext<?,?> cctx = cctxs.get(0); // Main cache context. + // Prepare to run queries. + GridCacheContext<?,?> mainCctx = cacheContext(req.space(), topVer); - qr = new QueryResults(req.requestId(), qrys.size(), cctx); + qr = new QueryResults(req.requestId(), qrys.size(), mainCctx); if (nodeRess.put(req.requestId(), qr) != null) throw new IllegalStateException(); @@ -293,10 +310,8 @@ public class GridMapQueryExecutor { // Run queries. 
int i = 0; - String space = req.space(); - for (GridCacheSqlQuery qry : qrys) { - ResultSet rs = h2.executeSqlQueryWithTimer(space, h2.connectionForSpace(space), qry.query(), + ResultSet rs = h2.executeSqlQueryWithTimer(req.space(), h2.connectionForSpace(req.space()), qry.query(), F.asList(qry.parameters())); if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) { @@ -305,7 +320,7 @@ public class GridMapQueryExecutor { "SQL query executed.", EVT_CACHE_QUERY_EXECUTED, CacheQueryType.SQL.name(), - cctx.namex(), + mainCctx.namex(), null, qry.query(), null, @@ -348,6 +363,7 @@ public class GridMapQueryExecutor { finally { h2.setFilters(null); + // Release reserved partitions. for (GridDhtLocalPartition part : reserved) part.release(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/081e75bc/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index 3391c97..68c7048 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -24,6 +24,7 @@ import org.apache.ignite.events.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.managers.eventstorage.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.query.*; import org.apache.ignite.internal.processors.query.*; @@ -286,20 +287,17 @@ public class 
GridReduceQueryExecutor { r.conn = (JdbcConnection)h2.connectionForSpace(space); - final long topVer = ctx.cluster().get().topologyVersion(); + AffinityTopologyVersion topVer = ctx.discovery().topologyVersionEx(); - // TODO get projection for this topology version. - ClusterGroup dataNodes = ctx.grid().cluster().forDataNodes(space); + Collection<ClusterNode> nodes = ctx.discovery().cacheAffinityNodes(space, topVer); if (cctx.isReplicated() || qry.explain()) { - assert qry.explain() || dataNodes.node(ctx.localNodeId()) == null : "We must be on a client node."; + assert qry.explain() || !nodes.contains(ctx.cluster().get().localNode()) : "We must be on a client node."; // Select random data node to run query on a replicated data or get EXPLAIN PLAN from a single node. - dataNodes = dataNodes.forRandom(); + nodes = Collections.singleton(F.rand(nodes)); } - final Collection<ClusterNode> nodes = dataNodes.nodes(); - for (GridCacheSqlQuery mapQry : qry.mapQueries()) { GridMergeTable tbl;