ignite-484-1 - collocated partitions + replicated caches
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d389ada8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d389ada8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d389ada8 Branch: refs/heads/ignite-sprint-6 Commit: d389ada8b9546994d3dec5da00349f831f81fcb0 Parents: 357b4c0 Author: S.Vladykin <svlady...@gridgain.com> Authored: Wed Jun 3 22:55:16 2015 +0300 Committer: S.Vladykin <svlady...@gridgain.com> Committed: Wed Jun 3 22:55:16 2015 +0300 ---------------------------------------------------------------------- .../processors/query/GridQueryIndexing.java | 3 +- .../h2/twostep/messages/GridQueryRequest.java | 13 +- .../processors/query/h2/IgniteH2Indexing.java | 12 +- .../query/h2/twostep/GridMapQueryExecutor.java | 15 +- .../h2/twostep/GridReduceQueryExecutor.java | 248 +++++++++++++------ .../IgniteCacheQueryNodeRestartSelfTest.java | 10 +- 6 files changed, 206 insertions(+), 95 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d389ada8/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java index de35201..98c2af7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java @@ -243,6 +243,5 @@ public interface GridQueryIndexing { * @param parts Partitions. * @return Backup filter. */ - public IndexingQueryFilter backupFilter(List<String> caches, AffinityTopologyVersion topVer, - List<int[]> parts); + public IndexingQueryFilter backupFilter(List<String> caches, AffinityTopologyVersion topVer, int[] parts); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d389ada8/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java index 99ef094..6465bbc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java @@ -57,8 +57,7 @@ public class GridQueryRequest implements Message { private List<String> extraSpaces; /** */ - @GridDirectCollection(int[].class) - private List<int[]> parts; + private int[] parts; /** * Default constructor. @@ -83,7 +82,7 @@ public class GridQueryRequest implements Message { Collection<GridCacheSqlQuery> qrys, AffinityTopologyVersion topVer, List<String> extraSpaces, - List<int[]> parts) { + int[] parts) { this.reqId = reqId; this.pageSize = pageSize; this.space = space; @@ -110,14 +109,14 @@ public class GridQueryRequest implements Message { /** * @return All the needed partitions for {@link #space()} and {@link #extraSpaces()}. */ - public List<int[]> partitions() { + public int[] partitions() { return parts; } /** * @param parts All the needed partitions for {@link #space()} and {@link #extraSpaces()}. */ - public void partitions(List<int[]> parts) { + public void partitions(int[] parts) { this.parts = parts; } @@ -217,7 +216,7 @@ public class GridQueryRequest implements Message { writer.incrementState(); case 6: - if (!writer.writeCollection("partitions", parts, MessageCollectionItemType.INT_ARR)) + if (!writer.writeIntArray("partitions", parts)) return false; writer.incrementState(); @@ -283,7 +282,7 @@ public class GridQueryRequest implements Message { reader.incrementState(); case 6: - parts = reader.readCollection("partitions", MessageCollectionItemType.INT_ARR); + parts = reader.readIntArray("partitions"); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d389ada8/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index ffedfb3..da497a2 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -1357,7 +1357,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { @Override public IndexingQueryFilter backupFilter( @Nullable final List<String> caches, @Nullable final AffinityTopologyVersion topVer, - @Nullable final List<int[]> parts + @Nullable final int[] parts ) { final AffinityTopologyVersion topVer0 = topVer != null ? topVer : AffinityTopologyVersion.NONE; @@ -1371,16 +1371,12 @@ public class IgniteH2Indexing implements GridQueryIndexing { final GridCacheAffinityManager aff = cache.context().affinity(); if (parts != null) { - int idx = caches.indexOf(spaceName); - - final int[] parts0 = parts.get(idx); - - if (parts0.length < 64) { // Fast scan for small arrays. + if (parts.length < 64) { // Fast scan for small arrays. return new IgniteBiPredicate<K,V>() { @Override public boolean apply(K k, V v) { int p = aff.partition(k); - for (int p0 : parts0) { + for (int p0 : parts) { if (p0 == p) return true; @@ -1397,7 +1393,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { @Override public boolean apply(K k, V v) { int p = aff.partition(k); - return Arrays.binarySearch(parts0, p) >= 0; + return Arrays.binarySearch(parts, p) >= 0; } }; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d389ada8/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index 9d9060a..ede0e2e 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -222,21 +222,24 @@ public class GridMapQueryExecutor { * @return {@code true} If all the needed partitions successfully reserved. * @throws IgniteCheckedException If failed. */ - private boolean reservePartitions(Collection<String> cacheNames, AffinityTopologyVersion topVer, List<int[]> parts, + private boolean reservePartitions(Collection<String> cacheNames, AffinityTopologyVersion topVer, int[] parts, List<GridDhtLocalPartition> reserved) throws IgniteCheckedException { - assert parts == null || parts.size() == cacheNames.size(); - - int i = 0; + Collection<Integer> partIds = parts == null ? null : wrap(parts); for (String cacheName : cacheNames) { GridCacheContext<?,?> cctx = cacheContext(cacheName, topVer); - Collection<Integer> partIds = parts != null ? wrap(parts.get(i++)) : - cctx.affinity().primaryPartitions(ctx.localNodeId(), topVer); + int partsCnt = cctx.affinity().partitions(); + + if (parts == null) + partIds = cctx.affinity().primaryPartitions(ctx.localNodeId(), topVer); for (int partId : partIds) { GridDhtLocalPartition part = cctx.topology().localPartition(partId, topVer, false); + if (partId >= partsCnt) + break; // We can have more partitions because `parts` array is shared for all caches. + if (part != null) { // Await for owning state. part.owningFuture().get(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d389ada8/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index c445844..87ac2f4 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -56,6 +56,8 @@ import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; +import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.*; + /** * Reduce query executor. */ @@ -279,35 +281,85 @@ public class GridReduceQueryExecutor { } /** - * @param set Set. - * @return Array. + * @param readyTop Latest ready topology. + * @return {@code true} If preloading is active. */ - private static int[] unbox(Set<Integer> set) { - if (set == null) - return null; + private boolean isPreloadingActive(AffinityTopologyVersion readyTop) { + AffinityTopologyVersion freshTop = ctx.discovery().topologyVersionEx(); - int[] arr = new int[set.size()]; + int res = readyTop.compareTo(freshTop); - int i = 0; + assert res <= 0 : readyTop + " " + freshTop; - for (int x : set) - arr[i++] = x; + return res < 0; + } - return arr; + /** + * @param name Cache name. + * @return Cache context. + */ + private GridCacheContext<?,?> cacheContext(String name) { + return ctx.cache().internalCache(name).context(); } /** - * @param readyTop Latest ready topology. - * @return {@code true} If preloading is active. + * @param topVer Topology version. + * @param cctx Cache context for main space. + * @param extraSpaces Extra spaces. + * @return Data nodes. */ - private boolean isPreloadingActive(AffinityTopologyVersion readyTop) { - AffinityTopologyVersion freshTop = ctx.discovery().topologyVersionEx(); + private Collection<ClusterNode> dataNodes( + AffinityTopologyVersion topVer, + final GridCacheContext<?,?> cctx, + List<String> extraSpaces + ) { + String space = cctx.name(); - int res = readyTop.compareTo(freshTop); + Set<ClusterNode> nodes = new HashSet<>(ctx.discovery().cacheAffinityNodes(space, topVer)); - assert res <= 0 : readyTop + " " + freshTop; + if (F.isEmpty(nodes)) + throw new CacheException("No data nodes found for cache: " + space); - return res < 0; + if (!F.isEmpty(extraSpaces)) { + for (String extraSpace : extraSpaces) { + GridCacheContext<?,?> extraCctx = cacheContext(extraSpace); + + if (extraCctx.isLocal()) + continue; + + if (cctx.isReplicated() && !extraCctx.isReplicated()) + throw new CacheException("Queries running on replicated cache should not contain JOINs " + + "with partitioned tables."); + + Collection<ClusterNode> extraNodes = ctx.discovery().cacheAffinityNodes(extraSpace, topVer); + + if (F.isEmpty(extraNodes)) + throw new CacheException("No data nodes found for cache: " + extraSpace); + + if (cctx.isReplicated() && extraCctx.isReplicated()) { + nodes.retainAll(extraNodes); + + if (nodes.isEmpty() && !isPreloadingActive(topVer)) + throw new CacheException("Caches '" + cctx.name() + "' and '" + extraSpace + + "' have distinct set of data nodes."); + } + else if (!cctx.isReplicated() && extraCctx.isReplicated()) { + if (!extraNodes.containsAll(nodes) && !isPreloadingActive(topVer)) + throw new CacheException("Caches '" + cctx.name() + "' and '" + extraSpace + + "' have distinct set of data nodes."); + } + else if (!cctx.isReplicated() && !extraCctx.isReplicated()) { + if ((extraNodes.size() != nodes.size() || !nodes.containsAll(extraNodes)) && + !isPreloadingActive(topVer)) + throw new CacheException("Caches '" + cctx.name() + "' and '" + extraSpace + + "' have distinct set of data nodes."); + } + else + throw new IllegalStateException(); + } + } + + return nodes; } /** @@ -331,15 +383,12 @@ public class GridReduceQueryExecutor { AffinityTopologyVersion topVer = h2.readyTopologyVersion(); - Collection<ClusterNode> nodes = ctx.discovery().cacheAffinityNodes(space, topVer); - - if (F.isEmpty(nodes)) - throw new CacheException("No data nodes found for cache: " + space); - List<String> extraSpaces = extraSpaces(space, qry.spaces()); - // Explicit partition mapping for unstable topology: {nodeId -> {cacheName -> {parts}}} - Map<ClusterNode, Map<String, Set<Integer>>> gridPartsMap = null; + Collection<ClusterNode> nodes = dataNodes(topVer, cctx, extraSpaces); + + // Explicit partition mapping for unstable topology. + Map<ClusterNode, IntArray> gridPartsMap = null; if (cctx.isReplicated() || qry.explain()) { assert qry.explain() || !nodes.contains(ctx.cluster().get().localNode()) : "We must be on a client node."; @@ -348,14 +397,10 @@ public class GridReduceQueryExecutor { nodes = Collections.singleton(F.rand(nodes)); } else if (isPreloadingActive(topVer)) { - gridPartsMap = new HashMap<>(nodes.size(), 1f); - - collectPartitionOwners(gridPartsMap, cctx); + gridPartsMap = partitionLocations(cctx, extraSpaces); - if (extraSpaces != null) { - for (String extraSpace : extraSpaces) - collectPartitionOwners(gridPartsMap, ctx.cache().internalCache(extraSpace).context()); - } + if (gridPartsMap == null) + continue; // Retry. nodes = gridPartsMap.keySet(); } @@ -462,46 +507,114 @@ public class GridReduceQueryExecutor { } } - /** - * Collects actual partition owners for the cache context int the given map. - * - * @param gridPartsMap Target map. - * @param cctx Cache context. - */ - private void collectPartitionOwners( - Map<ClusterNode,Map<String,Set<Integer>>> gridPartsMap, - GridCacheContext<?,?> cctx - ) { - int partsCnt = cctx.affinity().partitions(); + @SuppressWarnings("unchecked") + private Map<ClusterNode, IntArray> partitionLocations(final GridCacheContext<?,?> cctx, List<String> extraSpaces) { + assert !cctx.isReplicated() && !cctx.isLocal() : cctx.name() + " must be partitioned"; + + int maxParts = cctx.affinity().partitions(); + + if (extraSpaces != null) { // Find max number of partitions for partitioned caches. + for (String extraSpace : extraSpaces) { + GridCacheContext<?,?> extraCctx = cacheContext(extraSpace); + + if (extraCctx.isReplicated() || extraCctx.isLocal()) + continue; + + int parts = extraCctx.affinity().partitions(); + + if (parts > maxParts) + maxParts = parts; + } + } + + Set<ClusterNode>[] partLocs = new Set[maxParts]; - for (int p = 0; p < partsCnt; p++) { - // We don't care about exact topology version here, we just need to get all the needed partition - // owners in actual state. + // Fill partition locations for main cache. + for (int p = 0, parts = cctx.affinity().partitions(); p < parts; p++) { List<ClusterNode> owners = cctx.topology().owners(p); if (F.isEmpty(owners)) - continue; // All primary and backup nodes are dead now for this partition. We sorrow. + throw new CacheException("No data nodes found for cache '" + cctx.name() + "' for partition " + p); + + partLocs[p] = new HashSet<>(owners); + } + + if (extraSpaces != null) { + // Find owner intersections for each participating partitioned cache partition. + // We need this for logical collocation between different partitioned caches with the same affinity. + for (String extraSpace : extraSpaces) { + GridCacheContext<?,?> extraCctx = cacheContext(extraSpace); - ClusterNode owner = F.rand(owners); + if (extraCctx.isReplicated() || extraCctx.isLocal()) + continue; + + for (int p = 0, parts = extraCctx.affinity().partitions(); p < parts; p++) { + List<ClusterNode> owners = extraCctx.topology().owners(p); - Map<String, Set<Integer>> nodePartsMap = gridPartsMap.get(owner); + if (F.isEmpty(owners)) + throw new CacheException("No data nodes found for cache '" + extraSpace + + "' for partition " + p); - if (nodePartsMap == null) { - nodePartsMap = new HashMap<>(); + if (partLocs[p] == null) + partLocs[p] = new HashSet<>(owners); + else { + partLocs[p].retainAll(owners); // Intersection of owners. - gridPartsMap.put(owner, nodePartsMap); + if (partLocs[p].isEmpty()) + return null; // Intersection is empty -> retry. + } + } } - Set<Integer> parts = nodePartsMap.get(cctx.name()); + // Filter nodes where not all the replicated caches loaded. + for (String extraSpace : extraSpaces) { + GridCacheContext<?,?> extraCctx = cacheContext(extraSpace); - if (parts == null) { - parts = new TreeSet<>(); // We need them sorted. + if (!extraCctx.isReplicated()) + continue; - nodePartsMap.put(cctx.name(), parts); + Set<ClusterNode> dataNodes = new HashSet<>(ctx.discovery().cacheAffinityNodes(extraSpace, NONE)); + + if (dataNodes.isEmpty()) + throw new CacheException("No data nodes found for cache '" + extraSpace + "'"); + + for (int p = 0, extraParts = extraCctx.affinity().partitions(); p < extraParts; p++) { + dataNodes.retainAll(extraCctx.topology().owners(p)); + + if (dataNodes.isEmpty()) + throw new CacheException("No data nodes found for cache '" + extraSpace + + "' for partition " + p); + } + + for (Set<ClusterNode> partLoc : partLocs) { + partLoc.retainAll(dataNodes); + + if (partLoc.isEmpty()) + return null; // Intersection is empty -> retry. + } } + } + + // Collect the final partitions mapping. + Map<ClusterNode, IntArray> res = new HashMap<>(); + + // Here partitions in all IntArray's will be sorted in ascending order, this is important. + for (int p = 0; p < partLocs.length; p++) { + Set<ClusterNode> pl = partLocs[p]; + + assert !F.isEmpty(pl) : pl; + + ClusterNode n = pl.size() == 1 ? F.first(pl) : F.rand(pl); + + IntArray parts = res.get(n); + + if (parts == null) + res.put(n, parts = new IntArray()); parts.add(p); } + + return res; } /** @@ -580,7 +693,7 @@ public class GridReduceQueryExecutor { private void send( Collection<ClusterNode> nodes, Message msg, - Map<ClusterNode, Map<String, Set<Integer>>> gridPartsMap + Map<ClusterNode,IntArray> gridPartsMap ) { boolean locNodeFound = false; @@ -595,7 +708,7 @@ public class GridReduceQueryExecutor { ctx.io().send(node, GridTopic.TOPIC_QUERY, copy(msg, node, gridPartsMap), GridIoPolicy.PUBLIC_POOL); } catch (IgniteCheckedException e) { - U.error(log, "Failed to send message to node: " + node, e); + U.warn(log, e.getMessage()); } } @@ -609,28 +722,21 @@ public class GridReduceQueryExecutor { * @param gridPartsMap Partitions map. * @return Copy of message with partitions set. */ - private Message copy(Message msg, ClusterNode node, Map<ClusterNode, Map<String, Set<Integer>>> gridPartsMap) { + private Message copy(Message msg, ClusterNode node, Map<ClusterNode,IntArray> gridPartsMap) { if (gridPartsMap == null) return msg; - Map<String,Set<Integer>> nodeParts = gridPartsMap.get(node); - - assert nodeParts != null; + GridQueryRequest res = new GridQueryRequest((GridQueryRequest)msg); - GridQueryRequest req = (GridQueryRequest)msg; + IntArray parts = gridPartsMap.get(node); - List<int[]> parts = new ArrayList<>(nodeParts.size()); + assert parts != null : node; - parts.add(unbox(nodeParts.get(req.space()))); - - if (req.extraSpaces() != null) { - for (String extraSpace : req.extraSpaces()) - parts.add(unbox(nodeParts.get(extraSpace))); - } + int[] partsArr = new int[parts.size()]; - GridQueryRequest res = new GridQueryRequest(req); + parts.toArray(partsArr); - res.partitions(parts); + res.partitions(partsArr); return res; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d389ada8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java index 128e148..edba352 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java @@ -176,11 +176,19 @@ public class IgniteCacheQueryNodeRestartSelfTest extends GridCacheAbstractSelfTe Thread.sleep(duration); + info("Stopping.."); + done.set(true); - fut1.get(); fut2.get(); + info("Restarts stopped."); + + fut1.get(); + + info("Queries stopped."); + + info("Awaiting rebalance events [restartCnt=" + restartCnt.get() + ']'); boolean success = lsnr.awaitEvents(GRID_CNT * 2 * restartCnt.get(), 15000);