ignite-484 - explicit partitions list
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/20ec22b8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/20ec22b8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/20ec22b8 Branch: refs/heads/ignite-484-1 Commit: 20ec22b85cddf8dfb5c453758aa83dcb49fbbd32 Parents: a12aadf Author: S.Vladykin <svlady...@gridgain.com> Authored: Mon May 25 03:20:54 2015 +0300 Committer: S.Vladykin <svlady...@gridgain.com> Committed: Mon May 25 03:20:54 2015 +0300 ---------------------------------------------------------------------- .../processors/query/GridQueryIndexing.java | 5 ++- .../processors/query/GridQueryProcessor.java | 4 +- .../h2/twostep/messages/GridQueryRequest.java | 36 +++++++++++++-- .../processors/query/h2/IgniteH2Indexing.java | 43 ++++++++++++++++-- .../query/h2/twostep/GridMapQueryExecutor.java | 47 ++++++++++++++++++-- .../h2/twostep/GridReduceQueryExecutor.java | 33 +++++++++++++- 6 files changed, 151 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/20ec22b8/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java index 6b1401d..216773e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java @@ -222,8 +222,11 @@ public interface GridQueryIndexing { /** * Returns backup filter. * + * @param caches List of caches. * @param topVer Topology version. + * @param parts Partitions. * @return Backup filter. */ - public IndexingQueryFilter backupFilter(AffinityTopologyVersion topVer); + public IndexingQueryFilter backupFilter(List<String> caches, AffinityTopologyVersion topVer, + List<int[]> parts); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/20ec22b8/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index afd0386..202ec75 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -603,7 +603,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { throw new CacheException("Failed to find SQL table for type: " + type); final GridCloseableIterator<IgniteBiTuple<K,V>> i = idx.query(space, sqlQry, F.asList(params), typeDesc, - idx.backupFilter(null)); + idx.backupFilter(null, null, null)); if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) { ctx.event().record(new CacheQueryExecutedEvent<>( @@ -670,7 +670,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { String sql = qry.getSql(); Object[] args = qry.getArgs(); - GridQueryFieldsResult res = idx.queryFields(space, sql, F.asList(args), idx.backupFilter(null)); + GridQueryFieldsResult res = idx.queryFields(space, sql, F.asList(args), idx.backupFilter(null, null, null)); if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) { ctx.event().record(new CacheQueryExecutedEvent<>( http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/20ec22b8/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java index 2d53944..74b4392 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java @@ -54,7 +54,11 @@ public class GridQueryRequest implements Message { /** */ @GridDirectCollection(String.class) - private Collection<String> extraSpaces; + private List<String> extraSpaces; + + /** */ + @GridDirectCollection(int[].class) + private List<int[]> parts; /** * Default constructor. @@ -70,6 +74,7 @@ public class GridQueryRequest implements Message { * @param qrys Queries. * @param topVer Topology version. * @param extraSpaces All space names participating in query other than {@code space}. + * @param parts Optional partitions for unstable topology. */ public GridQueryRequest( long reqId, @@ -77,7 +82,8 @@ public class GridQueryRequest implements Message { String space, Collection<GridCacheSqlQuery> qrys, AffinityTopologyVersion topVer, - List<String> extraSpaces) { + List<String> extraSpaces, + List<int[]> parts) { this.reqId = reqId; this.pageSize = pageSize; this.space = space; @@ -85,12 +91,20 @@ public class GridQueryRequest implements Message { this.qrys = qrys; this.topVer = topVer; this.extraSpaces = extraSpaces; + this.parts = parts; + } + + /** + * @return All the needed partitions for {@link #space()} and {@link #extraSpaces()}. + */ + public List<int[]> partitions() { + return parts; } /** * @return All extra space names participating in query other than {@link #space()}. */ - public Collection<String> extraSpaces() { + public List<String> extraSpaces() { return extraSpaces; } @@ -181,6 +195,12 @@ public class GridQueryRequest implements Message { return false; writer.incrementState(); + + case 6: + if (!writer.writeCollection("partitions", parts, MessageCollectionItemType.INT_ARR)) + return false; + + writer.incrementState(); } return true; @@ -241,6 +261,14 @@ public class GridQueryRequest implements Message { return false; reader.incrementState(); + + case 6: + parts = reader.readCollection("partitions", MessageCollectionItemType.INT_ARR); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); } return true; @@ -253,6 +281,6 @@ public class GridQueryRequest implements Message { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 6; + return 7; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/20ec22b8/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index 1cfc314..0ee9876 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -1354,21 +1354,56 @@ public class IgniteH2Indexing implements GridQueryIndexing { } /** {@inheritDoc} */ - @Override public IndexingQueryFilter backupFilter(AffinityTopologyVersion topVer) { + @Override public IndexingQueryFilter backupFilter( + @Nullable final List<String> caches, + @Nullable final AffinityTopologyVersion topVer, + @Nullable final List<int[]> parts + ) { final AffinityTopologyVersion topVer0 = topVer != null ? topVer : AffinityTopologyVersion.NONE; return new IndexingQueryFilter() { @Nullable @Override public <K, V> IgniteBiPredicate<K, V> forSpace(String spaceName) { final GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(spaceName); - if (cache.context().isReplicated() || cache.configuration().getBackups() == 0) + if (cache.context().isReplicated() || (cache.configuration().getBackups() == 0 && parts == null)) return null; + final GridCacheAffinityManager aff = cache.context().affinity(); + + if (parts != null) { + int idx = caches.indexOf(spaceName); + + final int[] parts0 = parts.get(idx); + + if (parts0.length < 64) { + return new IgniteBiPredicate<K,V>() { + @Override public boolean apply(K k, V v) { + int p = aff.partition(k); + + for (int p0 : parts0) { + if (p0 == p) + return true; + } + + return false; + } + }; + } + + return new IgniteBiPredicate<K,V>() { + @Override public boolean apply(K k, V v) { + int p = aff.partition(k); + + return Arrays.binarySearch(parts0, p) >= 0; + } + }; + } + final ClusterNode locNode = ctx.discovery().localNode(); return new IgniteBiPredicate<K, V>() { @Override public boolean apply(K k, V v) { - return cache.context().affinity().primary(locNode, k, topVer0); + return aff.primary(locNode, k, topVer0); } }; } @@ -1392,7 +1427,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { * @return Current topology version. */ public AffinityTopologyVersion topologyVersion() { - return ctx.discovery().topologyVersionEx(); + return ctx.cache().context().exchange().readyAffinityVersion(); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/20ec22b8/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index 112949f..06bad76 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -217,16 +217,22 @@ public class GridMapQueryExecutor { /** * @param cacheNames Cache names. * @param topVer Topology version. + * @param parts Explicit partitions. * @param reserved Reserved list. * @return {@code true} If all the needed partitions successfully reserved. * @throws IgniteCheckedException If failed. */ - private boolean reservePartitions(Collection<String> cacheNames, AffinityTopologyVersion topVer, + private boolean reservePartitions(Collection<String> cacheNames, AffinityTopologyVersion topVer, List<int[]> parts, List<GridDhtLocalPartition> reserved) throws IgniteCheckedException { + assert parts == null || parts.size() == cacheNames.size(); + + int i = 0; + for (String cacheName : cacheNames) { GridCacheContext<?,?> cctx = cacheContext(cacheName, topVer); - Set<Integer> partIds = cctx.affinity().primaryPartitions(ctx.localNodeId(), topVer); + Collection<Integer> partIds = parts != null ? wrap(parts.get(i++)) : + cctx.affinity().primaryPartitions(ctx.localNodeId(), topVer); for (int partId : partIds) { GridDhtLocalPartition part = cctx.topology().localPartition(partId, topVer, false); @@ -250,6 +256,37 @@ public class GridMapQueryExecutor { } /** + * @param ints Integers. + * @return Collection wrapper. + */ + private static Collection<Integer> wrap(final int[] ints) { + return new AbstractCollection<Integer>() { + @Override public Iterator<Integer> iterator() { + return new Iterator<Integer>() { + /** */ + private int i = 0; + + @Override public boolean hasNext() { + return i < ints.length; + } + + @Override public Integer next() { + return ints[i++]; + } + + @Override public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + + @Override public int size() { + return ints.length; + } + }; + } + + /** * Executing queries locally. * * @param node Node. @@ -280,6 +317,8 @@ public class GridMapQueryExecutor { throw new CacheException(e); } + List<String> caches = (List<String>)F.concat(true, req.space(), req.extraSpaces()); + // Topology version can be null in rolling restart with previous version! final AffinityTopologyVersion topVer = req.topologyVersion(); @@ -288,7 +327,7 @@ public class GridMapQueryExecutor { h2.awaitForCacheAffinity(topVer); // Reserve primary partitions. - if (!reservePartitions(F.concat(true, req.space(), req.extraSpaces()), topVer, reserved)) { + if (!reservePartitions(caches, topVer, req.partitions(), reserved)) { sendRetry(node, req.requestId()); return; @@ -303,7 +342,7 @@ public class GridMapQueryExecutor { if (nodeRess.put(req.requestId(), qr) != null) throw new IllegalStateException(); - h2.setFilters(h2.backupFilter(topVer)); + h2.setFilters(h2.backupFilter(caches, topVer, req.partitions())); // TODO Prepare snapshots for all the needed tables before the run. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/20ec22b8/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index eb6db88..0836a75 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -279,6 +279,21 @@ public class GridReduceQueryExecutor { } /** + * @param set Set. + * @return Array. + */ + private static int[] unbox(Set<Integer> set) { + int[] arr = new int[set.size()]; + + int i = 0; + + for (int x : set) + arr[i++] = x; + + return arr; + } + + /** * @param cctx Cache context. * @param qry Query. * @return Cursor. @@ -304,12 +319,27 @@ public class GridReduceQueryExecutor { if (F.isEmpty(nodes)) throw new CacheException("No data nodes found for cache: " + space); + List<String> extraSpaces = extraSpaces(space, qry.spaces()); + + List<int[]> parts = null; + if (cctx.isReplicated() || qry.explain()) { assert qry.explain() || !nodes.contains(ctx.cluster().get().localNode()) : "We must be on a client node."; // Select random data node to run query on a replicated data or get EXPLAIN PLAN from a single node. nodes = Collections.singleton(F.rand(nodes)); } + else if (ctx.cache().context().exchange().hasPendingExchange()) { // TODO isActive ?? + parts = new ArrayList<>(extraSpaces == null ? 1 : extraSpaces.size() + 1); + + parts.add(unbox(cctx.affinity().primaryPartitions(ctx.localNodeId(), topVer))); + + if (extraSpaces != null) { + for (String extraSpace : extraSpaces) + parts.add(unbox(ctx.cache().internalCache(extraSpace).context() + .affinity().primaryPartitions(ctx.localNodeId(), topVer))); + } + } for (GridCacheSqlQuery mapQry : qry.mapQueries()) { GridMergeTable tbl; @@ -355,8 +385,7 @@ public class GridReduceQueryExecutor { boolean ok = false; try { - send(nodes, new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys, topVer, - extraSpaces(space, qry.spaces()))); + send(nodes, new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys, topVer, extraSpaces, parts)); ok = true; }