ignite-484-1 - compilation
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d340fe72 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d340fe72 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d340fe72 Branch: refs/heads/ignite-980 Commit: d340fe72a99deab268dc019f6eaf474702f408b8 Parents: 51bf4b1 Author: S.Vladykin <svlady...@gridgain.com> Authored: Thu Jun 11 10:04:22 2015 +0300 Committer: S.Vladykin <svlady...@gridgain.com> Committed: Thu Jun 11 10:04:22 2015 +0300 ---------------------------------------------------------------------- .../h2/twostep/GridReduceQueryExecutor.java | 26 +++++++++++++++----- 1 file changed, 20 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d340fe72/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index 3d2ae46..343a439 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.query.h2.twostep; import org.apache.ignite.*; -import org.apache.ignite.cache.query.*; import org.apache.ignite.cluster.*; import org.apache.ignite.events.*; import org.apache.ignite.internal.*; @@ -38,7 +37,7 @@ import org.apache.ignite.marshaller.*; import org.apache.ignite.plugin.extensions.communication.*; import org.h2.command.*; import org.h2.command.ddl.*; -import org.h2.command.dml.Query; +import org.h2.command.dml.*; import org.h2.engine.*; import org.h2.expression.*; import org.h2.index.*; @@ -395,9 +394,10 @@ public class GridReduceQueryExecutor { /** * @param cctx Cache context. * @param qry Query. + * @param keepPortable Keep portable. * @return Cursor. */ - public QueryCursor<List<?>> query(GridCacheContext<?,?> cctx, GridCacheTwoStepQuery qry, boolean keepPortable) { + public Iterator<List<?>> query(GridCacheContext<?,?> cctx, GridCacheTwoStepQuery qry, boolean keepPortable) { for (;;) { long qryReqId = reqIdGen.incrementAndGet(); @@ -501,7 +501,6 @@ public class GridReduceQueryExecutor { retry = true; // If remote node asks us to retry then we have outdated full partition map. - // TODO is this correct way to wait for a new map?? h2.awaitForReadyTopologyVersion((AffinityTopologyVersion)state); } } @@ -534,7 +533,7 @@ public class GridReduceQueryExecutor { continue; } - return new QueryCursorImpl<>(new GridQueryCacheObjectsIterator(new Iter(res), cctx, cctx.keepPortable())); + return new GridQueryCacheObjectsIterator(new Iter(res), cctx, keepPortable); } catch (IgniteCheckedException | RuntimeException e) { U.closeQuiet(r.conn); @@ -687,7 +686,22 @@ public class GridReduceQueryExecutor { } } - return new QueryCursorImpl<>(new GridQueryCacheObjectsIterator(new Iter(res), cctx, cctx.keepPortable())); + // Filter nodes where not all the replicated caches loaded. + for (String extraSpace : extraSpaces) { + GridCacheContext<?,?> extraCctx = cacheContext(extraSpace); + + if (!extraCctx.isReplicated()) + continue; + + Set<ClusterNode> dataNodes = owningReplicatedDataNodes(extraCctx); + + for (Set<ClusterNode> partLoc : partLocs) { + partLoc.retainAll(dataNodes); + + if (partLoc.isEmpty()) + return null; // Retry. + } + } } // Collect the final partitions mapping.