Repository: incubator-ignite Updated Branches: refs/heads/ignite-745 ed2360877 -> 01bcfd8a7 (forced update)
ignite-389 Avoid backups filtering in case of partition scan query Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5d6bb532 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5d6bb532 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5d6bb532 Branch: refs/heads/ignite-745 Commit: 5d6bb532c7de35cfea7674b5fc1446e72a5fa985 Parents: f00a9e9 Author: agura <ag...@gridgain.com> Authored: Thu May 28 18:30:08 2015 +0300 Committer: agura <ag...@gridgain.com> Committed: Thu May 28 18:30:08 2015 +0300 ---------------------------------------------------------------------- .../apache/ignite/cache/query/ScanQuery.java | 12 +- .../cache/query/GridCacheQueryAdapter.java | 122 +++---------------- .../cache/query/GridCacheQueryManager.java | 9 +- 3 files changed, 28 insertions(+), 115 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5d6bb532/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java index f56b0c7..e6b69bc 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/ScanQuery.java @@ -46,6 +46,11 @@ public final class ScanQuery<K, V> extends Query<Cache.Entry<K, V>> { this(null, null); } + /** + * Creates partition scan query returning all entries for given partition. + * + * @param part Partition. + */ public ScanQuery(int part) { this(part, null); } @@ -62,9 +67,10 @@ public final class ScanQuery<K, V> extends Query<Cache.Entry<K, V>> { /** * Create scan query with filter. * + * @param part Partition. * @param filter Filter. If {@code null} then all entries will be returned. */ - public ScanQuery(Integer part, @Nullable IgniteBiPredicate<K, V> filter) { + public ScanQuery(@Nullable Integer part, @Nullable IgniteBiPredicate<K, V> filter) { setPartition(part); setFilter(filter); } @@ -96,7 +102,7 @@ public final class ScanQuery<K, V> extends Query<Cache.Entry<K, V>> { * * @return Partition number or {@code null}. */ - public Integer getPartition() { + @Nullable public Integer getPartition() { return part; } @@ -106,7 +112,7 @@ public final class ScanQuery<K, V> extends Query<Cache.Entry<K, V>> { * * @param part Partition number over which this query should iterate. */ - public void setPartition(Integer part) { + public void setPartition(@Nullable Integer part) { this.part = part; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5d6bb532/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java index 6574f0a..2f32faa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java @@ -26,14 +26,15 @@ import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.query.*; +import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.apache.ignite.plugin.security.*; + import org.jetbrains.annotations.*; import java.util.*; -import java.util.concurrent.*; import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.*; @@ -457,7 +458,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { return (CacheQueryFuture<R>)(loc ? qryMgr.queryFieldsLocal(bean) : qryMgr.queryFieldsDistributed(bean, nodes)); else if (type == SCAN && part != null && nodes.size() > 1) - return new CacheQueryFallbackFuture(nodes, bean, qryMgr); + return new CacheQueryFallbackFuture<>(nodes, bean, qryMgr); else return (CacheQueryFuture<R>)(loc ? qryMgr.queryLocal(bean) : qryMgr.queryDistributed(bean, nodes)); } @@ -524,9 +525,10 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { /** * Wrapper for queries with fallback. */ - private static class CacheQueryFallbackFuture<R> extends GridCacheQueryFutureAdapter<Object, Object, R> { - /** Target. */ - private GridCacheQueryFutureAdapter<?, ?, R> fut; + private static class CacheQueryFallbackFuture<R> extends GridFutureAdapter<Collection<R>> + implements CacheQueryFuture<R> { + /** Query future. */ + private volatile GridCacheQueryFutureAdapter<?, ?, R> fut; /** Backups. */ private final Queue<ClusterNode> nodes; @@ -559,13 +561,10 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { ClusterNode node = F.first(F.view(nodes, IS_LOC_NODE)); - if (node != null) { + if (node != null) fallbacks.add(node); - fallbacks.addAll(F.view(nodes, F.not(IS_LOC_NODE))); - } - else - fallbacks.addAll(nodes); + fallbacks.addAll(node != null ? F.view(nodes, F.not(IS_LOC_NODE)) : nodes); return fallbacks; } @@ -576,10 +575,11 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { private void init() { ClusterNode node = nodes.poll(); - fut = (GridCacheQueryFutureAdapter<?, ?, R>)(node.isLocal() ? qryMgr.queryLocal(bean) : - qryMgr.queryDistributed(bean, Collections.singleton(node))); + GridCacheQueryFutureAdapter<?, ?, R> fut0 = + (GridCacheQueryFutureAdapter<?, ?, R>)(node.isLocal() ? qryMgr.queryLocal(bean) : + qryMgr.queryDistributed(bean, Collections.singleton(node))); - fut.listen(new IgniteInClosure<IgniteInternalFuture<Collection<R>>>() { + fut0.listen(new IgniteInClosure<IgniteInternalFuture<Collection<R>>>() { @Override public void apply(IgniteInternalFuture<Collection<R>> fut) { try { onDone(fut.get()); @@ -592,26 +592,8 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { } } }); - } - - /** {@inheritDoc} */ - @Override protected boolean onPage(UUID nodeId, boolean last) { - return fut.onPage(nodeId, last); - } - /** {@inheritDoc} */ - @Override protected void loadPage() { - fut.loadPage(); - } - - /** {@inheritDoc} */ - @Override protected void loadAllPages() throws IgniteInterruptedCheckedException { - fut.loadAllPages(); - } - - /** {@inheritDoc} */ - @Override protected void cancelQuery() throws IgniteCheckedException { - fut.cancelQuery(); + fut = fut0; } /** {@inheritDoc} */ @@ -625,84 +607,8 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { } /** {@inheritDoc} */ - @Override void clear() { - fut.clear(); - } - - /** {@inheritDoc} */ - @Override public long endTime() { - return fut.endTime(); - } - - /** {@inheritDoc} */ - @Override protected void enqueue(Collection<?> col) { - fut.enqueue(col); - } - - /** {@inheritDoc} */ - @Override boolean fields() { - return fut.fields(); - } - - /** {@inheritDoc} */ - @Override public Collection<R> get() throws IgniteCheckedException { - return fut.get(); - } - - /** {@inheritDoc} */ - @Override public Collection<R> get(long timeout, TimeUnit unit) throws IgniteCheckedException { - return fut.get(timeout, unit); - } - - /** {@inheritDoc} */ @Override public R next() { return fut.next(); } - - /** {@inheritDoc} */ - @Override public Collection<R> nextPage() throws IgniteCheckedException { - return fut.nextPage(); - } - - /** {@inheritDoc} */ - @Override public boolean onDone(Collection<R> res, Throwable err) { - return fut.onDone(res, err); - } - - /** {@inheritDoc} */ - @Override public Collection<R> nextPage(long timeout) throws IgniteCheckedException { - return fut.nextPage(timeout); - } - - /** {@inheritDoc} */ - @Override protected void onNodeLeft(UUID evtNodeId) { - fut.onNodeLeft(evtNodeId); - } - - /** {@inheritDoc} */ - @Override public void onPage(@Nullable UUID nodeId, @Nullable Collection<?> data, - @Nullable Throwable err, boolean finished) { - fut.onPage(nodeId, data, err, finished); - } - - /** {@inheritDoc} */ - @Override public void onTimeout() { - fut.onTimeout(); - } - - /** {@inheritDoc} */ - @Override public void printMemoryStats() { - fut.printMemoryStats(); - } - - /** {@inheritDoc} */ - @Override public GridCacheQueryBean query() { - return fut.query(); - } - - /** {@inheritDoc} */ - @Override public IgniteUuid timeoutId() { - return fut.timeoutId(); - } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5d6bb532/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index fac3d8f..652d62e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -795,7 +795,6 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte !locPart.reserve()) throw new GridDhtInvalidPartitionException(part, "Partition can't be reserved"); - iter = new Iterator<K>() { private Iterator<KeyCacheObject> iter0 = locPart.keySet().iterator(); @@ -1329,9 +1328,11 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte K key = row.getKey(); - // Filter backups for SCAN queries. Other types are filtered in indexing manager. - if (!cctx.isReplicated() && cctx.config().getCacheMode() != LOCAL && qry.type() == SCAN && - !incBackups && !cctx.affinity().primary(cctx.localNode(), key, topVer)) { + // Filter backups for SCAN queries, if it isn't partition scan. + // Other types are filtered in indexing manager. + if (!cctx.isReplicated() && qry.type() == SCAN && qry.partition() == null && + cctx.config().getCacheMode() != LOCAL && !incBackups && + !cctx.affinity().primary(cctx.localNode(), key, topVer)) { if (log.isDebugEnabled()) log.debug("Ignoring backup element [row=" + row + ", cacheMode=" + cctx.config().getCacheMode() + ", incBackups=" + incBackups +