ignite-921 Create scan query able to iterate over single partition
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b58bb122 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b58bb122 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b58bb122 Branch: refs/heads/ignite-389 Commit: b58bb122da1d4c196b8125a28a4c1df33a9fc82f Parents: 982235b Author: agura <ag...@gridgain.com> Authored: Mon May 25 20:26:04 2015 +0300 Committer: agura <ag...@gridgain.com> Committed: Tue May 26 21:18:49 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 15 +- .../processors/cache/GridCacheSwapManager.java | 55 ++++- .../processors/cache/IgniteCacheProxy.java | 2 +- .../distributed/dht/GridDhtLocalPartition.java | 7 + .../processors/cache/query/CacheQuery.java | 2 +- .../query/GridCacheDistributedQueryManager.java | 3 + .../cache/query/GridCacheQueryAdapter.java | 228 ++++++++++++++++++- .../cache/query/GridCacheQueryManager.java | 200 +++++++++------- .../cache/query/GridCacheQueryRequest.java | 31 ++- .../datastructures/GridCacheSetImpl.java | 4 +- ...achePartitionedPreloadLifecycleSelfTest.java | 2 +- ...CacheReplicatedPreloadLifecycleSelfTest.java | 6 +- .../GridCacheSwapScanQueryAbstractSelfTest.java | 112 ++++++--- ...CacheScanPartitionQueryFallbackSelfTest.java | 213 +++++++++++++++++ .../cache/IgniteCacheAbstractQuerySelfTest.java | 53 ++++- .../IgniteCacheQuerySelfTestSuite.java | 2 + 16 files changed, 795 insertions(+), 140 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b58bb122/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index d390037..d7cec9e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -1317,8 +1317,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V final Collection<KeyCacheObject> loadedKeys = new GridConcurrentHashSet<>(); - IgniteInternalFuture<Object> readFut = - readThroughAllAsync(absentKeys, true, skipVals, null, subjId, taskName, new CI2<KeyCacheObject, Object>() { + IgniteInternalFuture<Object> readFut = readThroughAllAsync(absentKeys, true, skipVals, null, + subjId, taskName, new CI2<KeyCacheObject, Object>() { /** Version for all loaded entries. */ private GridCacheVersion nextVer = ctx.versions().next(); @@ -1948,7 +1948,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @param filter Optional filter. * @return Put operation future. */ - public IgniteInternalFuture<V> getAndPutAsync0(final K key, final V val, @Nullable final CacheEntryPredicate... filter) { + public IgniteInternalFuture<V> getAndPutAsync0(final K key, final V val, + @Nullable final CacheEntryPredicate... filter) { A.notNull(key, "key", val, "val"); if (keyCheck) @@ -3117,7 +3118,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** {@inheritDoc} */ - @Override public boolean lockAll(@Nullable Collection<? extends K> keys, long timeout) throws IgniteCheckedException { + @Override public boolean lockAll(@Nullable Collection<? extends K> keys, long timeout) + throws IgniteCheckedException { if (F.isEmpty(keys)) return true; @@ -3689,7 +3691,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (!ctx0.isSwapOrOffheapEnabled() && ctx0.kernalContext().discovery().size() == 1) return localIteratorHonorExpirePolicy(opCtx); - CacheQueryFuture<Map.Entry<K, V>> fut = ctx0.queries().createScanQuery(null, ctx.keepPortable()) + CacheQueryFuture<Map.Entry<K, V>> fut = ctx0.queries().createScanQuery(null, null, ctx.keepPortable()) .keepAll(false) .execute(); @@ -3918,7 +3920,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return t; } - catch (IgniteInterruptedCheckedException | IgniteTxHeuristicCheckedException | IgniteTxRollbackCheckedException e) { + catch (IgniteInterruptedCheckedException | IgniteTxHeuristicCheckedException | + IgniteTxRollbackCheckedException e) { throw e; } catch (IgniteCheckedException e) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b58bb122/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java index eb82218..e4b1cbd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java @@ -1211,7 +1211,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { checkIteratorQueue(); if (offHeapEnabled() && !swapEnabled()) - return rawOffHeapIterator(true, true); + return rawOffHeapIterator(null, true, true); if (swapEnabled() && !offHeapEnabled()) return rawSwapIterator(true, true); @@ -1227,7 +1227,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { private Map.Entry<byte[], byte[]> cur; { - it = rawOffHeapIterator(true, true); + it = rawOffHeapIterator(null, true, true); advance(); } @@ -1554,11 +1554,13 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { /** * @param c Key/value closure. + * @param part Partition. * @param primary Include primaries. * @param backup Include backups. * @return Off-heap iterator. */ public <T> GridCloseableIterator<T> rawOffHeapIterator(final CX2<T2<Long, Integer>, T2<Long, Integer>, T> c, + Integer part, boolean primary, boolean backup) { @@ -1574,24 +1576,31 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { AffinityTopologyVersion ver = cctx.affinity().affinityTopologyVersion(); - Set<Integer> parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(), ver) : - cctx.affinity().backupPartitions(cctx.localNodeId(), ver); + Set<Integer> parts; + + if (part == null) + parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(), ver) : + cctx.affinity().backupPartitions(cctx.localNodeId(), ver); + else + parts = Collections.singleton(part); return new CloseablePartitionsIterator<T, T>(parts) { @Override protected GridCloseableIterator<T> partitionIterator(int part) - throws IgniteCheckedException - { + throws IgniteCheckedException { return offheap.iterator(spaceName, c, part); } }; } /** + * + * @param part Partition. * @param primary Include primaries. * @param backup Include backups. * @return Raw off-heap iterator. */ - public GridCloseableIterator<Map.Entry<byte[], byte[]>> rawOffHeapIterator(final boolean primary, + public GridCloseableIterator<Map.Entry<byte[], byte[]>> rawOffHeapIterator(@Nullable Integer part, + final boolean primary, final boolean backup) { if (!offheapEnabled || (!primary && !backup)) @@ -1626,8 +1635,13 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { AffinityTopologyVersion ver = cctx.affinity().affinityTopologyVersion(); - Set<Integer> parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(), ver) : - cctx.affinity().backupPartitions(cctx.localNodeId(), ver); + Set<Integer> parts; + + if (part == null) + parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(), ver) : + cctx.affinity().backupPartitions(cctx.localNodeId(), ver); + else + parts = Collections.singleton(part); return new CloseablePartitionsIterator<Map.Entry<byte[], byte[]>, IgniteBiTuple<byte[], byte[]>>(parts) { private Map.Entry<byte[], byte[]> cur; @@ -1701,6 +1715,29 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { } /** + * @param part Partition. + * @return Raw off-heap iterator. + * @throws IgniteCheckedException If failed. + */ + public GridCloseableIterator<Map.Entry<byte[], byte[]>> rawSwapIterator(int part) + throws IgniteCheckedException + { + if (!swapEnabled) + return new GridEmptyCloseableIterator<>(); + + checkIteratorQueue(); + + return new CloseablePartitionsIterator<Map.Entry<byte[], byte[]>, Map.Entry<byte[], byte[]>>( + Collections.singleton(part)) { + @Override protected GridCloseableIterator<Map.Entry<byte[], byte[]>> partitionIterator(int part) + throws IgniteCheckedException + { + return swapMgr.rawIterator(spaceName, part); + } + }; + } + + /** * @param primary If {@code true} includes primary entries. * @param backup If {@code true} includes backup entries. * @param topVer Topology version. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b58bb122/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index f840015..0009127 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -353,7 +353,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V if (filter instanceof ScanQuery) { IgniteBiPredicate<K, V> p = ((ScanQuery)filter).getFilter(); - qry = ctx.queries().createScanQuery(p != null ? p : ACCEPT_ALL, isKeepPortable); + qry = ctx.queries().createScanQuery(p != null ? p : ACCEPT_ALL, null, isKeepPortable); if (grp != null) qry.projection(grp); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b58bb122/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index 0749f66..8ac3809 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -149,6 +149,13 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition> } /** + * @return Keys belonging to partition. + */ + public Set<KeyCacheObject> keySet() { + return map.keySet(); + } + + /** * @return Entries belonging to partition. */ public Collection<GridDhtCacheEntry> entries() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b58bb122/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java index 0658828..2d2db1b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java @@ -76,7 +76,7 @@ import org.jetbrains.annotations.*; * </li> * <li> * Joins will work correctly only if joined objects are stored in - * collocated mode or at least one side of the join is stored in + * colocated mode or at least one side of the join is stored in * {@link org.apache.ignite.cache.CacheMode#REPLICATED} cache. Refer to * {@link AffinityKey} javadoc for more information about colocation. * </li> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b58bb122/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java index a579aab..2b93144 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java @@ -229,6 +229,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage false, null, req.keyValueFilter(), + req.partition(), req.className(), req.clause(), req.includeMetaData(), @@ -518,6 +519,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage qry.query().clause(), clsName, qry.query().scanFilter(), + qry.query().partition(), qry.reducer(), qry.transform(), qry.query().pageSize(), @@ -626,6 +628,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage qry.query().clause(), null, null, + null, qry.reducer(), qry.transform(), qry.query().pageSize(), http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b58bb122/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java index 4b1fc87..05198a4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java @@ -21,7 +21,9 @@ import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cache.query.*; import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.query.*; import org.apache.ignite.internal.util.typedef.*; @@ -31,6 +33,7 @@ import org.apache.ignite.plugin.security.*; import org.jetbrains.annotations.*; import java.util.*; +import java.util.concurrent.*; import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.*; @@ -56,6 +59,9 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { /** */ private final IgniteBiPredicate<Object, Object> filter; + /** Partition. */ + private Integer part; + /** */ private final boolean incMeta; @@ -95,6 +101,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { * @param clsName Class name. * @param clause Clause. * @param filter Scan filter. + * @param part Partition. * @param incMeta Include metadata flag. * @param keepPortable Keep portable flag. */ @@ -103,6 +110,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { @Nullable String clsName, @Nullable String clause, @Nullable IgniteBiPredicate<Object, Object> filter, + @Nullable Integer part, boolean incMeta, boolean keepPortable) { assert cctx != null; @@ -113,6 +121,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { this.clsName = clsName; this.clause = clause; this.filter = filter; + this.part = part; this.incMeta = incMeta; this.keepPortable = keepPortable; @@ -132,6 +141,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { * @param dedup Enable dedup flag. * @param prj Grid projection. * @param filter Key-value filter. + * @param part Partition. * @param clsName Class name. * @param clause Clause. * @param incMeta Include metadata flag. @@ -149,6 +159,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { boolean dedup, ClusterGroup prj, IgniteBiPredicate<Object, Object> filter, + @Nullable Integer part, @Nullable String clsName, String clause, boolean incMeta, @@ -165,6 +176,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { this.dedup = dedup; this.prj = prj; this.filter = filter; + this.part = part; this.clsName = clsName; this.clause = clause; this.incMeta = incMeta; @@ -334,6 +346,13 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { } /** + * @return Partition. + */ + @Nullable public Integer partition() { + return part; + } + + /** * @throws IgniteCheckedException If query is invalid. */ public void validate() throws IgniteCheckedException { @@ -376,10 +395,12 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { return execute(null, rmtTransform, args); } + /** {@inheritDoc} */ @Override public QueryMetrics metrics() { return metrics.copy(); } + /** {@inheritDoc} */ @Override public void resetMetrics() { metrics = new GridCacheQueryMetricsAdapter(); } @@ -418,18 +439,34 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { taskHash = cctx.kernalContext().job().currentTaskNameHash(); - GridCacheQueryBean bean = new GridCacheQueryBean(this, (IgniteReducer<Object, Object>)rmtReducer, + final GridCacheQueryBean bean = new GridCacheQueryBean(this, (IgniteReducer<Object, Object>)rmtReducer, (IgniteClosure<Object, Object>)rmtTransform, args); - GridCacheQueryManager qryMgr = cctx.queries(); + final GridCacheQueryManager qryMgr = cctx.queries(); boolean loc = nodes.size() == 1 && F.first(nodes).id().equals(cctx.localNodeId()); if (type == SQL_FIELDS || type == SPI) return (CacheQueryFuture<R>)(loc ? qryMgr.queryFieldsLocal(bean) : qryMgr.queryFieldsDistributed(bean, nodes)); - else - return (CacheQueryFuture<R>)(loc ? qryMgr.queryLocal(bean) : qryMgr.queryDistributed(bean, nodes)); + else { + final CacheQueryFuture<R> fut = + (CacheQueryFuture<R>)(loc ? qryMgr.queryLocal(bean) : qryMgr.queryDistributed(bean, nodes)); + + if (type == SCAN && part != null) { + assert nodes.size() == 1; + + final Queue<ClusterNode> backups = new LinkedList<>( + cctx.affinity().backups(part, cctx.affinity().affinityTopologyVersion())); + + if (F.isEmpty(backups)) + return fut; + + return new CacheQueryFallbackFuture<>(backups, bean, qryMgr, fut); + } + + return fut; + } } /** @@ -448,14 +485,14 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { case REPLICATED: if (prj != null) - return nodes(cctx, prj); + return nodes(cctx, prj, partition()); return cctx.affinityNode() ? Collections.singletonList(cctx.localNode()) : - Collections.singletonList(F.rand(nodes(cctx, null))); + Collections.singletonList(F.rand(nodes(cctx, null, partition()))); case PARTITIONED: - return nodes(cctx, prj); + return nodes(cctx, prj, partition()); default: throw new IllegalStateException("Unknown cache distribution mode: " + cacheMode); @@ -467,13 +504,17 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { * @param prj Projection (optional). * @return Collection of data nodes in provided projection (if any). */ - private static Collection<ClusterNode> nodes(final GridCacheContext<?, ?> cctx, @Nullable final ClusterGroup prj) { + private static Collection<ClusterNode> nodes(final GridCacheContext<?, ?> cctx, + @Nullable final ClusterGroup prj, @Nullable final Integer part) { assert cctx != null; return F.view(CU.allNodes(cctx), new P1<ClusterNode>() { @Override public boolean apply(ClusterNode n) { + AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion(); + return cctx.discovery().cacheAffinityNode(n, cctx.name()) && - (prj == null || prj.node(n.id()) != null); + (prj == null || prj.node(n.id()) != null) && + (part == null || cctx.affinity().primary(n, part.intValue(), topVer)); } }); } @@ -482,4 +523,173 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { @Override public String toString() { return S.toString(GridCacheQueryAdapter.class, this); } + + /** + * Wrapper for queries with fallback. + */ + private static class CacheQueryFallbackFuture<R> extends GridCacheQueryFutureAdapter<Object, Object, R> { + /** Target. */ + private GridCacheQueryFutureAdapter<?, ?, R> target; + + /** Backups. */ + private final Queue<ClusterNode> backups; + + /** Bean. */ + private final GridCacheQueryBean bean; + + /** Query manager. */ + private final GridCacheQueryManager qryMgr; + + /** + * @param backups Backups. + * @param bean Bean. + * @param qryMgr Query manager. + * @param fut Future. + */ + public CacheQueryFallbackFuture(Queue<ClusterNode> backups, GridCacheQueryBean bean, + GridCacheQueryManager qryMgr, CacheQueryFuture<R> fut) { + this.backups = backups; + this.bean = bean; + this.qryMgr = qryMgr; + this.target = (GridCacheQueryFutureAdapter<?, ?, R>)fut; + + init(); + } + + /** + * + */ + private void init() { + target.listen(new IgniteInClosure<IgniteInternalFuture<Collection<R>>>() { + @Override public void apply(IgniteInternalFuture<Collection<R>> fut) { + try { + onDone(fut.get()); + } + catch (IgniteCheckedException e) { + if (F.isEmpty(backups)) + onDone(e); + else { + Set<ClusterNode> backup = Collections.singleton(backups.poll()); + + target = + (GridCacheQueryFutureAdapter<?, ?, R>)qryMgr.queryDistributed(bean, backup); + + init(); + } + } + } + }); + } + + /** {@inheritDoc} */ + @Override protected boolean onPage(UUID nodeId, boolean last) { + return target.onPage(nodeId, last); + } + + /** {@inheritDoc} */ + @Override protected void loadPage() { + target.loadPage(); + } + + /** {@inheritDoc} */ + @Override protected void loadAllPages() throws IgniteInterruptedCheckedException { + target.loadAllPages(); + } + + /** {@inheritDoc} */ + @Override protected void cancelQuery() throws IgniteCheckedException { + target.cancelQuery(); + } + + /** {@inheritDoc} */ + @Override public int available() { + return target.available(); + } + + /** {@inheritDoc} */ + @Override public boolean cancel() throws IgniteCheckedException { + return target.cancel(); + } + + /** {@inheritDoc} */ + @Override void clear() { + target.clear(); + } + + /** {@inheritDoc} */ + @Override public long endTime() { + return target.endTime(); + } + + /** {@inheritDoc} */ + @Override protected void enqueue(Collection<?> col) { + target.enqueue(col); + } + + /** {@inheritDoc} */ + @Override boolean fields() { + return target.fields(); + } + + /** {@inheritDoc} */ + @Override public Collection<R> get() throws IgniteCheckedException { + return target.get(); + } + + /** {@inheritDoc} */ + @Override public Collection<R> get(long timeout, TimeUnit unit) throws IgniteCheckedException { + return target.get(timeout, unit); + } + + /** {@inheritDoc} */ + @Override public R next() { + return target.next(); + } + + /** {@inheritDoc} */ + @Override public Collection<R> nextPage() throws IgniteCheckedException { + return target.nextPage(); + } + + /** {@inheritDoc} */ + @Override public boolean onDone(Collection<R> res, Throwable err) { + return target.onDone(res, err); + } + + /** {@inheritDoc} */ + @Override public Collection<R> nextPage(long timeout) throws IgniteCheckedException { + return target.nextPage(timeout); + } + + /** {@inheritDoc} */ + @Override protected void onNodeLeft(UUID evtNodeId) { + target.onNodeLeft(evtNodeId); + } + + /** {@inheritDoc} */ + @Override public void onPage(@Nullable UUID nodeId, @Nullable Collection<?> data, + @Nullable Throwable err, boolean finished) { + target.onPage(nodeId, data, err, finished); + } + + /** {@inheritDoc} */ + @Override public void onTimeout() { + target.onTimeout(); + } + + /** {@inheritDoc} */ + @Override public void printMemoryStats() { + target.printMemoryStats(); + } + + /** {@inheritDoc} */ + @Override public GridCacheQueryBean query() { + return target.query(); + } + + /** {@inheritDoc} */ + @Override public IgniteUuid timeoutId() { + return target.timeoutId(); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b58bb122/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index 16a8028..fac3d8f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -52,6 +52,8 @@ import java.util.concurrent.*; import static org.apache.ignite.cache.CacheMode.*; import static org.apache.ignite.events.EventType.*; import static org.apache.ignite.internal.GridClosureCallMode.*; +import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.*; +import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*; import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.*; /** @@ -111,8 +113,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte final Object recipient = recipient(nodeId, entry.getKey()); entry.getValue().listen(new CIX1<IgniteInternalFuture<QueryResult<K, V>>>() { - @Override - public void applyx(IgniteInternalFuture<QueryResult<K, V>> f) + @Override public void applyx(IgniteInternalFuture<QueryResult<K, V>> f) throws IgniteCheckedException { f.get().closeIfNotShared(recipient); } @@ -768,98 +769,139 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte final boolean backups = qry.includeBackups() || cctx.isReplicated(); - final GridCloseableIteratorAdapter<IgniteBiTuple<K, V>> heapIt = new GridCloseableIteratorAdapter<IgniteBiTuple<K, V>>() { - private IgniteBiTuple<K, V> next; + final GridCloseableIteratorAdapter<IgniteBiTuple<K, V>> heapIt = + new GridCloseableIteratorAdapter<IgniteBiTuple<K, V>>() { + private IgniteBiTuple<K, V> next; - private IgniteCacheExpiryPolicy expiryPlc = cctx.cache().expiryPolicy(plc); + private IgniteCacheExpiryPolicy expiryPlc = cctx.cache().expiryPolicy(plc); - private Iterator<K> iter = backups ? prj.keySet().iterator() : prj.primaryKeySet().iterator(); + private Iterator<K> iter; - { - advance(); - } + private GridDhtLocalPartition locPart; - @Override public boolean onHasNext() { - return next != null; - } + { + Integer part = qry.partition(); - @Override public IgniteBiTuple<K, V> onNext() { - if (next == null) - throw new NoSuchElementException(); + if (part == null || dht == null) + iter = backups ? prj.keySet().iterator() : prj.primaryKeySet().iterator(); + else if (part < 0 || part >= cctx.affinity().partitions()) + iter = F.emptyIterator(); + else { + AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion(); - IgniteBiTuple<K, V> next0 = next; + locPart = dht.topology().localPartition(part, topVer, false); - advance(); + if (locPart == null || (locPart.state() != OWNING && locPart.state() != RENTING) || + !locPart.reserve()) + throw new GridDhtInvalidPartitionException(part, "Partition can't be reserved"); - return next0; - } - private void advance() { - IgniteBiTuple<K, V> next0 = null; + iter = new Iterator<K>() { + private Iterator<KeyCacheObject> iter0 = locPart.keySet().iterator(); - while (iter.hasNext()) { - next0 = null; + @Override public boolean hasNext() { + return iter0.hasNext(); + } - K key = iter.next(); + @Override public K next() { + KeyCacheObject key = iter0.next(); - V val; + return key.value(cctx.cacheObjectContext(), false); + } - try { - val = prj.localPeek(key, CachePeekModes.ONHEAP_ONLY, expiryPlc); + @Override public void remove() { + iter0.remove(); + } + }; } - catch (IgniteCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to peek value: " + e); - val = null; - } + advance(); + } - if (dht != null && expiryPlc != null && expiryPlc.readyToFlush(100)) { - dht.sendTtlUpdateRequest(expiryPlc); + @Override public boolean onHasNext() { + return next != null; + } - expiryPlc = cctx.cache().expiryPolicy(plc); - } + @Override public IgniteBiTuple<K, V> onNext() { + if (next == null) + throw new NoSuchElementException(); - if (val != null) { - next0 = F.t(key, val); + IgniteBiTuple<K, V> next0 = next; - if (checkPredicate(next0)) - break; - else - next0 = null; - } + advance(); + + return next0; } - next = next0 != null ? - new IgniteBiTuple<>(next0.getKey(), next0.getValue()) : - null; + private void advance() { + IgniteBiTuple<K, V> next0 = null; - if (next == null) - sendTtlUpdate(); - } + while (iter.hasNext()) { + next0 = null; - @Override protected void onClose() { - sendTtlUpdate(); - } + K key = iter.next(); + + V val; + + try { + val = prj.localPeek(key, CachePeekModes.ONHEAP_ONLY, expiryPlc); + } + catch (IgniteCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to peek value: " + e); + + val = null; + } + + if (dht != null && expiryPlc != null && expiryPlc.readyToFlush(100)) { + dht.sendTtlUpdateRequest(expiryPlc); + + expiryPlc = cctx.cache().expiryPolicy(plc); + } + + if (val != null) { + next0 = F.t(key, val); + + if (checkPredicate(next0)) + break; + else + next0 = null; + } + } - private void sendTtlUpdate() { - if (dht != null && expiryPlc != null) { - dht.sendTtlUpdateRequest(expiryPlc); + next = next0 != null ? + new IgniteBiTuple<>(next0.getKey(), next0.getValue()) : + null; - expiryPlc = null; + if (next == null) + sendTtlUpdate(); } - } - private boolean checkPredicate(Map.Entry<K, V> e) { - if (keyValFilter != null) { - Map.Entry<K, V> e0 = (Map.Entry<K, V>)cctx.unwrapPortableIfNeeded(e, qry.keepPortable()); + @Override protected void onClose() { + sendTtlUpdate(); - return keyValFilter.apply(e0.getKey(), e0.getValue()); + if (locPart != null) + locPart.release(); } - return true; - } - }; + private void sendTtlUpdate() { + if (dht != null && expiryPlc != null) { + dht.sendTtlUpdateRequest(expiryPlc); + + expiryPlc = null; + } + } + + private boolean checkPredicate(Map.Entry<K, V> e) { + if (keyValFilter != null) { + Map.Entry<K, V> e0 = (Map.Entry<K, V>)cctx.unwrapPortableIfNeeded(e, qry.keepPortable()); + + return keyValFilter.apply(e0.getKey(), e0.getValue()); + } + + return true; + } + }; final GridIterator<IgniteBiTuple<K, V>> it; @@ -914,7 +956,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte throws IgniteCheckedException { IgniteBiPredicate<K, V> filter = qry.scanFilter(); - Iterator<Map.Entry<byte[], byte[]>> it = cctx.swap().rawSwapIterator(true, backups); + Integer part = qry.partition(); + + Iterator<Map.Entry<byte[], byte[]>> it = part == null ? cctx.swap().rawSwapIterator(true, backups) : + cctx.swap().rawSwapIterator(part); return scanIterator(it, filter, qry.keepPortable()); } @@ -930,10 +975,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte if (cctx.offheapTiered() && filter != null) { OffheapIteratorClosure c = new OffheapIteratorClosure(filter, qry.keepPortable()); - return cctx.swap().rawOffHeapIterator(c, true, backups); + return cctx.swap().rawOffHeapIterator(c, qry.partition(), true, backups); } else { - Iterator<Map.Entry<byte[], byte[]>> it = cctx.swap().rawOffHeapIterator(true, backups); + Iterator<Map.Entry<byte[], byte[]>> it = cctx.swap().rawOffHeapIterator(qry.partition(), true, backups); return scanIterator(it, filter, qry.keepPortable()); } @@ -1222,7 +1267,9 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte try { // Preparing query closures. - IgniteClosure<Map.Entry<K, V>, Object> trans = (IgniteClosure<Map.Entry<K, V>, Object>)qryInfo.transformer(); + IgniteClosure<Map.Entry<K, V>, Object> trans = + (IgniteClosure<Map.Entry<K, V>, Object>)qryInfo.transformer(); + IgniteReducer<Map.Entry<K, V>, Object> rdc = (IgniteReducer<Map.Entry<K, V>, Object>)qryInfo.reducer(); injectResources(trans); @@ -1529,11 +1576,6 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte fut.onDone(executeQuery(qryInfo.query(), qryInfo.arguments(), false, qryInfo.query().subjectId(), taskName, recipient(qryInfo.senderId(), qryInfo.requestId()))); } - catch (Error e) { - fut.onDone(e); - - throw e; - } catch (Throwable e) { fut.onDone(e); @@ -1843,7 +1885,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte return new IgniteBiPredicate<K, V>() { @Override public boolean apply(K k, V v) { - return cache.context().affinity().primary(ctx.discovery().localNode(), k, AffinityTopologyVersion.NONE); + return cache.context().affinity().primary(ctx.discovery().localNode(), k, NONE); } }; } @@ -2920,6 +2962,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte null, null, null, + null, false, keepPortable); } @@ -2928,17 +2971,19 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte * Creates user's predicate based scan query. * * @param filter Scan filter. + * @param part Partition. * @param keepPortable Keep portable flag. * @return Created query. */ - @SuppressWarnings("unchecked") public CacheQuery<Map.Entry<K, V>> createScanQuery(@Nullable IgniteBiPredicate<K, V> filter, - boolean keepPortable) { + @Nullable Integer part, boolean keepPortable) { + return new GridCacheQueryAdapter<>(cctx, SCAN, null, null, (IgniteBiPredicate<Object, Object>)filter, + part, false, keepPortable); } @@ -2962,6 +3007,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte clsName, search, null, + null, false, keepPortable); } @@ -2982,6 +3028,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte null, qry, null, + null, false, keepPortable); } @@ -3002,6 +3049,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte null, qry, null, + null, incMeta, keepPortable); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b58bb122/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java index 845077f..7577954 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java @@ -26,6 +26,8 @@ import org.apache.ignite.lang.*; import org.apache.ignite.marshaller.*; import org.apache.ignite.plugin.extensions.communication.*; +import org.jetbrains.annotations.*; + import java.io.*; import java.nio.*; import java.util.*; @@ -109,6 +111,9 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache /** */ private int taskHash; + /** Partition. */ + private Integer part; + /** * Required by {@link Externalizable} */ @@ -173,6 +178,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache * @param clause Query clause. * @param clsName Query class name. * @param keyValFilter Key-value filter. + * @param part Partition. * @param rdc Reducer. * @param trans Transformer. * @param pageSize Page size. @@ -189,6 +195,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache String clause, String clsName, IgniteBiPredicate<Object, Object> keyValFilter, + @Nullable Integer part, IgniteReducer<Object, Object> rdc, IgniteClosure<Object, Object> trans, int pageSize, @@ -211,6 +218,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache this.clause = clause; this.clsName = clsName; this.keyValFilter = keyValFilter; + this.part = part; this.rdc = rdc; this.trans = trans; this.pageSize = pageSize; @@ -414,6 +422,13 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache return taskHash; } + /** + * @return partition. + */ + @Nullable public Integer partition() { + return part; + } + /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { writer.setBuffer(buf); @@ -537,6 +552,11 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache writer.incrementState(); + case 21: + if (!writer.writeInt("part", part != null ? part : -1)) + return false; + + writer.incrementState(); } return true; @@ -701,6 +721,15 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache reader.incrementState(); + case 21: + int part0 = reader.readInt("part"); + + part = part0 == -1 ? null : part0; + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); } return true; @@ -713,7 +742,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 21; + return 22; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b58bb122/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java index f516968..c0e763f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java @@ -114,7 +114,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite } CacheQuery qry = new GridCacheQueryAdapter<>(ctx, SET, null, null, - new GridSetQueryPredicate<>(id, collocated), false, false); + new GridSetQueryPredicate<>(id, collocated), -1, false, false); Collection<ClusterNode> nodes = dataNodes(ctx.affinity().affinityTopologyVersion()); @@ -345,7 +345,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite private GridCloseableIterator<T> iterator0() { try { CacheQuery qry = new GridCacheQueryAdapter<>(ctx, SET, null, null, - new GridSetQueryPredicate<>(id, collocated), false, false); + new GridSetQueryPredicate<>(id, collocated), null, false, false); Collection<ClusterNode> nodes = dataNodes(ctx.affinity().affinityTopologyVersion()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b58bb122/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedPreloadLifecycleSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedPreloadLifecycleSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedPreloadLifecycleSelfTest.java index 9d41074..4601586 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedPreloadLifecycleSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedPreloadLifecycleSelfTest.java @@ -176,7 +176,7 @@ public class GridCachePartitionedPreloadLifecycleSelfTest extends GridCachePrelo for (int j = 0; j < G.allGrids().size(); j++) { GridCacheAdapter<Object, MyValue> c2 = ((IgniteKernal)grid(j)).internalCache("two"); - CacheQuery<Map.Entry<Object, MyValue>> qry = c2.context().queries().createScanQuery(null, false); + CacheQuery<Map.Entry<Object, MyValue>> qry = c2.context().queries().createScanQuery(null, null, false); int totalCnt = F.sumInt(qry.execute(new IgniteReducer<Map.Entry<Object, MyValue>, Integer>() { @IgniteInstanceResource http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b58bb122/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadLifecycleSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadLifecycleSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadLifecycleSelfTest.java index 62bf3f7..cc8217d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadLifecycleSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadLifecycleSelfTest.java @@ -179,7 +179,7 @@ public class GridCacheReplicatedPreloadLifecycleSelfTest extends GridCachePreloa for (int j = 0; j < G.allGrids().size(); j++) { GridCacheAdapter<Object, MyValue> c2 = ((IgniteKernal)grid(j)).internalCache("two"); - CacheQuery<Map.Entry<Object, MyValue>> qry = c2.context().queries().createScanQuery(null, false); + CacheQuery<Map.Entry<Object, MyValue>> qry = c2.context().queries().createScanQuery(null, null, false); final int i0 = j; final int j0 = i; @@ -207,8 +207,8 @@ public class GridCacheReplicatedPreloadLifecycleSelfTest extends GridCachePreloa Object v1 = e.getValue(); Object v2 = ((IgniteKernal)grid).getCache("one").get(key); - assertNotNull("Cache c1 misses value for key [i=" + j0 + ", j=" + i0 + - ", missedKey=" + key + ", cache=" + ((IgniteKernal)grid).getCache("one").values() + ']', v2); + assertNotNull("Cache c1 misses value for key [i=" + j0 + ", j=" + i0 + ", missedKey=" + + key + ", cache=" + ((IgniteKernal)grid).getCache("one").values() + ']', v2); assertEquals(v1, v2); } catch (IgniteCheckedException e1) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b58bb122/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheSwapScanQueryAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheSwapScanQueryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheSwapScanQueryAbstractSelfTest.java index 068a46c..6ccfbc2 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheSwapScanQueryAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheSwapScanQueryAbstractSelfTest.java @@ -115,49 +115,81 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA * @throws Exception If failed. */ public void testQuery() throws Exception { - checkQuery(((IgniteKernal)grid(0)).internalCache(ATOMIC_CACHE_NAME)); + checkQuery(((IgniteKernal)grid(0)).internalCache(ATOMIC_CACHE_NAME), false); - checkQuery(((IgniteKernal)grid(0)).internalCache(TRANSACTIONAL_CACHE_NAME)); + checkQuery(((IgniteKernal)grid(0)).internalCache(TRANSACTIONAL_CACHE_NAME), false); + + checkQuery(((IgniteKernal)grid(0)).internalCache(ATOMIC_CACHE_NAME), true); + + checkQuery(((IgniteKernal)grid(0)).internalCache(TRANSACTIONAL_CACHE_NAME), true); } /** * @param cache Cache. + * @param scanPartitions Scan partitions. * @throws Exception If failed. */ @SuppressWarnings("unchecked") - private void checkQuery(GridCacheAdapter cache) throws Exception { + private void checkQuery(GridCacheAdapter cache, boolean scanPartitions) throws Exception { final int ENTRY_CNT = 500; - for (int i = 0; i < ENTRY_CNT; i++) - cache.getAndPut(new Key(i), new Person("p-" + i, i)); + Map<Integer, Map<Key, Person>> entries = new HashMap<>(); + + for (int i = 0; i < ENTRY_CNT; i++) { + Key key = new Key(i); + Person val = new Person("p-" + i, i); + + int part = cache.context().affinity().partition(key); + + cache.getAndPut(key, val); + + Map<Key, Person> partEntries = entries.get(part); + + if (partEntries == null) + entries.put(part, partEntries = new HashMap<>()); + + partEntries.put(key, val); + } try { - CacheQuery<Map.Entry<Key, Person>> qry = cache.context().queries().createScanQuery( - new IgniteBiPredicate<Key, Person>() { - @Override public boolean apply(Key key, Person p) { - assertEquals(key.id, (Integer)p.salary); + int partitions = scanPartitions ? cache.context().affinity().partitions() : 1; - return key.id % 2 == 0; - } - }, false); + for (int i = 0; i < partitions; i++) { + CacheQuery<Map.Entry<Key, Person>> qry = cache.context().queries().createScanQuery( + new IgniteBiPredicate<Key, Person>() { + @Override public boolean apply(Key key, Person p) { + assertEquals(key.id, (Integer)p.salary); - Collection<Map.Entry<Key, Person>> res = qry.execute().get(); + return key.id % 2 == 0; + } + }, (scanPartitions ? i : null), false); - assertEquals(ENTRY_CNT / 2, res.size()); + Collection<Map.Entry<Key, Person>> res = qry.execute().get(); - for (Map.Entry<Key, Person> e : res) { - Key k = e.getKey(); - Person p = e.getValue(); + if (!scanPartitions) + assertEquals(ENTRY_CNT / 2, res.size()); - assertEquals(k.id, (Integer)p.salary); - assertEquals(0, k.id % 2); - } + for (Map.Entry<Key, Person> e : res) { + Key k = e.getKey(); + Person p = e.getValue(); - qry = cache.context().queries().createScanQuery(null, false); + assertEquals(k.id, (Integer)p.salary); + assertEquals(0, k.id % 2); - res = qry.execute().get(); + if (scanPartitions) { + Map<Key, Person> partEntries = entries.get(i); - assertEquals(ENTRY_CNT, res.size()); + assertEquals(p, partEntries.get(k)); + } + } + + qry = cache.context().queries().createScanQuery(null, (scanPartitions ? i : null), false); + + res = qry.execute().get(); + + if (!scanPartitions) + assertEquals(ENTRY_CNT, res.size()); + } testMultithreaded(cache, ENTRY_CNT / 2); } @@ -185,7 +217,7 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA return key.id % 2 == 0; } - }, false); + }, null, false); for (int i = 0; i < 250; i++) { Collection<Map.Entry<Key, Person>> res = qry.execute().get(); @@ -229,7 +261,7 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA return val % 2 == 0; } - }, false); + }, null, false); Collection<Map.Entry<String, Long>> res = qry.execute().get(); @@ -244,7 +276,7 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA assertEquals(0, val % 2); } - qry = cache.context().queries().createScanQuery(null, false); + qry = cache.context().queries().createScanQuery(null, null, false); res = qry.execute().get(); @@ -284,7 +316,7 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA return key % 2 == 0; } - }, false); + }, null, false); Collection<Map.Entry<Integer, byte[]>> res = qry.execute().get(); @@ -299,7 +331,7 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA assertEquals(0, key % 2); } - qry = cache.context().queries().createScanQuery(null, false); + qry = cache.context().queries().createScanQuery(null, null, false); res = qry.execute().get(); @@ -367,5 +399,29 @@ public abstract class GridCacheSwapScanQueryAbstractSelfTest extends GridCommonA this.name = name; this.salary = salary; } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + Person person = (Person)o; + + if (salary != person.salary) + return false; + + return !(name != null ? !name.equals(person.name) : person.name != null); + + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int result = name != null ? name.hashCode() : 0; + + return 31 * result + salary; + } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b58bb122/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java new file mode 100644 index 0000000..3b1b842 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.managers.communication.*; +import org.apache.ignite.internal.processors.affinity.*; +import org.apache.ignite.internal.processors.cache.query.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.plugin.extensions.communication.*; +import org.apache.ignite.spi.*; +import org.apache.ignite.spi.communication.tcp.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.*; + +/** + * Tests partition scan query fallback. + */ +public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractTest { + /** Grid count. */ + private static final int GRID_CNT = 5; + + /** Kys count. */ + private static final int KEYS_CNT = 1; + + /** Backups. */ + private int backups; + + /** Cache mode. */ + private CacheMode cacheMode; + + /** Fallback. */ + private boolean fallback; + + /** Primary node id. */ + private static volatile UUID expNodeId; + + /** Fail node id. */ + private static volatile UUID failNodeId; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setCommunicationSpi(new TestCommunicationSpi()); + + CacheConfiguration ccfg = defaultCacheConfiguration(); + ccfg.setCacheMode(cacheMode); + ccfg.setAtomicityMode(CacheAtomicityMode.ATOMIC); + ccfg.setBackups(backups); + ccfg.setNearConfiguration(null); + + cfg.setCacheConfiguration(ccfg); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testPrimary() throws Exception { + cacheMode = CacheMode.PARTITIONED; + backups = 0; + failNodeId = null; + fallback = false; + + doTestScanPartition(); + } + + /** + * @throws Exception If failed. + */ + public void testFallbackToBackup() throws Exception { + cacheMode = CacheMode.PARTITIONED; + backups = 1; + failNodeId = null; + fallback = true; + + doTestScanPartition(); + } + + /** + * @throws Exception If failed. + */ + protected void doTestScanPartition() throws Exception { + try { + Ignite ignite = startGrids(GRID_CNT); + + IgniteCacheProxy<Integer, Integer> cache = + (IgniteCacheProxy<Integer, Integer>)ignite.<Integer, Integer>cache(null); + + Map<Integer, Map<Integer, Integer>> entries = new HashMap<>(); + + for (int i = 0; i < KEYS_CNT; i++) { + cache.put(i, i); + + int part = cache.context().affinity().partition(i); + + Map<Integer, Integer> partEntries = entries.get(part); + + if (partEntries == null) + entries.put(part, partEntries = new HashMap<>()); + + partEntries.put(i, i); + } + + IgniteBiTuple<Integer, UUID> tup = remotePartition(cache.context(), true); + + int part = tup.get1(); + + if (fallback) + failNodeId = tup.get2(); + else + expNodeId = tup.get2(); + + if (fallback) + expNodeId = remoteBackup(part, cache.context()); + + CacheQuery<Map.Entry<Integer, Integer>> qry = cache.context().queries().createScanQuery(null, part, false); + + CacheQueryFuture<Map.Entry<Integer, Integer>> fut = qry.execute(); + + Collection<Map.Entry<Integer, Integer>> expEntries = fut.get(); + + for (Map.Entry<Integer, Integer> e : expEntries) { + Map<Integer, Integer> map = entries.get(part); + + if(map == null) + assertTrue(expEntries.isEmpty()); + else + assertEquals(map.get(e.getKey()), e.getValue()); + } + } + finally { + stopAllGrids(); + } + } + + /** + * @param cctx Cctx. + * @param primary Primary. + */ + private IgniteBiTuple<Integer, UUID> remotePartition(final GridCacheContext cctx, boolean primary) { + ClusterNode node = F.first(cctx.kernalContext().grid().cluster().forRemotes().nodes()); + + GridCacheAffinityManager affMgr = cctx.affinity(); + + AffinityTopologyVersion topVer = affMgr.affinityTopologyVersion(); + + Set<Integer> parts = primary ? + affMgr.primaryPartitions(node.id(), topVer) : affMgr.backupPartitions(node.id(), topVer); + + return new IgniteBiTuple<>(F.first(parts), node.id()); + } + + /** + * @param part Partition. + * @param cctx Cctx. + */ + private UUID remoteBackup(int part, final GridCacheContext cctx) { + final UUID locUuid = cctx.localNodeId(); + + GridCacheAffinityManager affMgr = cctx.affinity(); + + AffinityTopologyVersion topVer = affMgr.affinityTopologyVersion(); + + return F.first(F.view(affMgr.backups(part, topVer), new IgnitePredicate<ClusterNode>() { + @Override public boolean apply(ClusterNode node) { + return !node.id().equals(locUuid); + } + })).id(); + } + + /** + * + */ + private static class TestCommunicationSpi extends TcpCommunicationSpi { + /** {@inheritDoc} */ + @Override public void sendMessage(ClusterNode node, Message msg) + throws IgniteSpiException { + Object origMsg = ((GridIoMessage)msg).message(); + + if (origMsg instanceof GridCacheQueryRequest) { + if (node.id().equals(failNodeId)) + throw new IgniteSpiException(""); + else + assertEquals(expNodeId, node.id()); + } + + super.sendMessage(node, msg); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b58bb122/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java index 1a60bbd..eb5027c 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java @@ -61,6 +61,9 @@ import static org.junit.Assert.*; * Various tests for cache queries. */ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstractTest { + /** Key count. */ + private static final int KEY_CNT = 5000; + /** Cache store. */ private static TestStore store = new TestStore(); @@ -643,6 +646,47 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac } /** + * @throws Exception In case of error. + */ + public void testScanPartitionQuery() throws Exception { + IgniteCache<Integer, Integer> cache = ignite.cache(null); + + GridCacheContext cctx = ((IgniteCacheProxy)cache).context(); + + Map<Integer, Map<Integer, Integer>> entries = new HashMap<>(); + + for (int i = 0; i < KEY_CNT; i++) { + cache.put(i, i); + + int part = cctx.affinity().partition(i); + + Map<Integer, Integer> partEntries = entries.get(part); + + if (partEntries == null) + entries.put(part, partEntries = new HashMap<>()); + + partEntries.put(i, i); + } + + for (int i = 0; i < cctx.affinity().partitions(); i++) { + CacheQuery<Map.Entry<Integer, Integer>> qry = + ((IgniteCacheProxy<Integer, Integer>)cache).context().queries().createScanQuery(null, i, false); + + CacheQueryFuture<Map.Entry<Integer, Integer>> fut = qry.execute(); + + Map<Integer, Integer> exp = entries.get(i); + + Collection<Map.Entry<Integer, Integer>> actual = fut.get(); + + if (exp == null) + assertTrue(actual.isEmpty()); + else + for (Map.Entry<Integer, Integer> entry : actual) + assertTrue(entry.getValue().equals(exp.get(entry.getKey()))); + } + } + + /** * JUnit. * * @throws Exception In case of error. @@ -1048,11 +1092,13 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac for (int i = 0; i < 20; i++) cache.put(i, i); - QueryCursor<Cache.Entry<Integer, Integer>> q = cache.query(new ScanQuery<>(new IgniteBiPredicate<Integer,Integer>() { + IgniteBiPredicate<Integer, Integer> filter = new IgniteBiPredicate<Integer, Integer>() { @Override public boolean apply(Integer k, Integer v) { return k >= 10; } - })); + }; + + QueryCursor<Cache.Entry<Integer, Integer>> q = cache.query(new ScanQuery<>(filter)); q.getAll(); @@ -1187,7 +1233,8 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac * @return {@code true} if index has a table for given class. * @throws IgniteCheckedException If failed. */ - private boolean hasIndexTable(Class<?> cls, GridCacheQueryManager<Object, Object> qryMgr) throws IgniteCheckedException { + private boolean hasIndexTable(Class<?> cls, GridCacheQueryManager<Object, Object> qryMgr) + throws IgniteCheckedException { return qryMgr.size(cls) != -1; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b58bb122/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java index f42963a..713cf84 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java @@ -70,6 +70,8 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite { suite.addTestSuite(GridCacheCrossCacheQuerySelfTest.class); suite.addTestSuite(GridCacheQuerySerializationSelfTest.class); + // Scan queries. + suite.addTestSuite(CacheScanPartitionQueryFallbackSelfTest.class); // Fields queries. suite.addTestSuite(IgniteCacheLocalFieldsQuerySelfTest.class);